## Sample Python Notebook to perform Desktop RPA using Natural Language and Visual Instructions.
This automation is powered by open-source tools like OpenCV, Hugging Face Transformers, Meta's SAM and DINOv2 models, Mindee's docTR models for OCR, PyAutoGUI for bot actions, and many other incredible Python libraries.

This framework is configuration driven, designed to be modular and open-ended.

To get started, define the RPA bot's action steps (aka the flow) in a YAML-based config file, using text- and image-based instructions/prompts.

Text Prompts are text elements, such as a text field, which can be interacted with on the UI.

Visual Prompts are images/icons which can be interacted with on the UI.
- They can be captured using Screen grab tools and saved as .jpg or .png format
- For issues with detecting small objects, explore Accessibility Features like Magnifier, high contrast colors, available on the Desktop OS

### Install the required Python Libraries (Expect installation hiccups on Windows OS)

In [None]:
# Optical Character Recognition (OCR) Library- docTR Installation Prerequisite
# Ref: https://github.com/mindee/doctr/issues/701
#1. Install GTK for Windows, download "gtk3-runtime-3.24.31-2022-01-04-ts-win64.exe"
## Executable Available At- https://github.com/tschoonj/GTK-for-Windows-Runtime-Environment-Installer/releases
#2. Create a new venv

# PIP Install
#pip install torch "python-doctr[torch]" pyautogui transformers opencv-python
#pip install git+https://github.com/facebookresearch/segment-anything.git

In [None]:
# Import needed libraries
import os
import yaml
import time

# Environment preparation for the downstream library imports (docTR, Transformers):
# the GTK runtime folder is put at the front of PATH and the library flags are set
# before those libraries are imported in the next cell.
GTK_FOLDER = r'C:\Program Files\GTK3-Runtime Win64\bin'
os.environ['PATH'] = os.pathsep.join([GTK_FOLDER, os.environ.get('PATH', '')])
os.environ.update({
    'USE_TORCH': '1',
    'HF_HUB_DISABLE_SYMLINKS_WARNING': '1',
})

In [None]:
# Import Remaining libraries
import numpy as np
import torch
import torch.nn as nn
from PIL import Image

import cv2
import pyautogui
from transformers import AutoImageProcessor, AutoModel
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
from segment_anything import sam_model_registry, SamAutomaticMaskGenerator

In [None]:
# Enable PyAutoGUI's fail-safe mode: moving the mouse to the upper-left corner of the screen
# aborts the program, which gives a manual way out if the automation gets stuck (e.g. an infinite loop).
pyautogui.FAILSAFE = True

# Select the device for the models: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# CPU-only testing is opt-in: set the environment variable RPA_FORCE_CPU=1 before running this cell.
# Without it the detected device is kept, so a testing override can no longer disable the GPU by accident.
if os.environ.get("RPA_FORCE_CPU", "0") == "1":
    device = torch.device("cpu")

In [None]:
# Location of the pretrained SAM weights.
# Download a checkpoint from https://github.com/facebookresearch/segment-anything#model-checkpoints
# and point the values below at it; they are passed to the model-loading step. This example uses "vit_h".
model_type = "vit_h"
sam_models_root_path = "C:/Users/mural/sam-models"
sam_model_checkpoint = f"{sam_models_root_path}/sam_vit_h_4b8939.pth"

### Load the Models

In [None]:
# Text Prompts Support- Load OCR Model from docTR: https://mindee.github.io/doctr/using_doctr/using_models.html
# Requests pretrained weights for the 'db_resnet50' detection and 'crnn_vgg16_bn' recognition architectures.
# The resulting predictor is called on every screenshot by retrieve_text_location_screenshot.
docTR_ocr_model = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True)

In [None]:
# Image Prompts Support- Load the SAM model from Facebook/Meta
# SAM Model- Used to Extract all the Objects (Image Patches/Masks) of Interest from the Image
# The registry entry selected by `model_type` builds the model from the checkpoint file set above.
sam = sam_model_registry[model_type](checkpoint=sam_model_checkpoint)
# Move the SAM model to the device selected earlier (GPU or CPU).
sam.to(device=device)

# There are several tunable parameters in automatic mask generation
# Default Segmentation Mask (Image Patch) Generator; used by retrieve_image_location_screenshot
# to produce the masks (with their bounding boxes) for each screenshot.
sam_mask_generator = SamAutomaticMaskGenerator(sam)

