# Data Engineering

This notebook extracts raw fundamental-analysis data and refines it by cleaning, aggregating, completing, and consolidating it with external datasets.

## Libraries and imports

In [None]:
import pandas as pd
import numpy as np
from scipy.interpolate import interp1d
import seaborn as sns
import matplotlib.pyplot as plt

import re
import time

import fundamentus

from google.cloud import storage
import os

import sys
sys.path.append('..')
from scripts.utils import initialize_bucket
import requests
import requests_cache
import logging
import time
from datetime import date

from statsmodels.tsa.holtwinters import SimpleExpSmoothing

## Setup

In [None]:
# Show up to 200 columns when displaying wide DataFrames.
pd.options.display.max_columns = 200

# Service-account key used to authenticate against Google Cloud Storage.
credentials_path = '../datascience-capstone-project-05b1642f45c3.json'
client, bucket = initialize_bucket(credentials_path, 'storage-barsianize')

## Methods and functions

### Getting raw data

In [None]:
def perc_to_float(val):
    """
    Percent to float
      - convert pt-BR percent strings to fractions
      - '45,56%' -> 0.4556, '1.234,5%' -> 12.345
    Input:
        val: pandas Series of pt-BR percent strings ('.' thousands
             separator, ',' decimal separator, trailing '%').
    Output:
        Series of floats (percent value divided by 100), same index as `val`.
    """
    # Order matters: drop '%' and the '.' thousands separator first, then turn
    # the ',' decimal separator into '.' so astype(float) can parse the value.
    res = val.replace(to_replace=r'[%]', value='', regex=True)
    res = res.replace(to_replace=r'[.]', value='', regex=True)
    res = res.replace(to_replace=r'[,]', value='.', regex=True)
    return res.astype(float) / 100

def _rename_cols(data):
    """
    Build a DataFrame with short, identifier-friendly column names.

    Each output column is copied from the matching Fundamentus HTML header in
    `data`, so columns can be used as attributes (e.g. ``df.pl > 0``).

    Input:
        data: DataFrame carrying the original Fundamentus HTML header names.
    Output:
        New DataFrame holding only the renamed columns, in the order below.
    """
    # (short name, original HTML header). EV/EBIT and EV/EBITDA are
    # deliberately not included.
    column_map = (
        ('cotacao', 'Cotação'),
        ('pl', 'P/L'),
        ('pvp', 'P/VP'),
        ('psr', 'PSR'),
        ('dy', 'Div.Yield'),
        ('pa', 'P/Ativo'),
        ('pcg', 'P/Cap.Giro'),
        ('pebit', 'P/EBIT'),
        ('pacl', 'P/Ativ Circ.Liq'),
        ('mrgebit', 'Mrg Ebit'),
        ('mrgliq', 'Mrg. Líq.'),
        ('roic', 'ROIC'),
        ('roe', 'ROE'),
        ('liqc', 'Liq. Corr.'),
        ('liq2m', 'Liq.2meses'),
        ('patrliq', 'Patrim. Líq'),
        ('divbpatr', 'Dív.Brut/ Patrim.'),
        ('c5y', 'Cresc. Rec.5a'),
    )

    renamed = pd.DataFrame()
    for short_name, html_name in column_map:
        renamed[short_name] = data[html_name]
    return renamed

def get_resultado_raw(url):
    """
    Get data from fundamentus:
      URL:
        given by the caller, e.g. http://fundamentus.com.br/resultado.php
        or an archived (Wayback Machine) copy of that page
    RAW:
      DataFrame preserves original HTML header names
    Output:
      DataFrame indexed by 'papel' (the 'Papel' column), sorted by index,
      with the percent columns converted to fractions by perc_to_float
    Raises:
      requests.HTTPError if the server answers with an error status
    """
    from io import StringIO

    ##
    ## Busca avançada por empresa (advanced company search)
    ##
    hdr = {'User-agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201',
           'Accept': 'text/html, text/plain, text/css, text/sgml, */*;q=0.01',
           'Accept-Encoding': 'gzip, deflate',
           }

    with requests_cache.enabled():
        # A timeout keeps one stalled request from hanging the whole ingestion.
        content = requests.get(url, headers=hdr, timeout=60)

        if content.from_cache:
            logging.debug('.../resultado.php: [CACHED]')
        else: # pragma: no cover
            # Only real (non-cached) requests pay the politeness delay.
            logging.debug('.../resultado.php: sleeping...')
            time.sleep(.500) # 500 ms

    # Fail loudly instead of trying to parse an error page as data.
    content.raise_for_status()

    ## parse + load; StringIO avoids the pandas >= 2.1 deprecation of passing
    ## literal HTML to read_html
    df = pd.read_html(StringIO(content.text), decimal=",", thousands='.')[0]

    ## Fix: percent strings -> fractions
    for col in ('Div.Yield', 'Mrg Ebit', 'Mrg. Líq.', 'ROIC', 'ROE', 'Cresc. Rec.5a'):
        df[col] = perc_to_float(df[col])

    ## index by 'Papel', instead of 'int'
    df = df.set_index('Papel').sort_index()

    ## naming
    df.name = 'Fundamentus: HTML names'
    df.columns.name = 'Multiples'
    df.index.name = 'papel'

    ## return sorted by 'papel'
    return df


def get_resultado(url):
    """
    Data from fundamentus with short, identifier-friendly header names.
      URL:
        given by the caller (live page or archived snapshot)
      Obs:
        columns are renamed by _rename_cols
    Output:
      DataFrame indexed by 'papel' with fully duplicated rows removed
      (first occurrence kept)
    """
    renamed = _rename_cols(get_resultado_raw(url))

    # metadata
    renamed.name = 'Fundamentus: short names'
    renamed.columns.name = 'Multiples'
    renamed.index.name = 'papel'

    # NOTE(review): drop_duplicates compares column values only, not the
    # 'papel' index, so two different tickers with identical figures would
    # collapse into one row. Confirm this is intended.
    return renamed.drop_duplicates(keep='first')

In [None]:
def get_dates(urls):
    """
    Extract the snapshot date embedded in each Wayback Machine URL.

    The timestamp is the path segment between '/web/' and '/http'. Its first
    eight characters are read as YYYYMMDD, e.g.
    '.../web/20080131120000/http://...' -> 2008-01-31.

    Args:
        urls: Iterable of archived URLs.

    Returns:
        list: One pandas Timestamp per URL, in input order.
    """
    def _to_timestamp(url):
        stamp = re.split('/http', re.split('/web/', url)[1])[0]
        year, month, day = stamp[:4], stamp[4:6], stamp[6:8]
        return pd.to_datetime(f'{year}-{month}-{day}')

    return [_to_timestamp(url) for url in urls]



