Skip to content

Commit ce1cee5

Browse files
committed
Added integration (connect) and processing (ksql) of the credit card sample data
1 parent 8fe6f2b commit ce1cee5

File tree

1 file changed

+46
-5
lines changed

1 file changed

+46
-5
lines changed

live-demo___python-jupyter-apache-kafka-ksql-tensorflow-keras.adoc

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,53 @@ First we need to start a local Kafka ecosystem to use KSQL from the Jupyter note
1212
[source,bash]
1313
----
1414
// Start KSQL with Kafka and other dependencies
15-
CONFLUENT_HOME# confluent start connect
16-
CONFLUENT_HOME# confluent start ksql-server
15+
confluent start connect
16+
confluent start ksql-server
1717
18-
// TODO
19-
// Start data generator (continuous flow of data instead of CSV file)
20-
CONFLUENT_HOME# ksql-datagen quickstart=users format=json topic=users maxInterval=1000 propertiesFile=etc/ksql/datagen.properties
18+
// Start File Connector to consume data from CSV file:
19+
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
20+
"name" : "file-source",
21+
"config" : {
22+
"connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
23+
"tasks.max" : "1",
24+
"file": "/Users/kai.waehner/git-projects/python-jupyter-apache-kafka-ksql-tensorflow-keras/data/creditcard_small.csv",
25+
"topic": "creditcardfraud",
26+
"name": "file-source",
27+
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
28+
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
29+
}
30+
}'
31+
32+
confluent status file-source
33+
curl -s -X DELETE localhost:8083/connectors/file-source
34+
35+
// Check if the connector is reading the data:
36+
37+
confluent consume creditcardfraud --from-beginning
38+
39+
0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"
40+
41+
// kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic creditcardfraud --from-beginning
42+
43+
// confluent consume creditcardfraud --value-format avro --from-beginning
44+
45+
46+
//KSQL
47+
CREATE STREAM creditcardfraud (Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud', value_format='DELIMITED');
48+
49+
describe creditcardfraud;
50+
51+
SET 'auto.offset.reset'='earliest';
52+
53+
select * from creditcardfraud;
54+
55+
select TIME, V1, V2, AMOUNT, CLASS FROM creditcardfraud;
56+
57+
// NOTE: consuming this topic with value-format avro fails with "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord" — expected, since the file connector writes plain CSV strings (StringConverter), not Avro records
58+
59+
// TODO Start data generator (continuous flow of data instead of CSV file)
60+
// TODO Create / fix creditcardtransactions.avro file
61+
ksql-datagen quickstart=users format=json topic=users maxInterval=1000 propertiesFile=etc/ksql/datagen.properties
2162
----
2263

2364
== Open Jupyter notebook

0 commit comments

Comments
 (0)