# Setup Inicial

## Bibliotecas utilizadas

In [1]:
import pandas as pd
import json

# getting linked services connection string 
adls_account_name = 'stedlk01dtandev'  
sas_key = r"9999"

## Definições gerais

In [2]:
blob_container_name = 'general' # replace with your container name
blob_relative_path_raw = 'raw/mercado_potencial/ibge/' # replace with your relative folder path
blob_relative_path_enriched = 'enriched/mercado_potencial/ibge/' # replace with your relative folder path
linked_service_raw = 'LS_ADLS_RAW_01' # replace with your linked service name
linked_service_enriched = 'LS_ADLS_ENRICHED_01' # replace with your linked service name


ls_raw = mssparkutils.credentials.getPropertiesAll(linked_service_raw)
ls_enriched = mssparkutils.credentials.getPropertiesAll(linked_service_enriched)

converter_dic_raw = json.loads(ls_raw)
converter_dic_enriched = json.loads(ls_enriched)

#coletando o endpoint
end_point_raw = (converter_dic_raw['Endpoint'].split("/"))[2]
end_point_enriched = (converter_dic_enriched['Endpoint'].split("/"))[2]

#Utilizado na leitura via metodo mssparkutils
abfss_path_raw = 'abfss://%s@%s/%s' % (blob_container_name, end_point_raw, blob_relative_path_raw)
abfss_path_enriched = 'abfss://%s@%s/%s' % (blob_container_name, end_point_enriched, blob_relative_path_enriched)

#Utilizados na leitura e gravação dos arquivos via metodo (read_csv ou to_parquet)
#storage_options = {'linked_service':ls_enriched}
#dir_raw  = 'abfss://general/raw/mercado_potencial/ibge/'
#dir_enriched = 'abfss://general/enriched/mercado_potencial/ibge/'



## Funções de utilizadas

In [3]:
def ajuste_IBGE (data):
    """Ajusta o .json da API do IBGE.

        Ajusta o .json captrado direntamente da API do IBGE com nas 
        diversas estruturas e dníveis de dados. 
        
    Parâmetros
    ----------
    data : .json 
        Arquivo em formato .json a ser processado.
    
    Retorno
    -------
    pdResult : pd.DataFrame
        Dataframe de fornato 'long table' com os dados distribuidos em
        tabela de largura fixa, com 6(seis) variáveis.

        'data': datatime
            Período em mês, trimestre ou ano do dado  
        'valor': object
            Valor da informação - algumas series vem strings '...'
            deve-se tratar fora, assim como transformar .astype(float)
        'categoria_1': str
            Nível 1 da categoria dos dados
        'categoria_2': str
            Nível 2 da categoria dos dados
        'variavel': str
            Nome da variável
        'local': str
            Agregração geográfica da Variável
    
    Notes
    -----
    API do IBGE: https://servicodados.ibge.gov.br/api/docs/agregados?versao=3#api-bq

    """
    pdResult = pd.DataFrame()
    for i in range(len(data)): # se tiver mais de uma variável requerida
        result = data[i]['resultados']
        for j in range(len(result)):
            aux = pd.json_normalize(result[j]['series'][0]['serie'])
            local = result[j]['series'][0]['localidade']['nome']
            if len(result[j]['classificacoes']) > 0: # existe mais de uma estrutura - categoria e a série de valores
                nome_cat1 = pd.json_normalize(result[j]['classificacoes'][0]['categoria']).iat[0, 0] # nome da categoria 1
                nome_cat2 = '-'
                if len(result[j]['classificacoes']) > 1:
                    nome_cat2 = pd.json_normalize(result[j]['classificacoes'][1]['categoria']).iat[0, 0] # nome da categoria 2
            else: # se =1 não existe categoria somente a serie de valores
                nome_cat1 = '-'
                nome_cat2 = '-'

            pd_aux = pd.DataFrame({'data': aux.T.index,
                                   'valor': aux.to_numpy()[0], # algumas series vem strings '...' deve tratar fora, assim como transformar .astype(float)
                                   'categoria_1':nome_cat1,
                                   'categoria_2':nome_cat2,
                                   'variavel':data[i]['variavel'],
                                   'local':local})
            pdResult = pd.concat([pdResult, pd_aux], ignore_index=True)

    return(pdResult)

In [4]:
def wide_table_IBGE(df):
    """Transforma dados do IBGE de long para wide table  .

        Transforma as tabelas long table do IBGE em tabelas de formato
        wide.

    Parâmetros
    ----------
    df : pd.DataFrame 
        DataFrame a ser processado .
    
    Retorno
    -------
    df_out: pd.DataFrame
        Datafrme dos dados transformados
    """
    df_out = df.copy()
 
    # Transformando o RowData em ColData usando o groupby Todas as tabelas do IBGE
    # tem a mesma estrutura 
    df_out = df_out.groupby(['data',
                       'categoria_1',
                       'categoria_2',
                       'variavel'])['valor'].sum()

    # Ajustando os dados com o unstack os levels são os níveis em ordem de ajuste
    df_out = df_out.unstack(level=[3,2,1])

    # ajuste do nome das colunas | flat_index conjugas os niveis do indice
    # outras funções que poderiam ser utilizadas .droplevel() .get_level_values()
    df_out.columns = df_out.columns.to_flat_index()

    return(df_out)

