Skip to content

Commit

Permalink
fixes recipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduardo Blancas Reyes committed Oct 4, 2020
1 parent 92bbd02 commit 4e2cca8
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions recipes/polling.py
Expand Up @@ -34,38 +34,53 @@ def make(tmp):
dag.clients[SQLUpload] = client_target
dag.clients[SQLiteRelation] = client_target

cur = client_target.connection.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='plus_one'""")

if cur.fetchone():
cur = client_target.connection.execute('SELECT MAX(id) FROM plus_one')
last_id = cur.fetchone()[0]
else:
last_id = None

# we dump new observations to this file
dumped_data = File(tmp / 'x.csv')
# we add a hook that allows us to save info on the latest seen value
dumped_data.prepare_metadata = add_last_value

# the actual task that dumps data
dump = SQLDump("""
-- if the product (x.csv file) already has metadata (it has run before)
-- use the value in the metadata to only get row above such value
{% if product.metadata.timestamp %}
SELECT * FROM data
WHERE x > {{product.metadata['last_value']}}
-- if there is no metadata, just get everything
{% else %}
SELECT * FROM data
{% endif %}
""", dumped_data, dag=dag, name='dump', chunksize=None)
{% if last_id %}
WHERE id > {{last_id}}
{% endif %}
""",
dumped_data,
dag=dag,
name='dump',
chunksize=None,
params=dict(last_id=last_id))

# on finish hook, will stop DAG execution if there aren't new observations
dump.on_finish = dump_on_finish

# a dummy task to modify the data
plus_one = PythonCallable(_plus_one, File(tmp / 'plus_one.csv'),
dag=dag, name='plus_one')
plus_one = PythonCallable(_plus_one,
File(tmp / 'plus_one.csv'),
dag=dag,
name='plus_one')

# upload the data to the target database
upload = SQLUpload('{{upstream["plus_one"]}}',
product=SQLiteRelation((None, 'plus_one', 'table')),
dag=dag,
name='upload',
# append observations if the table already exists
to_sql_kwargs={'if_exists': 'append', 'index': False})
upload = SQLUpload(
'{{upstream["plus_one"]}}',
product=SQLiteRelation((None, 'plus_one', 'table')),
dag=dag,
name='upload',
# append observations if the table already exists
to_sql_kwargs={
'if_exists': 'append',
'index': False
})

dump >> plus_one >> upload

Expand All @@ -88,8 +103,7 @@ def add_last_value(metadata, product):

# if running this for the first time or the new max is bigger than the
# last_value we saved, save a new last_value
if ('last_value' not in metadata
or new_max > metadata['last_value']):
if ('last_value' not in metadata or new_max > metadata['last_value']):
metadata['last_value'] = new_max
return metadata

Expand All @@ -110,47 +124,42 @@ def dump_on_finish(product):
raise DAGBuildEarlyStop('No new observations')


# create dag
tmp = tempfile.mkdtemp()
dag = make(tmp)


# add some sample data to the database
engine = create_engine('sqlite:///' + str(Path(tmp, 'source.db')))
df = pd.DataFrame({'x': range(10)})
df = pd.DataFrame({'x': range(10), 'id': range(10)})
df.to_sql('data', engine)

target = create_engine('sqlite:///' + str(Path(tmp, 'target.db')))

# run dag, should pull these first 10 observations
dag.build(force=True)
make(tmp).build(force=True)

# checking downloaded data with the plus one added
df = pd.read_sql('SELECT * FROM plus_one', target)
assert df.x.max() == 10 and df.shape[0] == 10
df

# run the dag again, this time plus one should be the same
dag.build(force=True)
# instantiate and run the dag again, this time the plus_one table should be unchanged
make(tmp).build(force=True)

df = pd.read_sql('SELECT * FROM plus_one', target)
assert df.x.max() == 10 and df.shape[0] == 10
df


# simulate new data arrival
df = pd.DataFrame({'x': range(10)})
df = pd.DataFrame({'x': range(10), 'id': range(10, 20)})
df['x'] = df.x + 10
df.to_sql('data', engine, if_exists='append')

# +
# should only get new rows
dag.build(force=True)
make(tmp).build(force=True)

df = pd.read_sql('SELECT * FROM plus_one', target)
assert df.x.max() == 20 and df.shape[0] == 20
df
# -


shutil.rmtree(tmp)

0 comments on commit 4e2cca8

Please sign in to comment.