In [1]:
import uuid

import pandas as pd
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from datasketches import frequent_strings_sketch, frequent_items_error_type

# Consumer settings. A brand-new, random group id on every run has no committed
# offsets, so 'auto.offset.reset': 'earliest' makes each run re-read the topic
# from the beginning.
config = {
    'bootstrap.servers': 'localhost:19092',
    'group.id': 'demand_consumer_group' + str(uuid.uuid4()),
    'auto.offset.reset': "earliest",
}

# frequent_strings_sketch is parameterised by lg_max_k: the log2 of the
# sketch's maximum internal map size (2**10 = 1024 here). It is NOT the number
# of heavy hitters reported.
LG_MAX_K = 10
k = LG_MAX_K  # kept as an alias for cells that still refer to `k`
fi = frequent_strings_sketch(LG_MAX_K)

In [2]:
class DailyDemand:
    """Plain value holder for one day's demand message.

    Values are stored exactly as given, with no validation or conversion.
    """

    def __init__(self, day_of_year, year, demand):
        # composite demand payload (per-SKU mapping in the schema)
        self._demand = demand
        # calendar year the record belongs to
        self._year = year
        # day index within that year
        self._day_of_year = day_of_year

In [3]:
# JSON Schema the deserializer validates each message value against.
# All three properties are required: dict_to_demand indexes every one of them,
# so a payload missing a key should be rejected at validation time rather than
# failing later with a KeyError. day_of_year/year arrive as whole numbers.
schema_str = """{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Daily_Demand",
    "description": "Daily Demand of SKUs",
    "type": "object",
    "properties": {
      "day_of_year": {
        "description": "day of the year",
        "type": "integer"
      },
      "year": {
        "description": "year for which demand is encoded",
        "type": "integer"
      },
      "demand": {
        "description": "composite object listing demand for each SKU as a key",
        "type": "object"
      }
    },
    "required": ["day_of_year", "year", "demand"]
  }"""

In [4]:
def dict_to_demand(daily_demand, ctx):
    """
    Build a DailyDemand instance from a decoded JSON message value.

    Passed as the ``from_dict`` hook of the JSONDeserializer, which calls it
    with the decoded payload.

    Args:
        daily_demand (dict | None): Decoded payload with ``day_of_year``,
            ``year`` and ``demand`` keys.

        ctx (SerializationContext): Metadata pertaining to the deserialization
            operation (not used here).

    Returns:
        DailyDemand | None: Populated instance, or None when ``daily_demand``
        is None.

    Raises:
        KeyError: If any of the three expected keys is missing.
    """
    if daily_demand is None:
        return None

    return DailyDemand(day_of_year=daily_demand['day_of_year'],
                       year=daily_demand['year'],
                       demand=daily_demand['demand'])


In [5]:
# Kafka topic carrying the daily demand records. The consumer is created and
# subscribed in the subscription cell below; building one here as well would
# only leave an unused client that is replaced (and never closed) there.
topic = "DAILY_DEMAND"

In [6]:
def assignment_callback(consumer, partitions):
    """Rebalance hook: log every topic/partition assigned to this consumer.

    Args:
        consumer: The consumer being rebalanced (unused).
        partitions: Iterable of assigned partitions exposing ``topic`` and
            ``partition`` attributes.
    """
    lines = (f'Assigned to {tp.topic}, partition {tp.partition}' for tp in partitions)
    for line in lines:
        print(line)

In [7]:

# Subscribe exactly once. Each subscribe() call replaces the previous
# subscription together with its rebalance callbacks, so a second call without
# on_assign would silently disable the assignment logging.
consumer = Consumer(config)
consumer.subscribe([topic], on_assign=assignment_callback)

# Decodes and validates each message value against schema_str, then turns the
# resulting dict into a DailyDemand via dict_to_demand.
json_deserializer = JSONDeserializer(schema_str, from_dict=dict_to_demand)

