# Coleta de dados Externos

In [1]:
import requests
from bs4 import BeautifulSoup as bs
from pathlib import Path
from typing import List
from time import sleep
from tqdm import tqdm
import pandas as pd
import OleFileIO_PL
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.common.exceptions import NoSuchWindowException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

## Navegação

- O Selenium foi escolhido como ferramenta de navegação devido à incapacidade do portal `CEPEA.org.br` de ser renderizado corretamente por bibliotecas mais simples, como Requests e BeautifulSoup. Isso ocorre porque o portal utiliza tecnologias que exigem um navegador completo para exibir o conteúdo.

- Apesar dessa limitação técnica, o portal foi selecionado por ser uma **fonte primária** de dados e por apresentar uma **metodologia transparente** para consulta, o que aumenta a confiabilidade e a qualidade das informações obtidas.

- O portal apresenta os seguintes desafios reais para web scraping:
    - Necessidade de renderização JavaScript para exibir o conteúdo corretamente.
    - Dados distribuídos de forma dispersa nas páginas.
    - Ausência de uma API para acesso estruturado aos dados.
    - Disponibilidade limitada de dados diretamente na página, restrita aos últimos 15 dias.
    - Entrega dinâmica dos dados, realizada após uma requisição ao banco de dados, em formato fechado (`xls`).

In [2]:
download_folder = Path('raw_data') / datetime.now().strftime('%Y-%m-%d')
if not download_folder.exists():
    download_folder.mkdir(parents=True)

In [3]:
chrome_options = Options()
# chrome_options.add_argument("--headless")  # Executar em modo headless (opcional)
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")

# Configurar o diretório de download automático

prefs = {
    "download.default_directory": str(download_folder.absolute()),
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}

chrome_options.add_experimental_option("prefs", prefs)
chrome_service = ChromeService()

driver = webdriver.Chrome(service=chrome_service, options=chrome_options)

In [7]:
def normalize_str(string: str) -> str:
    """Normaliza uma string, removendo acentos e convertendo para minúsculas.
    A função aplica a normalização Unicode NFKD para separar os caracteres de 
    suas marcas diacríticas (acentos), remove essas marcas e, em seguida, 
    converte a string resultante para a sua forma 'casefolded' (uma versão 
    mais agressiva de minúsculas, ideal para comparações).
    Args:
        string (str): A string de entrada a ser normalizada.
    Returns:
        str: A string normalizada, sem acentos e em minúsculas.
    """

    import unicodedata

    try:
        normalized = unicodedata.normalize("NFKD", string)
        string = "".join([c for c in normalized if not unicodedata.combining(c)])
        string = string.casefold()
        return string
    except TypeError:
        return None

def fetch_categories(webdriver: webdriver, url: str) -> dict:
    """
    Busca categorias e seus URLs correspondentes de uma página da web.
    Esta função utiliza um Selenium WebDriver para navegar até a URL especificada,
    extrai uma lista de categorias da página e retorna um dicionário que mapeia
    os nomes das categorias para seus URLs. Categorias não relacionadas à agricultura
    são excluídas dos resultados.
    Argumentos:
        webdriver (selenium.webdriver): A instância do Selenium WebDriver usada para interagir com a página da web.
        url (str): A URL da página da web de onde as categorias serão extraídas.
    Retorna:
        dict: Um dicionário onde as chaves são os nomes normalizados das categorias (str)
              e os valores são seus URLs correspondentes (str).
    Notas:
        - A função assume que as categorias estão localizadas em um XPath específico
          (`/html/body/div/div[2]/div/div[2]`) e que cada categoria é representada como
          um elemento `<li>` contendo uma tag `<a>` com o URL.
        - Categorias não relacionadas à agricultura, como 'bezerro', 'boi', 'florestal',
          'frango', 'ovinos', 'ovos', 'suino' e 'tilapia', são excluídas dos resultados.
    """
    categories_dict = {}

    driver.get(url)
    categories = driver.find_element(By.XPATH, '/html/body/div/div[2]/div/div[2]')
    for category in categories.find_elements(By.TAG_NAME, 'li'):
        category_name = category.text
        category_name = normalize_str(category_name)
        category_url = category.find_element(By.TAG_NAME, 'a').get_attribute('href')
        # Exclui categorias indesejadas, não relacionadas a agricultura
        #
        # Exclui também categorias relacionadas a mandioca e feijão
        # Estas, ainda que relacionadas à agricultura, após
        # avanços no projeto, se mostraram excessivamente custosas de manejar
        # neste momento, visto que necessitam de tratamento especial e entendimento
        # extra do contexto
        if category_name not in ['bezerro', 'boi', 'florestal', 'frango', 'leite', 'ovinos', 'ovos', 'suino', 'tilapia', 'feijao', 'mandioca']:
            categories_dict[category_name] = category_url
    return categories_dict