def ingest_data(file: str, path: str, time_to_sleep: int = 1, store_locally: bool = False) -> pd.DataFrame:
    """
    Ingest every archived Fundamentus snapshot listed in a CSV of URLs.

    The CSV must have a 'urls' column. URLs are processed in sorted order.
    Each one is scraped with get_resultado and tagged with its snapshot
    'date' (see get_dates) and that date's 'year'.

    Args:
        file (str): CSV file name.
        path (str): Directory prefix, concatenated directly with `file`
            (so it should end with a path separator).
        time_to_sleep (int, optional): Seconds to sleep after each request,
            to throttle the scraping. Defaults to 1.
        store_locally (bool, optional): If True, write one CSV per calendar
            year to '../data/01_trusted/<year>.csv' once that year is complete
            (the last year included). Defaults to False.

    Returns:
        pd.DataFrame: All snapshots concatenated, with the 'papel' index moved
        back into a column. An empty DataFrame when the CSV lists no URLs.
    """
    urls = pd.read_csv(path + file).sort_values(by='urls')['urls'].values
    if len(urls) == 0:
        return pd.DataFrame()

    dates = get_dates(urls)

    all_frames = []   # every snapshot, concatenated once at the end
    year_frames = []  # snapshots of the year currently being collected
    current_year = dates[0].year

    for snapshot_date, url in zip(dates, urls):
        print(snapshot_date, url)

        # A new calendar year starts: flush the finished year under its own name.
        if snapshot_date.year != current_year:
            if store_locally:
                _store_year(year_frames, current_year)
            year_frames = []
            current_year = snapshot_date.year

        df = get_resultado(url)
        df['date'] = snapshot_date
        df['year'] = snapshot_date.year
        all_frames.append(df)
        year_frames.append(df)

        # Throttle after every request, regardless of year boundaries.
        time.sleep(time_to_sleep)

    if store_locally:
        _store_year(year_frames, current_year)

    # A single concat avoids the quadratic cost of growing a frame in the loop.
    return pd.concat(all_frames).reset_index()


def _store_year(frames, year):
    """Write the snapshots collected for `year` to ../data/01_trusted/<year>.csv."""
    if frames:
        pd.concat(frames).to_csv(f'../data/01_trusted/{year}.csv')



def get_detailed_ticker_data(tickers: list, time_to_sleep: float = 3) -> pd.DataFrame:
    """
    Retrieve detailed per-ticker data with fundamentus.get_ticker.

    Tickers whose request fails are skipped, and a message naming the ticker
    and the error is printed.

    Args:
        tickers (list): Tickers to retrieve data for.
        time_to_sleep (float, optional): Seconds to wait after each request
            to throttle the scraping. Defaults to 3.

    Returns:
        pd.DataFrame: The concatenated ticker data. Empty if no ticker
        could be retrieved.
    """
    frames = []

    for ticker in tickers:
        try:
            frames.append(fundamentus.get_ticker(ticker))
        except Exception as exc:
            # Best effort: skip failing tickers but say why. Catching Exception
            # (not a bare except) keeps Ctrl-C / SystemExit working in this
            # long-running loop.
            print(f'Failed to retrieve ticker data for: {ticker} ({exc!r})')

        time.sleep(time_to_sleep)

    if not frames:
        return pd.DataFrame()
    # One concat at the end instead of growing a DataFrame per ticker.
    return pd.concat(frames)

### Refining Data

#### Creating Daily Data

In [None]:

def create_daily_dataset(data: pd.DataFrame, date_col: str) -> pd.DataFrame:
    """
    Expand `data` onto a continuous daily calendar.

    A 'days' column spanning every calendar day from the earliest to the
    latest value of `date_col` is left-joined with `data`. Days without an
    observation therefore appear as rows whose original columns are NaN.

    Args:
        data (pd.DataFrame): Observations with a datetime column `date_col`.
        date_col (str): Name of that datetime column.

    Returns:
        pd.DataFrame: 'days' followed by the columns of `data`. There is one
        row per calendar day, plus extra rows where several observations
        share a date.
    """
    first_day = data[date_col].min()
    last_day = data[date_col].max()
    calendar = pd.DataFrame({'days': pd.date_range(first_day, last_day, freq='D')})
    return calendar.merge(data, how='left', left_on='days', right_on=date_col)


def interpolate_data(data: pd.DataFrame, cols_first: list, cols_spline: list, date_col: str) -> pd.DataFrame:
    """
    Expand one series to daily frequency and fill the inserted days.

    Args:
        data (pd.DataFrame): Observations of a single series (e.g. one ticker).
        cols_first (list): Columns filled with their first non-null value.
        cols_spline (list): Columns filled by order-2 spline interpolation
            over the row position (one row per day after expansion).
        date_col (str): Name of the datetime column.

    Returns:
        pd.DataFrame: The daily dataset from create_daily_dataset with the
        listed columns filled. Every other column, including `date_col`
        itself, stays NaN/NaT on inserted days. 'days' holds each row's date.

    Raises:
        Whatever pandas/scipy raise for the spline (e.g. with too few points).
    """
    data = create_daily_dataset(data, date_col)

    for col in data.columns:
        if col in cols_first:
            # Use the first *non-null* value, as documented, rather than
            # whatever sits in row 0.
            first_idx = data[col].first_valid_index()
            if first_idx is not None:
                data[col] = data[col].fillna(data[col].loc[first_idx])
        elif col in cols_spline:
            data[col] = data[col].interpolate(method='spline', order=2)

    return data



import pandas as pd


def get_daily_data_per_ticker(data: pd.DataFrame, ticker_col: str, cols_first: list, cols_spline: list, date_col: str) -> pd.DataFrame:
    """
    Build daily, interpolated data for every ticker (see interpolate_data).

    Tickers whose interpolation fails (e.g. too few points for the spline)
    are skipped, and a message with the reason is printed. Rows with a
    missing ticker are ignored.

    Args:
        data (pd.DataFrame): The input dataset.
        ticker_col (str): The name of the column containing the tickers.
        cols_first (list): Columns filled with their first non-null value.
        cols_spline (list): Columns filled by spline interpolation.
        date_col (str): The name of the column containing the dates.

    Returns:
        pd.DataFrame: The daily data of all tickers, concatenated. Empty if
        no ticker could be processed.
    """
    daily_dataframes = []

    # groupby(sort=False) visits tickers in order of first appearance (like
    # Series.unique) and partitions the frame once instead of masking per ticker.
    for ticker, ticker_data in data.groupby(ticker_col, sort=False):
        print(ticker)
        try:
            daily_dataframes.append(
                interpolate_data(ticker_data, cols_first, cols_spline, date_col)
            )
        except Exception as exc:
            print(f'Skipping {ticker}: {exc!r}')

    if not daily_dataframes:
        return pd.DataFrame()
    return pd.concat(daily_dataframes)




