In [12]:
import json
import os
import random
import time

import pandas as pd
import psycopg2
from psycopg2 import sql

from confluent_kafka import Producer, Consumer, KafkaError, TopicPartition
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.admin import AdminClient

# Kafka

## Create Topic

In [2]:
admin_client = AdminClient({'bootstrap.servers': 'broker:9092'})

# Fetch cluster metadata (waiting up to 10 s) and display only user topics:
# names that start with '_' are internal and are filtered out.
topic_metadata = admin_client.list_topics(timeout=10)
list(filter(lambda name: not name.startswith('_'), topic_metadata.topics))

[]

In [3]:
# Ask the broker for a 'trades' topic with 3 partitions and a replication factor of 1
new_topics = [NewTopic('trades', num_partitions=3, replication_factor=1)]
fs = admin_client.create_topics(new_topics)

# create_topics() returns {topic name: future}; wait on each future to learn the outcome
for topic, f in fs.items():
    try:
        f.result()  # raises if the request failed (e.g. the topic already exists)
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")
    else:
        print(f"Topic {topic} created")

Topic trades created


In [4]:
# Print the partition layout (leader broker id and replica ids) of every user topic
topics = admin_client.list_topics().topics
for topic, topic_info in topics.items():
    if topic.startswith('_'):
        continue  # internal topic, not created by us
    print(f"Topic: {topic}, Partitions: {len(topic_info.partitions)}")
    for p_id, p_info in topic_info.partitions.items():
        print(f"  Partition: {p_id}, Leader: {p_info.leader}, Replicas: {p_info.replicas}")

Topic: trades, Partitions: 3
  Partition: 0, Leader: 1, Replicas: [1]
  Partition: 1, Leader: 1, Replicas: [1]
  Partition: 2, Leader: 1, Replicas: [1]


## Producer

In [15]:
# Sample trade events in the compact single-letter key format. Price (p) and
# quantity (q) are decimal strings. The binance table columns the keys are
# inserted into later in this notebook are:
#   e=event_type, E=event_time, s=symbol, t=trade_id, p=price, q=quantity,
#   b=buyer_order_id, a=seller_order_id, T=trade_time, m=is_buyer_maker, M=ignore_in_price
trades = [
    dict(e='trade', E=1713991342890, s='BTCUSDT', t=97236, p='0.006390', q='2551',
         b=69, a=607, T=1713991342890, m=True, M=True),
    dict(e='trade', E=1713991378161, s='BNBUSDT', t=77422, p='0.008396', q='2831',
         b=307, a=539, T=1713991378161, m=False, M=True),
    dict(e='trade', E=1713991429451, s='BNBBTC', t=43014, p='0.004122', q='3493',
         b=123, a=664, T=1713991429451, m=True, M=True),
]

In [16]:
def acked(err, msg):
    """
    Delivery-report callback passed to Producer.produce().

    On success prints the topic, partition and key the message was written to;
    on failure prints the message together with the error.

    Parameters:
    err: Delivery error, or None when the message was delivered.
    msg: The message this report refers to.
    """
    if err is None:
        print(f"Message produced: {msg.topic()} {msg.partition()} {msg.key()}")
        return
    print(f"Failed to deliver message: {msg}: {err}")

In [17]:
producer = Producer({'bootstrap.servers': 'broker:9092'})

# Key every message by its symbol; the JSON-encoded trade is the payload and
# `acked` reports the outcome of each delivery.
for trade in trades:
    producer.produce(
        'trades',
        key=trade['s'],
        value=json.dumps(trade),
        callback=acked,
    )

# produce() only queues messages locally. flush() blocks until the queue is drained
# (running the delivery callbacks) and returns how many messages are still queued.
producer.flush()

Message produced: trades 1 b'BNBBTC'
Message produced: trades 2 b'BNBUSDT'
Message produced: trades 0 b'BTCUSDT'


0

## Consumer

In [18]:
def consume_messages(topic_name, consumer_config, timeout_duration=10):
    """
    Consumes messages from a specified Kafka topic and prints them.

    Parameters:
    topic_name (str): The name of the Kafka topic to subscribe to.
    consumer_config (dict): Configuration settings for the Kafka consumer.
    timeout_duration (int): Duration in seconds to consume messages before stopping.

    The consumer is always closed on exit, including when subscribing or
    polling raises. Messages without a payload are reported instead of crashing.
    """
    # Initialize the consumer
    consumer = Consumer(consumer_config)

    try:
        # Subscribe inside the try so the consumer is closed even if this fails
        consumer.subscribe([topic_name])

        # Monotonic clock: elapsed time is unaffected by wall-clock adjustments
        start_time = time.monotonic()
        while time.monotonic() - start_time <= timeout_duration:
            msg = consumer.poll(timeout=1.0)  # Poll for a message (timeout in seconds)

            if msg is None:
                print("No message available within the timeout period")
                continue  # No message available within the timeout period

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(f'Reached end of {msg.partition()} at offset {msg.offset()}')
                else:
                    print(f'Error: {msg.error()}')
                continue

            # A message may carry no value (e.g. a tombstone); it cannot be decoded
            value = msg.value()
            if value is None:
                print('Message: <no value>')
            else:
                print(f'Message: {value.decode("utf-8")}')
            print(f'Partition: {msg.partition()}')
            print(f'Offset: {msg.offset()}')

    finally:
        consumer.close()

