# 1. Requirements

In [1]:
%pip install Unidecode

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import os
import requests
import unidecode
import re

# 2. Retrieving Data

In [3]:
bancos_df = pd.read_csv(os.path.join(os.getcwd(), "layers/raw", "bancos.csv"), sep=";")

In [4]:
cnpj_list = bancos_df['CNPJ IF'].replace(' ', None).dropna().drop_duplicates().tolist()

In [8]:
from multiprocessing import Process, Manager
import time

start_time = time.perf_counter()

def get_lista_tarifa(cnpj, raw_data_list, endpoint):
    result = requests.get(endpoint)
    try:
        if result.status_code == 200:
            json_result = result.json()
            json_result['cnpj'] = cnpj
            raw_data_list.append(json_result)
            time.sleep(1)
        else:
            raise Exception
    except Exception:
        raise Exception


url_1 = "https://olinda.bcb.gov.br/olinda/servico/Informes_ListaTarifasPorInstituicaoFinanceira/versao/v1/odata/ListaTarifasPorInstituicaoFinanceira(PessoaFisicaOuJuridica=@PessoaFisicaOuJuridica,CNPJ=@CNPJ)?@PessoaFisicaOuJuridica='J'&@CNPJ='"
url_2 = "'&$top=100&$format=json&$select=CodigoServico,Servico,Unidade,DataVigencia,ValorMaximo,TipoValor,Periodicidade"
manager = Manager()
raw_data = manager.list()
processes = []
for cnpj in cnpj_list:
    endpoint = f"{url_1}{cnpj}{url_2}"
    t = Process(target=get_lista_tarifa, args=(cnpj, raw_data, endpoint))
    t.start()
    processes.append(t)
for process in processes:
    process.join()
finish_time = time.perf_counter()
print(len(raw_data))
print(f"Program finished in {finish_time-start_time} seconds")

113
Program finished in 8.876075234031305 seconds


In [9]:
lista_tarifas_df = pd.json_normalize(list(raw_data), 'value', ['@odata.context','cnpj'])

# 3. Transforming Data

In [10]:
bancos_df.dtypes

index                                                int64
Ano                                                  int64
Trimestre                                           object
Categoria                                           object
Tipo                                                object
CNPJ IF                                             object
Instituição financeira                              object
Índice                                              object
Quantidade de reclamações reguladas procedentes      int64
Quantidade de reclamações reguladas - outras         int64
Quantidade de reclamações não reguladas              int64
Quantidade total de reclamações                      int64
Quantidade total de clientes  CCS e SCR            object
Quantidade de clientes  CCS                        object
Quantidade de clientes  SCR                        object
Unnamed: 14                                        float64
dtype: object

In [11]:
#bancos_df
# Normalizing column names
bancos_df = pd.read_csv(os.path.join(os.getcwd(), "layers/raw", "bancos.csv"), sep=";")
bancos_df.drop("Unnamed: 14", axis=1, inplace=True)
bancos_df.drop("index", axis=1, inplace=True)

col_list = bancos_df.columns.tolist()
new_cols = {}
for col in col_list:
    new_cols[col] = unidecode.unidecode(col.lower().replace("\x96","").replace('-', '').replace('  ','_').replace(' ', "_")).lower()
bancos_df.rename(columns=new_cols,inplace=True)

def normalize_indice(value):
    if value == ' ':
        return None
    else:
        return float(value.replace('.','').replace(',','.'))

def normalize_to_int(value):
    if value == ' ':
        return 0
    else:
        return int(value)

bancos_df['indice'] = bancos_df.indice.apply(normalize_indice)
bancos_df['quantidade_total_de_clientes_ccs_e_scr'] =  bancos_df.quantidade_total_de_clientes_ccs_e_scr.apply(normalize_to_int)
bancos_df['quantidade_de_clientes_ccs'] =  bancos_df.quantidade_de_clientes_ccs.apply(normalize_to_int)
bancos_df['quantidade_de_clientes_scr'] =  bancos_df.quantidade_de_clientes_scr.apply(normalize_to_int)
bancos_df['categoria'] = bancos_df.categoria.apply(lambda x: x.lower())
bancos_df['tipo'] = bancos_df.tipo.apply(lambda x: x.lower())
bancos_df['instituicao_financeira'] = bancos_df.instituicao_financeira.apply(lambda x: x.lower())

