Skip to content

Commit

Permalink
adds parametrizing example
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduardo Blancas Reyes committed Jan 29, 2020
1 parent 8c688e9 commit 96ca051
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
67 changes: 54 additions & 13 deletions examples/parametrizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@
import pandas as pd

from ploomber import DAG
from ploomber.tasks import PythonCallable
from ploomber.products import File
from ploomber.tasks import PythonCallable, SQLUpload, SQLScript
from ploomber.products import File, SQLiteRelation
from ploomber.clients import SQLAlchemyClient


tmp_dir = Path(tempfile.mkdtemp())
path_to_db = 'sqlite:///' + str(tmp_dir / 'my_db.db')
print('temporary dir: ', tmp_dir)

dag = DAG()

client = SQLAlchemyClient(path_to_db)
dag.clients[SQLUpload] = client
dag.clients[SQLiteRelation] = client
dag.clients[SQLScript] = client


def get_data(product, filename):
"""Get red wine data
Expand All @@ -21,42 +31,73 @@ def get_data(product, filename):
'wine-quality/' + filename)
df = pd.read_csv(url,
sep=';', index_col=False)
# producg is a File type so you have to cast it to a str
df.to_csv(str(product))
df.to_parquet(str(product))


def concat_data(upstream, product):
"""Concatenate red and white wine data
"""
red = pd.read_csv(str(upstream['red']))
white = pd.read_csv(str(upstream['white']))
red = pd.read_parquet(str(upstream['red']))
red['kind'] = 'red'
white = pd.read_parquet(str(upstream['white']))
white['kind'] = 'white'
df = pd.concat([red, white])
df.to_csv(str(product))
df.to_parquet(str(product))


tmp_dir = Path(tempfile.mkdtemp())
print('temporary dir: ', tmp_dir)

# in both red_task and white_task, we use the same function get_data,
# but pass different parameters
red_task = PythonCallable(get_data,
product=File(tmp_dir / 'red.csv'),
product=File(tmp_dir / 'red.parquet'),
dag=dag,
name='red',
params={'filename': 'winequality-red.csv'})

white_task = PythonCallable(get_data,
product=File(tmp_dir / 'white.csv'),
product=File(tmp_dir / 'white.parquet'),
dag=dag,
name='white',
params={'filename': 'winequality-white.csv'})

concat_task = PythonCallable(concat_data,
product=File(tmp_dir / 'all.csv'),
product=File(tmp_dir / 'all.parquet'),
dag=dag, name='all')


upload_task = SQLUpload(tmp_dir / 'all.parquet',
product=SQLiteRelation((None, 'data', 'table')),
dag=dag,
name='upload')


# you can use jinja2 to parametrize SQL, {{upstream}} and {{product}}
# are available for your script. this way you could switch products without
# changing your source code (e.g. each Data Scientist in your team writes
# to his/her own db schema to have isolated runs)
sql = """
CREATE TABLE {{product}} AS
SELECT *,
pH > AVG(pH) AS high_pH
FROM {{upstream['upload']}}
"""

features = SQLScript(sql,
product=SQLiteRelation((None, 'features', 'table')),
dag=dag,
name='features')


red_task >> concat_task
white_task >> concat_task

concat_task >> upload_task >> features

# render will pass all parameters so you can see exactly which SQL code
# will be executed
dag.render()

# print source code for task "features"
print(dag['features'].source_code)


dag.build()
2 changes: 1 addition & 1 deletion src/ploomber/tasks/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, source, product, dag, name, params=None):
----------
source: str or pathlib.Path
Source code for the task, for tasks that do not take source code
as input (such as PostgresCopy), this can be other thing. The
as input (such as PostgresCopy), this can be another thing. The
source can be a template and can make references to any parameter
in "params", "upstream" parameters or its own "product", not all
Tasks have templated source (templating code is mostly used by
Expand Down

0 comments on commit 96ca051

Please sign in to comment.