In [19]:
# Configure the consumer
consumer_config = {
    'bootstrap.servers': 'broker:9092',
    'client.id': 'consumer1',
    'group.id': 'group1',
    'auto.offset.reset': 'earliest'
}
consume_messages('trades', consumer_config, timeout_duration=5)

Message: {"e": "trade", "E": 1713991342890, "s": "BTCUSDT", "t": 97236, "p": "0.006390", "q": "2551", "b": 69, "a": 607, "T": 1713991342890, "m": true, "M": true}
Partition: 0
Offset: 1
Message: {"e": "trade", "E": 1713991429451, "s": "BNBBTC", "t": 43014, "p": "0.004122", "q": "3493", "b": 123, "a": 664, "T": 1713991429451, "m": true, "M": true}
Partition: 1
Offset: 1
Message: {"e": "trade", "E": 1713991378161, "s": "BNBUSDT", "t": 77422, "p": "0.008396", "q": "2831", "b": 307, "a": 539, "T": 1713991378161, "m": false, "M": true}
Partition: 2
Offset: 1
No message available within the timeout period
No message available within the timeout period
No message available within the timeout period
No message available within the timeout period
No message available within the timeout period


In [20]:
# Configure the consumer
consumer_config = {
    'bootstrap.servers': 'broker:9092',
    'client.id': 'consumer2',
    'group.id': 'group1',
    'auto.offset.reset': 'earliest'
}
consume_messages('trades', consumer_config, timeout_duration=5)

No message available within the timeout period
No message available within the timeout period
No message available within the timeout period
No message available within the timeout period
No message available within the timeout period


In [21]:
# Configure the consumer
consumer_config = {
    'bootstrap.servers': 'broker:9092',
    'client.id': 'consumer2-1',
    'group.id': 'group2',
    'auto.offset.reset': 'earliest'
}
consume_messages('trades', consumer_config, timeout_duration=5)

Message: {"e": "trade", "E": 1713991378161, "s": "BNBUSDT", "t": 77422, "p": "0.008396", "q": "2831", "b": 307, "a": 539, "T": 1713991378161, "m": false, "M": true}
Partition: 2
Offset: 0
Message: {"e": "trade", "E": 1713991378161, "s": "BNBUSDT", "t": 77422, "p": "0.008396", "q": "2831", "b": 307, "a": 539, "T": 1713991378161, "m": false, "M": true}
Partition: 2
Offset: 1
Message: {"e": "trade", "E": 1713991429451, "s": "BNBBTC", "t": 43014, "p": "0.004122", "q": "3493", "b": 123, "a": 664, "T": 1713991429451, "m": true, "M": true}
Partition: 1
Offset: 0
Message: {"e": "trade", "E": 1713991429451, "s": "BNBBTC", "t": 43014, "p": "0.004122", "q": "3493", "b": 123, "a": 664, "T": 1713991429451, "m": true, "M": true}
Partition: 1
Offset: 1
Message: {"e": "trade", "E": 1713991342890, "s": "BTCUSDT", "t": 97236, "p": "0.006390", "q": "2551", "b": 69, "a": 607, "T": 1713991342890, "m": true, "M": true}
Partition: 0
Offset: 0
Message: {"e": "trade", "E": 1713991342890, "s": "BTCUSDT", "t": 9

# PostgreSQL database

## Create database

In [23]:
def create_database(conn_params, dbname):
    """
    Create a PostgreSQL database unless one with that name already exists.

    Connects to the default 'postgres' database in autocommit mode, because
    CREATE DATABASE cannot run inside a transaction block.

    Parameters:
    conn_params (dict): Keyword arguments for psycopg2.connect (user, password,
        host, port, ...). Any 'dbname' entry is overridden with 'postgres' on a
        copy; the caller's dict is not modified.
    dbname (str): Name of the database to create. It is passed as a quoted SQL
        identifier, never spliced into the SQL text, so it is used verbatim
        (case-sensitive).

    Creation errors are printed, not raised. Connection errors propagate.
    """
    # Connect to the default database (postgres) using a copy of the parameters
    conn_params_default = dict(conn_params, dbname='postgres')

    conn = psycopg2.connect(**conn_params_default)
    try:
        # CREATE DATABASE is not allowed inside a transaction block
        conn.autocommit = True
        with conn.cursor() as cur:
            # Check if the database already exists
            cur.execute("SELECT 1 FROM pg_database WHERE datname=%s", (dbname,))
            exists = cur.fetchone()

            if exists:
                print(f"Database '{dbname}' already exists.")
            else:
                try:
                    # Quote the name as an identifier instead of interpolating it
                    cur.execute(sql.SQL("CREATE DATABASE {};").format(sql.Identifier(dbname)))
                    print(f"Database '{dbname}' created successfully.")
                except psycopg2.Error as e:
                    print(f"An error occurred: {e}")
    finally:
        # Always release the connection, even if a query failed
        conn.close()

In [24]:
# Server-level connection settings (no dbname): used only to reach the server and
# create the 'trades' database. Each value can be overridden through the standard
# libpq environment variables; the literals are the fallbacks when a variable is unset.
conn_params = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "postgres"),
    "host": os.environ.get("PGHOST", "pgdatabase"),
    "port": os.environ.get("PGPORT", "5432"),
}

