In [52]:
# Install into the running kernel's environment (%pip rather than !pip).
# Pinned below 1.0 because the validation cell at the end of this notebook
# uses the pre-1.0 API (context.sources, validator-based checkpoints).
%pip install -q "great_expectations<1.0"

In [53]:
import pandas as pd
from datetime import datetime, timedelta
import sqlite3 as sql
from tqdm import tqdm
import requests
import great_expectations as ge

### Criar ambiente SQLite


In [54]:
# Load the `sql` IPython extension so the %%sql cells in this notebook can run
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [55]:
%%sql
sqlite:///coffee.db

In [56]:
# sqlite3 connection used by every pandas to_sql / read_sql_query call below.
# A relative path keeps it on the same file the %%sql magic opens via
# sqlite:///coffee.db, regardless of the working directory.
conn = sql.connect('coffee.db')

In [57]:
def consult_api(date: str, days: int = 30, app_id: str = None) -> pd.DataFrame:
  """
  Fetch daily USD exchange rates for the window that ends on ``date``.

  Calls the Open Exchange Rates ``historical`` endpoint once per day, from
  ``date - days`` up to and including ``date``, requesting BRL, CLP and EUR
  with USD as the base currency.

  Args:
    date: Last day of the window, formatted as ``YYYY-MM-DD``.
    days: How many days before ``date`` the window starts. The default of 30
      yields 31 requests (both ends included).
    app_id: API key. When omitted, the ``OPENEXCHANGERATES_APP_ID``
      environment variable is used, then the legacy built-in key.

  Returns:
    DataFrame with columns ``data`` (date string) and ``response`` (decoded
    JSON payload), one row per day that was fetched successfully. Days whose
    request fails are reported on stdout and skipped.

  Raises:
    ValueError: If ``date`` is not in ``YYYY-MM-DD`` format.
  """
  import os  # local import so this cell does not depend on the imports cell

  # NOTE(review): SECURITY - the literal below is a credential committed to
  # the notebook. It is kept only as a last-resort fallback so existing runs
  # keep working; rotate it and rely on the environment variable instead.
  key = (app_id
         or os.environ.get('OPENEXCHANGERATES_APP_ID')
         or 'ed07ae4f21f54ed4a4f03596639aac89')
  symb = ','.join(['BRL','CLP','EUR'])
  headers = {'Accept': 'application/json'}

  # every date from (date - days) to date, oldest first
  end_date_obj = datetime.strptime(date, '%Y-%m-%d')
  date_range = [(end_date_obj - timedelta(days=offset)).strftime('%Y-%m-%d')
                for offset in range(days, -1, -1)]

  # collect rows in a list and build the frame once (no concat inside the loop)
  records = []
  for dt in tqdm(date_range):
    url = (f"https://openexchangerates.org/api/historical/{dt}.json?"
          f"app_id={key}&base=USD&symbols={symb}&show_alternative=false")
    try:
      response = requests.get(url, headers=headers, timeout=3)
      # turn HTTP error statuses (bad key, quota exceeded, ...) into exceptions
      response.raise_for_status()
      response_json = response.json()
    except requests.exceptions.Timeout as errt:
      print("Timeout Error:",errt)
      continue  # nothing usable for this day; never reuse a stale response
    except (requests.exceptions.RequestException, ValueError) as err:
      # ValueError covers a non-JSON body on older versions of requests
      print("OOps: Something Else",err)
      continue
    records.append([dt, response_json])

  return pd.DataFrame(records, columns=['data', 'response'])
# Fetch the exchange rates for the window that ends today
current_date = datetime.today().strftime('%Y-%m-%d')
df_raw = consult_api(date=current_date)

100%|██████████| 31/31 [00:13<00:00,  2.30it/s]


In [None]:
# Persist the raw layer: a stringified copy is appended to SQLite,
# and the untouched frame is written to CSV
raw_as_text = df_raw.astype(str)
raw_as_text.to_sql(name='raw_cambio', con=conn, if_exists='append')
df_raw.to_csv(path_or_buf='raw_cambio.csv')

In [None]:
# Initial treatment: unpack each payload's `rates` mapping into parallel lists
# of currency codes and values, then explode to one row per (date, currency)
df_curated = (
    df_raw.copy()
    .assign(
        moeda=lambda frame: frame['response']
                            .apply(lambda x: list(x['rates'].keys())),
        cambio=lambda frame: frame['response']
                             .apply(lambda x: list(x['rates'].values())),
    )
    .explode(['moeda','cambio'])
    .loc[:, ['data', 'moeda', 'cambio']]
)

# Save the curated layer (replaces any previous version of the table)
curated_as_text = df_curated.astype(str)
curated_as_text.to_sql(name='curated_cambio', con=conn, if_exists='replace')