In [8]:
    # Start from an empty sketch so re-running this cell (after re-creating the
    # consumer, which re-reads the topic from the beginning) does not
    # double-count records folded in by an earlier run.
    fi = frequent_strings_sketch(k)

    # Stop after this many consecutive empty 1 s polls so the cell (and
    # "Run All") terminates once the topic is drained; a kernel interrupt
    # still stops it early.
    MAX_IDLE_POLLS = 10
    idle_polls = 0

    try:
        while idle_polls < MAX_IDLE_POLLS:
            msg = consumer.poll(1.0)
            if msg is None:
                idle_polls += 1
                continue
            idle_polls = 0

            # poll() can hand back an error/event message instead of a record;
            # its value is not a demand payload, so surface the error.
            if msg.error():
                raise KafkaException(msg.error())

            dd = json_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

            if dd is not None:
                print("Daily Demand record record : {}\n"
                      "\t day of year: {}\n"
                      "\t year: {}\n"
                      .format(msg.key(), dd._day_of_year,
                              dd._year))
                # One weighted sketch update per SKU; int() truncates any
                # fractional demand value.
                for sku, demand in dd._demand.items():
                    fi.update(sku, int(demand))

    except KeyboardInterrupt:
        print('Canceled by user.')
    finally:
        # Close the consumer on every exit path: drained, interrupted, or raised.
        consumer.close()

Daily Demand record record : b'day 11 of 2010'
	 day of year: 11
	 year: 2010

Daily Demand record record : b'day 30 of 2010'
	 day of year: 30
	 year: 2010

Daily Demand record record : b'day 32 of 2010'
	 day of year: 32
	 year: 2010

Daily Demand record record : b'day 43 of 2010'
	 day of year: 43
	 year: 2010

Daily Demand record record : b'day 45 of 2010'
	 day of year: 45
	 year: 2010

Daily Demand record record : b'day 55 of 2010'
	 day of year: 55
	 year: 2010

Daily Demand record record : b'day 64 of 2010'
	 day of year: 64
	 year: 2010

Daily Demand record record : b'day 66 of 2010'
	 day of year: 66
	 year: 2010

Daily Demand record record : b'day 68 of 2010'
	 day of year: 68
	 year: 2010

Daily Demand record record : b'day 72 of 2010'
	 day of year: 72
	 year: 2010

Daily Demand record record : b'day 11 of 2010'
	 day of year: 11
	 year: 2010

Daily Demand record record : b'day 30 of 2010'
	 day of year: 30
	 year: 2010

Daily Demand record record : b'day 32 of 2010'
	 day

In [9]:
# Exact Q1 2010 demand summary, used below as ground truth for the sketch.
# (pandas is imported in the top import cell.)
fp = "../../data/retail_q1_demand_2010_summary.csv"
dfQ1_PA = pd.read_csv(fp)

In [10]:
# Exact per-SKU totals for Q1 2010, top 10 -- the ground truth the sketch's
# heavy hitters are compared against.
sku_totals = dfQ1_PA.sum(axis=0)
sku_totals.sort_values(ascending=False).head(10)

37410     25301.0
21091     13630.0
21085     13588.0
21099     13283.0
85123A    12974.0
21980     12826.0
21092     12806.0
21212     12505.0
21984     12192.0
85220     11984.0
dtype: float64

In [12]:
# Heavy hitters according to the sketch, using NO_FALSE_POSITIVES. Each row is
# presumably (item, estimate, lower_bound, upper_bound) -- confirm in the
# datasketches docs.
# NOTE(review): these estimates are roughly 3x the exact CSV totals above, and
# the polling output shows the same day records arriving more than once, so
# the topic was likely produced to repeatedly -- confirm before comparing.
sketch_heavy_hitters = fi.get_frequent_items(frequent_items_error_type.NO_FALSE_POSITIVES)
sketch_heavy_hitters[:10]

[('37410', 77766, 72852, 77766),
 ('21091', 42052, 37138, 42052),
 ('21099', 41916, 37002, 41916),
 ('21085', 41580, 36666, 41580),
 ('21092', 40296, 35382, 40296),
 ('85123A', 38922, 34008, 38922),
 ('21980', 38484, 33570, 38484),
 ('85220', 38127, 33213, 38127),
 ('21212', 37515, 32601, 37515),
 ('21984', 37358, 32444, 37358)]