# Send Transactions To Kafka

This notebook reads transactions from the `./transactions.csv` file and sends them as events to Apache Kafka. The data is then processed by Kafka Connect and eventually ends up in a QuestDB table.

We first create the QuestDB table. It would be created automatically if it didn't exist, but creating it explicitly lets us see the schema.

In [1]:
# Silence DeprecationWarning messages so they do not clutter this demo's output
import warnings
warnings.simplefilter(action="ignore", category=DeprecationWarning)

In [None]:
import psycopg as pg
import os

# Read the QuestDB connection settings from the environment, falling back to
# the defaults below when a variable is not set.
host = os.getenv('QDB_CLIENT_HOST', 'questdb')
port = os.getenv('QDB_CLIENT_PORT', '8812')
user = os.getenv('QDB_CLIENT_USER', 'admin')
password = os.getenv('QDB_CLIENT_PASSWORD', 'quest')

# Build the connection string with make_conninfo() instead of an f-string so
# that values containing spaces, quotes or backslashes (e.g. a password taken
# from the environment) are escaped rather than corrupting the string.
conn_str = pg.conninfo.make_conninfo(
    user=user, password=password, host=host, port=port, dbname='qdb'
)

# autocommit=True: no explicit commit call is needed after the statement.
with pg.connect(conn_str, autocommit=True) as connection:
    with connection.cursor() as cur:
        # IF NOT EXISTS keeps this cell safe to re-run.
        cur.execute(
        """
        CREATE TABLE  IF NOT EXISTS 'transactions' (
        timestamp TIMESTAMP,
  merchant SYMBOL capacity 5000 CACHE,
  category SYMBOL capacity 256 CACHE,
  amt DOUBLE,
  gender SYMBOL capacity 256 CACHE,
  city SYMBOL capacity 2000 CACHE,
  state SYMBOL capacity 256 CACHE,
  first VARCHAR,
  last VARCHAR,
  street VARCHAR,
  job VARCHAR,
  trans_num VARCHAR,
  cc_num LONG,
  zip LONG,
  city_pop LONG,
  dob LONG,
  lat DOUBLE,
  lon DOUBLE,
  merch_lat DOUBLE,
  merch_long DOUBLE
) timestamp (timestamp) PARTITION BY DAY WAL DEDUP UPSERT KEYS(timestamp, trans_num);
""")



## Sending the data to Kafka

Now we read the `./transactions.csv` file and convert every row to Avro binary format before sending it to a Kafka topic named `transactions`.

By default, the script overrides the original date with the current date and
waits 50 ms between events before sending to Kafka, to simulate a real-time stream and provide
a nicer visualization. You can override these settings by changing the constants in the script.

This script will keep sending data, looping over the file as many times as needed, until you click stop or exit the notebook, or until `TOTAL_EVENTS` events have been sent.

While the script is running, you can check the data in the table directly at QuestDB's web console at http://localhost:9000 


In [None]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
from datetime import datetime
import time
from concurrent.futures import ThreadPoolExecutor

# --- Where to read from and where to send to ---
CSV_FILE = './transactions.csv'
KAFKA_BROKER = 'broker:29092,broker-2:29092'
KAFKA_TOPIC = 'transactions'
SCHEMA_REGISTRY = 'http://schema_registry:8081'

# --- Volume and pacing ---
TOTAL_EVENTS = 2_000_000  # Events to produce in total, split across all senders
NUM_SENDERS = 5  # Sender threads running in parallel
DELAY_MS = 50  # Pause after each event, in milliseconds (0 disables the pause)

# --- Behaviour switches ---
VERBOSE = False  # True prints a line for every delivery report
TIMESTAMP_FROM_FILE = False  # True: use the timestamp found in the CSV row; False: use the current system time

def get_delivery_report_func(verbose):
    """Build the callback handed to the producer as ``on_delivery``.

    Args:
        verbose: When truthy, the callback prints one line per delivery
            report (success or failure). When falsy, the callback is silent.

    Returns:
        A function ``delivery_report(err, msg)``.
    """
    def delivery_report(err, msg):
        # Quiet mode: ignore the report entirely, including failures.
        if not verbose:
            return
        if err is None:
            print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        else:
            print(f'Message delivery failed: {err}')
    return delivery_report