In [None]:
# Because of the API limits, the complete exchange-rate history was collected
# from https://br.investing.com/currencies/usd-brl-historical-data
# The table built below would be the full version of curated_cambio.

url_git = 'https://raw.githubusercontent.com/xmarcelo195/coffee-etl/main/src/'

# Build a single table with the rates of every currency
usd_brl = pd.read_csv(f'{url_git}data/USD_BRL.csv')
usd_brl['moeda'] = 'BRL'
usd_clp = pd.read_csv(f'{url_git}data/USD_CLP.csv')
usd_clp['moeda'] = 'CLP'
usd_eur = pd.read_csv(f'{url_git}data/USD_EUR.csv')
usd_eur['moeda'] = 'EUR'
cambios = pd.concat([usd_brl,usd_clp,usd_eur])
cambios = cambios[['Data','Último','moeda']]
cambios.columns = ['Date','cambio','moeda']

# Source dates are dd.mm.YYYY; normalise to ISO so they join with coffee.Date
cambios['Date'] = pd.to_datetime(cambios['Date'], format='%d.%m.%Y') \
                                .dt.strftime('%Y-%m-%d')

# Source numbers use '.' for thousands and ',' for decimals.
# regex=False is required: as a regular expression '.' matches ANY character,
# which on pandas >= 2.0 wipes the whole value before the float conversion.
cambios['cambio'] = cambios['cambio'].str \
                    .replace('.', '', regex=False).str \
                    .replace(',', '.', regex=False).astype(float)

# Save the unified exchange-rate table
cambios.to_sql('cambios', con=conn, if_exists='replace')
cambios.to_csv('cambios.csv',index=None)

In [None]:
# Load the coffee price history from the project repository
coffee_csv_url = f'{url_git}data/coffee.csv'
coffee = pd.read_csv(coffee_csv_url)

# Store it unchanged in SQLite as the raw coffee table
coffee.to_sql(name='raw_coffee', con=conn, if_exists='replace')


In [None]:
# Work on an independent copy so the originally loaded data stays unmodified
coffee_base = coffee.copy(deep=True)

In [None]:
# Build the analytics table: each coffee row joined to the exchange rates
# quoted on the same date (rows without a rate are kept by the left join)
coffee_cambio = pd.merge(coffee, cambios, how='left', on='Date')

# Price columns affected by the exchange rate
colunas_cambio = ['High', 'Low', 'Close', 'Open']

# Convert the prices row by row and round to 4 decimals
converted_prices = coffee_cambio[colunas_cambio] \
                   .mul(coffee_cambio['cambio'], axis=0) \
                   .round(4)
coffee_cambio[colunas_cambio] = converted_prices

# The currency code of the applied rate becomes the row's Currency
coffee_cambio['Currency'] = coffee_cambio['moeda']

# Keep only the columns of the final layout
colunas_finais = ['Date','Open','High','Low',
                  'Close','Volume','Currency']
coffee_cambio = coffee_cambio.loc[:, colunas_finais]

In [None]:
# Stack the converted rows (BRL, CLP, EUR) on top of the base table (USD),
# then order by date and currency with a fresh 0..n-1 index
final_coffee = (
    pd.concat([coffee_cambio,coffee_base], ignore_index=True)
    .sort_values(by=['Date','Currency'])
    .reset_index(drop=True)
)

In [None]:
# Persist the analytics table, first as CSV and then as a SQLite table
final_coffee.to_csv(path_or_buf='analytics_coffee.csv', index=None)
final_coffee.to_sql(name='analytics_coffee', con=conn, if_exists='replace')

### Consultas SQL

#### Média de Volume Mês

In [None]:
%%sql
SELECT
    strftime('%Y-%m', Date) AS year_month,
    ROUND(AVG(Volume), 2)   AS avg_volume
FROM raw_coffee
GROUP BY year_month;

In [69]:
# Export the monthly average volume to CSV
query = """
SELECT strftime('%Y-%m', Date) AS year_month,
       round(AVG(Volume),2) AS avg_volume
FROM raw_coffee
GROUP BY year_month;
"""
result_df = pd.read_sql_query(sql=query, con=conn)
result_df.to_csv(path_or_buf='media_volume_mes.csv', index=False)

#### Média de Volume Ano

In [None]:
%%sql
SELECT
    strftime('%Y', Date)  AS year,
    ROUND(AVG(Volume), 2) AS avg_volume
FROM raw_coffee
GROUP BY year;

In [71]:
# Export the yearly average volume to CSV
query = """
SELECT strftime('%Y', Date) AS year,
       round(AVG(Volume),2) AS avg_volume
FROM raw_coffee
GROUP BY year;
"""
result_df = pd.read_sql_query(sql=query, con=conn)
result_df.to_csv(path_or_buf='media_volume_ano.csv', index=False)

#### Total de café negociado por ano e as cotações