In [12]:
bancos_df.dtypes

ano                                                  int64
trimestre                                           object
categoria                                           object
tipo                                                object
cnpj_if                                             object
instituicao_financeira                              object
indice                                             float64
quantidade_de_reclamacoes_reguladas_procedentes      int64
quantidade_de_reclamacoes_reguladas_outras           int64
quantidade_de_reclamacoes_nao_reguladas              int64
quantidade_total_de_reclamacoes                      int64
quantidade_total_de_clientes_ccs_e_scr               int64
quantidade_de_clientes_ccs                           int64
quantidade_de_clientes_scr                           int64
dtype: object

In [13]:
lista_tarifas_df.dtypes

CodigoServico      object
Servico            object
Unidade            object
DataVigencia       object
ValorMaximo       float64
TipoValor          object
Periodicidade      object
@odata.context     object
cnpj               object
dtype: object

In [14]:
#lista_tarifas_df
# Normalizing column names
def camel_case_split(identifier):
    matches = re.finditer('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)', identifier)
    return '_'.join([m.group(0) for m in matches]).lower()

lista_tarifas_df = pd.json_normalize(list(raw_data), 'value', ['@odata.context','cnpj'])
lista_tarifas_df.drop(columns=["@odata.context"], inplace=True)
col_list = lista_tarifas_df.columns.tolist()

new_cols = {}
for col in col_list:
    new_cols[col] = unidecode.unidecode(camel_case_split(col))
lista_tarifas_df.rename(columns=new_cols,inplace=True)
lista_tarifas_df['servico'] = lista_tarifas_df.servico.apply(lambda x: x.lower())
lista_tarifas_df['unidade'] = lista_tarifas_df.unidade.apply(lambda x: unidecode.unidecode(x.lower()))
lista_tarifas_df['data_vigencia'] = pd.to_datetime(lista_tarifas_df.data_vigencia)
lista_tarifas_df['tipo_valor'] = lista_tarifas_df.tipo_valor.apply(lambda x: x.lower())
lista_tarifas_df['periodicidade'] = lista_tarifas_df.periodicidade.apply(lambda x: x.lower())

In [15]:
lista_tarifas_df.dtypes

codigo_servico            object
servico                   object
unidade                   object
data_vigencia     datetime64[ns]
valor_maximo             float64
tipo_valor                object
periodicidade             object
cnpj                      object
dtype: object

# 4. Uploading Data

In [31]:
import logging
from sqlalchemy import create_engine

