In [1]:
! pip install confluent_kafka

Collecting confluent_kafka
  Downloading confluent_kafka-1.9.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[K     |████████████████████████████████| 3.1 MB 12.1 MB/s eta 0:00:01
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-1.9.2


In [2]:
import json
import os
import pprint

import confluent_kafka as ck
import pandas as pd
import psycopg2
import requests

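## Kafka settings
- consumer
    - receives change events for rows of the `tbl_car_price` table (source topic).
    - retrieves one message at a time.
- producer
    - sends the message, enriched with the predicted price, to the `car_data_predicted` topic (sink topic), whose contents end up in the `car_data_predicted` table.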

In [3]:
"""
Build the Kafka clients used in this notebook:
- a consumer that reads change events from the source topic
- a producer that writes the enriched events to the sink topic
Both connect to the same broker.
"""

KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"

consumer_config = {
    "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    # NOTE(review): "teste" looks like a placeholder group id - confirm it is intended.
    "group.id": "teste",
    # With no committed offset for this group, start from the oldest available event.
    "auto.offset.reset": "earliest",
}
# NOTE: if more than max.poll.interval.ms (300000 ms by default) passes between polls,
# which is easy in an interactive session, the consumer leaves its group
# (see the MAXPOLL warning logged at the end of this notebook).
consumer = ck.Consumer(consumer_config)

producer = ck.Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})

In [4]:
# Optional debugging aid: uncomment to list the topics known to the broker,
# e.g. to confirm the source topic name passed to consumer.subscribe() below.
# topics = consumer.list_topics()
# pprint.pprint(topics.topics)

In [5]:
"""
Subscribe to the change-event topic of the car price table.
- changes to `tbl_car_price` arrive as messages on this topic
- messages are fetched with `consumer.poll(timeout)`; this notebook waits 1 second per poll
"""

SOURCE_TOPIC = "car_database.public.tbl_car_price"
consumer.subscribe([SOURCE_TOPIC])

Insert a new record into the car database (for example with the SQL below) so that the consumer receives a change event for it.

```sql
INSERT INTO
    tbl_car_price
(
    MODEL,
    YEAR,
    TRANSMISSION,
    MILEAGE,
    FUEL_TYPE,
    TAX,
    MPG,
    ENGINE_SIZE
)
VALUES
(
    'B-MAX',
    2022,
    'Automatic',
    1230,
    'Diesel',
    165,
    45.2,
    1.2
);
```

In [10]:
"""
Fetch the next change event, waiting at most POLL_TIMEOUT_S seconds.
`poll` returns None when nothing arrives in time and can also return an error
event, so both cases are rejected here before the value is decoded in the next cell.
"""

POLL_TIMEOUT_S = 1.0

msg = consumer.poll(POLL_TIMEOUT_S)
if msg is None:
    raise RuntimeError(
        f"No message received within {POLL_TIMEOUT_S} s - insert a row into "
        "tbl_car_price and re-run this cell."
    )
if msg.error():
    raise ck.KafkaException(msg.error())

In [11]:
# Decode the message value as UTF-8 JSON; the output shows its 'schema' and 'payload' parts.
raw_value = msg.value().decode("utf-8")
message_dict = json.loads(raw_value)
pprint.pprint(message_dict)

{'payload': {'engine_size': 1.2,
             'fuel_type': 'Diesel',
             'id': 22,
             'mileage': 1230,
             'model': 'B-MAX',
             'mpg': 45.2,
             'price': None,
             'suggested_price': None,
             'tax': 165,
             'transmission': 'Automatic',
             'year': 2022},
 'schema': {'fields': [{'default': 0,
                        'field': 'id',
                        'optional': False,
                        'type': 'int32'},
                       {'field': 'model', 'optional': True, 'type': 'string'},
                       {'field': 'year', 'optional': True, 'type': 'int32'},
                       {'field': 'price', 'optional': True, 'type': 'int32'},
                       {'field': 'transmission',
                        'optional': True,
                        'type': 'string'},
                       {'field': 'mileage', 'optional': True, 'type': 'int32'},
                       {'field': 'fuel_type',
    

## BentoML API request
Before posting data to the API server, don't forget to start the prediction service with `bentoml serve` inside the docker container:
```bash
$ bentoml serve price_prediction_service --host 0.0.0.0 --port 3000 --production
```

In [12]:
"""
Map tbl_car_price column names (keys) to the feature names the ML model expects (values).
Most names are identical; only the snake_case columns are renamed to camelCase.
"""

FIELDS = dict(
    model="model",
    year="year",
    transmission="transmission",
    mileage="mileage",
    fuel_type="fuelType",
    tax="tax",
    mpg="mpg",
    engine_size="engineSize",
)

In [13]:
"""
Build the model input: for each column in FIELDS, a single-element list holding
the payload value, keyed by the feature name the model expects.
"""

payload_row = message_dict["payload"]
input_dict = {}
for db_col, api_col in FIELDS.items():
    input_dict[api_col] = [payload_row[db_col]]
pprint.pprint(input_dict)

{'engineSize': [1.2],
 'fuelType': ['Diesel'],
 'mileage': [1230],
 'model': ['B-MAX'],
 'mpg': [45.2],
 'tax': [165],
 'transmission': ['Automatic'],
 'year': [2022]}


In [14]:
"""
Request the BentoML deployed server to predict the car price.
"""

endpoint = "http://0.0.0.0:3000/predict"
# Without a timeout, requests waits indefinitely if the server never answers.
response = requests.post(endpoint, json=input_dict, timeout=10)
# Fail loudly on 4xx/5xx instead of parsing an error body as if it were a prediction.
response.raise_for_status()

print(f"status: {response.status_code}")
print(f"output: {response.json()}")

status: 200
output: [14935.037109375]


In [15]:
"""
Store the first predicted price as `suggested_price` in the consumed message's payload.
"""

predicted_prices = json.loads(response.text)
event_payload = message_dict["payload"]
event_payload["suggested_price"] = float(predicted_prices[0])
pprint.pprint(event_payload)

{'engine_size': 1.2,
 'fuel_type': 'Diesel',
 'id': 22,
 'mileage': 1230,
 'model': 'B-MAX',
 'mpg': 45.2,
 'price': None,
 'suggested_price': 14935.037109375,
 'tax': 165,
 'transmission': 'Automatic',
 'year': 2022}


## Postgres DB status

In [16]:
"""
Open a Postgres connection to check the current DB status.
Settings are read from environment variables so credentials need not be hard-coded;
the fallbacks are the values this notebook previously used.
"""

conn = psycopg2.connect(
    host=os.environ.get("POSTGRES_HOST", "postgres"),
    port=os.environ.get("POSTGRES_PORT", "5432"),
    user=os.environ.get("POSTGRES_USER", "postgres"),
    # NOTE(review): prefer setting POSTGRES_PASSWORD outside the notebook over the fallback.
    password=os.environ.get("POSTGRES_PASSWORD", "postgres"),
    database=os.environ.get("POSTGRES_DB", "database"),
)

In [17]:
# Current contents of the source table, displayed as a DataFrame.
car_price_sql = "select * from public.tbl_car_price"
pd.read_sql_query(car_price_sql, conn)



Unnamed: 0,id,model,year,price,transmission,mileage,fuel_type,tax,mpg,engine_size,suggested_price
0,1,Fiesta,2017,,Automatic,13000,Petrol,140,43.2,1.3,
1,2,B-MAX,2020,,Manual,4000,Diesel,100,57.4,1.5,
2,3,C-MAX,2016,,Manual,5060,Diesel,165,45.2,1.2,
3,4,C-MAX,2016,,Manual,5060,Diesel,165,45.2,1.2,
4,5,C-MAX,2017,,Manual,5060,Diesel,165,45.2,1.2,
5,7,C-MAX,2016,,Manual,5060,Diesel,165,45.2,1.2,
6,8,C-MAX,2016,,Manual,5060,Diesel,165,45.2,1.2,
7,9,C-MAX,2016,,Manual,5060,Diesel,165,45.2,1.3,
8,11,C-MAX,2022,,Manual,3000,Petrol,12000,50.3,1.3,
9,12,C-MAX,2022,,Manual,3000,Petrol,12000,50.3,1.3,


## Save prediction results to the `car_data_predicted` topic

In [18]:
"""
Send the message, now carrying the predicted price, to the `car_data_predicted` topic.
"""

# Kafka message keys are optional; decode only when the consumed message has one.
msg_key = msg.key()
producer.produce(
    "car_data_predicted",
    key=msg_key.decode("utf-8") if msg_key is not None else None,
    value=json.dumps(message_dict),
)
# flush() returns how many messages are still queued when the timeout expires;
# a non-zero count means the prediction was not delivered.
undelivered = producer.flush(1)
if undelivered:
    raise RuntimeError(
        f"{undelivered} message(s) not delivered to car_data_predicted within 1 s"
    )
undelivered

0

In [19]:
"""
Query the `car_data_predicted` table to see the rows written from the
"car_data_predicted" topic, with `suggested_price` filled in.
"""

predicted_sql = "select * from public.car_data_predicted"
pd.read_sql_query(predicted_sql, conn)



Unnamed: 0,transmission,year,mpg,price,suggested_price,model,tax,engine_size,id,fuel_type,mileage
0,Automatic,2017,43.2,,12992,Fiesta,140,1.3,1,Petrol,13000
1,Manual,2020,57.4,,17379,B-MAX,100,1.5,2,Diesel,4000
2,Manual,2016,45.2,,10369,C-MAX,165,1.2,3,Diesel,5060
3,Manual,2016,45.2,,10369,C-MAX,165,1.2,4,Diesel,5060
4,Manual,2017,45.2,,10200,C-MAX,165,1.2,5,Diesel,5060
5,Manual,2016,45.2,,10369,C-MAX,165,1.2,7,Diesel,5060
6,Manual,2016,45.2,,10369,C-MAX,165,1.2,8,Diesel,5060
7,Manual,2016,45.2,,10276,C-MAX,165,1.3,9,Diesel,5060
8,Manual,2022,50.3,,14614,C-MAX,12000,1.3,11,Petrol,3000
9,Manual,2022,50.3,,14614,C-MAX,12000,1.3,12,Petrol,3000


%4|1671110787.196|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 9ms (adjust max.poll.interval.ms for long-running message processing): leaving group