In [None]:
%%sql
WITH cte_coffee AS (
    SELECT
        MAX(Date)            AS max_date,
        SUM(Volume)          AS volume,
        strftime('%Y', Date) AS year
    FROM raw_coffee
    GROUP BY year
)
SELECT
    yr.year,
    yr.volume,
    1 AS last_usd,
    MAX(CASE WHEN fx.moeda = 'EUR' THEN fx.cambio END) AS last_eur,
    MAX(CASE WHEN fx.moeda = 'BRL' THEN fx.cambio END) AS last_brl,
    MAX(CASE WHEN fx.moeda = 'CLP' THEN fx.cambio END) AS last_clp
FROM cte_coffee AS yr
LEFT JOIN cambios AS fx
    ON fx.Date = yr.max_date
GROUP BY year

In [73]:
# Export the yearly traded volume, with the exchange rates quoted on the
# last traded date of each year, to CSV
query = """
WITH cte_coffee as (
  SELECT MAX(Date) AS max_date,
  SUM(Volume) as volume,
  strftime('%Y', Date) as year
  FROM raw_coffee
  GROUP BY year)
select
  a.year,
  a.volume,
  1 as last_usd,
  MAX(CASE WHEN c.moeda = 'EUR' THEN c.cambio END) AS last_eur,
  MAX(CASE WHEN c.moeda = 'BRL' THEN c.cambio END) AS last_brl,
  MAX(CASE WHEN c.moeda = 'CLP' THEN c.cambio END) AS last_clp
from cte_coffee a
left join cambios c on c.Date = a.max_date
group by year
"""
result_df = pd.read_sql_query(sql=query, con=conn)
result_df.to_csv(path_or_buf='total_negociado_ano.csv', index=False)

#### Maior volume no período e as cotações do dia

In [None]:
%%sql
-- Dates with the highest traded volume plus the exchange rates of that date.
-- The maximum is taken from raw_coffee, the same table the rest of the query
-- reads, so this cell no longer depends on analytics_coffee being built.
WITH cte_coffee AS (
    SELECT Date
    FROM raw_coffee
    WHERE Volume = (SELECT MAX(Volume) FROM raw_coffee)
)
SELECT
    a.Date,
    a.Volume,
    1 AS last_usd,
    MAX(CASE WHEN c.moeda = 'EUR' THEN c.cambio END) AS last_eur,
    MAX(CASE WHEN c.moeda = 'BRL' THEN c.cambio END) AS last_brl,
    MAX(CASE WHEN c.moeda = 'CLP' THEN c.cambio END) AS last_clp
FROM raw_coffee AS a
LEFT JOIN cambios AS c
    ON c.Date = a.Date
WHERE a.Date IN (SELECT Date FROM cte_coffee)
GROUP BY a.Date

In [75]:
# Export the date(s) with the highest traded volume, together with the
# exchange rates quoted on that date, to CSV.
# The maximum is read from raw_coffee (the table the rest of the query uses)
# so the export does not depend on analytics_coffee having been created.
query = """
WITH cte_coffee as (
  SELECT
  Date
  FROM raw_coffee
where
  volume = (SELECT MAX(volume) FROM raw_coffee)
)
select
a.Date,
a.Volume,
1 as last_usd,
MAX(CASE WHEN c.moeda = 'EUR' THEN c.cambio END) AS last_eur,
MAX(CASE WHEN c.moeda = 'BRL' THEN c.cambio END) AS last_brl,
MAX(CASE WHEN c.moeda = 'CLP' THEN c.cambio END) AS last_clp
from raw_coffee a
left join cambios c on c.Date = a.Date
where
a.Date in (SELECT Date from cte_coffee)
group by a.Date
"""
result_df = pd.read_sql_query(query, conn)
result_df.to_csv('data_maior_volume.csv', index=False)



### Testes

In [None]:
# Validate the exported analytics table with Great Expectations.
# The CSV is read through the same relative path it was written with, so the
# check works wherever the notebook's working directory is.
context = ge.get_context()
validator = context.sources.pandas_default.read_csv(
    "analytics_coffee.csv"
)

# Expectations: required columns exist, Volume is never null and is an integer
validator.expect_column_to_exist('Volume')
validator.expect_column_to_exist('Close')
validator.expect_column_to_exist('Currency')
validator.expect_column_values_to_not_be_null("Volume")
validator.expect_column_values_to_be_of_type('Volume','int')
validator.save_expectation_suite()

# Run the suite through a checkpoint and open the rendered validation result
checkpoint = context.add_or_update_checkpoint(
    name="my_quickstart_checkpoint",
    validator=validator,
)
checkpoint_result = checkpoint.run()
context.view_validation_result(checkpoint_result)

In [None]:
# Report whether the checkpoint run passed overall
print("\nRESULTS:\n", f'Success: {checkpoint_result.success}', sep="\n")