In [0]:
# NOTEBOOK SETUP
# Installs the extra packages this notebook relies on into the kernel environment.

# openpyxl: Excel engine pandas uses to read the .xlsx fuel-price workbook
%pip install openpyxl
# yfinance: client used to download the Brent price history
%pip install yfinance
# Intended to snapshot the installed package versions into requirements.txt
%pip freeze > requirements.txt


import pyspark
import pandas as pd
import yfinance as yf

In [0]:
# START OF DATA COLLECTION

# Download the full available history (period="max") for the "BZ=F" ticker.
ticker = yf.Ticker("BZ=F")  # Brent
df = ticker.history(period="max")

display(df)

# Persist the raw download as-is; the transformation cell reads it back from this path.
df.to_csv("../Bronze_data/brent_oil.csv")
# The bare string below is a provenance note, kept verbatim in Portuguese. In English:
# the file combustiveis-liquidos-estados.xlsx was downloaded from the ANP page linked in
# the note and uploaded to Databricks manually. Data collected on 09/12/2025. It appears
# to hold complete measurements from 09/2020 until mid 03/2025, so the analysis uses the
# period 09/2020 through 02/2025 to have a complete window in both datasets.
"""
arquivo combustiveis-liquidos-estados.xlsx baixado de https://www.gov.br/anp/pt-br/assuntos/precos-e-defesa-da-concorrencia/precos/precos-de-distribuicao-de-combustiveis e upado para o databricks manualmente
^^^^
DADOS COLETADOS 09/12/2025
Aparentam ter as medições completas de 09/2020 até meados de 03/2025 a análise tratará os dados no período 09/2020 até 02/2025 para possibilitar um período de análise completo em ambas as bases
"""

# END OF THE FIRST DATA LEVEL - BRONZE (RAW DATA)

In [0]:
# START OF DATA TRANSFORMATION

# BRENT OIL DATASET
# Reads the raw daily Brent history from the Bronze folder, keeps the analysis
# window (2020-09-01 inclusive to 2025-03-01 exclusive), aggregates the prices to
# one row per calendar month and writes the result to the Silver folder.

df = pd.read_csv("../Bronze_data/brent_oil.csv")

df = df.drop(columns=["Dividends", "Stock Splits"])
df = df.rename(columns={"Date": "date", "Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume"})

# utc=True puts every timestamp on a single timezone whatever offset the CSV
# stores; the timezone is then dropped so the comparisons against the naive date
# strings below and the monthly to_period() call operate on plain datetimes
# (to_period on tz-aware data warns that the timezone is being discarded).
df["date"] = pd.to_datetime(df["date"], utc=True).dt.tz_localize(None)

# The lower bound is inclusive so a row stamped exactly 2020-09-01 00:00 is kept;
# the upper bound is exclusive so nothing from March 2025 leaks in.
in_window = (df["date"] >= "2020-09-01") & (df["date"] < "2025-03-01")
df = df[in_window]

df = (
    df[["date", "open", "high", "low", "close", "volume"]]
    .sort_values("date")
    .dropna()
    .astype({"open": "float64", "high": "float64", "low": "float64", "close": "float64", "volume": "int64"})
    .drop_duplicates()
)

# Collapse the daily rows into monthly statistics; the group key is the calendar
# month of each row, which becomes the "date" column after reset_index().
df = (
    df.groupby(df["date"].dt.to_period("M"))
        .agg(
            avg_open=("open", "mean"),
            avg_close=("close", "mean"),
            avg_high=("high", "mean"),
            avg_low=("low", "mean"),
            highest=("high", "max"),
            lowest=("low", "min")
        )
        .reset_index()
)

# Store the month as a "YYYY-MM" string.
df["date"] = df["date"].dt.strftime("%Y-%m")

df.to_csv("../Silver_data/brent_oil.csv")

# display() is the function already used in the data-collection cell;
# .display() is not a method of pandas DataFrames.
display(df)

In [0]:
# FUEL DATASET TRANSFORMATION

def categorize(prod):
    """Map a product name to one of three fuel categories.

    The name is upper-cased and searched for the keywords "GASOLINA", "DIESEL"
    and "ETANOL", in that order; the first keyword found decides the category.

    Args:
        prod: product name as a string.

    Returns:
        "GASOLINE", "DIESEL" or "ETHANOL", or None when no keyword is present.
    """
    label = prod.upper()
    # Order matters: a name containing several keywords resolves to the first match.
    keyword_to_category = (
        ("GASOLINA", "GASOLINE"),
        ("DIESEL", "DIESEL"),
        ("ETANOL", "ETHANOL"),
    )
    for keyword, category in keyword_to_category:
        if keyword in label:
            return category
    return None