In [8]:
def fetch_download_URLs(webdriver: webdriver, category_URL: str) -> List[str]:
    """
    Busca URLs de download em uma página de categoria.

    Esta função utiliza o Selenium WebDriver para acessar a página da URL fornecida,
    localiza todos os links (<a> tags) na página e retorna uma lista contendo os URLs
    que possuem a palavra 'series' em seu atributo 'href'.

    Args:
        category_URL (str): A URL da página da categoria onde os links de download serão buscados.

    Returns:
        List[str]: Uma lista de strings contendo os URLs de download encontrados na página.

    Notas:
        - Certifique-se de que o WebDriver esteja configurado corretamente para acessar a página.
    """
    download_URLs = []
    driver.get(category_URL)
    for a_tag in driver.find_elements(By.TAG_NAME, 'a'):
        href = a_tag.get_attribute('href')
        if href and 'series' in href:
            download_URLs.append(href)
    return download_URLs

In [9]:
categories = fetch_categories(driver, "https://www.cepea.org.br/br")

## Download dos dados brutos

Como afirmado anteriormente, os dados brutos estão disponíveis para download no site do CEPEA somente em formatos fechados, como XLS.

Portanto, é necessário baixar esses arquivos e convertê-los para um formato aberto, como CSV ou Parquet, para facilitar o processamento e a análise dos dados.

Em primeiro lugar, o download dos dados segue:

In [12]:
_to_download = []

for cat_name, cat_url in categories.items():
    print(f"Acessando página de preços de [{cat_name.title()}] na URL: {cat_url}") 
    download_links = fetch_download_URLs(driver, cat_url)
    _to_download.extend(download_links)

Acessando página de preços de [Acucar] na URL: https://www.cepea.org.br/br/indicador/acucar.aspx
Acessando página de preços de [Algodao] na URL: https://www.cepea.org.br/br/indicador/algodao.aspx
Acessando página de preços de [Arroz] na URL: https://www.cepea.org.br/br/indicador/arroz.aspx
Acessando página de preços de [Cafe] na URL: https://www.cepea.org.br/br/indicador/cafe.aspx
Acessando página de preços de [Citros] na URL: https://www.cepea.org.br/br/indicador/citros.aspx
Acessando página de preços de [Etanol] na URL: https://www.cepea.org.br/br/indicador/etanol.aspx
Acessando página de preços de [Hortifruti] na URL: https://www.cepea.org.br/br/hortifruti.aspx
Acessando página de preços de [Milho] na URL: https://www.cepea.org.br/br/indicador/milho.aspx
Acessando página de preços de [Soja] na URL: https://www.cepea.org.br/br/indicador/soja.aspx
Acessando página de preços de [Trigo] na URL: https://www.cepea.org.br/br/indicador/trigo.aspx


In [13]:
def download_files(driver, _to_download):
    def download_files(driver: WebDriver, _to_download: List[str]) -> None:
        """
        Inicia o download de arquivos abrindo uma nova aba para cada link fornecido.

        Args:
            driver (selenium.webdriver.chrome.webdriver.WebDriver): O WebDriver do Selenium usado para interagir com o navegador.
            _to_download (List[str]): Uma lista de URLs de download.

        Returns:
            None
        """
    driver.switch_to.window(driver.window_handles[0])
    # Cria uma aba para cada link de download:
    for link in tqdm(_to_download[:], desc="Iniciando downloads", unit="arquivo"):
        driver.execute_script(f"window.open('{link}');")
        # Pequena pausa para evitar sobrecarga do servidor
        # Testes mostraram que 7 segundos é um tempo adequado
        # E que 8 segundos é o tempo médio para o download completar
        sleep(3)
    print('Todos os downloads foram iniciados.')


