diff --git a/import_flow_events.py b/import_flow_events.py
index f671737..fa05eed 100644
--- a/import_flow_events.py
+++ b/import_flow_events.py
@@ -79,7 +79,8 @@
       utm_content VARCHAR(40) ENCODE lzo,
       utm_medium VARCHAR(40) ENCODE lzo,
       utm_source VARCHAR(40) ENCODE lzo,
-      utm_term VARCHAR(40) ENCODE lzo
+      utm_term VARCHAR(40) ENCODE lzo,
+      export_date DATE NOT NULL ENCODE lzo
     );
 """
 Q_CREATE_EVENTS_TABLE = """
@@ -89,23 +90,30 @@
       -- but redshift doesn't support that.
       flow_time BIGINT NOT NULL ENCODE lzo,
       flow_id VARCHAR(64) NOT NULL DISTKEY ENCODE lzo,
-      type VARCHAR(64) NOT NULL ENCODE lzo
+      type VARCHAR(64) NOT NULL ENCODE lzo,
+      export_date DATE NOT NULL ENCODE lzo
     );
 """
 
 Q_CHECK_FOR_DAY = """
     SELECT timestamp FROM flow_events
-    WHERE timestamp::DATE = '{day}'::DATE
+    WHERE timestamp::DATE >= '{day}'::DATE - '1 day'::INTERVAL
+    AND timestamp::DATE <= '{day}'::DATE + '1 day'::INTERVAL
+    AND export_date = '{day}'::DATE
     LIMIT 1;
 """
 
-Q_CLEAR_DAY_METADATA = """
-    DELETE FROM flow_metadata
-    WHERE begin_time::DATE = '{day}'::DATE;
-"""
 Q_CLEAR_DAY_EVENTS = """
     DELETE FROM flow_events
-    WHERE timestamp::DATE = '{day}'::DATE;
+    WHERE timestamp::DATE >= '{day}'::DATE - '1 day'::INTERVAL
+    AND timestamp::DATE <= '{day}'::DATE + '1 day'::INTERVAL
+    AND export_date = '{day}'::DATE;
+"""
+Q_CLEAR_DAY_METADATA = """
+    DELETE FROM flow_metadata
+    WHERE begin_time::DATE >= '{day}'::DATE - '1 day'::INTERVAL
+    AND begin_time::DATE <= '{day}'::DATE + '1 day'::INTERVAL
+    AND export_date = '{day}'::DATE;
 """
 
 Q_COPY_CSV = """
@@ -148,7 +156,8 @@
         utm_content,
         utm_medium,
         utm_source,
-        utm_term
+        utm_term,
+        export_date
     )
     SELECT
         flow_id,
@@ -164,7 +173,8 @@
         utm_content,
         utm_medium,
         utm_source,
-        utm_term
+        utm_term,
+        '{day}'::DATE
     FROM temporary_raw_flow_data
     WHERE type LIKE 'flow%begin';
 """
@@ -173,7 +183,9 @@
     SET duration = durations.flow_time
     FROM (
         SELECT flow_id, MAX(flow_time) AS flow_time
-        FROM temporary_raw_flow_data
+        FROM flow_events
+        WHERE "timestamp" >= '{day}'::DATE
+        AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
         GROUP BY flow_id
     ) AS durations
     WHERE flow_metadata.flow_id = durations.flow_id
@@ -185,8 +197,10 @@
     SET completed = TRUE
     FROM (
         SELECT flow_id
-        FROM temporary_raw_flow_data
+        FROM flow_events
         WHERE type = 'flow.complete'
+        AND "timestamp" >= '{day}'::DATE
+        AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
     ) AS complete
     WHERE flow_metadata.flow_id = complete.flow_id
     AND flow_metadata.begin_time >= '{day}'::DATE - '1 day'::INTERVAL
@@ -197,8 +211,10 @@
     SET new_account = TRUE
     FROM (
         SELECT flow_id
-        FROM temporary_raw_flow_data
+        FROM flow_events
         WHERE type = 'account.created'
+        AND "timestamp" >= '{day}'::DATE
+        AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
     ) AS created
     WHERE flow_metadata.flow_id = created.flow_id
     AND flow_metadata.begin_time >= '{day}'::DATE - '1 day'::INTERVAL
@@ -242,13 +258,15 @@
         timestamp,
         flow_time,
         flow_id,
-        type
+        type,
+        export_date
     )
     SELECT
         'epoch'::TIMESTAMP + timestamp * '1 second'::INTERVAL,
         flow_time,
         flow_id,
-        type
+        type,
+        '{day}'::DATE
     FROM temporary_raw_flow_data;
 """
 
@@ -287,8 +305,8 @@ def import_events(force_reload=False):
         # Create the temporary table
         db.run(Q_CREATE_CSV_TABLE)
         # Clear any existing data for the day, to avoid duplicates.
-        db.run(Q_CLEAR_DAY_METADATA.format(day=day))
         db.run(Q_CLEAR_DAY_EVENTS.format(day=day))
+        db.run(Q_CLEAR_DAY_METADATA.format(day=day))
        s3path = EVENTS_FILE_URL.format(day=day)
         # Copy data from s3 into redshift
         db.run(Q_COPY_CSV.format(
@@ -296,13 +314,13 @@ def import_events(force_reload=False):
             **CONFIG
         ))
         # Populate the flow_metadata table
-        db.run(Q_INSERT_METADATA)
+        db.run(Q_INSERT_METADATA.format(day=day))
         db.run(Q_UPDATE_DURATION.format(day=day))
         db.run(Q_UPDATE_COMPLETED.format(day=day))
         db.run(Q_UPDATE_NEW_ACCOUNT.format(day=day))
         db.run(Q_UPDATE_METRICS_CONTEXT.format(day=day))
         # Populate the flow_events table
-        db.run(Q_INSERT_EVENTS)
+        db.run(Q_INSERT_EVENTS.format(day=day))
         # Print the timestamps for sanity-checking.
         print " MIN TIMESTAMP", db.one("SELECT MIN(timestamp) FROM temporary_raw_flow_data")
         print " MAX TIMESTAMP", db.one("SELECT MAX(timestamp) FROM temporary_raw_flow_data")
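Note on the reload pattern above (editor's sketch, not part of the patch): the new export_date column tags each imported row with the day of the export file it came from. An export for a given day also contains events whose timestamps spill into the adjacent days, so deleting by timestamp::DATE alone would either leave stragglers behind or destroy rows imported from the neighbouring days' files. Filtering the DELETE on export_date, with a plus/minus one day timestamp window to bound the scan, makes a re-import remove exactly the rows that a previous import of the same file created. A minimal sketch of that idea, assuming a psycopg2-style connection (reload_day is a hypothetical helper, not a function in import_flow_events.py):

    # Hypothetical sketch of the idempotent per-day reload.
    Q_CLEAR = """
        DELETE FROM flow_events
        WHERE timestamp::DATE >= %(day)s::DATE - INTERVAL '1 day'
        AND timestamp::DATE <= %(day)s::DATE + INTERVAL '1 day'
        AND export_date = %(day)s::DATE;
    """

    def reload_day(conn, day):
        # Delete only the rows tagged with this export's date; rows with
        # overlapping timestamps that came from the adjacent days' files
        # are left intact, so re-running the import cannot duplicate them.
        with conn.cursor() as cur:
            cur.execute(Q_CLEAR, {"day": day})
        conn.commit()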