In [None]:
# import statements.
import datetime
import glob
import json
import os
import random
import re
import string

import matplotlib.pyplot as plt
import numpy as np
import PIL
import seaborn as sbs
import tensorflow as tf
import tensorflow_hub as hub
from PIL import Image
from sklearn.model_selection import train_test_split
from torchvision import transforms

In [None]:
# Configuration constants shared by the cells below.

# A gesture with at most this many entries is treated as a TOUCH
# (see is_touch_gesture).
LONG_TOUCH_THRESHOLD = 5
# Maximum number of leaf elements kept per screen; shorter screens are padded
# up to this length with placeholders.
MAX_TOKEN = 64
# Coordinate space (width, height) the screenshots are rescaled to so that
# they line up with the view hierarchy bounds.
DIM_X = 1440
DIM_Y = 2560
# Side length, in pixels, of every cropped element image after resizing.
IMG_DIM = 224
# Image file and text used to pad screens with fewer than MAX_TOKEN elements.
PLACEHOLDER_IMG = 'Blank.jpg'
PLACEHOLDER_TEXT = 'n/a'
# NOTE(review): presumably the validation / test fractions for a later
# train_test_split; they are not used in the cells shown here - confirm.
VAL_SIZE = 0.1
TEST_SIZE = 0.1

In [None]:
# Pick the TF Hub image module by name together with the input resolution it
# is used with, then build the module URL from that name.
image_module_selection = ("mobilenet_v2_100_224", 224)
handle_base, pixels = image_module_selection
IMAGE_MODULE_HANDLE ="https://tfhub.dev/google/imagenet/{}/feature_vector/4".format(handle_base)
IMAGE_SIZE = (pixels, pixels)
print("Using {} with input size {}".format(IMAGE_MODULE_HANDLE, IMAGE_SIZE))

# Wrap the image feature-vector module as a Keras layer; it is called on the
# cropped element images in get_image_embeddings.
image_model = hub.KerasLayer(IMAGE_MODULE_HANDLE)

# Text embedding module, wrapped the same way and used by get_text_embeddings.
# NOTE(review): the handle name suggests 128-dimensional English NNLM
# embeddings with built-in normalization - confirm against the module docs.
TEXT_MODULE_HANDLE ="https://tfhub.dev/google/tf2-preview/nnlm-en-dim128-with-normalization/1"
text_model = hub.KerasLayer(TEXT_MODULE_HANDLE)

In [None]:
# Setting MAX_IMAGE_PIXELS to None switches off Pillow's image-size limit
# (its "decompression bomb" guard), so very large images open without a
# warning or error. `Image` here is the PIL.Image module.
Image.MAX_IMAGE_PIXELS = None

# Text normalization applied to every element text: lowercase it, then strip
# all ASCII punctuation characters.
def custom_standardization(input_data):
  """Lowercases `input_data` and removes punctuation using TensorFlow string ops.

  Args:
    input_data: a string (or string tensor) accepted by `tf.strings.lower`.

  Returns:
    The result of `tf.strings.regex_replace`, i.e. the lowercased text with
    every character of `string.punctuation` deleted.
  """
  # Character class matching any single punctuation character, escaped so
  # regex metacharacters such as ']' and '\' are taken literally.
  punctuation_class = '[%s]' % re.escape(string.punctuation)
  lowered = tf.strings.lower(input_data)
  stripped = tf.strings.regex_replace(lowered, punctuation_class, '')
  return stripped

# Collects every leaf node (a node without a 'children' key) under an element.
def get_leaf_nodes(element, leaf_nodes):
    """Appends all leaf nodes below `element` to `leaf_nodes` and returns it.

    The tree is walked depth-first, children in their listed order, so leaves
    are appended in pre-order. Falsy entries (None, empty dict) are skipped.
    A node that has a 'children' key is never itself a leaf, even when that
    list is empty.

    Args:
        element: a view-hierarchy node (dict), or a falsy value.
        leaf_nodes: list that is extended in place.

    Returns:
        The same `leaf_nodes` list object that was passed in.
    """
    pending = [element]
    while pending:
        node = pending.pop()
        if not node:
            continue
        if 'children' in node:
            # Push children reversed so they are popped in original order.
            pending.extend(list(node['children'])[::-1])
        else:
            leaf_nodes.append(node)
    return leaf_nodes


