# application

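> Example FastKafka application that consumes iris measurements from the `input_data` topic and publishes the predicted species to the `predictions` topic.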

In [None]:
#| default_exp application

In [None]:
#| export

from typing import *

from os import environ

from fastkafka.application import FastKafka
from pydantic import BaseModel, NonNegativeFloat, Field

In [None]:
import nest_asyncio

from fastkafka.application import Tester

In [None]:
# | export

class Model:
    """Placeholder model that always returns the same list of class indices."""

    @staticmethod
    def predict(*args, **kwargs) -> List[int]:
        """Return the constant prediction ``[0, 1, 2]``.

        All positional and keyword arguments are accepted and ignored.
        Declared as a static method so the instance is no longer silently
        swallowed into ``*args``; calls through an instance or through the
        class behave the same.

        Returns:
            A new list ``[0, 1, 2]`` on every call.
        """
        return [0, 1, 2]


# Module-level instance read by the consumer handler.
model = Model()

In [None]:
model.predict()

[0, 1, 2]

In [None]:
#| export




class IrisInputData(BaseModel):
    # Schema of an incoming message: four iris measurements, each declared
    # as a required NonNegativeFloat with an example value and a description.

    # Sepal measurements
    sepal_length: NonNegativeFloat = Field(
        ...,
        example=0.5,
        description="Sepal length in cm",
    )
    sepal_width: NonNegativeFloat = Field(
        ...,
        example=0.5,
        description="Sepal width in cm",
    )

    # Petal measurements
    petal_length: NonNegativeFloat = Field(
        ...,
        example=0.5,
        description="Petal length in cm",
    )
    petal_width: NonNegativeFloat = Field(
        ...,
        example=0.5,
        description="Petal width in cm",
    )


class IrisPrediction(BaseModel):
    # Schema of an outgoing message: the predicted species name (required).
    species: str = Field(
        ...,
        example="setosa",
        description="Predicted species",
    )


In [None]:
IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)

IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)

In [None]:
#| export

# Broker address comes from the environment, with local defaults.
# Environment values are strings, so kafka_server_port is a str when
# KAFKA_PORT is set and the int 9092 otherwise.
kafka_server_url = environ.get("KAFKA_HOSTNAME", "localhost")
kafka_server_port = environ.get("KAFKA_PORT", 9092)

# Broker descriptions handed to FastKafka, keyed by environment name.
kafka_brokers = {
    "localhost": dict(
        url=kafka_server_url,
        description="local development kafka broker",
        port=kafka_server_port,
    ),
    "production": dict(
        url="kafka.airt.ai",
        description="production kafka broker",
        port=9092,
        protocol="kafka-secure",
        security=dict(type="plain"),
    ),
}

# The application connects to the environment-configured host and port.
kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    bootstrap_servers=f"{kafka_server_url}:{kafka_server_port}",
)


In [None]:
kafka_app

<fastkafka._application.app.FastKafka>

In [None]:
#| export

@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    # Consumer for the "input_data" topic: run the module-level model on the
    # message's four measurements and hand the first predicted class index
    # to to_predictions.
    features = [
        msg.sepal_length,
        msg.sepal_width,
        msg.petal_length,
        msg.petal_width,
    ]

    # predict receives a batch containing this single sample.
    predicted_classes = model.predict([features])

    to_predictions(predicted_classes[0])


@kafka_app.produces(topic="predictions")
def to_predictions(species_class: int) -> IrisPrediction:
    # Producer for the "predictions" topic: translate a class index into its
    # species name and wrap it in an IrisPrediction.
    # The list position is the class index.
    species_by_class = ["setosa", "versicolor", "virginica"]
    return IrisPrediction(species=species_by_class[species_class])


In [None]:
# NOTE(review): presumably applied so the Tester cell below can run asyncio
# code inside the notebook's already-running event loop; confirm against
# the nest_asyncio documentation.
nest_asyncio.apply()

In [None]:


# Sample measurement used to exercise the consumer/producer pair end to end.
msg = IrisInputData(
    sepal_length=0.1,
    sepal_width=0.2,
    petal_length=0.3,
    petal_width=0.4,
)

# Start Tester app and create local Kafka broker for testing
async with Tester(kafka_app) as tester:
    # Send IrisInputData message to input_data topic
    await tester.to_input_data(msg)

    # Assert that the kafka_app responded with IrisPrediction in predictions topic.
    # "setosa" is expected because the placeholder Model.predict always returns
    # [0, 1, 2]; the handler takes element 0, which to_predictions maps to "setosa".
    await tester.awaited_mocks.on_predictions.assert_awaited_with(
        IrisPrediction(species="setosa"), timeout=2
    )


[INFO] fastkafka._testing.local_broker: Java is already installed.
[INFO] fastkafka._testing.local_broker: But not exported to PATH, exporting...
[INFO] fastkafka._testing.local_broker: Kafka is already installed.
[INFO] fastkafka._testing.local_broker: But not exported to PATH, exporting...
[INFO] fastkafka._testing.local_broker: Starting zookeeper...
[INFO] fastkafka._testing.local_broker: zookeeper started, sleeping for 5 seconds...
[INFO] fastkafka._testing.local_broker: Starting kafka...
[INFO] fastkafka._testing.local_broker: kafka started, sleeping for 5 seconds...
[INFO] fastkafka._testing.local_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[INFO] fastkafka._components.aiokafka_producer_manager: AIOKafkaProducerManager.start(): Entering...
[INFO] fastkafka._components.aiokafka_producer_manager: _aiokafka_producer_manager(): Starting.