# B3 Stock Data ETL

## Part 1: Get data for a subset of B3 stocks

In [0]:
%sh
pip install -q -U pip
pip install -q -U -r requirements.txt

In [0]:
from delta import tables
import pandas as pd
from pyspark.sql import types
import yfinance as yf

In [0]:
# Parse the comma-separated "tickers" widget. Blank entries (an empty widget
# value, a trailing comma, ",,") are skipped so they never turn into the
# bogus symbol ".SA" further down.
TICKERS = [
    ticker.strip()
    for ticker in dbutils.widgets.get("tickers").split(",")
    if ticker.strip()
]

# Fail early with a clear message instead of an obscure error downstream.
if not TICKERS:
    raise ValueError('The "tickers" widget must contain at least one ticker.')

# Define columns for price and volume data.
PRICES_AND_VOLUME_COLUMNS = ["Open", "High", "Low", "Close", "Volume"]

# Define complete column structure for the final DataFrame:
# identification columns first, then the price and volume columns.
ALL_COLUMNS = ["Ticker", "Date", *PRICES_AND_VOLUME_COLUMNS]

# Define the Delta table name (read from the "target-table" widget).
DELTA_TABLE_NAME = dbutils.widgets.get("target-table")

In [0]:
# Normalize tickers for the Yahoo Finance API: upper-case with a ".SA" suffix.
# - Upper-casing makes the suffix check case-insensitive ("petr4.sa" is not
#   turned into "petr4.sa.SA") and keeps the symbols aligned with the column
#   labels of the downloaded frame, which are looked up by ticker later.
#   NOTE(review): yfinance appears to upper-case symbols itself - confirm
#   against the installed version.
# - dict.fromkeys drops repeated tickers while preserving input order, so the
#   same ticker is never processed twice (duplicate ticker/date rows would
#   otherwise reach the merge).
formatted_tickers = list(
    dict.fromkeys(
        symbol if symbol.endswith(".SA") else f"{symbol}.SA"
        for symbol in (ticker.upper() for ticker in TICKERS)
    )
)

# Fetch historical market data for all tickers from Yahoo Finance.
# group_by="ticker" puts the ticker on the outer column level.
df = yf.download(formatted_tickers, period="max", group_by="ticker")

In [0]:
# Initialize list to store processed data for each ticker.
dfs = []

# NOTE(review): some yfinance releases seem to return flat (single-level)
# columns when only one ticker is requested - confirm against the installed
# version. In that case the whole frame belongs to that one ticker and there
# is no ticker level to select from.
has_ticker_level = isinstance(df.columns, pd.MultiIndex)

# Process data for each ticker individually.
# The resulting DataFrame will contain all tickers' data in a flat table.
for ticker in formatted_tickers:
    # Select this ticker's columns and convert the date index to a column.
    # reset_index() returns a new frame, so the downloaded data is not modified.
    ticker_data = (df[ticker] if has_ticker_level else df).reset_index()

    # Add ticker identification column without the trailing ".SA" suffix.
    # Only the suffix is stripped; str.replace would also remove ".SA"
    # occurring elsewhere in the symbol.
    ticker_data["Ticker"] = ticker[: -len(".SA")] if ticker.endswith(".SA") else ticker

    # Organize columns according to defined structure.
    ticker_data = ticker_data[ALL_COLUMNS]

    # Store processed ticker data.
    dfs.append(ticker_data)

# Stack all per-ticker frames into one table with a fresh row index.
all_ticker_data = pd.concat(dfs, ignore_index=True)

In [0]:
# Tidy the combined table in a single pass:
#   1. drop rows where any price/volume column is missing,
#   2. clear the name attached to the column axis,
#   3. renumber the rows from zero.
all_ticker_data = (
    all_ticker_data
    .dropna(subset=PRICES_AND_VOLUME_COLUMNS)
    .rename_axis(None, axis=1)
    .reset_index(drop=True)
)

## Part 2: Persist records into the lakehouse

In [0]:
# Schema of the temporary Spark DataFrame, listed as (column name, Spark type)
# pairs in the same order as the pandas columns.
tmp_df_schema = types.StructType(
    [
        types.StructField(column_name, column_type)
        for column_name, column_type in (
            ("ticker", types.StringType()),
            ("date", types.DateType()),
            ("open", types.DoubleType()),
            ("high", types.DoubleType()),
            ("low", types.DoubleType()),
            ("close", types.DoubleType()),
            ("volume", types.DoubleType()),
        )
    ]
)

# Convert the cleaned pandas table into a Spark DataFrame with that schema.
spark_df = spark.createDataFrame(all_ticker_data, schema=tmp_df_schema)

In [0]:
# Define the target table schema with column names and types.
# Prices and volume are stored as fixed-point decimals.
table_schema = types.StructType(
    [
        types.StructField("ticker", types.StringType()),
        types.StructField("date", types.DateType()),
        types.StructField("open", types.DecimalType(10, 3)),
        types.StructField("high", types.DecimalType(10, 3)),
        types.StructField("low", types.DecimalType(10, 3)),
        types.StructField("close", types.DecimalType(10, 3)),
        types.StructField("volume", types.DecimalType(18, 3)),
    ]
)

# Create the (empty) target table if it does not exist yet.
# mode("ignore") makes this a no-op when the table is already there, so an
# existing table - including its schema - is left untouched. The previous
# "overwriteSchema" option only applies to overwrite-mode writes and therefore
# had no effect here; it was removed to avoid suggesting otherwise.
(
    spark.createDataFrame([], table_schema)
    .write
    .format("delta")
    .mode("ignore")
    .saveAsTable(DELTA_TABLE_NAME)
)

In [0]:
# Handle to the target Delta table.
delta_table = tables.DeltaTable.forName(spark, DELTA_TABLE_NAME)

# A target row ("t") and a source row ("s") match when both ticker and date agree.
merge_condition = "t.ticker = s.ticker AND t.date = s.date"

# Upsert: matched rows are updated with the source values, unmatched source
# rows are inserted.
(
    delta_table.alias("t")
    .merge(spark_df.alias("s"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)