# Apache Kafka Integration + Preprocessing / Interactive Analysis with KSQL

Load KSQL library and initiate connection to KSQL server:

In [1]:
from ksql import KSQLAPI

# REST endpoint of the KSQL server; all later cells go through `client`.
KSQL_SERVER_URL = 'http://localhost:8088'
client = KSQLAPI(KSQL_SERVER_URL)

Consume source data from Kafka Topic "creditcardfraud_source":

In [3]:
# Column declarations for the source stream: four leading columns, the
# 28 columns V1..V28 (all declared as double), then Amount and Class.
source_columns = (
    ['Id bigint', 'Timestamp varchar', 'User varchar', 'Time int']
    + ['V{} double'.format(i) for i in range(1, 29)]
    + ['Amount double', 'Class string']
)

# Register a KSQL stream on top of the existing Kafka topic.
client.create_stream(
    table_name='creditcardfraud_source',
    columns_type=source_columns,
    topic='creditcardfraud_source',
    value_format='DELIMITED',
)

True

Preprocessing: 

- Drop columns that are not needed
- Drop messages where the column 'Class' is empty (NULL)
- Change data format to Avro for more convenient further processing


In [5]:
# Columns carried over from the source stream: Time, V1..V28, Amount, Class.
# The Id, Timestamp and User columns of the source stream are not selected.
preprocessed_columns = (
    ['Time']
    + ['V{}'.format(i) for i in range(1, 29)]
    + ['Amount', 'Class']
)

# Derive a new Avro-formatted stream that keeps only rows with a non-null Class.
client.create_stream_as(
    table_name='creditcardfraud_preprocessed_avro',
    select_columns=preprocessed_columns,
    src_table='creditcardfraud_source',
    conditions='Class IS NOT NULL',
    kafka_topic='creditcardfraud_preprocessed_avro',
    value_format='AVRO',
)

True

Take a look at the created KSQL Streams:

In [6]:
# List the streams registered on the KSQL server (name, topic and format).
client.ksql('show streams')

[{'@type': 'streams',
  'statementText': 'show streams;',
  'streams': [{'type': 'STREAM',
    'name': 'CREDITCARDFRAUD_SOURCE',
    'topic': 'creditcardfraud_source',
    'format': 'DELIMITED'},
   {'type': 'STREAM',
    'name': 'CREDITCARDFRAUD_PREPROCESSED_AVRO',
    'topic': 'creditcardfraud_preprocessed_avro',
    'format': 'AVRO'}]}]

Take a look at the metadata of the KSQL Stream:

In [7]:
# Show the metadata of the preprocessed stream: its fields and the queries writing to it.
client.ksql('describe CREDITCARDFRAUD_PREPROCESSED_AVRO')