def get_all_leaf_nodes(view_hierarchy_json):
    """Returns the leaf nodes of a parsed view hierarchy.

    Args:
        view_hierarchy_json: dict loaded from a view hierarchy file. The tree
            is read from view_hierarchy_json['activity']['root'].

    Returns:
        A new list with the leaf nodes in depth-first order. The list is empty
        when the hierarchy has no 'activity' entry or no 'root'.
    """
    activity = view_hierarchy_json.get('activity')
    if not activity:
        # Previously this returned the unrelated global `dataset`; callers
        # iterate the result as leaf nodes, so an empty list is the right value.
        return []
    return get_leaf_nodes(activity.get('root'), [])


def get_target_text(leaf_nodes, x, y):
    """Returns the standardized text of the leaf element located at (x, y).

    A leaf matches when the point lies inside its 'bounds'
    ([left, top, right, bottom], edges inclusive). Its text is the 'text'
    value, falling back to 'text-hint' when 'text' is empty. If several
    matching leaves carry text, the last one in `leaf_nodes` wins.

    Args:
        leaf_nodes: iterable of leaf node dicts, each with a 'bounds' entry.
        x: horizontal coordinate in the same space as the bounds.
        y: vertical coordinate in the same space as the bounds.

    Returns:
        The output of custom_standardization for the matched text, or None
        when no matching leaf has any text.
    """
    target_text = None
    for leaf_node in leaf_nodes:
        bounds = leaf_node['bounds']
        if not (bounds[0] <= x <= bounds[2] and bounds[1] <= y <= bounds[3]):
            continue
        if 'text' not in leaf_node:
            continue
        text = leaf_node['text'] or leaf_node.get('text-hint')
        # Skip elements whose text and hint are both empty; str(None) would
        # otherwise turn into the literal text "None".
        if text:
            target_text = custom_standardization(str(text))
    return target_text


def get_target_image(screenshot, leaf_nodes, x, y):
    """Returns the resized crop of the first leaf element containing (x, y).

    Args:
        screenshot: image object providing crop(); the crop result must
            provide resize() (a PIL image in this notebook).
        leaf_nodes: iterable of leaf node dicts, each with a 'bounds' entry
            ([left, top, right, bottom], edges inclusive).
        x: horizontal coordinate in the same space as the bounds.
        y: vertical coordinate in the same space as the bounds.

    Returns:
        The crop resized to IMG_DIM x IMG_DIM, or None when no leaf contains
        the point.
    """
    for leaf_node in leaf_nodes:
        bounds = leaf_node['bounds']
        if bounds[0] <= x <= bounds[2] and bounds[1] <= y <= bounds[3]:
            # Image.LANCZOS is the filter formerly aliased as Image.ANTIALIAS;
            # the alias no longer exists in Pillow 10+.
            return screenshot.crop(bounds).resize((IMG_DIM, IMG_DIM), Image.LANCZOS)
    return None


def get_target_image_index(screenshot, leaf_nodes, x, y):
    """Returns the position of the first leaf element containing (x, y).

    Args:
        screenshot: not used by this function.
        leaf_nodes: iterable of leaf node dicts, each with a 'bounds' entry
            ([left, top, right, bottom], edges inclusive).
        x: horizontal coordinate in the same space as the bounds.
        y: vertical coordinate in the same space as the bounds.

    Returns:
        Zero-based index of the first matching leaf, or -1 if none matches.
    """
    def contains_point(box):
        return box[0] <= x <= box[2] and box[1] <= y <= box[3]

    matches = (
        position
        for position, node in enumerate(leaf_nodes)
        if contains_point(node['bounds'])
    )
    return next(matches, -1)
    

def get_leaf_node_texts(leaf_nodes):
    """Returns the texts of the first MAX_TOKEN leaf elements.

    For a leaf with a 'text' key the text is its 'text' value, falling back to
    'text-hint' when 'text' is empty, passed through custom_standardization.
    Leaves without a 'text' key, or with no usable text, contribute
    PLACEHOLDER_TEXT instead.

    Args:
        leaf_nodes: list of leaf node dicts.

    Returns:
        A list with one entry per leaf (at most MAX_TOKEN): either the output
        of custom_standardization or the plain PLACEHOLDER_TEXT string.
    """
    element_texts = []
    # Slicing keeps exactly the first MAX_TOKEN leaves, matching
    # get_leaf_node_images (the old counter started at 1 and dropped one).
    for leaf_node in leaf_nodes[:MAX_TOKEN]:
        text = None
        if 'text' in leaf_node:
            text = leaf_node['text'] or leaf_node.get('text-hint')
        if text:
            element_texts.append(custom_standardization(str(text)))
        else:
            # Also covers text and hint both empty, which used to be
            # stringified into the literal "None".
            element_texts.append(PLACEHOLDER_TEXT)
    return element_texts