#### Feature engineering and data cleansing

In [None]:
def create_numerical_categories(data: pd.DataFrame, cols: list, n: int) -> pd.DataFrame:
    """
    Add a quantile-bucket column '<col>_category' for each column in `cols`.

    Args:
        data (pd.DataFrame): The input dataset. It is modified in place and
            also returned.
        cols (list): The columns to bucket.
        n (int): Number of quantiles requested.

    Returns:
        pd.DataFrame: `data` with one categorical '<col>_category' column per
        input column, labelled 0, 1, ... from the lowest bucket up.

    Note:
        With duplicates='drop', heavily tied columns can produce fewer than
        `n` distinct quantile edges. Labels are then 0..k-1 for the k buckets
        that actually exist; passing `n` labels in that case makes pd.qcut
        raise.
    """
    for col in cols:
        # First pass only discovers the bin edges that survive de-duplication.
        _, edges = pd.qcut(data[col], q=n, duplicates='drop', retbins=True)
        n_bins = len(edges) - 1
        data[col + '_category'] = pd.qcut(
            data[col], q=n, duplicates='drop', labels=list(range(n_bins))
        )

    return data


def create_besst_categories(data: pd.DataFrame, besst_col: str, besst_1: list, besst_2: list) -> pd.DataFrame:
    """
    Flag rows whose `besst_col` value belongs to either BESST group.

    Args:
        data (pd.DataFrame): The input dataset. It is modified in place and
            also returned.
        besst_col (str): Column whose values are tested (e.g. the sector).
        besst_1 (list): Values that make 'besst_1' True.
        besst_2 (list): Values that make 'besst_2' True.

    Returns:
        pd.DataFrame: `data` with boolean columns 'besst_1' and 'besst_2'.
    """
    for flag_name, members in (('besst_1', besst_1), ('besst_2', besst_2)):
        data[flag_name] = np.isin(data[besst_col], members)

    return data


def correct_data_column(x: str) -> str:
    """
    Reorder a 'DD/MM/YYYY' date string into ISO 'YYYY-MM-DD'.

    Args:
        x (str): Date in 'DD/MM/YYYY' format.

    Returns:
        str: The same date as 'YYYY-MM-DD'.
    """
    return '-'.join(reversed(x.split('/')))

def from_percent_to_numeric(x):
    """
    Convert a percentage string to a numeric fraction.

    The '%' sign is removed and the number is divided by 100. Two formats
    are accepted:
      - dot-decimal strings ('25.5%');
      - pt-BR strings ('6,5%', '1.234,5%'), where '.' separates thousands and
        ',' is the decimal mark.
    Missing values (NaN/None) are returned as NaN.

    Args:
        x (str): A percentage string, e.g. '25.5%' or '6,5%'.

    Returns:
        float: The percentage as a fraction, or NaN for missing input.

    Example:
        >>> from_percent_to_numeric('25.5%')
        0.255
    """
    # Let missing values pass through instead of crashing a column-wide .apply.
    if not isinstance(x, str) and pd.isna(x):
        return np.nan

    text = re.sub('%', '', x).strip()
    if ',' in text:
        # pt-BR format: drop thousands dots, then use '.' as the decimal mark.
        text = text.replace('.', '').replace(',', '.')

    # Builtin float: np.float was removed in NumPy 1.24.
    return float(text) / 100



#### Creating aggregations

In [None]:
def aggregator_monthly(data: pd.DataFrame, tickers: list, ticker_col: str, date_col: str, agg_dict: dict,
                       freq: str = 'M') -> pd.DataFrame:
    """
    Resample each ticker's rows to a calendar frequency (monthly by default).

    Args:
        data (pd.DataFrame): The input dataset.
        tickers (list): The tickers to aggregate, processed in this order.
        ticker_col (str): The name of the column containing the tickers.
        date_col (str): The datetime column used as the resampling key.
        agg_dict (dict): Column -> aggregation(s), as accepted by Resampler.agg.
        freq (str, optional): Resampling rule. Defaults to 'M' (month end).
            Newer pandas versions spell this 'ME'.

    Returns:
        pd.DataFrame: One row per ticker and period, with a fresh RangeIndex.
        Columns are named '<col>_<func>', except 'first' aggregations, which
        keep the bare column name. Empty if `tickers` is empty.
    """
    monthly_frames = [
        data.loc[data[ticker_col] == ticker].resample(freq, on=date_col).agg(agg_dict)
        for ticker in tickers
    ]
    if not monthly_frames:
        return pd.DataFrame()

    combined = pd.concat(monthly_frames).reset_index(drop=True)

    # Flatten the (column, func) MultiIndex produced by the dict aggregation.
    combined.columns = [name if func == 'first' else name + '_' + func
                        for name, func in combined.columns]

    return combined


