# Kafka Summit 2024

## Install Kafka / Redpanda

https://kafka.apache.org/quickstart

## Install Apache Flink

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/

## Install Python packages

In [ ]:
!pip install -r requirements.txt

## Let's start

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from IPython.display import display
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer, KafkaException
import pandas as pd
import numpy as np
import json
from tqdm import tqdm

## Kafka config

In [3]:
with open('../config.json') as fp:
    config_obj = json.load(fp)
config_obj

{'flink.servers': 'localhost:8081',
 'kafka.brokers': 'localhost:51932',
 'kafka.topic.price.name': 'local.price',
 'kafka.topic.holding.name': 'local.holding',
 'kafka.topic.order.name': 'local.order'}

In [11]:
conf = {
    'bootstrap.servers': config_obj['kafka.brokers']
}
producer = Producer(conf)

price_topic_name = config_obj['kafka.topic.price.name']
holding_topic_name = config_obj['kafka.topic.holding.name']
order_topic_name = config_obj['kafka.topic.order.name']

## Create topics

In [57]:
admin = AdminClient(conf)

price_topic = NewTopic(price_topic_name, num_partitions=3, replication_factor=1)
admin.create_topics([price_topic])

holding_topic = NewTopic(holding_topic_name, num_partitions=3, replication_factor=1)
admin.create_topics([holding_topic])

order_topic = NewTopic(order_topic_name, num_partitions=3, replication_factor=1)
admin.create_topics([order_topic])

{'local.order': <Future at 0x108963cd0 state=running>}

In [5]:
!rpk cluster info

CLUSTER
redpanda.998fd8e2-ed31-46f1-96d4-20f7bd02855b

BROKERS
ID    HOST       PORT
0*    127.0.0.1  51932

TOPICS
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
holding_topic       1           1
local.holding       3           1
local.order         3           1
local.price         3           1



## Prepare sample price data

In [9]:
prices_df = pd.read_csv('../data/prices.csv')
prices_df.drop(['ipoDate', 'delistingDate'], inplace=True, axis=1)
# Lazily remove any broken data
prices_df = prices_df[~prices_df['price'].isna()]
prices_df = prices_df[~prices_df['name'].isna()]
prices_df = prices_df[~prices_df['symbol'].isna()]
prices_df


Unnamed: 0,symbol,name,exchange,assetType,status,price
0,A,Agilent Technologies Inc,NYSE,Stock,Active,147.39
1,AA,Alcoa Corp,NYSE,Stock,Active,29.67
2,AAA,AXS First Priority CLO Bond ETF,NYSE ARCA,ETF,Active,25.14
3,AAAU,Goldman Sachs Physical Gold ETF,BATS,ETF,Active,21.36
4,AACG,ATA Creativity Global,NASDAQ,Stock,Active,1.40
...,...,...,...,...,...,...
11597,ZWS,Zurn Elkay Water Solutions Corp,NYSE,Stock,Active,32.13
11601,ZYME,Zymeworks BC Inc,NASDAQ,Stock,Active,10.53
11603,ZYRX,Global Earnings Capital Ltd,NASDAQ,Stock,Active,161.00
11604,ZYXI,Zynex Inc,NASDAQ,Stock,Active,12.93


## Publish price data

In [13]:
for p in tqdm(prices_df.to_dict(orient='records')):
    try:
        producer.produce(price_topic_name, key=p['symbol'], value=json.dumps(p))
    except Exception as e:
        print(f'Failed to publish {p}')
producer.flush()
print(f'{len(prices_df)} items produced.')

100%|██████████| 9886/9886 [00:00<00:00, 252599.03it/s]

9886 items produced.





## Prepare holding data (SPY)

In [8]:
holding_df = pd.read_excel('../data/holdings-daily-us-en-spy.xlsx', skiprows=4)
holding_df = holding_df[~holding_df['Ticker'].isna()]
holding_df

Unnamed: 0,Name,Ticker,Identifier,SEDOL,Weight,Sector,Shares Held,Local Currency
0,MICROSOFT CORP,MSFT,594918104,2588173,7.208694,-,90771106.0,USD
1,APPLE INC,AAPL,037833100,2046251,5.884039,-,177321492.0,USD
2,NVIDIA CORP,NVDA,67066G104,2379504,5.083447,-,30173549.0,USD
3,AMAZON.COM INC,AMZN,023135106,2000019,3.701629,-,111666519.0,USD
4,META PLATFORMS INC CLASS A,META,30303M102,B7TL820,2.513395,-,26876014.0,USD
...,...,...,...,...,...,...,...,...
499,MOHAWK INDUSTRIES INC,MHK,608190104,2598699,0.014293,-,627392.0,USD
500,PARAMOUNT GLOBAL CLASS B,PARA,92556H206,BKTNTR9,0.012439,-,5898665.0,USD
501,VF CORP,VFC,918204108,2928683,0.011064,-,4049110.0,USD
502,FOX CORP CLASS B,FOX,35137L204,BJJMGY5,0.008238,-,1613107.0,USD


## Publish holding data (SPY)

In [9]:
for h in tqdm(holding_df.to_dict(orient='records')):
    try:
        producer.produce(holding_topic_name, key=h['Ticker'], value=json.dumps(h))
    except Exception as e:
        print(f'Failed to publish {h}')

producer.flush()
print(f'{len(holding_df)} items produced.')    

100%|██████████| 504/504 [00:00<00:00, 144374.35it/s]


504 items produced.


In [16]:
consumer = Consumer({
    **conf,
    'group.id': 'test-client-1',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe([price_topic_name])
msg = consumer.poll(1.0)
if msg is None:
    print("No messages")
elif msg.error():
    raise KafkaException(msg.error())
else:
    print(f'Received message: {msg.key().decode("utf-8")} {msg.value().decode("utf-8")}')

consumer.close

Received message: A {"symbol": "A", "name": "Agilent Technologies Inc", "exchange": "NYSE", "assetType": "Stock", "status": "Active", "price": 147.39}


<function Consumer.close>

In [19]:
consumer = Consumer({
    **conf,
    'group.id': 'test-client-1',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe([holding_topic_name])
msg = consumer.poll(1.0)
if msg is None:
    print("No messages")
elif msg.error():
    raise KafkaException(msg.error())
else:
    print(f'Received message: {msg.key().decode("utf-8")} {msg.value().decode("utf-8")}')

consumer.close

No messages


<function Consumer.close>