[{'@type': 'sourceDescription',
  'statementText': 'describe CREDITCARDFRAUD_PREPROCESSED_AVRO;',
  'sourceDescription': {'name': 'CREDITCARDFRAUD_PREPROCESSED_AVRO',
   'readQueries': [],
   'writeQueries': [{'sinks': ['CREDITCARDFRAUD_PREPROCESSED_AVRO'],
     'id': 'CSAS_CREDITCARDFRAUD_PREPROCESSED_AVRO_0',
     'queryString': "CREATE stream creditcardfraud_preprocessed_avro WITH (kafka_topic='creditcardfraud_preprocessed_avro', value_format='AVRO') AS SELECT Time, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class FROM creditcardfraud_source where Class IS NOT NULL;"}],
   'fields': [{'name': 'ROWTIME',
     'schema': {'type': 'BIGINT', 'fields': None, 'memberSchema': None}},
    {'name': 'ROWKEY',
     'schema': {'type': 'STRING', 'fields': None, 'memberSchema': None}},
    {'name': 'TIME',
     'schema': {'type': 'INTEGER', 'fields': None, 'memberSchema': None}},
    {'name': 'V1',
    

Interactive query statement:

In [12]:
# Run a query against the preprocessed stream that ends after one row (LIMIT 1).
query = client.query('SELECT * FROM CREDITCARDFRAUD_PREPROCESSED_AVRO LIMIT 1')

# Print every item the response yields; the final item reports "Limit Reached".
for item in query: 
    print(item)







{"row":{"columns":[1545916448102,null,0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.
462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705
,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"]},"errorMessage":null,"finalMessage":null}
{"row":null,"errorMessage":null,"finalMessage":"Limit Reached"}



Execute a SQL query and keep listening to the streaming data:

In [None]:
# To embed single quotes in a Python string, delimit the string with double
# quotes (or escape the inner quotes as \'):
# client.ksql("SET 'auto.offset.reset'='earliest'")
# NOTE(review): confirm that the KSQL REST endpoint accepts SET statements; the
# property may instead have to be passed as a stream property of the request.

# Mapping from KSQL to Numpy / Pandas for Machine Learning Tasks

In [11]:
# Placeholder value; it is overwritten with the most recent item once the loop runs.
a = "Kai"

query = client.query('select * from creditcardfraud_source LIMIT 1')

# Walk through the response, remembering the latest item in `a` and printing
# each item followed by its Python type.
for item in query:
    a = item
    print(a, type(a), sep='\n')




{"row":{"columns":[1545916370264,null,1,"2018-12-18T12:00:00Z","Kai",0,-1.3598071336738,-0.0727811733098497,2.5363467379691
<class 'str'>
4,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.025790580
<class 'str'>
1985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"]},"errorMessage":null,"finalMessage":null}
{"row":
<class 'str'>
null,"errorMessage":null,"finalMessage":"Limit Reached"}

<class 'str'>


The above command returns a generator. Its values can be read, for example, with next(query) or a for loop.

TODO Get KSQL list into correct Python structure (list? dict?) for Numpy and TensorFlow

https://towardsdatascience.com/23-great-pandas-codes-for-data-scientists-cca5ed9d8a38
https://pandas.pydata.org/pandas-docs/stable/10min.html
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.iterrows.html

In [None]:
# Display the object returned by the earlier client.query() call.
query

In [None]:
import json

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# Show the last item stored in `a` by the query loop above (or the "Kai"
# placeholder if that loop produced no items).
print(a)

In [None]:
# `query` cannot be fed to pandas directly: the for loop in the earlier cell
# already iterated it to the end, and its items are text chunks that can end in
# the middle of a JSON document (see the printed output above). Re-run the
# query, buffer the complete response, and parse it one line at a time.
response_text = ''.join(client.query('select * from creditcardfraud_source LIMIT 1'))

rows = []
for line in response_text.splitlines():
    if not line.strip():
        continue  # blank lines precede the first record in the response
    message = json.loads(line)
    # The closing "Limit Reached" message carries "row": null, so skip it.
    if message.get('row'):
        rows.append(message['row']['columns'])

# One DataFrame row per returned record; columns are positional (0, 1, 2, ...).
dataframe = pd.DataFrame(rows)

In [None]:
# Plain-text dump of the DataFrame built in the previous cell.
print(dataframe)

# Preprocessing with Pandas + Model Training with TensorFlow / Keras

This part only includes the steps required for model training of the Autoencoder with Keras and TensorFlow. 

If you want to get a better understanding of the model, take a look at the other notebook "Python Tensorflow Keras Fraud Detection Autoencoder.ipynb" which includes many more details, plots and explanations.

In [14]:
# TODO Copy & paste from the other notebook (Python Tensorflow Keras Fraud Detection Autoencoder.ipynb)

In [None]:
# df = pd.read_csv("data/creditcard.csv") #unzip and read in data downloaded to the local directory
# df.head(n=5) #just to check you imported the dataset properly
# df.shape #secondary check on the size of the dataframe

# Model Deployment

This demo focuses on the combination of Python and KSQL for data preprocessing and model training. If you want to understand the relation between Apache Kafka, KSQL and Python-related Machine Learning tools for model deployment and monitoring, please check out my other Github projects and blog posts:

*TODO Links to other Github projects and blog posts.*

- Kafka + ML blog
- Deployment (ML Server vs. Embedded into streaming apps)
- KSQL UDF example
- Kafka Streams example (Keras + TensorFlow)


# Appendix: Pandas analysis with above Fraud Detection Data

In [None]:
# Read the CSV file from the local data/ directory into a DataFrame.
df = pd.read_csv("data/creditcard.csv")

In [None]:
# Preview the first rows of the DataFrame.
df.head()

In [None]:
# (number of rows, number of columns)
df.shape

In [None]:
# Row labels of the DataFrame.
df.index

In [None]:
# Column labels of the DataFrame.
df.columns

In [None]:
# The underlying data as a NumPy array (without index or column labels).
df.values

In [None]:
# Summary statistics (count, mean, std, min, quartiles, max) per numeric column.
df.describe()

In [None]:
# Select the single column 'Amount' as a Series.
df['Amount']

In [None]:
# Slice the first three rows (positions 0, 1 and 2).
df[0:3]

In [None]:
# Positional lookup: the value in the second row and second column.
df.iloc[1,1]

In [None]:
# Takes a minute or two (big CSV file)...
#df.plot()