def get_windowed_data(data: pd.DataFrame, agg_dict: dict, tickers: list, ticker_col: str, columns_to_first: list,
                      columns_to_last: list, agg_dict_ref: dict) -> pd.DataFrame:
    """
    Build 12-row rolling-window features per ticker plus a 'dy_label' column.

    For each ticker (rows in their existing order):
      - rolling(12) aggregations from `agg_dict` and `agg_dict_ref`, with
        columns named '<col>_<func>' ('first' keeps the bare name);
      - `columns_to_first` copied from 12 rows earlier;
      - `columns_to_last` copied from 12 rows earlier as '<col>_last';
      - 'dy_label' = the 'dy_last' column from 24 rows earlier. Rows without
        a label are dropped.

    Args:
        data (pd.DataFrame): Input data; must contain 'dy_last'.
        agg_dict (dict): Rolling aggregations (values must produce a
            (column, func) MultiIndex, e.g. lists of function names).
        tickers (list): Tickers to process.
        ticker_col (str): The name of the column containing the tickers.
        columns_to_first (list): Columns shifted by 12 rows, same name.
        columns_to_last (list): Columns shifted by 12 rows, '_last' suffix.
        agg_dict_ref (dict): Additional rolling aggregations.

    Returns:
        pd.DataFrame: Concatenated per-ticker frames. Remaining NaNs are
        back-filled *within each ticker*, so values never leak from one
        ticker into another. Empty if `tickers` is empty.

    NOTE(review): shift(24) takes 'dy_last' from 24 rows *earlier*, which is
    older than the 12-row-lagged features. If a forward-looking target was
    intended, this would be a negative shift. Confirm the intended direction.
    """
    def _flatten(col):
        # ('pl_mean', 'std') -> 'pl_mean_std'; ('x', 'first') -> 'x'
        name, func = col
        return name if func == 'first' else name + '_' + func

    windowed_frames = []

    for ticker in tickers:
        data_ticker = data[data[ticker_col] == ticker]

        windowed = pd.concat(
            [data_ticker.rolling(12).agg(agg_dict), data_ticker.rolling(12).agg(agg_dict_ref)],
            axis=1,
        )
        windowed.columns = [_flatten(col) for col in windowed.columns]

        windowed[columns_to_first] = data_ticker[columns_to_first].shift(12)
        windowed[[col + '_last' for col in columns_to_last]] = data_ticker[columns_to_last].shift(12)

        windowed['dy_label'] = data_ticker['dy_last'].shift(24)
        windowed = windowed.dropna(subset=['dy_label'])

        # Back-fill per ticker, before concatenation, so the next ticker's
        # values cannot fill this ticker's trailing gaps.
        windowed_frames.append(windowed.bfill())

    if not windowed_frames:
        return pd.DataFrame()
    return pd.concat(windowed_frames)

### Getting External Data

In [None]:
def get_bcb_data(codes, start_date, end_date, output_format='json'):
    """
    Get time series from the Brazilian Central Bank SGS API.

    Args:
        codes (dict or list): Mapping {column_name: series_code}. A plain list
            of series codes is also accepted, in which case str(code) is used
            as the column name.
        start_date (str): Start date in format 'dd/mm/yyyy'.
        end_date (str): End date in format 'dd/mm/yyyy'.
        output_format (str, optional): Output format, either 'json' or 'csv'. Defaults to 'json'.

    Returns:
        pandas.DataFrame: A 'data' column (dates as returned by the API) plus
        one column per successfully parsed series.

    Raises:
        ValueError: If the output format is invalid, or no series could be parsed.
        requests.exceptions.RequestException: If an API request does not return HTTP 200.

    Example:
        >>> codes = {'dolar_comercial': 1, 'selic': 11}
        >>> df = get_bcb_data(codes, '01/01/2021', '31/12/2021', 'json')
    """
    # Define the base URL of the API
    url_base = "https://api.bcb.gov.br/dados/serie/bcdata.sgs.{}/dados"

    if output_format not in ['json', 'csv']:
        raise ValueError("Invalid output format. Must be 'json' or 'csv'.")

    # Accept a bare list of codes, as the example in the original docs did.
    if not isinstance(codes, dict):
        codes = {str(code): code for code in codes}

    dataframes = {}

    for code_name, code in codes.items():
        url = url_base.format(code) + f"?formato={output_format}&dataInicial={start_date}&dataFinal={end_date}"
        response = requests.get(url, timeout=60)
        if response.status_code != 200:
            raise requests.exceptions.RequestException(f"Error getting data. HTTP status code: {response.status_code}")

        df = _parse_bcb_response(response.text, output_format)
        if df is None:
            # Best effort: skip unparseable series, but make it visible.
            logging.warning('Could not parse BCB series %s (%s); skipping it.', code, code_name)
            continue

        df = df.rename(columns={"valor": code_name}).set_index("data")
        dataframes[code_name] = df[code_name]

    if not dataframes:
        raise ValueError("No BCB series could be retrieved and parsed.")

    # Combine the series into a single DataFrame aligned on the date
    return pd.concat(dataframes.values(), axis=1).reset_index()


def _parse_bcb_response(text, output_format):
    """Parse an SGS response body into a DataFrame with 'data'/'valor' columns, or return None."""
    from io import StringIO

    if output_format == 'csv':
        # NOTE(review): assumes SGS CSV layout ('data;valor' header, ';'
        # separator, ',' decimal mark). Confirm against a live response.
        try:
            return pd.read_csv(StringIO(text), sep=';', decimal=',')
        except Exception:  # fall through to the JSON/XML attempts
            pass

    # StringIO: passing literal JSON/XML strings is deprecated in pandas >= 2.1.
    try:
        return pd.read_json(StringIO(text))
    except Exception:
        pass
    try:
        return pd.read_xml(StringIO(text))
    except Exception:
        return None

## Implementation
### Data Collection

For data gathering, two GitHub repositories were combined so that the historical data could be scraped from the web. This raw data is stored in the cloud (GCP) and is the main data source for the project.

Repositories utilized:
* https://github.com/mv/fundamentus-api
* https://github.com/Victorcorcos/bovespa-winner

In [None]:
# Scrape every archived snapshot listed in ../data/00_raw/urls.csv,
# waiting 3 s between requests and without writing per-year CSVs.
df = ingest_data(file='urls.csv', path='../data/00_raw/', time_to_sleep=3, store_locally=False)
df.head()

In [None]:
# Peek at the current (live) results table from the fundamentus library.
fundamentus.get_resultado().reset_index().head()

#### Get detailed information about each ticker

In [None]:
# One detailed-info request per distinct ticker in the historical data.
tickers = df['papel'].unique()
df_tickers = get_detailed_ticker_data(tickers=tickers)

In [None]:
# Quick look at the detailed ticker data.
df_tickers.head()

##### Storing the raw data into Google Cloud

In [None]:
# upload raw fundamentalist data to Google Cloud Storage
blob = bucket.blob('01_raw/fundamentus_historical_raw.csv')
blob.upload_from_string(df.to_csv(), 'text/csv')
# # upload raw fundamentalist data to Google Cloud Storage
# blob = bucket.blob('01_raw/fundamentus_tickers_raw.csv')
# blob.upload_from_string(df_tickers.to_csv(), 'text/csv')

### Data Cleansing

In [None]:
# Reload the raw snapshots and ticker details from the bucket.
raw_prefix = "gs://storage-barsianize/01_raw/"
df = pd.read_csv(f"{raw_prefix}fundamentus_historical_raw.csv", index_col=0)

path = f"{raw_prefix}fundamentus_tickers_raw.csv"
df_tickers = pd.read_csv(path, index_col=0)