# Reads the fuel distribution-price workbook from the Bronze folder, standardises
# its columns, keeps the same analysis window as the Brent dataset
# (2020-09-01 inclusive to 2025-03-01 exclusive) and writes one average price per
# date / product category / region / state to the Silver folder.
df = pd.read_excel("../Bronze_data/combustiveis-liquidos-estados.xlsx", skiprows=8)

df = df.drop(columns=["UNIDADE DE MEDIDA", "DESVIO PADRÃO"])
df = df.rename(columns={
    "MÊS": "date",
    "PRODUTO": "product",
    "REGIÃO": "region",
    "ESTADO": "state",
    "PREÇO MÉDIO DE DISTRIBUIÇÃO": "price",
})

# Non-numeric price cells become NaN here and are removed by the dropna() below.
df["price"] = pd.to_numeric(df["price"], errors="coerce")

df = df.dropna()
df = df.astype({"price": "float64"})
df = df.drop_duplicates()

# Unparseable dates become NaT; NaT fails both comparisons and is filtered out.
df["date"] = pd.to_datetime(df["date"], errors="coerce")
in_window = (df["date"] >= "2020-09-01") & (df["date"] < "2025-03-01")
df = df[in_window].sort_values("date")

# Collapse the detailed product names into GASOLINE / DIESEL / ETHANOL.
# categorize() returns None for anything else; those rows are dropped explicitly
# instead of relying on groupby silently discarding missing keys.
df['product'] = df['product'].apply(categorize)
df = df.dropna(subset=['product'])

df = (
    df
    .groupby(['date', 'product', 'region', 'state'], as_index=False)
    .agg(
        avg_price=('price', 'mean')
        )
    )

df.to_csv("../Silver_data/combustiveis.csv")

# display() is the function already used in the data-collection cell;
# .display() is not a method of pandas DataFrames.
display(df)

# END OF THE SILVER-LEVEL TRANSFORMATION (STANDARDISATION)

In [0]:
# FINAL TRANSFORMATION AND FACT TABLE CREATION - GOLD
# Builds one row per month with the average Brent price, the national average
# price of each fuel category, their month-over-month percentage changes and the
# difference between the absolute oil change and each absolute fuel change.
# The result is written to the Gold folder and saved as Spark tables.

df_oil = pd.read_csv("../Silver_data/brent_oil.csv")
df_gas = pd.read_csv("../Silver_data/combustiveis.csv", index_col=0)

df = pd.DataFrame()
df['date'] = df_oil['date']
# Single monthly oil price: mean of the four monthly averages.
df['oil_avg_price'] = (df_oil['avg_close'] + df_oil['avg_open'] + df_oil['avg_high'] + df_oil['avg_low'])/4
df['oil_pct_variation'] = df['oil_avg_price'].pct_change()

# National average per date and product (averages the per-state rows).
monthly_gas = (
    df_gas
    .groupby(['date', 'product'], as_index=False)
    .agg(
        avg_price=('avg_price', 'mean'),
    )
)

# Wide table with one row per calendar month and one column per product, so fuel
# prices can be looked up by month instead of being attached by row position.
gas_by_month = (
    monthly_gas
    .assign(month=pd.to_datetime(monthly_gas['date'], errors="coerce").dt.to_period("M"))
    .pivot_table(index='month', columns='product', values='avg_price', aggfunc='mean')
)
oil_months = pd.PeriodIndex(pd.to_datetime(df['date'], errors="coerce").dt.to_period("M"))

df['date'] = pd.to_datetime(df['date'], errors="coerce").dt.date
df_gas['date'] = pd.to_datetime(df_gas['date'], errors="coerce").dt.date

# (product category in the fuel data, column prefix in the fact table)
FUEL_COLUMNS = (('GASOLINE', 'gasoline'), ('DIESEL', 'diesel'), ('ETHANOL', 'ethanol'))

for product, prefix in FUEL_COLUMNS:
    price_col = f'{prefix}_avg_price'
    # reindex() aligns on the month key: a month missing from the fuel data becomes
    # NaN (removed by the dropna() below) rather than shifting every later row.
    df[price_col] = gas_by_month[product].reindex(oil_months).to_numpy()
    # fill_method=None keeps gaps as NaN instead of forward-filling over them.
    df[f'{prefix}_pct_variation'] = df[price_col].pct_change(fill_method=None)