def check_tabs(driver):
    """
    Verifica e gerencia as abas abertas em um navegador controlado pelo Selenium WebDriver.
    Este método realiza as seguintes ações:
    - Aguarda um intervalo de tempo antes de iniciar a verificação das abas.
    - Itera por todas as abas abertas no navegador.
    - Identifica abas com URLs contendo 'id' e as adiciona a uma lista de abas de download.
    - Verifica se o título da aba contém '524', indicando falha no download, e tenta reabrir a URL correspondente.
    - Fecha as abas que foram processadas com sucesso.
    - Repete o processo até que todas as abas sejam fechadas com sucesso.
    Args:
        driver (selenium.webdriver.remote.webdriver.WebDriver): 
            Instância do WebDriver do Selenium usada para controlar o navegador.
    Raises:
        selenium.common.exceptions.NoSuchWindowException: 
            Caso uma aba não seja encontrada durante a troca de contexto.
    """
    while True:
        print('Iniciando o processo de checagem de abas.')
        all_windows = driver.window_handles
        print(f"Número de abas abertas: {len(all_windows)}")
        
        download_windows = []
        for window in all_windows:
            try:
                driver.switch_to.window(window)
            except NoSuchWindowException:
                print(f"Aba {window} não encontrada. Download finalizado.")
                continue
            if 'id' in driver.current_url:
                download_windows.append(window)
            page_title = driver.title
            
            # Abas que contém o título com '524' indicam falha no download em razão da CloudFlare
            # e precisam ser reabertas
            if '524' in page_title:
                failed_download_URL = driver.current_url
                print(f"Erro encontrado na aba {window}, tentando novamente")
                driver.execute_script("window.close();")
                driver.execute_script(f"window.open('{failed_download_URL}');")
        if not download_windows:
            print("Todas as abas foram processadas e fechadas.")
            break
        sleep(10)  # Espera antes de checar as abas novamente

In [14]:
download_files(driver, _to_download)
check_tabs(driver)

Iniciando downloads:   0%|          | 0/14 [00:00<?, ?arquivo/s]

Iniciando downloads: 100%|██████████| 14/14 [00:42<00:00,  3.02s/arquivo]


Todos os downloads foram iniciados.
Iniciando o processo de checagem de abas.
Número de abas abertas: 10
Aba 43D1E155BBFE241E7C61879D6BDBEC94 não encontrada. Download finalizado.
Aba CAC728DBFF1FB86E8E0021AAAE948D11 não encontrada. Download finalizado.
Aba 4EF65DE650182FCE4A6318442C336D8F não encontrada. Download finalizado.
Aba 331A7564BB77AA62E5BAAB12452987E0 não encontrada. Download finalizado.
Aba 646832EBBF8F100995872EDE80977DEF não encontrada. Download finalizado.
Aba D6FB46486513331822F42F5228E86ED3 não encontrada. Download finalizado.
Aba 35D60930D9F53BE695096AAD2538CD43 não encontrada. Download finalizado.
Aba 638DC6D18E9802F61AFA4243B73DB6B2 não encontrada. Download finalizado.
Todas as abas foram processadas e fechadas.


In [15]:
driver.close()

## Estruturação da Camada RAW