def get_leaf_node_images(screenshot, leaf_nodes):
    """Returns resized crops of the first MAX_TOKEN leaf elements.

    Args:
        screenshot: image providing crop(); expected to be in the same
            coordinate space as the leaf bounds.
        leaf_nodes: list of leaf node dicts, each with a 'bounds' entry
            ([left, top, right, bottom]).

    Returns:
        A list with one IMG_DIM x IMG_DIM image per leaf, at most MAX_TOKEN.
    """
    element_images = []
    for leaf_node in leaf_nodes[:MAX_TOKEN]:
        cropped = screenshot.crop(leaf_node['bounds'])
        # Image.LANCZOS is the filter formerly aliased as Image.ANTIALIAS;
        # the alias no longer exists in Pillow 10+.
        element_images.append(cropped.resize((IMG_DIM, IMG_DIM), Image.LANCZOS))
    return element_images


def get_image_embeddings(images):
    """Embeds a list of element images with the TF Hub image model.

    Args:
        images: non-empty list of equally sized images accepted by
            torchvision's ToTensor (PIL images in this notebook).

    Returns:
        The output of `image_model` for the whole batch, one row per image.
    """
    to_tensor = transforms.ToTensor()
    image_list = []
    for image in images:
        # ToTensor returns a channels-first (C, H, W) torch tensor; go through
        # numpy so TensorFlow receives a plain array.
        channels_first = to_tensor(image).numpy()
        # Reorder to (H, W, C). The previous perm=[2, 1, 0] produced
        # (W, H, C), i.e. every image was fed to the model transposed.
        image_list.append(tf.transpose(channels_first, perm=[1, 2, 0]))

    # Stack into a single (N, H, W, C) batch instead of passing a Python list.
    embeddings = image_model(tf.stack(image_list))
    return embeddings


def get_text_embeddings(texts):
    """Embeds a list of element texts with the TF Hub text model.

    The previous version applied the image transform to strings, appended to
    an undefined `image_list` and called the model on an always-empty list.

    Args:
        texts: non-empty list whose items are Python strings or scalar string
            tensors (both are produced by get_leaf_node_texts and padding).

    Returns:
        The output of `text_model` for the whole batch, one row per text.
    """
    # Normalize the mixed str / tensor items to scalar string tensors and
    # stack them into one 1-D batch of strings.
    text_batch = tf.stack(
        [tf.convert_to_tensor(text, dtype=tf.string) for text in texts])
    embeddings = text_model(text_batch)
    return embeddings


def get_labels_tensor(target_index):
    """Builds the per-element label column for one sample.

    Args:
        target_index: index of the touched element, or a negative value for a
            sample without a target. Non-negative values must be < MAX_TOKEN.

    Returns:
        A dense float32 tensor of shape [MAX_TOKEN, 1]: all zeros, except for
        a 1.0 at row `target_index` when it is non-negative.
    """
    if target_index >= 0:
        # values=[1.0] keeps the dtype float32, the same as the tf.zeros
        # default below; with values=[1] positive samples came out as int32
        # and could not be stacked with the negative ones.
        st = tf.sparse.SparseTensor(indices=[[target_index, 0]], values=[1.0], dense_shape=[MAX_TOKEN, 1])
        return tf.sparse.to_dense(st)
    return tf.zeros([MAX_TOKEN, 1])


# Decides whether a gesture counts as a TOUCH gesture.
# Only TOUCH gestures are used in this task.
def is_touch_gesture(gesture):
    """Returns True when `gesture` has at most LONG_TOUCH_THRESHOLD entries.

    Args:
        gesture: sized collection of gesture entries.

    Returns:
        bool: True for a TOUCH gesture, False otherwise.
    """
    return len(gesture) <= LONG_TOUCH_THRESHOLD

