### Data Engineering - Cleaned and Enriched Data

Ingestion type: Full Load

Schedule Run: Daily on Briefer Cloud

Source: Yahoo Finance, IPEA

Target location: silver/petro/petro.csv

In [1]:
import pandas as pd
import yfinance as yf
import ipeadatapy as ip
import os
from datetime import datetime
from io import StringIO

In [2]:
# Extraction window: fixed start on 2008-01-01, end at the moment the notebook runs

start_date = datetime(2008, 1, 1)
end_date = datetime.today()

Extract Data from Yahoo Finance

In [None]:
# Extract Data from Yahoo Finance

def extract_yf_data(ticker: str, start_date, end_date) -> pd.DataFrame:
    """Download the price history of a single ticker from Yahoo Finance.

    Args:
        ticker: Yahoo Finance symbol (e.g. 'PBR', 'BZ=F').
        start_date: First date of the requested window, passed to ``yf.download`` as ``start``.
        end_date: End of the requested window, passed to ``yf.download`` as ``end``.

    Returns:
        The frame returned by ``yf.download`` with single-level columns, so
        callers can select fields such as 'Close' and 'Adj Close' by name.
    """
    # Ask explicitly for unadjusted prices: recent yfinance releases default to
    # auto_adjust=True, which omits the 'Adj Close' column the notebook selects.
    df = yf.download(ticker, start=start_date, end=end_date, auto_adjust=False)

    # Recent yfinance releases may return (field, ticker) MultiIndex columns even
    # for one ticker; keep only the field level so 'Close' etc. stay addressable.
    # NOTE(review): assumes the default column grouping (field on level 0) - confirm.
    if isinstance(df.columns, pd.MultiIndex) and df.columns.get_level_values(1).nunique() == 1:
        df.columns = df.columns.get_level_values(0)

    return df

# Petrobras: closing and adjusted closing prices
pbr = (
    extract_yf_data('PBR', start_date, end_date)[['Close', 'Adj Close']]
    .rename(columns={'Close': 'pbr', 'Adj Close': 'adj_pbr'})
)

# Brent crude oil: closing price
brent = (
    extract_yf_data('BZ=F', start_date, end_date)[['Close']]
    .rename(columns={'Close': 'brent'})
)

# WTI crude oil: closing price
wti = (
    extract_yf_data('CL=F', start_date, end_date)[['Close']]
    .rename(columns={'Close': 'wti'})
)

# USD/BRL quotation: closing price
usd = (
    extract_yf_data('USDBRL=X', start_date, end_date)[['Close']]
    .rename(columns={'Close': 'usd'})
)

Extract Data from IPEA

In [4]:
# Extract Data from IPEA

def extract_ipea_data(cod: str, start_date) -> pd.DataFrame:
    """Fetch an IPEA time series through ``ipeadatapy``.

    Args:
        cod: IPEA series code, passed as the first argument to ``ip.timeseries``.
        start_date: Object with a ``year`` attribute; the request uses
            ``yearGreaterThan=start_date.year - 1``.

    Returns:
        The frame returned by ``ip.timeseries``.
    """
    # Presumably the filter is strict, so subtracting one keeps start_date's own year - confirm.
    year_lower_bound = start_date.year - 1
    return ip.timeseries(cod, yearGreaterThan=year_lower_bound)

# Monthly oil production from IPEA series ANP12_PDPET12
# (original note: mean quantity, Barril x 1000 / Month)

production = (
    extract_ipea_data('ANP12_PDPET12', start_date)[['VALUE (Barril)']]
    .rename(columns={'VALUE (Barril)': 'production'})
)

Clean and fill null values

In [5]:
# Adjusting NaN values in Production data 

In [6]:
# Reindex data

def reindex_df_to_now(df: pd.DataFrame, start_date_str: str, fill_na_values: bool) -> pd.DataFrame:
    """Reindex ``df`` onto a daily calendar running from a start date to today.

    Args:
        df: Frame to reindex; its index labels are matched against daily timestamps.
        start_date_str: First calendar day, in 'YYYY-MM-DD' format.
        fill_na_values: When True, every NaN in the reindexed frame is replaced by 0.

    Returns:
        A new frame indexed by every day from ``start_date_str`` to the current date.
    """
    first_day = datetime.strptime(start_date_str, '%Y-%m-%d')
    last_day = datetime.now().date()
    daily_index = pd.date_range(start=first_day, end=last_day, freq='D')

    expanded = df.reindex(daily_index)

    # Dates absent from the original index come back as NaN; optionally turn them into 0.
    return expanded.fillna(0) if fill_na_values else expanded

# Put production on a daily calendar starting 2008-01-01; days without a reported value become 0
production = reindex_df_to_now(production, '2008-01-01', fill_na_values=True)

In [7]:
# Distribute month values to daily frequency

