##### Copyright 2020 The TensorFlow IO Authors.

In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Resilient inference on streaming data using Kafka and TensorFlow-IO

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/io/tutorials/kafka"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/io/blob/master/docs/tutorials/kafka.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/io/blob/master/docs/tutorials/kafka.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
      <td>
    <a href="https://storage.googleapis.com/tensorflow_docs/io/docs/tutorials/kafka.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a>
  </td>
</table>

Caution: In addition to Python packages, this notebook uses `sudo apt-get install` to install third-party packages.

## Overview

This tutorial focuses on streaming data from a [Kafka](https://docs.confluent.io/current/getting-started.html) cluster into a `tf.data.Dataset`, which is then used with `tf.keras` for training and inference.

Kafka is a distributed event-streaming platform that provides scalable, fault-tolerant data streaming across data pipelines. Many large enterprises rely on it where mission-critical data delivery is a primary requirement.

**NOTE:** A basic understanding of the [Kafka components](https://docs.confluent.io/current/kafka/introduction.html) will make the tutorial easier to follow.

## Setup and usage

### Install the required tensorflow-io and kafka packages

In [2]:
import os

In [3]:
try:
  %tensorflow_version 2.x
except Exception:
  pass

In [4]:
!pip install tensorflow-io



In [5]:
!pip install kafka-python



In [26]:
from datetime import datetime
import pandas as pd
import kafka
import tensorflow as tf
import tensorflow_io as tfio

In [7]:
tfio.__version__

'0.15.0'

### Download and set up Kafka and Zookeeper instances

For demo purposes, the following instances are set up locally:

- Kafka (Brokers: 127.0.0.1:9092)
- Zookeeper (Node: 127.0.0.1:2181)



In [8]:
!curl -sSOL http://packages.confluent.io/archive/5.4/confluent-community-5.4.1-2.12.tar.gz
!tar -xzf confluent-community-5.4.1-2.12.tar.gz

We use the default configurations provided by the Confluent package to spin up these instances.

In [10]:

!cd confluent-5.4.1 && bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
!cd confluent-5.4.1 && bin/kafka-server-start -daemon etc/kafka/server.properties
!cd confluent-5.4.1 && bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
!echo "Waiting for 10 secs until kafka, zookeeper and schema registry services are up and running"
!sleep 10


Waiting for 10 secs until kafka, zookeeper and schema registry services are up and running
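
A fixed 10-second sleep may be too short on a slow machine and longer than needed on a fast one. As an alternative, the sketch below polls the TCP ports listed above until they accept a connection. The helper name `wait_for_port` and its default timeout values are assumptions made for illustration; they are not part of the Confluent package or of this tutorial. An open port only shows that a process is listening, not that the broker has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to (host, port) succeeds.

    Returns False if no connection succeeds before `timeout` seconds elapse.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means some process is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: wait briefly and retry.
            time.sleep(interval)
    return False
```

For the local setup in this notebook, calling `wait_for_port("127.0.0.1", 2181)` and `wait_for_port("127.0.0.1", 9092)` would check the Zookeeper and Kafka ports respectively.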


Once the instances are started as daemon processes, we can grep for `kafka` in the process list. The three Java processes correspond to the Zookeeper, Kafka, and schema-registry instances.

In [11]:
!ps -ef | grep kafka

root         548       1  0 19:18 ?        00:00:10 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xlog:gc*:file=/content/confluent-5.4.1/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/confluent-5.4.1/bin/../logs -Dlog4j.configuration=file:bin/../etc/kafka/log4j.properties -cp /content/confluent-5.4.1/bin/../share/java/kafka/*:/content/confluent-5.4.1/bin/../support-metrics-client/build/dependant-libs-2.12.10/*:/content/confluent-5.4.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.zookeeper.server.quorum.QuorumPeerMain etc/kafka/zookeeper.properties
root         606       1  0 19:19 ?        00:00:37 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMi

### Dataset

**NOTE:** In real-world scenarios, since Kafka is an event-streaming platform, data from various sources can be written into Kafka. For instance:

- syslog records
- Web traffic logs
- Firewall logs
- Astronomical entity metrics
- IoT sensor data
- Product reviews
- Fashion images and many more.

#### Description

For the purpose of this tutorial, we will download the [SUSY](https://archive.ics.uci.edu/ml/datasets/SUSY#) dataset and feed it into Kafka manually to simulate a streaming environment.

The goal of this classification problem is to distinguish between a signal process which produces supersymmetric particles and a background process which does not. In our case, we simulate an environment where we classify these events in real time using a trained machine learning model.


In [12]:
!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
!gzip -d SUSY.csv.gz
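
Because the decompressed file may be too large to load comfortably in a constrained runtime, it can help to peek at a few rows first. The following is a minimal sketch using only the standard library. The helper name `read_first_rows` is hypothetical, and it assumes a headerless, purely numeric CSV, which matches how the file is read with `header=None` in the next cell.

```python
import csv
from itertools import islice


def read_first_rows(path, n=5):
    """Return the first n rows of a headerless numeric CSV as lists of floats."""
    with open(path, newline="") as f:
        # islice stops after n rows, so the rest of the file is never read.
        return [[float(v) for v in row] for row in islice(csv.reader(f), n)]
```

For example, `read_first_rows("SUSY.csv", 3)` would return the first three records, each with the class label in position 0 followed by the feature values.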

#### Explore the dataset

In [29]:
# The CSV has no header row: the first column is the class label,
# followed by 18 feature columns.
COLUMNS = [
    'class', 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi',
    'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi',
    'missing_energy_magnitude', 'missing_energy_phi', 'MET_rel',
    'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R',
    'dPhi_r_b', 'cos(theta_r1)',
]
df = pd.read_csv('SUSY.csv', header=None)
df.columns = COLUMNS
df.head()

| | class | lepton_1_pT | lepton_1_eta | lepton_1_phi | lepton_2_pT | lepton_2_eta | lepton_2_phi | missing_energy_magnitude | missing_energy_phi | MET_rel | axial_MET | M_R | M_TR_2 | R | MT2 | S_R | M_Delta_R | dPhi_r_b | cos(theta_r1) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.0 | 0.972861 | 0.653855 | 1.176225 | 1.157156 | -1.739873 | -0.874309 | 0.567765 | -0.175 | 0.810061 | -0.252552 | 1.921887 | 0.889637 | 0.410772 | 1.145621 | 1.932632 | 0.994464 | 1.367815 | 0.040714 |
| 1 | 1.0 | 1.667973 | 0.064191 | -1.225171 | 0.506102 | -0.338939 | 1.672543 | 3.475464 | -1.219136 | 0.012955 | 3.775174 | 1.045977 | 0.568051 | 0.481928 | 0.0 | 0.44841 | 0.205356 | 1.321893 | 0.377584 |
| 2 | 1.0 | 0.44484 | -0.134298 | -0.709972 | 0.451719 | -1.613871 | -0.768661 | 1.219918 | 0.504026 | 1.831248 | -0.431385 | 0.526283 | 0.941514 | 1.587535 | 2.024308 | 0.603498 | 1.562374 | 1.135454 | 0.18091 |
| 3 | 1.0 | 0.381256 | -0.976145 | 0.693152 | 0.448959 | 0.891753 | -0.677328 | 2.03306 | 1.533041 | 3.04626 | -1.005285 | 0.569386 | 1.015211 | 1.582217 | 1.551914 | 0.761215 | 1.715464 | 1.492257 | 0.090719 |
| 4 | 1.0 | 1.309996 | -0.690089 | -0.676259 | 1.589283 | -0.693326 | 0.622907 | 1.087562 | -0.381742 | 0.589204 | 1.365479 | 1.179295 | 0.968218 | 0.728563 | 0.0 | 1.083158 | 0.043429 | 1.154854 | 0.094859 |
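
To feed records like these into Kafka manually, each row has to be turned into a message. The sketch below shows one possible encoding, assuming the class label becomes the message key and the remaining features become a comma-separated value. This message layout, the helper names `row_to_message` and `write_rows`, and the topic name used in the note that follows are illustrative assumptions, not something this section prescribes. The `producer` argument only needs `send(topic, key=..., value=...)` and `flush()` methods, which `kafka.KafkaProducer` from kafka-python provides.

```python
def row_to_message(row):
    """Encode one record (label first, then features) as a (key, value) pair of bytes."""
    label, features = row[0], row[1:]
    key = str(int(label)).encode("utf-8")
    value = ",".join(str(x) for x in features).encode("utf-8")
    return key, value


def write_rows(producer, topic, rows):
    """Send every row to `topic` and return the number of messages sent.

    `producer` is assumed to expose kafka-python style send() and flush().
    """
    count = 0
    for row in rows:
        key, value = row_to_message(row)
        producer.send(topic, key=key, value=value)
        count += 1
    # Block until buffered messages have been handed to the broker.
    producer.flush()
    return count
```

With the local broker from this notebook, a call might look like `write_rows(kafka.KafkaProducer(bootstrap_servers=["127.0.0.1:9092"]), "susy-demo", df.head().values.tolist())`, where `susy-demo` is a hypothetical topic name.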