create_database(conn_params, "trades")

Database 'trades' created successfully.


## Create table

In [9]:
# Connection settings for the 'trades' database. User, password, host and port can be
# overridden through the standard libpq environment variables; the literals are the
# fallbacks used when a variable is unset (no credentials need to live in the notebook).
conn_params = {
    "dbname": "trades",
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "postgres"),
    "host": os.environ.get("PGHOST", "pgdatabase"),
    "port": os.environ.get("PGPORT", "5432"),
}

In [13]:
def create_table(conn_params, table_name):
    """
    Creates a table with a predefined trade schema in the PostgreSQL database,
    unless a table with that name already exists (CREATE TABLE IF NOT EXISTS).

    Parameters:
    conn_params (dict): A dictionary containing the connection parameters
        (keyword arguments for psycopg2.connect).
    table_name (str): The name of the table to create. It is passed as a quoted
        SQL identifier rather than spliced into the statement, so it is matched
        case-sensitively (use lower-case names to match unquoted references).

    Database errors are printed, not raised. The success message is printed
    even when the table already existed.
    """
    # Compose the statement with a quoted identifier instead of an f-string
    create_table_command = sql.SQL("""
        CREATE TABLE IF NOT EXISTS {} (
            event_type TEXT,
            event_time BIGINT,
            symbol TEXT,
            trade_id BIGINT,
            price NUMERIC,
            quantity NUMERIC,
            buyer_order_id BIGINT,
            seller_order_id BIGINT,
            trade_time BIGINT,
            is_buyer_maker BOOLEAN,
            ignore_in_price BOOLEAN
        );
    """).format(sql.Identifier(table_name))

    conn = None
    try:
        # Establish the connection using the connection parameters
        conn = psycopg2.connect(**conn_params)
        # `with conn` commits on success / rolls back on error, but it does NOT
        # close the connection -- that is done in the finally block.
        with conn, conn.cursor() as cur:
            cur.execute(create_table_command)
        print(f"Table '{table_name}' created successfully")

    except psycopg2.Error as e:
        # Handle exceptions that occur during the creation of the table
        print(f"Failed to create table: {e}")
    finally:
        if conn is not None:
            conn.close()

In [14]:
# Trades are stored in a table named 'binance' inside the database chosen by conn_params
create_table(conn_params, table_name='binance')

Table 'binance' created successfully


## Insert sample data into the table

In [15]:
# Data to be inserted
data = {
    'e': 'trade',
    'E': 1714015066671,
    's': 'BNBBTC',
    't': 74686,
    'p': '0.006215',
    'q': '2112',
    'b': 88,
    'a': 544,
    'T': 1714015066671,
    'm': False,
    'M': True
}

# SQL query to insert data
insert_query = """
    INSERT INTO binance 
        (
            event_type, event_time, symbol, trade_id, price, quantity, buyer_order_id, 
            seller_order_id, trade_time, is_buyer_maker, ignore_in_price
        )
    VALUES (%(e)s, %(E)s, %(s)s, %(t)s, %(p)s, %(q)s, %(b)s, %(a)s, %(T)s, %(m)s, %(M)s);
"""

# psycopg2's `with conn` only scopes a transaction (commit on success, rollback on
# error); it does not close the connection, so close it explicitly afterwards.
conn = psycopg2.connect(**conn_params)
try:
    with conn, conn.cursor() as cur:
        cur.execute(insert_query, data)
    print('Record inserted')
finally:
    conn.close()

Record inserted


## Kafka consumer reads data from the Kafka stream and inserts it into the table

