fix(db): add export_date column and sane deletion logic for flow data

1 parent 2412ef8 commit 73631a146e9dca1c479d21d647abfc7920e80489 @philbooth committed Dec 17, 2016
Showing with 36 additions and 18 deletions.
  1. +36 −18 import_flow_events.py
@@ -79,7 +79,8 @@
utm_content VARCHAR(40) ENCODE lzo,
utm_medium VARCHAR(40) ENCODE lzo,
utm_source VARCHAR(40) ENCODE lzo,
- utm_term VARCHAR(40) ENCODE lzo
+ utm_term VARCHAR(40) ENCODE lzo,
+ export_date DATE NOT NULL ENCODE lzo
);
"""
Q_CREATE_EVENTS_TABLE = """
@@ -89,23 +90,30 @@
-- but redshift doesn't support that.
flow_time BIGINT NOT NULL ENCODE lzo,
flow_id VARCHAR(64) NOT NULL DISTKEY ENCODE lzo,
- type VARCHAR(64) NOT NULL ENCODE lzo
+ type VARCHAR(64) NOT NULL ENCODE lzo,
+ export_date DATE NOT NULL ENCODE lzo
);
"""
Q_CHECK_FOR_DAY = """
SELECT timestamp FROM flow_events
- WHERE timestamp::DATE = '{day}'::DATE
+ WHERE timestamp::DATE >= '{day}'::DATE - '1 day'::INTERVAL
+ AND timestamp::DATE <= '{day}'::DATE + '1 day'::INTERVAL
+ AND export_date = '{day}'::DATE
LIMIT 1;
"""
-Q_CLEAR_DAY_METADATA = """
- DELETE FROM flow_metadata
- WHERE begin_time::DATE = '{day}'::DATE;
-"""
Q_CLEAR_DAY_EVENTS = """
DELETE FROM flow_events
- WHERE timestamp::DATE = '{day}'::DATE;
+ WHERE timestamp::DATE >= '{day}'::DATE - '1 day'::INTERVAL
+ AND timestamp::DATE <= '{day}'::DATE + '1 day'::INTERVAL
+ AND export_date = '{day}'::DATE;
+"""
+Q_CLEAR_DAY_METADATA = """
+ DELETE FROM flow_metadata
+ WHERE begin_time::DATE >= '{day}'::DATE - '1 day'::INTERVAL
+ AND begin_time::DATE <= '{day}'::DATE + '1 day'::INTERVAL
+ AND export_date = '{day}'::DATE;
"""
Q_COPY_CSV = """
@@ -148,7 +156,8 @@
utm_content,
utm_medium,
utm_source,
- utm_term
+ utm_term,
+ export_date
)
SELECT
flow_id,
@@ -164,7 +173,8 @@
utm_content,
utm_medium,
utm_source,
- utm_term
+ utm_term,
+ '{day}'::DATE
FROM temporary_raw_flow_data
WHERE type LIKE 'flow%begin';
"""
@@ -173,7 +183,9 @@
SET duration = durations.flow_time
FROM (
SELECT flow_id, MAX(flow_time) AS flow_time
- FROM temporary_raw_flow_data
+ FROM flow_events
+ WHERE "timestamp" >= '{day}'::DATE
+ AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
GROUP BY flow_id
) AS durations
WHERE flow_metadata.flow_id = durations.flow_id
@@ -185,8 +197,10 @@
SET completed = TRUE
FROM (
SELECT flow_id
- FROM temporary_raw_flow_data
+ FROM flow_events
WHERE type = 'flow.complete'
+ AND "timestamp" >= '{day}'::DATE
+ AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
) AS complete
WHERE flow_metadata.flow_id = complete.flow_id
AND flow_metadata.begin_time >= '{day}'::DATE - '1 day'::INTERVAL
@@ -197,8 +211,10 @@
SET new_account = TRUE
FROM (
SELECT flow_id
- FROM temporary_raw_flow_data
+ FROM flow_events
WHERE type = 'account.created'
+ AND "timestamp" >= '{day}'::DATE
+ AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
) AS created
WHERE flow_metadata.flow_id = created.flow_id
AND flow_metadata.begin_time >= '{day}'::DATE - '1 day'::INTERVAL
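
The duration, completed and new_account updates now read from flow_events rather than temporary_raw_flow_data, bounded to timestamps between the start of the import day and the start of the next, so they see every event already loaded for that day and not just the rows in the current CSV. A hedged sketch of the window subquery the three updates share; the table and column names mirror the diff, but the helper function itself is illustrative and not part of import_flow_events.py:

    # Illustrative helper: the shared "events inside the day window" subquery.
    def events_in_window(day, extra_predicate=""):
        return """
            SELECT flow_id
            FROM flow_events
            WHERE "timestamp" >= '{day}'::DATE
            AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
            {extra}
        """.format(day=day, extra=extra_predicate)

    print events_in_window("2016-12-17", "AND type = 'flow.complete'")
    print events_in_window("2016-12-17", "AND type = 'account.created'")
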
@@ -242,13 +258,15 @@
timestamp,
flow_time,
flow_id,
- type
+ type,
+ export_date
)
SELECT
'epoch'::TIMESTAMP + timestamp * '1 second'::INTERVAL,
flow_time,
flow_id,
- type
+ type,
+ '{day}'::DATE
FROM temporary_raw_flow_data;
"""
@@ -287,22 +305,22 @@ def import_events(force_reload=False):
# Create the temporary table
db.run(Q_CREATE_CSV_TABLE)
# Clear any existing data for the day, to avoid duplicates.
- db.run(Q_CLEAR_DAY_METADATA.format(day=day))
db.run(Q_CLEAR_DAY_EVENTS.format(day=day))
+ db.run(Q_CLEAR_DAY_METADATA.format(day=day))
s3path = EVENTS_FILE_URL.format(day=day)
# Copy data from s3 into redshift
db.run(Q_COPY_CSV.format(
s3path=s3path,
**CONFIG
))
# Populate the flow_metadata table
- db.run(Q_INSERT_METADATA)
+ db.run(Q_INSERT_METADATA.format(day=day))
db.run(Q_UPDATE_DURATION.format(day=day))
db.run(Q_UPDATE_COMPLETED.format(day=day))
db.run(Q_UPDATE_NEW_ACCOUNT.format(day=day))
db.run(Q_UPDATE_METRICS_CONTEXT.format(day=day))
# Populate the flow_events table
- db.run(Q_INSERT_EVENTS)
+ db.run(Q_INSERT_EVENTS.format(day=day))
# Print the timestamps for sanity-checking.
print " MIN TIMESTAMP", db.one("SELECT MIN(timestamp) FROM temporary_raw_flow_data")
print " MAX TIMESTAMP", db.one("SELECT MAX(timestamp) FROM temporary_raw_flow_data")
