# First Prototype

**Implemented Features:**
* Nodes
* Artifacts
* Processes
* Edges (used, wasGeneratedBy) between Processes and Artifacts
* Edges (triggered) between Processes
* decorator API for graph creation via @optex_process and @optex_composition
* Implicit graph Accounts -- a Process node may contain `child_processes`: Processes that are scoped within that parent Process node.

**Not Implemented:**
* Agents
* Graph registration or anything with merging
* Roles
* Annotations

### TODO in this prototype:
* Edges (derivedFrom) between Artifacts. This will be done via a context manager.
* Better display of results. It's hard to read this code and understand how it corresponds to the graph structure.

### Feedback Requested
How is the graph structure? Is it easy to navigate programmatically? Probably not! What alternatives are there?

In [1]:
from __future__ import annotations
from typing import Sequence, Mapping, Any, Callable, Tuple
import dataclasses
import enum
import numpy as np
import pandas as pd
from functools import wraps

        
class Node:
    """Base class for every node in the provenance graph.

    Each instance owns three independent lists:

    * ``parents``  -- nodes this node was produced from / consumes.
    * ``children`` -- nodes that consume / are produced by this node.
    * ``agents``   -- initialised empty; nothing in the visible code fills it.
    """

    def __init__(self):
        # Fresh lists per instance so nodes never share edge containers.
        self.agents = []
        self.children = []
        self.parents = []


class Artifact(Node):
    """Graph node that wraps a piece of data.

    Attributes:
        data: the wrapped value.
        entered_scope: the composition Process this artifact is currently
            "inside", or None when it is not inside any composition.
        name: label returned by ``str()``; starts as ``'default'``.
    """

    def __init__(self, data):
        super().__init__()
        self.name = 'default'
        self.entered_scope = None
        self.data = data

    def add_child(self, child):
        """Record ``child`` as a consumer of this artifact.

        While the artifact is inside a composition scope the consumer is
        registered as a child process of that scope instead of being added
        to ``self.children``.
        """
        scope = self.entered_scope
        if scope:
            scope.add_child_process(child)
            return
        self.children.append(child)

    def add_parent(self, parent):
        """Append ``parent`` to this artifact's parents."""
        self.parents.append(parent)

    def enter_scope(self, process):
        """Mark this artifact as being inside the composition ``process``."""
        self.entered_scope = process

    def leave_scope(self, process):
        """Detach this artifact from its first parent and clear its scope.

        The first parent receives ``process`` as a child process and drops
        this artifact from its children; afterwards the artifact has no
        parents. Raises IndexError if the artifact has no parents.
        """
        self.entered_scope = None
        producer = self.parents[0]
        producer.add_child_process(process)
        producer.remove_child(self)
        self.parents = []

    def __str__(self):
        """Return the artifact's name."""
        return self.name


class Process(Node):
    """Graph node that represents one call of a transformation.

    Attributes:
        transformation: the callable this process stands for.
        child_processes: processes registered as scoped inside this one.
    """

    def __init__(self, transformation):
        super().__init__()
        self.child_processes = []
        self.transformation = transformation

    def add_child(self, child):
        """Append ``child`` to this process's children."""
        self.children.append(child)

    def add_parent(self, parent):
        """Record ``parent`` as an input unless it is inside a scope.

        A parent whose ``entered_scope`` is set is skipped, so it does not
        appear in ``self.parents``.
        """
        if parent.entered_scope:
            return
        self.parents.append(parent)

    def add_child_process(self, process):
        """Register ``process`` as scoped inside this process."""
        self.child_processes.append(process)

    def remove_child(self, child):
        """Drop every entry equal to ``child`` from ``self.children``."""
        self.children = [kept for kept in self.children if kept != child]

    def __str__(self):
        """Return the string form of the wrapped transformation."""
        return str(self.transformation)


