# Machine Learning on streaming data using Kafka

In [None]:
!pip install tensorflow-io==0.17.0
!pip install tensorflow==2.4.0
!pip install kafka-python

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-io==0.17.0
  Downloading tensorflow_io-0.17.0-cp37-cp37m-manylinux2010_x86_64.whl (25.3 MB)
[K     |████████████████████████████████| 25.3 MB 1.5 MB/s 
[?25hCollecting tensorflow<2.5.0,>=2.4.0
  Downloading tensorflow-2.4.4-cp37-cp37m-manylinux2010_x86_64.whl (394.5 MB)
[K     |████████████████████████████████| 394.5 MB 40 kB/s 
Collecting wrapt~=1.12.1
  Downloading wrapt-1.12.1.tar.gz (27 kB)
Collecting absl-py~=0.10
  Downloading absl_py-0.15.0-py3-none-any.whl (132 kB)
[K     |████████████████████████████████| 132 kB 61.7 MB/s 
[?25hCollecting h5py~=2.10.0
  Downloading h5py-2.10.0-cp37-cp37m-manylinux1_x86_64.whl (2.9 MB)
[K     |████████████████████████████████| 2.9 MB 52.0 MB/s 
[?25hCollecting typing-extensions~=3.7.4
  Downloading typing_extensions-3.7.4.3-py3-none-any.whl (22 kB)
Collecting termcolor~=1.1.0
  Downloading termcolor-1.1.0.tar.gz (3.9 k

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow==2.4.0
  Downloading tensorflow-2.4.0-cp37-cp37m-manylinux2010_x86_64.whl (394.7 MB)
[K     |████████████████████████████████| 394.7 MB 18 kB/s 
Installing collected packages: tensorflow
  Attempting uninstall: tensorflow
    Found existing installation: tensorflow 2.4.4
    Uninstalling tensorflow-2.4.4:
      Successfully uninstalled tensorflow-2.4.4
Successfully installed tensorflow-2.4.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[K     |████████████████████████████████| 246 kB 27.0 MB/s 
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [None]:
!tar -xzf kafka_2.13-2.7.2.tgz

In [None]:
!./kafka_2.13-2.7.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.2/config/zookeeper.properties
!./kafka_2.13-2.7.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.2/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running


In [None]:
!./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic newdata-train
!./kafka_2.13-2.7.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic newdata-test


Error while executing topic command : Topic 'newdata-train' already exists.
[2022-10-24 09:55:40,673] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'newdata-train' already exists.
 (kafka.admin.TopicCommand$)
Error while executing topic command : Topic 'newdata-test' already exists.
[2022-10-24 09:55:43,795] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'newdata-test' already exists.
 (kafka.admin.TopicCommand$)


In [None]:
!./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic newdata-train
!./kafka_2.13-2.7.2/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic newdata-test


Topic: newdata-train	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: newdata-train	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: newdata-test	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: newdata-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: newdata-test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0


In [None]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio


In [None]:
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

tensorflow-io version: 0.17.0
tensorflow version: 2.4.0


In [175]:
COLUMNS = [
           'sex',
           'age',
           
           'type',
           
           'induration_diameter',
           'treatment'

           ]

In [176]:
newdata_iterator = pd.read_csv('ml.csv', header=None, names=COLUMNS, chunksize=100000)
newdata_df = next(newdata_iterator)
newdata_df.head()


Unnamed: 0,sex,age,type,induration_diameter,treatment
0,1,34,34,34,1
1,1,32,4,32,1
2,1,12,2,12,1
3,2,11,66,11,0
4,2,12,3,12,0


In [177]:
# Number of datapoints and columns
len(newdata_df), len(newdata_df.columns)

(96, 5)

In [178]:
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(newdata_df[newdata_df["treatment"]==0]), len(newdata_df[newdata_df["treatment"]==1])

(48, 48)

In [179]:
# Split the dataset

train_df, test_df = train_test_split(newdata_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["treatment"], axis=1)
y_train_df = train_df["treatment"]

x_test_df = test_df.drop(["treatment"], axis=1)
y_test_df = test_df["treatment"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
print(x_train)
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))

Number of training samples:  57
Number of testing sample:  39
['1,34,34,34', '1,12,2,12', '2,13,2,13', '2,11,66,11', '1,32,4,32', '2,12,3,12', '2,12,3,12', '1,32,4,32', '2,11,66,11', '2,11,66,11', '2,11,66,11', '1,32,4,32', '2,11,66,11', '1,12,2,12', '1,34,34,34', '1,32,4,32', '2,12,3,12', '1,12,2,12', '1,12,2,12', '2,11,66,11', '2,11,66,11', '1,32,4,32', '2,13,2,13', '1,12,2,12', '2,12,3,12', '2,12,3,12', '1,12,2,12', '2,11,66,11', '1,12,2,12', '2,12,3,12', '2,13,2,13', '1,12,2,12', '1,12,2,12', '1,34,34,34', '2,12,3,12', '2,11,66,11', '2,13,2,13', '2,13,2,13', '2,12,3,12', '1,34,34,34', '1,34,34,34', '1,34,34,34', '1,32,4,32', '1,32,4,32', '2,11,66,11', '1,32,4,32', '2,11,66,11', '1,32,4,32', '2,11,66,11', '1,32,4,32', '2,13,2,13', '1,34,34,34', '2,11,66,11', '2,12,3,12', '2,13,2,13', '1,34,34,34', '2,11,66,11']


In [180]:
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)

(57, 57, 39, 39)

In [181]:
# Store the train and test data in kafka

def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("newdata-train", zip(x_train, y_train))
write_to_kafka("newdata-test", zip(x_test, y_test))

Wrote 57 messages into topic: newdata-train
Wrote 39 messages into topic: newdata-test


In [192]:
def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('newdata-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)

In [188]:
# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10

In [189]:
# design/build the model
print(NUM_COLUMNS)
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())

4
Model: "sequential_8"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_32 (Dense)             (None, 128)               640       
_________________________________________________________________
dropout_24 (Dropout)         (None, 128)               0         
_________________________________________________________________
dense_33 (Dense)             (None, 256)               33024     
_________________________________________________________________
dropout_25 (Dropout)         (None, 256)               0         
_________________________________________________________________
dense_34 (Dense)             (None, 128)               32896     
_________________________________________________________________
dropout_26 (Dropout)         (None, 128)               0         
_________________________________________________________________
dense_35 (Dense)             (None, 1)              

In [190]:
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

In [191]:
print(train_ds)

# fit the model
model.fit(train_ds, epochs=EPOCHS)

<BatchDataset shapes: ((None, 4), (None,)), types: (tf.float32, tf.float32)>
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fa4af755690>

In [None]:
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["newdata-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)

In [None]:
res = model.evaluate(test_ds)
print("test loss, test acc:", res)

test loss, test acc: [1.6361463069915771, 0.8055555820465088]


In [None]:
!./kafka_2.13-2.7.2/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg


GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
testcg          newdata-test    0          8               8               0               rdkafka-7af8aa06-5e81-4728-b26a-232e959fb8b8 /172.28.0.2     rdkafka
testcg          newdata-test    1          31              31              0               rdkafka-7af8aa06-5e81-4728-b26a-232e959fb8b8 /172.28.0.2     rdkafka


In [None]:
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["newdata-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

In [None]:
def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


## Task 1: Execute the above code properly with the given dataset. 

## Task 2: Make a report about,
### -> detailed analysis of the code
### -> How did you execute the task using Kafka, and why is Kafka important in this machine learning model?

## Task 3: Feed a new dataset into Kafka. Utilizing the dataset, train and test your choice of machine learning model and solve any issues that may arise in the code											