In [0]:
#Biblotecas necessárias

import requests 
import time
from concurrent.futures import ThreadPoolExecutor, as_completed 
import pyspark.sql.functions as F 
from pyspark.sql import DataFrame 

"""
- requests
    Para as chamadas das APIs
- time
     Para pausas entre as chamadas, ajuda a evitar bloqueios
- ThreadPoolExecutor
    Para chamadas paralelas, aqui é usado para diminuir o tempo de execução na busca dos CNPJs listados
- pyspark.sql.functions
    Para manipulação de dados em SQL

"""

'\n- requests\n    Para as chamadas das APIs\n- time\n     Para pausas entre as chamadas, ajuda a evitar bloqueios\n- ThreadPoolExecutor\n    Para chamadas paralelas, aqui é usado para diminuir o tempo de execução na busca dos CNPJs listados\n- pyspark.sql.functions\n    Para manipulação de dados em SQL\n\n'

In [0]:
url = "https://dados.cvm.gov.br/dados/FI/CAD/DADOS/cad_fi.csv"
local_path = "/Volumes/projeto_triller/bd_triller/arquivos_projeto/cad_fi.csv"

def download_csv(url: str, local_path: str) -> str:
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(local_path, "wb") as file:   # salva direto no Volume
            file.write(response.content)
        print(f"Arquivo salvo em: {local_path}")
        return local_path
    else:
        raise Exception(f"Erro ao baixar arquivo: {response.status_code}")

"""
   Faz o download de um arquivo csv de uma URL e salva em um arquivo local
   local_path: Caminho local onde o arquivo será salvo, será informado no arquivo principal
   url: URL da API em string
    
"""


'\n   Faz o download de um arquivo csv de uma URL e salva em um arquivo local\n   local_path: Caminho local onde o arquivo será salvo, será informado no arquivo principal\n   url: URL da API em string\n    \n'

In [0]:
def carrega_dados(spark, file_path: str, sep: str = ";", header: bool = True, inferSchema: bool = True) -> DataFrame:
    return (
        spark.read.option("sep", sep)
        .option("header", header)
        .option("inferSchema", inferSchema)
        .csv(file_path)
    )

"""
Carrega o arquivo csv em um dataframe
spark: SparkSession
file_path: Caminho do arquivo csv
sep: Separador do arquivo csv
header: Se o arquivo csv tem cabeçalho
"""

'\nCarrega o arquivo csv em um dataframe\nspark: SparkSession\nfile_path: Caminho do arquivo csv\nsep: Separador do arquivo csv\nheader: Se o arquivo csv tem cabeçalho\n'

In [0]:
def trata_cnpj(df: DataFrame)-> DataFrame:
    colunas = ["CNPJ_FUNDO", "CNPJ"]
    for col in df.columns:
        df = df.withColumn(col, F.regexp_replace(F.col(col), "[^0-9]", "")) 

    return df

"""
Padroniza os CNPJs, removendo os caracteres que não sejam números.
A APIBRASIL aceita somente chamada com CNPJ em números
df: Dataframe com os CNPJs
"""

'\nPadroniza os CNPJs, removendo os caracteres que não sejam números.\nA APIBRASIL aceita somente chamada com CNPJ em números\ndf: Dataframe com os CNPJs\n'

In [0]:
def salva_dados(df: DataFrame, tabela: str, modo: str = "overwrite") -> None:
    spark.sql(f"DROP TABLE IF EXISTS {tabela}")
    df.write.mode(modo).saveAsTable(tabela)

"""
Garante que a tabela seja removida e cria uma nova com os dados no Hive Metastore
df: Dataframe com os dados
tabela: Nome da tabela no Hive Metastore, será informada no notebook principal
modo: Modo de escrita no Hive Metastore, nesse caso (overwrite) sobrescreve se a tabela já existir

"""

'\nGarante que a tabela seja removida e cria uma nova com os dados no Hive Metastore\ndf: Dataframe com os dados\ntabela: Nome da tabela no Hive Metastore, será informada no notebook principal\nmodo: Modo de escrita no Hive Metastore, nesse caso (overwrite) sobrescreve se a tabela já existir\n\n'

In [0]:
def consulta_cnpj(cnpj: str, pausa: float = 0.5) -> dict:
    time.sleep(pausa) #Essa pausa evita bloqueios por limite de requisições, o time utilizado é listado na linha acima
    url = f"https://brasilapi.com.br/api/cnpj/v1/{cnpj}"

    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return {
                "razao_social": data.get("nome"),
                "cep": data.get("cep"),
                "uf": data.get("uf"),
                "cnpj": data.get("cnpj"),
                "erro": None
            }
        else:
            return {
                "razao_social": None,
                "cep": None,
                "uf": None,
                "cnpj": None,
                "erro": f"Erro ao consultar CNPJ: {response.status_code}"
            }
    except Exception as e:
        return { 
                "razao_social": None,
                "cep": None,
                "uf": None,
                "cnpj": None,
                "erro": f"Erro ao consultar CNPJ: {str(e)}"
        }

