In [15]:
#bibliotecas

import requests
import zipfile
import io
import pandas as pd
import numpy as np
from difflib import get_close_matches
import unidecode


In [16]:
# variaveis

start_y = 2011
final_y = 2022
codecvm = '19348'
#arquivo com data da publicação de relatórios
event_dates_file =  'c:\\Users\\thgcn\\OneDrive\\Academico\\Financial-Reports-Impact\\data\\itr_date_itau.csv'
#arquivo com preços históricos
itau_file = 'c:\\Users\\thgcn\\OneDrive\\Academico\\Financial-Reports-Impact\\data\\\historical_data\\ITUB4.SA.csv'
#arquivo final
output_file = 'c:\\Users\\thgcn\\OneDrive\\Academico\\Financial-Reports-Impact\\data\\final_data.csv'


In [45]:
# funções (contendo visao de janelas)

def busca_date_pub(ano, codecvm):
    """
    Busca dados de publicação no site da CVM para o ano e código CVM especificados.

    Args:
        ano (int): O ano dos dados.
        codecvm (str): O código CVM da empresa.

    Returns:
        list: Lista de tuplas contendo os campos Assunto e Data_Entrega para o código CVM especificado.

    Example:
        >>> busca_date_pub(2023, '019348')
        [('Informações consolidadas dos exercícios', '2023-05-15'), ...]
    """
    url = 'https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/IPE/DADOS/ipe_cia_aberta_%d.zip' % ano
    r = requests.get(url)
    zf = zipfile.ZipFile(io.BytesIO(r.content))
    file = zf.namelist()
    zf = zf.open(file[0])
    lines = zf.readlines()
    lines = [i.strip().decode('ISO-8859-1') for i in lines]
    lines = [i.split(';') for i in lines]

    # Header mapping to find indexes of relevant columns
    header = lines[0]
    idx_codigo_cvm = header.index('Codigo_CVM')
    idx_assunto = header.index('Assunto')
    idx_data_entrega = header.index('Data_Entrega')

    # Strings de busca normalizadas para evitar problemas com acentos e maiúsculas/minúsculas
    busca1 = unidecode.unidecode("Informações consolidadas dos exercícios").lower()
    busca2 = unidecode.unidecode("Informações sobre o resultado").lower()

    result = []
    for line in lines[1:]:
        if line[idx_codigo_cvm] == codecvm:
            assunto_normalizado = unidecode.unidecode(line[idx_assunto]).lower()
            if busca1 in assunto_normalizado or busca2 in assunto_normalizado:
                result.append((line[idx_codigo_cvm], line[idx_assunto], line[idx_data_entrega]))

    # Convertendo o resultado em um DataFrame
    df_result = pd.DataFrame(result, columns=['Codigo_CVM', 'Assunto', 'Data_Entrega'])

    return df_result

def save_cvm_dates_combined(start_y, final_y, codecvm, event_dates_file):
    combined_data = pd.DataFrame()  # DataFrame vazio para acumular os resultados
    
    # Loop para chamar a função para os anos de start_y a final_y
    for year in range(start_y, final_y + 1):
        # Obter os dados para o ano especificado
        df = busca_date_pub(year, codecvm)
        
        # Adicionar os dados ao DataFrame acumulado
        combined_data = pd.concat([combined_data, df], ignore_index=True)
    
    # Salvar os dados acumulados como um único arquivo CSV
    combined_data.to_csv(event_dates_file, index=False)
    print(f'Dados combinados (Codigo_CVM, Assunto e Data_Entrega) salvos em {event_dates_file}')    