def optex_composition(transformation):
    """Decorate a function that composes other decorated steps.

    The wrapped function takes Artifacts (positional and/or keyword) and
    passes them to ``transformation`` unchanged. A Process is created for
    the call, each input is linked to it and placed in its scope, and each
    Artifact returned by ``transformation`` is re-parented onto that Process.

    Returns:
        The single result Artifact, or a list of Artifacts when
        ``transformation`` returns a tuple.
    """
    @wraps(transformation)
    def wrapped_function(*artifact_args, **artifact_kwargs):
        process = Process(transformation)

        # Positional inputs first, then keyword inputs, as one sequence.
        inputs = [*artifact_args, *artifact_kwargs.values()]

        # Remember each input's scope so it can be put back afterwards.
        # Without this the inputs stay "inside" this composition forever and
        # any later use of them is recorded as a child process of it.
        previous_scopes = []
        try:
            for artifact in inputs:
                artifact.add_child(process)
                process.add_parent(artifact)
                previous_scopes.append((artifact, artifact.entered_scope))
                artifact.enter_scope(process)

            results = transformation(*artifact_args, **artifact_kwargs)
        finally:
            # Reverse order so an artifact passed twice ends on its
            # original scope.
            for artifact, scope in reversed(previous_scopes):
                artifact.entered_scope = scope

        def _promote(artifact):
            # Move the result out of the inner scope and hang it off this
            # composition's Process.
            artifact.leave_scope(process)
            artifact.add_parent(process)
            process.add_child(artifact)

        if isinstance(results, tuple):
            for artifact in results:
                _promote(artifact)
            return list(results)

        _promote(results)
        return results
    return wrapped_function


def optex_process(transformation):
    """Decorate a function that works on raw data so it records provenance.

    The wrapped function takes Artifacts, calls ``transformation`` with
    their ``.data`` values, and wraps what comes back in new Artifacts
    linked to a Process created for the call.

    Returns:
        A single Artifact, or a list of Artifacts when ``transformation``
        returns a tuple.
    """
    @wraps(transformation)
    def wrapped_function(*artifact_args, **artifact_kwargs):
        process = Process(transformation)

        def _consume(artifact):
            # Link input -> process, then hand back the raw payload.
            artifact.add_child(process)
            payload = artifact.data
            process.add_parent(artifact)
            return payload

        def _produce(value):
            # Wrap one raw output and link process -> output.
            produced = Artifact(value)
            produced.add_parent(process)
            process.add_child(produced)
            return produced

        raw_args = [_consume(item) for item in artifact_args]
        raw_kwargs = {
            key: _consume(item) for key, item in artifact_kwargs.items()
        }

        outputs = transformation(*raw_args, **raw_kwargs)

        if isinstance(outputs, tuple):
            return [_produce(value) for value in outputs]
        return _produce(outputs)
    return wrapped_function

In [2]:
@optex_process
def load_data(dataset_name):
    """Return one of the two built-in demo DataFrames.

    Args:
        dataset_name: ``"dataset_one"`` (columns a, b, fk) or
            ``"dataset_two"`` (columns fk, x, y, z).

    Raises:
        ValueError: if ``dataset_name`` is neither of the known names.
    """
    if dataset_name == "dataset_one":
        return pd.DataFrame({
            'a': [1, 2, 3, 4, 5, 2, 4],
            'b': [1, 1, 6, 32, 5, 1, 3],
            'fk': [1, 1, 0, 1, 0, 0, 0],
        })
    if dataset_name == "dataset_two":
        return pd.DataFrame({
            'fk': [1, 0],
            'x': [10, 2],
            'y': [0, 4],
            'z': [34, 199],
        })
    # ValueError is a subclass of Exception, so existing
    # `except Exception` handlers keep working.
    raise ValueError("Dataset not recognized!")


@optex_process
def join_datasets(df1, df2):
    """Merge the two frames on their shared ``fk`` column."""
    joined = pd.merge(left=df1, right=df2, on="fk")
    return joined


