# How to aggregate a real-time Kafka feed efficiently with Python and atoti?

In this notebook we will create an analytical application that aggregates a live Kafka feed.

The example is taken from finance: we will emulate real-time trading activity. Every time a new trade arrives, a live dashboard in the atoti app will blink to show the updated portfolio summary.

atoti is designed to aggregate big volumes of data. In this notebook we will focus on the incremental aspect of real-time aggregations.
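
To make "incremental" concrete, here is a minimal pure-Python sketch of the idea. It is not how atoti is implemented; the class name `IncrementalBookTotals` and its methods are hypothetical and exist only for illustration. Each incoming trade adjusts a running total instead of triggering a full recomputation, and a trade that reuses an existing `TradeId` first removes its previous contribution.

```python
class IncrementalBookTotals:
    """Hypothetical illustration: running MarketValue totals per Book."""

    def __init__(self):
        self._trades = {}  # TradeId -> (Book, MarketValue) of the latest version
        self._totals = {}  # Book -> running MarketValue total

    def upsert(self, trade):
        """Apply one trade, replacing any earlier version with the same TradeId."""
        trade_id = trade["TradeId"]
        previous = self._trades.get(trade_id)
        if previous is not None:
            # Remove the old contribution before adding the new one.
            old_book, old_value = previous
            self._totals[old_book] -= old_value
        book, value = trade["Book"], trade["MarketValue"]
        self._totals[book] = self._totals.get(book, 0.0) + value
        self._trades[trade_id] = (book, value)

    def total(self, book):
        """Return the current total for a book (0.0 if the book is unknown)."""
        return self._totals.get(book, 0.0)
```

The cost of each update depends only on the single trade being applied, not on the number of trades already loaded.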

# Starting the Kafka server

Let's start a Kafka server as described here: [https://kafka.apache.org/quickstart](https://kafka.apache.org/quickstart).
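
Before going further, it can help to confirm that the broker is actually listening. The following is a minimal sketch using only the standard library; the helper name `broker_is_reachable` is hypothetical, and the default port 9092 assumes Kafka was started with its default configuration. It only checks that a TCP connection can be opened, not that the broker is healthy.

```python
import socket


def broker_is_reachable(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution failures.
        return False
```

Calling `broker_is_reachable()` with no arguments should return `True` once the server from the quickstart is up.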

# Starting the atoti application

In [1]:
import atoti as tt

from atoti.config import create_config

config = create_config(metadata_db="./metadata.db", sampling_mode=tt.sampling.FULL)
session = tt.create_session(config=config)

trades_datastore = session.read_csv(
    "trades.csv",
    types={
        "TradeId": tt.types.STRING,
        "Strike": tt.types.STRING,
        "Quantity": tt.types.DOUBLE_NULLABLE,
    },
    keys=["TradeId"],
    store_name="Trade Attributes",
)

cube = session.create_cube(trades_datastore, "Portfolio")

Welcome to atoti 0.4.2!

By using this community edition, you agree with the license available at https://www.atoti.io/eula.
Browse the official documentation at https://docs.atoti.io.
Join the community at https://www.atoti.io/register.

You can hide this message by setting the ATOTI_HIDE_EULA_MESSAGE environment variable to True.


# Live dashboard in atoti

atoti comes with a user interface for creating dashboards; the link below opens an example. I have toggled the widgets to "real-time" mode so that we can see the numbers "blink" when new messages arrive from Kafka.

In [2]:
session.url + "/#/dashboard/27b"

'http://localhost:65182/#/dashboard/27b'

# Connecting atoti to the Kafka feed

Let's point the `trades_datastore` to the Kafka feed. atoti will consume new trade data as soon as it arrives, and the widgets will refresh, provided they have been toggled to real-time mode.

I launched Kafka with the default parameters, so the bootstrap server is `localhost:9092`.

I'm using the `load_kafka` method in atoti with the default deserializer. Please refer to the atoti documentation to read more about [load_kafka](https://docs.atoti.io/0.4.2/lib/atoti.html?highlight=kafka#atoti.store.Store.load_kafka).

In [3]:
trades_datastore.load_kafka(
    bootstrap_server="localhost:9092", topic="trades", group_id="atoti"
)
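
As far as I understand the atoti documentation, the default deserializer expects each Kafka message to be a flat JSON object whose keys match the store's column names. Treat that as an assumption and check the `load_kafka` reference for your version. The sketch below shows what such a message looks like on the wire; the helper names `encode_trade` and `decode_trade` are hypothetical, and the encoding matches the `value_serializer` used by the producer in this notebook.

```python
import json


def encode_trade(trade):
    """Serialize a trade dict to UTF-8 encoded JSON bytes."""
    return json.dumps(trade).encode("utf-8")


def decode_trade(payload):
    """Inverse of encode_trade, handy for inspecting raw messages."""
    return json.loads(payload.decode("utf-8"))


# A shortened trade, just to show the byte payload that would be published.
message = encode_trade({"TradeId": "Trd_TEST", "Quantity": 45})
print(message)
```

Decoding a payload with `decode_trade` is a quick way to verify that the keys line up with the store columns before publishing.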

In [4]:
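import json
from kafka import KafkaProducer

# With no bootstrap_servers argument, kafka-python connects to localhost:9092.
# Each message value is serialized as UTF-8 encoded JSON.
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode("utf-8"))

# Publish a single test trade to the "trades" topic.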
trd = {
    "TradeId": "Trd_TEST",
    "Ticker": "OXY",
    "Book": "EQ_VOL_HED",
    "Product": "EQ_Option",
    "Quantity": 45,
    "Strike": -31.41249430583221,
    "Maturity": "2022-10-01",
    "OptionType": "put",
    "MarketValue": 89.43425919208067,
}
producer.send("trades", trd)

<kafka.producer.future.FutureRecordMetadata at 0x7ff5ffb52490>
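
The `FutureRecordMetadata` shown above means the send is asynchronous: the call returns before the broker has acknowledged the message. If you want to confirm delivery, one option is to block on the future. The helper below is a minimal sketch with a hypothetical name, `send_and_confirm`; it takes the producer as an argument and assumes it behaves like kafka-python's `KafkaProducer`.

```python
def send_and_confirm(producer, topic, payload, timeout=10):
    """Send one message and block until the broker acknowledges it.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future whose get(timeout=...) yields the record
    metadata, or raises if delivery failed or timed out.
    """
    future = producer.send(topic, payload)
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset
```

For example, `send_and_confirm(producer, "trades", trd)` would return the topic, partition and offset assigned to the test trade. Blocking on every message reduces throughput, so this is better suited to debugging than to a high-volume feed.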

# Emulating real-time messages

The following cell defines a function that generates random trades.

In [5]:
import random
import time

def generate_a_new_trade(trade_id):
    """Generate a trade with random attributes for the given identifier."""
    trade = {
        "TradeId": "Trd_" + str(trade_id),
        "Ticker": random.choice(["AAPL", "MSFT", "OXY"]),
        "Book": random.choice(
            ["EQ_LARG_DM", "EQ_SMAL_EM", "EQ_STRUCT", "EQ_VOL_HED", "EQ_WAREHOU"]
        ),
        "Product": "EQ_Option",
        "Quantity": random.randrange(-100, 100),
        "Strike": random.uniform(-100, 100),
        "Maturity": "2022-10-01",
        "OptionType": "put",
        "MarketValue": random.uniform(-100, 100),
    }

    print("A new trade generated:")
    print(json.dumps(trade, indent=4))

    return trade
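
Since the messages are matched to store columns by key, a misspelled or missing field is easy to overlook. The sketch below checks a trade against the fields used by the trades in this notebook before it is published. It assumes those fields mirror the columns of `trades.csv`, which is not shown here; `EXPECTED_COLUMNS` and `find_trade_problems` are hypothetical names.

```python
# Fields and Python types used by the trade messages in this notebook.
EXPECTED_COLUMNS = {
    "TradeId": str,
    "Ticker": str,
    "Book": str,
    "Product": str,
    "Quantity": (int, float),
    "Strike": (int, float),
    "Maturity": str,
    "OptionType": str,
    "MarketValue": (int, float),
}


def find_trade_problems(trade):
    """Return a list of human-readable problems; an empty list means the trade looks fine."""
    problems = []
    for column, expected_type in EXPECTED_COLUMNS.items():
        if column not in trade:
            problems.append(f"missing column: {column}")
        elif not isinstance(trade[column], expected_type):
            actual = type(trade[column]).__name__
            problems.append(f"unexpected type for {column}: {actual}")
    for column in trade:
        if column not in EXPECTED_COLUMNS:
            problems.append(f"unknown column: {column}")
    return problems
```

A trade produced by `generate_a_new_trade` should yield an empty list.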

Publishing to Kafka:

In [None]:
for i in range(1000, 1100):
    new_trade = generate_a_new_trade(i)

    producer.send("trades", new_trade)
    print("Published.\n")
    time.sleep(1)