In [None]:
# Snapshot dates come back from the CSV as strings.
df['date'] = pd.to_datetime(df['date'])

# Numeric indicators, filled between snapshots by spline interpolation.
# NOTE: _rename_cols currently does not emit 'evebit'; columns that are not
# present are simply ignored by interpolate_data.
columns_to_spline = (
    'cotacao pl pvp psr dy pa pcg pebit pacl evebit '
    'mrgebit mrgliq roic roe liqc liq2m patrliq divbpatr c5y'
).split()

# Constant per-ticker columns, filled with their first value.
columns_to_first = ['papel']

In [None]:
# Expand every ticker to daily rows and interpolate between snapshots.
df_daily = get_daily_data_per_ticker(
    data=df,
    ticker_col='papel',
    cols_first=columns_to_first,
    cols_spline=columns_to_spline,
    date_col='date',
)

##### Storing the trusted daily data in Google Cloud

In [None]:
path_local = '../data/01_trusted/'
filename = 'daily_data.parquet'
blob_name = '02_trusted/'

# save the DataFrame as a parquet file
df_daily.to_parquet(path_local + filename)

# Upload to Google Cloud Storage in 8 MiB chunks. chunk_size is the public
# way to set this (it must be a multiple of 256 KiB), instead of poking the
# private blob._chunk_size attribute.
blob = bucket.blob(blob_name + filename, chunk_size=8 * 1024 * 1024)
blob.upload_from_filename(path_local + filename, num_retries=10)

### Feature engineering

In [None]:
# Reload the trusted daily dataset from the bucket.
path = "gs://storage-barsianize/02_trusted/daily_data.parquet"
df_daily = pd.read_parquet(path=path)

Ensuring there are no negative DY (dividend yield) values

In [None]:
# Negative dividend yields are not meaningful: set them to zero.
# A single .loc assignment is used because the chained form
# df_daily['dy'][mask] = 0 writes through a temporary and silently does
# nothing under pandas Copy-on-Write.
df_daily.loc[df_daily['dy'] < 0, 'dy'] = 0

Removing the tickers whose last quotation date ('Data_ult_cot') is before 2008

In [None]:
# Drop tickers last quoted before 2008. Rows with a missing date are kept,
# because the comparison is False for them.
last_quote_year = pd.to_datetime(df_tickers['Data_ult_cot']).dt.year
df_tickers = df_tickers[~(last_quote_year < 2008)]

Creating numerical categories

In [None]:
# Decile buckets (0-9) for the company-size indicators.
size_columns = ['Valor_de_mercado', 'Lucro_Liquido_12m', 'Receita_Liquida_12m', 'Patrim_Liq']
df_tickers = create_numerical_categories(df_tickers, cols=size_columns, n=10)

Creating BESST 1 and BESST 2 features for companies that belong to specific sectors

In [None]:
# Sector names (values of the 'Setor' column) that make up BESST group 1.
besst_1 =  [
    'Intermediários Financeiros',
    'Energia Elétrica',
    'Previdência e Seguros',
    'Água e Saneamento',
    'Serviços Financeiros Diversos'
]

# Sector names (values of the 'Setor' column) that make up BESST group 2.
besst_2 =  [
    'Mineração',
    'Madeira e Papel',
    'Químicos',
    'Siderurgia e Metalurgia',
    'Petróleo, Gás e Biocombustíveis'
]

In [None]:
# Distribution of net-income deciles inside each BESST sector group.
sns.displot(df_tickers[np.isin(df_tickers['Setor'], besst_1)]['Lucro_Liquido_12m_category'])
sns.displot(df_tickers[np.isin(df_tickers['Setor'], besst_2)]['Lucro_Liquido_12m_category'])

In [None]:
# Boolean 'besst_1' / 'besst_2' flags based on the company sector.
df_tickers = create_besst_categories(
    data=df_tickers, besst_col='Setor', besst_1=besst_1, besst_2=besst_2
)

Correcting DY values from str (in %) to numeric

In [None]:
# '6,5%'-style strings -> fractions, element by element.
df_tickers['Div_Yield'] = df_tickers['Div_Yield'].map(from_percent_to_numeric)

Keeping only tickers of type ON or PN whose code ends in 3, 4, 5 or 6

In [None]:
# 'Tipo' starts with the share type; keep just those two characters.
df_tickers['Tipo'] = df_tickers['Tipo'].str[:2]
df_tickers = df_tickers.loc[np.isin(df_tickers['Tipo'], ['ON', 'PN'])]

# Keep tickers whose last character is 3, 4, 5 or 6.
last_char = df_tickers['Papel'].str[-1]
df_tickers = df_tickers.loc[np.isin(last_char, ['3', '4', '5', '6'])]

Creating completed dataset

In [None]:
# Static, per-company attributes attached to every daily row.
info_tickers = [
    'Papel', 'Tipo', 'Empresa', 'Setor', 'Subsetor', 'Data_ult_cot',
    'Lucro_Liquido_12m_category', 'Valor_de_mercado_category',
    'Patrim_Liq_category', 'besst_1', 'besst_2',
]

df_tickers_clean = df_tickers[info_tickers].dropna().drop_duplicates()
df_tickers_clean['Data_ult_cot'] = pd.to_datetime(df_tickers_clean['Data_ult_cot'])
df_tickers_clean = df_tickers_clean.reset_index(drop=True)

# Company attributes x daily indicators, joined on the ticker code.
df_completed = (
    df_tickers_clean
    .merge(df_daily, how='left', left_on='Papel', right_on='papel')
    .drop_duplicates()
)

In [None]:
# NOTE(review): this drops every row containing any NaN. Rows that
# interpolate_data inserted for days without a snapshot keep NaN in the
# columns it does not fill (e.g. 'date' and 'year'). Those rows therefore
# appear to be removed here, leaving only the original snapshot rows.
# Confirm this is intended; otherwise restrict the check with dropna(subset=[...]).
df_completed.dropna(inplace=True)

#### Storing results

In [None]:
client, bucket = initialize_bucket(credentials_path, 'storage-barsianize')

path_local = '../data/02_refined/'
filename = 'df_completed_daily.parquet'
blob_name = '03_refined/'

# save the DataFrame as a parquet file
df_completed.to_parquet(path_local + filename)

# Upload to Google Cloud Storage in 8 MiB chunks. chunk_size is the public
# way to set this (it must be a multiple of 256 KiB), instead of poking the
# private blob._chunk_size attribute.
blob = bucket.blob(blob_name + filename, chunk_size=8 * 1024 * 1024)
blob.upload_from_filename(path_local + filename, num_retries=10)

