diff --git a/amplitude.py b/amplitude.py index 3a3e16b..b9c60d0 100644 --- a/amplitude.py +++ b/amplitude.py @@ -59,16 +59,16 @@ def handle (message, context): def process_compressed (data): events = "" - batch = [] + batches = None for chunk in decompress(data): events += chunk partitioned_events = partition_available_events(events) if is_partitioned(partitioned_events): events = partitioned_events[2] - batch = process(partitioned_events[0], batch, False) + batches = process(partitioned_events[0], is_last_call = False) - process(events, batch) + process(events, batches) def decompress (data): decompressor = zlib.decompressobj(ZLIB_WBITS) @@ -88,7 +88,7 @@ def partition_available_events (events): def is_partitioned (partition): return partition[1] != "" -def process (events, batch = [], is_last_call = True): +def process (events, batches = {"identify": [], "event": []}, is_last_call = True): for event_string in events.splitlines(): event = json.loads(event_string) @@ -129,18 +129,22 @@ def process (events, batch = [], is_last_call = True): event["insert_id"] = insert_id_hmac.hexdigest() if contains_identify_verbs(event["user_properties"]): - event["user_properties"] = process_identify_verbs(event["user_properties"], user_id, device_id) + result = process_identify_verbs(event["user_properties"]) + batches["identify"].append({"user_id": user_id, "device_id": device_id, + "user_properties": result["identify"]}) + event["user_properties"] = result["pruned"] - batch.append(event) - if len(batch) == MAX_EVENTS_PER_BATCH: - send(batch) - batch = [] + batches["event"].append(event) + if len(batches["event"]) == MAX_EVENTS_PER_BATCH: + send(batches) + batches["identify"] = [] + batches["event"] = [] if not is_last_call: - return batch + return batches - if len(batch) > 0: - send(batch) + if len(batches["event"]) > 0: + send(batches) def is_event_ok (event): # https://amplitude.zendesk.com/hc/en-us/articles/204771828#keys-for-the-event-argument @@ -149,32 +153,27 @@ def is_event_ok (event): def contains_identify_verbs (user_properties): return reduce(lambda contains, verb: contains or verb in user_properties, IDENTIFY_VERBS, False) -def process_identify_verbs (user_properties, user_id, device_id): +def process_identify_verbs (user_properties): def split (payloads, key): payloads["identify" if key in IDENTIFY_VERBS else "pruned"][key] = user_properties[key] return payloads - payloads = reduce(split, user_properties.keys(), {"pruned": {}, "identify": {}}) + return reduce(split, user_properties.keys(), {"identify": {}, "pruned": {}}) - # https://amplitude.zendesk.com/hc/en-us/articles/205406617-Identify-API-Modify-User-Properties#request-format - requests.post("https://api.amplitude.com/identify", - data={"api_key": AMPLITUDE_API_KEY, - "identification": json.dumps({"user_id": user_id, - "device_id": device_id, - "user_properties": payloads["identify"]})}) - - return payloads["pruned"] - -def send (batch): +def send (batches): batch_interval = time.time() - send.batch_time if batch_interval < MIN_BATCH_INTERVAL: - print "sleeping", MIN_BATCH_INTERVAL - batch_interval time.sleep(MIN_BATCH_INTERVAL - batch_interval) - print "sending", len(batch) + print "sending, identify: {}, event: {}".format(len(batches["identify"]), len(batches["event"])) + + # https://amplitude.zendesk.com/hc/en-us/articles/205406617-Identify-API-Modify-User-Properties#request-format + requests.post("https://api.amplitude.com/identify", + data={"api_key": AMPLITUDE_API_KEY, "identification": json.dumps(batches["identify"])}) + # https://amplitude.zendesk.com/hc/en-us/articles/204771828#request-format response = requests.post("https://api.amplitude.com/httpapi", - data={"api_key": AMPLITUDE_API_KEY, "event": json.dumps(batch)}) + data={"api_key": AMPLITUDE_API_KEY, "event": json.dumps(batches["event"])}) # For want of a better error-handling mechanism, # one failed request fails an entire dump from S3.