# simran4@wisc.edu, rgundavarapu@wisc.edu

In [1]:
import datetime, time, random, string

def one_station(name):
    # temp pattern
    month_avg = [27,31,44,58,70,79,83,81,74,61,46,32]
    shift = (random.random()-0.5) * 30
    month_avg = [m + shift + (random.random()-0.5) * 5 for m in month_avg]
    
    # rain pattern
    start_rain = [0.1,0.1,0.3,0.5,0.4,0.2,0.2,0.1,0.2,0.2,0.2,0.1]
    shift = (random.random()-0.5) * 0.1
    start_rain = [r + shift + (random.random() - 0.5) * 0.2 for r in start_rain]
    stop_rain = 0.2 + random.random() * 0.2

    # day's state
    today = datetime.date(2000, 1, 1)
    temp = month_avg[0]
    raining = False
    
    # gen weather
    while True:
        # choose temp+rain
        month = today.month - 1
        temp = temp * 0.8 + month_avg[month] * 0.2 + (random.random()-0.5) * 20
        if temp < 32:
            raining=False
        elif raining and random.random() < stop_rain:
            raining = False
        elif not raining and random.random() < start_rain[month]:
            raining = True

        yield (today.strftime("%Y-%m-%d"), name, temp, raining)

        # next day
        today += datetime.timedelta(days=1)
        
def all_stations(count=10, sleep_sec=1):
    assert count <= 26
    stations = []
    for name in string.ascii_uppercase[:count]:
        stations.append(one_station(name))
    while True:
        for station in stations:
            yield next(station)
        time.sleep(sleep_sec)

In [2]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, TopicPartition
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError

broker = "kafka:9092"
admin = KafkaAdminClient(bootstrap_servers =[broker])
try:
    admin.delete_topics(["stations", "stations-json"])
    print("deleted")
except UnknownTopicOrPartitionError:
    print("cannot delete (may not exist yet)")

time.sleep(1)
admin.create_topics([NewTopic("stations", 6, 1)])
admin.create_topics([NewTopic("stations-json", 6, 1)])
admin.list_topics()

cannot delete (may not exist yet)


['stations-json', 'stations']

In [3]:
from message_pb2 import *
from threading import Thread
import threading,json
def produce():
    producer = KafkaProducer(bootstrap_servers=["kafka:9092"],retries=10,acks=-1)
    
    for date, station, degrees, raining in all_stations(15):
        #print(date, station, degrees, raining)
        s = Report(date=date, station=station,degrees=(degrees),raining=raining)
        value = s.SerializeToString()
        key=station.encode('utf-8')
        producer.send('stations',key=key ,value=value)
        value_json = {"date": date, "station": station,'degrees':(degrees),'raining':int(raining)}
        value_json = bytes(json.dumps(value_json), "utf-8")
        producer.send('stations-json',key=key,value=value_json)
        # TODO: send to "stations" stream using protobuf
        # TODO: send to "stations-json" using JSON

# TODO: start thread to run produce
threading.Thread(target=produce).start()
# never join thread because we want it to run forever

## PART 2 

In [4]:
import os, json

for partition in range(6):
    path = f"partition-{partition}.json"
    if os.path.exists(path):
        os.remove(path)

In [5]:
def load_partition(partition_num):
    path = f"partition-{partition_num}.json"
    if os.path.exists(path):
        with open(path, "r") as file:
            return json.load(file)
    else:
        return {'partition':partition_num}

def save_partition(partition):
    path = f"partition-{partition['partition']}.json"
    with open(path, "w") as file:
        json.dump(partition, file)