In [5]:
def serie_temporal_IBGE(df, nm_cols):
    """Adequa os dados coletados em serie de tempo.

        Transforma as tabelas em tabelas de series temporal,
        renomeando as colunas originais com nomenclatura mais acessivel.
   
    Parâmetros
    ----------
    df : pd.DataFrame 
        DataFrame com dados em formato long table a ser processado.
    nm_cols: list of strings
        Lista com os nomes das colunas a serem renomeadas.
    
    Retorno
    -------
    df_out: pd.DataFrame
        Datafrme dos dados ajustados, coluna com a informação de data 
        ajustada para o formato 'Ano mes' setada como índice e removendo
        '...' dado não existente por pd.NaN
    """
    df_out = wide_table_IBGE(df)
    df_out.index = pd.to_datetime(df_out.index, format='%Y%m')
    df_out = pd.DataFrame(df_out) #precisa do pd.DataFrame pq se tiver só uma coluna retorna uma série
    df_out.columns = nm_cols
    df_out = df_out[df_out != '...'] # substitui os valores não existentes por NaN
    df_out = df_out.astype(float)

    return(df_out)

# Execução

## Leitura e ajuste dos dados na camada row

In [11]:
#lista todos os arquivos no diretório
files = [i.path for i in mssparkutils.fs.ls(abfss_path_raw)]

# ler e trata dos dados do IBGE
ipca = ajuste_IBGE(pd.read_json(files[0], lines = True, encoding='utf-8', storage_options = {'linked_service' : linked_service_raw})[0]) # MENSAL
pib  = ajuste_IBGE(pd.read_json(files[1], lines = True, encoding='utf-8', storage_options = {'linked_service' : linked_service_raw})[0]) # TRIMESTRAL
pms  = ajuste_IBGE(pd.read_json(files[2], lines = True, encoding='utf-8', storage_options = {'linked_service' : linked_service_raw})[0]) # MENSAL
pnad = ajuste_IBGE(pd.read_json(files[3], lines = True, encoding='utf-8', storage_options = {'linked_service' : linked_service_raw})[0]) # MENSAL

## Ajustes iniciais

In [13]:
#In[9]: AJUSTE DOS FORMATOS DAS DATAS DO IBGE

#ajuste da data do PIB Trimestral
pib['data'] = pib['data'].str[:-2] + \
              pib['data'].str[-2:].replace({'01':'03',
                                            '02':'06',
                                            '03':'09',
                                            '04':'12'}) 

#ajuste das das datas de periodo mensal
x = [pnad, ipca, pms, pib]
for i in x:
    i['data'] = pd.to_datetime(i['data'], format='%Y%m')
del(x)


## Ajustes finais

In [14]:
x1 = serie_temporal_IBGE(pms, ['PMS_ReceitaNomSer_CG', 'PMS_ReceitaNomSer_TP', 'PMS_VolumeSer_CG','PMS_VolumeSer_TP'])
x2 = serie_temporal_IBGE(pnad, ['PNAD_Desocupacao'])
x3 = serie_temporal_IBGE(ipca, ['IPCA_VarMensal'])
x4 = serie_temporal_IBGE(pib, ['PIB_Exportacao','PIB_CaptalFixo','PIB_Importacao','PIB_PrecoMercado'])

### Upsampling dos dados trimestrais em dados mensais do PIB

In [15]:
# Foi utilizada a função linear por ser mais fidedifno ao comportamento das curvas
# resample Linear para o Inicio do Mês dos dados que eram Trimestrais

x4l = x4.resample('MS').interpolate(method='linear') # MS - Month Start

# codigo exemplo comparando varios tipos de interpolação - 
# Escolhido o LINEAR por manter fidedignidade o movimento original
#xl = x['PIB_Exportacao'].resample('MS').interpolate(method='linear')
#xc = x['PIB_Exportacao'].resample('MS').interpolate(method='cubic')
#xs = x['PIB_Exportacao'].resample('MS').interpolate(method='spline', order=3)
#pd.concat([xl, xc, xs], axis=1).loc['2021':'2022'].plot(color=['blue','red','green'])

### Agrupando dados do IBGE

In [16]:
# Agrupando dados do IBGE a serem utilizados no estudo
# Dados já em formato float e com datatime unificados
pd_mesIBGE = pd.concat([x1, x2, x3,x4l], axis=1)

### Definição do período inicial das séries

In [17]:
# definindo o periodo inicial da série
pd_mesIBGE = pd_mesIBGE['2012':]

## Salvando dados mensais IBGE na camada eriched

In [18]:
# SALVA TABELA EM FORMATO .parquet
pd_mesIBGE.to_parquet(abfss_path_enriched + 'IBGE_mes.parquet', 
                     storage_options = {'linked_service':linked_service_enriched})


# Acesso à documentação

In [19]:
help(ajuste_IBGE)

In [20]:
help(wide_table_IBGE)

In [21]:
help(serie_temporal_IBGE)