In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense
from kafka import KafkaConsumer
import plotly.express as px
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio


## Food Delivery Time Prediction Model

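Now let’s build a machine learning model, an LSTM neural network, for the task of food delivery time prediction. The next cell defines and compiles the network; it is trained afterwards on batches of records streamed from Kafka: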

In [None]:
# Stacked-LSTM regressor: two recurrent layers followed by two dense layers
# that narrow down to a single output value.
model = Sequential(
    [
        # Input is declared as 3 timesteps with 1 feature per step; the first
        # LSTM emits its full output sequence so the next LSTM can consume it.
        LSTM(128, return_sequences=True, input_shape=(3, 1)),
        # The second LSTM keeps only its final output.
        LSTM(64, return_sequences=False),
        Dense(25),
        Dense(1),
    ]
)

# Mean squared error loss with the Adam optimizer. run_eagerly=True asks Keras
# to run the training step eagerly instead of compiling it into a graph.
model.compile(loss='mean_squared_error', optimizer='adam', run_eagerly=True)
model.summary()

In [None]:
# Streaming source for online training: a Kafka consumer-group dataset read
# through tensorflow-io. Iterating over it yields one dataset per fetched
# chunk of records (see the training loop that consumes it).
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    # Where to connect and what to read.
    servers="redpanda-0:9092,redpanda-1:9092,redpanda-2:9092",
    group_id="testzo",
    topics=["model-data"],
    # Extra consumer settings, passed through as "key=value" strings.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
    # NOTE(review): presumably milliseconds to wait for new messages before
    # the stream ends — confirm against the tensorflow-io documentation.
    stream_timeout=10000,
)


def decode_kafka_online_item(raw_message, raw_key):
    """Decode one Kafka record into a ``(message, key)`` pair of numbers.

    Args:
        raw_message: String tensor holding CSV text with three float columns.
        raw_key: String tensor holding a number written as text.

    Returns:
        A ``(message, key)`` tuple. ``message`` is the list of three float
        column tensors returned by ``tf.io.decode_csv``; ``key`` is the
        number parsed from ``raw_key``.
    """
    # Deliberately no print() here: Dataset.map traces this function into a
    # graph, so a Python print would run only while tracing and would show
    # symbolic tensors rather than the decoded values.
    message = tf.io.decode_csv(raw_message, [[0.0] for _ in range(3)])
    key = tf.strings.to_number(raw_key)
    return (message, key)
  
# Minimum number of records a fetched chunk must contain to be trained on;
# the same value is used as the shuffle buffer size and the mini-batch size.
batch_size = 20

for single_ds in online_train_ds:
    # Guard clause: a chunk with fewer than batch_size records is skipped.
    if len(single_ds) < batch_size:
        print("Not enough data in the dataset. Skipping model fitting.")
        continue

    # Shuffle the raw records, decode each one, then group into mini-batches.
    single_ds = (
        single_ds
        .shuffle(buffer_size=batch_size)
        .map(decode_kafka_online_item)
        .batch(batch_size)
    )

    # One training pass over this chunk, then save the updated model
    # (the same path is written again on every trained chunk).
    model.fit(single_ds, epochs=1)
    tf.keras.models.save_model(model, "./time_prediction_model")


