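## Tiny Blocks Examples

Each example builds a pipeline by chaining extract, transform and load blocks with the `>>` operator:

1. Basic Pipeline
2. Merging Pipes (FanIn)
3. Splitting Pipes (FanOut)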

In [1]:
# extract blocks
from tiny_blocks.extract import FromCSV
from tiny_blocks.extract import FromSQLQuery

# transform blocks 
from tiny_blocks.transform import DropDuplicates
from tiny_blocks.transform import Fillna
from tiny_blocks.transform import Rename
from tiny_blocks.transform import Merge
from tiny_blocks.transform import Apply
from tiny_blocks.transform import Sort

# load blocks
from tiny_blocks.load import ToCSV
from tiny_blocks.load import ToSQL
from tiny_blocks.load.to_kafka import ToKafka

# pipeline operations
from tiny_blocks import FanIn, FanOut

# mock data
from tests.conftest import add_mocked_data
from tests.conftest import delete_mocked_data

import pandas as pd

In [2]:
# Set up the mocked source data (helper from tests.conftest). The cells below read
# /code/tests/data/source*.csv and the Postgres `source` table — presumably created here; confirm in conftest.
add_mocked_data()

In [4]:
# check sources: the SQL table and the CSV file that the examples below read from
source_csv_df = pd.read_csv("/code/tests/data/source.csv", sep="|")
source_sql_df = pd.read_sql_table(
    con="postgresql+psycopg2://user:pass@postgres:5432/db",
    table_name="source",
)

# Show both sources: the CSV via display(), the SQL table as the cell's output.
display(source_csv_df)
source_sql_df

Unnamed: 0,a,b,c
0,uno,4,7.0
1,dos,5,8.0
2,dos,6,


### Example 1. Basic Pipeline 

In [8]:
# Blocks for the basic ETL pipeline.

# Extract: every row of the `source` table.
from_sql = FromSQLQuery(sql="select * from source", dsn_conn="postgresql+psycopg2://user:pass@postgres:5432/db")

# Transform: fill missing values, deduplicate, then order the rows.
fillna = Fillna(value="Hola Mundo")
drop_dupl = DropDuplicates(subset=["b"])  # try another subset, or none at all
sort = Sort(ascending=False, by=["b"])    # try sorting by "a" instead

# Load: write the result to a CSV file.
to_csv = ToCSV(path="/code/tests/data/sink.csv")

In [9]:
# Read from SQL -> Fill nulls -> Drop duplicates -> Sort rows -> Write to CSV
from_sql >> fillna >> drop_dupl >> sort >> to_csv

# The same pipeline can also be run step by step, wiring each block's iterator by hand:
#
# generator = from_sql.get_iter()
# generator = fillna.get_iter(generator)
# generator = drop_dupl.get_iter(generator)
# generator = sort.get_iter(generator)
# to_csv.exhaust(generator)

In [10]:
# Read back what the basic pipeline wrote (the sink is pipe-delimited).
basic_sink_df = pd.read_csv(to_csv.path, sep="|")
basic_sink_df

Unnamed: 0,a,b,c
0,dos,6,Hola Mundo
1,dos,5,8.0
2,uno,4,7.0


### Example 2. Merging Pipes (FanIn)

In [11]:
# Blocks for the FanIn example: two sources are merged into a single stream.

# Extract: one Postgres table and one pipe-delimited CSV file.
from_sql = FromSQLQuery(sql="select * from source", dsn_conn="postgresql+psycopg2://user:pass@postgres:5432/db")
from_csv = FromCSV(path="/code/tests/data/source.csv")

# Transform
fillna = Fillna(value="Hola Mundo")
merge = Merge(left_on="a", right_on="d", how="left")  # left join on a (left input) == d (right input)
drop_dupl = DropDuplicates(subset=["a"])
rename = Rename(columns={"a": "New_A"})

# Load
to_csv = ToCSV(path="/code/tests/data/sink.csv")

In [12]:
# read SQL -> FillNull --+
#                        +-> Merge -> Drop Duplicates -> Rename Column -> Write to CSV
# read CSV --------------+
sql_branch = from_sql >> fillna
FanIn(sql_branch, from_csv) >> merge >> drop_dupl >> rename >> to_csv

In [13]:
# Read back the merged, deduplicated and renamed result of the FanIn pipeline.
fanin_sink_df = pd.read_csv(to_csv.path, sep="|")
fanin_sink_df

Unnamed: 0,New_A,b,c,d,e,f
0,dos,5,8.0,dos,dos,
1,uno,4,7.0,uno,uno,7.0


### Example 3. Splitting Pipes (FanOut)

In [14]:
# extract
from_csv = FromCSV(path="/code/tests/data/source1.csv")

# transform. Apply a custom func to a column and set result in the same column
apply = Apply(
    func=lambda x: x+1, 
    apply_to_column="A", 
    set_to_column="A"
)

# load
to_csv_1 = ToCSV(path="/code/tests/data/sink1.csv")
to_csv_2 = ToCSV(path="/code/tests/data/sink2.csv")
to_csv_3 = ToCSV(path="/code/tests/data/sink3.csv")

In [16]:
# Inspect the FanOut source before running the pipeline.
fanout_source_df = pd.read_csv(from_csv.path, sep="|")
fanout_source_df

Unnamed: 0,A,B
0,1,Hola
1,1,Hola
2,1,Mundo


In [17]:
# read CSV -> Apply +1 | -> Apply +1 -> Apply +1 | -> Apply +1 -> CSV3
#                      |                         |
#                      | -> CSV1                 | -> CSV2
#
# The same `apply` block (+1 on column A) is reused at every step, so with A = 1 in the
# source, CSV1 receives A = 2, CSV2 receives A = 4 and CSV3 receives A = 5.
from_csv >> apply >> FanOut(to_csv_1) >> apply >> apply >> FanOut(to_csv_2) >> apply >> to_csv_3

In [18]:
df_1 = pd.read_csv(to_csv_1.path, sep="|")
df_2 = pd.read_csv(to_csv_2.path, sep="|")
df_3 = pd.read_csv(to_csv_3.path, sep="|")

# Each sink saw column A after a different number of +1 steps (source A starts at 1):
# CSV1 after 1 step, CSV2 after 3, CSV3 after 4.
# Select A by label rather than position so the check does not depend on column order.
for sink_df, expected_a in ((df_1, 2), (df_2, 4), (df_3, 5)):
    first_a = sink_df["A"].iloc[0]
    print(first_a)
    assert first_a == expected_a, f"expected A == {expected_a}, got {first_a}"

2
4
5


In [20]:
# Full contents of the last sink: column A after all four +1 steps.
df_3

Unnamed: 0,A,B
0,5,Hola
1,5,Hola
2,5,Mundo


In [15]:
# Clean-up helper from tests.conftest — presumably removes the data set up by add_mocked_data(); confirm there.
# Run it only after every example has finished reading the mocked sources.
delete_mocked_data()