Skip to content

Commit 5f2cca2

Browse files
committed
Added working Jupyter notebook showing Python + KSQL
1 parent 4868151 commit 5f2cca2

File tree

1 file changed

+126
-8
lines changed

1 file changed

+126
-8
lines changed

python-jupyter-apache-kafka-ksql-tensorflow-keras.ipynb

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,150 @@
11
{
22
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"Setup for the KSQL API:"
8+
]
9+
},
10+
{
11+
"cell_type": "code",
12+
"execution_count": 1,
13+
"metadata": {},
14+
"outputs": [],
15+
"source": [
16+
"from ksql import KSQLAPI\n",
17+
"client = KSQLAPI('http://localhost:8088')"
18+
]
19+
},
20+
{
21+
"cell_type": "code",
22+
"execution_count": 3,
23+
"metadata": {},
24+
"outputs": [
25+
{
26+
"data": {
27+
"text/plain": [
28+
"True"
29+
]
30+
},
31+
"execution_count": 3,
32+
"metadata": {},
33+
"output_type": "execute_result"
34+
}
35+
],
36+
"source": [
37+
"client.create_table(table_name='users_original',\n",
38+
" columns_type=['registertime bigint','gender varchar','regionid varchar','userid varchar'],\n",
39+
" topic='users',\n",
40+
" value_format='JSON',\n",
41+
" key = 'userid')"
42+
]
43+
},
44+
{
45+
"cell_type": "code",
46+
"execution_count": 6,
47+
"metadata": {},
48+
"outputs": [
49+
{
50+
"data": {
51+
"text/plain": [
52+
"[{'@type': 'tables',\n",
53+
" 'statementText': 'show tables;',\n",
54+
" 'tables': [{'type': 'TABLE',\n",
55+
" 'name': 'USERS_ORIGINAL',\n",
56+
" 'topic': 'users',\n",
57+
" 'format': 'JSON',\n",
58+
" 'isWindowed': False}]}]"
59+
]
60+
},
61+
"execution_count": 6,
62+
"metadata": {},
63+
"output_type": "execute_result"
64+
}
65+
],
66+
"source": [
67+
"client.ksql('show tables')"
68+
]
69+
},
70+
{
71+
"cell_type": "markdown",
72+
"metadata": {},
73+
"source": [
74+
"Execute sql query and keep listening streaming data:"
75+
]
76+
},
77+
{
78+
"cell_type": "code",
79+
"execution_count": 4,
80+
"metadata": {},
81+
"outputs": [
82+
{
83+
"name": "stdout",
84+
"output_type": "stream",
85+
"text": [
86+
"{\"row\":{\"columns\":[1543510415238,\"User_5\",1489124138779,\"OTHER\",\"Region_4\",\"User_5\"]},\"errorMessage\":null,\"finalMessage\":null}\n",
87+
"{\"row\":{\"columns\":[1543510415361,\"User_8\",1516543544214,\"OTHER\",\"Region_3\",\"User_8\"]},\"errorMessage\":null,\"finalMessage\":null}\n",
88+
"{\"row\":{\"columns\":[1543510415897,\"User_2\",1515464455832,\"FEMALE\",\"Region_9\",\"User_2\"]},\"errorMessage\":null,\"finalMessage\":null}\n",
89+
"{\"row\":{\"columns\":[1543510416775,\"User_3\",1514158220288,\"OTHER\",\"Region_4\",\"User_3\"]},\"errorMessage\":null,\"finalMessage\":null}\n"
90+
]
91+
},
92+
{
93+
"ename": "KeyboardInterrupt",
94+
"evalue": "",
95+
"output_type": "error",
96+
"traceback": [
97+
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
98+
"\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)",
99+
"\u001b[0;32m<ipython-input-4-10c244adfa2c>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0mquery\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mclient\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mquery\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'select * from users_original'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mitem\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mquery\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mitem\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
100+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/site-packages/ksql-0.5.1-py3.7.egg/ksql/api.py\u001b[0m in \u001b[0;36mquery\u001b[0;34m(self, query_string, encoding, chunk_size, stream_properties, idle_timeout)\u001b[0m\n\u001b[1;32m 69\u001b[0m \u001b[0mstreaming_response\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_request\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mendpoint\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'query'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msql_string\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mquery_string\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstream_properties\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mstream_properties\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 70\u001b[0m \u001b[0mstart_idle\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 71\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mchunk\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mstreaming_response\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0miter_content\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mchunk_size\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mchunk_size\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 72\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mchunk\u001b[0m \u001b[0;34m!=\u001b[0m \u001b[0;34mb'\\n'\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 73\u001b[0m \u001b[0mstart_idle\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
101+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/site-packages/requests-2.20.1-py3.7.egg/requests/models.py\u001b[0m in \u001b[0;36mgenerate\u001b[0;34m()\u001b[0m\n\u001b[1;32m 748\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mhasattr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'stream'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 749\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 750\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mchunk\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mchunk_size\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdecode_content\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 751\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mchunk\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 752\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mProtocolError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
102+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/site-packages/urllib3-1.24.1-py3.7.egg/urllib3/response.py\u001b[0m in \u001b[0;36mstream\u001b[0;34m(self, amt, decode_content)\u001b[0m\n\u001b[1;32m 488\u001b[0m \"\"\"\n\u001b[1;32m 489\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mchunked\u001b[0m \u001b[0;32mand\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msupports_chunked_reads\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 490\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mline\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread_chunked\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mamt\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdecode_content\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdecode_content\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 491\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mline\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 492\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
103+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/site-packages/urllib3-1.24.1-py3.7.egg/urllib3/response.py\u001b[0m in \u001b[0;36mread_chunked\u001b[0;34m(self, amt, decode_content)\u001b[0m\n\u001b[1;32m 667\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mchunk_left\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 668\u001b[0m \u001b[0;32mbreak\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 669\u001b[0;31m \u001b[0mchunk\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_handle_chunk\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mamt\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 670\u001b[0m decoded = self._decode(chunk, decode_content=decode_content,\n\u001b[1;32m 671\u001b[0m flush_decoder=False)\n",
104+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/site-packages/urllib3-1.24.1-py3.7.egg/urllib3/response.py\u001b[0m in \u001b[0;36m_handle_chunk\u001b[0;34m(self, amt)\u001b[0m\n\u001b[1;32m 623\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;31m# amt > self.chunk_left\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 624\u001b[0m \u001b[0mreturned_chunk\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_fp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_safe_read\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mchunk_left\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 625\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_fp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_safe_read\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;31m# Toss the CRLF at the end of the chunk.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 626\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mchunk_left\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 627\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mreturned_chunk\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
105+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/http/client.py\u001b[0m in \u001b[0;36m_safe_read\u001b[0;34m(self, amt)\u001b[0m\n\u001b[1;32m 608\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 609\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0mamt\u001b[0m \u001b[0;34m>\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 610\u001b[0;31m \u001b[0mchunk\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mamt\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mMAXAMOUNT\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 611\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mchunk\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 612\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mIncompleteRead\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34mb''\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mamt\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
106+
"\u001b[0;32m/anaconda3/envs/ksql-python/lib/python3.7/socket.py\u001b[0m in \u001b[0;36mreadinto\u001b[0;34m(self, b)\u001b[0m\n\u001b[1;32m 587\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 588\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 589\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sock\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrecv_into\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 590\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 591\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_timeout_occurred\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
107+
"\u001b[0;31mKeyboardInterrupt\u001b[0m: "
108+
]
109+
}
110+
],
111+
"source": [
112+
"query = client.query('select * from users_original')\n",
113+
"for item in query: print(item)"
114+
]
115+
},
3116
{
4117
"cell_type": "code",
5118
"execution_count": null,
6-
"metadata": {
7-
"collapsed": true
8-
},
119+
"metadata": {},
120+
"outputs": [],
121+
"source": []
122+
},
123+
{
124+
"cell_type": "code",
125+
"execution_count": null,
126+
"metadata": {},
9127
"outputs": [],
10128
"source": []
11129
}
12130
],
13131
"metadata": {
14132
"kernelspec": {
15-
"display_name": "Python 2",
133+
"display_name": "Python 3",
16134
"language": "python",
17-
"name": "python2"
135+
"name": "python3"
18136
},
19137
"language_info": {
20138
"codemirror_mode": {
21139
"name": "ipython",
22-
"version": 2
140+
"version": 3
23141
},
24142
"file_extension": ".py",
25143
"mimetype": "text/x-python",
26144
"name": "python",
27145
"nbconvert_exporter": "python",
28-
"pygments_lexer": "ipython2",
29-
"version": "2.7.3"
146+
"pygments_lexer": "ipython3",
147+
"version": "3.7.1"
30148
}
31149
},
32150
"nbformat": 4,

0 commit comments

Comments
 (0)