def process_itau_data(itau_file, event_dates_file, codecvm):
    # Carregar dados do arquivo ITUB4.SA.csv
    itau_data = pd.read_csv(itau_file)
    itau_data['Date'] = pd.to_datetime(itau_data['Date'])  # Converter a coluna Date para o tipo datetime

    # Carregar datas de eventos do arquivo itr_date_itau.csv
    event_dates = pd.read_csv(event_dates_file)
    event_dates['Data_Entrega'] = pd.to_datetime(event_dates['Data_Entrega'])  # Converter a coluna Data_Entrega para o tipo datetime

    # Definir as datas de eventos como um conjunto para busca eficiente
    event_set = set(event_dates['Data_Entrega'])

    # Determinar se cada data é um evento (1) ou não (0)
    itau_data['event'] = itau_data['Date'].isin(event_set).astype(int)

    # Receber o codcvm via parâmetro de entrada da função
    itau_data['CD_CVM'] = codecvm
    # Calcular o retorno diário em escala logarítmica
    itau_data['Return'] = np.log(itau_data['Close'] / itau_data['Close'].shift(1))

    # Calcular o retorno semanal em escala logarítmica
    #itau_data['week_return'] = np.log(itau_data['Close'] / itau_data['Close'].shift(5))

    # Calcular o retorno mensal em escala logarítmica
    #itau_data['month_return'] = np.log(itau_data['Close'] / itau_data['Close'].shift(22))
       
    # Selecionar as colunas relevantes para o DataFrame final
    final_data = itau_data[['CD_CVM', 'Date', 'Return', 'event']]
    
    return final_data

def analyze_data_quality(final_data):
    # Identificar quantos "COD_CVM" distintos existem na base e exibir os valores
    unique_cod_cvm = final_data['CD_CVM'].unique()
    num_unique_cod_cvm = len(unique_cod_cvm)
    print(f"Existem {num_unique_cod_cvm} COD_CVM distintos na base:")
    print(unique_cod_cvm)
    
    # Identificar quantas datas iguais existem em "Date" e exibir os valores
    final_data['Date'] = pd.to_datetime(final_data['Date'], errors='coerce')
    date_counts = final_data['Date'].value_counts()
    duplicate_dates = date_counts[date_counts > 1]
    print(f"\nExistem {len(duplicate_dates)} datas repetidas na base:")
    print(duplicate_dates)
    
    # Contar quantos eventos iguais a 0 e 1 existem na base
    event_0_count = (final_data['event'] == 0).sum()
    event_1_count = (final_data['event'] == 1).sum()
    print(f"\nContagem de eventos:")
    print(f"Eventos iguais a 0: {event_0_count}")
    print(f"Eventos iguais a 1: {event_1_count}")
    
    # Identificar quais colunas contêm valores NaN e quantas vezes
    nan_counts = final_data.isna().sum()
    columns_with_nan = nan_counts[nan_counts > 0]
    print(f"\nColunas com valores NaN e a quantidade de NaNs:")
    print(columns_with_nan)
    
    # Contar eventos (event=1) em uma janela de um ano e exibir as datas
    final_data['Year'] = final_data['Date'].dt.year
    event_dates_by_year = final_data[final_data['event'] == 1].groupby('Year')['Date'].apply(list)
    
    print("\nPublicações anuais (event=1):")
    for year, dates in event_dates_by_year.items():
        print(f"{year}: {len(dates)} publicações")
        for date in dates:
            print(date.strftime('%Y-%m-%d'))
        print("------------")
        
        
def add_and_reorder(df, cd_cvm, date, ret, event, year):
    """
    Adiciona uma nova linha ao DataFrame e reordena por data em ordem decrescente.

    Args:
        df (pd.DataFrame): DataFrame original.
        cd_cvm (str): Valor para a coluna 'CD_CVM'.
        date (str): Valor para a coluna 'Date' no formato 'YYYY-MM-DD'.
        ret (str): Valor para a coluna 'Return'.
        event (int): Valor para a coluna 'event'.
        year (int): Valor para a coluna 'Year'.

    Returns:
        pd.DataFrame: DataFrame atualizado com a nova linha e reordenado por data.
    """
    # Criar uma nova linha como um DataFrame
    new_row = pd.DataFrame({
        'CD_CVM': [cd_cvm],
        'Date': [pd.to_datetime(date)],
        'Return': [float(ret) if ret else None],
        'event': [int(event)],
        'Year': [int(year)]
    })

    # Adicionar a nova linha ao DataFrame original
    updated_df = pd.concat([df, new_row], ignore_index=True)
    
    # Reordenar o DataFrame por data em ordem decrescente
    updated_df = updated_df.sort_values(by='Date', ascending=False).reset_index(drop=True)
    
    return updated_df

