In [4]:
import ujson, csv, time
from dtran import Pipeline, IFunc, ArgType
from funcs import ReadFunc, WriteFuncGraph, UnitTransFunc
from pydrepr import Graph

DEMO_CSV_INPUT_FILE = "./examples/s01_ethiopia_commodity_price.csv"

# Demo: an example flow through the MINT data transformation pipeline

This notebook presents an abstraction of the MINT Data Transformations architecture and walks through an example of the pipeline we are proposing: a simple CSV --(unit transformations)--> CSV flow.

In [5]:
# show first 5 rows of the input demo file
import pandas as pd
df = pd.read_csv(DEMO_CSV_INPUT_FILE)
print(df.head(5))

  Indicator Code              IndicatorName   Unit Frequency     Date  Value
0    CRUDE_PETRO  Crude oil, average($/bbl)  $/bbl         M  1960M01   1.63
1    CRUDE_PETRO  Crude oil, average($/bbl)  $/bbl         M  1960M02   1.63
2    CRUDE_PETRO  Crude oil, average($/bbl)  $/bbl         M  1960M03   1.63
3    CRUDE_PETRO  Crude oil, average($/bbl)  $/bbl         M  1960M04   1.63
4    CRUDE_PETRO  Crude oil, average($/bbl)  $/bbl         M  1960M05   1.63


# (Semantic) Description of a component (reader/writer/transformation functions)

Providing a semantic description of a function may allow the pipeline to be constructed semi-automatically. A semantic description has two levels:

1. Defining the inputs/outputs of a component: this enables input data validation and compatibility checking between connected components.
2. Defining the purpose of the component: this allows us to construct the transformation pipeline from a user's specification.
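A minimal sketch of the level-1 compatibility check from item 1, assuming each component exposes its `inputs`/`outputs` as dicts of type tags (as the `IFunc` class does). The `ArgType` enum, the `Reader`/`Transform`/`Writer` classes, and `check_wire` are hypothetical stand-ins, not the dtran API; a wire is accepted only when the producer's declared output type equals the consumer's declared input type.

```python
from enum import Enum
from typing import Dict


class ArgType(Enum):
    # Hypothetical stand-in for dtran's ArgType; only a few kinds are modelled
    FILE_PATH = "file_path"
    STRING = "string"
    GRAPH = "graph"


class Component:
    """Stand-in for a component carrying level-1 (input/output) descriptions."""
    inputs: Dict[str, ArgType] = {}
    outputs: Dict[str, ArgType] = {}


class Reader(Component):
    inputs = {"repr_file": ArgType.FILE_PATH, "resources": ArgType.STRING}
    outputs = {"data": ArgType.GRAPH}


class Transform(Component):
    inputs = {"graph": ArgType.GRAPH}
    outputs = {"graph": ArgType.GRAPH}


class Writer(Component):
    inputs = {"graph": ArgType.GRAPH, "main_class": ArgType.STRING}
    outputs = {}


def check_wire(src: type, out_name: str, dst: type, in_name: str) -> bool:
    """Return True if output `out_name` of `src` can feed input `in_name` of `dst`."""
    if out_name not in src.outputs or in_name not in dst.inputs:
        return False  # one side does not declare the argument at all
    return src.outputs[out_name] == dst.inputs[in_name]


print(check_wire(Reader, "data", Transform, "graph"))     # True
print(check_wire(Reader, "data", Writer, "main_class"))  # False: graph -> string
```

Checks like this can run when the pipeline is assembled, before any data is read.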

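```python
import abc
from typing import Dict

from dtran import ArgType


class IFunc(abc.ABC):
    id: str = ""

    # level 2: what the component is for
    description = None

    # level 1: names and types of the inputs/outputs
    inputs: Dict[str, ArgType] = {}
    outputs: Dict[str, ArgType] = {}

    @abc.abstractmethod
    def validate(self) -> bool:
        """
        Check whether the inputs are valid
        :return: True if the inputs are valid, False otherwise
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def exec(self) -> dict:
        """
        Execute the transformation function and return the result
        :return: a dict mapping each output name (a key of `outputs`) to its value
        """
        raise NotImplementedError()
```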

`IFunc` is the semantic description of the interface (control plane) shared by all components in the pipeline (Reader, Writer, and Transformation Adapters).
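To make the contract concrete, here is a hypothetical component written against a simplified, self-contained copy of `IFunc` (plain strings replace `ArgType` so it runs without dtran). `ScaleFunc` is illustrative only: `validate` performs a level-1 check on the inputs, `exec` returns a dict keyed by the declared output names, and `description` carries the level-2 purpose.

```python
import abc
from typing import Dict


class IFunc(abc.ABC):
    # Simplified stand-in for dtran.IFunc; strings replace ArgType here
    id: str = ""
    description = None
    inputs: Dict[str, str] = {}
    outputs: Dict[str, str] = {}

    @abc.abstractmethod
    def validate(self) -> bool:
        raise NotImplementedError()

    @abc.abstractmethod
    def exec(self) -> dict:
        raise NotImplementedError()


class ScaleFunc(IFunc):
    """Hypothetical component that multiplies a list of numbers by a factor."""
    id = "scale_func"
    description = "multiply every value by a constant factor"  # level 2
    inputs = {"values": "list[float]", "factor": "float"}      # level 1
    outputs = {"values": "list[float]"}

    def __init__(self, values, factor):
        self.values = values
        self.factor = factor

    def validate(self) -> bool:
        # level-1 check: the inputs have the declared kinds
        return isinstance(self.factor, (int, float)) and all(
            isinstance(v, (int, float)) for v in self.values
        )

    def exec(self) -> dict:
        if not self.validate():
            raise ValueError(f"invalid inputs for {self.id}")
        # the returned keys match the names declared in `outputs`
        return {"values": [v * self.factor for v in self.values]}


print(ScaleFunc([1.0, 2.5], 2).exec())  # {'values': [2.0, 5.0]}
```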


## Here's an example flow:

## Component 1: Reader Adapter

An instance of a reader adapter can be used as an entry point of the pipeline. It reads an input file (`input.csv`) together with an `input.yaml` file describing its D-REPR layout. The data are represented generically as a Python object (a Graph or a NumPy array) and are used in the next steps of the pipeline.

```python
from dtran import ArgType, IFunc


class ReadFunc(IFunc):
    id = "read_func"
    inputs = {
        "repr_file": ArgType.FilePath, 
        "resources": ArgType.String
    }
    outputs = {"data": ArgType.Graph(None)}
    
    ...
```
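A rough sketch of what a reader adapter produces, assuming the layout file boils down to a column-to-predicate mapping. `COLUMN_MAP`, `Node`, `parse_month`, and `read_csv_as_nodes` are hypothetical; the real `ReadFunc` derives the mapping from the D-REPR `.yml` file and returns a `pydrepr` `Graph`. The sample row is the first row of the demo input shown earlier, and the predicate names are the ones that appear in the demo output.

```python
import csv
import io
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Node:
    # one observation; `attrs` maps a predicate (e.g. "rdf:value") to a literal
    node_class: str
    attrs: Dict[str, object] = field(default_factory=dict)


# Hypothetical column -> predicate mapping standing in for the D-REPR layout
COLUMN_MAP = {"IndicatorName": "rdfs:label", "Unit": "eg:unit",
              "Value": "rdf:value", "Date": "eg:recorded_at"}


def parse_month(s: str) -> str:
    """Normalize a 'YYYYMmm' period such as '1960M01' into '1960-01-01'."""
    year, month = s.split("M")
    return f"{year}-{int(month):02d}-01"


def read_csv_as_nodes(text: str, node_class: str = "qb:Observation") -> List[Node]:
    nodes = []
    for row in csv.DictReader(io.StringIO(text)):
        node = Node(node_class)
        for col, pred in COLUMN_MAP.items():
            value = row[col]
            if pred == "rdf:value":
                value = float(value)        # numeric literal
            elif pred == "eg:recorded_at":
                value = parse_month(value)  # ISO-style date
            node.attrs[pred] = value
        nodes.append(node)
    return nodes


SAMPLE = """Indicator Code,IndicatorName,Unit,Frequency,Date,Value
CRUDE_PETRO,"Crude oil, average($/bbl)",$/bbl,M,1960M01,1.63
"""
print(read_csv_as_nodes(SAMPLE)[0].attrs)
```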

## Component 2: Transformation Adapter

An instance of a transformation adapter does not materialize the data into an output; it only transforms the data's content (the actual values) and applies the configuration given as 'control plane' parameters.
Given the Python object (graph) representing the data, plus the remaining configuration passed through the calling API ('control plane' parameters), it reconstructs the data as a new Python object (graph) and prepares it for the next steps in the pipeline.

```python
from dtran import ArgType, IFunc
from pydrepr import Graph


class TransFunc(IFunc):
    id = "trans_func"
    inputs = {"graph": ArgType.Graph(None)}
    outputs = {"graph": ArgType.Graph(None)}

    def __init__(self, graph: Graph):
        self.graph = graph

    def exec(self):
        for node in self.graph.nodes:
            pass  # do something for each node
        return {
            "graph": self.graph
        }
    ...
```
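A hedged sketch of the unit transformation used in this demo, over nodes modelled as plain dicts rather than a `pydrepr` `Graph`. The parameter names mirror the `unit_value`, `unit_label`, and `unit_desired` inputs passed to `UnitTransFunc` in the pipeline below, but `unit_trans` and the `CONVERSIONS` table are assumptions, not the actual implementation. It assumes a US oil barrel of 42 US gallons (158.987294928 L).

```python
from typing import Dict, List

LITERS_PER_BARREL = 158.987294928  # US oil barrel: 42 US gal x 3.785411784 L

# Multiplicative factors for (from_unit, to_unit) pairs; a price per barrel
# becomes a price per liter by dividing by the liters in one barrel
CONVERSIONS = {("$/bbl", "$/liter"): 1 / LITERS_PER_BARREL}


def unit_trans(nodes: List[Dict[str, object]], unit_value: str,
               unit_label: str, unit_desired: str) -> List[Dict[str, object]]:
    """Convert the value/unit attributes of every node in place."""
    for node in nodes:
        if unit_value not in node or unit_label not in node:
            continue  # this node carries no quantity
        if node[unit_label] == unit_desired:
            continue  # already in the desired unit
        # raises KeyError for an unsupported conversion
        factor = CONVERSIONS[(node[unit_label], unit_desired)]
        node[unit_value] = node[unit_value] * factor
        node[unit_label] = unit_desired
    return nodes


graph = [{"rdf:value": 1.63, "eg:unit": "$/bbl"}, {"rdfs:label": "no quantity"}]
out = unit_trans(graph, "rdf:value", "eg:unit", "$/liter")
print(round(out[0]["rdf:value"], 6), out[0]["eg:unit"])  # 0.010252 $/liter
```

Nodes without a value/unit pair pass through untouched, so the same adapter can run over a graph that mixes observations with other node types.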

## Component 3: Writer Adapter

An instance of a writer adapter can be used as an exit point of the pipeline. It writes an output file `output.csv` based on a given `output.yaml` (D-REPR layout) and an additional configuration file `output.config`.

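```python
from dtran import ArgType, IFunc
from pydrepr import Graph
# NOTE: `Repr` (the D-REPR layout model used below) must also be imported;
# its module is not shown in this excerpt


class WriteFuncGraph(IFunc):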
    id = "write_func_graph"
    inputs = {"graph": ArgType.Graph(None), "repr_file": ArgType.FilePath, "main_class": ArgType.String}
    outputs = {"resources": ArgType.String}

    def __init__(self, graph: Graph, repr_file: str, main_class: str):
        self.graph = graph
        self.main_class = main_class

        self.repr = Repr.from_file(repr_file)

    def exec(self) -> dict:
        all_data_rows = []
        for rid, resource in self.repr.iter_resources():
            if resource["type"] == "csv":
                all_data_rows = self.tabularize_data()
                self._dump_to_csv(all_data_rows)
            elif resource["type"] == "json":
                all_data_rows = self.tabularize_data()
                self._dump_to_json(all_data_rows)
        # the key must match the name declared in `outputs`
        return {"resources": all_data_rows}
    ...

```
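A minimal sketch of the `tabularize_data` / `_dump_to_csv` idea, assuming each graph node is a dict of predicates with a hypothetical `@type` key holding its class: nodes of `main_class` become rows and predicates become columns. `tabularize` and `dump_to_csv` are illustrative helpers, not the methods of `WriteFuncGraph`.

```python
import csv
import io
from typing import Dict, List


def tabularize(nodes: List[Dict[str, object]], main_class: str) -> List[Dict[str, object]]:
    """Keep nodes of `main_class`; each becomes one row keyed by predicate."""
    return [{k: v for k, v in n.items() if k != "@type"}
            for n in nodes if n.get("@type") == main_class]


def dump_to_csv(rows: List[Dict[str, object]]) -> str:
    buf = io.StringIO()
    # columns: union of all predicates, in first-seen order
    fieldnames = list(dict.fromkeys(k for row in rows for k in row))
    writer = csv.DictWriter(buf, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)  # missing predicates are written as empty cells
    return buf.getvalue()


nodes = [
    {"@type": "qb:Observation", "eg:unit": "$/liter", "rdf:value": 0.010252,
     "eg:recorded_at": "1960-01-01"},
    {"@type": "eg:Unit", "rdfs:label": "$/liter"},  # not main_class: skipped
]
print(dump_to_csv(tabularize(nodes, "qb:Observation")))
```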

Main ideas for generalizing writers:
- Implement a random-access indexer for each file type.
- Design a configuration file that specifies the dimensional mapping between different attributes, either by:
    1. extending Binh's representation file, or
    2. designing a new configuration file.
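One way to read the first idea, sketched under assumptions: every file type gets an indexer exposing the same `get(path)` call, so a writer (or a dimensional mapping) can address a cell without knowing the underlying format. `Indexer`, `CSVIndexer`, `JSONIndexer`, and their path conventions are hypothetical.

```python
import csv
import io
import json
from abc import ABC, abstractmethod
from typing import Any, Sequence


class Indexer(ABC):
    """Hypothetical random-access indexer: fetch one value by a path of indices."""

    @abstractmethod
    def get(self, path: Sequence[Any]) -> Any: ...


class CSVIndexer(Indexer):
    def __init__(self, text: str):
        self.rows = list(csv.reader(io.StringIO(text)))

    def get(self, path):
        row, col = path  # path = (row index, column index); row 0 is the header
        return self.rows[row][col]


class JSONIndexer(Indexer):
    def __init__(self, text: str):
        self.doc = json.loads(text)

    def get(self, path):
        node = self.doc
        for step in path:  # each step is a list index or an object key
            node = node[step]
        return node


csv_idx = CSVIndexer("unit,value\n$/liter,0.010252\n")
json_idx = JSONIndexer('{"rows": [{"unit": "$/liter", "value": 0.010252}]}')
# the same logical cell, addressed through each format's own path scheme
print(csv_idx.get((1, 0)), json_idx.get(("rows", 0, "unit")))
```

A dimensional-mapping configuration would then only need to translate an attribute name into a path for the target format.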

# Pipeline

In [7]:
DEMO_CSV_OUTPUT_FILE = "./examples/s01_ethiopia_commodity_price_write.csv"

inputs = {
    ReadFunc.I.repr_file: "./examples/s01_ethiopia_commodity_price.yml",
    ReadFunc.I.resources: DEMO_CSV_INPUT_FILE,
    UnitTransFunc.I.unit_value: "rdf:value",
    UnitTransFunc.I.unit_label: "eg:unit",
    UnitTransFunc.I.unit_desired: "$/liter",
    WriteFuncGraph.I.main_class: "qb:Observation",
    WriteFuncGraph.I.output_file: DEMO_CSV_OUTPUT_FILE
}

pipeline = Pipeline([
    ReadFunc,
    UnitTransFunc,
    WriteFuncGraph
], wired=[
    ReadFunc.O.data == UnitTransFunc.I.graph,
    UnitTransFunc.O.graph == WriteFuncGraph.I.graph
])
outputs = pipeline.exec(inputs)
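A toy version of what the wiring above expresses, assuming `Pipeline.exec` runs components in order and feeds each wired output into the matching input. This is not dtran's implementation: `run_pipeline`, the `"step.arg"` key convention, and the `read`/`scale`/`write` steps are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Step = Tuple[str, Callable[..., dict]]  # (step name, function returning outputs)


def run_pipeline(steps: List[Step], inputs: Dict[str, object],
                 wired: Dict[str, str]) -> Dict[str, object]:
    """Run steps in order; `inputs` maps "step.arg" to a value and `wired`
    maps a consumer "step.arg" to the producer "step.output" feeding it."""
    values: Dict[str, object] = {}  # every output produced so far
    for name, fn in steps:
        prefix = name + "."
        # user-supplied inputs for this step
        kwargs = {k[len(prefix):]: v for k, v in inputs.items()
                  if k.startswith(prefix)}
        # wired inputs come from earlier outputs (KeyError if not produced yet)
        for dst, src in wired.items():
            if dst.startswith(prefix):
                kwargs[dst[len(prefix):]] = values[src]
        for out_name, out_value in fn(**kwargs).items():
            values[prefix + out_name] = out_value
    return values


def read(resources):
    return {"data": [float(x) for x in resources.split(",")]}


def scale(graph, factor):
    return {"graph": [x * factor for x in graph]}


def write(graph):
    return {"resources": ",".join(f"{x:g}" for x in graph)}


out = run_pipeline(
    [("read", read), ("scale", scale), ("write", write)],
    inputs={"read.resources": "1,2.5", "scale.factor": 2},
    wired={"scale.graph": "read.data", "write.graph": "scale.graph"},
)
print(out["write.resources"])  # 2,5
```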

In [9]:
# show first 5 rows of the output demo file
import pandas as pd
df = pd.read_csv(DEMO_CSV_OUTPUT_FILE)
print(df.head(5))

   eg:unit                 rdfs:label  rdf:value eg:recorded_at
0  $/liter  Crude oil, average($/bbl)   0.010252     1960-01-01
1  $/liter  Crude oil, average($/bbl)   0.010252     1960-02-01
2  $/liter  Crude oil, average($/bbl)   0.010252     1960-03-01
3  $/liter  Crude oil, average($/bbl)   0.010252     1960-04-01
4  $/liter  Crude oil, average($/bbl)   0.010252     1960-05-01
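As a sanity check on the first output row: assuming a US oil barrel of 42 US gallons (about 158.987 L), converting the input price of 1.63 $/bbl to $/liter gives the value shown above (the `Date` value `1960M01` likewise appears as `1960-01-01`):

$$
1.63\ \frac{\$}{\text{bbl}} \times \frac{1\ \text{bbl}}{158.987\ \text{L}} \approx 0.010252\ \frac{\$}{\text{L}}
$$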
