Skip to content
This repository has been archived by the owner on Mar 20, 2022. It is now read-only.

Commit

Permalink
fix(fn): group identify payloads in to batches
Browse files Browse the repository at this point in the history
  • Loading branch information
philbooth committed Sep 29, 2017
1 parent 3addb85 commit d85f96a
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions amplitude.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ def handle (message, context):

def process_compressed (data):
events = ""
batch = []
batches = None

for chunk in decompress(data):
events += chunk
partitioned_events = partition_available_events(events)
if is_partitioned(partitioned_events):
events = partitioned_events[2]
batch = process(partitioned_events[0], batch, False)
batches = process(partitioned_events[0], is_last_call = False)

process(events, batch)
process(events, batches)

def decompress (data):
decompressor = zlib.decompressobj(ZLIB_WBITS)
Expand All @@ -88,7 +88,7 @@ def partition_available_events (events):
def is_partitioned (partition):
return partition[1] != ""

def process (events, batch = [], is_last_call = True):
def process (events, batches = {"identify": [], "event": []}, is_last_call = True):
for event_string in events.splitlines():
event = json.loads(event_string)

Expand Down Expand Up @@ -129,18 +129,22 @@ def process (events, batch = [], is_last_call = True):
event["insert_id"] = insert_id_hmac.hexdigest()

if contains_identify_verbs(event["user_properties"]):
event["user_properties"] = process_identify_verbs(event["user_properties"], user_id, device_id)
result = process_identify_verbs(event["user_properties"])
batches["identify"].append({"user_id": user_id, "device_id": device_id,
"user_properties": result["identify"]})
event["user_properties"] = result["pruned"]

batch.append(event)
if len(batch) == MAX_EVENTS_PER_BATCH:
send(batch)
batch = []
batches["event"].append(event)
if len(batches["event"]) == MAX_EVENTS_PER_BATCH:
send(batches)
batches["identify"] = []
batches["event"] = []

if not is_last_call:
return batch
return batches

if len(batch) > 0:
send(batch)
if len(batches["event"]) > 0:
send(batches)

def is_event_ok (event):
# https://amplitude.zendesk.com/hc/en-us/articles/204771828#keys-for-the-event-argument
Expand All @@ -149,32 +153,27 @@ def is_event_ok (event):
def contains_identify_verbs (user_properties):
return reduce(lambda contains, verb: contains or verb in user_properties, IDENTIFY_VERBS, False)

def process_identify_verbs (user_properties, user_id, device_id):
def process_identify_verbs (user_properties):
def split (payloads, key):
payloads["identify" if key in IDENTIFY_VERBS else "pruned"][key] = user_properties[key]
return payloads

payloads = reduce(split, user_properties.keys(), {"pruned": {}, "identify": {}})
return reduce(split, user_properties.keys(), {"identify": {}, "pruned": {}})

# https://amplitude.zendesk.com/hc/en-us/articles/205406617-Identify-API-Modify-User-Properties#request-format
requests.post("https://api.amplitude.com/identify",
data={"api_key": AMPLITUDE_API_KEY,
"identification": json.dumps({"user_id": user_id,
"device_id": device_id,
"user_properties": payloads["identify"]})})

return payloads["pruned"]

def send (batch):
def send (batches):
batch_interval = time.time() - send.batch_time
if batch_interval < MIN_BATCH_INTERVAL:
print "sleeping", MIN_BATCH_INTERVAL - batch_interval
time.sleep(MIN_BATCH_INTERVAL - batch_interval)

print "sending", len(batch)
print "sending, identify: {}, event: {}".format(len(batches["identify"]), len(batches["event"]))

# https://amplitude.zendesk.com/hc/en-us/articles/205406617-Identify-API-Modify-User-Properties#request-format
requests.post("https://api.amplitude.com/identify",
data={"api_key": AMPLITUDE_API_KEY, "identification": json.dumps(batches["identify"])})

# https://amplitude.zendesk.com/hc/en-us/articles/204771828#request-format
response = requests.post("https://api.amplitude.com/httpapi",
data={"api_key": AMPLITUDE_API_KEY, "event": json.dumps(batch)})
data={"api_key": AMPLITUDE_API_KEY, "event": json.dumps(batches["event"])})

# For want of a better error-handling mechanism,
# one failed request fails an entire dump from S3.
Expand Down

0 comments on commit d85f96a

Please sign in to comment.