# Waiter

In [None]:
from collections import defaultdict
import json
from confluent_kafka import Consumer, Producer

# Base client settings returned by read_config().
config = read_config()

# Build the producer from a snapshot taken BEFORE the consumer-only settings
# are added below. "group.id" and "auto.offset.reset" are consumer properties;
# handing them to a Producer makes librdkafka log a warning that they are
# ignored by the producer instance.
producer = Producer(dict(config))

# Consumer-only settings: consumer group name, and where to start reading
# when the group has no committed offset.
config["group.id"] = "waiter4"
config["auto.offset.reset"] = "earliest"
topic = "coffee_ordered"

consumer = Consumer(config)
consumer.subscribe([topic])

def produce(order_id, order):
  """Publish `order` to the "orders" topic under the key "order#<order_id>".

  The message is handed to the module-level `producer`, a confirmation line
  is printed, and the producer is flushed before returning.

  `order` is rendered with a width-12 format spec in the confirmation line,
  so it must be a value that accepts that spec (the caller in this file
  passes a JSON string).
  """
  topic = "orders"
  key = "order#" + str(order_id)

  producer.produce(topic, value=order, key=key)
  confirmation = f"Produced message to topic {topic}: key = {key:12} value = {order:12}"
  print(confirmation)

  # Flush after every message so nothing is left sitting in the local queue.
  producer.flush()

# Values consumed so far, grouped by the integer order id parsed from the key.
orders = defaultdict(list)

try:
  while True:
    msg = consumer.poll(1.0)
    if msg is None:
      # poll() returned nothing within the 1 second timeout; try again.
      continue
    if msg.error() is not None:
      # Report the error instead of dropping it silently, then keep polling.
      print(f"Consumer error on topic {topic}: {msg.error()}")
      continue

    raw_key = msg.key()
    raw_value = msg.value()
    if raw_key is None or raw_value is None:
      # Without both a key and a value there is nothing to attach to an order.
      print(f"Skipping message without key or value from topic {topic}")
      continue

    key = raw_key.decode("utf-8")
    value = raw_value.decode("utf-8")
    print(f"Consumed message from topic {topic}: key = {key:12} value = {value:12}")

    # The order id is the integer after the first "#" in the key.
    try:
      order_id = int(key.split("#")[1])
    except (IndexError, ValueError):
      # One malformed key should not take the whole loop down.
      print(f"Skipping message with malformed key {key!r} from topic {topic}")
      continue

    orders[order_id].append(value)
    # Re-publish the full list collected so far for this order.
    produce(order_id, json.dumps(orders[order_id]))
except KeyboardInterrupt:
  # Ctrl-C / kernel interrupt is the normal way to stop the loop.
  pass
finally:
  consumer.close()

# Utils

In [1]:
def read_config():
  """Parse ".client.properties" in the working directory into a dict.

  Blank lines and lines whose first non-blank character is "#" are ignored.
  Every other line is split on the first "="; surrounding whitespace is
  removed from both the parameter name and the value, so "key = value" and
  "key=value" produce the same entry. Later occurrences of a parameter
  overwrite earlier ones.

  Returns:
    dict mapping parameter name (str) to value (str).

  Raises:
    ValueError: if a non-comment line contains no "=".
    OSError: if the file cannot be opened.
  """
  config = {}
  with open(".client.properties", encoding="utf-8") as fh:
    for line_no, raw_line in enumerate(fh, start=1):
      line = raw_line.strip()
      if not line or line.startswith("#"):
        continue
      # Split on the first "=" only, so values may themselves contain "=".
      parameter, separator, value = line.partition("=")
      if not separator:
        raise ValueError(
          f".client.properties line {line_no}: expected 'key=value', got {line!r}"
        )
      config[parameter.strip()] = value.strip()
  return config