# Batch processing using Apache Beam - Python SDK
## A Python Pipeline with dataframe transform
0. First generate the input data with `generate.py`; the pipeline below reads it from `./my_data.json`.
1. I chose Apache Beam, a unified programming model for batch (and streaming) data processing.

2. In this notebook, I set up the environment and work through a naive approach using the DirectRunner, which executes the pipeline on the local machine.

- General Logic

 - Start of the pipeline: determines what kind of `Read` transform I need to apply (here, `ReadFromText` over a JSON-lines file).
 - What the data looks like: determines how to represent it in the pipeline's PCollections; here each JSON element is given a schema.
 - What I want to do with the data: parse the records, then split each class 70/30 into train and eval sets.
 - What the output looks like and where it goes: `Write` transforms that save the train and eval sets as JSON files.

### Beam Pipeline

<img src="../img/beam_pipeline.jpg" alt="Beam pipeline diagram">

# Pipeline design

In [17]:
#setup 
import apache_beam as beam
import json
from apache_beam.dataframe.transforms import DataframeTransform
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

In [17]:
# Output locations for the train and eval splits. They are passed to
# beam.io.WriteToText, which uses them as file-path prefixes (by default Beam
# appends a shard suffix such as "-00000-of-00001" to each file name).
train_output_filename, eval_output_filename = (
    "./result1/train/data.json",
    "./result1/eval/data.json",
)

In [17]:
# Beam's DataFrame API (to_dataframe / DataframeTransform) requires the input
# PCollection to carry a schema, so each parsed JSON record is mapped onto this
# NamedTuple and a RowCoder is registered for it.
import typing  # provides typing.NamedTuple; not imported by the setup cell


class applying_schema(typing.NamedTuple):
    """Schema for one input JSON record.

    Instances are built with ``applying_schema(**record)``, so every record
    must have exactly the keys ``classes`` and ``value``. The lower-case name
    is kept because the pipeline cell refers to it.
    """

    classes: str  # class label, e.g. 'asset_1' or 'asset_2'
    value: float  # numeric payload of the record


# Encode elements of this type with Beam's schema-aware RowCoder.
beam.coders.registry.register_coder(applying_schema, beam.coders.RowCoder)

In [17]:
# Partition function for beam.Partition: deterministic weighted split.
def split_dataset(elem, num_partitions, ratio):
    """Return the partition index for ``elem`` under a weighted split.

    Intended as the partition function of ``beam.Partition``. The element is
    serialised with ``json.dumps``; the sum of its character codes, taken
    modulo ``sum(ratio)``, selects a bucket, and partition ``i`` owns
    ``ratio[i]`` consecutive buckets. Identical elements therefore always
    land in the same partition. Weights such as ``[7, 3]`` target a 70/30
    split; the achieved proportion is approximate.

    Args:
        elem: The element being partitioned; must be JSON-serialisable.
        num_partitions: Number of partitions Beam asks for; must equal
            ``len(ratio)``.
        ratio: Non-negative weights, one per partition.

    Returns:
        int: A partition index in ``range(num_partitions)``.

    Raises:
        ValueError: If ``num_partitions`` differs from ``len(ratio)``, a
            weight is negative, or the weights do not sum to a positive value.
    """
    # Validate explicitly: an `assert` would be stripped under `python -O`.
    if num_partitions != len(ratio):
        raise ValueError(
            f"num_partitions ({num_partitions}) must equal len(ratio) ({len(ratio)})"
        )
    if any(weight < 0 for weight in ratio):
        raise ValueError(f"ratio weights must be non-negative, got {ratio!r}")
    total_weight = sum(ratio)
    if total_weight <= 0:
        raise ValueError(f"ratio must have a positive sum, got {ratio!r}")

    # Cheap content-based bucket. Unlike built-in hash(), which is salted per
    # process for str, this is stable across runs and workers.
    bucket = sum(map(ord, json.dumps(elem))) % total_weight

    upper_bound = 0
    for index, weight in enumerate(ratio):
        upper_bound += weight
        if bucket < upper_bound:
            return index
    # Defensive fallback; unreachable because bucket < total_weight.
    return len(ratio) - 1

In [17]:
# Build the pipeline; Beam runs it when the `with` block exits.
# Steps: read JSON lines -> attach schema -> DataFrame -> filter per class
# -> 70/30 split per class -> merge the splits -> write JSON lines.
CLASS_LABELS = ("asset_1", "asset_2")  # each class gets its own train/eval split
SPLIT_RATIO = (7, 3)  # (train, eval) weights for split_dataset; must have two entries


def _row_to_json(row):
    """Serialise a schema'd row (fields ``classes`` and ``value``) as a JSON string."""
    # float() normalises whatever numeric scalar type the DataFrame round trip yields.
    return json.dumps({"classes": row.classes, "value": float(row.value)})


with beam.Pipeline() as pipeline:
    # Parse each non-blank input line as JSON and type it with applying_schema,
    # because the DataFrame API needs a schema-aware PCollection.
    rows = (
        pipeline
        | "reading json file" >> beam.io.ReadFromText("./my_data.json")
        | "skipping blank lines" >> beam.Filter(lambda line: line.strip())
        | "loading json" >> beam.Map(json.loads)
        | "applying schema" >> beam.Map(lambda x: applying_schema(**x)).with_output_types(applying_schema)
    )

    # Deferred Beam dataframe over the typed rows.
    df = to_dataframe(rows)

    # One filtered dataframe per class, converted back to PCollections in a
    # single call; the result is a tuple in CLASS_LABELS order.
    per_class = to_pcollection(
        *[df.loc[df["classes"] == label] for label in CLASS_LABELS],
        label="per-class pcollections",
        always_return_tuple=True,
    )

    # Split every class independently so each one keeps the 70/30 proportion.
    train_parts, eval_parts = [], []
    for index, class_rows in enumerate(per_class, start=1):
        train_part, eval_part = class_rows | f"Partition{index}" >> beam.Partition(
            split_dataset, len(SPLIT_RATIO), ratio=SPLIT_RATIO
        )
        train_parts.append(train_part)
        eval_parts.append(eval_part)

    # Merge across classes: all train parts together, all eval parts together.
    train = tuple(train_parts) | "aggregate train" >> beam.Flatten()
    evaluation = tuple(eval_parts) | "aggregate eval" >> beam.Flatten()

    # WriteToText writes str(element). Serialise explicitly so the output
    # files contain JSON lines rather than Python reprs of the rows.
    (
        train
        | "train to json" >> beam.Map(_row_to_json)
        | "write to train" >> beam.io.WriteToText(train_output_filename)
    )
    (
        evaluation
        | "eval to json" >> beam.Map(_row_to_json)
        | "write to eval" >> beam.io.WriteToText(eval_output_filename)
    )

