# Publish example data to Staging Input Kafka Topic

In [None]:
# run this notebook from root directory of the repository
import os
os.chdir("../")

In [None]:
from uuid import uuid4
import json
import time

import pandas as pd

from pyshipt_streams import KafkaProducer, KafkaConsumer
from components.data_models import InputMessage
from components.data_models import InputMessage

In [None]:
# STAGE
KAFKA_BROKERS = "pkc-oqmoy.us-central1.gcp.confluent.cloud:9092"
INPUT = "stg.bus.ds-marketplace.v1.bundle_engine_input"
OUTPUT = "stg.bus.ds-marketplace.v1.bundle_engine_output"
# ask in bundle-engine channel for Kafka Credentials

KAFKA_KEY = os.environ["KAFKA_KEY"],
KAFKA_SECRET = os.environ["KAFKA_SECRET"]

In [None]:
p = KafkaProducer(
    brokers=KAFKA_BROKERS,
    username=KAFKA_KEY,
    password=KAFKA_SECRET
)

# Simulate input data

In [None]:
example_data = InputMessage.schema()["examples"][0].copy()
example_data

In [None]:
# number of stores to simulate
NUM_STORES = 5
# number of orders/store
NUM_ORDERS = 50

In [None]:
REPL = max(int(NUM_ORDERS / len(example_data["orders"])), 1)
example_data["orders"] = example_data["orders"] * REPL

In [None]:
print(f"{INPUT=}")
print(f"{KAFKA_BROKERS=}")
print(f'''{len(example_data["orders"])=}''')

In [None]:
for i in range(NUM_STORES):
    example_data["bundle_request_id"] = f"smoke-test-{i}"
    p.publish(
        INPUT,
        example_data,
        serialize=True
    )

# Listen for message on the output topic

In [None]:
# setup a consumer
me = os.environ["USER"]
c = KafkaConsumer(
    consumer_group=f"{me}-staging-inspector",
    brokers=KAFKA_BROKERS,
    username=KAFKA_KEY,
    password=KAFKA_SECRET,
)
c.subscribe(topics=[OUTPUT])
messages = []

In [None]:
# consumer the messages
try:
    while True:
        msg = c.poll(0.25)
        if msg is None:
            continue
        if msg.error():
            raise Exception("errr")
        else:
            m = json.loads(msg.value().decode("utf-8"))
            messages.append(m)
            try:
                print(m["bundle_request_id"])
            except KeyError:
                print(m)
except KeyboardInterrupt:
    print("aborted")
finally:
    c.close()

In [None]:
# Inspect the mssages
messages[-1]