@optex_process
def standardize(df):
    """Subtract each column's mean and divide by its standard deviation."""
    centered = df - df.mean()
    return centered / df.std()


class D1Pipeline:
    """Transformation step applied to the first dataset."""

    def __init__(self):
        """Nothing to set up; the class only namespaces ``transform``."""

    @staticmethod
    @optex_process
    def transform(df):
        """Return a copy of ``df`` with 10 added to column ``a``."""
        return df.assign(a=df['a'] + 10)


class D2Pipeline:
    """Transformation step applied to the second dataset."""

    def __init__(self):
        """Nothing to set up; the class only namespaces ``transform``."""

    @staticmethod
    @optex_process
    def transform(df):
        """Standardize every column, then restore the original ``fk`` values.

        The ``fk`` column is standardized along with the others and is then
        overwritten with a copy taken before scaling.
        """
        original_fk = df['fk'].copy()
        # NOTE: this is inlined rather than calling the `standardize` step.
        centered = df - df.mean()
        scaled = centered / df.std()
        scaled['fk'] = original_fk
        return scaled


class JoinedPipeline:
    """Transformation step applied to the joined dataset."""

    def __init__(self):
        """Nothing to set up; the class only namespaces ``transform``."""

    @staticmethod
    @optex_process
    def transform(df):
        """Replace a, b, x, y with the sums ``ax = a + x`` and ``by = b + y``.

        The ``fk`` column is dropped first; the remaining columns are kept
        ahead of the two new ones.
        """
        without_key = df.drop(columns='fk')
        combined = without_key.assign(
            ax=without_key['a'] + without_key['x'],
            by=without_key['b'] + without_key['y'],
        )
        return combined.drop(columns=['a', 'b', 'x', 'y'])


class Pipeline:
    """End-to-end demo: load two datasets, transform each, join, finalize."""

    def __init__(self, d1_name, d2_name):
        """Load both datasets as Artifacts named ``'d1'`` and ``'d2'``.

        Args:
            d1_name: dataset name passed to ``load_data`` for the first input.
            d2_name: dataset name passed to ``load_data`` for the second input.
        """
        self.dataset1 = load_data(Artifact(d1_name))
        self.dataset1.name = 'd1'
        self.dataset2 = load_data(Artifact(d2_name))
        self.dataset2.name = 'd2'

    def run_pipeline(self):
        """Run ``transform`` on the two loaded datasets and return its result."""
        return Pipeline.transform(self.dataset1, self.dataset2)

    # The method was originally published with a typo; keep the old spelling
    # working for existing callers.
    run_pipline = run_pipeline

    @staticmethod
    @optex_composition
    def transform(d1, d2):
        """Compose the per-dataset steps, the join and the final step.

        Each intermediate Artifact is given a name so it can be recognised
        when walking the graph. Returns the Artifact named ``'final'``.
        """
        d1_out = D1Pipeline.transform(d1)
        d1_out.name = 'd1p_out'
        d2_out = D2Pipeline.transform(d2)
        d2_out.name = 'd2p_out'
        joined = join_datasets(d1_out, d2_out)
        joined.name = 'joined'
        final = JoinedPipeline.transform(joined)
        final.name = 'final'
        return final

In [3]:
# Load both demo datasets and run the composed pipeline; `x` is the Artifact
# returned by Pipeline.transform.
x = Pipeline("dataset_one", "dataset_two").run_pipline()

In [4]:
# Artifact.__str__ returns the artifact's name.
str(x)

'final'

In [5]:
# After the composition finishes, the result's parent is the composition's
# Process; Process.__str__ shows the wrapped function.
str(x.parents[0])

'<function Pipeline.transform at 0x7fcd462749d0>'

In [6]:
# The composition Process's parents are the input Artifacts it was called with.
list(map(str, x.parents[0].parents))