"""
Consulta dados de um CNPJ em uma API, nesse caso a APIBRASIL
CNPJ já deve estar padronizado, sem caracteres especiais, conforme listagem no notebook
Se sucesso, retorna as informações principais
Se erro, retorna o CNPJ com a informação de erro
Foi incluída uma "pausa" para evitar bloqueios, pois são muitos CNPJs

"""

'\nConsulta dados de um CNPJ em uma API, nesse caso a APIBRASIL\nCNPJ já deve estar padronizado, sem caracteres especiais, conforme listagem no notebook\nSe sucesso, retorna as informações principais\nSe erro, retorna o CNPJ com a informação de erro\nFoi incluída uma "pausa" para evitar bloqueios, pois são muitos CNPJs\n\n'

In [0]:
def consulta_lista_cnpjs(lista_cnpjs: list, max_workers: int = 5, pausa: float = 0.5) -> list:
    resultado = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(consulta_cnpj, cnpj, pausa) for cnpj in lista_cnpjs]
        for future in as_completed(futures):
            resultado.append(future.result())

    return resultado

"""
Consulta os CNPJs listados de forma paralela
Usa o ThreadPoolExecutor para chamar a API de forma paralela, evitando demora no retorno dos dados
"""

'\nConsulta os CNPJs listados de forma paralela\nUsa o ThreadPoolExecutor para chamar a API de forma paralela, evitando demora no retorno dos dados\n'

In [0]:
def pipeline_cnpj_brasilapi(spark, tabela_origem: str, tabela_destino: str, max_workers: int = 5, pausa: float = 0.5) -> DataFrame:
    
    cnpjs = spark.sql(f"""
            SELECT DISTINCT CNPJ_FUNDO AS documento
            FROM {tabela_origem}
            WHERE SIT = 'EM FUNCIONAMENTO NORMAL'
            """)
    
    lista_cnpjs = [row.documento for row in cnpjs.collect()]
    dados = consulta_lista_cnpjs(lista_cnpjs, max_workers=max_workers, pausa=pausa)
    df = spark.createDataFrame(dados)
    df = trata_cnpj(df)

    salva_dados(df, tabela_destino, "overwrite")
    return df

"""
Pipeline para consultar os CNPJs e salvar no Hive Metastore
Pega os CNPJs distintos da tabela de origem e salva no Hive Metastore
tabela_origem: Tabela de origem informada no notebook principal
tabela_destino: Tabela de destino informada no notebook principal
max_workers: Número de threads para consulta paralela

"""

'\nPipeline para consultar os CNPJs e salvar no Hive Metastore\nPega os CNPJs distintos da tabela de origem e salva no Hive Metastore\ntabela_origem: Tabela de origem informada no notebook principal\ntabela_destino: Tabela de destino informada no notebook principal\nmax_workers: Número de threads para consulta paralela\n\n'

In [0]:
def consulta_uf() -> list:
    url = 'https://brasilapi.com.br/api/ibge/uf/v1/'
    try:
        response = requests.get(urk, timeout=10)
        if response.status_code == 200:
            estados = response.json()
            return[{
                "uf": e.get("sigla"),
                "nome": e.get("nome"),
                "erro": None
            } for e in estados]
        else:
            return [{
                "uf": None,
                "nome": None,
                "erro": f"Erro ao consultar estados: {response.status_code}"
            }]
    except Exception as e:
        return [{
            "uf": None,
            "nome": None,
            "erro": f"Erro ao consultar estados: {str(e)}"
        }]


"""
Consulta a lista de estados do IBGE na BRasilAPI
Retorna a sigla e o nome de cada estado
Se erro, retorna o estado com a informação de erro

"""

'\nConsulta a lista de estados do IBGE na BRasilAPI\nRetorna a sigla e o nome de cada estado\nSe erro, retorna o estado com a informação de erro\n\n'

In [0]:
def pipeline_ufs(spark, tabela_destino: str) -> DataFrame:
    dados = consulta_uf()
    df = spark.createDataFrame(dados)    
    salva_dados(df, tabela_destino, "overwrite")
    return df

"""
Pipeline completo para UFs
Consulta a API
Cria o DataFrame e salva em uma tabela de destino informada no notebook principal

"""

'\nPipeline completo para UFs\nConsulta a API\nCria o DataFrame e salva em uma tabela de destino informada no notebook principal\n\n'