def month_to_daily_distribution(df: pd.DataFrame, col_name:str) -> pd.DataFrame:
    """Spread each month's value evenly over the rows of that month.

    For every (year, month) group of the index, the value found on the group's
    first row is divided by the number of rows in the group, and the result is
    written to every row of the group.

    Args:
        df: Frame indexed by a ``DatetimeIndex`` (``.year`` / ``.month`` are read from it).
        col_name: Column holding the monthly value on the first row of each month.

    Returns:
        A new frame with the same index and columns as ``df`` where ``col_name``
        holds the per-row share. The input frame is left unchanged.
    """
    if df.empty:
        return df.copy()

    month_keys = [df.index.year, df.index.month]

    # transform keeps the original index, so no group-key levels have to be
    # dropped afterwards (groupby.apply only prepends them on some pandas versions).
    daily_share = df.groupby(month_keys)[col_name].transform(
        lambda month_values: month_values.iloc[0] / len(month_values)
    )

    result = df.copy()
    result[col_name] = daily_share
    return result

# Each day gets the month's first-day value divided by the number of days present for that month
production = month_to_daily_distribution(production, 'production')

In [8]:
# Fill missing values (stored as 0 after reindexing) with the average of the last three months of available data

def fill_last_tmonth(df: pd.DataFrame, col_name:str) -> pd.DataFrame:
    """Replace zeros in ``col_name`` with the mean of the last three months of data.

    Rows whose value differs from 0 are treated as available data. The mean is
    taken over the available rows dated within three months before (and
    including) the latest available row, and every 0 in the column is replaced
    by that mean.

    Args:
        df: Frame indexed by dates.
        col_name: Column in which 0 marks a missing value.

    Returns:
        A new frame with the zeros replaced; the input frame is not modified.
        If the column has no non-zero value, an unchanged copy is returned.
    """
    filled = df.copy()

    available = filled[filled[col_name] != 0]
    if available.empty:
        # Nothing to average: keep the zeros instead of turning them into NaN.
        return filled

    window_start = available.index.max() - pd.DateOffset(months=3)
    recent = available[available.index >= window_start]
    recent_mean = float(recent[col_name].mean())

    filled[col_name] = filled[col_name].replace(float(0), recent_mean)
    return filled

# Replace the remaining zeros with the mean of the last three months of non-zero values
production = fill_last_tmonth(production, 'production')

In [9]:
# Fill NaN values in Brent Crude Oil Data 

In [10]:
# Reindex brent to a daily calendar starting 2008-01-01, leaving dates without a quote as NaN

brent = reindex_df_to_now(brent, '2008-01-01', fill_na_values=False)

In [11]:
# Fill NaN values with the monthly average of the existing data

def fillna_mean_month(df: pd.DataFrame, col_name:str) -> pd.DataFrame:
    """Fill NaN values of ``col_name`` with the mean of the same calendar month.

    Rows are grouped by the (year, month) of the index; inside each group the
    missing values are replaced by the mean of the group's existing values.
    A group without any value keeps its NaN.

    Args:
        df: Frame indexed by a ``DatetimeIndex``; the column is overwritten in place.
        col_name: Column to fill.

    Returns:
        The same ``df`` object, with ``col_name`` filled.
    """
    def _fill_with_group_mean(month_values):
        return month_values.fillna(month_values.mean())

    month_keys = [df.index.year, df.index.month]
    df[col_name] = df.groupby(month_keys)[col_name].transform(_fill_with_group_mean)
    return df

# Fill each missing brent value with the mean of the values available in the same calendar month
brent = fillna_mean_month(brent, 'brent')

In [12]:
#Fill NaN values in USD Data

In [13]:
# Reindex usd to a daily calendar starting 2008-01-01, leaving dates without a quote as NaN

usd = reindex_df_to_now(usd, '2008-01-01', fill_na_values=False)

In [14]:
# Fill each missing usd value with the mean of the values available in the same calendar month

usd = fillna_mean_month(usd, 'usd')

Petro: Join all databases

In [15]:
# Outer-join every series on the date index so that a date present in any source is kept.
# brent, production and usd were reindexed to a daily calendar above; pbr and wti are joined as extracted.

petro = pbr.join([brent, wti, production, usd], how='outer')

Clean and fill last null values

In [16]:
# Drop weekend rows because stock data is not collected on weekends

def drop_weekends_data(df:pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` without the rows that fall on a weekend.

    Args:
        df: Frame indexed by a ``DatetimeIndex`` (``.dayofweek`` is read from it).

    Returns:
        A new frame holding only the rows whose index day is Monday (0) through
        Friday (4). The input frame is left untouched - no helper column is
        added to it.
    """
    is_weekday = df.index.dayofweek <= 4
    return df.loc[is_weekday].copy()

# Keep only the Monday-Friday rows of the joined frame
petro = drop_weekends_data(petro)

In [17]:
# Fill the remaining NaN values with the next valid observation (backward fill),
# filling at most two consecutive missing values

petro = petro.bfill(limit=2) 