def set_event(df, date):
    """
    Altera o valor da coluna 'event' para 1 para uma data específica.

    Args:
        df (pd.DataFrame): DataFrame original.
        date (str): Data no formato 'YYYY-MM-DD' para a qual o valor de 'event' deve ser alterado.

    Returns:
        pd.DataFrame: DataFrame atualizado com o valor de 'event' alterado para 1 na data especificada.
    """
    # Converter a coluna 'Date' para o tipo datetime, caso ainda não esteja
    df['Date'] = pd.to_datetime(df['Date'])
    
    # Alterar o valor da coluna 'event' para 1 na data especificada
    df.loc[df['Date'] == pd.to_datetime(date), 'event'] = 1
    
    return df


In [27]:
#Desconsiderar (NÃO EXECUTE ESTA CELULA)
# funções

def busca_date_pub(ano, codecvm):
    """
    Busca dados de publicação no site da CVM para o ano e código CVM especificados.

    Args:
        ano (int): O ano dos dados.
        codecvm (str): O código CVM da empresa.

    Returns:
        list: Lista de tuplas contendo os campos Assunto e Data_Entrega para o código CVM especificado.

    Example:
        >>> busca_date_pub(2023, '019348')
        [('Informações consolidadas dos exercícios', '2023-05-15'), ...]
    """
    url = 'https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/IPE/DADOS/ipe_cia_aberta_%d.zip' % ano
    r = requests.get(url)
    zf = zipfile.ZipFile(io.BytesIO(r.content))
    file = zf.namelist()
    zf = zf.open(file[0])
    lines = zf.readlines()
    lines = [i.strip().decode('ISO-8859-1') for i in lines]
    lines = [i.split(';') for i in lines]

    # Header mapping to find indexes of relevant columns
    header = lines[0]
    idx_codigo_cvm = header.index('Codigo_CVM')
    idx_assunto = header.index('Assunto')
    idx_data_entrega = header.index('Data_Entrega')

    # Strings de busca normalizadas para evitar problemas com acentos e maiúsculas/minúsculas
    busca1 = unidecode.unidecode("Informações consolidadas dos exercícios").lower()
    busca2 = unidecode.unidecode("Informações sobre o resultado").lower()

    result = []
    for line in lines[1:]:
        if line[idx_codigo_cvm] == codecvm:
            assunto_normalizado = unidecode.unidecode(line[idx_assunto]).lower()
            if busca1 in assunto_normalizado or busca2 in assunto_normalizado:
                result.append((line[idx_codigo_cvm], line[idx_assunto], line[idx_data_entrega]))

    # Convertendo o resultado em um DataFrame
    df_result = pd.DataFrame(result, columns=['Codigo_CVM', 'Assunto', 'Data_Entrega'])

    return df_result

def save_cvm_dates_combined(start_y, final_y, codecvm, event_dates_file):
    combined_data = pd.DataFrame()  # DataFrame vazio para acumular os resultados
    
    # Loop para chamar a função para os anos de start_y a final_y
    for year in range(start_y, final_y + 1):
        # Obter os dados para o ano especificado
        df = busca_date_pub(year, codecvm)
        
        # Adicionar os dados ao DataFrame acumulado
        combined_data = pd.concat([combined_data, df], ignore_index=True)
    
    # Salvar os dados acumulados como um único arquivo CSV
    combined_data.to_csv(event_dates_file, index=False)
    print(f'Dados combinados (Codigo_CVM, Assunto e Data_Entrega) salvos em {event_dates_file}')    

def process_itau_data(itau_file, event_dates_file, codecvm):
    # Carregar dados do arquivo ITUB4.SA.csv
    itau_data = pd.read_csv(itau_file)
    itau_data['Date'] = pd.to_datetime(itau_data['Date'])  # Converter a coluna Date para o tipo datetime

    # Calcular o retorno diário em escala logarítmica
    itau_data['Return'] = np.log(itau_data['Close'] / itau_data['Close'].shift(1))

    # Carregar datas de eventos do arquivo itr_date_itau.csv
    event_dates = pd.read_csv(event_dates_file)
    event_dates['Data_Entrega'] = pd.to_datetime(event_dates['Data_Entrega'])  # Converter a coluna Data_Entrega para o tipo datetime

    # Definir as datas de eventos como um conjunto para busca eficiente
    event_set = set(event_dates['Data_Entrega'])

    # Determinar se cada data é um evento (1) ou não (0)
    itau_data['event'] = itau_data['Date'].isin(event_set).astype(int)

    # Receber o codcvm via parâmetro de entrada da função
    itau_data['CD_CVM'] = codecvm

    # Selecionar as colunas relevantes para o DataFrame final
    final_data = itau_data[['CD_CVM', 'Date', 'Return', 'event']]
    
    return final_data

