**INSTALL LIBRARIES**

In [44]:
! pip install confluent_kafka
! pip install polars
! pip install requests




[notice] A new release of pip is available: 24.2 -> 25.0
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.2 -> 25.0
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.2 -> 25.0
[notice] To update, run: python.exe -m pip install --upgrade pip


**DATABASE CONFIGURATION**

In [45]:
import os

import psycopg2

# Connection settings for PostgreSQL. Every value can be overridden through
# the conventional PG* environment variables so credentials do not have to be
# edited into the notebook; the fallbacks are the original local values.
db_config = {
    'dbname': os.environ.get('PGDATABASE', 'mydatabase'),
    'user': os.environ.get('PGUSER', 'root'),
    'password': os.environ.get('PGPASSWORD', 'root'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', 5432)),
}

# One row per (symbol, timestamp); the UNIQUE constraint is what the
# consumer cell's ON CONFLICT clause relies on to skip duplicates.
create_table_query = """
CREATE TABLE IF NOT EXISTS stock_data (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(10) NOT NULL,
    timestamp TIMESTAMP(0) WITHOUT TIME ZONE NOT NULL,
    open DECIMAL(10,4),
    high DECIMAL(10,4),
    low DECIMAL(10,4),
    close DECIMAL(10,4),
    volume BIGINT,
    UNIQUE(symbol, timestamp)
);
"""

connection = psycopg2.connect(**db_config)
try:
    # The cursor context manager closes the cursor even if execute() raises.
    with connection.cursor() as cursor:
        cursor.execute(create_table_query)
    connection.commit()
    print("Tabela criada com sucesso!")
finally:
    # Always release the connection, including on failure.
    connection.close()


Tabela criada com sucesso!


**API REQUEST**

In [46]:
import json
import os

import polars as pl
import requests

# NOTE(security): prefer supplying the key through the ALPHAVANTAGE_API_KEY
# environment variable. The literal fallback keeps the notebook runnable as
# before, but a key committed to a notebook should be rotated and removed.
API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY", "42JIJGRH51L2GBTP")
SYMBOL = "IBM"
INTERVAL = "30min"
url = "https://www.alphavantage.co/query"
query_params = {
    "function": "TIME_SERIES_INTRADAY",
    "symbol": SYMBOL,
    "interval": INTERVAL,
    "apikey": API_KEY,
}

# A timeout keeps the cell from hanging forever on a stalled connection;
# raise_for_status() turns HTTP error codes into an explicit exception.
response = requests.get(url, params=query_params, timeout=30)
response.raise_for_status()
data = response.json()

time_series_key = f"Time Series ({INTERVAL})"
if time_series_key not in data:
    # Stop here: continuing would only fail later with a bare KeyError.
    # The key is redacted in case the service echoes it back in its message.
    safe_payload = str(data).replace(API_KEY, "***")
    raise RuntimeError(
        "Error: Invalid response or requests limit exceeded. "
        f"Response: {safe_payload}"
    )
print("SUcess!")

# Flatten the {timestamp: {field: value}} mapping into one record per bar,
# converting the string values to numeric types.
records = [
    {
        "symbol": SYMBOL,
        "timestamp": timestamp,
        "open": float(values["1. open"]),
        "high": float(values["2. high"]),
        "low": float(values["3. low"]),
        "close": float(values["4. close"]),
        "volume": int(values["5. volume"]),
    }
    for timestamp, values in data[time_series_key].items()
]

df = pl.DataFrame(records)
# Parse the timestamp and re-format it as a minute-precision string, so the
# column stays JSON-serializable for the Kafka producer cell.
df = df.with_columns(
    pl.col("timestamp").str.to_datetime("%Y-%m-%d %H:%M:%S").dt.strftime("%Y-%m-%d %H:%M")
)
df.head()


SUcess!


symbol,timestamp,open,high,low,close,volume
str,str,f64,f64,f64,f64,i64
"""IBM""","""2025-01-28 19:30""",225.37,225.55,225.35,225.5,120
"""IBM""","""2025-01-28 19:00""",225.66,225.66,225.32,225.5,881926
"""IBM""","""2025-01-28 18:30""",225.66,225.66,225.316,225.55,881838
"""IBM""","""2025-01-28 18:00""",225.48,225.49,225.4,225.4,381
"""IBM""","""2025-01-28 17:30""",225.43,225.55,225.31,225.48,205


**SEND DATA TO KAFKA**

In [47]:
from confluent_kafka import Producer
import json

producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'api-producer'
}
producer = Producer(producer_config)

topic_name = 'stock_data_topic'

# Errors reported asynchronously by the delivery callback.
delivery_errors = []


def _on_delivery(err, msg):
    """Delivery report callback: remember every message that failed."""
    if err is not None:
        delivery_errors.append(err)


for row in df.iter_rows(named=True):
    record = json.dumps(row)  # serialize the row dictionary to a JSON string
    producer.produce(topic_name, value=record, on_delivery=_on_delivery)
    producer.poll(0)  # serve pending delivery callbacks without blocking

# Block until every queued message has been delivered or has failed.
producer.flush()

if delivery_errors:
    # Do not claim success when the broker rejected or never received messages.
    raise RuntimeError(
        f"{len(delivery_errors)} message(s) failed to deliver; first error: {delivery_errors[0]}"
    )
print("Send to Kafka!")


Send to Kafka!


