In [1]:
import datetime, time, random, string, json, threading
from produce_pb2 import *

In [2]:

def one_station(name):
    # temp pattern
    month_avg = [27,31,44,58,70,79,83,81,74,61,46,32]
    shift = (random.random()-0.5) * 30
    month_avg = [m + shift + (random.random()-0.5) * 5 for m in month_avg]
    
    # rain pattern
    start_rain = [0.1,0.1,0.3,0.5,0.4,0.2,0.2,0.1,0.2,0.2,0.2,0.1]
    shift = (random.random()-0.5) * 0.1
    start_rain = [r + shift + (random.random() - 0.5) * 0.2 for r in start_rain]
    stop_rain = 0.2 + random.random() * 0.2

    # day's state
    today = datetime.date(2000, 1, 1)
    temp = month_avg[0]
    raining = False
    
    # gen weather
    while True:
        # choose temp+rain
        month = today.month - 1
        temp = temp * 0.8 + month_avg[month] * 0.2 + (random.random()-0.5) * 20
        if temp < 32:
            raining=False
        elif raining and random.random() < stop_rain:
            raining = False
        elif not raining and random.random() < start_rain[month]:
            raining = True

        yield (today.strftime("%Y-%m-%d"), name, temp, raining)

        # next day
        today += datetime.timedelta(days=1)
        
def all_stations(count=10, sleep_sec=1):
    assert count <= 26
    stations = []
    for name in string.ascii_uppercase[:count]:
        stations.append(one_station(name))
    while True:
        for station in stations:
            yield next(station)
        time.sleep(sleep_sec)

In [3]:
# loops forever because the weather never ends...
# for row in all_stations(3):
#     print(row) # date, station, temp, raining

In [4]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, TopicPartition
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError

admin = KafkaAdminClient(bootstrap_servers=["kafka:9092"])
try:
    admin.delete_topics(["stations", "stations-json"])
    print("deleted")
except UnknownTopicOrPartitionError:
    print("cannot delete (may not exist yet)")

time.sleep(1)
admin.create_topics([NewTopic("stations", 6, 1)])
admin.create_topics([NewTopic("stations-json", 6, 1)])
admin.list_topics()

deleted


['stations-json', 'stations']

In [5]:

def produce():
    producer = KafkaProducer(bootstrap_servers=["kafka:9092"], acks="all", retries=10)
    
    for date, station, degrees, raining in all_stations(15):
        # send to "stations" stream using protobuf
        resp = Report(date=str(date), station=str(station), degrees=degrees, raining=bool(raining))
        producer.send("stations", key=bytes(station, "utf-8"), value=resp.SerializeToString())
        # send to "stations-json" using JSON
        rain = 0
        if raining:
            rain = 1
        d = {"date":date, "station":station,  "degrees":degrees, "raining":rain}
        producer.send("stations-json", key=bytes(station, "utf-8"), value=bytes(json.dumps(d), "utf-8"))
# start thread to run produce
t1 = threading.Thread(target=produce)
t1.start()

# never join thread because we want it to run forever

#check if thread ran successfully
# while True:
#     time.sleep(1)
#     if t1.is_alive():
#         print('Still running')
#     else:
#         print('Completed')


In [6]:
#Part 2: Kafka Consumer

In [7]:
import os, json

for partition in range(6):
    path = f"partition-{partition}.json"
    if os.path.exists(path):
        os.remove(path)

In [8]:
def load_partition(partition_num):
    path = f"partition-{partition_num}.json"
    if os.path.exists(path):
        with open(path, "r") as file:
            return json.load(file)
    else:
        return { "offset": 0, "partition": partition_num }

def save_partition(partition):
    path = f"partition-{partition['partition']}.json"
    path2 =  path + ".tmp"
    with open(path2, "w") as file:
        json.dump(partition, file)
        os.rename(path2, path)
        
#atomic file writes:
# path = ????
# path2 = path + ".tmp"
# with open(path2, "w") as f:
#     # TODO: write the data
#     os.rename(path2, path)