### Data Aggregations

In [None]:
client, bucket = initialize_bucket(credentials_path, 'storage-barsianize')

# Reload the refined daily dataset from the bucket.
path = "gs://storage-barsianize/03_refined/df_completed_daily.parquet"
df_completed = pd.read_parquet(path=path)

In [None]:
ticker_col = 'Papel'
date_col = 'date'
tickers = list(df_completed[ticker_col].unique())

# Descriptive columns: keep the first value of each month.
static_columns = [
    'Papel', 'Tipo', 'Empresa', 'Setor', 'Subsetor', 'Data_ult_cot',
    'Lucro_Liquido_12m_category', 'Valor_de_mercado_category',
    'Patrim_Liq_category', 'besst_1', 'besst_2',
]
# Daily indicators: max/min/mean/last per month ('dy' also gets the median).
indicator_columns = [
    'cotacao', 'pl', 'pvp', 'psr', 'dy', 'pa', 'pcg', 'pebit', 'pacl', 'evebit',
    'mrgebit', 'mrgliq', 'roic', 'roe', 'liqc', 'liq2m', 'patrliq', 'divbpatr', 'c5y',
]

agg_dict = {column: 'first' for column in static_columns}
for column in indicator_columns:
    agg_dict[column] = ['max', 'min', 'mean', 'last'] + (['median'] if column == 'dy' else [])
agg_dict['date'] = 'last'
agg_dict['year'] = 'first'

df_agg_monthly = aggregator_monthly(df_completed, tickers, ticker_col, date_col, agg_dict)

In [None]:
# Quick look at the per-ticker monthly aggregates.
df_agg_monthly.head()

### Get Central Bank Data

In [None]:
# https://www3.bcb.gov.br/sgspub/localizarseries/localizarSeries.do?method=prepararTelaLocalizarSeries

# Output column name -> SGS series code (look codes up at the URL above).
codes = {
    'preco_do_petroleo': 4390,
    'preco_do_minerio_de_ferro': 25521,
    'indice_da_industria': 24369,
    'indice_do_agro': 24368,
    'dolar_comercial': 1,
    'euro': 21619,
    'ibovespa': 23686,
    'pib': 21920,
    'pib_dolarizado': 22786,
    'igpm': 189,
    'ipca': 433,
    'selic': 11
}
start_date, end_date = "01/01/2008", "28/02/2023"
output_format = 'json'

df_bcb = get_bcb_data(codes, start_date, end_date, output_format=output_format)

# The API returns 'dd/mm/yyyy' strings; reorder to ISO before parsing.
df_bcb['data'] = pd.to_datetime(df_bcb['data'].map(correct_data_column))

In [None]:
# Series are published at different frequencies: carry each last known
# value forward, then back-fill the leading gaps. .ffill()/.bfill() replace
# the deprecated fillna(method=...).
# NOTE(review): back-filling copies later observations into earlier dates.
df_bcb = df_bcb.ffill().bfill()

agg_dict = {
        'data':'first',
        # 'preco_do_petroleo':'last',
        'indice_da_industria':'last',
        'dolar_comercial':'last',
        'euro':'last',
        'ibovespa':'last',
        'pib_dolarizado':'last',
        'igpm':'last',
        'ipca':'last',
        'selic':'last',
        }
# One row per calendar month: its first date plus each series' last value.
df_bcb_agg = df_bcb.resample('M', on='data').agg(agg_dict)
df_bcb_agg = df_bcb_agg.reset_index(drop=True)

In [None]:
# Calendar keys used to join the macro indicators to the ticker aggregates.
df_bcb_agg['month'] = df_bcb_agg['data'].dt.month
df_bcb_agg['year'] = df_bcb_agg['data'].dt.year
df_agg_monthly['month'] = df_agg_monthly['date_last'].dt.month

df_monthly_full = df_agg_monthly.merge(df_bcb_agg, how='left', on=['month', 'year'])

In [None]:
client, bucket = initialize_bucket(credentials_path, 'storage-barsianize')

path_local = '../data/02_refined/'
filename = 'df_monthly_full.parquet'
blob_name = '03_refined/'

# save the DataFrame as a parquet file
df_monthly_full.to_parquet(path_local + filename)

# Upload to Google Cloud Storage in 8 MiB chunks. chunk_size is the public
# way to set this (it must be a multiple of 256 KiB), instead of poking the
# private blob._chunk_size attribute.
blob = bucket.blob(blob_name + filename, chunk_size=8 * 1024 * 1024)
blob.upload_from_filename(path_local + filename, num_retries=10)

### Create windowed dataset

In [None]:
# Reload the monthly dataset (ticker aggregates + macro indicators).
path = "gs://storage-barsianize/03_refined/df_monthly_full.parquet"
df_monthly_full = pd.read_parquet(path=path)

In [None]:
# Quick look at the merged monthly dataset.
df_monthly_full.head()

In [None]:
# Rolling-window aggregations for get_windowed_data, built programmatically.
# Per company indicator X: X_max -> max, X_min -> min, X_mean -> [mean, std]
# ('dy' also gets dy_median -> [max, min], placed before dy_mean).
company_indicators = [
    'cotacao', 'pl', 'pvp', 'psr', 'dy', 'pa', 'pcg', 'pebit', 'pacl', 'evebit',
    'mrgebit', 'mrgliq', 'roic', 'roe', 'liqc', 'liq2m', 'patrliq', 'divbpatr', 'c5y',
]
# Macro series get min/max/mean/std ('preco_do_petroleo' is left out).
macro_indicators = [
    'indice_da_industria', 'dolar_comercial', 'euro', 'ibovespa',
    'pib_dolarizado', 'igpm', 'ipca', 'selic',
]

agg_dict = {}
for indicator in company_indicators:
    agg_dict[f'{indicator}_max'] = 'max'
    agg_dict[f'{indicator}_min'] = 'min'
    if indicator == 'dy':
        agg_dict['dy_median'] = ['max', 'min']
    agg_dict[f'{indicator}_mean'] = ['mean', 'std']
for indicator in macro_indicators:
    agg_dict[indicator] = ['min', 'max', 'mean', 'std']

