In [1]:
import requests
import json
import base64

In [2]:
def query_ksql_stream(query, ksql_url, api_key, api_secret):
    headers = {
        "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"
    }
    
    # If using Confluent Cloud or secured ksqlDB, add basic auth
    credentials = f"{api_key}:{api_secret}"
    encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
    if api_key and api_secret:
        headers["Authorization"] = f"Basic {encoded_credentials}"
    
    data = {
        "ksql": query,
        "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}
    }
    
    response = requests.post(f"{ksql_url}/query", headers=headers, data=json.dumps(data))
    print(response)
    if response.status_code == 200:
        return response.iter_lines()
    else:
        raise Exception(f"Query failed: {response.text}")

In [3]:
# Usage example
ksql_url = "XXXX"  # Change this to your ksqlDB server URL
query = "SELECT * FROM  CREDIT_CARD_TRANSACTIONS_STREAM;"  # Your ksqlDB query
api_key="XXX"
api_secret="XXXX"

In [4]:
list = []
try:
    for line in query_ksql_stream(query, ksql_url, api_key, api_secret):
        data = json.loads(line.decode('utf-8'))
        # filtering ....
        list.append(data)
        print(data)
except Exception as e:
    print(e)

<Response [200]>
{'queryId': 'transient_CREDIT_CARD_TRANSACTIONS_STREAM_4849112265421171183', 'columnNames': ['CARD_NUMBER', 'AMOUNT', 'TIMESTAMP', 'LOCATION', 'MERCHANT', 'CURRENCY', 'DEVICE_TYPE'], 'columnTypes': ['STRING', 'DECIMAL(10, 2)', 'STRING', 'STRUCT<`CITY` STRING, `STATE` STRING>', 'STRING', 'STRING', 'STRING']}
['2266742705742379', 508.57, '2022-07-10T13:03:27', {'CITY': 'Edwardstad', 'STATE': 'WY'}, 'Reeves, Rodriguez and Osborn', 'TMT', 'mobile']
['2708393123493454', 273.83, '1985-04-17T08:42:19', {'CITY': 'Lake William', 'STATE': 'CT'}, 'Shields-Glover', 'XPF', 'mobile']
['676318660815', 443.2, '1998-07-08T18:10:24', {'CITY': 'Lake Danielhaven', 'STATE': 'GA'}, 'Kidd Ltd', 'JPY', 'desktop']
['4697135223427', 384.47, '1997-04-27T05:19:19', {'CITY': 'Rodriguezfurt', 'STATE': 'NJ'}, 'Ballard LLC', 'SRD', 'mobile']
['4952131940160552', 208.66, '2008-04-30T22:11:56', {'CITY': 'New Christine', 'STATE': 'NY'}, 'Bailey LLC', 'MUR', 'mobile']
['3584027325290360', 224.62, '1970-0

In [5]:
list

[{'queryId': 'transient_CREDIT_CARD_TRANSACTIONS_STREAM_4849112265421171183',
  'columnNames': ['CARD_NUMBER',
   'AMOUNT',
   'TIMESTAMP',
   'LOCATION',
   'MERCHANT',
   'CURRENCY',
   'DEVICE_TYPE'],
  'columnTypes': ['STRING',
   'DECIMAL(10, 2)',
   'STRING',
   'STRUCT<`CITY` STRING, `STATE` STRING>',
   'STRING',
   'STRING',
   'STRING']},
 ['2266742705742379',
  508.57,
  '2022-07-10T13:03:27',
  {'CITY': 'Edwardstad', 'STATE': 'WY'},
  'Reeves, Rodriguez and Osborn',
  'TMT',
  'mobile'],
 ['2708393123493454',
  273.83,
  '1985-04-17T08:42:19',
  {'CITY': 'Lake William', 'STATE': 'CT'},
  'Shields-Glover',
  'XPF',
  'mobile'],
 ['676318660815',
  443.2,
  '1998-07-08T18:10:24',
  {'CITY': 'Lake Danielhaven', 'STATE': 'GA'},
  'Kidd Ltd',
  'JPY',
  'desktop'],
 ['4697135223427',
  384.47,
  '1997-04-27T05:19:19',
  {'CITY': 'Rodriguezfurt', 'STATE': 'NJ'},
  'Ballard LLC',
  'SRD',
  'mobile'],
 ['4952131940160552',
  208.66,
  '2008-04-30T22:11:56',
  {'CITY': 'New Christi

In [6]:
list.pop(0)
list

[['2266742705742379',
  508.57,
  '2022-07-10T13:03:27',
  {'CITY': 'Edwardstad', 'STATE': 'WY'},
  'Reeves, Rodriguez and Osborn',
  'TMT',
  'mobile'],
 ['2708393123493454',
  273.83,
  '1985-04-17T08:42:19',
  {'CITY': 'Lake William', 'STATE': 'CT'},
  'Shields-Glover',
  'XPF',
  'mobile'],
 ['676318660815',
  443.2,
  '1998-07-08T18:10:24',
  {'CITY': 'Lake Danielhaven', 'STATE': 'GA'},
  'Kidd Ltd',
  'JPY',
  'desktop'],
 ['4697135223427',
  384.47,
  '1997-04-27T05:19:19',
  {'CITY': 'Rodriguezfurt', 'STATE': 'NJ'},
  'Ballard LLC',
  'SRD',
  'mobile'],
 ['4952131940160552',
  208.66,
  '2008-04-30T22:11:56',
  {'CITY': 'New Christine', 'STATE': 'NY'},
  'Bailey LLC',
  'MUR',
  'mobile'],
 ['3584027325290360',
  224.62,
  '1970-05-20T20:30:07',
  {'CITY': 'South Julieborough', 'STATE': 'IL'},
  'Mason, Bell and Arias',
  'WST',
  'mobile'],
 ['2294637994850717',
  795.19,
  '1982-01-18T23:15:57',
  {'CITY': 'Port Normaport', 'STATE': 'MI'},
  'Williams Ltd',
  'NOK',
  'mobil

In [7]:
data_dicts = []
for data_list in list:
    data_dict = {
        "card_number": data_list[0],
        "amount": data_list[1],
        "timestamp": data_list[2],
        "location": data_list[3],
        "merchant": data_list[4],
        "currency": data_list[5],
        "devicetype": data_list[6]
    }
    data_dicts.append(data_dict)

In [8]:
data_dicts

[{'card_number': '2266742705742379',
  'amount': 508.57,
  'timestamp': '2022-07-10T13:03:27',
  'location': {'CITY': 'Edwardstad', 'STATE': 'WY'},
  'merchant': 'Reeves, Rodriguez and Osborn',
  'currency': 'TMT',
  'devicetype': 'mobile'},
 {'card_number': '2708393123493454',
  'amount': 273.83,
  'timestamp': '1985-04-17T08:42:19',
  'location': {'CITY': 'Lake William', 'STATE': 'CT'},
  'merchant': 'Shields-Glover',
  'currency': 'XPF',
  'devicetype': 'mobile'},
 {'card_number': '676318660815',
  'amount': 443.2,
  'timestamp': '1998-07-08T18:10:24',
  'location': {'CITY': 'Lake Danielhaven', 'STATE': 'GA'},
  'merchant': 'Kidd Ltd',
  'currency': 'JPY',
  'devicetype': 'desktop'},
 {'card_number': '4697135223427',
  'amount': 384.47,
  'timestamp': '1997-04-27T05:19:19',
  'location': {'CITY': 'Rodriguezfurt', 'STATE': 'NJ'},
  'merchant': 'Ballard LLC',
  'currency': 'SRD',
  'devicetype': 'mobile'},
 {'card_number': '4952131940160552',
  'amount': 208.66,
  'timestamp': '2008-0