['d1', 'd2']

In [7]:
# Walk upwards: result -> composition Process -> 'd1' Artifact -> load_data
# Process -> the Artifact wrapping the dataset name, which has no parents.
str(x.parents[0].parents[0].parents[0].parents[0].parents)

'[]'

In [8]:
# Walk inside the composition: first child process (D1Pipeline.transform)
# -> its output -> the join Process consuming it -> the join's second input.
str(x.parents[0].child_processes[0].children[0].children[0].parents[1])

'd2p_out'

In [9]:
from pyspark.sql import SparkSession
import time
from PIL import Image, ImageFilter
import deeplake
import torchvision.transforms as T

In [33]:
@optex_process
def start_spark_session():
    """Get or create a SparkSession on master ``local[1]``."""
    builder = SparkSession.builder.master("local[1]").appName("pagerank")
    return builder.getOrCreate()

@optex_process
def load_imagenet_data(spark_session, batch_size, curr_epoch):
    """Load one batch of the Deep Lake ``imagenet-val`` dataset into Spark.

    The batch covers dataset indices
    ``[batch_size * curr_epoch, batch_size * (curr_epoch + 1))``.

    Args:
        spark_session: object whose ``.data`` attribute exposes
            ``createDataFrame``. The callers in this notebook pass the
            Artifact returned by ``start_spark_session``.
        batch_size: number of images to load.
        curr_epoch: zero-based batch number, used to offset the indices.

    Returns:
        The RDD obtained by mapping ``convert_spark_to_pil`` over the rows
        of the (label, image-bytes) DataFrame.
    """
    # Function-scope import so this cell does not depend on other cells.
    import os

    # Never commit access tokens to source. The token is read from the
    # ACTIVELOOP_TOKEN environment variable (create a Deep Lake account to
    # obtain one); when unset, None is passed and deeplake decides how to
    # authenticate.
    ds = deeplake.load(
        "hub://activeloop/imagenet-val",
        token=os.environ.get("ACTIVELOOP_TOKEN"),
    )

    # Converts a Deep Lake image array into a Pillow image.
    transform = T.ToPILImage()

    # For each index in this batch, collect (label text, raw image bytes).
    first_index = batch_size * curr_epoch
    tensor_data = []
    for i in range(first_index, first_index + batch_size):
        label = ds.labels[i].data()['text'][0]
        image_bytes = transform(ds.images[i].numpy()).tobytes()
        tensor_data.append((label, image_bytes))
        # Progress marker after every 100th dataset index.
        if (i + 1) % 100 == 0:
            print(i + 1)
    df_labels = ['label', 'image']

    # Create a Spark DataFrame from the image data.
    image_df = spark_session.data.createDataFrame(data=tensor_data, schema=df_labels)
    image_df.printSchema()

    # Finally, convert the Spark rows back to Pillow image objects.
    # NOTE(review): convert_spark_to_pil is not defined in the visible part
    # of this notebook -- confirm it exists before running.
    converted_image_df = image_df.rdd.map(lambda x: convert_spark_to_pil(x))

    return converted_image_df

In [34]:
# start_spark_session is an @optex_process, so this is an Artifact wrapping
# the session rather than the session itself.
spark_session = start_spark_session()

# Every argument must be an Artifact. The session Artifact is wrapped again
# here, which is why load_imagenet_data reaches the session through `.data`.
image_data = load_imagenet_data(Artifact(spark_session), Artifact(200), Artifact(0))

hub://activeloop/imagenet-val loaded successfully.
This dataset can be visualized in Jupyter Notebook by ds.visualize() or at https://app.activeloop.ai/activeloop/imagenet-val
100
200
root
 |-- label: string (nullable = true)
 |-- image: binary (nullable = true)



In [35]:
@optex_process
def resize_image(df, width, height):
    """Map every image in ``df`` to a copy resized to (width, height)."""
    target_size = (width, height)

    def _resize(img):
        return img.resize(target_size)

    return df.map(_resize)

