<center><font size="12">Pipeline example</font> </center>

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd

In [3]:
from src.shared_code import connectors as conn, dataset
from src import data_assets, project_env, example

This notebook gives a structured example of how to use the template's software architecture in a simple pipeline.

The framework establishes the following sequence:
* Define a set of sources and sinks by implementing a `data connector` and pointing a `dataset` at each one.
    * A `data connector` is instantiated with the factory function `connectors.data_connections_factory`, which takes the `data asset` (source or sink) definition as input.
    * When instantiated with a source, the `dataset` abstraction reads the actual data (in this case a CSV file) and materializes it in an adequate data structure (a pandas DataFrame).
    * Conversely, when the `dataset` is instantiated with a sink, it writes the data to the actual storage through the `data connector`.
    * Shared `data assets` (sources and sinks) can be imported from `src/shared_code/utils.py`, whereas project-specific ones should be defined in `src/data_assets.py`.
* Materialize the data in actual data structures (pandas DataFrames) and implement the desired pipeline.
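The sequence above can be sketched end to end in plain Python. This is a minimal, self-contained illustration under stated assumptions, not the template's code. `DataAsset`, `MemoryConnector`, `Dataset`, `parse`, `serialize` and the in-memory `STORAGE` dict are all hypothetical stand-ins. The real template uses `connectors.data_connections_factory`, `dataset.DatasetPandas` and CSV files on disk. The sketch shows how the work is divided: the asset says *where* the data lives, the connector knows *how* to reach it, and the dataset turns raw content into a data structure and back.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

# Hypothetical in-memory "storage" standing in for files on disk.
STORAGE: Dict[str, str] = {"raw/input": "1,2\n3,4"}

Rows = List[List[int]]


@dataclass(frozen=True)
class DataAsset:
    """Hypothetical asset definition: a name, a connector kind and a location."""
    name: str
    kind: str  # selects the connector class, e.g. "memory"
    path: str


class MemoryConnector:
    """Hypothetical connector: the only object that touches the storage."""

    def __init__(self, asset: DataAsset) -> None:
        self.asset = asset

    def read(self) -> str:
        return STORAGE[self.asset.path]

    def write(self, raw: str) -> None:
        STORAGE[self.asset.path] = raw


def data_connections_factory(asset: DataAsset) -> MemoryConnector:
    """Build the connector that matches the asset's kind (one kind in this sketch)."""
    return {"memory": MemoryConnector}[asset.kind](asset)


class Dataset:
    """Hypothetical dataset: converts raw content to a data structure and back."""

    def __init__(self, connector: MemoryConnector) -> None:
        self.connector = connector

    def read(self, func: Callable[[str], Rows]) -> Rows:
        return func(self.connector.read())

    def write(self, data: Rows, func: Callable[[Rows], str]) -> None:
        self.connector.write(func(data))


def parse(raw: str) -> Rows:
    return [[int(v) for v in line.split(",")] for line in raw.splitlines()]


def serialize(rows: Rows) -> str:
    return "\n".join(",".join(str(v) for v in row) for row in rows)


# 1. Define a source and a sink, each wrapped by a connector and a dataset.
ds_src = Dataset(data_connections_factory(DataAsset("src", "memory", "raw/input")))
ds_sink = Dataset(data_connections_factory(DataAsset("sink", "memory", "processed/output")))
# 2. Materialize the data, run a trivial pipeline (doubling) and write the result.
rows = ds_src.read(func=parse)
ds_sink.write([[2 * v for v in row] for row in rows], func=serialize)
print(STORAGE["processed/output"])  # prints "2,4" and "6,8" on two lines
```

The pipeline code only calls `read` and `write`. Switching storage backends therefore only changes which connector the factory returns.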

# Prerequisites

* Define the environment variables needed to instantiate the desired data connection, in this case a local one (`project_env.set_env_vars()` does this in the next cell). Other data connectors may have other prerequisites, such as a credentials instance.
* The file `data/raw/testing_io/test_reading.csv` should exist.
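You can check both prerequisites up front, so the notebook fails early with a readable message. The sketch below assumes a hypothetical `DATA_ROOT` variable, since this notebook does not show which names `project_env.set_env_vars()` actually sets. It also assumes paths are resolved from the project root. `missing_prerequisites` is an illustrative helper, not part of the template.

```python
import os
from pathlib import Path
from typing import List, Mapping

# Hypothetical variable name: check what project_env.set_env_vars() actually defines.
REQUIRED_ENV_VARS = ["DATA_ROOT"]
# Path taken from the prerequisite above, relative to the project root.
REQUIRED_FILES = [Path("data/raw/testing_io/test_reading.csv")]


def missing_prerequisites(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return one message per unmet prerequisite; an empty list means all are met."""
    problems = [f"environment variable {name} is not set"
                for name in REQUIRED_ENV_VARS if name not in env]
    problems += [f"file {path} does not exist"
                 for path in REQUIRED_FILES if not path.is_file()]
    return problems


problems = missing_prerequisites()
if problems:
    print("Prerequisites not met:", *problems, sep="\n- ")
```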

In [4]:
project_env.set_env_vars()

# Args

In [5]:
factor = 2

# Define Sources

In [6]:
source_data = data_assets.sources[0]
source_data

DataAsset(name='testing_source', kind='local', layer='raw', path='testing_io/test_reading', extension='csv', description='features and labels from UCI datasets')
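The printed representation suggests that `DataAsset` is a plain record with six fields. Below is a sketch of such a record as a frozen dataclass. Its `full_path` helper is hypothetical and assumes files are laid out as `<root>/<layer>/<path>.<extension>`. That layout matches the prerequisite file `data/raw/testing_io/test_reading.csv`, but the template's real path resolution may differ.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class DataAsset:
    # Fields mirror the repr printed above; the real class may define more.
    name: str
    kind: str
    layer: str
    path: str
    extension: str
    description: str = ""

    def full_path(self, root: str = "data") -> str:
        # Assumed layout (hypothetical): <root>/<layer>/<path>.<extension>
        return str(PurePosixPath(root, self.layer, f"{self.path}.{self.extension}"))


asset = DataAsset(name="testing_source", kind="local", layer="raw",
                  path="testing_io/test_reading", extension="csv",
                  description="features and labels from UCI datasets")
print(asset.full_path())  # data/raw/testing_io/test_reading.csv
```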

In [7]:
conn_src = conn.data_connections_factory(source_data)
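A factory like this typically maps the asset's `kind` field to a connector class. The sketch below implements that dispatch with a registry dictionary. `Connector`, `LocalConnector`, `S3Connector` and `CONNECTORS` are hypothetical names, and the real `data_connections_factory` may choose connectors differently.

```python
from dataclasses import dataclass
from typing import Dict, Type


@dataclass(frozen=True)
class DataAsset:  # trimmed to the fields the factory needs
    name: str
    kind: str


class Connector:
    """Hypothetical base class: one subclass per storage backend."""

    def __init__(self, asset: DataAsset) -> None:
        self.asset = asset


class LocalConnector(Connector):
    """Would read and write files on the local disk."""


class S3Connector(Connector):
    """Hypothetical second backend, only here to show the dispatch."""


CONNECTORS: Dict[str, Type[Connector]] = {"local": LocalConnector, "s3": S3Connector}


def data_connections_factory(asset: DataAsset) -> Connector:
    """Return a connector instance chosen by the asset's `kind` field."""
    connector_cls = CONNECTORS.get(asset.kind)
    if connector_cls is None:
        raise ValueError(f"no connector registered for kind={asset.kind!r}")
    return connector_cls(asset)


conn_src = data_connections_factory(DataAsset("testing_source", "local"))
print(type(conn_src).__name__)  # LocalConnector
```

With this design, adding a backend only means registering one more class. The notebook code stays the same.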

In [8]:
ds_src = dataset.DatasetPandas(conn_src)
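Later cells call `DatasetPandas` as `read(func=pd.read_csv)` and `write(df, kwargs={"index": False})`. That call shape suggests the dataset lets the caller choose how data is parsed and serialized. Below is a stdlib sketch of the same shape, assuming the connector exposes a file path. `LocalConnector`, `DatasetRows` and `read_rows` are hypothetical, and rows are lists of dicts instead of DataFrames. Here `kwargs` is forwarded to `csv.DictWriter`, not to whatever writer the real class uses.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional


class LocalConnector:
    """Hypothetical local-file connector: it only knows where the file lives."""

    def __init__(self, path: Path) -> None:
        self.path = path


class DatasetRows:
    """Hypothetical stdlib analogue of DatasetPandas: rows are lists of dicts."""

    def __init__(self, connector: LocalConnector) -> None:
        self.connector = connector

    def read(self, func: Callable[[Path], Any]) -> Any:
        # The caller chooses the parser, as in ds_src.read(func=pd.read_csv).
        return func(self.connector.path)

    def write(self, rows: List[Dict[str, Any]], kwargs: Optional[Dict[str, Any]] = None) -> None:
        # Extra options are forwarded to the writer, as in write(df, kwargs={"index": False}).
        with open(self.connector.path, "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(rows[0]), **(kwargs or {}))
            writer.writeheader()
            writer.writerows(rows)


def read_rows(path: Path) -> List[Dict[str, str]]:
    with open(path, newline="") as fh:
        return list(csv.DictReader(fh))


tmp = Path(tempfile.mkdtemp()) / "test_reading.csv"
ds = DatasetRows(LocalConnector(tmp))
ds.write([{"x_1": 1, "x_2": 2, "y": 3}], kwargs={"lineterminator": "\n"})
print(ds.read(func=read_rows))  # [{'x_1': '1', 'x_2': '2', 'y': '3'}]
```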

# Define Sinks

In [9]:
# NOTE: the sink reuses the source asset, so writing to it overwrites the input CSV
sink_data = data_assets.sources[0]
sink_data

DataAsset(name='testing_source', kind='local', layer='raw', path='testing_io/test_reading', extension='csv', description='features and labels from UCI datasets')
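The sink printed above is the same asset as the source, so writing to it replaces the input file. Below is a small sketch of a guard that flags this before the write. It assumes two assets point to the same location when `kind`, `layer`, `path` and `extension` all match. `same_location` and the trimmed `DataAsset` are hypothetical, and the `"processed"` layer in the test is only an example value.

```python
from typing import NamedTuple


class DataAsset(NamedTuple):  # trimmed stand-in for the asset shown above
    name: str
    kind: str
    layer: str
    path: str
    extension: str


def same_location(a: DataAsset, b: DataAsset) -> bool:
    """True when two assets resolve to the same storage location."""
    return (a.kind, a.layer, a.path, a.extension) == (b.kind, b.layer, b.path, b.extension)


source = DataAsset("testing_source", "local", "raw", "testing_io/test_reading", "csv")
sink = source  # what the cell above does: the sink is the source asset itself
if same_location(source, sink):
    print("warning: writing to this sink will overwrite the source data")
```

With the current setup, every run of the notebook reads the file written by the previous run and multiplies it by `factor` again.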

In [10]:
conn_sink = conn.data_connections_factory(sink_data)

In [11]:
ds_sink = dataset.DatasetPandas(conn_sink)

# Run Pipeline

In [12]:
df_src = ds_src.read(func=pd.read_csv)
df_src.head()

| Unnamed: 0 | x_1 | x_2 | y |
|---|---|---|---|
| 0 | 4 | 8 | 12 |
| 1 | 16 | 20 | 24 |


In [13]:
df_sink = example.example_pipeline(df_src, factor)

df_sink.head()

| Unnamed: 0 | x_1 | x_2 | y |
|---|---|---|---|
| 0 | 8 | 16 | 24 |
| 1 | 32 | 40 | 48 |
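Comparing the two tables, `x_1`, `x_2` and `y` are multiplied by `factor` while `Unnamed: 0` stays the same. Below is a stdlib sketch of a transformation with that behaviour, using rows as dicts. It is consistent with the output above but is not the code of `example.example_pipeline`, and the default column list is an assumption. With pandas, the same idea is roughly `out = df.copy(); out[cols] = out[cols] * factor`.

```python
from typing import Dict, List, Sequence

Row = Dict[str, float]


def example_pipeline(rows: List[Row], factor: float,
                     columns: Sequence[str] = ("x_1", "x_2", "y")) -> List[Row]:
    """Return new rows with `columns` multiplied by `factor`; other columns are copied as-is."""
    return [{k: v * factor if k in columns else v for k, v in row.items()} for row in rows]


df_src = [
    {"Unnamed: 0": 0, "x_1": 4, "x_2": 8, "y": 12},
    {"Unnamed: 0": 1, "x_1": 16, "x_2": 20, "y": 24},
]
df_sink = example_pipeline(df_src, factor=2)
print(df_sink[1])  # {'Unnamed: 0': 1, 'x_1': 32, 'x_2': 40, 'y': 48}
```

The function builds new dicts instead of mutating its input, so `df_src` remains available for comparison after the pipeline runs.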


In [14]:
ds_sink.write(df_sink, kwargs={"index": False})
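The `{"index": False}` option matters because pandas' `DataFrame.to_csv` writes the row index as an unnamed first column by default. `pd.read_csv` then reads that column back as `Unnamed: 0`. The `Unnamed: 0` column in the tables above may be a leftover of such a write. A round trip through an in-memory buffer shows the difference. This assumes `write` forwards `kwargs` to `to_csv`, which the call above suggests but does not confirm. `round_trip_columns` is an illustrative helper.

```python
import io

import pandas as pd

df = pd.DataFrame({"x_1": [8, 32], "x_2": [16, 40], "y": [24, 48]})


def round_trip_columns(**to_csv_kwargs) -> list:
    """Write df to an in-memory CSV with the given options and return the columns read back."""
    buffer = io.StringIO()
    df.to_csv(buffer, **to_csv_kwargs)
    buffer.seek(0)
    return pd.read_csv(buffer).columns.tolist()


print(round_trip_columns())             # ['Unnamed: 0', 'x_1', 'x_2', 'y']
print(round_trip_columns(index=False))  # ['x_1', 'x_2', 'y']
```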