Skip to content
This repository has been archived by the owner on Mar 19, 2021. It is now read-only.

Commit

Permalink
fix(db): add export_date column and sane deletion logic for flow data
Browse files Browse the repository at this point in the history
  • Loading branch information
philbooth committed Dec 17, 2016
1 parent b41fb27 commit 1ef2481
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions import_flow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
utm_content VARCHAR(40) ENCODE lzo,
utm_medium VARCHAR(40) ENCODE lzo,
utm_source VARCHAR(40) ENCODE lzo,
utm_term VARCHAR(40) ENCODE lzo
utm_term VARCHAR(40) ENCODE lzo,
export_date DATE NOT NULL ENCODE lzo
);
"""
Q_CREATE_EVENTS_TABLE = """
Expand All @@ -89,23 +90,30 @@
-- but redshift doesn't support that.
flow_time BIGINT NOT NULL ENCODE lzo,
flow_id VARCHAR(64) NOT NULL DISTKEY ENCODE lzo,
type VARCHAR(64) NOT NULL ENCODE lzo
type VARCHAR(64) NOT NULL ENCODE lzo,
export_date DATE NOT NULL ENCODE lzo
);
"""

Q_CHECK_FOR_DAY = """
SELECT timestamp FROM flow_events
WHERE timestamp::DATE = '{day}'::DATE
WHERE timestamp::DATE >= '{day}'::DATE - '1 day'::INTERVAL
AND timestamp::DATE <= '{day}'::DATE + '1 day'::INTERVAL
AND export_date = '{day}'::DATE
LIMIT 1;
"""

Q_CLEAR_DAY_METADATA = """
DELETE FROM flow_metadata
WHERE begin_time::DATE = '{day}'::DATE;
"""
Q_CLEAR_DAY_EVENTS = """
DELETE FROM flow_events
WHERE timestamp::DATE = '{day}'::DATE;
WHERE timestamp::DATE >= '{day}'::DATE - '1 day'::INTERVAL
AND timestamp::DATE <= '{day}'::DATE + '1 day'::INTERVAL
AND export_date = '{day}'::DATE;
"""
Q_CLEAR_DAY_METADATA = """
DELETE FROM flow_metadata
WHERE begin_time::DATE >= '{day}'::DATE - '1 day'::INTERVAL
AND begin_time::DATE <= '{day}'::DATE + '1 day'::INTERVAL
AND export_date = '{day}'::DATE;
"""

Q_COPY_CSV = """
Expand Down Expand Up @@ -148,7 +156,8 @@
utm_content,
utm_medium,
utm_source,
utm_term
utm_term,
export_date
)
SELECT
flow_id,
Expand All @@ -164,7 +173,8 @@
utm_content,
utm_medium,
utm_source,
utm_term
utm_term,
'{day}'::DATE
FROM temporary_raw_flow_data
WHERE type LIKE 'flow%begin';
"""
Expand All @@ -173,7 +183,9 @@
SET duration = durations.flow_time
FROM (
SELECT flow_id, MAX(flow_time) AS flow_time
FROM temporary_raw_flow_data
FROM flow_events
WHERE "timestamp" >= '{day}'::DATE
AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
GROUP BY flow_id
) AS durations
WHERE flow_metadata.flow_id = durations.flow_id
Expand All @@ -185,8 +197,10 @@
SET completed = TRUE
FROM (
SELECT flow_id
FROM temporary_raw_flow_data
FROM flow_events
WHERE type = 'flow.complete'
AND "timestamp" >= '{day}'::DATE
AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
) AS complete
WHERE flow_metadata.flow_id = complete.flow_id
AND flow_metadata.begin_time >= '{day}'::DATE - '1 day'::INTERVAL
Expand All @@ -197,8 +211,10 @@
SET new_account = TRUE
FROM (
SELECT flow_id
FROM temporary_raw_flow_data
FROM flow_events
WHERE type = 'account.created'
AND "timestamp" >= '{day}'::DATE
AND "timestamp" <= '{day}'::DATE + '1 day'::INTERVAL
) AS created
WHERE flow_metadata.flow_id = created.flow_id
AND flow_metadata.begin_time >= '{day}'::DATE - '1 day'::INTERVAL
Expand Down Expand Up @@ -242,13 +258,15 @@
timestamp,
flow_time,
flow_id,
type
type,
export_date
)
SELECT
'epoch'::TIMESTAMP + timestamp * '1 second'::INTERVAL,
flow_time,
flow_id,
type
type,
'{day}'::DATE
FROM temporary_raw_flow_data;
"""

Expand Down Expand Up @@ -287,22 +305,22 @@ def import_events(force_reload=False):
# Create the temporary table
db.run(Q_CREATE_CSV_TABLE)
# Clear any existing data for the day, to avoid duplicates.
db.run(Q_CLEAR_DAY_METADATA.format(day=day))
db.run(Q_CLEAR_DAY_EVENTS.format(day=day))
db.run(Q_CLEAR_DAY_METADATA.format(day=day))
s3path = EVENTS_FILE_URL.format(day=day)
# Copy data from s3 into redshift
db.run(Q_COPY_CSV.format(
s3path=s3path,
**CONFIG
))
# Populate the flow_metadata table
db.run(Q_INSERT_METADATA)
db.run(Q_INSERT_METADATA.format(day=day))
db.run(Q_UPDATE_DURATION.format(day=day))
db.run(Q_UPDATE_COMPLETED.format(day=day))
db.run(Q_UPDATE_NEW_ACCOUNT.format(day=day))
db.run(Q_UPDATE_METRICS_CONTEXT.format(day=day))
# Populate the flow_events table
db.run(Q_INSERT_EVENTS)
db.run(Q_INSERT_EVENTS.format(day=day))
# Print the timestamps for sanity-checking.
print " MIN TIMESTAMP", db.one("SELECT MIN(timestamp) FROM temporary_raw_flow_data")
print " MAX TIMESTAMP", db.one("SELECT MAX(timestamp) FROM temporary_raw_flow_data")
Expand Down

0 comments on commit 1ef2481

Please sign in to comment.