Skip to content
This repository has been archived by the owner on Mar 19, 2021. It is now read-only.

Commit

Permalink
fix(scripts): calculate daily summaries against new schema
Browse files Browse the repository at this point in the history
  • Loading branch information
philbooth committed Sep 13, 2016
1 parent 8a5bcac commit 64f6bdd
Showing 1 changed file with 62 additions and 57 deletions.
119 changes: 62 additions & 57 deletions calculate_daily_summary.py
Expand Up @@ -22,108 +22,113 @@
DB = "postgresql://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}".format(**CONFIG)

# For the daily device activity summary,
# we maintain a table giving (day, uid, userAgentOS).
# In the future this would use device-id rather than userAgentOS.
# we maintain a table giving (day, uid, device_id).

Q_DAILY_DEVICES_CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS daily_activity_per_device (
day BIGINT NOT NULL sortkey,
uid VARCHAR(64) distkey,
userAgentOS VARCHAR(30)
day DATE NOT NULL SORTKEY,
uid VARCHAR(64) NOT NULL DISTKEY,
device_id VARCHAR(32) NOT NULL
);
"""

Q_DAILY_DEVICES_CLEAR = """
DELETE FROM daily_activity_per_device
WHERE day >= {timestamp_min}
AND day < {timestamp_max}
DELETE FROM daily_activity_per_device
WHERE day >= '{day_from}'::DATE
AND day <= '{day_until}'::DATE;
"""

Q_DAILY_DEVICES_SUMMARIZE = """
INSERT INTO daily_activity_per_device (day, uid, userAgentOS)
SELECT DISTINCT ((timestamp / 86400) * 86400) as day, uid, userAgentOS
FROM events
WHERE userAgentOS != ''
AND timestamp >= {timestamp_min}
AND timestamp < {timestamp_max}
ORDER BY 1
INSERT INTO daily_activity_per_device (day, uid, device_id)
SELECT DISTINCT timestamp::DATE as day, uid, device_id
FROM activity_events
WHERE device_id != ''
AND timestamp::DATE >= '{day_from}'::DATE
AND timestamp::DATE <= '{day_until}'::DATE
ORDER BY 1;
"""

# For the daily multi-device-users summary,
# we maintain a table of (day, uid) pairs
# based on whether that user had multiple devices
# active in the last five days.


Q_MD_USERS_CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS daily_multi_device_users (
day BIGINT NOT NULL sortkey,
uid VARCHAR(64) distkey
day DATE NOT NULL SORTKEY,
uid VARCHAR(64) NOT NULL DISTKEY
);
"""

Q_MD_USERS_CLEAR = """
DELETE FROM daily_multi_device_users
WHERE day >= {timestamp_min}
AND day < {timestamp_max}
DELETE FROM daily_multi_device_users
WHERE day >= '{day_from}'::DATE
AND day <= '{day_until}'::DATE;
"""

Q_MD_USERS_SUMMARIZE = """
INSERT INTO daily_multi_device_users (day, uid)
SELECT DISTINCT aPresent.day, aPresent.uid
FROM daily_activity_per_device as aPresent
INNER JOIN daily_activity_per_device as aPast
ON
aPresent.uid = aPast.uid
AND aPresent.userAgentOS != aPast.userAgentOS
AND aPast.day <= aPresent.day
AND aPast.day >= (aPresent.day - (86400 * 5))
WHERE aPresent.day >= {timestamp_min}
AND aPresent.day < {timestamp_max}
AND aPast.day < {timestamp_max}
AND aPast.day >= ({timestamp_min} - (86400 * 5))
ORDER BY 1
INSERT INTO daily_multi_device_users (day, uid)
SELECT DISTINCT present.day, present.uid
FROM daily_activity_per_device as present
INNER JOIN daily_activity_per_device as past
ON
present.uid = past.uid
AND present.device_id != past.device_id
AND past.day <= present.day
AND past.day >= (present.day - '5 days'::INTERVAL)
WHERE present.day >= '{day_from}'::DATE
AND present.day <= '{day_until}'::DATE
ORDER BY 1;
"""

Q_GET_FIRST_UNPROCESSED_DAY = """
SELECT (MAX(day) + '1 day'::INTERVAL) AS timestamp
FROM daily_multi_device_users;
"""

Q_GET_FIRST_AVAILABLE_DAY = """
SELECT MIN(timestamp)::DATE
FROM activity_events;
"""

def summarize_events(timestamp_max=None, timestamp_min=None):
Q_GET_LAST_AVAILABLE_DAY = """
SELECT MAX(timestamp)::DATE
FROM activity_events;
"""

def summarize_events(day_from=None, day_until=None):
db = postgres.Postgres(DB)
db.run("BEGIN TRANSACTION")
try:
db.run(Q_DAILY_DEVICES_CREATE_TABLE)
db.run(Q_MD_USERS_CREATE_TABLE)
# By default, summarize the latest days that are not yet summarized.
if timestamp_max is None:
timestamp_max = db.one("SELECT MAX(timestamp) FROM events")
if timestamp_max is None:
raise RuntimeError('no events in db')
if timestamp_min is None:
timestamp_min = db.one("SELECT MAX(day) + 86400 AS timestamp FROM daily_multi_device_users")

if timestamp_min is None:
timestamp_min = db.one("SELECT MIN(timestamp) FROM events")
# Round up/down to nearest whole day to ensure consistency.
timestamp_max = ((timestamp_max // 86400) + 1) * 86400
timestamp_min = (timestamp_min // 86400) * 86400
params = {
"timestamp_max": timestamp_max,
"timestamp_min": timestamp_min,
if day_from is None:
day_from = db.one(Q_GET_FIRST_UNPROCESSED_DAY)
if day_from is None:
day_from = db.one(Q_GET_FIRST_AVAILABLE_DAY)
if day_from is None:
raise RuntimeError('no events in db')
if day_until is None:
day_until = db.one(Q_GET_LAST_AVAILABLE_DAY)
days = {
"day_from": day_from,
"day_until": day_until,
}
print "SUMMARIZING", (timestamp_max - timestamp_min) / 86400, "DAYS"
print "SUMMARIZING FROM", day_from, "UNTIL", day_until
# Update daily device activity.
print "UPDATING DAILY ACTIVE DEVICES SUMMARY"
db.run(Q_DAILY_DEVICES_CLEAR.format(**params))
db.run(Q_DAILY_DEVICES_SUMMARIZE.format(**params))
db.run(Q_DAILY_DEVICES_CLEAR.format(**days))
db.run(Q_DAILY_DEVICES_SUMMARIZE.format(**days))
# Update multi-device-user assessments.
print "UPDATING MULTI-DEVICE USERS SUMMARY"
db.run(Q_MD_USERS_CLEAR.format(**params))
db.run(Q_MD_USERS_SUMMARIZE.format(**params))
db.run(Q_MD_USERS_CLEAR.format(**days))
db.run(Q_MD_USERS_SUMMARIZE.format(**days))
except:
db.run("ROLLBACK TRANSACTION")
raise
else:
db.run("COMMIT TRANSACTION")


if __name__ == "__main__":
summarize_events()

0 comments on commit 64f6bdd

Please sign in to comment.