agg_dict_ref = {
    'cotacao_max_ref':'max',	
    'cotacao_min_ref':'min',
    'cotacao_mean_ref':['mean','std'],
    'pl_max_ref':'max',
    'pl_min_ref':'min',
    'pl_mean_ref':['mean','std'],
    'pvp_max_ref':'max',
    'pvp_min_ref':'min',
    'pvp_mean_ref':['mean','std'],
    'psr_max_ref':'max',
    'psr_min_ref':'min',
    'psr_mean_ref':['mean','std'],
    'dy_max_ref':'max',
    'dy_min_ref':'min',
    'dy_median_ref':['max','min'],
    'dy_mean_ref':['mean','std'],
    'pa_max_ref':'max',
    'pa_min_ref':'min',
    'pa_mean_ref':['mean','std'],
    'pcg_max_ref':'max',
    'pcg_min_ref':'min',
    'pcg_mean_ref':['mean','std'],
    'pebit_max_ref':'max',
    'pebit_min_ref':'min',
    'pebit_mean_ref':['mean','std'],
    'pacl_max_ref':'max',
    'pacl_min_ref':'min',
    'pacl_mean_ref':['mean','std'],
    'evebit_max_ref':'max',
    'evebit_min_ref':'min',
    'evebit_mean_ref':['mean','std'],
    'mrgebit_max_ref':'max',
    'mrgebit_min_ref':'min',
    'mrgebit_mean_ref':['mean','std'],
    'mrgliq_max_ref':'max',
    'mrgliq_min_ref':'min',
    'mrgliq_mean_ref':['mean','std'],
    'roic_max_ref':'max',
    'roic_min_ref':'min',
    'roic_mean_ref':['mean','std'],
    'roe_max_ref':'max',
    'roe_min_ref':'min',
    'roe_mean_ref':['mean','std'],
    'liqc_max_ref':'max',
    'liqc_min_ref':'min',
    'liqc_mean_ref':['mean','std'],
    'liq2m_max_ref':'max',
    'liq2m_min_ref':'min',
    'liq2m_mean_ref':['mean','std'],
    'patrliq_max_ref':'max',
    'patrliq_min_ref':'min',
    'patrliq_mean_ref':['mean','std'],
    'divbpatr_max_ref':'max',
    'divbpatr_min_ref':'min',
    'divbpatr_mean_ref':['mean','std'],
    'c5y_max_ref':'max',
    'c5y_min_ref':'min',
    'c5y_mean_ref':['mean','std'],
}

columns_to_first = [
    'Papel',
    'Tipo',
    'Empresa',
    'Setor',
    'Subsetor',
    'Data_ult_cot',
    'Lucro_Liquido_12m_category',
    'Valor_de_mercado_category',
    'Patrim_Liq_category',
    'besst_1',
    'besst_2',
]

columns_to_last = [
    'year',
    'date_last',
    'month',
    # 'preco_do_petroleo',
    'indice_da_industria',
    'dolar_comercial',
    'euro',
    'ibovespa',
    'pib_dolarizado',
    'igpm',
    'ipca',
    'selic',
    'cotacao_mean',
    'c5y_mean',
    'pl_mean',
    'pvp_mean',
    'psr_mean',
    'dy_mean',
    'dy_median',
    'pa_mean',
    'pcg_mean',
    'pebit_mean',
    'pacl_mean',
    'evebit_mean',
    'mrgebit_mean',
    'mrgliq_mean',
    'roic_mean',
    'roe_mean',
    'liqc_mean',
    'liq2m_mean',
    'patrliq_mean',
    'divbpatr_mean',
]

sector_col = 'Setor'
cols_to_ref = [
    'cotacao_max',
    'cotacao_min',
    'cotacao_mean',
    'cotacao_last',
    'pl_max',
    'pl_min',
    'pl_mean',
    'pl_last',
    'pvp_max',
    'pvp_min',
    'pvp_mean',
    'pvp_last',
    'psr_max',
    'psr_min',
    'psr_mean',
    'psr_last',
    'dy_max',
    'dy_min',
    'dy_mean',
    'dy_median',
    'dy_last',
    'pa_max',
    'pa_min',
    'pa_mean',
    'pa_last',
    'pcg_max',
    'pcg_min',
    'pcg_mean',
    'pcg_last',
    'pebit_max',
    'pebit_min',
    'pebit_mean',
    'pebit_last',
    'pacl_max',
    'pacl_min',
    'pacl_mean',
    'pacl_last',
    'evebit_max',
    'evebit_min',
    'evebit_mean',
    'evebit_last',
    'mrgebit_max',
    'mrgebit_min',
    'mrgebit_mean',
    'mrgebit_last',
    'mrgliq_max',
    'mrgliq_min',
    'mrgliq_mean',
    'mrgliq_last',
    'roic_max',
    'roic_min',
    'roic_mean',
    'roic_last',
    'roe_max',
    'roe_min',
    'roe_mean',
    'roe_last',
    'liqc_max',
    'liqc_min',
    'liqc_mean',
    'liqc_last',
    'liq2m_max',
    'liq2m_min',
    'liq2m_mean',
    'liq2m_last',
    'patrliq_max',
    'patrliq_min',
    'patrliq_mean',
    'patrliq_last',
    'divbpatr_max',
    'divbpatr_min',
    'divbpatr_mean',
    'divbpatr_last',
    'c5y_max',
    'c5y_min',
    'c5y_mean',
    'c5y_last'
]

In [None]:
# Sector reference values: median of each indicator in `cols_to_ref` across the
# rows of a sector in a given year/month, stored as '<indicator>_ref' columns
# next to the (sector, year, month) keys.
df_monthly_full_ref = (
    df_monthly_full
    # Select the indicator columns *before* aggregating: calling median() on the
    # whole frame also tries to aggregate text columns such as 'Papel', which
    # raises a TypeError in pandas >= 2.0.
    .groupby([sector_col, 'year', 'month'])[cols_to_ref]
    .median()
    .add_suffix('_ref')
    .reset_index()
)

In [None]:
# Attach each row's sector/month reference medians, then turn every
# '<col>_ref' column into the ratio  <col> / <sector median of col>.
df_monthly_full_merge = df_monthly_full.merge(
    df_monthly_full_ref, how='left', on=[sector_col, 'year', 'month']
)

# Only columns that *end* in '_ref' are reference columns; this keeps the
# suffix slicing below consistent with the selection.
ref_cols = [col for col in df_monthly_full_merge.columns if col.endswith('_ref')]
val_cols = [col[:-len('_ref')] for col in ref_cols]