class DatabaseManager:

    def __init__(self, database="analytics", user="admin", passwd="admin", host="pg_analytics"):
        conn_string = f'postgresql://{user}:{passwd}@{host}/{database}'
        self.__alchemy_conn = create_engine(conn_string).connect()
        self.__alchemy_conn.autocommit = True

    def create_table_with_pandas_df(self, df, table_name, schema_name='public'):
        logging.info(f"Creating table: {table_name}")
        df.to_sql(table_name, con=self.__alchemy_conn, schema=schema_name, if_exists='replace', index=True)

    def __execute_query(self, query):
        return self.__alchemy_conn.execute(query)
    
    def create_schema(self, schema_name):
        self.__execute_query(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
        
    def create_table_as_query(self, table_name, query, schema_name='public', drop_on_create=False):
        if drop_on_create:
            self.drop_table(table_name=table_name, schema_name=schema_name)
        self.__execute_query(f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} AS {query}")
    
    def drop_table(self, table_name, schema_name='public'):
        self.__execute_query(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")    

In [32]:
db_manager = DatabaseManager()
db_manager.create_schema("trusted")

In [18]:
db_manager.create_table_with_pandas_df(bancos_df, 'bancos', 'trusted')
db_manager.create_table_with_pandas_df(lista_tarifas_df, 'lista_tarifas', 'trusted')

# Generating Star Schema

In [20]:
db_manager.create_schema('star_schema')

db_manager.create_table_as_query(
    schema_name="star_schema",
    table_name="dim_categoria",
    drop_on_create=True,
    query="SELECT DISTINCT md5(categoria) as categoria_id, categoria  FROM trusted.bancos;"
)

db_manager.create_table_as_query(
    schema_name="star_schema",
    table_name="dim_tipo",
    drop_on_create=True,
    query="SELECT DISTINCT md5(tipo) as tipo_id, tipo  FROM trusted.bancos;"
)

db_manager.create_table_as_query(
    schema_name="star_schema",
    table_name="dim_instituicao_financeira",
    drop_on_create=True,
    query="SELECT DISTINCT md5(CONCAT(cnpj_if, instituicao_financeira)) as instituicao_financeira_id, cnpj_if, instituicao_financeira, indice  FROM trusted.bancos;"
)

db_manager.create_table_as_query(
    schema_name="star_schema",
    table_name="dim_data",
    drop_on_create=True,
    query="SELECT DISTINCT md5(CONCAT(ano, trimestre)) as data_id, ano, trimestre FROM trusted.bancos;"
)


db_manager.create_table_as_query(
    schema_name="star_schema",
    table_name="fact_bancos_tarifas",
    drop_on_create=True,
    query="""
    SELECT
        bar.index,
        md5(CONCAT(bar.ano, bar.trimestre)) as data_id,
        md5(CONCAT(bar.cnpj_if, bar.instituicao_financeira)) as instituicao_financeira_id,
        md5(bar.categoria) as categoria,
        md5(bar.tipo) as tipo,
        bar.quantidade_de_reclamacoes_reguladas_procedentes as quantidade_de_reclamacoes_reguladas_procedentes,
        bar.quantidade_de_reclamacoes_reguladas_outras as qtde_reclamacoes_reguladas_outras,
        bar.quantidade_de_reclamacoes_nao_reguladas as qtde_de_reclamacoes_nao_reguladas,
        bar.quantidade_total_de_reclamacoes as  qtde_total_reclamacoes,
        bar.quantidade_total_de_clientes_ccs_e_scr as qtde_total_clientes_spa_ccs_e_scr,
        bar.quantidade_de_clientes_ccs as qtde_clientes_spa_ccs,
        bar.quantidade_de_clientes_scr as qtde_clientes_spa_scr,
        coalesce(foo.total_servicos, 0) as total_servicos,
        coalesce(foo.total_cobrancas_type, 0) as total_cobrancas_type,
        coalesce(foo.total_taxas_pagas, 0) as total_taxas_pagas,
        coalesce(foo.total_taxas_gratuitas, 0) as total_taxas_gratuitas,
        coalesce(foo.valor_maximo_taxa_real, 0) as valor_maximo_taxa_real,
        coalesce(foo.valor_maximo_taxa_percentual, 0) as valor_maximo_taxa_percentual
    FROM trusted.bancos as bar
             LEFT JOIN
                 (SELECT
                      cnpj,
                      COUNT(DISTINCT servico) as total_servicos,
                      COUNT(DISTINCT periodicidade) as total_cobrancas_type,
                      COUNT(DISTINCT servico) - COUNT(DISTINCT CASE WHEN valor_maximo::decimal = 0 THEN servico ELSE NULL END) as total_taxas_pagas,
                      COUNT(DISTINCT CASE WHEN valor_maximo::decimal = 0 THEN servico ELSE NULL END) as total_taxas_gratuitas,
                      MAX(CASE WHEN Tipo_valor = 'Real' THEN valor_maximo::decimal ELSE 0 END) as valor_maximo_taxa_real,
                      MAX(CASE WHEN Tipo_valor = 'Percentual' THEN valor_maximo::decimal ELSE 0 END) as valor_maximo_taxa_percentual
                  FROM trusted.lista_tarifas
                  GROUP BY 1) as foo ON foo.cnpj = bar.cnpj_if
    """
)