**CONSUME KAFKA DATA AND INSERT INTO POSTGRESQL**

In [48]:
from confluent_kafka import Consumer
import psycopg2
import json

consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'postgres-consumer',
    'auto.offset.reset': 'earliest'
}

# Stop after this many consecutive empty 1-second polls so the cell finishes
# on its own once the topic is drained (needed for Restart & Run All).
# Set to None to keep polling until the kernel is interrupted.
MAX_IDLE_POLLS = 15

# Loop-invariant statement; duplicates of (symbol, timestamp) are skipped.
insert_query = """
INSERT INTO stock_data (symbol, timestamp, open, high, low, close, volume)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (symbol, timestamp) DO NOTHING;
"""

consumer = Consumer(consumer_config)
connection = None
cursor = None
idle_polls = 0

try:
    # Subscribing and connecting happen inside the try block so the consumer
    # is still closed if the database connection cannot be established.
    consumer.subscribe(['stock_data_topic'])
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()

    while MAX_IDLE_POLLS is None or idle_polls < MAX_IDLE_POLLS:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            idle_polls += 1
            continue
        idle_polls = 0
        if msg.error():
            print(msg.error())
            continue

        record = json.loads(msg.value().decode('utf-8'))
        cursor.execute(insert_query, (
            record["symbol"], record["timestamp"], record["open"],
            record["high"], record["low"], record["close"], record["volume"]
        ))
        connection.commit()
        print(f"Inserido: {record['timestamp']}")

    print(f"No messages for {MAX_IDLE_POLLS} consecutive polls; stopping consumer.")

except KeyboardInterrupt:
    print("Finishing consumer.")

finally:
    consumer.close()
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

Inserido: 2025-01-28 19:30
Inserido: 2025-01-28 19:00
Inserido: 2025-01-28 18:30
Inserido: 2025-01-28 18:00
Inserido: 2025-01-28 17:30
Inserido: 2025-01-28 17:00
Inserido: 2025-01-28 16:30
Inserido: 2025-01-28 16:00
Inserido: 2025-01-28 15:30
Inserido: 2025-01-28 15:00
Inserido: 2025-01-28 14:30
Inserido: 2025-01-28 14:00
Inserido: 2025-01-28 13:30
Inserido: 2025-01-28 13:00
Inserido: 2025-01-28 12:30
Inserido: 2025-01-28 12:00
Inserido: 2025-01-28 11:30
Inserido: 2025-01-28 11:00
Inserido: 2025-01-28 10:30
Inserido: 2025-01-28 10:00
Inserido: 2025-01-28 09:30
Inserido: 2025-01-28 09:00
Inserido: 2025-01-28 08:30
Inserido: 2025-01-28 08:00
Inserido: 2025-01-28 07:30
Inserido: 2025-01-28 07:00
Inserido: 2025-01-28 06:30
Inserido: 2025-01-28 06:00
Inserido: 2025-01-28 05:30
Inserido: 2025-01-28 05:00
Inserido: 2025-01-28 04:30
Inserido: 2025-01-28 04:00
Inserido: 2025-01-27 19:30
Inserido: 2025-01-27 19:00
Inserido: 2025-01-27 18:30
Inserido: 2025-01-27 18:00
Inserido: 2025-01-27 17:30
I

**QUERY DATA**

In [55]:
import psycopg2
import polars as pl

query = "SELECT * FROM stock_data ORDER BY timestamp LIMIT 10;"

connection = psycopg2.connect(**db_config)
try:
    # The cursor context manager closes the cursor even if the query fails.
    with connection.cursor() as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
        # Column names come from the cursor metadata of the executed query.
        columns = [desc[0] for desc in cursor.description]
finally:
    # Always release the connection, including on failure.
    connection.close()

# fetchall() returns one tuple per row, so state the orientation explicitly
# instead of letting polars infer it.
df = pl.DataFrame(rows, schema=columns, orient="row")

print(df.head())

shape: (5, 8)
┌─────┬────────┬──────────────┬──────────────┬──────────────┬──────────────┬──────────────┬────────┐
│ id  ┆ symbol ┆ timestamp    ┆ open         ┆ high         ┆ low          ┆ close        ┆ volume │
│ --- ┆ ---    ┆ ---          ┆ ---          ┆ ---          ┆ ---          ┆ ---          ┆ ---    │
│ i64 ┆ str    ┆ datetime[μs] ┆ decimal[*,4] ┆ decimal[*,4] ┆ decimal[*,4] ┆ decimal[*,4] ┆ i64    │
╞═════╪════════╪══════════════╪══════════════╪══════════════╪══════════════╪══════════════╪════════╡
│ 800 ┆ IBM    ┆ 2025-01-23   ┆ 225.7500     ┆ 225.9800     ┆ 225.7500     ┆ 225.9800     ┆ 53     │
│     ┆        ┆ 18:00:00     ┆              ┆              ┆              ┆              ┆        │
│ 799 ┆ IBM    ┆ 2025-01-23   ┆ 226.0400     ┆ 226.0400     ┆ 225.7700     ┆ 225.7700     ┆ 690546 │
│     ┆        ┆ 18:30:00     ┆              ┆              ┆              ┆              ┆        │
│ 798 ┆ IBM    ┆ 2025-01-23   ┆ 226.0400     ┆ 226.0400     ┆ 225.8300     ┆ 

  df = pl.DataFrame(rows, schema=columns)