In [16]:
# Consume trades from Kafka and persist each one into the binance table.
consumer_config = {
    'bootstrap.servers': 'broker:9092',
    'group.id': 'group_db',
    'auto.offset.reset': 'earliest',
    # Offsets are committed manually, only after the row is committed in PostgreSQL,
    # so a message whose insert fails is consumed again on the next run instead of lost.
    'enable.auto.commit': False,
}
CONSUME_SECONDS = 10  # how long to keep polling before stopping

# Initialize the consumer
consumer = Consumer(consumer_config)
topic_name = 'trades'
conn = None

try:
    # Subscribe inside the try so the consumer is closed even if this fails
    consumer.subscribe([topic_name])
    conn = psycopg2.connect(**conn_params)

    start_time = time.monotonic()
    with conn.cursor() as cursor:
        while time.monotonic() - start_time <= CONSUME_SECONDS:
            msg = consumer.poll(timeout=1.0)  # Poll for a message (timeout in seconds)

            if msg is None:
                continue  # No message available within the timeout period

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(f'Reached end of {msg.partition()} at offset {msg.offset()}')
                else:
                    print(f'Error: {msg.error()}')
                continue

            raw_value = msg.value()
            if raw_value is None:
                # Nothing to insert for a message without a payload; its offset is still
                # committed below so the group moves past it.
                print(f'Skipping empty message from partition {msg.partition()} at offset {msg.offset()}')
            else:
                message_data = json.loads(raw_value.decode('utf-8'))
                print(f'Received message: {message_data} from partition {msg.partition()}')
                cursor.execute(insert_query, message_data)
                conn.commit()
                print('Record inserted')

            # Commit the Kafka offset only once the database commit has succeeded
            consumer.commit(message=msg, asynchronous=False)

finally:
    # Always release the database connection and the Kafka consumer
    if conn is not None:
        conn.close()
    consumer.close()

## Explore data from the PostgreSQL database

In [17]:
# Load the `sql` IPython extension (provides %sql / %%sql) and connect to the
# PostgreSQL server; no database is named in the URL.
# NOTE(review): credentials are embedded in the URL -- consider reading them from the environment.
%load_ext sql
%sql postgresql://postgres:postgres@pgdatabase

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [19]:
%%sql
-- List every database on the server
SELECT datname FROM pg_database;

 * postgresql://postgres:***@pgdatabase
   postgresql://postgres:***@pgdatabase/postgres
   postgresql://postgres:***@pgdatabase/trades
4 rows affected.


datname
postgres
trades
template1
template0


In [22]:
# Make the 'trades' database the current %sql connection for the cells below.
# NOTE(review): credentials are embedded in the URL -- consider reading them from the environment.
%sql postgresql://postgres:postgres@pgdatabase/trades

In [23]:
%%sql
-- List user tables: base tables outside the pg_catalog, information_schema and priv schemas
SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_type = 'BASE TABLE' AND
    table_schema NOT IN ('pg_catalog', 'information_schema', 'priv');

   postgresql://postgres:***@pgdatabase
   postgresql://postgres:***@pgdatabase/postgres
 * postgresql://postgres:***@pgdatabase/trades
1 rows affected.


table_schema,table_name
public,binance


In [24]:
%%sql
-- List the column names and data types of every table named 'binance' (in any schema).
-- Unquoted identifiers are case-folded, so Information_schema.Columns resolves to information_schema.columns.
SELECT column_name, data_type FROM Information_schema.Columns
    WHERE table_name = 'binance';

   postgresql://postgres:***@pgdatabase
   postgresql://postgres:***@pgdatabase/postgres
 * postgresql://postgres:***@pgdatabase/trades
11 rows affected.


column_name,data_type
ignore_in_price,boolean
event_time,bigint
trade_time,bigint
is_buyer_maker,boolean
trade_id,bigint
price,numeric
quantity,numeric
buyer_order_id,bigint
seller_order_id,bigint
symbol,text


In [25]:
%%sql
-- Count the rows currently stored in binance
SELECT count(1) FROM binance;

   postgresql://postgres:***@pgdatabase
   postgresql://postgres:***@pgdatabase/postgres
 * postgresql://postgres:***@pgdatabase/trades
1 rows affected.


count
1


In [26]:
%%sql 
    -- Show every row of binance, with the columns in table-definition order
    SELECT 
        event_type,
        event_time,
        symbol,
        trade_id,
        price,
        quantity,
        buyer_order_id,
        seller_order_id,
        trade_time,
        is_buyer_maker,
        ignore_in_price 
    FROM binance;

   postgresql://postgres:***@pgdatabase
   postgresql://postgres:***@pgdatabase/postgres
 * postgresql://postgres:***@pgdatabase/trades
1 rows affected.


event_type,event_time,symbol,trade_id,price,quantity,buyer_order_id,seller_order_id,trade_time,is_buyer_maker,ignore_in_price
trade,1714015066671,BNBBTC,74686,0.006215,2112,88,544,1714015066671,False,True
