In [20]:
# Instalação das dependências necessárias para Kafka, Spark e coleta de dados

!pip install -q pyspark==3.5.0 kafka-python yfinance findspark


In [21]:
# Configure and start (or reuse) the local SparkSession.
#
# findspark.init() runs before `pyspark` is imported so the Spark installation
# can be located. The Kafka connector is fetched via spark.jars.packages; its
# version (3.5.0, Scala 2.12) matches the pyspark==3.5.0 pin in the install cell.

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CurrencyStreamingNotebook")
    .master("local[*]")  # run locally on all available cores
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

spark


In [22]:
# Schema of one currency-quote event. The field names are the same JSON keys
# the Kafka producer cell writes into each message, so the streaming reader can
# decode the message value with this schema.
# NOTE(review): `volume` is a 32-bit IntegerType; Yahoo reports 0 for these FX
# pairs. Switch to LongType if you ingest instruments whose volume can exceed
# 2**31 - 1 (note that this changes the Parquet output schema).

from pyspark.sql.types import *

currency_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("ticker", StringType()),
            ("date", StringType()),
            ("open", DoubleType()),
            ("high", DoubleType()),
            ("low", DoubleType()),
            ("close", DoubleType()),
            ("volume", IntegerType()),
            ("source", StringType()),
            ("ingested_at", TimestampType()),
        )
    ]
)


In [23]:
# Download the most recent daily OHLCV bars of BRL currency pairs from Yahoo Finance.
#
# yfinance treats `end` as exclusive, so the window [start, end) covers only
# the day(s) before today. Yahoo does not normally publish daily FX bars for
# Saturday/Sunday, so a plain "yesterday" window comes back empty when the
# notebook runs on a Sunday or Monday. `start` is therefore walked back to the
# most recent weekday (Friday in those cases); on Tuesday-Saturday it is still
# simply yesterday.

import yfinance as yf
from datetime import datetime
from dateutil.relativedelta import relativedelta

tickers = ['USDBRL=X', 'EURBRL=X', 'GBPBRL=X', 'JPYBRL=X']
end = datetime.now().date()
start = end - relativedelta(days=1)
while start.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
    start -= relativedelta(days=1)

df_raw = yf.download(
    tickers,
    start=start,
    end=end,
    group_by="ticker",   # columns become a (ticker, price field) MultiIndex
    threads=False,
    auto_adjust=True
)

# Fail fast with a clear message instead of an opaque error in the
# normalization cell when Yahoo returns no rows (holiday, outage, rate limit).
if df_raw.empty:
    raise ValueError(
        f"Nenhuma cotação retornada pelo Yahoo Finance para {tickers} "
        f"entre {start} e {end}."
    )

df_raw


[*********************100%***********************]  4 of 4 completed


Ticker,USDBRL=X,USDBRL=X,USDBRL=X,USDBRL=X,USDBRL=X,GBPBRL=X,GBPBRL=X,GBPBRL=X,GBPBRL=X,GBPBRL=X,JPYBRL=X,JPYBRL=X,JPYBRL=X,JPYBRL=X,JPYBRL=X,EURBRL=X,EURBRL=X,EURBRL=X,EURBRL=X,EURBRL=X
Price,Open,High,Low,Close,Volume,Open,High,Low,Close,Volume,Open,High,Low,Close,Volume,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2,Unnamed: 17_level_2,Unnamed: 18_level_2,Unnamed: 19_level_2,Unnamed: 20_level_2
2026-01-29,5.1954,5.2459,5.169,5.1971,0,7.173601,7.217705,7.142857,7.178751,0,0.033941,0.034311,0.033729,0.033939,0,6.21504,6.2662,6.1795,6.218905,0


In [24]:
# Reshape the yfinance output into one row per (date, ticker) so that each
# row can be published as one Kafka event.

import pandas as pd

# yf.download(group_by="ticker") returns MultiIndex columns (ticker, price
# field). stack(level=0) moves the ticker level into the row index, leaving one
# column per price field (Open, High, Low, Close, Volume); reset_index() then
# turns the Date/Ticker index levels into ordinary columns.
df = df_raw.stack(level=0).reset_index()

# Lower-case the column names so the producer cell can read row["ticker"],
# row["close"], ... regardless of yfinance's capitalization.
df.columns = [str(c).lower() for c in df.columns]
df["date"] = df["date"].astype(str)
# Drop incomplete bars (e.g. a ticker Yahoo returned no data for) and renumber rows.
df = df.dropna().reset_index(drop=True)

# The producer cell reads these columns by name: fail here with a clear
# message if the yfinance output layout changes.
missing_columns = {"date", "ticker", "open", "high", "low", "close", "volume"} - set(df.columns)
assert not missing_columns, f"Colunas ausentes após a normalização: {sorted(missing_columns)}"

print("Colunas encontradas:", df.columns.tolist())
df.head()


Colunas encontradas: ['date', 'ticker', 'open', 'high', 'low', 'close', 'volume']