# Custom Segmentation Mask (Image Patch) Generator
# Ref: https://github.com/facebookresearch/segment-anything/blob/main/notebooks/automatic_mask_generator_example.ipynb
#sam_mask_generator = SamAutomaticMaskGenerator(
#    model=sam,
#    points_per_side=32,
#    pred_iou_thresh=0.86,
#    stability_score_thresh=0.92,
#    crop_n_layers=1,
#    crop_n_points_downscale_factor=2,
#    min_mask_region_area=100,  # Requires open-cv to run post-processing
#)

In [None]:
# Image Prompts Support- Load the DinoV2 model from Facebook/Meta
# DinoV2 Model- Used for Image Similarity via Feature based Template Matching
## Ref: https://github.com/facebookresearch/dinov2

# The model is moved to the selected device; the processor prepares images as "pt" tensors for it.
# Both are used by retrieve_image_location_screenshot to compare the template image with screenshot patches.
dinov2_model = AutoModel.from_pretrained('facebook/dinov2-base').to(device)
dinov2_processor = AutoImageProcessor.from_pretrained('facebook/dinov2-base')

### Set the directory paths for configs, image prompts, logs

In [None]:
# Name of the RPA flow to run. It is used as the sub-directory name under the configs,
# image prompts and logs folders set up in the following cells.
flow_name= "rpa_instruct_demo"

In [None]:
# Build the path of the YAML configuration file, which contains all the steps and actions
# for the flow: <configs_path_prefix>/<flow_name>/flow_config.yml
configs_path_prefix = "./configs/"
config_file_name = "/flow_config.yml"
flow_config_path = os.path.join(configs_path_prefix, flow_name)
config_file_path = f"{flow_config_path}{config_file_name}"

# Only a warning is printed here; fix the path and rerun this cell before loading the config.
if not os.path.isfile(config_file_path):
    print("Config File Not Found. Check the File Path and Rerun this Step!")

In [None]:
# Set the Directory Containing the Images for the Icons/ Interactable Elements on the UI
# The directory is <image_prompts_path_prefix>/<flow_name>.
image_prompts_path_prefix= "./image_prompts/"
image_prompts_dir= os.path.join(image_prompts_path_prefix, flow_name)

# Only a warning is printed when the directory is missing; nothing is created here.
if not os.path.isdir(image_prompts_dir):
    print("Image Prompts Directory Not Found. Check the Folder Path and Rerun this Step!")

In [None]:
# Logging directories for the flow.
# They give a trace of every step and help explain why a step's action succeeded or failed.
#   input logs : the screenshot captured for each step
#   output logs: each step's processing results, i.e. the image patches of text and/or icons
screenshot_store_path_prefix = "./logs/logs_flow_input/"
scr_segments_store_path_prefix = "./logs/logs_flow_output/"
flow_screenshot_store_path = os.path.join(screenshot_store_path_prefix, flow_name)
flow_scr_segments_store_path = os.path.join(scr_segments_store_path_prefix, flow_name)

# Create both directories if they do not exist yet (existing ones are left untouched).
print("Creating Directory for Inputs:", flow_screenshot_store_path)
os.makedirs(flow_screenshot_store_path, exist_ok=True)

print("Creating Directory for Outputs:", flow_scr_segments_store_path)
os.makedirs(flow_scr_segments_store_path, exist_ok=True)

### Validate Yaml, check all steps are in increasing order and all required fields are present. 

In [None]:
# Load the flow configuration with the safe YAML loader.
# The file is opened in binary mode so that PyYAML decodes it itself (UTF-8 unless a BOM says
# otherwise) instead of relying on the platform's default text encoding, which on Windows
# can mis-decode non-ASCII prompt text.
with open(config_file_path, 'rb') as file:
    config_file = yaml.safe_load(file)

In [None]:
# The steps of the flow are read from the 'Flow' key of the loaded config.
# The validation and run cells below iterate over them and read each step's
# 'Step', 'prompt_type', 'prompt_content' and 'action' entries.
rpa_flow_steps= config_file['Flow']

In [None]:
# Validate the flow before any bot action is performed. Every step must
#   - define all the fields the run loop reads: 'Step', 'prompt_type', 'prompt_content', 'action'
#   - have a step number that is not lower than the previous step's number
#   - use one of the supported prompt types
# On the first failed check the offending step is printed and a ValueError is raised, rather than
# only leaving the loop, so an invalid flow does not go on to drive the mouse and keyboard.
prev_step_num = 0
valid_prompt_types = ['text', 'image', 'command', 'dataentry']
required_step_fields = ('Step', 'prompt_type', 'prompt_content', 'action')