In [None]:
# Processes a pair of view hierarchies to construct one dataset entry.
# Up to MAX_TOKEN leaf elements are taken from each of the two screens
# (padded with placeholders) and embedded.
# Each entry appended to the dataset has the following format -
# [IMAGE_EMBEDDINGS, LABELS, LABEL, TEXT_EMBEDDINGS]
def _load_resized_screenshot(screenshot_path):
    """Opens a screenshot and rescales it to the view hierarchy coordinates."""
    # The size of the screenshot is 1080 * 1920.
    # Rescaling it to 1440 * 2560, dimension used by view hierarchy data.
    with Image.open(screenshot_path) as screenshot:
        return screenshot.resize((DIM_X, DIM_Y), Image.LANCZOS)


def _pad_elements(element_texts, element_images, placeholder_image):
    """Pads both per-element lists in place up to MAX_TOKEN entries each."""
    element_texts.extend([PLACEHOLDER_TEXT] * (MAX_TOKEN - len(element_texts)))
    element_images.extend([placeholder_image] * (MAX_TOKEN - len(element_images)))


def process_view_hierarchy(view_hierarchy1, view_hierarchy2, dataset, is_positive_sample = True):
    """Builds one dataset entry from two view hierarchy files and appends it.

    The gesture recorded for the first screen gives the touched point. The
    pair is skipped (dataset returned unchanged) when either hierarchy is
    empty, the gesture is missing, empty or not a TOUCH, the touched element
    has no text, or the touched element is not among the first MAX_TOKEN
    leaves of the first screen.

    Args:
        view_hierarchy1: path of the first screen's view hierarchy file,
            located in a 'view_hierarchies' directory of a trace that also
            holds 'gestures.json' and 'screenshots/<ui_number>.jpg'.
        view_hierarchy2: path of the second screen's view hierarchy file.
        dataset: list of entries; extended in place.
        is_positive_sample: when True the label is 1 and the labels tensor
            marks the touched element; otherwise the label is 0 and the
            labels tensor is all zeros.

    Returns:
        The same `dataset` list.

    Raises:
        OSError: if the gestures file, a view hierarchy, a screenshot or the
            placeholder image cannot be opened.
    """
    if not view_hierarchy1 or not view_hierarchy2:
        return dataset

    trace_path1 = view_hierarchy1.split('view_hierarchies')[0]
    trace_path2 = view_hierarchy2.split('view_hierarchies')[0]
    # The file name without its extension identifies the screen; it is both
    # the key into gestures.json and the screenshot file name.
    ui_number1 = os.path.basename(view_hierarchy1).split('.')[0]
    ui_number2 = os.path.basename(view_hierarchy2).split('.')[0]

    with open(os.path.join(trace_path1, 'gestures.json')) as file:
        gestures = json.load(file)
    with open(view_hierarchy1) as file:
        view_hierarchy1_json = json.load(file)
    with open(view_hierarchy2) as file:
        view_hierarchy2_json = json.load(file)

    if not view_hierarchy1_json or not view_hierarchy2_json:
        return dataset

    # A screen without a recorded gesture is skipped instead of raising
    # KeyError; an empty gesture has no first point to read.
    gesture = gestures.get(ui_number1) if gestures else None
    if not gesture or not is_touch_gesture(gesture):
        return dataset

    # The first gesture point is scaled by the view hierarchy dimensions to
    # get the touched position in view hierarchy coordinates.
    x = gesture[0][0] * DIM_X
    y = gesture[0][1] * DIM_Y

    leaf_nodes1 = get_all_leaf_nodes(view_hierarchy1_json)
    leaf_nodes2 = get_all_leaf_nodes(view_hierarchy2_json)

    # Only samples whose touched element carries text are kept. This check
    # runs before any image is decoded so skipped pairs stay cheap.
    target_text = get_target_text(leaf_nodes1, x, y)
    if not target_text:
        return dataset

    resized_screenshot1 = _load_resized_screenshot(
        os.path.join(trace_path1, 'screenshots', f'{ui_number1}.jpg'))
    resized_screenshot2 = _load_resized_screenshot(
        os.path.join(trace_path2, 'screenshots', f'{ui_number2}.jpg'))

    target_image_index = get_target_image_index(resized_screenshot1, leaf_nodes1, x, y)
    if target_image_index == -1 or target_image_index >= MAX_TOKEN:
        return dataset

    screen1_element_text = get_leaf_node_texts(leaf_nodes1)
    screen2_element_text = get_leaf_node_texts(leaf_nodes2)

    screen1_element_image = get_leaf_node_images(resized_screenshot1, leaf_nodes1)
    # Second-screen elements are cropped from the second screenshot (they
    # were previously cropped from the first one).
    screen2_element_image = get_leaf_node_images(resized_screenshot2, leaf_nodes2)

    with Image.open(PLACEHOLDER_IMG) as placeholder:
        resized_placeholder = placeholder.resize((IMG_DIM, IMG_DIM), Image.LANCZOS)
    # Text and image lists are padded independently so each ends up with
    # MAX_TOKEN entries regardless of its starting length.
    _pad_elements(screen1_element_text, screen1_element_image, resized_placeholder)
    _pad_elements(screen2_element_text, screen2_element_image, resized_placeholder)

    image_embeddings = get_image_embeddings(screen1_element_image + screen2_element_image)
    text_embeddings = get_text_embeddings(screen1_element_text + screen2_element_text)

    if is_positive_sample:
        labels = get_labels_tensor(target_image_index)
        label = 1
    else:
        labels = get_labels_tensor(-1)
        label = 0

    dataset.append([image_embeddings, labels, label, text_embeddings])
    return dataset
        