def _row_to_value(row):
    """Convert one CSV row (a dict of strings) into the dict sent as the Avro value.

    Returns:
        The value dict, or None (after printing the reason) when the
        timestamp, 'dob' or one of the numeric fields cannot be parsed.
    """
    # Exclude the unnamed index column, 'unix_time', and 'is_fraud' columns
    excluded_columns = ('', 'unix_time', 'is_fraud')
    data = {key: val for key, val in row.items() if key not in excluded_columns}

    # Rename 'trans_date_trans_time' to 'timestamp'
    data['timestamp'] = data.pop('trans_date_trans_time')

    # Handle timestamp
    if TIMESTAMP_FROM_FILE:
        # Parse the timestamp from the file
        try:
            trans_datetime = datetime.strptime(data['timestamp'], '%Y-%m-%d %H:%M:%S')
            # NOTE(review): the parsed datetime is naive, so .timestamp() interprets
            # it in the machine's local timezone -- confirm the runtime is UTC.
            trans_date_micros = int(trans_datetime.timestamp() * 1e6)
        except ValueError as e:
            print(f"Error parsing 'timestamp' for row {row}: {e}")
            return None
    else:
        # Use the current system time
        trans_date_micros = int(time.time() * 1e6)

    # Parse 'dob' date
    try:
        dob_datetime = datetime.strptime(data['dob'], '%Y-%m-%d')
        # NOTE(review): same local-timezone caveat as the transaction timestamp.
        dob_micros = int(dob_datetime.timestamp() * 1e6)
    except ValueError as e:
        print(f"Error parsing 'dob' for row {row}: {e}")
        return None

    # Convert data types
    try:
        return {
            "timestamp": trans_date_micros,
            "cc_num": int(data['cc_num']),
            "merchant": data['merchant'],
            "category": data['category'],
            "amt": float(data['amt']),
            "first": data['first'],
            "last": data['last'],
            "gender": data['gender'],
            "street": data['street'],
            "city": data['city'],
            "state": data['state'],
            "zip": int(data['zip']),
            "lat": float(data['lat']),
            # The CSV column is 'long'; the schema field is 'lon'
            "lon": float(data['long']),
            "city_pop": int(data['city_pop']),
            "job": data['job'],
            "dob": dob_micros,
            "trans_num": data['trans_num'],
            "merch_lat": float(data['merch_lat']),
            "merch_long": float(data['merch_long'])
        }
    except ValueError as e:
        print(f"Error converting data types for row {row}: {e}")
        return None


def send(sender_id, csv_rows, total_events):
    """Produce up to ``total_events`` Avro-encoded transactions to ``KAFKA_TOPIC``.

    The rows are iterated from the start again and again until the quota is
    reached. Rows that cannot be converted are reported and skipped. If a
    complete pass over ``csv_rows`` sends nothing (no rows, or none of them
    convertible) the function stops instead of looping forever.

    Args:
        sender_id: Identifier used only in the printed progress messages.
        csv_rows: Re-iterable collection of row dicts keyed by CSV column name.
        total_events: Number of events this sender should produce.
    """
    value_schema_str = """
    {
      "type": "record",
      "name": "Transaction",
      "fields": [
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-micros"}},
        {"name": "cc_num", "type": "long"},
        {"name": "merchant", "type": "string"},
        {"name": "category", "type": "string"},
        {"name": "amt", "type": "double"},
        {"name": "first", "type": "string"},
        {"name": "last", "type": "string"},
        {"name": "gender", "type": "string"},
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "state", "type": "string"},
        {"name": "zip", "type": "int"},
        {"name": "lat", "type": "double"},
        {"name": "lon", "type": "double"},
        {"name": "city_pop", "type": "int"},
        {"name": "job", "type": "string"},
        {"name": "dob", "type": {"type": "long", "logicalType": "timestamp-micros"}},
        {"name": "trans_num", "type": "string"},
        {"name": "merch_lat", "type": "double"},
        {"name": "merch_long", "type": "double"}
      ]
    }
    """

    value_schema = avro.loads(value_schema_str)

    avro_producer = AvroProducer({
        'bootstrap.servers': KAFKA_BROKER,
        'schema.registry.url': SCHEMA_REGISTRY,
        'linger.ms': '0',  # Send messages immediately
        'batch.size': '8388608',  # Adjust based on your needs
        'compression.type': 'snappy',  # Enable compression
        'queue.buffering.max.messages': '1000000',  # Increase as needed
        'queue.buffering.max.kbytes': '1048576',    # 1 GB
        'acks': '0',  # '0' for no acks (fastest), '1' for leader ack, 'all' for all replicas
    }, default_value_schema=value_schema)

    delivery_report_func = get_delivery_report_func(VERBOSE)

    events_sent = 0  # Counter to track how many events have been sent
    message_count = 0  # Counter for polling

    while events_sent < total_events:
        sent_before_pass = events_sent

        for row in csv_rows:
            if events_sent >= total_events:
                break

            value = _row_to_value(row)
            if value is None:
                # The reason was already printed by the converter; skip this row
                continue

            avro_producer.produce(
                topic=KAFKA_TOPIC,
                value=value,
                on_delivery=delivery_report_func
            )
            events_sent += 1
            message_count += 1

            # Call poll periodically
            if message_count % 1000 == 0:
                avro_producer.poll(0)

            # Delay between events if needed
            if DELAY_MS > 0:
                time.sleep(DELAY_MS / 1000.0)  # Convert milliseconds to seconds

        # A whole pass that produced nothing means no later pass will either;
        # without this guard the outer loop would spin forever.
        if events_sent == sent_before_pass:
            print(f"Sender {sender_id} - No sendable rows found; stopping early.")
            break

    avro_producer.flush()
    print(f"Sender {sender_id} - Finished sending {events_sent} events.")

