
Commit b6b08d1

Added many KSQL snippets for data preprocessing
1 parent 81eb81b commit b6b08d1

File tree

1 file changed (+111, -2 lines)


live-demo___python-jupyter-apache-kafka-ksql-tensorflow-keras.adoc

Lines changed: 111 additions & 2 deletions
@@ -16,6 +16,12 @@ Either start a data generator to create a continuous feed of streaming data or in

confluent start connect
confluent start ksql-server

// Create Kafka topic
kafka-topics --zookeeper localhost:2181 --create --topic creditcardfraud --partitions 3 --replication-factor 1

// TODO alternative: Create topic (to have no dependency on Kafka Connect in the demo)?

// Start File Connector to consume data from CSV file:
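
// A minimal sketch of such a file connector config, assuming the stock FileStreamSource connector and an illustrative CSV path:
name=csv-source-creditcardfraud
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/creditcardfraud.csv
topic=creditcardfraud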
@@ -41,15 +47,118 @@ confluent consume creditcardfraud --from-beginning

confluent produce creditcardfraud

-1,0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"

// Message columns
"Id","Timestamp","UserId","Time","V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13","V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class"

1,"2018-12-18T12:00:00Z","Kai",0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"

CREATE STREAM creditcardfraud_enhanced (Id bigint, timestamp varchar, userid varchar, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud', value_format='DELIMITED');
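
// A quick sanity check of the column mapping (LIMIT keeps the continuous query from running forever):
SELECT Id, userid, Amount, Class FROM creditcardfraud_enhanced LIMIT 3;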

// Make sure you key your source topic for user data; otherwise you have to show re-keying it in KSQL, which is cumbersome
1:{user_id:1, class:platinum}
2:{user_id:2, class:bronze}
// console-producer --key=:
CREATE TABLE USERS (USERID VARCHAR, CITY VARCHAR, EMAIL VARCHAR, CLASS VARCHAR) WITH (KAFKA_TOPIC='USERS', KEY='USERID', VALUE_FORMAT='JSON');
// Validate with SELECT ROWKEY FROM <table>
// The key is also shown by PRINT
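
// A minimal sketch of producing the keyed user records above with the console producer (topic name USERS assumed from the table definition; keys quoted for valid JSON):
kafka-console-producer --broker-list localhost:9092 --topic USERS --property parse.key=true --property key.separator=:
1:{"user_id": 1, "class": "platinum"}
2:{"user_id": 2, "class": "bronze"}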

// kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic creditcardfraud --from-beginning

// confluent consume creditcardfraud --value-format avro --from-beginning

// TODO Use AVRO as an option (or at least mention this option)... Or is this making it more complex for the data scientist audience instead of just using DELIMITED?

// KSQL
-CREATE STREAM creditcardfraud (Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud', value_format='DELIMITED');
// Shows dropping columns (e.g. timestamp, Id, userid, etc.)
// Shows dropping rows (where V1 is greater than 5, V2 is not null, and username starts with Kai)
// Also switch to Avro & illustrate using a bespoke Kafka topic name
CREATE STREAM creditcardfraud WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='fraud_prep') AS SELECT Time, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class FROM creditcardfraud_enhanced c INNER JOIN USERS u ON c.userid = u.userid WHERE V1 > 5 AND V2 IS NOT NULL AND u.CITY LIKE 'Premium%';
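
// If the source topic weren't already keyed by userid, it would have to be re-keyed before the join - the cumbersome step mentioned above (a sketch; stream name illustrative):
CREATE STREAM creditcardfraud_rekeyed AS SELECT * FROM creditcardfraud_enhanced PARTITION BY userid;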

// DESCRIBE creditcardfraud;
// ^ shows the schema
// DESCRIBE EXTENDED creditcardfraud;
// ^ shows the schema, the underlying query, and the number of messages processed -> this is an app we've built and it's continually running


// Create a delimited version of this stream
// Now the app that *needs* CSV gets it, but other users of the data benefit from the explicitly declared schema and don't have to type it out each time
CREATE STREAM creditcardfraud_csv WITH (VALUE_FORMAT='DELIMITED') AS SELECT * FROM creditcardfraud;
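
// The delimited output can then be consumed as plain CSV (assuming the default output topic name, i.e. the uppercased stream name):
confluent consume CREDITCARDFRAUD_CSV --from-beginning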

// KSQL => extended CSV
Add a column:

SELECT 'hsbc.csv' AS SOURCE_FILE, * FROM creditcardfraud;

Remove NAs / null values

SELECT * FROM creditcardfraud WHERE V1 IS NOT NULL;
SELECT * FROM creditcardfraud WHERE (V1 IS NOT NULL AND V2 IS NOT NULL);
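
// To hand the cleaned rows to downstream consumers as their own topic, the same filter can be wrapped in a persistent query (stream name illustrative):
CREATE STREAM creditcardfraud_clean AS SELECT * FROM creditcardfraud WHERE V1 IS NOT NULL AND V2 IS NOT NULL;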

Restrict date range

// There isn't a NOW function
// NOW - 1 HOUR doesn't exist :(
// i.e. you have to hard-code the epoch
SELECT * FROM creditcardfraud WHERE ROWTIME > {epoch value}
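
// e.g. everything after 2018-12-18T12:00:00Z, which is 1545134400000 in epoch milliseconds:
SELECT * FROM creditcardfraud WHERE ROWTIME > 1545134400000;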


Timestamp handling

// See ATM fraud slides for illustration.
// Slides: https://speakerdeck.com/rmoff/atm-fraud-detection-with-kafka-and-ksql
// Code: https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection-README.adoc
// This changes the way KSQL sets the timestamp of the message, using a timestamp column from the payload instead - very important for time-based aggregations & time-based joins (e.g. stream-stream windowing)
CREATE STREAM creditcardfraud … WITH (TIMESTAMP='timestamp_col', TIMESTAMP_FORMAT='YYYY etc')
ROWTIME then inherits timestamp_col, _not_ the Kafka message timestamp
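
// A concrete sketch, assuming the ISO-8601 varchar column "timestamp" from creditcardfraud_enhanced above (stream name illustrative):
CREATE STREAM creditcardfraud_ts WITH (TIMESTAMP='timestamp', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX') AS SELECT * FROM creditcardfraud_enhanced;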

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), ROWTIME, timestamp_col FROM creditcardfraud LIMIT 1;

// Or you can leave the timestamp of the message alone and just filter as required
// Useful for standard data prep & filtering
SELECT * FROM creditcardfraud WHERE STRINGTOTIMESTAMP(timestamp_col, 'YYYY etc') > {epoch value}

Drop column / row

// drop row -> WHERE clause
// drop column -> leave it out of the SELECT projection

// Concatenate

SELECT COL1 + COL2 AS NEW_COL FROM MY_STREAM;
SELECT CAST(COL1 AS VARCHAR) + CAST(COL2 AS VARCHAR) FROM MY_STREAM;
SELECT COL1 || ': static value : ' || COL2 AS NEW_COL FROM MY_STREAM; // not sure if this is still supported
SELECT CONCAT(COL1, COL2) FROM MY_STREAM; // SQL users might expect it but it's ugly

// Splitting a column - can't be done
// There is no INSTR/INDEXOF, there's no SPLIT
// SELECT SUBSTRING(FULL_NAME, 1, INDEXOF(FULL_NAME, ' '))
// -> please go and upvote these on GitHub
SELECT SUBSTR(FULL_NAME, 1, 5) FROM MY_STREAM;
// COALESCE / CASE are the other huge missing ones
https://github.com/confluentinc/ksql/issues/620

// Anonymization

https://github.com/confluentinc/ksql-recipes-try-it-at-home/tree/master/data-masking

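// A sketch using KSQL's built-in masking UDFs (available from KSQL 5.0), e.g. masking the EMAIL column of the USERS table defined above:
SELECT USERID, MASK(EMAIL) FROM USERS;
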
// Merge / Join data frames

// e.g. two sources of data with the same structure

CREATE STREAM website_source (SAME SCHEMA) WITH (KAFKA_TOPIC='from website');
CREATE STREAM api_source (SAME SCHEMA) WITH (KAFKA_TOPIC='api');
// also different geos etc

CREATE STREAM UNIFIED AS SELECT 'website' AS SOURCE, * FROM WEBSITE_SOURCE;
INSERT INTO UNIFIED SELECT 'api' AS SOURCE, * FROM API_SOURCE;

// Single resulting stream (-> Kafka topic) but continually populated with data from BOTH sources
// Basically a UNION of the data sets
What else?

CREATE STREAM creditcardfraud (Id bigint, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud', value_format='DELIMITED');

describe creditcardfraud;