# Divide plain arrays, column by column position. Passing two DataFrames with
# different column labels to np.divide makes pandas align them by label
# (pandas >= 2.0), which yields all-NaN columns that cannot be assigned back.
with np.errstate(divide='ignore', invalid='ignore'):
    ratios = (
        df_monthly_full_merge[val_cols].to_numpy(dtype=float, na_value=np.nan)
        / df_monthly_full_merge[ref_cols].to_numpy(dtype=float, na_value=np.nan)
    )
# A zero sector median gives +/-inf; treat it like a missing ratio so the
# fillna(0) below covers it too.
ratios[~np.isfinite(ratios)] = np.nan

df_monthly_full_merge[ref_cols] = pd.DataFrame(
    ratios, index=df_monthly_full_merge.index, columns=ref_cols
)

df_monthly_full_merge.fillna(0, inplace=True)

In [None]:
# Preview the merged frame with the sector-relative ratio columns.
df_monthly_full_merge.head(n=5)

In [None]:
# Build the windowed dataset, one ticker at a time, from the merged monthly frame.
ticker_col = 'Papel'
# The merge above is a left merge, so every ticker of df_monthly_full is present.
tickers = df_monthly_full[ticker_col].unique()

df_windowed_full = get_windowed_data(
    df_monthly_full_merge,
    agg_dict,
    tickers,
    ticker_col,
    columns_to_first,
    columns_to_last,
    agg_dict_ref,
)

In [None]:
# Keep only complete rows, then remove exact duplicate rows.
df_windowed_full = df_windowed_full.dropna().drop_duplicates()

#### Add window spread and spread relevance

In [None]:
# Window spread of every indicator: (window maximum) - (window minimum).
max_cols = [col for col in df_windowed_full.columns if '_max' in col]
min_cols = [col for col in df_windowed_full.columns if '_min' in col]


def _spread_name(col):
    """Map a '_max' / '_min' aggregate column to its '<indicator>_spread' name."""
    return re.sub('mean_', '', re.sub('_min', '', re.sub('_max', '', col))) + '_spread'


spread_cols = [_spread_name(col) for col in max_cols]
print(spread_cols)

# Pair max and min columns by indicator name instead of by column position, so a
# different column order cannot silently subtract two unrelated indicators.
min_col_by_spread = {_spread_name(col): col for col in min_cols}
unpaired = [col for col in spread_cols if col not in min_col_by_spread]
if unpaired:
    raise ValueError(f'No matching _min column for spreads: {unpaired}')
paired_min_cols = [min_col_by_spread[col] for col in spread_cols]

# Subtract plain arrays: passing two DataFrames with different column labels to
# np.subtract makes pandas align them by label (pandas >= 2.0), producing
# all-NaN columns and a column-count mismatch on assignment.
spread_values = (
    df_windowed_full[max_cols].to_numpy()
    - df_windowed_full[paired_min_cols].to_numpy()
)
df_windowed_full[spread_cols] = pd.DataFrame(
    spread_values, index=df_windowed_full.index, columns=spread_cols
)

In [None]:
# Relevance of each spread: window spread divided by the indicator's last value.
last_cols = [col for col in df_windowed_full.columns if '_last' in col]

# Strip suffixes so the spread and last columns of one indicator share a name,
# e.g. 'X_spread' -> 'X' and 'X_mean_last' -> 'X'.
spread_cols_clean = [re.sub('_spread', '', col) for col in spread_cols]
last_cols_clean = [re.sub('_mean', '', re.sub('_last', '', col)) for col in last_cols]

# First '_last' column for each cleaned name (later duplicates are ignored;
# NOTE(review): confirm which column should win if several map to one name).
last_col_by_name = {}
for name, col in zip(last_cols_clean, last_cols):
    last_col_by_name.setdefault(name, col)

# Pair by cleaned name. Two independently sorted lists do not stay aligned:
# 'dy_median_spread' sorts before 'dy_spread', yet 'dy_mean_last' sorts before
# 'dy_median_last', which would divide each dy spread by the other's last value.
relevance_pairs = sorted(
    (spread_col, last_col_by_name[name])
    for spread_col, name in zip(spread_cols, spread_cols_clean)
    if name in last_col_by_name
)
spread_relevance_cols_spread = [spread_col for spread_col, _ in relevance_pairs]
spread_relevance_cols_last = [last_col for _, last_col in relevance_pairs]

spread_relevance_cols = [col + '_relevance' for col in spread_relevance_cols_spread]

# Divide plain arrays; np.divide on two DataFrames with different column labels
# aligns them by label in pandas >= 2.0 and returns all-NaN columns.
with np.errstate(divide='ignore', invalid='ignore'):
    relevance_values = (
        df_windowed_full[spread_relevance_cols_spread].to_numpy(dtype=float)
        / df_windowed_full[spread_relevance_cols_last].to_numpy(dtype=float)
    )
# Undefined ratios (0/0 -> NaN, x/0 -> +/-inf) become 0.
relevance_values[~np.isfinite(relevance_values)] = 0.0

df_windowed_full[spread_relevance_cols] = pd.DataFrame(
    relevance_values, index=df_windowed_full.index, columns=spread_relevance_cols
)

In [None]:
# Spot-check the windowed rows of a single ticker.
is_bbas3 = df_windowed_full['Papel'].eq('BBAS3')
df_windowed_full.loc[is_bbas3].head(n=5)

#### Storing refined data into Google Cloud

In [1]:
# Persist the refined frames as parquet locally and mirror them to the bucket.
# Depends on earlier cells (the initialize_bucket import, credentials_path and
# the DataFrames built above): run in a fresh kernel it fails with NameError.
client, bucket = initialize_bucket(credentials_path,'storage-barsianize')

path_local = '../data/02_refined/'
blob_name = '03_refined/'

# Make sure the local output directory exists before writing.
os.makedirs(path_local, exist_ok=True)

for filename, frame in (
    ('df_monthly_full_ref.parquet', df_monthly_full_ref),
    ('df_monthly_full_merge.parquet', df_monthly_full_merge),
    ('df_windowed_full.parquet', df_windowed_full),
):
    # save the DataFrame as a parquet file
    frame.to_parquet(path_local + filename)

    # upload the parquet file to Google Cloud Storage; the public chunk_size
    # argument (multiple of 256 KiB) replaces the private `_chunk_size`
    # attribute. 8 MiB == 8388608 bytes, as before.
    blob = bucket.blob(blob_name + filename, chunk_size=8 * 1024 * 1024)
    blob.upload_from_filename(path_local + filename, num_retries=10)

NameError: name 'initialize_bucket' is not defined

## End

In [None]:
# Marker cell: prints a fixed line to show the notebook ran to the end.
print('hello', 'world!')