def _view_hierarchy_sort_key(path):
    """Sort key that orders purely numeric file names by their numeric value."""
    name = os.path.basename(path)
    stem = name.split('.')[0]
    if stem.isdigit():
        return (0, int(stem), name)
    # Non-numeric names keep plain lexicographic order, after numeric ones.
    return (1, 0, name)


def process_trace(trace_path, dataset):
    """Adds one positive sample per pair of consecutive screens in a trace.

    Args:
        trace_path: directory of a trace containing a 'view_hierarchies'
            sub-directory.
        dataset: list of entries; extended in place.

    Returns:
        The same `dataset` list (callers may also rely on the in-place
        update and ignore the return value).
    """
    view_hierarchies_path = f'{trace_path}/view_hierarchies/*'
    # NOTE(review): file names are presumably '<ui_number>.json' with numeric
    # ui numbers - confirm. A plain string sort would put '100' before '99'
    # and pair up screens that are not consecutive, so numeric names are
    # ordered by value.
    view_hierarchies = sorted(glob.glob(view_hierarchies_path), key=_view_hierarchy_sort_key)
    for current_screen, next_screen in zip(view_hierarchies, view_hierarchies[1:]):
        dataset = process_view_hierarchy(current_screen, next_screen, dataset)
    return dataset


def add_negative_samples(dataset, negative_ratio=0.1):
    """Appends negative samples built from screens of neighbouring traces.

    Screens of trace i are paired position by position with screens of trace
    i + 1 and processed with is_positive_sample=False. Generation stops, for
    all traces, once the number of negatives actually added reaches
    `negative_ratio` times the number of samples present on entry.

    Args:
        dataset: list of (positive) entries; extended in place.
        negative_ratio: target number of negatives as a fraction of
            len(dataset) on entry. Defaults to 0.1.

    Returns:
        The same `dataset` list.
    """
    traces = sorted(glob.glob(TRACES_PATH))
    negative_samples_threshold = negative_ratio * len(dataset)
    negative_samples_counter = 0
    for trace_path1, trace_path2 in zip(traces, traces[1:]):
        # Checked in the outer loop too: the old `break` only left the inner
        # loop, so every remaining trace pair still added one more attempt.
        if negative_samples_counter >= negative_samples_threshold:
            break
        view_hierarchies1_path = sorted(glob.glob(f'{trace_path1}/view_hierarchies/*'))
        view_hierarchies2_path = sorted(glob.glob(f'{trace_path2}/view_hierarchies/*'))
        for (view_hierarchy1, view_hierarchy2) in zip(view_hierarchies1_path, view_hierarchies2_path):
            if negative_samples_counter >= negative_samples_threshold:
                break
            size_before = len(dataset)
            dataset = process_view_hierarchy(view_hierarchy1, view_hierarchy2, dataset, False)
            # Count only the entries really appended; pairs that
            # process_view_hierarchy skips do not use up the budget.
            negative_samples_counter += len(dataset) - size_before
    return dataset


# Build the dataset: positive samples first, one trace at a time, then the
# negative samples.
# NOTE(review): TRACES_PATH (a glob pattern matching the trace directories)
# is not defined in the cells shown here - confirm it is set before this cell
# runs.
dataset = []
for trace_path in sorted(glob.glob(TRACES_PATH)):
    # process_trace appends to `dataset` in place.
    process_trace(trace_path, dataset)

dataset = add_negative_samples(dataset)