##### https://www.tensorflow.org/io/tutorials/kafka

In [1]:
import pandas as pd
import numpy as np
from kafka import KafkaProducer
import tensorflow as tf
import tensorflow_io as tfio

from getConfig import getConfig
from splitData import splitData

2022-11-27 21:38:34.003101: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-11-27 21:38:34.237871: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-11-27 21:38:34.237896: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-11-27 21:38:35.069221: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-

In [2]:
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

tensorflow-io version: 0.28.0
tensorflow version: 2.11.0


In [3]:
NUM_COLUMNS = 18     # this is without the label so training columns only

In [4]:
def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'),
                      value=message.encode('utf-8')).add_errback(error_callback)
        count+=1
    producer.flush()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))

In [5]:
def decode_kafka_item(item):
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(item.key)
    return (message, key)

In [6]:
def decode_kafka_test_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)

In [14]:
cfg = getConfig()

In [15]:
COLUMNS = ['class', 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi',
           'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude',
           'missing_energy_phi', 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R',
           'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)']

In [16]:
susy_iterator = pd.read_csv(cfg['dataLoc']+cfg['fileName'],
                            header=None,
                            names=COLUMNS,
                            chunksize=1000)

df = next(susy_iterator)

In [17]:
print("{:,} rows and {} columns".format(len(df), df.shape[1]))

# See if the classes are balanced. class=0 is "noise"; class=1 is "signal"
noise = len(df[df['class']==0])
signal = len(df[df['class']==1])
print("\nsignal to noise ratio: {:.2f}".format(signal/noise))

1,000 rows and 19 columns

signal to noise ratio: 0.81


In [18]:
d = splitData(df, cfg)

trainX = d['train'].drop(["class"], axis=1)
trainY = d['train']["class"]

testX = d['test'].drop(["class"], axis=1)
testY = d['test']["class"]

In [19]:
# Convert the dataframe to a comma-separated string, skipping the column headers
x_train = list(filter(None, trainX.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, trainY.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, testX.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, testY.to_csv(index=False).split("\n")[1:]))

assert(len(x_train)==len(y_train)), 'invalid lengths'
assert(len(x_test)==len(y_test)), 'invalid lengths'

In [20]:
# The labels are set as the kafka message keys so as to store data in multiple-partitions
# Thus, enabling efficient data retrieval using the consumer groups.
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))

Wrote 750 messages into topic: susy-train
Wrote 250 messages into topic: susy-test


In [21]:
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=cfg['ShuffleBufferSize'])
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(cfg['batchSize'])

2022-11-27 21:59:22.255312: W tensorflow_io/core/kernels/audio_video_mp3_kernels.cc:271] libmp3lame.so.0 or lame functions are not available
2022-11-27 21:59:22.260318: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2022-11-27 21:59:22.506235: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-11-27 21:59:22.507214: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
2022-11-27 21:59:22.507250: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (basic): /proc/driver/nvidia/version does not exist
2022-11-27 21:59:22.511332: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is o

In [22]:
train_ds.

tensorflow.python.data.ops.dataset_ops.BatchDataset

##### Build and train the model

In [13]:
OPTIMIZER = "adam"
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS = ['accuracy']
EPOCHS = 3

In [14]:
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 128)               2432      
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 256)               33024     
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896     
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 1

In [15]:
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
model.fit(train_ds, epochs=EPOCHS)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7f2fc4d8e3d0>

In [17]:
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)
test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(cfg['batchSize'])

In [21]:
res = model.evaluate(test_ds)
print("test loss, test acc:", res)

test loss, test acc: [0.4438146650791168, 0.7937029600143433]