Unnamed: 0,date,ticker,open,high,low,close,volume
0,2026-01-29,USDBRL=X,5.1954,5.2459,5.169,5.1971,0
1,2026-01-29,GBPBRL=X,7.173601,7.217705,7.142857,7.178751,0
2,2026-01-29,JPYBRL=X,0.033941,0.034311,0.033729,0.033939,0
3,2026-01-29,EURBRL=X,6.21504,6.2662,6.1795,6.218905,0


In [25]:
# Publish one JSON event per normalized quote row to the Kafka topic.

from kafka import KafkaProducer
import json
from datetime import datetime, timezone

# Topic name, also used by the Structured Streaming reader in the next cell.
TOPIC = "currency-quotes"

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

try:
    futures = []
    for _, row in df.iterrows():
        # Cast numpy scalars to plain Python types so json.dumps can serialize them.
        event = {
            "ticker": str(row["ticker"]),
            "date": str(row["date"]),
            "open": float(row["open"]),
            "high": float(row["high"]),
            "low": float(row["low"]),
            "close": float(row["close"]),
            "volume": int(row["volume"]),
            "source": "yahoo_finance",
            # Timezone-aware UTC ISO-8601 string, decoded into the schema's ingested_at field.
            "ingested_at": datetime.now(timezone.utc).isoformat()
        }
        futures.append(producer.send(TOPIC, event))

    producer.flush()
    # send() is asynchronous and only reports delivery errors through the
    # returned futures; resolving them makes a failed send raise here instead
    # of printing a misleading success message below.
    for future in futures:
        future.get(timeout=30)
finally:
    # Release the producer's connections/resources even if a send failed.
    producer.close()

print(f"Mensagens enviadas para o tópico {TOPIC}!")


Mensagens enviadas para o tópico currency-quotes!


                                                                                

In [26]:
# Subscribe to the quotes topic with Spark Structured Streaming and decode each
# message value (JSON bytes written by the producer cell) into the columns of
# currency_schema.

from pyspark.sql.functions import from_json, col

# startingOffsets="earliest" reads the topic from its beginning when the query
# starts without existing checkpoint state.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers `value` as binary: cast it to a JSON string, parse it into a
# struct named "data", then flatten that struct into top-level columns.
parsed_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), currency_schema).alias("data"))
    .select("data.*")
)

parsed_df


DataFrame[ticker: string, date: string, open: double, high: double, low: double, close: double, volume: int, source: string, ingested_at: timestamp]

In [27]:
# Persist the parsed stream as Parquet files, then read the output back to
# validate what was written.
import time
from pyspark.sql.utils import AnalysisException

PARQUET_PATH = "data/parquet/currency"
CHECKPOINT_PATH = "data/checkpoints/currency"

# Start the streaming query; the checkpoint directory stores the query's
# progress so a restart resumes instead of starting over.
query = (
    parsed_df.writeStream
    .format("parquet")
    .option("path", PARQUET_PATH)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .outputMode("append")
    .start()
)

# Block until every record currently available in the topic has been written
# and committed, instead of sleeping a fixed 10 s and hoping the micro-batch
# finished in time. It raises if the query has failed. This is appropriate
# here because nothing is producing to the topic concurrently; with a
# continuously arriving stream it could block indefinitely.
print("Aguardando processamento do streaming...")
query.processAllAvailable()

# Validation: read back what the sink wrote.
try:
    df_parquet = spark.read.parquet(PARQUET_PATH)
    print(f"Registros processados: {df_parquet.count()}")
    df_parquet.show()
except AnalysisException:
    # Raised e.g. when the output path does not exist yet or holds no Parquet
    # files. Any other error propagates instead of being silently swallowed.
    print("Os arquivos ainda não foram criados. Aguarde mais um pouco.")

26/01/30 14:47:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
26/01/30 14:47:15 WARN StreamingQueryManager: Stopping existing streaming query [id=bc85531b-060b-4367-b6db-178dceb7fff8, runId=7e86b17e-7576-4feb-b946-8f22ebd095cd], as a new run is being started.
26/01/30 14:47:15 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Aguardando processamento do streaming...
Registros processados: 10
+--------+----------+-------------------+--------------------+-------------------+--------------------+------+-------------+--------------------+
|  ticker|      date|               open|                high|                low|               close|volume|       source|         ingested_at|
+--------+----------+-------------------+--------------------+-------------------+--------------------+------+-------------+--------------------+
|USDBRL=X|2026-01-29|  5.195400238037109|  5.2459001541137695|  5.169000148773193|  5.1971001625061035|     0|yahoo_finance|2026-01-30 17:01:...|
|EURBRL=X|2026-01-29|   6.21504020690918|   6.266200065612793|  6.179500102996826|   6.218904972076416|     0|yahoo_finance|2026-01-30 17:01:...|
|USDBRL=X|2026-01-29|  5.195400238037109|  5.2459001541137695|  5.169000148773193|  5.1971001625061035|     0|yahoo_finance|2026-01-30 14:02:...|
|EURBRL=X|2026-01-29|   6.21504020690918|   6.26620006561