def analyze_data_quality(final_data):
    # Identificar quantos "COD_CVM" distintos existem na base e exibir os valores
    unique_cod_cvm = final_data['CD_CVM'].unique()
    num_unique_cod_cvm = len(unique_cod_cvm)
    print(f"Existem {num_unique_cod_cvm} COD_CVM distintos na base:")
    print(unique_cod_cvm)
    
    # Identificar quantas datas iguais existem em "Date" e exibir os valores
    date_counts = final_data['Date'].value_counts()
    duplicate_dates = date_counts[date_counts > 1]
    print(f"\nExistem {len(duplicate_dates)} datas repetidas na base:")
    print(duplicate_dates)
    
    # Contar quantos eventos iguais a 0 e 1 existem na base
    event_0_count = (final_data['event'] == 0).sum()
    event_1_count = (final_data['event'] == 1).sum()
    print(f"\nContagem de eventos:")
    print(f"Eventos iguais a 0: {event_0_count}")
    print(f"Eventos iguais a 1: {event_1_count}")
    
    # Identificar quais colunas contêm valores NaN e quantas vezes
    nan_counts = final_data.isna().sum()
    columns_with_nan = nan_counts[nan_counts > 0]
    print(f"\nColunas com valores NaN e a quantidade de NaNs:")
    print(columns_with_nan)

In [46]:
#chamada da função
save_cvm_dates_combined(start_y, final_y, codecvm, event_dates_file)
final_data = process_itau_data(itau_file, event_dates_file, codecvm)
# Correção para itau
set_event(final_data, "2018-04-30")
set_event(final_data, "2021-11-04")
set_event(final_data, "2022-11-11")
#qualidade
analyze_data_quality(final_data)

Dados combinados (Codigo_CVM, Assunto e Data_Entrega) salvos em c:\Users\thgcn\OneDrive\Academico\Financial-Reports-Impact\data\itr_date_itau.csv
Existem 1 COD_CVM distintos na base:
['19348']

Existem 0 datas repetidas na base:
Series([], Name: Date, dtype: int64)

Contagem de eventos:
Eventos iguais a 0: 3177
Eventos iguais a 1: 48

Colunas com valores NaN e a quantidade de NaNs:
Return    1
dtype: int64

Publicações anuais (event=1):
2011: 4 publicações
2011-02-22
2011-05-03
2011-08-02
2011-11-01
------------
2012: 4 publicações
2012-02-07
2012-04-24
2012-07-24
2012-10-23
------------
2013: 4 publicações
2013-02-05
2013-04-30
2013-07-30
2013-10-29
------------
2014: 4 publicações
2014-02-04
2014-04-29
2014-08-05
2014-11-04
------------
2015: 4 publicações
2015-02-03
2015-05-05
2015-08-04
2015-11-03
------------
2016: 4 publicações
2016-02-02
2016-05-03
2016-08-02
2016-10-31
------------
2017: 4 publicações
2017-02-07
2017-05-03
2017-07-31
2017-10-30
------------
2018: 4 publicações


In [53]:
import pandas as pd
import numpy as np

def return_week(final_data):
    # Garantir que a coluna 'Date' seja do tipo datetime
    final_data['Date'] = pd.to_datetime(final_data['Date'])

    # Filtrar os eventos positivos
    event_data = final_data[final_data['event'] == 1].copy()

    # Inicializar a coluna 'Return_week' com NaN
    final_data['Return_week'] = np.nan

    # Iterar sobre os índices dos eventos positivos
    for idx in event_data.index:
        if idx + 5 < len(final_data):
            # Calcular o retorno semanal
            final_data.loc[idx, 'Return_week'] = np.log(final_data.loc[idx + 5, 'Close'] / final_data.loc[idx, 'Close'])

    # Selecionar as colunas relevantes para o DataFrame final
    final_data = final_data[['CD_CVM', 'Date', 'Return_week', 'event']]
    
    return final_data

# Exemplo de uso com a coluna 'Close'
# final_data = pd.read_csv('path_to_final_data.csv')  # Carregar os dados



In [54]:
week_data = return_week(final_data)

KeyError: 'Close'

In [10]:
#Salvando arquivo final na pasta data
final_data.to_csv(output_file, index=False)