@optex_process
def rotate_image(df, angle):
    """Map every image in ``df`` to a copy rotated by ``angle``."""
    def _rotate(img):
        return img.rotate(angle=angle)

    return df.map(_rotate)

@optex_process
def blur_image(df):
    """Map every image in ``df`` through the ``ImageFilter.BLUR`` filter."""
    def _blur(img):
        return img.filter(ImageFilter.BLUR)

    return df.map(_blur)

@optex_process
def recolor_image(df):
    """Map every image in ``df`` to a copy converted to mode ``'CMYK'``."""
    def _to_cmyk(img):
        return img.convert(mode='CMYK')

    return df.map(_to_cmyk)

In [40]:
class ImagePipeline:
    """Batch image pipeline: load a batch, then resize, rotate, blur, recolor."""

    def __init__(self, batch_size, number_epochs):
        """Store the batch settings and start the Spark session.

        Args:
            batch_size: number of images loaded per epoch.
            number_epochs: number of batches to process (an int).
        """
        self.batch_size = batch_size
        self.number_epochs = number_epochs
        # An Artifact wrapping the session (start_spark_session is decorated).
        self.spark_session = start_spark_session()

    def run_pipeline(self):
        """Process ``number_epochs`` batches and return the last result.

        Returns:
            The Artifact produced by ``transform`` for the final batch, or
            None when ``number_epochs`` is zero or negative.
        """
        # Start as None so zero epochs returns cleanly instead of raising
        # UnboundLocalError.
        batch_output = None
        for epoch in range(self.number_epochs):
            image_df = load_imagenet_data(
                Artifact(self.spark_session),
                Artifact(self.batch_size),
                Artifact(epoch),
            )
            batch_output = ImagePipeline.transform(image_df)
        return batch_output

    # The method was originally published with a typo; keep the old spelling
    # working for existing callers.
    run_pipline = run_pipeline

    @staticmethod
    @optex_composition
    def transform(df):
        """Chain resize -> rotate -> blur -> recolor and return the result.

        Each intermediate Artifact is named so it can be recognised when
        walking the graph. Returns the Artifact named ``'recolor_df'``.
        """
        resized_df = resize_image(df, Artifact(100), Artifact(200))
        resized_df.name = 'resize_out'
        rotated_df = rotate_image(resized_df, Artifact(100))
        rotated_df.name = 'rotate_out'
        blur_df = blur_image(rotated_df)
        blur_df.name = 'blur_df'
        # Feed the blurred images forward. Recolor previously consumed
        # rotated_df, which left the blur step as an unused dead end.
        recolor_df = recolor_image(blur_df)
        recolor_df.name = 'recolor_df'
        return recolor_df

In [41]:
# Process 3 batches of 200 images each; `images` is the Artifact produced for
# the last batch.
images = ImagePipeline(200, 3).run_pipline()

hub://activeloop/imagenet-val loaded successfully.
This dataset can be visualized in Jupyter Notebook by ds.visualize() or at https://app.activeloop.ai/activeloop/imagenet-val
100
200
root
 |-- label: string (nullable = true)
 |-- image: binary (nullable = true)

hub://activeloop/imagenet-val loaded successfully.
This dataset can be visualized in Jupyter Notebook by ds.visualize() or at https://app.activeloop.ai/activeloop/imagenet-val
300
400
root
 |-- label: string (nullable = true)
 |-- image: binary (nullable = true)

hub://activeloop/imagenet-val loaded successfully.
This dataset can be visualized in Jupyter Notebook by ds.visualize() or at https://app.activeloop.ai/activeloop/imagenet-val
500
600
root
 |-- label: string (nullable = true)
 |-- image: binary (nullable = true)



In [44]:
# Shows the name assigned to the returned artifact in ImagePipeline.transform.
str(images)

'recolor_df'