In [1]:
from kafka import KafkaProducer, KafkaConsumer
import json
import time
from collections import defaultdict

In [2]:
def run_producer(topic_name):
    producer = KafkaProducer(
                             bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    return producer     

In [3]:
def run_consumer(topic_name):    
    consumer = KafkaConsumer(topic_name, 
                             auto_offset_reset='earliest', 
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                             bootstrap_servers=['localhost:9092'], 
                             api_version=(0, 10), 
                             consumer_timeout_ms=1000)
  
    return consumer

In [None]:
#STEP 4
def print_input(topic_name):
    consumer = run_consumer(topic_name)
    for msg in consumer:
        
        uid = msg.value["uid"]
        timestamp = msg.value["ts"]
        print(uid,timestamp)
        
    print('end of print input')

print_input('input_topic')

In [4]:
#STEP 5
#print data to stdout
def count_uid_per_min_stdout(topic_name):
    consumer = run_consumer(topic_name)
    dic = defaultdict(int)
    for msg in consumer:     
        uid = msg.value["uid"]
        timestamp_unix = msg.value["ts"]
        timestamp_minite = time.strftime("%Y-%D-%H-%M", time.gmtime(timestamp_unix))
        dic[uid] = 1
        break
    current = timestamp_minite   
    
    for msg in consumer:
        uid = msg.value["uid"]
        timestamp_unix = msg.value["ts"]
        timestamp_minite = time.strftime("%Y-%D-%H-%M", time.gmtime(timestamp_unix))
        if current == timestamp_minite and dic[uid] == 0:
            dic[uid] == 1
        elif current != timestamp_minite:
            print(current,len(dic))
            #reset
            current = timestamp_minite
            dic = defaultdict(int)
            dic[uid] == 1    

In [5]:
#STEP 6
#Benchmark

%timeit count_uid_per_min_stdout('input_topic')

2016-07/11/16-13-39 16193
2016-07/11/16-13-40 41130
2016-07/11/16-13-41 47369
2016-07/11/16-13-42 49488
2016-07/11/16-13-43 47863
2016-07/11/16-13-44 40439
2016-07/11/16-13-45 42859
2016-07/11/16-13-46 47312
2016-07/11/16-13-47 48180
2016-07/11/16-13-48 47981
2016-07/11/16-13-49 42194
2016-07/11/16-13-50 45070
2016-07/11/16-13-51 43659
2016-07/11/16-13-52 48611
2016-07/11/16-13-53 42742
2016-07/11/16-13-54 51930
2016-07/11/16-13-55 45471
2016-07/11/16-13-39 16193
2016-07/11/16-13-40 41130
2016-07/11/16-13-41 47369
2016-07/11/16-13-42 49488
2016-07/11/16-13-43 47863
2016-07/11/16-13-44 40439
2016-07/11/16-13-45 42859
2016-07/11/16-13-46 47312
2016-07/11/16-13-47 48180
2016-07/11/16-13-48 47981
2016-07/11/16-13-49 42194
2016-07/11/16-13-50 45070
2016-07/11/16-13-51 43659
2016-07/11/16-13-52 48611
2016-07/11/16-13-53 42742
2016-07/11/16-13-54 51930
2016-07/11/16-13-55 45471
2016-07/11/16-13-39 16193
2016-07/11/16-13-40 41130
2016-07/11/16-13-41 47369
2016-07/11/16-13-42 49488
2016-07/11/1

In [8]:
#STEP 7
#send data to Kafka topic
def count_uid_per_min_kafka(topic_name_consumer, topic_name_producer):
    consumer = run_consumer(topic_name_consumer)
    producer = run_producer(topic_name_producer)
    
    dic = defaultdict(int)
    for msg in consumer:     
        uid = msg.value["uid"]
        timestamp_unix = msg.value["ts"]
        timestamp_minite = time.strftime("%Y-%D-%H-%M", time.gmtime(timestamp_unix))
        dic[uid] = 1
        break
    current = timestamp_minite   
    
    for msg in consumer:
        uid = msg.value["uid"]
        timestamp_unix = msg.value["ts"]
        timestamp_minite = time.strftime("%Y-%D-%H-%M", time.gmtime(timestamp_unix))
        if current == timestamp_minite and dic[uid] == 0:
            dic[uid] == 1
        elif current != timestamp_minite:
            producer.send(topic_name_producer,{"timestamp":current, "count":len(dic)})
            #reset
            current = timestamp_minite
            dic = defaultdict(int)
            dic[uid] == 1    
            


In [None]:
%timeit count_uid_per_min_kafka('input_topic', 'output_topic')

In [31]:
def print_output(topic_name):
    consumer = run_consumer(topic_name)
    for msg in consumer:
        
        print(msg.value)
        
    print('end of print input')
print_output('output_topic')

{'timestamp': '2016-07/11/16-13-39', 'count': 16193}
{'timestamp': '2016-07/11/16-13-40', 'count': 41130}
{'timestamp': '2016-07/11/16-13-41', 'count': 47369}
{'timestamp': '2016-07/11/16-13-42', 'count': 49488}
{'timestamp': '2016-07/11/16-13-43', 'count': 47863}
{'timestamp': '2016-07/11/16-13-44', 'count': 40439}
{'timestamp': '2016-07/11/16-13-45', 'count': 42859}
{'timestamp': '2016-07/11/16-13-46', 'count': 47312}
{'timestamp': '2016-07/11/16-13-47', 'count': 48180}
{'timestamp': '2016-07/11/16-13-48', 'count': 47981}
{'timestamp': '2016-07/11/16-13-49', 'count': 42194}
{'timestamp': '2016-07/11/16-13-50', 'count': 45070}
{'timestamp': '2016-07/11/16-13-51', 'count': 43659}
{'timestamp': '2016-07/11/16-13-52', 'count': 48611}
{'timestamp': '2016-07/11/16-13-53', 'count': 42742}
{'timestamp': '2016-07/11/16-13-54', 'count': 51930}
{'timestamp': '2016-07/11/16-13-55', 'count': 45471}
end of print input


In [None]:
def multi_threads(topic_name_consumer, topic_name_producer):
    consumer = run_consumer(topic_name_consumer)
    producer = run_producer(topic_name_producer)

    map(lambda x:)