In [6]:
def consume(part_nums=[], iterations=10):
    consumer = KafkaConsumer(bootstrap_servers=[broker])
    # TODO: create list of TopicPartition objects
    partition_objs=[TopicPartition('stations',part_num) for part_num in part_nums]
    consumer.assign(partition_objs)
    partitions={}
    # PART 1: initialization
    for part_num in part_nums:
        partitions['partition']=load_partition(part_num)['partition'] # key=partition num, value=snapshot dict
        if 'offset' in partitions.keys():
            consumer.seek(TopicPartition('stations',part_num),partitions['offset'])
        else:
            consumer.seek_to_beginning(TopicPartition('stations',part_num)) # else
        counter=1
        for i in range(iterations):
            batch = consumer.poll(1000) # 1s timeout
            date_list=[]
            for tp, messages in batch.items():
                for msg in messages:
                    s = Report.FromString(msg.value)
                    date_list.append(s.date)
                    #print(s.station)
                    if s.station in partitions.keys():
                        if s.date<=partitions[s.station]['end']:
                            print('fortnut')
                            break
                            break
                        partitions[s.station]['sum']+=s.degrees
                        partitions[s.station]['count']+=1
                        partitions[s.station]['avg']=(partitions[s.station]['sum']/counter)
                        partitions[s.station]['start']=s.date if s.date<partitions[s.station]['start'] else partitions[s.station]['start']
                        partitions[s.station]['end']=s.date
                    else:
                        partitions[s.station]={}
                        partitions[s.station]['sum']=s.degrees
                        partitions[s.station]['count']=0
                        partitions[s.station]['avg']=0
                        partitions[s.station]['start']=s.date
                        partitions[s.station]['end']=s.date

                    partitions['offset']=consumer.position(tp)
                    save_partition(partitions)
    print("exiting")

for i in range(2):
    print("ROUND", i)
    t1 = threading.Thread(target=consume, args=([0,1], 30))
    t2 = threading.Thread(target=consume, args=([2,3], 30))
    t3 = threading.Thread(target=consume, args=([4,5], 30))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

ROUND 0
fortnut
exiting
exiting
exiting
ROUND 1
fortnut
exiting
exiting
exiting


In [7]:
!cat partition-2.json

{"partition": 2, "F": {"sum": 1812.4314752510593, "count": 74, "avg": 1812.4314752510593, "start": "2000-01-01", "end": "2000-03-15"}, "offset": 223, "I": {"sum": 2225.7466875256, "count": 73, "avg": 2225.7466875256, "start": "2000-01-01", "end": "2000-03-14"}, "J": {"sum": 2158.563633201066, "count": 73, "avg": 2158.563633201066, "start": "2000-01-01", "end": "2000-03-14"}, "D": {"sum": 1115.9907526147645, "count": 25, "avg": 1115.9907526147645, "start": "2000-02-19", "end": "2000-03-15"}, "G": {"sum": 869.5137816608352, "count": 24, "avg": 869.5137816608352, "start": "2000-02-19", "end": "2000-03-14"}, "M": {"sum": 869.3182081640207, "count": 24, "avg": 869.3182081640207, "start": "2000-02-19", "end": "2000-03-14"}}

part_nums=[0,1]
consumer = KafkaConsumer(bootstrap_servers=[broker])
# TODO: create list of TopicPartition objects
partition_objs=[TopicPartition('stations',part_num) for part_num in part_nums]
consumer.assign(partition_objs)
partitions={}
counter=1
print(consumer.assignment())

for i in part_nums:
    partitions['partition']=load_partition(i)['partition'] # key=partition num, value=snapshot dict
    if 'offset' in partitions.keys():
        print('loading previous offset')
        print('PARTITION: ',i)
        consumer.seek(TopicPartition('stations',i),partitions['offset'])
    else:
        print('NEW READ , ',i)
        consumer.seek_to_beginning(TopicPartition('stations',i)) # else
    batch = consumer.poll(1000)
    date_list=[]
    for tp, messages in batch.items():
        print(tp)
        for msg in messages:
            s = Report.FromString(msg.value)
            date_list.append(s.date)
            #print(s.station)
            if s.station in partitions.keys():
                if s.date<=partitions[s.station]['end']:
                    break
                counter+=1
                partitions[s.station]['sum']+=s.degrees
                partitions[s.station]['count']+=1
                partitions[s.station]['avg']=(partitions[s.station]['sum']/counter)
                partitions[s.station]['start'] = s.date if s.date<partitions[s.station]['start'] else partitions[s.station]['start']
                partitions[s.station]['end']=s.date
            else:
                partitions[s.station]={}
                partitions[s.station]['sum']=s.degrees
                partitions[s.station]['count']=counter
                partitions[s.station]['avg']=0
                partitions[s.station]['start']=s.date
                partitions[s.station]['end']=s.date

        partitions['offset']=consumer.position(tp)
        save_partition(partitions)