### Importando bibliotecas necessárias

In [None]:
import requests
from datetime import datetime
import pandas as pd
import pandas_profiling
import pprint
import json
from IPython.display import display
from concurrent.futures import ThreadPoolExecutor
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_client = Elasticsearch(http_compress=True)

In [None]:
pp = pprint.PrettyPrinter(indent=4)
pd.set_option('display.max_colwidth',100)

## Processo de ETL

### Etapa de **extração dos dados**

#### Função que recupera a lista de todos os deputados

In [None]:
def listaDeputados(url):
    headers = {'accept': 'application/json'}
    r = requests.get(url, headers=headers)
    return r.json()['dados']

#### Carrega lista de deputados em um dataframe pandas

In [None]:
deputados = pd.DataFrame(listaDeputados('https://dadosabertos.camara.leg.br/api/v2/deputados?ordem=ASC&ordenarPor=nome'))

In [None]:
deputados.count()

#### Função que recupera para cada deputado todas as despesas do ano de 2019  


In [None]:
despesas = {}
list_dataframes = []
list_deputados = deputados.head(5)
for index, row in list_deputados.iterrows():
    pagina = 1
    while True:
        url = f"https://dadosabertos.camara.leg.br/api/v2/deputados/{row['id']}/despesas?ano=2019&pagina={pagina}&itens=100&ordem=ASC&ordenarPor=dataDocumento"
        headers = {'accept': 'application/json'}
        r = requests.get(url, headers=headers)
        if not r.json()['dados']:
            break
        desp_dep = pd.DataFrame(r.json()['dados'])
        desp_dep['dep_id'] = row['id']
        desp_dep['dep_nome'] = row['nome']
        desp_dep['dep_partido'] = row['siglaPartido']
        desp_dep['dep_uf'] = row['siglaUf']
        list_dataframes.append(desp_dep)
        pagina = pagina + 1
        update_progress(index/(len(list_deputados)-1))
despesas = pd.concat(list_dataframes)

### Etapa de **transformação dos dados**  

### Limpeza dos dados coletados
#### Nessa etapa será realizada a limpeza e adequação do dados baseado em uma análise utilizando a biblioteca `pandas_profile` através do comando `despesas.profile_report(style={'full_width':True})`
- Remoção de colunas desnecessárias do DataFrame, colunas que não contém dados ou já possuem representatividade em outra coluna do DataFrame
- Adequação de campos Nulos, inserindo valores padrão nessas posições
- Conversão dos tipos de dados para numerico e data

In [None]:
despesas.profile_report(style={'full_width':True})

**Excluindo colunas desnecessárias**

In [None]:
despesas.drop(columns=['ano','codLote','parcela','urlDocumento','valorLiquido'],inplace=True)

**Ajustando os tipos de dados**  
Transforma a data para o tipo datetime, caso o campo esteja vazio define a data como sendo o último dia do ano de 2018.

In [None]:
despesas['dataDocumento'] = despesas['dataDocumento'].apply(lambda x: pd.to_datetime(x) if not pd.isna(x) else datetime(2018,12,31,0,0))

In [None]:
despesas.dtypes

In [None]:
despesas['tipoDespesa'].value_counts()

In [None]:
despesas.isna().sum()

In [None]:
despesas.info(memory_usage='deep')

### Carga dos dados para o banco elasticsearch

In [None]:
use_these_keys = despesas.columns.to_list()
def filterKeys(document):
    return {key: document[key] for key in use_these_keys }

In [None]:
def doc_generator(df):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
                "_index": 'teste',
                "_type": "_doc",
                "_source": filterKeys(document),
            }
    raise StopIteration

In [None]:
try:
    helpers.bulk(es_client, doc_generator(despesas))
except:
    print('Fim do carregamento para o Elasticsearch')

In [None]:
for i in doc_generator(despesas):
    helpers.bulk(es_client, i)
print('Fim do carregamento para o Elasticsearch')

In [None]:
es_client.indices.create(index='test-index', ignore=400)

### Funções auxiliares

In [None]:
import time, sys
from IPython.display import clear_output

def update_progress(progress):
    bar_length = 100
    if isinstance(progress, int):
        progress = float(progress)
    if not isinstance(progress, float):
        progress = 0
    if progress < 0:
        progress = 0
    if progress >= 1:
        progress = 1

    block = int(round(bar_length * progress))

    clear_output(wait = True)
    text = "Progress: [{0}] {1:.1f}%".format( "#" * block + "-" * (bar_length - block), progress * 100)
    print(text)

### Testes

In [None]:
despesas.loc[despesas['dep_nome'].str.contains('AB')][['tipoDespesa','dataDocumento','dep_nome','valorDocumento']].groupby(by=['dep_nome','tipoDespesa']).agg(['sum','count','mean'])

In [None]:
despesas.loc[despesas['dep_nome'].str.contains('AB'),['tipoDespesa','dataDocumento','dep_nome','valorDocumento']].groupby(by=['dep_nome','tipoDespesa']).agg(['sum','count','mean'])

In [None]:
deputados.loc[deputados['nome'].str.contains('Tabata.*', regex=True, case=False)]['uri']

In [None]:
listaDeputados('https://dadosabertos.camara.leg.br/api/v2/deputados?ordem=ASC&ordenarPor=nome')

In [None]:
despesas[(despesas['tipoDespesa'].str.contains('COMBUSTÍVEIS'))].groupby(by=['nomeFornecedor'])['valorDocumento'].count().sort_values(ascending=False)

In [None]:
despesas[(despesas['tipoDespesa'].str.contains('COMBUSTÍVEIS'))].groupby(by=['nomeFornecedor'])['valorDocumento'].sum().sort_values(ascending=False)

In [None]:
despesas.loc[(despesas['nomeFornecedor'].str.contains('CASCOL COMBUSTIVEIS'))]

In [None]:
despesas[(despesas['cnpjCpfFornecedor']=="17895646000187")].head()