In [1]:
import json

import pandas as pd

from datetime import datetime

from kafka import KafkaConsumer
from kafka import TopicPartition
import memory_profiler

In [2]:
from IPython import get_ipython
get_ipython().register_magics(memory_profiler.MemoryProfilerMagics)

## Connect to Kafka and verify that data exists

In [59]:
consumer = KafkaConsumer(
    bootstrap_servers=['127.0.0.1:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest', 
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)

In [103]:
consumer.assign([TopicPartition(topic='stream', partition=0)])
consumer.end_offsets([TopicPartition(topic='stream', partition=0)]) # Verify number of entries

{TopicPartition(topic='stream', partition=0): 1000000}

In [5]:
# This should be zero, since `earliest` was specified
consumer.position(TopicPartition(topic='stream', partition=0))

0

## Print one entry to stdout

In [5]:
# Print the first entry
c = next(consumer)

In [6]:
c

ConsumerRecord(topic='stream', partition=0, offset=0, timestamp=1649249246928, timestamp_type=0, key=None, value={'est': ['transactional\tcode\tthat\tis\teasy\tto\twrite\tand', 'memory\x0cmanagement\x0cmakes\x0cfor\x0csafe,\x0csimple,\x0cand\x0crobust', [0.0542993, 'pleasant\\for\\tasks,\\both\\small\\and\\large.\\Show', [{'tempor': 'transactional\tcode\tthat\tis\teasy\tto\twrite\tand', 'aliqua.': ['transactional\tcode\tthat\tis\teasy\tto\twrite\tand', 0.403514, 'example', []], 'Excepteur': 'the\rRAII\ridiom)\rand\rscope\rstatements\rfor\rdeterministic'}, 'D allows writing large code fragments without redundantly', 'other\\hand,\\static\\inference\\deduces\\types\\and\\other'], 'code/properties,/giving/the/best/of/both/the', 'slices,"and"ranges"make"daily"programming"simple"and', {}, 'transactional\tcode\tthat\tis\teasy\tto\twrite\tand'], 'read. Show example Built-in linear and associative arrays,', 'example', 'read. Show example Built-in linear and associative arrays,', 'Не верил он л

## Check ordering

In [19]:
# Reset to the beginning, and check if entries are ordered (they should be in the dummy data)
consumer.seek_to_beginning()
consumer.position(TopicPartition(topic='stream', partition=0))

0

In [12]:
out_of_order_entries = 0

current_ts = datetime.fromtimestamp(0)
for msg in consumer:
    ts = datetime.fromtimestamp(msg.value['ts'])
    if ts >= current_ts:
        current_ts = ts
    else:
        out_of_order_entries += 1

In [13]:
out_of_order_entries

0

## Count distinct users per minute, in batch mode, keeping all minutes

In [39]:
def naive_count(consumer):
    consumer.seek_to_beginning()
    out = {}
    for msg in consumer:
        ts = datetime.fromtimestamp(msg.value['ts']).replace(second=0)
        if ts not in out:
            out[ts] = set()
        out[ts].add(msg.value['uid'])

    return pd.DataFrame.from_dict({
        k.strftime('%F %H:%M'): len(v) for k,v in out.items()}, 
        orient='index', 
        columns=['count'])

In [40]:
%time %memit df_naive_count = naive_count(consumer)

peak memory: 243.35 MiB, increment: 85.93 MiB
CPU times: user 1min 42s, sys: 2.44 s, total: 1min 44s
Wall time: 1min 48s


In [41]:
df_naive_count

Unnamed: 0,count
2016-07-11 15:39,16193
2016-07-11 15:40,41130
2016-07-11 15:41,47369
2016-07-11 15:42,49488
2016-07-11 15:43,47863
2016-07-11 15:44,40439
2016-07-11 15:45,42859
2016-07-11 15:46,47312
2016-07-11 15:47,48180
2016-07-11 15:48,47981


## Another round of printing, but this time we output ASAP and try to use less memory

Note that here we still print as soon as the minute changes, so we don't wait for late messages! But now we print some time metrics

In [42]:
def print_stats(current_minute, current_minute_users, start, checkpoint):
    if current_minute == datetime.fromtimestamp(0):
        return
    now = datetime.now()
    print(f'Minute: {current_minute.strftime("%H:%M")}, '
          f'Unique users: {len(current_minute_users):>5}, '
          f'Total time: {now - start}',
          f'Time since last checkpoint: {now - checkpoint}')

In [43]:
def count_and_print_asap(consumer):
    consumer.seek_to_beginning()

    start = datetime.now()
    checkpoint = start

    current_minute = datetime.fromtimestamp(0)
    current_minute_users = set()

    for msg in consumer:
        ts_min = datetime.fromtimestamp(msg.value['ts']).replace(second=0)
        if current_minute < ts_min:
            print_stats(current_minute, current_minute_users, start, checkpoint)
            checkpoint = datetime.now()
            current_minute = ts_min
            current_minute_users = set()
        else:
            current_minute_users.add(msg.value['uid'])

    # Print the last minute
    print_stats(current_minute, current_minute_users, start, checkpoint)

In [44]:
%time %memit count_and_print_asap(consumer)

Minute: 15:39, Unique users: 16193, Total time: 0:00:01.753791 Time since last checkpoint: 0:00:01.705800
Minute: 15:40, Unique users: 41129, Total time: 0:00:06.948764 Time since last checkpoint: 0:00:05.194251
Minute: 15:41, Unique users: 47369, Total time: 0:00:13.344816 Time since last checkpoint: 0:00:06.395833
Minute: 15:42, Unique users: 49488, Total time: 0:00:20.109043 Time since last checkpoint: 0:00:06.764051
Minute: 15:43, Unique users: 47862, Total time: 0:00:26.584151 Time since last checkpoint: 0:00:06.474922
Minute: 15:44, Unique users: 40438, Total time: 0:00:31.513072 Time since last checkpoint: 0:00:04.928713
Minute: 15:45, Unique users: 42858, Total time: 0:00:36.951451 Time since last checkpoint: 0:00:05.438194
Minute: 15:46, Unique users: 47311, Total time: 0:00:43.022482 Time since last checkpoint: 0:00:06.070836
Minute: 15:47, Unique users: 48179, Total time: 0:00:49.410435 Time since last checkpoint: 0:00:06.387770
Minute: 15:48, Unique users: 47980, Total time

We're still using quite a bit of memory (191MB compared to 243 when storing the entire dictionary)

## JSON serializer overhead

In [64]:
consumer_with_serializer = KafkaConsumer(
    bootstrap_servers=['127.0.0.1:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest', 
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)

consumer_with_serializer.assign([TopicPartition(topic='stream', partition=0)])

In [65]:
def loop_over_all_values(consumer, n=0):
    consumer.seek(p, n)
    n = 0
    for m in consumer:
        n += 1
    return n

In [76]:
%%time
%%memit
loop_over_all_values(consumer_with_serializer)

peak memory: 200.64 MiB, increment: 0.00 MiB
CPU times: user 1min 32s, sys: 2.04 s, total: 1min 34s
Wall time: 1min 37s


In [77]:
consumer_without_serializer = KafkaConsumer(
    bootstrap_servers=['127.0.0.1:9092'],
    auto_offset_reset='earliest', 
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)

consumer_without_serializer.assign([TopicPartition(topic='stream', partition=0)])

In [78]:
%%time
%%memit
loop_over_all_values(consumer_without_serializer)

peak memory: 199.77 MiB, increment: 0.32 MiB
CPU times: user 26.2 s, sys: 1.66 s, total: 27.9 s
Wall time: 30.3 s


In [79]:
30/97-1

-0.6907216494845361

## Output to Kafka

Allow test messages from stdin

In [94]:
from kafka import KafkaProducer

In [95]:
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8')
)

In [96]:
producer.send('processed', key=b'main-key', value={'test-key': 'test-value'})
producer.send('processed', key=b'some-different-key', value={'test-key': 'test-value'})

<kafka.producer.future.FutureRecordMetadata at 0x115a9ae20>

In [97]:
consumer.subscribe('processed')

In [98]:
consumer.subscription()

{'processed'}

In [99]:
consumer.partitions_for_topic('processed')

{0}

In [100]:
consumer.end_offsets([TopicPartition(topic='processed', partition=0)])

{TopicPartition(topic='processed', partition=0): 2}

I was expecting multiple partitions...oh well

In [101]:
for msg in consumer:
    print(msg)

ConsumerRecord(topic='processed', partition=0, offset=0, timestamp=1649323845400, timestamp_type=0, key=b'main-key', value={'test-key': 'test-value'}, headers=[], checksum=None, serialized_key_size=8, serialized_value_size=26, serialized_header_size=-1)
ConsumerRecord(topic='processed', partition=0, offset=1, timestamp=1649323845400, timestamp_type=0, key=b'some-different-key', value={'test-key': 'test-value'}, headers=[], checksum=None, serialized_key_size=18, serialized_value_size=26, serialized_header_size=-1)


### Back to streams

In [108]:
consumer.unsubscribe()
consumer.assign([TopicPartition(topic='stream', partition=0)])

In [105]:
def publish_minute(producer, topic, current_minute, current_minute_users):
    if current_minute == datetime.fromtimestamp(0):
        return
    return producer.send(topic, {current_minute.strftime("%H:%M"): len(current_minute_users)})

In [106]:
def count_and_publish_asap(consumer, producer):
    consumer.seek_to_beginning()

    current_minute = datetime.fromtimestamp(0)
    current_minute_users = set()

    for msg in consumer:
        ts_min = datetime.fromtimestamp(msg.value['ts']).replace(second=0)
        if current_minute < ts_min:
            publish_minute(producer, 'processed', current_minute, current_minute_users)
            current_minute = ts_min
            current_minute_users = set()
        else:
            current_minute_users.add(msg.value['uid'])

    # Publish the last minute
    publish_minute(producer, 'processed', current_minute, current_minute_users)

In [107]:
count_and_publish_asap(consumer, producer)

In [113]:
consumer.unsubscribe()
consumer.assign([TopicPartition(topic='processed', partition=0)])

In [116]:
consumer.seek_to_beginning()
for msg in consumer:
    if msg.key is None: # Exclude test
        print(msg.value)

{'15:39': 16193}
{'15:40': 41129}
{'15:41': 47369}
{'15:42': 49488}
{'15:43': 47862}
{'15:44': 40438}
{'15:45': 42858}
{'15:46': 47311}
{'15:47': 48179}
{'15:48': 47980}
{'15:49': 42193}
{'15:50': 45069}
{'15:51': 43658}
{'15:52': 48611}
{'15:53': 42741}
{'15:54': 51930}
{'15:55': 45470}
{'15:56': 137}


## Random timestamps

In [117]:
import math
from numpy import random

In [118]:
dt = int(datetime.now().timestamp())

In [119]:
def generate_random_timestamp(dt):
    dt_str = str(dt)
    # Ignore flips on first and last digit
    for i in range(1, len(dt_str) + 1):
        bitflip = str(random.randint(1, 9))
        if i == 1:
            new_dt = bitflip, dt_str[1:]
        elif i == len(dt_str):
            new_dt = dt_str[:-1], bitflip
        else:
            new_dt = dt_str[:i-1],  bitflip, dt_str[i:]
        new_dt_int = int(''.join(new_dt))
        diff = datetime.fromtimestamp(dt) - datetime.fromtimestamp(new_dt_int)
        print(i, '-'.join(new_dt), dt_str, int(diff.total_seconds()))

In [120]:
generate_random_timestamp(dt)

1 8-649326946 1649326946 -6999996400
2 1-5-49326946 1649326946 100003600
3 16-5-9326946 1649326946 -10000000
4 164-4-326946 1649326946 5003600
5 1649-7-26946 1649326946 -400000
6 16493-7-6946 1649326946 -50000
7 164932-7-946 1649326946 -1000
8 1649326-8-46 1649326946 100
9 16493269-6-6 1649326946 -20
10 164932694-5 1649326946 1