for _, prefix in FUEL_COLUMNS:
    df[f'dif_pct_oil_{prefix}'] = df['oil_pct_variation'].abs() - df[f'{prefix}_pct_variation'].abs()

# The first month has no previous month to compare against, so its pct columns are NaN.
df = df.dropna()

df.to_csv("../Gold_data/tabela_fato.csv")

spark_df = spark.createDataFrame(df)
spark_df_gas = spark.createDataFrame(df_gas)
spark.sql("USE CATALOG `mvp-juj`")
spark.sql("USE SCHEMA `mvp-data`")
spark.sql("DROP TABLE IF EXISTS tabela_fato")
spark.sql("DROP TABLE IF EXISTS combustiveis")
spark_df.write.mode("overwrite").saveAsTable("tabela_fato")
spark_df_gas.write.mode("overwrite").saveAsTable("combustiveis")

# display() is the function already used in the data-collection cell;
# .display() is not a method of pandas DataFrames.
display(df)

# END OF THE FACT TABLE CREATION - GOLD; DATA READY FOR ANALYSIS

In [0]:
# ADDITIONAL ANALYSES

def corr_with_lag(df, col1, col2, lag):
    """Correlate `col1` with `col2` taken `lag` rows later.

    `col2` is shifted up by `lag` rows, so row t of `col1` is paired with row
    t + lag of `col2`; the trailing rows left without a partner are NaN and are
    ignored by the correlation.

    Args:
        df: DataFrame holding both columns.
        col1: name of the leading column.
        col2: name of the column that is shifted.
        lag: number of rows to shift `col2` by (0 means no shift).

    Returns:
        The correlation coefficient computed by `Series.corr`.
    """
    leading = df[col1]
    following = df[col2].shift(-lag)
    return leading.corr(following)

# Correlation of month-over-month percentage changes for each pair of series, with
# the second series taken 0, 1 and 2 rows later than the first.
# Each entry: (row label, leading column, column that is shifted by the lag).
CORR_PAIRS = (
    ('Diesel x Oil', 'oil_pct_variation', 'diesel_pct_variation'),
    ('Gasoline x Oil', 'oil_pct_variation', 'gasoline_pct_variation'),
    ('Ethanol x Oil', 'oil_pct_variation', 'ethanol_pct_variation'),
    ('Gasoline x Ethanol', 'gasoline_pct_variation', 'ethanol_pct_variation'),
    ('Gasoline x Diesel', 'gasoline_pct_variation', 'diesel_pct_variation'),
    ('Diesel x Ethanol', 'diesel_pct_variation', 'ethanol_pct_variation'),
)
CORR_LAGS = (0, 1, 2)

# One row per pair, one "lag_<n>" column per lag.
results = pd.DataFrame({'correlation': [label for label, _, _ in CORR_PAIRS]})
for lag in CORR_LAGS:
    results[f'lag_{lag}'] = [
        corr_with_lag(df, leading, following, lag)
        for _, leading, following in CORR_PAIRS
    ]

# display() is the function already used in the data-collection cell;
# .display() is not a method of pandas DataFrames.
display(results)

results.to_csv("../Gold_data/correlacoes.csv")
spark_df_corr = spark.createDataFrame(results)
spark.sql("USE CATALOG `mvp-juj`")
spark.sql("USE SCHEMA `mvp-data`")
spark.sql("DROP TABLE IF EXISTS correlacoes")
spark_df_corr.write.mode("overwrite").saveAsTable("correlacoes")

In [0]:
# df_gas -> combustiveis.csv
# Builds one row per month with the average price and month-over-month percentage
# change of each fuel category for the SUL and SUDESTE regions.

monthly_regions = (
    df_gas
    .groupby(['date', 'product', 'region'], as_index=False)
    .agg(
        avg_price=('avg_price', 'mean'),
    )
)

# One column per (region, product), indexed by date, so each series is looked up
# by date instead of being attached by row position.
region_prices = monthly_regions.pivot(index='date', columns=['region', 'product'], values='avg_price')
month_keys = monthly_regions['date'].unique()

regions = pd.DataFrame()
regions['date'] = month_keys
regions['date'] = pd.to_datetime(regions['date'], errors="coerce").dt.date

# (region value in the data, column prefix) and (product category, column label)
REGION_PREFIXES = (('SUL', 'south'), ('SUDESTE', 'southeast'))
REGION_FUELS = (('GASOLINE', 'gas'), ('DIESEL', 'diesel'), ('ETHANOL', 'ethanol'))