In [9]:
def updatePartition(partition, resp_dict_):
    lock = threading.Lock()
    with lock:
        if resp_dict_["station"] not in partition.keys():
            partition[resp_dict_["station"]] = {}
            
        if "end" not in partition[resp_dict_["station"]].keys():
            partition[resp_dict_["station"]]["end"] = resp_dict_["date"]
            partition[resp_dict_["station"]]["start"] = resp_dict_["date"]
        else:
            d1 = partition[resp_dict_["station"]]["end"].split('-')
            d1 = datetime.datetime(int(d1[0]), int(d1[1]), int(d1[2]))
            d2 = resp_dict_["date"].split('-')
            d2 = datetime.datetime(int(d2[0]), int(d2[1]), int(d2[2]))
            if d2 > d1:
                partition[resp_dict_["station"]]["end"] = resp_dict_["date"]
            else:
                return partition

        if "sum" not in partition[resp_dict_["station"]].keys():
            partition[resp_dict_["station"]]["sum"] = float(resp_dict_["degrees"])
        else:
            partition[resp_dict_["station"]]["sum"] += float(resp_dict_["degrees"])

        if "count" not in partition[resp_dict_["station"]].keys():
            partition[resp_dict_["station"]]["count"] = 1
        else:
            partition[resp_dict_["station"]]["count"] += 1

        if "avg" not in partition[resp_dict_["station"]].keys():
            partition[resp_dict_["station"]]["avg"] = float(resp_dict_["degrees"])
        else:
            partition[resp_dict_["station"]]["avg"] = partition[resp_dict_["station"]]["sum"] / partition[resp_dict_["station"]]["count"]

    return partition

In [10]:
def consume(part_nums=[], iterations=10):
    consumer = KafkaConsumer(bootstrap_servers=["kafka:9092"])
    # create list of TopicPartition objects
    topics = [TopicPartition("stations", part_nums[0]), TopicPartition("stations", part_nums[1])]
    consumer.assign(topics)
    # PART 1: initialization
    partitions = {} # key=partition num, value=snapshot dict
    # load partitions from JSON files (if they exist) or create fresh dicts
    partitions[part_nums[0]] = load_partition(part_nums[0])
    partitions[part_nums[1]] = load_partition(part_nums[1])
    # if offsets were specified in previous JSON files, the consumer
    # should seek to those; else, seek to offset 0.
    for partition in partitions:
        consumer.seek(TopicPartition("stations", partition), partitions[partition]["offset"])
    # PART 2: process batches
    for i in range(iterations):
        batch = consumer.poll(1000) # 1s timeout
        for topic, messages in batch.items():
            for msg in messages:
            # update the partitions based on new messages
            # Report.FromString(msg.value), report_pb2.Report().ParseFromString(msg.value)
                resp = str(Report.FromString(msg.value)).replace('\"', ' ').split('\n')
                resp_dict_ = {}
                for data in resp:
                    kv_pair = data.split(':')
                    resp_dict_[kv_pair[0]] = kv_pair[-1]
                partitions[msg.partition] = updatePartition(partitions[msg.partition], resp_dict_)
                for tp in consumer.assignment():
                    partitions[tp.partition]["offset"] = consumer.position(tp)
                # save the data back to the JSON file
                for partition in partitions:
                    save_partition(partitions[partition])
    print("exiting")

for i in range(2):
    print("ROUND", i)
    t1 = threading.Thread(target=consume, args=([0,1], 30))
    t2 = threading.Thread(target=consume, args=([2,3], 30))
    t3 = threading.Thread(target=consume, args=([4,5], 30))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

ROUND 0
exiting
exiting
exiting
ROUND 1
exiting
exitingexiting



In [11]:
!cat partition*.json

{"offset": 51, "partition": 0, "  N ": {"end": "  2000-02-20 ", "start": "  2000-01-01 ", "sum": 2003.7745612544854, "count": 51, "avg": 39.28969727949971}}{"offset": 102, "partition": 1, "  E ": {"end": "  2000-02-20 ", "start": "  2000-01-01 ", "sum": 1007.2312302797652, "count": 51, "avg": 19.749631966269906}, "  O ": {"end": "  2000-02-20 ", "start": "  2000-01-01 ", "sum": 2063.0232002616667, "count": 51, "avg": 40.45143529924837}}{"offset": 156, "partition": 2, "  F ": {"end": "  2000-02-21 ", "start": "  2000-01-01 ", "sum": 1838.5927544225622, "count": 52, "avg": 35.35755296966466}, "  I ": {"end": "  2000-02-21 ", "start": "  2000-01-01 ", "sum": 1910.0227400261424, "count": 52, "avg": 36.73120653896428}, "  J ": {"end": "  2000-02-21 ", "start": "  2000-01-01 ", "sum": 1485.9166655019587, "count": 52, "avg": 28.575320490422282}}{"offset": 156, "partition": 3, "  D ": {"end": "  2000-02-21 ", "start": "  2000-01-01 ", "sum": 549.7057402295935, "count": 52, "avg": 10.5712642351