In [1]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

In [2]:
COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

In [6]:
susy_iterator = pd.read_csv('data/SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()


Unnamed: 0,class,lepton_1_pT,lepton_1_eta,lepton_1_phi,lepton_2_pT,lepton_2_eta,lepton_2_phi,missing_energy_magnitude,missing_energy_phi,MET_rel,axial_MET,M_R,M_TR_2,R,MT2,S_R,M_Delta_R,dPhi_r_b,cos(theta_r1)
0,0.0,0.972861,0.653855,1.176225,1.157156,-1.739873,-0.874309,0.567765,-0.175,0.810061,-0.252552,1.921887,0.889637,0.410772,1.145621,1.932632,0.994464,1.367815,0.040714
1,1.0,1.667973,0.064191,-1.225171,0.506102,-0.338939,1.672543,3.475464,-1.219136,0.012955,3.775174,1.045977,0.568051,0.481928,0.0,0.44841,0.205356,1.321893,0.377584
2,1.0,0.44484,-0.134298,-0.709972,0.451719,-1.613871,-0.768661,1.219918,0.504026,1.831248,-0.431385,0.526283,0.941514,1.587535,2.024308,0.603498,1.562374,1.135454,0.18091
3,1.0,0.381256,-0.976145,0.693152,0.448959,0.891753,-0.677328,2.03306,1.533041,3.04626,-1.005285,0.569386,1.015211,1.582217,1.551914,0.761215,1.715464,1.492257,0.090719
4,1.0,1.309996,-0.690089,-0.676259,1.589283,-0.693326,0.622907,1.087562,-0.381742,0.589204,1.365479,1.179295,0.968218,0.728563,0.0,1.083158,0.043429,1.154854,0.094859


In [18]:
susy_df.dtypes

class                       float64
lepton_1_pT                 float64
lepton_1_eta                float64
lepton_1_phi                float64
lepton_2_pT                 float64
lepton_2_eta                float64
lepton_2_phi                float64
missing_energy_magnitude    float64
missing_energy_phi          float64
MET_rel                     float64
axial_MET                   float64
M_R                         float64
M_TR_2                      float64
R                           float64
MT2                         float64
S_R                         float64
M_Delta_R                   float64
dPhi_r_b                    float64
cos(theta_r1)               float64
dtype: object

In [27]:
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train=list(y_train_df)

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test=list(y_test_df)


Number of training samples:  60000
Number of testing sample:  40000


In [None]:
""""
# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))""""

In [21]:
x_train[0]

'0.6721988320350647,0.34237948060035706,-0.7238044142723083,1.1103098392486572,-1.213679790496826,-0.26380720734596247,1.0992820262908936,-1.6995762586593628,1.6501561403274536,-0.32735818624496454,1.028388261795044,1.1920721530914307,1.0286331176757812,2.507549524307251,0.9912425875663757,1.8863195180892944,0.8003770112991333,0.0872780978679657\r'

In [23]:
type(y_train[0])

str

In [72]:
len(x_train_df.columns)

18

In [28]:
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)

(60000, 60000, 40000, 40000)

In [30]:
def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=str(key).encode('utf-8'), value=str(message).encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))

Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test


In [34]:
def decode_kafka_item(item):
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(item.key)
    print(type(message))
    print(type(key))
    return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)

<class 'list'>
<class 'tensorflow.python.framework.ops.Tensor'>


In [35]:
# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10

In [36]:
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_8 (Dense)             (None, 128)               2432      
                                                                 
 dropout_3 (Dropout)         (None, 128)               0         
                                                                 
 dense_9 (Dense)             (None, 256)               33024     
                                                                 
 dropout_4 (Dropout)         (None, 256)               0         
                                                                 
 dense_10 (Dense)            (None, 128)               32896     
                                                                 
 dropout_5 (Dropout)         (None, 128)               0         
                                                                 
 dense_11 (Dense)            (None, 1)                

In [52]:
y_train=list(map(float,y_train))

In [59]:
x_train_x=[]
for x in x_train:
    x=x[:-1]
    x_arr=x.split(',')
    x_train_x.append(list(map(float,x_arr)))

In [64]:
x_test_x=[]
for x in x_test:
    x=x[:-1]
    x_arr=x.split(',')
    x_test_x.append(list(map(float,x_arr)))

In [60]:
# Add labels to the training data
# Preprocess the packet_data to convert string to meaningful value and use here
train_ds = tf.data.Dataset.from_tensor_slices((x_train_x, y_train))
# Set the batch size
train_ds = train_ds.shuffle(5000).batch(32)
print(train_ds)


<BatchDataset element_spec=(TensorSpec(shape=(None, 18), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>


In [70]:
test_ds = tf.data.Dataset.from_tensor_slices((x_test_x, y_test))
# Set the batch size
test_ds = test_ds.shuffle(5000).batch(32)
print(test_ds)

<BatchDataset element_spec=(TensorSpec(shape=(None, 18), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>


In [61]:
##### PROGRAM WILL RUN SUCCESSFULLY TILL HERE. TO USE IN THE MODEL DO THE PREPROCESSING OF PACKET DATA AS EXPLAINED ### 

# Have defined some simple model
model = tf.keras.Sequential([
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(100),
  tf.keras.layers.Dense(10)
])

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), 
              metrics=['accuracy'])

model.fit(train_ds, epochs=2)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x1e41da201f0>

In [69]:
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    print(type(message))
    print(type(key))
    return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)

<class 'list'>
<class 'tensorflow.python.framework.ops.Tensor'>


In [71]:
res = model.evaluate(test_ds)
print("test loss, test acc:", res)

test loss, test acc: [0.4637937843799591, 0.785099983215332]