for region_name, region_prefix in REGION_PREFIXES:
    for product, label in REGION_FUELS:
        price_col = f'{region_prefix}_{label}_avg_price'
        # reindex() aligns on date: a month with no data for this region/product
        # becomes NaN (removed by the dropna() below) rather than raising a length
        # mismatch or shifting later rows.
        regions[price_col] = region_prices[(region_name, product)].reindex(month_keys).to_numpy()
        # fill_method=None keeps gaps as NaN instead of forward-filling over them.
        regions[f'{region_prefix}_{label}_pct_variation'] = regions[price_col].pct_change(fill_method=None)

# The first month has no previous month to compare against, so its pct columns are NaN.
regions = regions.dropna()

# display() is the function already used in the data-collection cell;
# .display() is not a method of pandas DataFrames.
display(regions)

regions.to_csv('../Gold_data/regions.csv')
spark_regions = spark.createDataFrame(regions)
spark.sql("USE CATALOG `mvp-juj`")
spark.sql("USE SCHEMA `mvp-data`")
spark.sql("DROP TABLE IF EXISTS regions")
spark_regions.write.mode('overwrite').saveAsTable('regions')

In [0]:
# df_gas -> combustiveis.csv
# Builds, for the SUL and SUDESTE regions, one row per month with the average price
# and month-over-month percentage change of each fuel category in each state.

# (product category in the data, label used in the column names)
STATE_FUELS = (('GASOLINE', 'gas'), ('DIESEL', 'diesel'), ('ETHANOL', 'ethanol'))


def _state_price_frame(prices, month_keys, region, states):
    """Assemble the per-state price / percentage-change table for one region.

    Args:
        prices: DataFrame indexed by date with (region, state, product) columns.
        month_keys: date values, in output row order, used to align every series.
        region: region value as it appears in the data.
        states: iterable of (state name in the data, column prefix) pairs.

    Returns:
        DataFrame with a 'date' column followed by '<prefix>_<fuel>_avg_price' and
        '<prefix>_<fuel>_pct_variation' columns; rows containing NaN are dropped.

    Raises:
        KeyError: if a (region, state, product) combination is absent from `prices`.
    """
    frame = pd.DataFrame()
    frame['date'] = month_keys
    frame['date'] = pd.to_datetime(frame['date'], errors="coerce").dt.date
    for state_name, prefix in states:
        for product, label in STATE_FUELS:
            price_col = f'{prefix}_{label}_avg_price'
            # reindex() aligns on date: a month with no data for this state/product
            # becomes NaN instead of raising a length mismatch or shifting later rows.
            frame[price_col] = prices[(region, state_name, product)].reindex(month_keys).to_numpy()
            # fill_method=None keeps gaps as NaN instead of forward-filling over them.
            frame[f'{prefix}_{label}_pct_variation'] = frame[price_col].pct_change(fill_method=None)
    # The first month has no previous month to compare against, so its pct columns are NaN.
    return frame.dropna()


# One column per (region, state, product), indexed by date.
state_prices = df_gas.pivot(index='date', columns=['region', 'state', 'product'], values='avg_price')
month_keys = df_gas['date'].unique()

south = _state_price_frame(
    state_prices,
    month_keys,
    'SUL',
    (('PARANA', 'PR'), ('SANTA CATARINA', 'SC'), ('RIO GRANDE DO SUL', 'RS')),
)
southeast = _state_price_frame(
    state_prices,
    month_keys,
    'SUDESTE',
    (('SAO PAULO', 'SP'), ('RIO DE JANEIRO', 'RJ'), ('MINAS GERAIS', 'MG'), ('ESPIRITO SANTO', 'ES')),
)

south.to_csv('../Gold_data/south.csv')
southeast.to_csv('../Gold_data/southeast.csv')

spark_south = spark.createDataFrame(south)
spark_southeast = spark.createDataFrame(southeast)
spark.sql("USE CATALOG `mvp-juj`")
spark.sql("USE SCHEMA `mvp-data`")
spark.sql("DROP TABLE IF EXISTS south")
spark.sql("DROP TABLE IF EXISTS southeast")
spark_south.write.mode("overwrite").saveAsTable("south")
spark_southeast.write.mode("overwrite").saveAsTable("southeast")

# display() is the function already used in the data-collection cell;
# .display() is not a method of pandas DataFrames.
display(south)