In [4]:
!pip install tensorflow==2.11.0
!pip install tensorflow-io==0.30.0
!pip install protobuf==3.19.1

Defaulting to user installation because normal site-packages is not writeable
Collecting tensorflow==2.11.0
  Downloading tensorflow-2.11.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (588.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m588.3/588.3 MB[0m [31m491.1 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
Collecting keras<2.12,>=2.11.0
  Downloading keras-2.11.0-py2.py3-none-any.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting tensorboard<2.12,>=2.11
  Downloading tensorboard-2.11.2-py3-none-any.whl (6.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.0/6.0 MB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting gast<=0.4.0,>=0.2.1
  Downloading gast-0.4.0-py3-none-any.whl (9.8 kB)
Collecting protobuf<3.20,>=3.9.2
  Downloading protobuf-3.19.6-cp310-cp310-manylinux_2_17_x86_64.manyli

In [12]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

In [13]:
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

tensorflow-io version: 0.25.0
tensorflow version: 2.11.0


In [None]:
!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

In [3]:
COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

In [14]:
susy_iterator = pd.read_csv('./SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()

Unnamed: 0,class,lepton_1_pT,lepton_1_eta,lepton_1_phi,lepton_2_pT,lepton_2_eta,lepton_2_phi,missing_energy_magnitude,missing_energy_phi,MET_rel,axial_MET,M_R,M_TR_2,R,MT2,S_R,M_Delta_R,dPhi_r_b,cos(theta_r1)
0,0.0,0.972861,0.653855,1.176225,1.157156,-1.739873,-0.874309,0.567765,-0.175,0.810061,-0.252552,1.921887,0.889637,0.410772,1.145621,1.932632,0.994464,1.367815,0.040714
1,1.0,1.667973,0.064191,-1.225171,0.506102,-0.338939,1.672543,3.475464,-1.219136,0.012955,3.775174,1.045977,0.568051,0.481928,0.0,0.44841,0.205356,1.321893,0.377584
2,1.0,0.44484,-0.134298,-0.709972,0.451719,-1.613871,-0.768661,1.219918,0.504026,1.831248,-0.431385,0.526283,0.941514,1.587535,2.024308,0.603498,1.562374,1.135454,0.18091
3,1.0,0.381256,-0.976145,0.693152,0.448959,0.891753,-0.677328,2.03306,1.533041,3.04626,-1.005285,0.569386,1.015211,1.582217,1.551914,0.761215,1.715464,1.492257,0.090719
4,1.0,1.309996,-0.690089,-0.676259,1.589283,-0.693326,0.622907,1.087562,-0.381742,0.589204,1.365479,1.179295,0.968218,0.728563,0.0,1.083158,0.043429,1.154854,0.094859


In [15]:
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)

(100000, 19)

In [17]:
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

(54025, 45975)

In [18]:
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))


Number of training samples:  60000
Number of testing sample:  40000


In [19]:
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)

(60000, 60000, 40000, 40000)

In [20]:
def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("turtorial11-susy-train", zip(x_train, y_train))
write_to_kafka("turtorial11-susy-test", zip(x_test, y_test))


Wrote 60000 messages into topic: turtorial11-susy-train
Wrote 40000 messages into topic: turtorial11-susy-test


In [21]:
def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('turtorial11-susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)

2023-12-18 01:33:16.013825: W tensorflow_io/core/kernels/audio_video_mp3_kernels.cc:271] libmp3lame.so.0 or lame functions are not available
2023-12-18 01:33:16.014141: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2023-12-18 01:33:16.148227: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-12-18 01:33:16.148319: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
2023-12-18 01:33:16.148344: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (NamThien): /proc/driver/nvidia/version does not exist
2023-12-18 01:33:16.149023: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary i

In [22]:
# Set the parameters
OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10


In [23]:
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 1

In [24]:
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

In [25]:
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

In [26]:
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["turtorial11-susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)

Instructions for updating:
Use `tf.data.Dataset.counter(...)` instead.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
Instructions for updating:
Use `tf.data.Dataset.take_while(...)


2023-12-18 01:33:47.579871: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: session.timeout.ms=7000
2023-12-18 01:33:47.579974: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: max.poll.interval.ms=8000
2023-12-18 01:33:47.579996: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: auto.offset.reset=earliest
2023-12-18 01:33:47.580004: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: group.id=testcg
2023-12-18 01:33:47.580009: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: bootstrap.servers=127.0.0.1:9092
2023-12-18 01:33:47.580028: I tensorflow_io/core/kernels/kafka_kernels.cc:919] max num of messages per batch: 10000
2023-12-18 01:33:47.580037: I tensorflow_io/core/kernels/kafka_kernels.cc:938] Creating the kafka consumer
2023-12-18 01:33:47.614140: I tensorflow_io/core/kernels/kafka_kernels.cc:945] Subscribing to the kafka topic: turtorial11-susy-test


In [27]:
res = model.evaluate(test_ds)
print("test loss, test acc:", res)


  output, from_logits = _get_logits(
2023-12-18 01:33:52.473042: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2023-12-18 01:33:52.487363: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
2023-12-18 01:33:52.487428: I tensorflow_io/core/kernels/kafka_kernels.cc:787] REBALANCE: turtorial11-susy-test[0], OFFSET: -1001 ERROR_CODE: 0
2023-12-18 01:33:52.487437: I tensorflow_io/core/kernels/kafka_kernels.cc:787] REBALANCE: turtorial11-susy-test[1], OFFSET: -1001 ERROR_CODE: 0
2023-12-18 01:33:52.487441: I tensorflow_io/core/kernels/kafka_kernels.cc:802] REBALANCE: Assigning partitions


    666/Unknown - 3s 3ms/step - loss: 0.7211 - accuracy: 0.5473

2023-12-18 01:33:54.628805: I tensorflow_io/core/kernels/kafka_kernels.cc:996] EOF reached for all 2 partition(s)


test loss, test acc: [0.7235473394393921, 0.5393916964530945]


2023-12-18 01:34:05.230525: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out


In [None]:
# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg

In [28]:
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["turtorial11-susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

2023-12-18 01:34:59.728420: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: session.timeout.ms=7000
2023-12-18 01:34:59.728481: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: max.poll.interval.ms=8000
2023-12-18 01:34:59.728499: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: auto.offset.reset=earliest
2023-12-18 01:34:59.728507: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: group.id=cgonline
2023-12-18 01:34:59.728513: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: bootstrap.servers=127.0.0.1:9092
2023-12-18 01:34:59.728532: I tensorflow_io/core/kernels/kafka_kernels.cc:919] max num of messages per batch: 10000
2023-12-18 01:34:59.728539: I tensorflow_io/core/kernels/kafka_kernels.cc:938] Creating the kafka consumer
2023-12-18 01:34:59.729084: I tensorflow_io/core/kernels/kafka_kernels.cc:945] Subscribing to the kafka topic: turtorial11-susy-train


In [29]:
def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)
  
for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)

2023-12-18 01:35:04.581081: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2023-12-18 01:35:04.582564: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
2023-12-18 01:35:04.582623: I tensorflow_io/core/kernels/kafka_kernels.cc:787] REBALANCE: turtorial11-susy-train[0], OFFSET: -1001 ERROR_CODE: 0
2023-12-18 01:35:04.582631: I tensorflow_io/core/kernels/kafka_kernels.cc:802] REBALANCE: Assigning partitions


Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
 30/313 [=>............................] - ETA: 1s - loss: 0.4748 - accuracy: 0.7771

2023-12-18 01:36:08.210725: I tensorflow_io/core/kernels/kafka_kernels.cc:996] EOF reached for all 1 partition(s)


Epoch 2/3
Epoch 3/3


2023-12-18 01:36:21.890249: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
