Skip to content

Commit

Permalink
fix(fn): don't send identify data to the http api
Browse files Browse the repository at this point in the history
  • Loading branch information
philbooth committed Sep 29, 2017
1 parent 792d49d commit bb1018b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
68 changes: 47 additions & 21 deletions packages/fxa-amplitude-send/amplitude.py
Expand Up @@ -18,9 +18,11 @@
MAX_BATCHES_PER_SECOND = 100
MIN_BATCH_INTERVAL = 1.0 / MAX_BATCHES_PER_SECOND

IDENTIFY_VERBS = ("$set", "$setOnce", "$add", "$append", "$unset")

# Cargo-culted from the internet. zlib >= 1.2.3.5 apparently supports
# specifying wbits=0 but that didn't work for me locally. This did.
ZLIB_HEADER_SIZE = 32
ZLIB_WBITS = 32 + zlib.MAX_WBITS

def kms_decrypt_env(key):
"""Decrypt environment variable"""
Expand Down Expand Up @@ -53,23 +55,23 @@ def handle (message, context):
s3_object = s3.Object(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])

# This will fail if the data is not compressed.
process_compressed(s3_object.get()['Body'].read())
process_compressed(s3_object.get()["Body"].read())

def process_compressed (data):
events = ""
batch = []
batches = None

for chunk in decompress(data):
events += chunk
partitioned_events = partition_available_events(events)
if is_partitioned(partitioned_events):
events = partitioned_events[2]
batch = process(partitioned_events[0], batch, False)
batches = process(partitioned_events[0], is_last_call = False)

process(events, batch)
process(events, batches)

def decompress (data):
decompressor = zlib.decompressobj(ZLIB_HEADER_SIZE + zlib.MAX_WBITS)
decompressor = zlib.decompressobj(ZLIB_WBITS)
for chunk in data:
decompressed = decompressor.decompress(chunk)
if decompressed:
Expand All @@ -86,7 +88,7 @@ def partition_available_events (events):
def is_partitioned (partition):
return partition[1] != ""

def process (events, batch = [], is_last_call = True):
def process (events, batches = {"identify": [], "event": []}, is_last_call = True):
for event_string in events.splitlines():
event = json.loads(event_string)

Expand All @@ -107,15 +109,17 @@ def process (events, batch = [], is_last_call = True):
print "skipping malformed event", event
continue

user_id = device_id = None
insert_id_hmac = hmac.new(HMAC_KEY, digestmod=hashlib.sha256)

if "user_id" in event:
user_id_hmac = hmac.new(HMAC_KEY, event["user_id"], hashlib.sha256)
event["user_id"] = user_id_hmac.hexdigest()
insert_id_hmac.update(event["user_id"])
user_id = event["user_id"] = user_id_hmac.hexdigest()
insert_id_hmac.update(user_id)

if "device_id" in event:
insert_id_hmac.update(event["device_id"])
device_id = event["device_id"]
insert_id_hmac.update(device_id)

if "session_id" in event:
insert_id_hmac.update(str(event["session_id"]))
Expand All @@ -124,31 +128,53 @@ def process (events, batch = [], is_last_call = True):
insert_id_hmac.update(str(event["time"]))
event["insert_id"] = insert_id_hmac.hexdigest()

batch.append(event)
if len(batch) == MAX_EVENTS_PER_BATCH:
send(batch)
batch = []
if contains_identify_verbs(event["user_properties"]):
result = process_identify_verbs(event["user_properties"])
batches["identify"].append({"user_id": user_id, "device_id": device_id,
"user_properties": result["identify"]})
event["user_properties"] = result["pruned"]

batches["event"].append(event)
if len(batches["event"]) == MAX_EVENTS_PER_BATCH:
send(batches)
batches["identify"] = []
batches["event"] = []

if not is_last_call:
return batch
return batches

if len(batch) > 0:
send(batch)
if len(batches["event"]) > 0:
send(batches)

def is_event_ok (event):
# https://amplitude.zendesk.com/hc/en-us/articles/204771828#keys-for-the-event-argument
return ("device_id" in event or "user_id" in event) and "event_type" in event and "time" in event

def send (batch):
def contains_identify_verbs (user_properties):
return reduce(lambda contains, verb: contains or verb in user_properties, IDENTIFY_VERBS, False)

def process_identify_verbs (user_properties):
def split (payloads, key):
payloads["identify" if key in IDENTIFY_VERBS else "pruned"][key] = user_properties[key]
return payloads

return reduce(split, user_properties.keys(), {"identify": {}, "pruned": {}})

def send (batches):
batch_interval = time.time() - send.batch_time
if batch_interval < MIN_BATCH_INTERVAL:
print "sleeping", MIN_BATCH_INTERVAL - batch_interval
time.sleep(MIN_BATCH_INTERVAL - batch_interval)

print "sending", len(batch)
print "sending, identify: {}, event: {}".format(len(batches["identify"]), len(batches["event"]))

if len(batches["identify"]) > 0:
# https://amplitude.zendesk.com/hc/en-us/articles/205406617-Identify-API-Modify-User-Properties#request-format
requests.post("https://api.amplitude.com/identify",
data={"api_key": AMPLITUDE_API_KEY, "identification": json.dumps(batches["identify"])})

# https://amplitude.zendesk.com/hc/en-us/articles/204771828#request-format
response = requests.post("https://api.amplitude.com/httpapi",
data={"api_key": AMPLITUDE_API_KEY, "event": json.dumps(batch)})
data={"api_key": AMPLITUDE_API_KEY, "event": json.dumps(batches["event"])})

# For want of a better error-handling mechanism,
# one failed request fails an entire dump from S3.
Expand Down
8 changes: 4 additions & 4 deletions packages/fxa-amplitude-send/fixtures.txt
@@ -1,10 +1,10 @@
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","event_type":"fxa_login - view","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","entrypoint":"menupanel","service":"sync"},"user_properties":{"flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","event_type":"fxa_login - engage","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","entrypoint":"menupanel","service":"sync"},"user_properties":{"flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","event_type":"fxa_login - submit","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","entrypoint":"menupanel","service":"sync"},"user_properties":{"flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_login - success","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_login - email_confirmed","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_activity - cert_signed","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_login - complete","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_login - success","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England","$append":{"fxa_services_used":"sync"}},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_login - email_confirmed","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England","$append":{"fxa_services_used":"sync"}},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_activity - cert_signed","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England","$append":{"fxa_services_used":"sync"}},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_login - complete","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd","ua_browser":"Firefox","ua_version":"57","ua_os":"Linux","user_country":"United Kingdom","user_locale":"en-GB","user_state":"England","$append":{"fxa_services_used":"sync"}},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","user_id":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","event_type":"fxa_pref - logout","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","service":"sync"},"user_properties":{"fxa_uid":"baadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfoodbaadfood","flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","event_type":"fxa_login - view","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","entrypoint":"menupanel","service":"sync"},"user_properties":{"flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"},"app_version":"96","language":"en-GB"}
{"time":__TIME__,"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","event_type":"fxa_login - engage","session_id":__SESSION__,"event_properties":{"device_id":"deadbeefdeadbeefdeadbeefdeadbeef","entrypoint":"menupanel","service":"sync"},"user_properties":{"flow_id":"cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"},"app_version":"96","language":"en-GB"}
Expand Down

0 comments on commit bb1018b

Please sign in to comment.