# LK2 Python API | Automation

## Small example graph
We will count the number of connected components in a (call-)graph.

In [None]:
import lynx.kite

lk = lynx.kite.LynxKite()

In [None]:
# First we use a random graph as input

graph = lk.createVertices(size=1000).createRandomEdges(degree=2, seed=12345)
component_metrics = graph.findConnectedComponents().sql('''
select 
max(size) as max_size,
min(size) as min_size,
count(*) as num_components
from `connected_components.vertices`''')

component_metrics.df()

## Automating a workspace
To automate the above code,
- we need to put it into a `Workspace` (this can be done easily using the `@lk.workspace()` decorator),
- and we have to create a `WorkspaceSequence`, which defines the scheduling of the workspace.

The schedule parameter of the `WorkspaceSequence` is a cron expression. (For example `'0 6 * * *'` means "at 6am every day".)
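
To make the cron format easier to read, here is a minimal sketch that labels the five fields of an expression. The helper `describe_cron` is hypothetical and written only for illustration. It is not part of LynxKite or Airflow, and it only splits the fields without validating or evaluating them.

```python
# Names of the five fields of a standard cron expression, in order.
CRON_FIELDS = ('minute', 'hour', 'day_of_month', 'month', 'day_of_week')


def describe_cron(expression):
    """Map each field of a five-field cron expression to its name (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f'expected {len(CRON_FIELDS)} fields, got {len(parts)}')
    return dict(zip(CRON_FIELDS, parts))


daily = describe_cron('0 6 * * *')    # minute 0 of hour 6, on every day
hourly = describe_cron('30 * * * *')  # minute 30 of every hour
print(daily)
print(hourly)
```

A `*` in a field means "every value", so the first expression fires once a day and the second once an hour.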

<center><h3>See Python source file</h3></center>
![python_source.png](python_source.png)

In [None]:
print(open('tedx_v1.py').read())

<center><h3>Airflow demo</h3></center>
![airflow_icon.png](airflow_icon.png)

## Using the execution date in automation
If the automated workspace has a workspace parameter called `date`,
then Airflow will automatically pass the execution date to the
workspace (converted to a string).

<center><h3>See Python source file</h3></center>
![python_source.png](python_source.png)

In [None]:
import tedx_v2
components_by_date_wss = tedx_v2.get_components_by_date_wss(lk)
print(open('tedx_v2.py').read())

<center><h3>Airflow demo</h3></center>
![airflow_icon.png](airflow_icon.png)

## Accessing outputs of automation: snapshot sequences
Workspace sequences automatically save their outputs into snapshot sequences.
We can access these snapshots using the `SnapshotSequence` or `TableSnapshotSequence`
classes.

In [None]:
from datetime import datetime

# Using one output
# This requires the first output of the automation to exist

result_sequence = components_by_date_wss.output_sequences['metrics']
one_output = result_sequence.read_date(datetime(2018,7,13,0,30))
one_output.df()

In [None]:
# Using union of output tables
# This requires the first five outputs of the automation to exist

union_of_outputs = result_sequence.read_interval(datetime(2018,7,13,0,30),datetime(2018,7,13,4,30))
union_of_outputs.df()
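
To see why the interval above needs five outputs, the sketch below lists the timestamps it covers. It assumes an hourly schedule firing at minute 30 and an interval that includes both end points. Neither is confirmed by the code shown here, and `hourly_dates` is a hypothetical helper, not part of the LynxKite API.

```python
from datetime import datetime, timedelta


def hourly_dates(start, end):
    """Return every hourly timestamp from start to end, both ends included."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += timedelta(hours=1)
    return dates


dates = hourly_dates(datetime(2018, 7, 13, 0, 30), datetime(2018, 7, 13, 4, 30))
for d in dates:
    print(d.strftime('%Y-%m-%d %H:%M'))
print(len(dates))  # 5
```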

In [None]:
# Working with outputs
union_of_outputs.sql('''
select date_id, num_components as max_num
from input 
where num_components=(select max(num_components) from input)''').df()

## Input recipes

In [None]:
input_folder = '/home/petererben/biggraph-dev/remote_api/python/documentation/tedx/automation/input'

import random

def input_generator(date):
    '''Writes a random edge list CSV for `date`, then a marker file.'''
    base = input_folder + '/' + date.strftime("%Y-%m-%d-%H-%M")
    with open(base + '.csv', 'w') as f:
        f.write('src,dst\n')
        for _ in range(2000):
            src = random.randint(1, 1000)
            dst = random.randint(1, 1000)
            f.write(f'{src},{dst}\n')
    # The marker file is written only after the CSV is complete
    with open(base + '.SUCCESS', 'w') as f:
        f.write('READY')

<center><h3>See Python source file</h3></center>
![python_source.png](python_source.png)

In [None]:
import os
import tedx_v3

example_date = datetime(2018,7,13,0,30)

# Define recipe
csv_recipe = tedx_v3.CSVRecipe(input_folder, lk)

# Target name
print(csv_recipe.full_path(example_date))

# Remove the input file and its marker if they exist
for path in (csv_recipe.full_path(example_date), csv_recipe.marker(example_date)):
    try:
        os.remove(path)
    except FileNotFoundError:
        pass

# Check
print(csv_recipe.is_ready(example_date))

# Create
input_generator(example_date)

# Re-check
print(csv_recipe.is_ready(example_date))

# Load
csv_recipe.build_boxes(example_date).sql('select * from input limit 5').df()
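
The source of `tedx_v3.CSVRecipe` is not reproduced in this cell, so the following is only a sketch of how a readiness check based on the `.SUCCESS` marker file could work. The functions `marker_path` and `is_ready` are hypothetical stand-ins, not the actual implementation, and a temporary directory is used in place of `input_folder`.

```python
import os
import tempfile
from datetime import datetime


def marker_path(folder, date):
    """Path of the marker file for a date (same naming as input_generator)."""
    return os.path.join(folder, date.strftime('%Y-%m-%d-%H-%M') + '.SUCCESS')


def is_ready(folder, date):
    """Treat the input as ready once its marker file exists (an assumption)."""
    return os.path.exists(marker_path(folder, date))


with tempfile.TemporaryDirectory() as folder:
    date = datetime(2018, 7, 13, 0, 30)
    before = is_ready(folder, date)   # no marker written yet
    with open(marker_path(folder, date), 'w') as f:
        f.write('READY')
    after = is_ready(folder, date)    # marker is now present

print(before, after)  # False True
```

Checking for a separate marker, rather than for the CSV itself, avoids picking up a file that is still being written.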


<center><h3>See Python source file</h3></center>
![python_source.png](python_source.png)

In [None]:
components_from_inputs_wss = tedx_v3.get_components_from_inputs_wss(input_folder, lk)
print(open('tedx_v3.py').read())

<center><h3>Airflow demo</h3></center>
![airflow_icon.png](airflow_icon.png)

**Note**: the demo uses Airflow's sequential executor, which does not allow parallel task execution.

<center><h3>Input generation</h3></center>
![csv_icon.png](csv_icon.png)


In [None]:
import croniter

# Generate inputs for the next 100 scheduled times (minute 30 of every hour)
schedule = croniter.croniter('30 * * * *', datetime(2018, 7, 13))
for _ in range(100):
    input_generator(schedule.get_next(datetime))

<center><h3>Airflow demo</h3></center>
![airflow_icon.png](airflow_icon.png)

## Automated exports (automation side effects)

<center><h3>See Python source file</h3></center>
![python_source.png](python_source.png)

In [None]:
# The double $ is Scala escaping; the folder will be used in a parametric parameter
output_folder = 'DATA$$/tedx/automation/output'
print(open('tedx_v4.py').read())

<center><h3>Airflow demo</h3></center>
![airflow_icon.png](airflow_icon.png)

In [None]:
# Check result
folder = lk.get_prefixed_path('DATA$/tedx/automation/output').resolved.split(':')[1]

import glob
files = glob.glob(folder + '/*')

files