### Set up
1. Run `pip install -r requirements.txt`
2. Create a .env file
3. Create the following variables in the .env file
    - afsid
    - GCLB
    - APP_ID
        - https://create.appfarm.io/leasi/apps/<APP_ID>
    - email (your appfarm email)
    - password (your appfarm password)
    - chrome_driver_path
        - https://googlechromelabs.github.io/chrome-for-testing/
        - download the latest stable chromedriver version for your OS
        - specify the file path of the chromedriver.exe file
4. afsid and GCLB can be found in the cookies of the appfarm website
    - Log into Leasi app
    - Inspect element > Network > Refresh
    - Search for whoami > Headers > Request Headers > Cookie
5. afsid and GCLB tokens will change occasionally, so you will need to update them

In [None]:
import copy
import json
import os
import re
import requests
import subprocess
from bidict import bidict
from collections import Counter, defaultdict
from dotenv import load_dotenv
from enum import Enum
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

class DataSource(Enum):
    DATABASE_CONNECTED = 'database_connected'
    DATA_CONNECTOR = 'data_connector'
    RUNTIME = 'runtime'

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Enum):
            return obj.value
        if isinstance(obj, set):
            return list(obj)
        return super().default(obj)

load_dotenv()

APP_ID = os.getenv("APP_ID")
COOKIES = {
    'afsid': os.getenv("afsid"),
    'GCLB': os.getenv("GCLB"),
}
CHROME_DRIVER_PATH = os.getenv("chrome_driver_path")
URL = "https://create.appfarm.io/leasi/apps/81rSrt/data"

os.makedirs("out", exist_ok=True)
os.makedirs("out/result", exist_ok=True)

def fetch_json(url, filename):
    response = requests.get(url, cookies=COOKIES)
    print(response.status_code)
    response = response.json()
    with open(f"./out/{filename}.json", "w") as f:
        json.dump(response, f, indent=4)

def fetch_js(url, filename):
    response = requests.get(url, cookies=COOKIES)
    print(response.status_code)
    response = response.text
    
    pattern = (
        r"window\.AF_APP_" + re.escape(APP_ID) + r"\s*=\s*\{\s*\.\.\.window\.AF_APP_" +
        re.escape(APP_ID) + r",\s*(fn\d+:\(\{resolve,reject\}\)=>\{.*?\})\s*\}"
    )
    
    replacement = r"module.exports = {\1}"
    
    response = re.sub(
        pattern,
        replacement,
        response,
        flags=re.DOTALL
    )

    with open(f"./out/{filename}.js", "w", encoding="utf-8") as f:
        f.write(response)
    
    subprocess.run(
        ["node", "format.js"],
        capture_output=True,
        text=True
    )
    

def load_data(filename):
    try:
        with open(f"./out/{filename}.json", "r") as f:
            data = json.load(f)
        return data
    except FileNotFoundError:
        return None

Load in data