for curr_step in rpa_flow_steps:
    missing_fields = [field for field in required_step_fields if field not in curr_step]
    if missing_fields:
        print(curr_step)
        print(f"Current Step is missing required field(s): {missing_fields}. Exiting the program!")
        raise ValueError(f"Flow validation failed: step is missing required field(s) {missing_fields}")

    curr_step_num = curr_step['Step']
    curr_step_type = curr_step['prompt_type']

    # NOTE(review): equal step numbers pass this check (as before), but such steps share the same
    # screenshot file name and output directory in the logs - confirm whether duplicates are intended.
    if curr_step_num - prev_step_num < 0:
        print(curr_step)
        print("Steps are not in increasing order. Exiting the program!")
        raise ValueError(f"Flow validation failed: step {curr_step_num} comes after step {prev_step_num}")

    if curr_step_type not in valid_prompt_types:
        print(curr_step)
        print("Current Step's Prompt Type is Not Supported. Exiting the program! Choose 'text', 'image' , 'command' or 'dataentry' type and Rerun")
        raise ValueError(f"Flow validation failed: unsupported prompt type {curr_step_type!r} in step {curr_step_num}")

    prev_step_num = curr_step_num

### Text Prompt Support Functions

In [None]:
def _ocr_word_abs_bbox(word, page_dims):
    """Convert one OCR word's relative geometry into absolute [xmin, ymin, xmax, ymax] integers.

    x values are scaled by ``page_dims[1]`` and y values by ``page_dims[0]``.
    """
    (top_left, bottom_right) = word['geometry'][0], word['geometry'][1]
    return [
        int(round(top_left[0] * page_dims[1])),
        int(round(top_left[1] * page_dims[0])),
        int(round(bottom_right[0] * page_dims[1])),
        int(round(bottom_right[1] * page_dims[0])),
    ]


def retrieve_text_location_screenshot(input_scr_shot_path, search_text, step_num, text_case_convert=True):
    """Run OCR on a screenshot and return the locations of the words containing ``search_text``.

    Side effects: prints the OCR duration and the output directory, creates
    ``<flow_scr_segments_store_path>/step_<step_num>`` and saves a crop of every OCR word
    there as ``<word index>.png`` (for tracing what the OCR step saw).

    Args:
        input_scr_shot_path: Path of the screenshot image; read by docTR for OCR and by OpenCV for cropping.
        search_text: Text to look for. It is compared with each OCR word separately using a
            substring test, so a text spanning several OCR words is not matched as a whole.
            Non-string values (e.g. a number parsed from YAML) are converted with ``str``.
        step_num: Step number, used to name the output directory of this step.
        text_case_convert: When True, both the search text and the OCR words are lower-cased
            before comparison (case-insensitive search).

    Returns:
        A list of ``[xmin, ymin, xmax, ymax]`` boxes in absolute coordinates, one per matching
        word, in the order the words appear in the OCR result. Empty when nothing matches.
    """
    # Perform OCR on the Screenshot
    ocr_start_time = time.time()
    curr_screenshot_doc = DocumentFile.from_images(input_scr_shot_path)
    ocr_result = docTR_ocr_model(curr_screenshot_doc)
    ocr_time_duration_sec = round(time.time() - ocr_start_time)
    print("OCR on the Screenshot is Complete. Time taken in Seconds:", ocr_time_duration_sec)

    # YAML may deliver a non-string prompt (e.g. 2024); the comparison below needs a string
    search_text = str(search_text)
    if text_case_convert:
        search_text = search_text.lower()

    # Export the OCR results and flatten the words of the page.
    # This function handles one screenshot, so only the first page (index 0) is relevant.
    first_page = ocr_result.export()['pages'][0]
    page_dims = first_page['dimensions']
    all_words_ocr = [
        word
        for block in first_page['blocks']
        for line in block['lines']
        for word in line['words']
    ]

    # Create directory for current_step outputs if not exists
    curr_step_directory = "step_" + str(step_num)
    flow_curr_step_scr_segments_path = os.path.join(flow_scr_segments_store_path, curr_step_directory)
    print("Creating Directory for Current Step Outputs:", flow_curr_step_scr_segments_path)
    os.makedirs(flow_curr_step_scr_segments_path, exist_ok=True)

    keyword_match_locs = []
    curr_screenshot_image = cv2.imread(input_scr_shot_path)
    for word_ind, curr_word in enumerate(all_words_ocr):
        curr_word_location = _ocr_word_abs_bbox(curr_word, page_dims)
        x1, y1, x2, y2 = curr_word_location

        # Save the word's crop for tracing. A box that rounds to zero width/height gives an
        # empty crop, which cv2.imwrite rejects with an error, so such crops are not written.
        cropped_image = curr_screenshot_image[y1:y2, x1:x2]
        if cropped_image.size > 0:
            cropped_image_filename = os.path.join(flow_curr_step_scr_segments_path, str(word_ind) + '.png')
            cv2.imwrite(cropped_image_filename, cropped_image)

        curr_word_value = curr_word['value']
        if text_case_convert:
            curr_word_value = curr_word_value.lower()

        if search_text in curr_word_value:
            keyword_match_locs.append(curr_word_location)

    return keyword_match_locs

