11 | 11 | "cell_type": "markdown", |
12 | 12 | "metadata": {}, |
13 | 13 | "source": [ |
| 14 | + "This notebook uses the combination of Python, Apache Kafka, KSQL for Machine Learning infrastructures. \n", |
| 15 | + "\n", |
| 16 | + "It includes code examples using ksql-python and other widespread components from Python’s machine learning ecosystem, like Numpy, pandas, TensorFlow and Keras. \n", |
| 17 | + "\n", |
| 18 | + "The use case is fraud detection for credit card payments. We use a test data set from Kaggle as foundation to train an unsupervised autoencoder to detect anomalies and potential fraud in payments. Focus of this example is not just model training, but the whole Machine Learning infrastructure including data ingestion, data preprocessing, model training, model deployment and monitoring. All of this needs to be scalable, reliable and performant.\n", |
| 19 | + "\n", |
| 20 | + "If you want to learn more about the relation between the Apache Kafka open source ecosystem and Machine Learning, please check out these two blog posts:\n", |
| 21 | + "\n", |
| 22 | + "- [How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka](https://www.confluent.io/blog/build-deploy-scalable-machine-learning-production-apache-kafka/)\n", |
| 23 | + "- [https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning](https://www.confluent.io/blog/using-apache-kafka-drive-cutting-edge-machine-learning)\n" |
| 24 | + ] |
| 25 | + }, |
| 26 | + { |
| 27 | + "cell_type": "markdown", |
| 28 | + "metadata": {}, |
| 29 | + "source": [ |
| 30 | + "## Data Integration and Preprocessing with Python and KSQL\n", |
| 31 | + "\n", |
14 | 32 | "Load KSQL library and initiate connection to KSQL server:" |
15 | 33 | ] |
16 | 34 | }, |
|
33 | 51 | }, |
34 | 52 | { |
35 | 53 | "cell_type": "code", |
36 | | - "execution_count": 3, |
| 54 | + "execution_count": 2, |
37 | 55 | "metadata": {}, |
38 | 56 | "outputs": [ |
39 | 57 | { |
|
42 | 60 | "True" |
43 | 61 | ] |
44 | 62 | }, |
45 | | - "execution_count": 3, |
| 63 | + "execution_count": 2, |
46 | 64 | "metadata": {}, |
47 | 65 | "output_type": "execute_result" |
48 | 66 | } |
|
67 | 85 | }, |
68 | 86 | { |
69 | 87 | "cell_type": "code", |
70 | | - "execution_count": 5, |
| 88 | + "execution_count": 3, |
71 | 89 | "metadata": {}, |
72 | 90 | "outputs": [ |
73 | 91 | { |
|
76 | 94 | "True" |
77 | 95 | ] |
78 | 96 | }, |
79 | | - "execution_count": 5, |
| 97 | + "execution_count": 3, |
80 | 98 | "metadata": {}, |
81 | 99 | "output_type": "execute_result" |
82 | 100 | } |
|
99 | 117 | }, |
100 | 118 | { |
101 | 119 | "cell_type": "code", |
102 | | - "execution_count": 6, |
| 120 | + "execution_count": 4, |
103 | 121 | "metadata": {}, |
104 | 122 | "outputs": [ |
105 | 123 | { |
|
117 | 135 | " 'format': 'AVRO'}]}]" |
118 | 136 | ] |
119 | 137 | }, |
120 | | - "execution_count": 6, |
| 138 | + "execution_count": 4, |
121 | 139 | "metadata": {}, |
122 | 140 | "output_type": "execute_result" |
123 | 141 | } |
|
135 | 153 | }, |
136 | 154 | { |
137 | 155 | "cell_type": "code", |
138 | | - "execution_count": 7, |
| 156 | + "execution_count": null, |
139 | 157 | "metadata": {}, |
140 | | - "outputs": [ |
141 | | - { |
142 | | - "data": { |
143 | | - "text/plain": [ |
144 | | - "[{'@type': 'sourceDescription',\n", |
145 | | - " 'statementText': 'describe CREDITCARDFRAUD_PREPROCESSED_AVRO;',\n", |
146 | | - " 'sourceDescription': {'name': 'CREDITCARDFRAUD_PREPROCESSED_AVRO',\n", |
147 | | - " 'readQueries': [],\n", |
148 | | - " 'writeQueries': [{'sinks': ['CREDITCARDFRAUD_PREPROCESSED_AVRO'],\n", |
149 | | - " 'id': 'CSAS_CREDITCARDFRAUD_PREPROCESSED_AVRO_0',\n", |
150 | | - " 'queryString': \"CREATE stream creditcardfraud_preprocessed_avro WITH (kafka_topic='creditcardfraud_preprocessed_avro', value_format='AVRO') AS SELECT Time, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class FROM creditcardfraud_source where Class IS NOT NULL;\"}],\n", |
151 | | - " 'fields': [{'name': 'ROWTIME',\n", |
152 | | - " 'schema': {'type': 'BIGINT', 'fields': None, 'memberSchema': None}},\n", |
153 | | - " {'name': 'ROWKEY',\n", |
154 | | - " 'schema': {'type': 'STRING', 'fields': None, 'memberSchema': None}},\n", |
155 | | - " {'name': 'TIME',\n", |
156 | | - " 'schema': {'type': 'INTEGER', 'fields': None, 'memberSchema': None}},\n", |
157 | | - " {'name': 'V1',\n", |
158 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
159 | | - " {'name': 'V2',\n", |
160 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
161 | | - " {'name': 'V3',\n", |
162 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
163 | | - " {'name': 'V4',\n", |
164 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
165 | | - " {'name': 'V5',\n", |
166 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
167 | | - " {'name': 'V6',\n", |
168 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
169 | | - " {'name': 'V7',\n", |
170 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
171 | | - " {'name': 'V8',\n", |
172 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
173 | | - " {'name': 'V9',\n", |
174 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
175 | | - " {'name': 'V10',\n", |
176 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
177 | | - " {'name': 'V11',\n", |
178 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
179 | | - " {'name': 'V12',\n", |
180 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
181 | | - " {'name': 'V13',\n", |
182 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
183 | | - " {'name': 'V14',\n", |
184 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
185 | | - " {'name': 'V15',\n", |
186 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
187 | | - " {'name': 'V16',\n", |
188 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
189 | | - " {'name': 'V17',\n", |
190 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
191 | | - " {'name': 'V18',\n", |
192 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
193 | | - " {'name': 'V19',\n", |
194 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
195 | | - " {'name': 'V20',\n", |
196 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
197 | | - " {'name': 'V21',\n", |
198 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
199 | | - " {'name': 'V22',\n", |
200 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
201 | | - " {'name': 'V23',\n", |
202 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
203 | | - " {'name': 'V24',\n", |
204 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
205 | | - " {'name': 'V25',\n", |
206 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
207 | | - " {'name': 'V26',\n", |
208 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
209 | | - " {'name': 'V27',\n", |
210 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
211 | | - " {'name': 'V28',\n", |
212 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
213 | | - " {'name': 'AMOUNT',\n", |
214 | | - " 'schema': {'type': 'DOUBLE', 'fields': None, 'memberSchema': None}},\n", |
215 | | - " {'name': 'CLASS',\n", |
216 | | - " 'schema': {'type': 'STRING', 'fields': None, 'memberSchema': None}}],\n", |
217 | | - " 'type': 'STREAM',\n", |
218 | | - " 'key': '',\n", |
219 | | - " 'timestamp': '',\n", |
220 | | - " 'statistics': '',\n", |
221 | | - " 'errorStats': '',\n", |
222 | | - " 'extended': False,\n", |
223 | | - " 'format': 'AVRO',\n", |
224 | | - " 'topic': 'creditcardfraud_preprocessed_avro',\n", |
225 | | - " 'partitions': 0,\n", |
226 | | - " 'replication': 0}}]" |
227 | | - ] |
228 | | - }, |
229 | | - "execution_count": 7, |
230 | | - "metadata": {}, |
231 | | - "output_type": "execute_result" |
232 | | - } |
233 | | - ], |
| 158 | + "outputs": [], |
234 | 159 | "source": [ |
235 | 160 | "client.ksql('describe CREDITCARDFRAUD_PREPROCESSED_AVRO')" |
236 | 161 | ] |
|
244 | 169 | }, |
245 | 170 | { |
246 | 171 | "cell_type": "code", |
247 | | - "execution_count": 12, |
| 172 | + "execution_count": 5, |
248 | 173 | "metadata": {}, |
249 | 174 | "outputs": [ |
250 | 175 | { |
|
256 | 181 | "\n", |
257 | 182 | "\n", |
258 | 183 | "\n", |
259 | | - "{\"row\":{\"columns\":[1545916448102,null,0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.\n", |
260 | | - "462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705\n", |
261 | | - ",-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,\"0\"]},\"errorMessage\":null,\"finalMessage\":null}\n", |
| 184 | + "\n", |
| 185 | + "\n", |
| 186 | + "\n", |
| 187 | + "\n", |
| 188 | + "\n", |
| 189 | + "{\"row\":{\"columns\":[1546005557210,null,0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0\n", |
| 190 | + ".462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.25141209823970\n", |
| 191 | + "5,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,\"0\"]},\"errorMessage\":null,\"finalMessage\":null}\n", |
262 | 192 | "{\"row\":null,\"errorMessage\":null,\"finalMessage\":\"Limit Reached\"}\n", |
263 | 193 | "\n" |
264 | 194 | ] |
|
271 | 201 | " print(item)\n" |
272 | 202 | ] |
273 | 203 | }, |
274 | | - { |
275 | | - "cell_type": "markdown", |
276 | | - "metadata": {}, |
277 | | - "source": [ |
278 | | - "Execute sql query and keep listening streaming data:" |
279 | | - ] |
280 | | - }, |
281 | 204 | { |
282 | 205 | "cell_type": "code", |
283 | 206 | "execution_count": null, |
|
292 | 215 | "cell_type": "markdown", |
293 | 216 | "metadata": {}, |
294 | 217 | "source": [ |
| 218 | + "### TODO Add the following to the \"script\" for more data preprocessing with KSQL\n", |
| 219 | + "\n", |
| 220 | + "Some more examples for possible data wrangling and preprocessing with KSQL:\n", |
| 221 | + "\n", |
| 222 | + "// Drop columns\n", |
| 223 | + "// Filter messages where value ‘Class’ is empty\n", |
| 224 | + "// Change data format to Avro\n", |
| 225 | + "CREATE STREAM creditcardfraud_preprocessed_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_source WHERE Class IS NOT NULL;\n", |
| 226 | + "\n", |
| 227 | + "// Anonymization\n", |
| 228 | + "SELECT Id, MASK_LEFT(User, 2) FROM creditcardfraud_source;\n", |
| 229 | + "\n", |
| 230 | + "// Augmentation\n", |
| 231 | + "SELECT Id, IFNULL(Class, -1) FROM creditcardfraud_source;\n", |
| 232 | + "\n", |
| 233 | + "// Merge / Join data frames\n", |
| 234 | + "CREATE STREAM creditcardfraud_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_enahnced c INNER JOIN USERS u on c.userid = u.userid WHERE V1 > 5 AND V2 IS NOT NULL AND u.CITY LIKE 'Premium%';\n" |
| 235 | + ] |
| 236 | + }, |
| 237 | + { |
| 238 | + "cell_type": "markdown", |
| 239 | + "metadata": {}, |
| 240 | + "source": [ |
| 241 | + "## TODO => The following section will be fixed soon (when Magnus is back at work)\n", |
295 | 242 | "# Mapping from KSQL to Numpy / Pandas for Machine Learning Tasks" |
296 | 243 | ] |
297 | 244 | }, |
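| | + {
| | + "cell_type": "markdown",
| | + "metadata": {},
| | + "source": [
| | + "Until then, a minimal sketch (an assumption, not the final implementation) of how the streaming output of `client.query()` could be collected into a pandas DataFrame: the HTTP chunks may split a JSON row across items, so they are buffered and only complete lines are parsed. The helper name `ksql_query_to_dataframe` and the column list in the usage example are illustrative.\n"
| | + ]
| | + },
| | + {
| | + "cell_type": "code",
| | + "execution_count": null,
| | + "metadata": {},
| | + "outputs": [],
| | + "source": [
| | + "import json\n",
| | + "\n",
| | + "import pandas as pd\n",
| | + "\n",
| | + "def ksql_query_to_dataframe(query, columns):\n",
| | + "    \"\"\"Collect the rows of a streaming KSQL query into a pandas DataFrame.\"\"\"\n",
| | + "    buffer = ''\n",
| | + "    rows = []\n",
| | + "    # 'client' is the ksql-python connection created at the top of this notebook\n",
| | + "    for chunk in client.query(query):\n",
| | + "        buffer += chunk\n",
| | + "        # The chunks can split a JSON row, so only parse complete lines\n",
| | + "        while '\\n' in buffer:\n",
| | + "            line, buffer = buffer.split('\\n', 1)\n",
| | + "            if not line.strip():\n",
| | + "                continue\n",
| | + "            message = json.loads(line)\n",
| | + "            if message.get('row'):\n",
| | + "                rows.append(message['row']['columns'])\n",
| | + "    return pd.DataFrame(rows, columns=columns)\n",
| | + "\n",
| | + "# Illustrative usage: the first two columns are Kafka's ROWTIME and ROWKEY\n",
| | + "# df = ksql_query_to_dataframe('select * from creditcardfraud_preprocessed_avro limit 10;',\n",
| | + "#                              ['ROWTIME', 'ROWKEY', 'TIME'] + ['V%d' % i for i in range(1, 29)] + ['AMOUNT', 'CLASS'])"
| | + ]
| | + },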
298 | 245 | { |
299 | 246 | "cell_type": "code", |
300 | | - "execution_count": 11, |
| 247 | + "execution_count": null, |
301 | 248 | "metadata": { |
302 | 249 | "scrolled": false |
303 | 250 | }, |
304 | | - "outputs": [ |
305 | | - { |
306 | | - "name": "stdout", |
307 | | - "output_type": "stream", |
308 | | - "text": [ |
309 | | - "\n", |
310 | | - "\n", |
311 | | - "\n", |
312 | | - "{\"row\":{\"columns\":[1545916370264,null,1,\"2018-12-18T12:00:00Z\",\"Kai\",0,-1.3598071336738,-0.0727811733098497,2.5363467379691\n", |
313 | | - "<class 'str'>\n", |
314 | | - "4,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.025790580\n", |
315 | | - "<class 'str'>\n", |
316 | | - "1985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,\"0\"]},\"errorMessage\":null,\"finalMessage\":null}\n", |
317 | | - "{\"row\":\n", |
318 | | - "<class 'str'>\n", |
319 | | - "null,\"errorMessage\":null,\"finalMessage\":\"Limit Reached\"}\n", |
320 | | - "\n", |
321 | | - "<class 'str'>\n" |
322 | | - ] |
323 | | - } |
324 | | - ], |
| 251 | + "outputs": [], |
325 | 252 | "source": [ |
326 | 253 | "a = \"Kai\"\n", |
327 | 254 | "#print(\"BEFORE \" + a)\n", |
|
415 | 342 | "source": [ |
416 | 343 | "This part only includes the steps required for model training of the Autoencoder with Keras and TensorFlow. \n", |
417 | 344 | "\n", |
418 | | - "If you want to get a better understanding of the model, take a look at the other notebook \"Python Tensorflow Keras Fraud Detection Autoencoder.ipynb\" which includes many more details, plots and explanations." |
| 345 | + "If you want to get a better understanding of the model, take a look at the other notebook [Python Tensorflow Keras Fraud Detection Autoencoder.ipynb](http://localhost:8888/notebooks/Python%20Tensorflow%20Keras%20Fraud%20Detection%20Autoencoder.ipynb) which includes many more details, plots and explanations.\n", |
| 346 | + "\n", |
| 347 | + "[Kudos to David Ellison](https://www.datascience.com/blog/fraud-detection-with-tensorflow).\n", |
| 348 | + "\n", |
| 349 | + "[The credit card fraud data set is available at Kaggle](https://www.kaggle.com/mlg-ulb/creditcardfraud/data)." |
419 | 350 | ] |
420 | 351 | }, |
421 | 352 | { |
422 | 353 | "cell_type": "code", |
423 | | - "execution_count": 14, |
| 354 | + "execution_count": 6, |
424 | 355 | "metadata": {}, |
425 | 356 | "outputs": [], |
426 | 357 | "source": [ |
427 | | - "# TODO Copy%Paste from the other Notebook (Python Tensorflow Keras Fraud Detection Autoencoder.ipynb)" |
| 358 | + "# TODO Copy%Paste from the other Notebook (Python Tensorflow Keras Fraud Detection Autoencoder.ipynb)\n", |
| 359 | + "# Will be done after Magnus helped to fix the mapping from KSQL Generator to Pandas dataframe" |
428 | 360 | ] |
429 | 361 | }, |
430 | 362 | { |
|
444 | 376 | "source": [ |
445 | 377 | "# Model Deployment\n", |
446 | 378 | "\n", |
447 | | - "This demo focuses on the combination of Python and KSQL for data preprocessing and model training. If you want to understand the relation between Apache Kafka, KSQL and Python-related Machine Learning tools for model deployment and monitoring, please check out my other Github projects and blog posts:\n", |
| 379 | + "This demo focuses on the combination of Python and KSQL for data preprocessing and model training. If you want to understand the relation between Apache Kafka, KSQL and Python-related Machine Learning tools for model deployment and monitoring, please check out my other Github projects:\n", |
448 | 380 | "\n", |
449 | | - "*TODO Links to other Github projects and blog posts.*\n", |
| 381 | + "Some examples of model deployment in Kafka environments:\n", |
450 | 382 | "\n", |
451 | | - "- Kafka + ML blog\n", |
452 | | - "- Deployment (ML Server vs. Embedded into streaming apps)\n", |
453 | | - "- KSQL UDF example\n", |
454 | | - "- Kafka Streams example (Keras + TensorFlow)\n" |
| 383 | + "- [Analytic models (TensorFlow, Keras, H2O and Deeplearning4j) embedded in Kafka Streams microservices](https://github.com/kaiwaehner/kafka-streams-machine-learning-examples)\n", |
| 384 | + "- [Anomaly detection of IoT sensor data with a model embedded into a KSQL UDF](https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot)\n", |
| 385 | + "- [RPC communication between Kafka Streams application and model server (TensorFlow Serving)](https://github.com/kaiwaehner/tensorflow-serving-java-grpc-kafka-streams)" |
455 | 386 | ] |
456 | 387 | }, |
457 | 388 | { |
|