- Os dados obtidos do CEPEA não estão em um formato padrão `xls`.
    - Eles são armazenados como um [`Compound Document`](https://en.wikipedia.org/wiki/Compound_File_Binary_Format), um formato de arquivo container que permite armazenar diferentes tipos de conteúdo em um único arquivo.
- O Pandas, por padrão, não suporta este tipo de arquivo. Por isso, é necessário utilizar a biblioteca `OleFileIO_PL` para acessar o conteúdo interno e fazer o parsing utilizando as engines `xlrd` - para arquivos no formato `.xlrd` - ou `openpyxl` - para arquivos no formato `.xlsx`.
    - Essa biblioteca permite abrir e manipular arquivos no formato OLE (Object Linking and Embedding), que é a base do formato Compound Document.

[Referência: https://stackoverflow.com/a/63347276]

- Os arquivos convertidos são salvos em uma pasta chamada `raw_data`, que é criada automaticamente caso não exista no momento da execução.
- O formato `.csv` foi escolhido para armazenar os dados convertidos, em vez de `.json` ou `.parquet`, pelos seguintes motivos:
    - Alguns arquivos apresentam inconsistências no formato, como colunas excedentes ou diferentes estruturas, o que dificulta a padronização.
        - Por exemplo, arquivos como `precos_farinha_mandioca_seca_grossa_branca_crua_tipo_1.csv` possuem mais colunas do que a maioria dos outros arquivos.
    - O formato `.csv` é simples, amplamente suportado e não apresenta a repetição de chaves a cada linha, como ocorre no `.json`.
    - Embora o formato `.parquet` seja mais eficiente em termos de armazenamento e leitura em grandes volumes de dados, ele depende de bibliotecas especializadas devido à sua natureza binária, o que pode dificultar o processo de depuração.
    - O tamanho dos dados neste projeto não justifica a adoção do `.parquet`, que é mais vantajoso em cenários de Big Data.

- Esta etapa de estruturação da camada RAW prioriza simplicidade e flexibilidade, considerando o tamanho atual dos dados e a necessidade de facilitar o processamento e a análise subsequente. O formato `.csv` atende bem a esses requisitos, oferecendo um equilíbrio entre acessibilidade e funcionalidade.

In [16]:
clean_data = Path('clean_data')
if not clean_data.exists():
    clean_data.mkdir(parents=True)

In [17]:
def convert_xls_to_csv(download_folder: Path, clean_data_folder: Path) -> None:
    """
    Converte arquivos .xls em .csv no diretório especificado.

    Esta função percorre todos os arquivos .xls no diretório fornecido,
    converte cada um deles para o formato .csv e salva o arquivo convertido
    no em um diretório separado com o nome indicado no cabeçalho.

    Args:
        raw_data_path (Path): O caminho do diretório onde os arquivos .xls estão localizados.

    Returns:
        None
    """
    headers_to_fname = {
        "INDICADOR DO AÇÚCAR CRISTAL BRANCO CEPEA/ESALQ - SÃO PAULO": "acucar_cristal_branco_cepea_esalq_sao_paulo",
        "Indicador do Algodão em Pluma CEPEA/ESALQ - Prazo de 8 dias": "algodao_pluma_cepea_esalq_8_dias",
        "INDICADOR DO CAFÉ ARÁBICA CEPEA/ESALQ": "indicador_cafe_arabica_cepea_esalq",
        "INDICADOR DO ARROZ EM CASCA CEPEA/IRGA-RS": "indicador_arroz_casca_cepea_irga_rs",
        "INDICADOR DO CAFÉ ROBUSTA CEPEA/ESALQ": "indicador_cafe_robusta_cepea_esalq",
        "Indicador Açúcar Cristal - Santos (FOB)": "indicador_acucar_cristal_santos_fob",
        "Preços do Feijão Carioca - Notas 8 e 8,5 - CEPEA/CNA": "precos_feijao_carioca_notas_8_85_cepea_cna",
        "Preços do Feijão Carioca - peneira 12 e/ou notas 9 ou superior - CEPEA/CNA": "precos_feijao_carioca_peneira_12_notas_9_cepea_cna",
        "Indicador Semanal do Etanol Hidratado Outros Fins CEPEA/ESALQ - São Paulo": "indicador_semanal_etanol_hidratado_outros_fins_cepea_esalq_sao_paulo",
        "PREÇOS DA FARINHA DE MANDIOCA SECA GROSSA - BRANCA/CRUA TIPO 1": "precos_farinha_mandioca_seca_grossa_branca_crua_tipo_1",
        "INDICADOR DO MILHO ESALQ/BM&FBOVESPA": "indicador_milho_esalq_bm_fbovespa",
        "INDICADOR DA SOJA CEPEA/ESALQ - PARANAGUÁ": "indicador_soja_cepea_esalq_paranagua",
        "PREÇO MÉDIO DO TRIGO CEPEA/ESALQ - PARANÁ": "preco_medio_trigo_cepea_esalq_parana",
        "PREÇO MÉDIO DO TRIGO CEPEA/ESALQ - RIO GRANDE DO SUL": "preco_medio_trigo_cepea_esalq_rio_grande_sul",
        "LEITE AO PRODUTOR CEPEA/ESALQ (R$/litro) - líquido": "leite_ao_produtor_cepea_esalq_r_litro_liquido",
        "PREÇOS DA RAIZ DE MANDIOCA": "precos_raiz_mandioca",
        "INDICADOR DA SOJA CEPEA/ESALQ - PARANÁ": "indicador_soja_cepea_esalq_parana",
        "PREÇOS DA FARINHA DE MANDIOCA SECA FINA - BRANCA/CRUA TIPO 1": "precos_farinha_mandioca_seca_fina_branca_crua_tipo_1",
        "INDICADOR SEMANAL DO ETANOL HIDRATADO COMBUSTÍVEL CEPEA/ESALQ - SÃO PAULO": "indicador_semanal_etanol_hidratado_combustivel_cepea_esalq_sao_paulo",
        "PREÇOS DA FÉCULA DE MANDIOCA": "precos_fecula_mandioca",
        "Preços do Feijão Preto Tipo 1 - CEPEA/CNA": "precos_feijao_preto_tipo_1_cepea_cna",
        "INDICADOR SEMANAL DO ETANOL ANIDRO CEPEA/ESALQ - SÃO PAULO": "indicador_semanal_etanol_anidro_cepea_esalq_sao_paulo",

    }

    xls_files = list(download_folder.glob('*.xls'))
    for xls_file in tqdm(xls_files, desc="Convertendo arquivos .xls para .csv", unit="arquivo"):
        ole_file = OleFileIO_PL.OleFileIO(xls_file)
        df = pd.read_excel(ole_file.openstream('Workbook'), engine='xlrd')
        
        df_header = df.columns[0]
        # Utiliza o cabeçalho do arquivo como um filename mais descritivo
        # Se o cabeçalho não estiver no dicionário, usa o nome do arquivo original
        # Caso o nome do arquivo não conste no dicionário, registra um aviso no logger
        if df_header not in headers_to_fname:
            print(f"Aviso: Cabeçalho '{df_header}' não encontrado no dicionário. Usando o nome original do arquivo.")
        filename = headers_to_fname.get(df_header, f"{xls_file.stem}.csv") + '.csv'
        

        real_df = df[3:].copy() # Pula as 3 primeiras linhas que são cabeçalhos desnecessários
        real_df.columns = df.iloc[2].values # Define a 3ª linha como cabeçalho do novo arquivo
        real_df.reset_index(drop=True, inplace=True)

        real_df.columns = [normalize_str(col) for col in real_df.columns] # Normaliza os nomes das colunas
        real_df.columns = [col.replace(' ', '_') for col in real_df.columns] # Substitui espaços por underscores nos nomes das colunas
        real_df.columns = [col.replace('r$', 'brl') for col in real_df.columns] # Substitui 'r$' por 'brl' nos nomes das colunas
        real_df.columns = [col.replace('us$', 'usd') for col in real_df.columns] # Substitui 'us$' por 'usd' nos nomes das colunas
        real_df.columns = [col.replace('.', '') for col in real_df.columns] # Remove pontos dos nomes das colunas
        
        real_df.to_csv(clean_data / filename, index=False, encoding='cp1252')
        ole_file.close()

In [18]:
convert_xls_to_csv(download_folder=download_folder, clean_data_folder=clean_data)

Convertendo arquivos .xls para .csv: 100%|██████████| 14/14 [00:00<00:00, 18.46arquivo/s]


## Armazenamento na AWS S3

A Amazon possui algumas diretivas de recomendação no caso de adoção da solução AWS S3, disponível em [sua documentação](https://docs.aws.amazon.com/prescriptive-guidance/latest/defining-bucket-names-data-lakes/data-layer-definitions.html).

No caso da camada Raw deste projeto:

- Para armazenar os dados brutos em um Bucket S3 na AWS, o recomendável seria criar um bucket dedicado, ou caso já exista um, uma pasta exclusiva para esta tarefa dentro dele.
- Dentro deste bucket deverão ser armazenados os dados sem transformação alguma, ou seja, no caso específico deste projeto, os arquivos localizados em `/raw_data`, provenientes da função `download_files()`.
    - Apesar de seu formato impróprio para consumo direto em razão dos pontos já levantados, eles representam a versão mais próxima da realidade no momento da exportação do banco original.
- Os arquivos, de preferência, devem ser organizados em subpastas para melhor organização.
    - No caso deste projeto, como espera-se que eles sejam exportados uma vez por dia, faz sentido que sejam criadas subpastas dentro de `/raw_data` com o dia de execução do script.
- É necessário ativar a função de versionamento do bucket também, para evitar perdas em caso de sobrescrita.
- É necessário também identificar os usuários e seus papéis na organização e configurar o Bucket para que somente os indivíduos que necessitam de autorização para acessar e modificar os arquivos consigam fazê-lo.
- A implementação de uma política de ciclo de vida também é recomendável, uma vez que a documentação indica que arquivos mais antigos podem ser movidos para soluções de armazenamento de longa duração e pouco acesso, como a Glacier ou IA (Infrequent Access).