# Apache Kafka Integration + Preprocessing / Interactive Analysis with KSQL

This notebook combines Python, Apache Kafka and KSQL to build a Machine Learning infrastructure.

It includes code examples using ksql-python and other widespread components from Python's machine learning ecosystem, like NumPy, pandas, TensorFlow and Keras.

The use case is fraud detection for credit card payments. We use a test data set from Kaggle as the foundation to train an unsupervised autoencoder that detects anomalies and potential fraud in payments. The focus of this example is not just model training, but the whole Machine Learning infrastructure, including data ingestion, data preprocessing, model training, model deployment and monitoring. All of this needs to be scalable, reliable and performant.

If you want to learn more about the relation between the Apache Kafka open source ecosystem and Machine Learning, please check out these two blog posts:

- [How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka](https://www.confluent.io/blog/build-deploy-scalable-machine-learning-production-apache-kafka/)
- [Using Apache Kafka to Drive Cutting-Edge Machine Learning](https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning)

### Start Backend Services (Zookeeper, Kafka, KSQL)

The only server requirement is a locally running KSQL server (together with a Kafka broker and a ZooKeeper node). If you don't have one running, just use the Confluent CLI:

In [18]:
# Shows correct startup but does not work 100% yet. Better start from Terminal!
! confluent start kafka

This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /var/folders/0s/0xdkb9n12yqdb3fs71926z3c0000gp/T/confluent.gQMzOky1
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]


## Data Integration and Preprocessing with Python and KSQL

First of all, create the Kafka Topic 'creditcardfraud_source' if it does not exist already:

In [1]:
! kafka-topics --zookeeper localhost:2181 --create --topic creditcardfraud_source --partitions 3 --replication-factor 1

Created topic "creditcardfraud_source".


Then load KSQL library and initiate connection to KSQL server:

In [2]:
from ksql import KSQLAPI
client = KSQLAPI('http://localhost:8088')

Consume source data from Kafka Topic "creditcardfraud_source":

In [3]:
client.create_stream(table_name='creditcardfraud_source',
                     columns_type=['Id bigint', 'Timestamp varchar', 'User varchar', 'Time int', 'V1 double', 'V2 double', 'V3 double', 'V4 double', 'V5 double', 'V6 double', 'V7 double', 'V8 double', 'V9 double', 'V10 double', 'V11 double', 'V12 double', 'V13 double', 'V14 double', 'V15 double', 'V16 double', 'V17 double', 'V18 double', 'V19 double', 'V20 double', 'V21 double', 'V22 double', 'V23 double', 'V24 double', 'V25 double', 'V26 double', 'V27 double', 'V28 double', 'Amount double', 'Class string'],
                     topic='creditcardfraud_source',
                     value_format='DELIMITED')

True
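
For reference, the `create_stream` call above corresponds roughly to a plain KSQL `CREATE STREAM` statement. The following is a minimal sketch that builds such a statement as a string. The helper `build_create_stream` is hypothetical (it is not part of ksql-python), and the column list is shortened for readability.

```python
def build_create_stream(name, columns, topic, value_format):
    """Return a CREATE STREAM statement as a string.

    Illustrative helper only; it is not part of ksql-python.
    """
    column_list = ", ".join(columns)
    return (
        f"CREATE STREAM {name} ({column_list}) "
        f"WITH (kafka_topic='{topic}', value_format='{value_format}');"
    )


# Shortened column list; the stream created above has 34 columns.
statement = build_create_stream(
    name="creditcardfraud_source",
    columns=["Id bigint", "Time int", "V1 double", "Amount double", "Class string"],
    topic="creditcardfraud_source",
    value_format="DELIMITED",
)
print(statement)
```

A statement like this could also be sent with `client.ksql(statement)`, which is the same method used for `show streams` further down.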

Preprocessing: 

- Drop columns that are not needed
- Drop messages where the column 'Class' is empty (NULL)
- Change data format to Avro for more convenient further processing


In [4]:
client.create_stream_as(table_name='creditcardfraud_preprocessed_avro',
                     select_columns=['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class'],
                     src_table='creditcardfraud_source',
                     conditions='Class IS NOT NULL',
                     kafka_topic='creditcardfraud_preprocessed_avro',
                     value_format='AVRO')

True
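
To make the two filter steps concrete, here is a rough pandas analogue on a tiny batch of data. This is only a sketch: the rows are made up, only a few of the real columns are shown, and KSQL applies the same logic continuously to a stream rather than once to a data frame.

```python
import pandas as pd

# Made-up sample rows with a subset of the real columns
raw = pd.DataFrame({
    "Id": [1, 2, 3],
    "User": ["alice", "bob", "carol"],
    "Time": [0, 1, 2],
    "V1": [-1.36, 1.19, -1.35],
    "Amount": [149.62, 2.69, 378.66],
    "Class": ["0", None, "1"],
})

# Keep only the columns needed for training (drops Id and User here)
keep = ["Time", "V1", "Amount", "Class"]

# Drop rows where Class is missing, mirroring "Class IS NOT NULL"
preprocessed = raw.loc[raw["Class"].notna(), keep].reset_index(drop=True)
print(preprocessed)
```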

Take a look at the created KSQL Streams:

In [None]:
client.ksql('show streams')

Take a look at the metadata of the KSQL Stream:

In [None]:
client.ksql('describe CREDITCARDFRAUD_PREPROCESSED_AVRO')

Interactive query statement:

In [5]:
query = client.query('SELECT * FROM CREDITCARDFRAUD_PREPROCESSED_AVRO LIMIT 1')

for item in query: 
    print(item)





{"row":{"columns":[1546951056534,null,0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.
462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705
,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"]},"errorMessage":null,"finalMessage":null}
{"row":null,"errorMessage":null,"finalMessage":"Limit Reached"}



In [None]:
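# TODO: Set 'auto.offset.reset' to 'earliest'. Wrapping the statement in double quotes
# avoids escaping the single quotes in Python, but the call itself still needs checking.
# See https://github.com/bryanyang0528/ksql-python/issues/54
# client.ksql("SET 'auto.offset.reset'='earliest'")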

### Additional (optional) analysis and preprocessing examples

Some more examples for possible data wrangling and preprocessing with KSQL:

- Anonymization
- Augmentation
- Merge / Join data frames

In [None]:
query = client.query('SELECT Id, MASK_LEFT(User, 2) FROM creditcardfraud_source LIMIT 1')

for item in query: 
    print(item)

In [None]:
query = client.query('SELECT Id, IFNULL(Class, \'-1\') FROM creditcardfraud_source LIMIT 1')

for item in query: 
    print(item)
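
To illustrate what the two functions in these queries do, here is a small pure-Python sketch. It assumes KSQL's documented default mask characters (`X` for upper case, `x` for lower case, `n` for digits, `-` for anything else). The helpers `mask_left` and `if_null` are hypothetical re-implementations for illustration, not calls into KSQL.

```python
def mask_left(value, num_chars):
    """Mask the first num_chars characters of value (assumed KSQL default mask characters)."""
    if value is None:
        return None

    def mask_char(ch):
        if ch.isupper():
            return "X"
        if ch.islower():
            return "x"
        if ch.isdigit():
            return "n"
        return "-"

    head = "".join(mask_char(ch) for ch in value[:num_chars])
    return head + value[num_chars:]


def if_null(value, default):
    """Return default when value is missing, otherwise value itself."""
    return default if value is None else value


print(mask_left("User_42", 2))
print(if_null(None, "-1"))
```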

#### Stream-Table-Join

For the STREAM-TABLE-JOIN, you first need to create a Kafka Topic 'users' (for the corresponding KSQL TABLE 'users'):

In [None]:
! kafka-topics --zookeeper localhost:2181 --create --topic users --partitions 3 --replication-factor 1 

Then create the KSQL Table:

In [None]:
client.create_table(table_name='users',
                     columns_type=['userid varchar', 'gender varchar', 'regionid varchar'],
                     topic='users',
                     key='userid',
                     value_format='AVRO')

In [None]:
# userid is declared as varchar above, so compare it with a string literal
client.ksql("CREATE STREAM creditcardfraud_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_per_user') AS SELECT Time, Amount, Class FROM creditcardfraud_source c INNER JOIN USERS u on c.user = u.userid WHERE u.USERID = '1'")
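
The join above can be pictured with a pandas analogue on static data. This is only a sketch with made-up rows. Unlike pandas, KSQL evaluates the join continuously: each new payment event is matched against the current state of the users table.

```python
import pandas as pd

# Made-up payment events (stream side)
payments = pd.DataFrame({
    "user": ["1", "2", "1"],
    "Time": [0, 1, 2],
    "Amount": [149.62, 2.69, 378.66],
    "Class": ["0", "0", "1"],
})

# Made-up user records (table side)
users = pd.DataFrame({
    "userid": ["1", "2"],
    "gender": ["FEMALE", "MALE"],
    "regionid": ["Region_1", "Region_2"],
})

# INNER JOIN on c.user = u.userid
joined = payments.merge(users, how="inner", left_on="user", right_on="userid")

# WHERE u.userid = '1', then SELECT Time, Amount, Class
per_user = joined.loc[joined["userid"] == "1", ["Time", "Amount", "Class"]]
per_user = per_user.reset_index(drop=True)
print(per_user)
```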

## TODO => The following section will be fixed soon
# Mapping from KSQL to Numpy / Pandas for Machine Learning Tasks

In [13]:
a = "Kai"
#print("BEFORE " + a)
query = client.query('select * from creditcardfraud_source LIMIT 1')
#print(list(query))

for item in query: 
    print(item)
    
    # 'dataframe' is not defined at this point, so inspect the item itself
    print(type(item))
#    print("AFTER " + a)

ConnectionError: HTTPConnectionPool(host='localhost', port=8088): Read timed out.

The command above returns a generator. Its values can be read with next(query) or in a for loop, for example to print them.

TODO Get KSQL list into correct Python structure (list? dict?) for Numpy and TensorFlow

https://towardsdatascience.com/23-great-pandas-codes-for-data-scientists-cca5ed9d8a38
https://pandas.pydata.org/pandas-docs/stable/10min.html
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.iterrows.html
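
One possible way to get from the generator to a pandas DataFrame is sketched below. It assumes that the generator yields string fragments of newline-delimited JSON messages shaped like the output printed earlier, and that a fragment can end in the middle of a message. The function `ksql_chunks_to_dataframe`, the sample fragments and the shortened column names are hypothetical. This has not been verified against a live KSQL server.

```python
import json

import pandas as pd


def ksql_chunks_to_dataframe(chunks, column_names):
    """Build a DataFrame from string fragments of newline-delimited JSON messages.

    Fragments are joined first because a single fragment may hold only part
    of a message. Messages without a row (for example the final
    "Limit Reached" message) are skipped.
    """
    text = "".join(chunks)
    rows = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        row = message.get("row")
        if row is not None:
            rows.append(row["columns"])
    return pd.DataFrame(rows, columns=column_names)


# Hand-written fragments standing in for the output of client.query(...)
sample_chunks = [
    '{"row":{"columns":[1546951056534,null,0,-1.35,149.62,"0"]},"errorMess',
    'age":null,"finalMessage":null}\n',
    '{"row":null,"errorMessage":null,"finalMessage":"Limit Reached"}\n',
]
columns = ["ROWTIME", "ROWKEY", "TIME", "V1", "AMOUNT", "CLASS"]

df_from_ksql = ksql_chunks_to_dataframe(sample_chunks, columns)
print(df_from_ksql)
```

Joining everything before parsing keeps the sketch simple, but it only works for queries that terminate, such as the ones with a LIMIT clause used here.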

In [9]:
query

<generator object BaseAPI.query at 0x10f6b06d0>

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [16]:
query = client.query('select * from creditcardfraud_source LIMIT 1')
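# TODO: This does not work yet (see the error below). pd.read_json is handed the
# generator itself instead of JSON text, and a single item may be only a fragment
# of a JSON message.

for item in query: 
    #dataframe = pd.read_json(item)
    dataframe = pd.read_json(query)
    print(dataframe)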

ValueError: Unexpected character found when decoding array value (2)

In [None]:
print(a)

In [11]:
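# Presumably yields an empty DataFrame because the generator was already consumed by an earlier loop
dataframe = pd.DataFrame(query)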


In [8]:
print(dataframe)

Empty DataFrame
Columns: []
Index: []


# Preprocessing with Pandas + Model Training with TensorFlow / Keras

This part only includes the steps required for model training of the Autoencoder with Keras and TensorFlow. 

If you want to get a better understanding of the model, take a look at the other notebook [Python Tensorflow Keras Fraud Detection Autoencoder.ipynb](http://localhost:8888/notebooks/Python%20Tensorflow%20Keras%20Fraud%20Detection%20Autoencoder.ipynb) which includes many more details, plots and explanations.

[Kudos to David Ellison](https://www.datascience.com/blog/fraud-detection-with-tensorflow).

[The credit card fraud data set is available at Kaggle](https://www.kaggle.com/mlg-ulb/creditcardfraud/data).

In [None]:
# TODO Copy & paste from the other notebook (Python Tensorflow Keras Fraud Detection Autoencoder.ipynb)
# Will be done after Magnus has helped to fix the mapping from the KSQL generator to a pandas DataFrame

In [None]:
# import packages
# matplotlib inline
import pandas as pd
import numpy as np
from scipy import stats
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_recall_curve
from sklearn.metrics import recall_score, classification_report, auc, roc_curve
from sklearn.metrics import precision_recall_fscore_support, f1_score
from sklearn.preprocessing import StandardScaler
from pylab import rcParams
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.callbacks import ModelCheckpoint, TensorBoard
from keras import regularizers

In [None]:
# TODO => Replace data frame 'df' with data from KSQL query from above to show integration between Kafka and Pandas...

# "data/creditcard_small.csv" is a very small data set (just for quick demo purpose to get a model binary)
# => replace with "data/creditcard.csv" to use a real data set to train a model with good accuracy
df = pd.read_csv("data/creditcard_small.csv") 
df.head(n=5) #just to check you imported the dataset properly

In [None]:
#set random seed and percentage of test data
RANDOM_SEED = 314 #used to help randomly select the data points
TEST_PCT = 0.2 # 20% of the data

#set up graphic style in this case I am using the color scheme from xkcd.com
rcParams['figure.figsize'] = 14, 8.7 # Golden Mean
LABELS = ["Normal","Fraud"]
#col_list = ["cerulean","scarlet"]# https://xkcd.com/color/rgb/
#sns.set(style='white', font_scale=1.75, palette=sns.xkcd_palette(col_list))

In [None]:
normal_df = df[df.Class == 0] #save normal_df observations into a separate df
fraud_df = df[df.Class == 1] #do the same for frauds

In [None]:
#data = df.drop(['Time'], axis=1) #if you think the var is unimportant
df_norm = df.copy() #work on a copy so the original df is not modified in place
df_norm['Time'] = StandardScaler().fit_transform(df_norm['Time'].values.reshape(-1, 1))
df_norm['Amount'] = StandardScaler().fit_transform(df_norm['Amount'].values.reshape(-1, 1))
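
As a reminder of what the scaling step does: StandardScaler subtracts the column mean and divides by the population standard deviation. A minimal NumPy sketch with made-up amounts:

```python
import numpy as np

# Made-up values standing in for the 'Amount' column
amounts = np.array([10.0, 20.0, 30.0, 40.0])

# z = (x - mean) / std, with the population standard deviation (ddof=0)
standardized = (amounts - amounts.mean()) / amounts.std()
print(standardized)
```

After this transformation the column has mean 0 and standard deviation 1, which puts 'Time' and 'Amount' on a scale comparable to the other features.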

In [None]:
train_x, test_x = train_test_split(df_norm, test_size=TEST_PCT, random_state=RANDOM_SEED)
train_x = train_x[train_x.Class == 0] #where normal transactions
train_x = train_x.drop(['Class'], axis=1) #drop the class column

test_y = test_x['Class'] #save the class column for the test set
test_x = test_x.drop(['Class'], axis=1) #drop the class column

train_x = train_x.values #transform to ndarray
test_x = test_x.values

In [None]:
# Reduce number of epochs and batch_size if your Jupyter crashes (due to memory issues)
# nb_epoch = 100
# batch_size = 128
nb_epoch = 5
batch_size = 32

input_dim = train_x.shape[1] #num of columns, 30
encoding_dim = 14
hidden_dim = int(encoding_dim / 2) #i.e. 7
learning_rate = 1e-7 #despite its name, only used below as the L1 activity regularization factor

input_layer = Input(shape=(input_dim, ))
encoder = Dense(encoding_dim, activation="tanh", activity_regularizer=regularizers.l1(learning_rate))(input_layer)
encoder = Dense(hidden_dim, activation="relu")(encoder)
decoder = Dense(hidden_dim, activation='tanh')(encoder)
decoder = Dense(input_dim, activation='relu')(decoder)
autoencoder = Model(inputs=input_layer, outputs=decoder)

In [None]:
autoencoder.compile(metrics=['accuracy'],
                    loss='mean_squared_error',
                    optimizer='adam')

cp = ModelCheckpoint(filepath="models/autoencoder_fraud.h5",
                               save_best_only=True,
                               verbose=0)

tb = TensorBoard(log_dir='./logs',
                histogram_freq=0,
                write_graph=True,
                write_images=True)

history = autoencoder.fit(train_x, train_x,
                    epochs=nb_epoch,
                    batch_size=batch_size,
                    shuffle=True,
                    validation_data=(test_x, test_x),
                    verbose=1,
                    callbacks=[cp, tb]).history

In [None]:
autoencoder = load_model('models/autoencoder_fraud.h5')


In [None]:
test_x_predictions = autoencoder.predict(test_x)
mse = np.mean(np.power(test_x - test_x_predictions, 2), axis=1)
error_df = pd.DataFrame({'Reconstruction_error': mse,
                        'True_class': test_y})
error_df.describe()
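
The reconstruction error computed above can be turned into an anomaly flag by comparing it with a threshold. The sketch below uses made-up arrays instead of model output, and the threshold value is hypothetical. In practice it would have to be chosen from the error distribution, for example with the precision/recall trade-off in mind.

```python
import numpy as np


def reconstruction_error(original, reconstructed):
    """Mean squared error per row, same formula as in the cell above."""
    return np.mean(np.power(original - reconstructed, 2), axis=1)


# Made-up inputs and reconstructions; the last row is reconstructed badly
original = np.array([[0.0, 1.0], [1.0, 1.0], [2.0, 0.0]])
reconstructed = np.array([[0.1, 0.9], [1.0, 1.2], [0.0, 2.0]])

errors = reconstruction_error(original, reconstructed)

threshold = 1.0  # hypothetical value, not tuned on real data
is_anomaly = errors > threshold
print(errors)
print(is_anomaly)
```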

The binary 'models/autoencoder_fraud.h5' is the trained model, which can then be deployed anywhere to make predictions on new incoming events in real time.

# Model Deployment

This demo focuses on the combination of Python and KSQL for data preprocessing and model training. If you want to understand how Apache Kafka, KSQL and Python-related Machine Learning tools fit together for model deployment and monitoring, please check out my other GitHub projects.

Some examples of model deployment in Kafka environments:

- [Analytic models (TensorFlow, Keras, H2O and Deeplearning4j) embedded in Kafka Streams microservices](https://github.com/kaiwaehner/kafka-streams-machine-learning-examples)
- [Anomaly detection of IoT sensor data with a model embedded into a KSQL UDF](https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot)
- [RPC communication between Kafka Streams application and model server (TensorFlow Serving)](https://github.com/kaiwaehner/tensorflow-serving-java-grpc-kafka-streams)

# Appendix: Pandas analysis with above Fraud Detection Data

In [None]:
df = pd.read_csv("data/creditcard.csv")

In [None]:
df.head()

In [None]:
df.shape

In [None]:
df.index

In [None]:
df.columns

In [None]:
df.values

In [None]:
df.describe()

In [None]:
df['Amount']

In [None]:
df[0:3]

In [None]:
df.iloc[1,1]

In [None]:
# Takes a minute or two (big CSV file)...
#df.plot()