
Commit 01476ab

Preprocessing with KSQL finished (including filtering of columns and rows)
1 parent: f6c995a

1 file changed: live-demo___python-jupyter-apache-kafka-ksql-tensorflow-keras.adoc (58 additions, 20 deletions)
@@ -1,14 +1,18 @@

= Live Demo: Python, Jupyter notebook, TensorFlow, Keras, Apache Kafka and KSQL

Kai Waehner <kontakt@kai-waehner.de>
27 Dec 2018

This script assumes that all components (Zookeeper, Kafka, Connect, KSQL, Jupyter) use default values.

We use the following test data (each row is one single payment):

Id bigint, Timestamp varchar, User varchar, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string

== Starting backend services

First we need to start a local Kafka ecosystem to use KSQL from the Jupyter notebook. We also need to create some test data: either start a data generator to create a continuous feed of streaming data, or integrate with a file (via Kafka Connect). This is not part of the ML-related tasks; it is just to get some test data into a Kafka topic:

[source,bash]
----
@@ -17,21 +21,29 @@ confluent start connect
confluent start ksql-server

// Create Kafka topic
kafka-topics --zookeeper localhost:2181 --create --topic creditcardfraud_source --partitions 3 --replication-factor 1

// Produce test data manually
confluent produce creditcardfraud_source

1,"2018-12-18T12:00:00Z","Kai",0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"

// Keep the last column empty => this sends NULL => it gets filtered out as part of the preprocessing stream!
1,"2018-12-18T12:00:00Z","Kai",0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,
----
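The same two rows can also be produced from Python. A minimal sketch, assuming the confluent-kafka package and a broker on the default localhost:9092 (any Kafka producer would do):

[source,python]
----
# Minimal sketch: produce the two test rows from Python instead of `confluent produce`.
# Assumes the confluent-kafka package (pip install confluent-kafka) and a broker on localhost:9092.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

# Full CSV rows shortened here ("...") - paste the complete rows from the block above.
rows = [
    '1,"2018-12-18T12:00:00Z","Kai",0,...,149.62,"0"',  # valid payment (Class = "0")
    '1,"2018-12-18T12:00:00Z","Kai",0,...,149.62,',     # empty Class => NULL => filtered out later
]
for row in rows:
    producer.produce("creditcardfraud_source", value=row)
producer.flush()
----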

As an alternative, you can consume the test data from the CSV file 'creditcard_extended.csv':

[source,bash]
----
// Start File Connector to consume data from CSV file:
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
  "name" : "file-source",
  "config" : {
    "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max" : "1",
    "file": "/Users/kai.waehner/git-projects/python-jupyter-apache-kafka-ksql-tensorflow-keras/data/creditcard_extended.csv",
    "topic": "creditcardfraud_source",
    "name": "file-source",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
@@ -40,26 +52,55 @@

confluent status file-source
curl -s -X DELETE localhost:8083/connectors/file-source
----
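To verify that the connector is actually running before deleting it, the standard Kafka Connect REST API (port 8083) can also be queried from Python; a small sketch using the requests package (an extra dependency, not part of the demo):

[source,python]
----
# Check the file-source connector via the Kafka Connect REST API (default port 8083).
# Assumes the requests package (pip install requests).
import requests

status = requests.get("http://localhost:8083/connectors/file-source/status").json()
print(status["connector"]["state"])          # e.g. RUNNING
for task in status["tasks"]:
    print(task["id"], task["state"])
----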

== Demo in Jupyter Notebook

Now go to the Jupyter Notebook 'python-jupyter-apache-kafka-ksql-tensorflow-keras.ipynb' to do the preprocessing and interactive analysis with Python + KSQL, then the model training with Python + Keras.

[source,bash]
----
// Terminal
jupyter notebook
----
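Inside the notebook, the KSQL part is driven from Python via the ksql-python package. Roughly along these lines (a sketch, assuming the KSQL server on its default port 8088 and that the creditcardfraud_source stream from the section below has already been created):

[source,python]
----
# Sketch of the ksql-python usage in the notebook (KSQL server default: localhost:8088).
from ksql import KSQLAPI

client = KSQLAPI("http://localhost:8088")

# Interactive analysis: stream a few rows from the source stream
for row in client.query("SELECT * FROM creditcardfraud_source LIMIT 5"):
    print(row)
----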

== Commands to create KSQL Streams and to consume events

Some options to consume the data for testing:

[source,bash]
----
// Terminal
confluent consume creditcardfraud_source --from-beginning

// KSQL-CLI
SELECT * FROM creditcardfraud_source;
----
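A third option is a small Python consumer, again sketched with confluent-kafka (broker assumed on the default localhost:9092):

[source,python]
----
# Consume the raw test data from Python (analogous to `confluent consume ... --from-beginning`).
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "creditcardfraud-test",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["creditcardfraud_source"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(msg.value().decode("utf-8"))
finally:
    consumer.close()
----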

KSQL commands (if you want to run them from the KSQL CLI instead of using the Jupyter Notebook):

[source,bash]
----
CREATE STREAM creditcardfraud_source (Id bigint, Timestamp varchar, User varchar, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud_source', value_format='DELIMITED', KEY='Id');

// Preprocessed KSQL Stream:
// - Filter columns
// - Filter messages where Class is empty
// - Change data format to Avro
CREATE STREAM creditcardfraud_preprocessed_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class FROM creditcardfraud_source WHERE Class IS NOT NULL;
----
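These statements can also be submitted programmatically with ksql-python's ksql() call instead of typing them into the CLI; a sketch under the same localhost:8088 assumption:

[source,python]
----
# Submit the preprocessing statement from Python instead of the KSQL CLI.
from ksql import KSQLAPI

client = KSQLAPI("http://localhost:8088")

# Column list shortened ("...") - use the full SELECT from the block above.
client.ksql("""
  CREATE STREAM creditcardfraud_preprocessed_avro
  WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS
  SELECT Time, V1, V2, ..., Amount, Class
  FROM creditcardfraud_source
  WHERE Class IS NOT NULL
""")
----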

== Further possible KSQL preprocessing steps (not integrated into Jupyter and the demo yet)

[source,bash]
----
// Make sure that you key your source topic for user data, otherwise you have to do the rekeying in KSQL, which is cumbersome:
1:{user_id:1, class:platinum}
2:{user_id:2, class:bronze}
// console-producer --key=:
CREATE TABLE USERS (USERID, CITY, EMAIL, CLASS) WITH (KAFKA_TOPIC='USERS',KEY='USERID')
// validate by SELECT ROWKEY from <table>
// also in PRINT
@@ -176,19 +217,16 @@ java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
ksql-datagen quickstart=users format=json topic=users maxInterval=1000 propertiesFile=etc/ksql/datagen.properties
----

== Helper commands for Python, Conda, Jupyter, pip

Open the Jupyter notebook:

[source,bash]
----
// Open Jupyter and select the notebook 'python-jupyter-apache-kafka-ksql-tensorflow-keras.ipynb'
jupyter notebook
----

Some common commands for Jupyter, pip and conda to manage Python packages like ksql-python:

[source,bash]