### Image Prompt Support Functions

In [None]:
def _dinov2_mean_embedding(pil_image):
    """Return the DinoV2 feature vector of an image: the mean over dim 1 of the last hidden state."""
    with torch.no_grad():
        model_inputs = dinov2_processor(images=pil_image, return_tensors="pt").to(device)
        hidden_states = dinov2_model(**model_inputs).last_hidden_state
        return hidden_states.mean(dim=1)[0]


def retrieve_image_location_screenshot(input_scr_shot_path, search_image_path, step_num, image_sim_threshold=1):
    """Find where the template image ``search_image_path`` appears on a screenshot.

    SAM splits the screenshot into image patches (masks); every patch is cropped, saved and
    compared with the template image using the cosine similarity of their DinoV2 features.
    Template matching on features is used because pixel-based template matching
    (e.g. OpenCV's, https://docs.opencv.org/4.x/d4/dc6/tutorial_py_template_matching.html)
    is not robust.

    Side effects: prints the mask generation duration and the output directory, creates
    ``<flow_scr_segments_store_path>/step_<step_num>`` and saves every non-empty patch there
    as ``<mask index>.png``.

    Args:
        input_scr_shot_path: Path of the screenshot image.
        search_image_path: Path of the template image (icon) to search for.
        step_num: Step number, used to name the output directory of this step.
        image_sim_threshold: Fraction of the best similarity score a patch must reach to be
            returned. The default of 1 keeps only the best-scoring patch(es).

    Returns:
        A list of ``[xmin, ymin, xmax, ymax]`` boxes, in mask order, for the patches whose
        similarity is at least ``image_sim_threshold * best score``. The best patch is always
        included, i.e. it is assumed that the template matches at least one location.
        An empty list is returned when SAM yields no usable patch.
    """
    curr_step_screenshot = cv2.imread(input_scr_shot_path)
    # OpenCV loads BGR; the mask generator is given the RGB version of the screenshot
    curr_step_screenshot_sam = cv2.cvtColor(curr_step_screenshot, cv2.COLOR_BGR2RGB)

    get_masks_start_time = time.time()
    masks = sam_mask_generator.generate(curr_step_screenshot_sam)
    mask_generation_duration_sec = round(time.time() - get_masks_start_time)
    print("Generating Image Patches/Masks for the Screenshot is Complete. Time taken in Seconds:", mask_generation_duration_sec)

    # Create directory for current_step outputs if not exists
    curr_step_directory = "step_" + str(step_num)
    flow_curr_step_scr_segments_path = os.path.join(flow_scr_segments_store_path, curr_step_directory)
    print("Creating Directory for Current Step Outputs:", flow_curr_step_scr_segments_path)
    os.makedirs(flow_curr_step_scr_segments_path, exist_ok=True)

    # Extract all the Image Patches identified by SAM- Crop out all masks.
    # Each mask's 'bbox' is unpacked as (x, y, width, height).
    patch_locations = []
    patch_filenames = []
    for mask_ind, mask in enumerate(masks):
        x, y, width, height = mask['bbox']
        cropped_image = curr_step_screenshot[int(y):int(y + height), int(x):int(x + width)]
        if cropped_image.size == 0:
            # A zero-area box gives an empty crop that can neither be saved nor compared
            continue
        cropped_image_filename = os.path.join(flow_curr_step_scr_segments_path, str(mask_ind) + '.png')
        cv2.imwrite(cropped_image_filename, cropped_image)
        patch_locations.append([x, y, x + width, y + height])
        patch_filenames.append(cropped_image_filename)

    # Extract DinoV2 features for the Template Image (the file is closed as soon as it has been processed)
    with Image.open(search_image_path) as template_image:
        template_image_features = _dinov2_mean_embedding(template_image)

    if not patch_filenames:
        # Nothing to compare against; max() below would fail on an empty list
        return []

    # Similarity of every patch against the template: cosine similarity scaled into the 0-1 range
    cos_fn = nn.CosineSimilarity(dim=0)
    cos_sim_score_list = []
    for patch_filename in patch_filenames:
        with Image.open(patch_filename) as query_image:
            query_image_patch_features = _dinov2_mean_embedding(query_image)
        sim_score = cos_fn(template_image_features, query_image_patch_features).item()
        cos_sim_score_list.append((sim_score + 1) / 2)

    # Assumption- the template image being searched for matches at least one location on the
    # screenshot, so the threshold is relative to the best score found.
    threshold_sim_val = image_sim_threshold * max(cos_sim_score_list)

    return [
        patch_location
        for patch_location, curr_sim_val in zip(patch_locations, cos_sim_score_list)
        if curr_sim_val >= threshold_sim_val
    ]