In [None]:
fetch_json(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/actions', 'actions')
fetch_json(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/app', 'app')
fetch_json(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/datasources', 'datasources')
fetch_json(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/enumeratedtypes', 'enumeratedtypes')
fetch_js(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/functions', 'functions')
fetch_json(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/layout', 'layout')
fetch_json(f'https://leasi-dev.appfarm.app/api/v1/apps/{APP_ID}/metadata/objectclasses', 'objectclasses')

Finds the object class ID of the given object with the nodeName. If an object class is not referenced in another object class, this approach does not work.

NOTE: Some nodes have been renamed so there can be multiple object class IDs mapping to the same nodeName. It should normally be the one with the highest occurance.

In [None]:
object_classes = load_data('objectclasses')['objectClasses']

def get_referenceId(obj):
    result = Counter()
    for object_class in object_classes:
        for item in object_class.get('properties', []):
            if item.get('nodeName') == obj:
                result[item.get('referenceId')] += 1
    return result

get_referenceId('product')

Groups all data sources with the same object class id.

NOTE: There is no name associated with the object class id, so you will need to manually find the name of the object class. Object classs name can be inferred from the data source names.

In [None]:
data_sources = load_data('datasources')['dataSources']

def group_data_sources_by_object_class_id():
    result = defaultdict(list)
    for data_source in data_sources.values():
        if data_source.get('name') == 'GENERATED':
            continue
        if data_source.get('dataSourceType') == 'OBJECT_CLASS':
            data_type = DataSource.DATABASE_CONNECTED
            if data_source.get('dataConnector'):
                data_type = DataSource.DATA_CONNECTOR
            elif data_source.get('local'):
                data_type = DataSource.RUNTIME
            result[data_source.get('objectClassId')].append({
                'id': data_source.get('id'),
                'name': data_source.get('name'),
                'cardinality': data_source.get('cardinality'),
                'data_source_type': data_type,
                'runtime_properties': set([k for k, v in data_source.get('properties', {}).items() if v.get('runtime', False)])
            })
    return result

grouped_data_sources = group_data_sources_by_object_class_id()

In [None]:
with open('./out/result/grouped_data_sources.json', 'w') as f:
    json.dump(grouped_data_sources, f, indent=4, cls=CustomEncoder)

Web scraper to map object class id to object class name.

In [None]:
# Load page
service = Service(CHROME_DRIVER_PATH)
driver = webdriver.Chrome(service=service)
driver.implicitly_wait(10)
driver.get(URL)

In [None]:
# Login
login_with_google = driver.find_element(By.CSS_SELECTOR, ".MuiButtonBase-root.MuiButton-root.MuiButton-contained")
login_with_google.click()

email = driver.find_element(By.XPATH, '//*[@id="identifierId"]')
email.send_keys(os.getenv("email"))
next_button = driver.find_element(By.XPATH, '//*[@id="identifierNext"]/div/button')
next_button.click()

password = driver.find_element(By.XPATH, '//*[@id="password"]/div[1]/div/div[1]/input')
password.send_keys(os.getenv("password"))
next_button = driver.find_element(By.XPATH, '//*[@id="passwordNext"]/div/button')
next_button.click()

In [None]:
object_class_names = load_data('object_class_names') or {}

def open_search():
    search = driver.find_element(By.XPATH, '//*[@id="app"]/div/div/main/div/div/div/div[1]/div[2]/div[1]/div[1]/div/div[2]/button')
    search.click()

def clear_search():
    search_clear = driver.find_element(By.XPATH, '//*[@id="app"]/div/div/main/div[1]/div/div/div[1]/div[2]/div[1]/div[1]/div/div[2]/button')
    search_clear.click()

def search_menu_item(name):
    search_input = driver.find_element(By.XPATH, '//*[@id="app"]/div/div/main/div/div/div/div[1]/div[2]/div[1]/div[1]/div/div[1]/input')
    search_input.send_keys(name)

    menu_items = driver.find_element(By.XPATH, '//*[@id="app"]/div/div/main/div/div/div/div[1]/div[2]/div[2]/div/div[1]/ul').find_elements(By.TAG_NAME, "div")
    for menu_item in menu_items:
        if menu_item.text == name:
            menu_item.click()
            break

def get_object_class_name(data_source_id):
    object_class_name = driver.find_element(By.XPATH, f'//*[@id="p_{data_source_id}_objectClassId"]/div/span')
    return object_class_name.text

def map_object_class_names(grouped_data_sources):
    result = {}
    for object_class_id, data_sources in grouped_data_sources.items():
        if object_class_id in object_class_names:
            continue
        open_search()
        search_menu_item(data_sources[0]['name'])
        clear_search()
        object_class_name = get_object_class_name(data_sources[0]['id'])
        result[object_class_id] = object_class_name
    return result

object_class_names |= map_object_class_names(grouped_data_sources)
driver.quit()

In [None]:
with open('./out/object_class_names.json', 'w') as f:
    json.dump(object_class_names, f, indent=4, cls=CustomEncoder)

Finds all actions associated with the data source that satisfy `is_valid_node`.

In [None]:
actions = load_data('actions')['actions']

def is_valid_node(data_source_type, action_node):
    if data_source_type == DataSource.DATABASE_CONNECTED:
        return action_node.get('nodeType') in {'create_object', 'modify_object', 'delete_object'}
    elif data_source_type == DataSource.DATA_CONNECTOR:
        return action_node.get('nodeType') in {'create_object', 'modify_object', 'delete_object'}
    elif data_source_type == DataSource.RUNTIME:
        return action_node.get('nodeType') in {'persist'}

def find_action(action, action_nodes, object_data_source, disabled=False):
    result = []
    for action_node in action_nodes:
        is_disabled = disabled or action_node.get('disabled', False)
        if action_node.get('dataSourceId') == object_data_source.get('id'):
            if is_valid_node(object_data_source.get('data_source_type'), action_node):
                result.append({
                    'action_id': action.get('id'),
                    'action_name': action.get('name'),
                    'action_node_id': action_node.get('id'),
                    'action_node_name': action_node.get('name'),
                    'action_node_type': action_node.get('nodeType'),
                    'disabled': is_disabled,
                    'update_values': action_node.get('defaultValues')
                })
        if action_node.get('children'):
            result += find_action(action, action_node['children'], object_data_source, is_disabled)
    return result

def get_actions(object_data_sources):
    result = []
    for object_data_source in object_data_sources:
        actions_for_data_source = []
        for action in actions.values():
            actions_for_data_source += find_action(action, action['actionNodes'], object_data_source)
        object_data_source['actions'] = actions_for_data_source
        result.append(object_data_source)
    return result

object_class_names = bidict(object_class_names)
object_data_sources = grouped_data_sources[object_class_names.inv['Product']]
objects = get_actions(object_data_sources)
objects

In [None]:
# simple dfs to analyse action.json
node_type = Counter()

def dfs(actions, f):
    def helper(action_nodes, action, f):
        for action_node in action_nodes:
            f(action_node, action)
            helper(action_node.get('children', []), action, f)
    for action in actions.values():
        helper(action.get('actionNodes', []), action, f)

def count_node_type(action_node, *args):
    node_type[action_node.get('nodeType')] += 1

dfs(actions, count_node_type)
node_type

In [None]:
# process result
def get_action_counts(objects):
    result = []
    for obj in objects:
        counts = len(obj.get('actions'))
        if counts == 0:
            continue
        result.append({
            'name': obj.get('name'),
            'counts': counts
        })
    return result

def remove_disabled(objects):
    result = []
    for obj in objects:
        obj = copy.deepcopy(obj)
        actions = obj.get('actions')
        actions = [action for action in actions if not action.get('disabled')]
        if len(actions) > 0:
            obj['actions'] = actions
            result.append(obj)
    return result

def remove_runtime_properties(objects, object_data_sources):
    object_data_sources = {obj.get('id'): obj for obj in object_data_sources}
    result = []
    for obj in objects:
        object_data_source = object_data_sources.get(obj.get('id'))
        actions = []
        for action in obj.get('actions', []):
            action = copy.deepcopy(action)
            if action['action_node_type'] != 'modify_object':
                actions.append(action)
                continue
            runtime_properties = object_data_source.get('runtime_properties')
            update_values = action.get('update_values') or []
            action['update_values'] = [item for item in update_values if item.get('propertyId') not in runtime_properties]
            if len(action['update_values']) > 0:
                actions.append(action)
        obj['actions'] = actions
        result.append(obj)
    return result

def generate_link(objects):
    objects = copy.deepcopy(objects)
    for obj in objects:
        for action in obj.get('actions', []):
            action['url'] = f"https://create.appfarm.io/leasi/apps/{APP_ID}/actions/{action['action_id']}/{action['action_node_id']}"
    return objects

In [None]:
result = generate_link(remove_runtime_properties(remove_disabled(objects), object_data_sources))
get_action_counts(result)

In [None]:
# save result
with open('./out/result/result.json', 'w') as f:
    json.dump(result, f, cls=CustomEncoder, indent=4)

### Findings

functions.json: ids map to a function called in any of the other json files with "functionValue" as its key.