def parallel_send(total_events, num_senders):
    """Read ``CSV_FILE`` once and run ``num_senders`` parallel ``send`` workers.

    ``total_events`` is divided as evenly as possible: every sender gets
    ``total_events // num_senders`` events and the first
    ``total_events % num_senders`` senders get one extra. All senders receive
    the same list of rows. The call returns once every sender has finished;
    an exception raised inside a sender is re-raised here.

    Args:
        total_events: Total number of events to produce across all senders.
        num_senders: Number of worker threads to start.
    """
    events_per_sender, remaining_events = divmod(total_events, num_senders)

    sender_events = [events_per_sender] * num_senders
    for i in range(remaining_events):
        sender_events[i] += 1

    # Read CSV once in the main thread. newline='' lets the csv module do its
    # own newline handling, which it needs for newlines inside quoted fields.
    with open(CSV_FILE, mode='r', newline='') as file:
        csv_rows = list(csv.DictReader(file))

    with ThreadPoolExecutor(max_workers=num_senders) as executor:
        futures = [
            executor.submit(send, sender_id, csv_rows, sender_events[sender_id])
            for sender_id in range(num_senders)
        ]
        # result() blocks until the sender is done and surfaces its exception, if any
        for future in futures:
            future.result()

if __name__ == '__main__':
    # The message carries its own trailing newline, so a blank line follows it
    print('Ingestion started.\n')
    parallel_send(total_events=TOTAL_EVENTS, num_senders=NUM_SENDERS)


## Verify we have ingested some data

The data you send to Kafka will be processed by Kafka Connect and passed to QuestDB, where it will be stored in a table named `transactions`. Let's check that we can actually see some data.

In [2]:
import requests

# Base URL of the QuestDB HTTP endpoint queried below
host = 'http://questdb:9000'

sql_query = 'SELECT * FROM transactions LIMIT -5;'

try:
    # The timeout keeps this cell from hanging forever if QuestDB does not answer
    response = requests.get(
        host + '/exec',
        params={'query': sql_query},
        timeout=30).json()
    if isinstance(response, dict) and 'dataset' in response:
        for row in response['dataset']:
            print(row)
    else:
        # No result set in the reply (e.g. an error payload): show it instead
        # of failing with a KeyError
        print(f'Error: {response}')
except (requests.exceptions.RequestException, ValueError) as e:
    # ValueError covers a non-JSON body on requests versions whose JSON decode
    # error is not a RequestException
    print(f'Error: {e}')

['2024-10-08T16:55:32.671067Z', 'fraud_Weimann, Kuhic and Beahan', 'shopping_pos', 3.26, 'M', 'Moulton', 'IA', 'Jeffrey', 'Rice', '21447 Powell Circle', 'Probation officer', 'c8ab19c91b9289c49201db83b682a1bb', 2305336922781618, 52572, 1132, -280195200000000, 40.6866, -92.6833, 41.101245, -91.939675]
['2024-10-08T16:55:32.953496Z', 'fraud_Kiehn-Emmerich', 'grocery_pos', 129.41, 'M', 'Greenbush', 'VA', 'Richard', 'Marshall', '295 Page Creek Suite 181', 'Psychiatrist', 'a873064960bd1668fb6377dec84318b7', 4412720572684931, 23357, 776, -635040000000000, 37.7681, -75.6664, 37.880469, -75.61632]
['2024-10-08T16:55:33.278427Z', 'fraud_Zemlak Group', 'misc_net', 15.45, 'M', 'Wilmette', 'IL', 'Cristian', 'Jones', '0423 Kirby Field Suite 623', 'Trade mark attorney', '0f8534b474efdf70b40b9448664ecaa0', 573860193545, 60091, 27020, 522460800000000, 42.0765, -87.7246, 42.151554, -87.020095]
['2024-10-08T16:55:33.643397Z', 'fraud_Schmeler, Bashirian and Price', 'shopping_net', 578.3, 'F', 'Albany', 'N