### Run the RPA Flow

In [None]:
#To-do: 

#1. Support for Basic Auth
##username= pyautogui.prompt(text='', title='' , default='')
##password= pyautogui.password(text='', title='', default='', mask='*')

#2. For Text Matching add support for regex pattern 

#3. Steps 4-8 happen on Same Screenshot- Add support for multi-step actions.
## Single Screen-shot Multi Element Detection for Same Prompt Type     

In [None]:
# Run the flow: for every step capture a screenshot, locate the prompt on it when the prompt is
# text or an image, and perform the configured mouse/keyboard action with PyAutoGUI.
for curr_step in rpa_flow_steps:
    # Capture Screenshot for the Current Step in the RPA Flow (stored in the input logs as <step>.png)
    curr_step_num = curr_step['Step']
    curr_scr_shot_filename = os.path.join(flow_screenshot_store_path, str(curr_step_num) + '.png')
    pyautogui.screenshot(curr_scr_shot_filename)

    # Wait for 5 Seconds
    time.sleep(5)

    curr_step_type = curr_step['prompt_type']
    curr_step_prompt_content = curr_step['prompt_content']
    curr_step_action = curr_step['action']

    if curr_step_type in ('text', 'image'):

        if curr_step_type == 'text':
            match_locs = retrieve_text_location_screenshot(curr_scr_shot_filename, curr_step_prompt_content, curr_step_num)
        else:
            # The prompt is used as given when it is an existing file; otherwise it is looked up
            # in this flow's image prompts directory, so the config may hold just a file name.
            search_image_path = curr_step_prompt_content
            if not os.path.isfile(search_image_path):
                prompt_in_flow_dir = os.path.join(image_prompts_dir, search_image_path)
                if os.path.isfile(prompt_in_flow_dir):
                    search_image_path = prompt_in_flow_dir
            match_locs = retrieve_image_location_screenshot(curr_scr_shot_filename, search_image_path, curr_step_num)

        print(match_locs)
        if not match_locs:
            # Stop with a clear error: carrying on would act on a screen that is not in the expected state
            raise RuntimeError(
                f"For Step {curr_step_num} no match was found on the screenshot for prompt {curr_step_prompt_content!r}"
            )

        # Multiple matches -Tiebreak criteria  min (default), max, max_x, max_y, min_x, min_y (not handled)
        # Currently any 'tie_break' entry selects the last match; without it the first match is used.
        if "tie_break" in curr_step:
            match_location = match_locs[-1]
        else:
            match_location = match_locs[0]

        # Centre of the matched box is the click target
        xmin, ymin, xmax, ymax = match_location
        cx = (xmin + xmax) // 2
        cy = (ymin + ymax) // 2

        if curr_step_action == 'click':
            # Move the mouse to cx, cy coordinates and click it.
            pyautogui.click(cx, cy)
            time.sleep(10)
            print(f"For Step {curr_step_num} performed mouse click at ({cx}, {cy})")
        else:
            # No Action is defined
            print(f"For Step {curr_step_num} Invalid Action")

    elif curr_step_type == 'dataentry':
        pyautogui.write(curr_step_prompt_content)
        print(f"For Step {curr_step_num} performed data entry")

    elif curr_step_type == 'command':

        if curr_step_action == 'enter':
            pyautogui.write(curr_step_prompt_content)
            print(f"For Step {curr_step_num} performed data entry")
            time.sleep(10)  # Add Delay/Wait

            pyautogui.press('enter')
            time.sleep(10)
            print(f"For Step {curr_step_num} performed key press for enter")

        elif curr_step_action == 'hotkey':
            # Press the keys in the given order, then release them in reverse order
            for keyDown in curr_step_prompt_content:
                pyautogui.keyDown(keyDown)
                print(f"For Step {curr_step_num} performed hotkey {keyDown} key-down")
            for keyUp in reversed(curr_step_prompt_content):
                pyautogui.keyUp(keyUp)
                print(f"For Step {curr_step_num} performed hotkey {keyUp} key-up")

        else:
            # No Action is defined
            print(f"For Step {curr_step_num} Invalid Action")

    else:
        # No Action is defined
        print(f"For Step {curr_step_num} Invalid Action")