<a href="https://colab.research.google.com/github/lferrazz/projeto-python/blob/main/projeto-python/Projeto_Trust_Work_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Web Scraping para retirada dos dados na Receita Federal

Instalando as bibliotecas necessárias para o Web Scraping. OBS(Caso já possua essas bibliotecas instaladas, não precisa rodar esse bloco de código)

In [None]:
pip install requests beautifulsoup4 tqdm


Importando as bibliotecas necessárias

In [None]:
import os  # Biblioteca para manipulação de arquivos e diretórios
import re  # Biblioteca para manipulação de strings usando expressões regulares
import requests  # Biblioteca para fazer requisições HTTP (baixar arquivos)
import zipfile  # Biblioteca para manipulação de arquivos ZIP (extração)
import time  # Biblioteca para controle do tempo (usado para limitar atualizações de progresso)
from bs4 import BeautifulSoup  # Biblioteca para web scraping (extração de links da página)
from concurrent.futures import ThreadPoolExecutor, as_completed  # Para executar downloads em paralelo (multithreading)
from tqdm import tqdm  # Biblioteca para exibir uma barra de progresso amigável ao usuário

Abaixo é o link do site da receita aonde se encontra os arquivos para download, esse é o único código que você irá precisar alterar, caso precise de arquivos de datas posteriores

In [None]:
import requests
from datetime import datetime, timedelta

def check_url(year, month):
    """Verifica se a URL existe e retorna a URL se for válida."""
    url = f"https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/{year}-{month:02d}/"
    try:
        response = requests.get(url, timeout=15)  # Mudamos para GET e aumentamos o timeout
        if response.status_code == 200:
            return url
    except requests.RequestException as e:
        print(f"Erro ao acessar {url}: {e}")
    return None

def get_valid_url():
    """Retorna a URL válida mais recente dos dados da Receita Federal."""
    now = datetime.now()

    # Verifica primeiro o mês atual
    print(f"Tentando o mês atual: {now.year}-{now.month:02d}")
    current_url = check_url(now.year, now.month)
    if current_url:
        return current_url  # Retorna imediatamente se o mês atual for válido

    # Se não existir, tenta o mês anterior
    previous_month = now.replace(day=1) - timedelta(days=1)
    print(f"Tentando o mês anterior: {previous_month.year}-{previous_month.month:02d}")
    return check_url(previous_month.year, previous_month.month)

base_url = get_valid_url()

if base_url:
    print(f"URL válida encontrada: {base_url}")
else:
    print("Nenhuma URL válida encontrada.")


Tentando o mês atual: 2025-03
Erro ao acessar https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/2025-03/: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
Tentando o mês anterior: 2025-02
URL válida encontrada: https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/2025-02/


Comando para começar o Web Scraping. OBS: Caso você execute o código e apareça uma mensagem de erro (Erro ao obter links de download), é só executar o Script novamente.

O código abaixo irá baixar todos os arquivos necessários para o tratamento de dados ! Ele irá criar uma pasta para cada arquivo e depois disso irá deletar os arquivos .zip para não mantê-los ocupando espaço desnecessário em disco.

Por se tratar de uma quantidade enorme de dados esse script pode durar muito tempo para executar, de acordo com a velocidade da sua internet.


In [None]:
# Função para obter todos os links de arquivos ZIP de uma página web
def get_download_links(base_url):
    try:
        response = requests.get(base_url)  # Faz uma requisição HTTP para obter o conteúdo da página
        response.raise_for_status()  # Lança um erro se a requisição falhar (ex: página não encontrada)
        soup = BeautifulSoup(response.text, "html.parser")  # Analisa o HTML da página
        # Retorna todos os links que terminam com '.zip', indicando arquivos compactados para download
        return [base_url + link.get("href") for link in soup.find_all("a") if link.get("href", "").endswith(".zip")]
    except requests.RequestException as e:
        print(f"Erro ao obter links de download: {e}")
        return []

# Função para extrair o nome base do arquivo, removendo números finais e extensão
def extract_base_name(file_name):
    file_name = os.path.splitext(file_name)[0]  # Remove a extensão do arquivo
    return re.sub(r"\d+$", "", file_name)  # Remove números no final do nome do arquivo

# Verifica se um arquivo ZIP já foi extraído anteriormente
def is_file_extracted(output_dir, file_name, extracted_files):
    return file_name in extracted_files  # Verifica no log de arquivos extraídos

# Função para baixar um arquivo ZIP da internet
def download_file(url, output_dir, extracted_files, session):
    file_name = url.split("/")[-1]  # Obtém o nome do arquivo a partir da URL
    base_name = extract_base_name(file_name)  # Extrai o nome base do arquivo

    # Se o arquivo já foi extraído, não faz o download novamente
    if is_file_extracted(output_dir, base_name, extracted_files):
        print(f"{file_name} já extraído. Pulando...")
        return None

    file_path = os.path.join(output_dir, file_name)  # Caminho onde o arquivo será salvo

    try:
        # Faz a requisição com suporte a sessões para otimizar múltiplos downloads
        with session.get(url, stream=True) as response:
            response.raise_for_status()  # Verifica se houve erro na requisição
            expected_size = int(response.headers.get("content-length", 0))  # Obtém o tamanho total do arquivo

            with open(file_path, "wb") as file, tqdm(
                total=expected_size, unit="B", unit_scale=True, desc=file_name, ncols=80, mininterval=30
            ) as progress:  # mininterval=30 impede que a barra de progresso atualize a todo momento
                last_update = time.time()  # Marca o tempo da última atualização manual
                for chunk in response.iter_content(chunk_size=1024 * 1024):  # Baixa em blocos de 1MB
                    file.write(chunk)
                    progress.update(len(chunk))  # Atualiza a barra de progresso

                    # Atualiza manualmente a cada 30 segundos para reduzir a poluição visual
                    if time.time() - last_update >= 30:
                        progress.refresh()  # Atualiza a exibição da barra de progresso
                        last_update = time.time()

        return file_path  # Retorna o caminho do arquivo baixado
    except requests.RequestException as e:
        print(f"Erro ao baixar {file_name}: {e}")
        if os.path.exists(file_path):
            os.remove(file_path)  # Remove arquivos corrompidos para evitar problemas
        return None

# Função para extrair arquivos ZIP e excluir o ZIP original
def extract_and_delete_zip(file_path, output_dir, extracted_files):
    try:
        base_name = extract_base_name(os.path.basename(file_path))  # Obtém o nome base do arquivo
        target_folder = os.path.join(output_dir, base_name)  # Define a pasta de extração
        os.makedirs(target_folder, exist_ok=True)  # Cria a pasta se não existir

        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            for extracted_file in zip_ref.namelist():  # Lista arquivos dentro do ZIP
                extracted_file_path = zip_ref.extract(extracted_file, target_folder)  # Extrai o arquivo
                new_file_name = f"{base_name}_{os.path.basename(extracted_file)}"  # Renomeia o arquivo extraído
                new_file_path = os.path.join(target_folder, new_file_name)

                # Evita sobrescrever arquivos repetidos
                counter = 1
                while os.path.exists(new_file_path):
                    new_file_name = f"{base_name}_{counter}.csv"
                    new_file_path = os.path.join(target_folder, new_file_name)
                    counter += 1

                os.rename(extracted_file_path, new_file_path)  # Renomeia o arquivo

                extracted_files.add(os.path.basename(new_file_path))  # Adiciona ao log de arquivos extraídos

        os.remove(file_path)  # Exclui o arquivo ZIP após extração
        print(f"{file_path} extraído e excluído.")
    except zipfile.BadZipFile:
        print(f"Erro: {file_path} está corrompido.")
    except Exception as e:
        print(f"Erro ao extrair {file_path}: {e}")

# Carrega a lista de arquivos que já foram extraídos
def load_extracted_files(output_dir):
    log_path = os.path.join(output_dir, "extracted_files.txt")
    if os.path.exists(log_path):
        with open(log_path, "r") as log_file:
            return set(line.strip() for line in log_file)
    return set()

# Salva a lista de arquivos extraídos para evitar downloads repetidos
def save_extracted_files(output_dir, extracted_files):
    log_path = os.path.join(output_dir, "extracted_files.txt")
    with open(log_path, "w") as log_file:
        for file_name in extracted_files:
            log_file.write(f"{file_name}\n")

# Função principal que organiza download, extração e exclusão dos arquivos ZIP
def main_download_extract_delete():
    output_dir = "output"  # Pasta onde os arquivos serão salvos
    os.makedirs(output_dir, exist_ok=True)  # Cria a pasta de saída, se não existir

    extracted_files = load_extracted_files(output_dir)  # Carrega arquivos já extraídos
    links = get_download_links(base_url)  # Obtém os links dos arquivos ZIP
    if not links:
        print("Nenhum link válido encontrado.")
        return

    # Usa múltiplas threads para baixar arquivos simultaneamente (aumenta a velocidade)
    with ThreadPoolExecutor(max_workers=4) as executor, requests.Session() as session:
        futures = [executor.submit(download_file, link, output_dir, extracted_files, session) for link in links]
        for future in as_completed(futures):  # Processa cada download assim que ele termina
            file_path = future.result()
            if file_path:
                extract_and_delete_zip(file_path, output_dir, extracted_files)

    save_extracted_files(output_dir, extracted_files)  # Atualiza o log dos arquivos extraídos

# Executa a função principal quando o script é rodado
if __name__ == "__main__":
    main_download_extract_delete()



Empresas0.zip:   0%|                                 | 0.00/405M [00:00<?, ?B/s][A

Empresas2.zip:   0%|                                | 0.00/79.1M [00:00<?, ?B/s][A[A


Cnaes.zip: 100%|████████████████████████████| 22.1k/22.1k [00:00<00:00, 154kB/s]


output/Cnaes.zip extraído e excluído.


Empresas3.zip:   0%|                                | 0.00/85.1M [00:00<?, ?B/s]

Empresas2.zip:  31%|███████▎                | 24.1M/79.1M [00:30<01:10, 780kB/s][A[A

Empresas3.zip:  28%|██████▊                 | 24.1M/85.1M [00:30<01:17, 785kB/s]


Empresas1.zip:  31%|███████▍                | 24.1M/77.8M [00:31<01:09, 777kB/s][A[A[A


Empresas1.zip:  31%|███████▍                | 24.1M/77.8M [00:31<01:09, 777kB/s][A[A[A
Empresas0.zip:   6%|█▍                       | 24.1M/405M [00:31<08:10, 776kB/s][A
Empresas3.zip:  46%|██████████▉             | 38.8M/85.1M [00:49<00:59, 785kB/s]
Empresas0.zip:  10%|██▍                      | 38.8M/405M [00:49<07:51, 776kB/s][A


Empresas1.zip:  50%|███████████▉            | 38.8M/77.8M [00:49<00:50, 777kB/s][A[A[A

Empresas2.zip:  49%|███████████▊            | 38.8M/79.1M [00:50<00:51, 780kB/s][A[A

Empresas2.zip:  62%|██████████████▉         | 49.3M/79.1M [01:02<00:37, 795kB/s][A[A

Empresas2.zip:  62%|██████████████▉         | 4

output/Empresas1.zip extraído e excluído.
output/Empresas2.zip extraído e excluído.


Empresas3.zip: 100%|████████████████████████| 85.1M/85.1M [01:46<00:00, 798kB/s]
Empresas6.zip:   0%|                                | 0.00/94.5M [00:00<?, ?B/s]

output/Empresas3.zip extraído e excluído.



Empresas0.zip:  21%|█████▎                   | 87.0M/405M [01:50<06:37, 800kB/s][A
Empresas0.zip:  24%|██████                   | 98.6M/405M [02:03<06:22, 800kB/s][A
Empresas0.zip:  24%|██████                   | 98.6M/405M [02:03<06:22, 800kB/s][A


Empresas4.zip:  27%|██████▍                 | 24.1M/90.4M [00:30<01:22, 801kB/s][A[A[A


Empresas4.zip:  27%|██████▍                 | 24.1M/90.4M [00:30<01:22, 801kB/s][A[A[A

Empresas5.zip:  25%|█████▉                  | 24.1M/97.6M [00:30<01:31, 803kB/s][A[A

Empresas6.zip:  27%|██████▍                 | 25.2M/94.5M [00:31<01:25, 808kB/s]

Empresas5.zip:  33%|███████▉                | 32.5M/97.6M [00:40<01:21, 803kB/s][A[A


Empresas4.zip:  37%|████████▉               | 33.6M/90.4M [00:42<01:10, 801kB/s][A[A[A
Empresas6.zip:  35%|████████▌               | 33.6M/94.5M [00:42<01:15, 808kB/s]
Empresas0.zip:  31%|███████▉                  | 124M/405M [02:34<05:50, 802kB/s][A
Empresas0.zip:  31%|███████▉                  | 

output/Empresas4.zip extraído e excluído.



Empresas0.zip:  43%|███████████▏              | 174M/405M [03:37<04:46, 805kB/s][A
Empresas5.zip: 100%|████████████████████████| 97.6M/97.6M [02:01<00:00, 804kB/s]


Empresas8.zip:   0%|                                | 0.00/99.4M [00:00<?, ?B/s][A[A

output/Empresas5.zip extraído e excluído.


Empresas6.zip: 100%|████████████████████████| 94.5M/94.5M [01:57<00:00, 806kB/s]
Empresas9.zip:   0%|                                | 0.00/94.9M [00:00<?, ?B/s]

output/Empresas6.zip extraído e excluído.



Empresas0.zip:  45%|███████████▊              | 184M/405M [03:50<04:35, 805kB/s][A


Empresas7.zip:  25%|██████                  | 25.2M/99.2M [00:31<01:31, 810kB/s][A[A[A


Empresas7.zip:  25%|██████                  | 25.2M/99.2M [00:31<01:31, 810kB/s][A[A[A
Empresas0.zip:  49%|████████████▊             | 199M/405M [04:08<04:15, 805kB/s][A
Empresas0.zip:  49%|████████████▊             | 199M/405M [04:08<04:15, 805kB/s][A

Empresas8.zip:  25%|██████                  | 25.2M/99.4M [00:31<01:32, 806kB/s][A[A

Empresas9.zip:  27%|██████▎                 | 25.2M/94.9M [00:31<01:26, 805kB/s]


Empresas7.zip:  40%|█████████▋              | 39.8M/99.2M [00:49<01:13, 810kB/s][A[A[A
Empresas9.zip:  38%|█████████               | 35.7M/94.9M [00:45<01:13, 805kB/s]

Empresas8.zip:  39%|█████████▎              | 38.8M/99.4M [00:49<01:15, 806kB/s][A[A


Empresas7.zip:  51%|████████████▏           | 50.3M/99.2M [01:02<01:00, 808kB/s][A[A[A


Empresas7.zip:  51%|████████████▏     

output/Empresas7.zip extraído e excluído.



Empresas0.zip:  68%|█████████████████▋        | 275M/405M [05:42<02:41, 805kB/s][A
Empresas9.zip: 100%|████████████████████████| 94.9M/94.9M [01:58<00:00, 804kB/s]
Empresas8.zip: 100%|████████████████████████| 99.4M/99.4M [02:03<00:00, 806kB/s]


Estabelecimentos2.zip:   0%|                         | 0.00/337M [00:00<?, ?B/s][A[A

output/Empresas9.zip extraído e excluído.
output/Empresas8.zip extraído e excluído.



Empresas0.zip:  71%|██████████████████▌       | 288M/405M [06:00<02:24, 805kB/s][A


Estabelecimentos0.zip:   2%|▎               | 25.2M/1.59G [00:31<32:09, 810kB/s][A[A[A


Estabelecimentos0.zip:   2%|▎               | 25.2M/1.59G [00:31<32:09, 810kB/s][A[A[A
Empresas0.zip:  74%|███████████████████▏      | 299M/405M [06:12<02:11, 804kB/s][A
Estabelecimentos1.zip:   7%|█▏               | 24.1M/340M [00:30<06:37, 794kB/s]

Estabelecimentos2.zip:   7%|█▎               | 25.2M/337M [00:31<06:27, 804kB/s][A[A

Estabelecimentos2.zip:   7%|█▎               | 25.2M/337M [00:31<06:27, 804kB/s][A[A


Estabelecimentos1.zip:  11%|█▊               | 36.7M/340M [00:47<06:21, 794kB/s]
Empresas0.zip:  77%|████████████████████      | 312M/405M [06:30<01:54, 804kB/s][A

Estabelecimentos2.zip:  11%|█▊               | 36.7M/337M [00:45<06:13, 804kB/s][A[A


Estabelecimentos0.zip:   3%|▌               | 50.3M/1.59G [01:02<31:44, 807kB/s][A[A[A


Estabelecimentos1.zip:  14%|██▍          

output/Empresas0.zip extraído e excluído.





Estabelecimentos0.zip:   9%|█▌               | 150M/1.59G [03:06<29:49, 804kB/s][A[A[A


Estabelecimentos0.zip:   9%|█▌               | 150M/1.59G [03:06<29:49, 804kB/s][A[A[A

Estabelecimentos1.zip:  43%|███████▋          | 146M/340M [03:03<04:04, 793kB/s]


Estabelecimentos0.zip:  10%|█▋               | 158M/1.59G [03:16<29:39, 804kB/s][A[A[A

Estabelecimentos2.zip:  45%|████████          | 150M/337M [03:06<03:52, 802kB/s][A[A

Estabelecimentos2.zip:  45%|████████          | 150M/337M [03:06<03:52, 802kB/s][A[A
Estabelecimentos3.zip:   7%|█▏               | 24.1M/342M [00:30<06:39, 795kB/s][A
Estabelecimentos1.zip:  46%|████████▎         | 156M/340M [03:17<03:51, 793kB/s]
Estabelecimentos3.zip:  11%|█▊               | 36.7M/342M [00:46<06:23, 795kB/s][A

Estabelecimentos2.zip:  49%|████████▊         | 165M/337M [03:25<03:34, 802kB/s][A[A


Estabelecimentos0.zip:  11%|█▊               | 175M/1.59G [03:37<29:16, 804kB/s][A[A[A


Estabelecimentos1.zip:  50%|██████

output/Estabelecimentos2.zip extraído e excluído.





Estabelecimentos0.zip:  23%|███▊             | 360M/1.59G [07:26<25:32, 802kB/s][A[A[A
Estabelecimentos3.zip:  65%|███████████▋      | 222M/342M [04:36<02:28, 804kB/s][A

output/Estabelecimentos1.zip extraído e excluído.



Estabelecimentos3.zip:  65%|███████████▊      | 223M/342M [04:37<02:27, 804kB/s][A

Estabelecimentos4.zip:   7%|█▏               | 25.2M/364M [00:31<07:00, 805kB/s][A[A

Estabelecimentos5.zip:   7%|█▏               | 24.1M/338M [00:30<06:31, 802kB/s]


Estabelecimentos0.zip:  24%|████             | 375M/1.59G [07:45<25:04, 806kB/s][A[A[A


Estabelecimentos0.zip:  24%|████             | 375M/1.59G [07:45<25:04, 806kB/s][A[A[A
Estabelecimentos5.zip:  10%|█▋               | 33.6M/338M [00:42<06:19, 802kB/s]

Estabelecimentos4.zip:  10%|█▊               | 37.7M/364M [00:47<06:44, 805kB/s][A[A


Estabelecimentos0.zip:  24%|████             | 384M/1.59G [07:56<24:54, 806kB/s][A[A[A
Estabelecimentos3.zip:  72%|█████████████     | 247M/342M [05:07<01:57, 804kB/s][A
Estabelecimentos3.zip:  73%|█████████████     | 249M/342M [05:09<01:55, 804kB/s][A

Estabelecimentos4.zip:  14%|██▎              | 50.3M/364M [01:02<06:28, 806kB/s][A[A

Estabelecimentos5.zip:  15%|██▍            

output/Estabelecimentos3.zip extraído e excluído.




Estabelecimentos4.zip:  41%|███████▍          | 150M/364M [03:06<04:26, 802kB/s][A[A

Estabelecimentos5.zip:  43%|███████▊          | 147M/338M [03:04<04:02, 790kB/s]


Estabelecimentos0.zip:  31%|█████▎           | 500M/1.59G [10:21<22:35, 803kB/s][A[A[A


Estabelecimentos0.zip:  31%|█████▎           | 500M/1.59G [10:21<22:35, 803kB/s][A[A[A
Estabelecimentos6.zip:   7%|█▏               | 24.1M/335M [00:30<06:28, 799kB/s][A
Estabelecimentos6.zip:   7%|█▏               | 24.1M/335M [00:30<06:28, 799kB/s][A

Estabelecimentos5.zip:  48%|████████▌         | 161M/338M [03:22<03:43, 790kB/s]
Estabelecimentos6.zip:  10%|█▋               | 32.5M/335M [00:40<06:18, 799kB/s][A


Estabelecimentos0.zip:  32%|█████▍           | 512M/1.59G [10:36<22:21, 803kB/s][A[A[A

Estabelecimentos4.zip:  48%|████████▋         | 175M/364M [03:37<03:54, 803kB/s][A[A

Estabelecimentos5.zip:  51%|█████████         | 171M/338M [03:34<03:30, 794kB/s]


Estabelecimentos0.zip:  33%|█████▌           | 

output/Estabelecimentos5.zip extraído e excluído.





Estabelecimentos0.zip:  44%|███████▌         | 701M/1.59G [14:31<18:20, 806kB/s][A[A[A


Estabelecimentos0.zip:  44%|███████▌         | 701M/1.59G [14:31<18:20, 806kB/s][A[A[A
Estabelecimentos6.zip:  67%|████████████      | 223M/335M [04:37<02:18, 804kB/s][A
Estabelecimentos6.zip:  67%|████████████▏     | 225M/335M [04:40<02:15, 804kB/s][A

Estabelecimentos4.zip: 100%|██████████████████| 364M/364M [07:31<00:00, 805kB/s]


Estabelecimentos8.zip:   0%|                         | 0.00/358M [00:00<?, ?B/s][A[A
Estabelecimentos6.zip:  70%|████████████▌     | 233M/335M [04:50<02:06, 804kB/s][A


Estabelecimentos7.zip:   7%|█▏               | 24.1M/349M [00:30<06:52, 786kB/s]

output/Estabelecimentos4.zip extraído e excluído.





Estabelecimentos0.zip:  46%|███████▊         | 726M/1.59G [15:01<17:55, 802kB/s][A[A[A


Estabelecimentos0.zip:  46%|███████▊         | 726M/1.59G [15:01<17:55, 802kB/s][A[A[A
Estabelecimentos6.zip:  74%|█████████████▎    | 247M/335M [05:08<01:48, 801kB/s][A
Estabelecimentos7.zip:  11%|█▉               | 38.8M/349M [00:49<06:33, 786kB/s]

Estabelecimentos8.zip:   7%|█▏               | 24.1M/358M [00:30<06:59, 796kB/s][A[A

Estabelecimentos8.zip:   7%|█▏               | 24.1M/358M [00:30<06:59, 796kB/s][A[A
Estabelecimentos6.zip:  77%|█████████████▊    | 257M/335M [05:20<01:37, 801kB/s][A


Estabelecimentos7.zip:  14%|██▎              | 48.2M/349M [01:01<06:21, 787kB/s]

Estabelecimentos8.zip:  10%|█▋               | 35.7M/358M [00:45<06:45, 796kB/s][A[A


Estabelecimentos0.zip:  47%|████████         | 751M/1.59G [15:32<17:22, 803kB/s][A[A[A


Estabelecimentos0.zip:  47%|████████         | 751M/1.59G [15:32<17:22, 803kB/s][A[A[A
Estabelecimentos7.zip:  18%|███    

output/Estabelecimentos6.zip extraído e excluído.





Estabelecimentos0.zip:  52%|████████▊        | 825M/1.59G [17:05<15:49, 804kB/s][A[A[A


Estabelecimentos7.zip:  39%|██████▉           | 134M/349M [02:49<04:28, 798kB/s]

Estabelecimentos8.zip:  34%|██████▏           | 123M/358M [02:33<04:54, 802kB/s][A[A

Estabelecimentos8.zip:  34%|██████▏           | 123M/358M [02:33<04:54, 802kB/s][A[A


Estabelecimentos7.zip:  42%|███████▌          | 146M/349M [03:03<04:15, 793kB/s]
Estabelecimentos9.zip:   7%|█▏               | 24.1M/337M [00:30<06:31, 799kB/s][A
Estabelecimentos9.zip:   7%|█▏               | 24.1M/337M [00:30<06:31, 799kB/s][A

Estabelecimentos8.zip:  37%|██████▋           | 132M/358M [02:45<04:42, 802kB/s][A[A


Estabelecimentos0.zip:  54%|█████████        | 850M/1.59G [17:36<15:17, 805kB/s][A[A[A


Estabelecimentos7.zip:  45%|████████▏         | 158M/349M [03:19<03:59, 793kB/s]
Estabelecimentos9.zip:  10%|█▋               | 34.6M/337M [00:44<06:18, 799kB/s][A

Estabelecimentos8.zip:  41%|███████▍          | 1

output/Estabelecimentos7.zip extraído e excluído.
output/Motivos.zip extraído e excluído.
output/Municipios.zip extraído e excluído.
output/Naturezas.zip extraído e excluído.
output/Paises.zip extraído e excluído.
output/Qualificacoes.zip extraído e excluído.



Estabelecimentos9.zip:  70%|████████████▌     | 236M/337M [04:54<02:05, 805kB/s][A

Estabelecimentos8.zip:  97%|█████████████████▌| 349M/358M [07:14<00:11, 805kB/s][A[A

Estabelecimentos8.zip:  97%|█████████████████▌| 349M/358M [07:14<00:11, 805kB/s][A[A


Estabelecimentos0.zip:  67%|██████████▋     | 1.06G/1.59G [21:57<11:02, 800kB/s][A[A[A
Estabelecimentos9.zip:  74%|█████████████▎    | 250M/337M [05:10<01:48, 805kB/s][A
Simples.zip:  10%|██▋                        | 25.2M/251M [00:31<04:41, 802kB/s]

Estabelecimentos8.zip: 100%|██████████████████| 358M/358M [07:25<00:00, 804kB/s]


Socios0.zip:   0%|                                   | 0.00/174M [00:00<?, ?B/s][A[A


Estabelecimentos0.zip:  67%|██████████▊     | 1.07G/1.59G [22:09<10:48, 801kB/s][A[A[A


Simples.zip:  13%|███▌                       | 32.5M/251M [00:41<04:32, 802kB/s]
Estabelecimentos9.zip:  77%|█████████████▉    | 260M/337M [05:24<01:35, 805kB/s][A

output/Estabelecimentos8.zip extraído e excluído.





Estabelecimentos0.zip:  68%|██████████▉     | 1.08G/1.59G [22:27<10:33, 801kB/s][A[A[A
Estabelecimentos9.zip:  81%|██████████████▌   | 274M/337M [05:40<01:18, 803kB/s][A
Simples.zip:  20%|█████▎                     | 49.3M/251M [01:02<04:14, 793kB/s]

Socios0.zip:  14%|███▋                       | 24.1M/174M [00:30<03:07, 801kB/s][A[A

Socios0.zip:  14%|███▋                       | 24.1M/174M [00:30<03:07, 801kB/s][A[A


Estabelecimentos0.zip:  69%|██████████▉     | 1.09G/1.59G [22:39<10:27, 792kB/s][A[A[A


Estabelecimentos0.zip:  69%|██████████▉     | 1.09G/1.59G [22:39<10:27, 792kB/s][A[A[A
Simples.zip:  26%|██████▉                    | 64.0M/251M [01:21<03:55, 793kB/s]

Socios0.zip:  22%|██████                     | 38.8M/174M [00:49<02:49, 801kB/s][A[A


Estabelecimentos0.zip:  70%|███████████▏    | 1.11G/1.59G [22:57<10:10, 792kB/s][A[A[A
Estabelecimentos9.zip:  89%|███████████████▉  | 299M/337M [06:11<00:47, 804kB/s][A
Simples.zip:  29%|███████▉           

output/Estabelecimentos9.zip extraído e excluído.


Simples.zip:  49%|█████████████▌              | 122M/251M [02:32<02:41, 798kB/s]

Socios0.zip:  57%|███████████████▎           | 98.6M/174M [02:02<01:34, 803kB/s][A[A

Socios0.zip:  57%|███████████████▎           | 98.6M/174M [02:02<01:34, 803kB/s][A[A


Estabelecimentos0.zip:  73%|███████████▋    | 1.16G/1.59G [24:10<08:52, 796kB/s][A[A[A


Estabelecimentos0.zip:  73%|███████████▋    | 1.16G/1.59G [24:10<08:52, 796kB/s][A[A[A
Socios1.zip:  51%|█████████████▏            | 25.2M/49.5M [00:31<00:30, 807kB/s][A
Simples.zip:  54%|███████████████▏            | 136M/251M [02:51<02:23, 798kB/s]

Socios0.zip:  64%|█████████████████▊          | 111M/174M [02:19<01:18, 803kB/s][A[A


Estabelecimentos0.zip:  74%|███████████▊    | 1.18G/1.59G [24:27<08:37, 796kB/s][A[A[A
Simples.zip:  59%|████████████████▍           | 147M/251M [03:03<02:09, 801kB/s]

Socios0.zip:  71%|███████████████████▊        | 124M/174M [02:33<01:02, 805kB/s][A[A

Socios0.zip:  71%|███████████████████▊      

output/Socios1.zip extraído e excluído.


Simples.zip:  64%|█████████████████▉          | 160M/251M [03:21<01:52, 801kB/s]

Socios0.zip:  78%|█████████████████████▉      | 136M/174M [02:49<00:47, 805kB/s][A[A


Simples.zip:  69%|███████████████████▏        | 172M/251M [03:35<01:38, 802kB/s]

Socios0.zip:  85%|███████████████████████▉    | 149M/174M [03:05<00:31, 805kB/s][A[A

Socios0.zip:  85%|███████████████████████▉    | 149M/174M [03:05<00:31, 805kB/s][A[A


Estabelecimentos0.zip:  76%|████████████▏   | 1.21G/1.59G [25:12<07:47, 801kB/s][A[A[A


Estabelecimentos0.zip:  76%|████████████▏   | 1.21G/1.59G [25:12<07:47, 801kB/s][A[A[A
Socios2.zip:  51%|█████████████▎            | 25.2M/49.1M [00:31<00:29, 807kB/s][A
Simples.zip:  74%|████████████████████▌       | 185M/251M [03:51<01:22, 802kB/s]

Socios0.zip:  92%|█████████████████████████▊  | 160M/174M [03:19<00:17, 805kB/s][A[A


Estabelecimentos0.zip:  77%|████████████▎   | 1.22G/1.59G [25:27<07:34, 801kB/s][A[A[A
Simples.zip:  79%|██████████████████████   

output/Socios0.zip extraído e excluído.


Socios2.zip: 100%|██████████████████████████| 49.1M/49.1M [01:00<00:00, 805kB/s]

Socios4.zip:   0%|                                  | 0.00/49.0M [00:00<?, ?B/s][A

output/Socios2.zip extraído e excluído.


Simples.zip:  83%|███████████████████████▎    | 209M/251M [04:21<00:52, 805kB/s]


Simples.zip:  89%|████████████████████████▊   | 222M/251M [04:37<00:35, 805kB/s]

Socios3.zip:  51%|█████████████▎            | 25.2M/49.3M [00:31<00:29, 808kB/s][A[A

Socios3.zip:  51%|█████████████▎            | 25.2M/49.3M [00:31<00:29, 808kB/s][A[A


Estabelecimentos0.zip:  80%|████████████▋   | 1.26G/1.59G [26:15<06:43, 803kB/s][A[A[A


Estabelecimentos0.zip:  80%|████████████▋   | 1.26G/1.59G [26:15<06:43, 803kB/s][A[A[A
Socios4.zip:  51%|█████████████▎            | 25.2M/49.0M [00:31<00:29, 808kB/s][A
Simples.zip:  93%|██████████████████████████  | 234M/251M [04:51<00:21, 805kB/s]


Estabelecimentos0.zip:  80%|████████████▊   | 1.27G/1.59G [26:27<06:32, 803kB/s][A[A[A

Socios3.zip:  68%|█████████████████▋        | 33.6M/49.3M [00:42<00:19, 808kB/s][A[A
Socios3.zip: 100%|██████████████████████████| 49.3M/49.3M [01:01<00:00, 807kB/s]


Socios5.zip:   0%|                              

output/Socios3.zip extraído e excluído.


Simples.zip: 100%|████████████████████████████| 251M/251M [05:12<00:00, 802kB/s]
Socios4.zip: 100%|██████████████████████████| 49.0M/49.0M [01:00<00:00, 806kB/s]

Socios7.zip:   0%|                                  | 0.00/49.1M [00:00<?, ?B/s][A


Estabelecimentos0.zip:  82%|█████████████   | 1.30G/1.59G [26:57<06:02, 803kB/s][A[A[A

output/Simples.zip extraído e excluído.
output/Socios4.zip extraído e excluído.




Socios5.zip:  51%|█████████████▎            | 25.2M/49.2M [00:31<00:29, 808kB/s][A[A

Socios5.zip:  51%|█████████████▎            | 25.2M/49.2M [00:31<00:29, 808kB/s][A[A


Estabelecimentos0.zip:  83%|█████████████▏  | 1.31G/1.59G [27:17<05:40, 804kB/s][A[A[A


Socios6.zip:  49%|████████████▋             | 24.1M/49.5M [00:30<00:31, 801kB/s]

Socios5.zip:  66%|█████████████████▏        | 32.5M/49.2M [00:41<00:20, 808kB/s][A[A
Socios7.zip:  51%|█████████████▎            | 25.2M/49.1M [00:31<00:29, 808kB/s][A
Socios6.zip:  78%|████████████████████▍     | 38.8M/49.5M [00:48<00:13, 801kB/s]


Socios5.zip: 100%|██████████████████████████| 49.2M/49.2M [01:01<00:00, 806kB/s]


Socios8.zip:   0%|                                  | 0.00/49.3M [00:00<?, ?B/s][A[A
Socios7.zip:  83%|█████████████████████▋    | 40.9M/49.1M [00:50<00:10, 808kB/s][A

output/Socios5.zip extraído e excluído.





Estabelecimentos0.zip:  84%|█████████████▍  | 1.34G/1.59G [27:48<05:10, 803kB/s][A[A[A


Socios6.zip: 100%|██████████████████████████| 49.5M/49.5M [01:01<00:00, 798kB/s]
Socios9.zip:   0%|                                  | 0.00/49.0M [00:00<?, ?B/s]

output/Socios6.zip extraído e excluído.


Socios7.zip: 100%|██████████████████████████| 49.1M/49.1M [01:00<00:00, 806kB/s]


output/Socios7.zip extraído e excluído.





Estabelecimentos0.zip:  85%|█████████████▋  | 1.35G/1.59G [28:07<04:52, 803kB/s][A[A[A

Socios8.zip:  51%|█████████████▎            | 25.2M/49.3M [00:31<00:29, 808kB/s][A[A

Socios8.zip:  51%|█████████████▎            | 25.2M/49.3M [00:31<00:29, 808kB/s][A[A


Estabelecimentos0.zip:  86%|█████████████▋  | 1.36G/1.59G [28:19<04:38, 804kB/s][A[A[A


Socios9.zip:  75%|███████████████████▍      | 36.7M/49.0M [00:46<00:15, 797kB/s]

Socios8.zip:  81%|█████████████████████     | 39.8M/49.3M [00:50<00:11, 808kB/s][A[A


Estabelecimentos0.zip:  87%|█████████████▉  | 1.38G/1.59G [28:37<04:21, 804kB/s][A[A[A

Socios8.zip: 100%|██████████████████████████| 49.3M/49.3M [01:01<00:00, 806kB/s][A[A

Socios8.zip: 100%|██████████████████████████| 49.3M/49.3M [01:01<00:00, 806kB/s]


output/Socios8.zip extraído e excluído.





Estabelecimentos0.zip:  87%|█████████████▉  | 1.39G/1.59G [28:50<04:07, 804kB/s][A[A[A


Socios9.zip: 100%|██████████████████████████| 49.0M/49.0M [01:01<00:00, 800kB/s]


output/Socios9.zip extraído e excluído.





Estabelecimentos0.zip:  88%|██████████████  | 1.40G/1.59G [29:07<03:51, 804kB/s][A[A[A


Estabelecimentos0.zip:  89%|██████████████▏ | 1.41G/1.59G [29:21<03:36, 804kB/s][A[A[A


Estabelecimentos0.zip:  89%|██████████████▏ | 1.41G/1.59G [29:21<03:36, 804kB/s][A[A[A


Estabelecimentos0.zip:  90%|██████████████▎ | 1.43G/1.59G [29:37<03:21, 804kB/s][A[A[A


Estabelecimentos0.zip:  91%|██████████████▍ | 1.44G/1.59G [29:52<03:06, 802kB/s][A[A[A


Estabelecimentos0.zip:  91%|██████████████▍ | 1.44G/1.59G [29:52<03:06, 802kB/s][A[A[A


Estabelecimentos0.zip:  91%|██████████████▌ | 1.45G/1.59G [30:07<02:52, 802kB/s][A[A[A


Estabelecimentos0.zip:  92%|██████████████▋ | 1.46G/1.59G [30:22<02:36, 801kB/s][A[A[A


Estabelecimentos0.zip:  92%|██████████████▋ | 1.46G/1.59G [30:22<02:36, 801kB/s][A[A[A


Estabelecimentos0.zip:  93%|██████████████▊ | 1.47G/1.59G [30:37<02:22, 801kB/s][A[A[A


Estabelecimentos0.zip:  94%|██████████████▉ | 1.49G/1.59G [30:52<02:06, 800kB

output/Estabelecimentos0.zip extraído e excluído.


# Utilizando o Spark para fazer a conversão dos arquivos baixados para arquivos Parquet

Agora que temos a pasta com todos os arquivos, como se trata de planilhas com grande quantidade de dados, optamos por utilizar o Spark para manipular esses dados.

O código abaixo irá transformar todos os arquivos baixados em .parquet, irá colocá-los em pastas separadas e deletar os arquivos originais que não serão mais utilizados !

In [None]:
import os
import shutil
from pyspark.sql import SparkSession

# Criar a sessão Spark
spark = SparkSession.builder.appName("CSV para Parquet").getOrCreate()

# Função para converter arquivos CSV para Parquet
def converter_csv_para_parquet(caminho_arquivo, caminho_parquet_temp):
    df = spark.read.option("encoding", "ISO-8859-1").csv(
        caminho_arquivo,
        header=False, inferSchema=True, sep=";"
    )
    df.coalesce(1).write.mode("overwrite").parquet(caminho_parquet_temp)

# Lista de pastas a serem processadas
pastas = [
    'output/Cnaes', 'output/Motivos', 'output/Municipios', 'output/Naturezas',
    'output/Paises', 'output/Qualificacoes', 'output/Simples', 'output/Estabelecimentos',
    'output/Empresas', 'output/Socios'
]

# Loop para processar cada pasta
for caminho_pasta in pastas:
    if not os.path.exists(caminho_pasta):
        print(f"Pasta {caminho_pasta} não encontrada. Pulando...")
        continue

    arquivos = sorted([arquivo for arquivo in os.listdir(caminho_pasta) if os.path.isfile(os.path.join(caminho_pasta, arquivo))])

    if not arquivos:
        print(f"Nenhum arquivo encontrado em {caminho_pasta}. Pulando...")
        continue

    # Criar a pasta final para armazenar os Parquets
    caminho_parquet_final = f"{caminho_pasta}_Parquet"
    os.makedirs(caminho_parquet_final, exist_ok=True)

    contador = 0

    for arquivo in arquivos:
        caminho_arquivo = os.path.join(caminho_pasta, arquivo)
        caminho_parquet_temp = os.path.join(caminho_pasta, "temp_parquet")
        os.makedirs(caminho_parquet_temp, exist_ok=True)

        # Converter para Parquet
        converter_csv_para_parquet(caminho_arquivo, caminho_parquet_temp)

        # Nome final do arquivo Parquet
        nome_parquet = f"{os.path.basename(caminho_pasta)}{contador}.parquet"
        caminho_parquet_finalizado = os.path.join(caminho_parquet_final, nome_parquet)

        # Mover o arquivo Parquet final
        for arquivo_temp in os.listdir(caminho_parquet_temp):
            if arquivo_temp.endswith(".parquet"):
                shutil.move(os.path.join(caminho_parquet_temp, arquivo_temp), caminho_parquet_finalizado)

        # Remover a pasta temporária
        shutil.rmtree(caminho_parquet_temp)

        print(f"Arquivo {arquivo} convertido para {nome_parquet}")
        contador += 1

    # Remover a pasta original após conversão
    shutil.rmtree(caminho_pasta)
    print(f"Pasta {caminho_pasta} e arquivos CSV removidos.")

# Encerrar sessão do Spark
spark.stop()


Arquivo Cnaes_F.K03200$Z.D50208.CNAECSV convertido para Cnaes0.parquet
Pasta output/Cnaes e arquivos CSV removidos.
Arquivo Motivos_F.K03200$Z.D50208.MOTICSV convertido para Motivos0.parquet
Pasta output/Motivos e arquivos CSV removidos.
Arquivo Municipios_F.K03200$Z.D50208.MUNICCSV convertido para Municipios0.parquet
Pasta output/Municipios e arquivos CSV removidos.
Arquivo Naturezas_F.K03200$Z.D50208.NATJUCSV convertido para Naturezas0.parquet
Pasta output/Naturezas e arquivos CSV removidos.
Arquivo Paises_F.K03200$Z.D50208.PAISCSV convertido para Paises0.parquet
Pasta output/Paises e arquivos CSV removidos.
Arquivo Qualificacoes_F.K03200$Z.D50208.QUALSCSV convertido para Qualificacoes0.parquet
Pasta output/Qualificacoes e arquivos CSV removidos.
Arquivo Simples_F.K03200$W.SIMPLES.CSV.D50208 convertido para Simples0.parquet
Pasta output/Simples e arquivos CSV removidos.
Arquivo Estabelecimentos_K3241.K03200Y0.D50208.ESTABELE convertido para Estabelecimentos0.parquet
Arquivo Estabelec

# Começando o tratamento de dados

Primeiro foi criado uma função que será usada depois para fazer o tratamento das datas, que se encontra no formato YYYYMMDD e vamos mudar para o formato brasileiro DD/MM/YYYY

In [None]:
from pyspark.sql import functions as F

# Função que transforma a data da coluna para o formato DD/MM/YYYY
def transformar_data(df, coluna_data):
    # Criação das colunas 'ano', 'mes', 'dia' a partir da coluna fornecida
    df = df.withColumn("ano", F.substring(coluna_data, 1, 4))  # Pega os 4 primeiros caracteres como 'ano'
    df = df.withColumn("mes", F.substring(coluna_data, 5, 2))  # Pega os próximos 2 caracteres como 'mes'
    df = df.withColumn("dia", F.substring(coluna_data, 7, 2))  # Pega os 2 últimos caracteres como 'dia'

    # Criar a coluna 'data_atividade' no formato DD/MM/YYYY
    df = df.withColumn(coluna_data, F.concat(F.col("dia"), F.lit("/"), F.col("mes"), F.lit("/"), F.col("ano")))

    # Remover as colunas auxiliares 'ano', 'mes', 'dia'
    df = df.drop("ano", "mes", "dia")

    return df

Agora vamos começar a fazer o tratamento de nossas tabelas. Os tratamentos foram quebrados em varios blocos de códigos, para o tratamento não ficar muito grande e ter uma melhor execução

In [None]:
from pyspark.sql import SparkSession
import os

# Criando a sessão do Spark
spark = SparkSession.builder \
    .appName("Leitura Parquet de múltiplos arquivos") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Caminho da pasta que contém os arquivos Parquet
file_path = r'output/Estabelecimentos_Parquet/'

# Lendo todos os arquivos Parquet dentro da pasta
arquivos = os.listdir(file_path)

# Lendo e processando os arquivos Parquet
for arquivo in arquivos:
    if arquivo.endswith('.parquet'):
        df_temp = spark.read.parquet(os.path.join(file_path, arquivo))

        df_temp = df_temp.withColumnRenamed("_c0", "id_cnpj") #1
        df_temp = df_temp.withColumnRenamed("_c1", "cnpj_ordem") #2
        df_temp = df_temp.withColumnRenamed("_c2", "cnpj_dv") #3
        df_temp = df_temp.withColumnRenamed("_c3", "identificador_matriz_filial") #4
        df_temp = df_temp.withColumnRenamed("_c4", "nome_fantasia") #5
        df_temp = df_temp.withColumnRenamed("_c5", "situacao_cadastral") #6
        df_temp = df_temp.withColumnRenamed("_c6", "data_situacao_cadastral") #7
        df_temp = df_temp.withColumnRenamed("_c7", "motivo_situacao_cadastral") #8
        df_temp = df_temp.withColumnRenamed("_c8", "nome_cidade_exterior") #9
        df_temp = df_temp.withColumnRenamed("_c9", "codigo_pais") #10
        df_temp = df_temp.withColumnRenamed("_c10", "data_inicio_atividade") #11
        df_temp = df_temp.withColumnRenamed("_c11", "ramo") #12
        df_temp = df_temp.withColumnRenamed("_c12", "cnae_fiscal_secundario") #13
        df_temp = df_temp.withColumnRenamed("_c13", "tipo_logradouro") #14
        df_temp = df_temp.withColumnRenamed("_c14", "logradouro") #15
        df_temp = df_temp.withColumnRenamed("_c15", "numero") #16
        df_temp = df_temp.withColumnRenamed("_c16", "complemento") #17
        df_temp = df_temp.withColumnRenamed("_c17", "bairro") #18
        df_temp = df_temp.withColumnRenamed("_c18", "cep") #19
        df_temp = df_temp.withColumnRenamed("_c19", "uf") #20
        df_temp = df_temp.withColumnRenamed("_c20", "id_municipio") #21
        df_temp = df_temp.withColumnRenamed("_c21", "ddd1") #22
        df_temp = df_temp.withColumnRenamed("_c22", "telefone1") #23
        df_temp = df_temp.withColumnRenamed("_c23", "ddd2") #24
        df_temp = df_temp.withColumnRenamed("_c24", "telefone2") #25
        df_temp = df_temp.withColumnRenamed("_c25", "ddd_fax") #26
        df_temp = df_temp.withColumnRenamed("_c26", "fax") #27
        df_temp = df_temp.withColumnRenamed("_c27", "email") #28
        df_temp = df_temp.withColumnRenamed("_c28", "situacao_especial") #29
        df_temp = df_temp.withColumnRenamed("_c29", "data_situacao_especial") #30

        # Realizando os tratamentos no DataFrame
        df_temp = df_temp.withColumn("numero", F.regexp_replace(F.col("numero"), "^000$|^00$|^0$", ""))
        df_temp = df_temp.withColumn("numero", F.regexp_replace(F.col("numero"), "^SN$", "S/N"))

        # Tratando valores nulos antes de aplicar as transformações
        df_temp = df_temp.withColumn("logradouro",
                                     F.when(F.col("logradouro").isNotNull(),
                                            F.regexp_replace(F.regexp_replace(F.regexp_replace(
                                                F.regexp_replace(F.col("logradouro"), r"^.*\bRUA\b.*$", "RUA"),
                                                r"^.*\bAVENIDA\b.*$", "AVENIDA"),
                                                r"^.*\bTRAVESSA\b.*$", "TRAVESSA"),
                                                r"UTF-8", "ASCII//TRANSLIT"))
                                     .otherwise(F.col("logradouro")))

        # Criando novas colunas
        df_temp = df_temp.withColumn('cnpj_completo', F.concat(F.col('id_cnpj'), F.col('cnpj_ordem'), F.col('cnpj_dv')))
        df_temp = df_temp.drop('cnpj_ordem', 'cnpj_dv')

        # Lendo o arquivo de cidades
        cidade = spark.read.parquet(r'output/Municipios_Parquet/Municipios0.parquet')

        # Selecionando apenas as colunas necessárias da tabela cidade e renomeando _c1 para nome_municipio
        cidade = cidade.select(F.col("_c0").alias("municipio"), F.col("_c1").alias("nome_municipio"))

        # Realizando o join com a tabela df_temp, comparando os IDs corretamente
        df_temp = df_temp.join(cidade, df_temp["id_municipio"] == cidade["municipio"], how="left")

        # Removendo a coluna duplicada "municipio" da tabela cidade, se necessário
        df_temp = df_temp.drop(cidade["municipio"])

        # Alterando 0 e 1 para Matriz ou Filial
        df_temp = df_temp.withColumn("identificador_matriz_filial", F.when(F.col("identificador_matriz_filial") == 1, "matriz").otherwise("filial"))

        df_temp = transformar_data(df_temp, "data_situacao_cadastral")
        df_temp = transformar_data(df_temp, "data_inicio_atividade")

        # Continuando o processamento de outras colunas (telefone, whatsapp, etc.)
        df_temp = df_temp.withColumn("telefone",F.when(F.col("ddd1").isNotNull(),
           # Formatando o DDD (considerando que os 2 primeiros dígitos são o DDD)
           F.concat(
               F.lit("("),
               F.substring(F.col("ddd1").cast("string"), 1, 2),  # DDD
               F.lit(") "),
               F.regexp_replace(F.col("telefone1"), r"\D", "")  # Número sem caracteres não numéricos
           )
            ).otherwise(F.lit("NA"))
        )

        df_temp = df_temp.withColumn("whats", F.concat(F.lit("https://api.whatsapp.com/send?phone=55"),
                                                       F.col("ddd1"), F.col("telefone1"),
                                                       F.lit("&text=Ol%C3%A1%2C%20tudo%20bom%3F%20Pe%C3%A7o%20licen%C3%A7a%20para%20entrar%20em%20contato...")))

        df_temp = df_temp.withColumn("email",
                                     F.when((F.col("email") == "NULL") | (F.col("email") == ""), "NA")
                                     .otherwise(F.col("email")))


        df_temp = df_temp.withColumn("tem_whats",
                                     F.when(F.substring(F.col("telefone1").cast("string"), 1, 1).isin("8", "9"), "SIM")
                                     .otherwise("NAO"))

        df_final = df_temp


In [None]:
# Caminho da pasta que contém os arquivos Parquet
file_path = r'output/Empresas_Parquet/'

# Lendo todos os arquivos Parquet dentro da pasta
arquivos = os.listdir(file_path)

# Lendo e processando os arquivos Parquet
for arquivo in arquivos:
    if arquivo.endswith('.parquet'):
        df_temp_empresas = spark.read.parquet(os.path.join(file_path, arquivo))

        # Renomeando as colunas da tabela Empresas
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c0", "id_cnpj")  #1
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c1", "razao_social")  #2
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c2", "id_natureza_juridica")  #3
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c3", "id_qualificacao")  #4
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c4", "capital_social")  #5
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c5", "porte")  #6
        df_temp_empresas = df_temp_empresas.withColumnRenamed("_c6", "ente_federativo_resp")  #7

        # Realizando o join entre df_final e df_temp_empresas
        df_combinado = df_final.join(df_temp_empresas, on="id_cnpj", how="left")


        # Caminhos dos arquivos Parquet para natureza_juridica e qualificacao
        natureza_juridica_path = "output/Naturezas_Parquet/Naturezas0.parquet"
        qualificacao_path = "output/Qualificacoes_Parquet/Qualificacoes0.parquet"


        # Lendo os arquivos Parquet
        natureza_juridica = spark.read.parquet(natureza_juridica_path)
        qualificacao = spark.read.parquet(qualificacao_path)

        # Renomeando natureza_juridica
        natureza_juridica = natureza_juridica.withColumnRenamed("_c0", "id_natureza_juridica")  #1
        natureza_juridica = natureza_juridica.withColumnRenamed("_c1", "nome_natureza_juridica")  #2

        # Renomeando qualificacao
        qualificacao = qualificacao.withColumnRenamed("_c0", "id_qualificacao")  #1
        qualificacao = qualificacao.withColumnRenamed("_c1", "nome_qualificacao_socio")  #2



        # Realizando o join entre df_combinado e natureza_juridica
        df_combinado = df_combinado.join(natureza_juridica, on="id_natureza_juridica", how="left")

        # Realizando o join entre df_combinado e qualificacao
        df_combinado = df_combinado.join(qualificacao,on="id_qualificacao", how="left")


In [None]:
# Caminho da pasta que contém os arquivos Parquet
file_path = r'output/Socios_Parquet/'

# Lendo todos os arquivos Parquet dentro da pasta
arquivos = os.listdir(file_path)

# Lendo e processando os arquivos Parquet
for arquivo in arquivos:
    if arquivo.endswith('.parquet'):
        df_temp_socios = spark.read.parquet(os.path.join(file_path, arquivo))

    # Renomeando as colunas da tabela Socios
        df_temp_socios = df_temp_socios.withColumnRenamed("_c0", "id_cnpj")  #1
        df_temp_socios = df_temp_socios.withColumnRenamed("_c1", "tipo_socio")  #2
        df_temp_socios = df_temp_socios.withColumnRenamed("_c2", "nome_do_socio")  #3
        df_temp_socios = df_temp_socios.withColumnRenamed("_c3", "cnpj_cpf_socio")  #4
        df_temp_socios = df_temp_socios.withColumnRenamed("_c4", "id_qualificacao_socio")  #5
        df_temp_socios = df_temp_socios.withColumnRenamed("_c5", "data_entrada_sociedade")  #6
        df_temp_socios = df_temp_socios.withColumnRenamed("_c6", "id_pais")  #7
        df_temp_socios = df_temp_socios.withColumnRenamed("_c7", "cpf_representante_legal")  #8
        df_temp_socios = df_temp_socios.withColumnRenamed("_c8", "nome_representante")  #9
        df_temp_socios = df_temp_socios.withColumnRenamed("_c9", "id_qualificacao_representante")  #10
        df_temp_socios = df_temp_socios.withColumnRenamed("_c10", "faixa_etaria")  #11

    # Realizando a junção entre df_combinado e df_temp_socios
    df_combinado_final = df_combinado.join(df_temp_socios, on="id_cnpj", how="left")

    df_combinado_final = df_combinado_final.withColumn(
    "tipo_socio",
    F.when(df_combinado_final.tipo_socio == "1", "pessoa_juridica")
    .when(df_combinado_final.tipo_socio == "2", "pessoa_fisica")
    .when(df_combinado_final.tipo_socio == "3", "estrangeiro")
    .otherwise(df_combinado_final.tipo_socio)  # Caso não seja nenhuma das condições acima
    )
      # Criando a coluna 'faixa_etaria' com base nas condições da coluna 'V11'
    df_combinado_final = df_combinado_final.withColumn(
    "faixa_etaria",
    F.when(df_combinado_final.faixa_etaria == "0", "Nao_se_aplica")
    .when(df_combinado_final.faixa_etaria == "1", "0 a 12")
    .when(df_combinado_final.faixa_etaria == "2", "13 a 20")
    .when(df_combinado_final.faixa_etaria == "3", "21 a 30")
    .when(df_combinado_final.faixa_etaria == "4", "31 a 40")
    .when(df_combinado_final.faixa_etaria == "5", "41 a 50")
    .when(df_combinado_final.faixa_etaria == "6", "51 a 60")
    .when(df_combinado_final.faixa_etaria == "7", "61 a 70")
    .when(df_combinado_final.faixa_etaria == "8", "71 a 80")
    .when(df_combinado_final.faixa_etaria == "9", "Mais_de_80")
    .otherwise(df_combinado_final.faixa_etaria)  # Caso não seja nenhuma das condições acima
    )
    # Criando a coluna 'data_entrada_socio' com o formato 'dd/mm/aaaa'
    df_combinado_final = df_combinado_final.withColumn(
    "data_entrada_sociedade",
    F.concat(
        F.substring(df_combinado_final.data_entrada_sociedade, 7, 2),  # Extraindo o dia
        F.lit("/"),
        F.substring(df_combinado_final.data_entrada_sociedade, 5, 2),  # Extraindo o mês
        F.lit("/"),
        F.substring(df_combinado_final.data_entrada_sociedade, 1, 4)   # Extraindo o ano
    )
    )

    # Caminho para os arquivos Parquet
    paises_path = "output/Paises_Parquet/Paises0.parquet"

    # Lendo os arquivos Parquet
    df_paises = spark.read.parquet(paises_path)

    # Renomeando colunas de df_paises para facilitar o join
    df_paises = df_paises.withColumnRenamed("_c0", "id_pais").withColumnRenamed("_c1", "nome_pais")

    # Fazendo o join com df_paises e criando a coluna pais_socio
    df_combinado_final = df_combinado_final.join(df_paises, on="id_pais", how="left")



    # Carregar o arquivo Cnaes
    cnae = spark.read.parquet("output/Cnaes_Parquet/Cnaes0.parquet")

    # Realizar o join entre df_combinado_final e cnae
    df_combinado_final = df_combinado_final.join(cnae, df_combinado_final.ramo == cnae["_c0"], "left")


    # Renomeando a coluna _c1 para ramo_nome
    df_combinado_final = df_combinado_final.withColumnRenamed("_c1", "ramo_nome")

    # Adicionando "#" como prefixo na coluna 'porte'
    df_combinado_final = df_combinado_final.withColumn("porte", F.concat(F.lit("#"), F.col("porte")))

        # Criando a nova categoria baseada nos valores da coluna 'porte'
    df_combinado_final = df_combinado_final.withColumn(
    "categoria",
    F.when(F.col("porte") == "#1", "MICRO")
     .when(F.col("porte") == "#3", "PEQUENA")
     .when(F.col("porte") == "#5", "GRANDE/OUTROS")
     .otherwise(None)  # Para manter valores que não estão na lista como NULL
    )

    # Atualizando a coluna 'porte' com os valores de 'new_category'
    df_combinado_final = df_combinado_final.withColumn("porte", F.col("categoria"))

    # Removendo a coluna c0
    df_combinado_final = df_combinado_final.drop(cnae["_c0"])

# Removendo algumas colunas
df_combinado_final = df_combinado_final.drop("id_qualificacao_socio", "id_pais","id_qualificacao", "cnae_fiscal_secundario", "id_natureza_juridica", "id_municipio")



In [None]:


Simples_path = "output/Simples_Parquet/Simples0.parquet"

simples = spark.read.parquet(Simples_path)

# Renomeando Simples
simples = simples.withColumnRenamed("_c0", "id_cnpj")  #1
simples = simples.withColumnRenamed("_c1", "opcao_simples")  #2
simples = simples.withColumnRenamed("_c2", "data_opcao_simples")  #3
simples = simples.withColumnRenamed("_c3", "data_exclusao_simples")  #4
simples = simples.withColumnRenamed("_c4", "opcao_mei")  #5
simples = simples.withColumnRenamed("_c5", "data_opcao_mei")  #6
simples = simples.withColumnRenamed("_c6", "data_exclusao_mei")  #7

df_combinado_final = df_combinado_final.join(simples, on="id_cnpj", how="left")


motivos_path = "output/Motivos_Parquet/Motivos0.parquet"

motivos = spark.read.parquet(motivos_path)

# Renomeando Motivos
motivos = motivos.withColumnRenamed("_c0", "motivo_situacao_cadastral")  #1
motivos = motivos.withColumnRenamed("_c1", "descricao_motivo")  #2

df_combinado_final = df_combinado_final.join(motivos, on="motivo_situacao_cadastral", how="left")
df_combinado_final = df_combinado_final.drop(df_combinado_final.columns[0])



### Criando a coluna do tipo de ramo de atividade baseado no CNAE

In [None]:
# Alterando os valores de 'ramo' que têm 6 caracteres para começar com '0'
df_combinado_final = df_combinado_final.withColumn("ramo",
                             F.when(F.length(F.col("ramo")) == 6, F.lpad(F.col("ramo"), 7, '0'))
                             .otherwise(F.col("ramo"))
                            )

In [None]:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Dicionário de ramos
ramo_dict = {
    "Agricultura, Pecuária, Produção Florestal, Pesca e Aquicultura": ["0111", "0112", "0141", "0142", "0143", "0144", "0151", "0152", "0153"],
    "Indústria Extrativa": ["0510", "0520", "0910", "0990"],
    "Indústria de Transformação": [
        "1011", "1012", "1031", "1032", "1033", "1039", "1041", "1051", "1052", "1053", "1054", "1055", "1061", "1062", "1063", "1064", "1065", "1069", "1071", "1072", "1073", "1074", "1079", "1081", "1082", "1083", "1084", "1091", "1092", "1093", "1094", "1095", "1096", "1099", "1101", "1102", "1103", "1104", "1105", "1106", "1107"
    ],
    "Fornecimento de Eletricidade e Gás": ["3512", "3513", "3514", "3520"],
    "Águas, Esgotos, Atividades de Gestão de Resíduos e Descontaminação": ["3600", "3700", "3900"],
    "Construção": ["4110", "4211", "4299"],
    "Comércio; Reparação de Veículos Automotores e Motocicletas": ["4511", "4520", "4711", "4712", "4721", "4722", "4731", "4732", "4741", "4742", "4751", "4752", "4753", "4759", "4761", "4762", "4763", "4771", "4772", "4773", "4774", "4775", "4776", "4777", "4779", "4781", "4782", "4783", "4789", "4791", "4799"],
    "Transporte, Armazenagem e Comunicação": ["4911", "4912", "4921", "4922", "4930", "5011", "5021", "5111", "5112", "5120", "5210", "5221", "5222", "5223", "5230", "5311", "5312", "5320", "5511", "5512", "5590", "5611", "5612", "5613", "5614", "5619", "5620", "5630", "5811", "5812", "5813", "5814", "5820", "5911", "5912", "5913", "5920", "6010", "6020", "6110", "6120", "6130"],
    "Alojamento e Alimentação": ["5511", "5512", "5520", "5590", "5611", "5612", "5613", "5614", "5619"],
    "Informação e Comunicação": ["5811", "5812", "5813", "5814", "5820", "5911", "5912", "5913", "5920", "6010", "6020", "6110", "6120", "6130"],
    "Atividades Financeiras, de Seguros e Serviços Relacionados": ["6411", "6412", "6421", "6422", "6423", "6424", "6431", "6432", "6433", "6440", "6511", "6512", "6520", "6530", "6611", "6612", "6613", "6619", "6621", "6622", "6629"],
    "Imobiliárias e Aluguel de Bens Imóveis e Intangíveis": ["6810", "6821", "6822", "6831", "6832", "6911", "6912", "6920"],
    "Atividades Profissionais, Científicas e Técnicas": ["6911", "6912", "6920", "7020", "7111", "7112", "7120", "7210", "7220", "7311", "7312", "7319", "7320", "7410", "7420", "7490"],
    "Serviços Administrativos e Serviços de Apoio": ["7711", "7712", "7721", "7722", "7730", "7740", "7810", "7820", "7830", "7911", "7912", "7990"],
    "Administração Pública, Defesa e Seguridade Social": ["8411", "8412", "8413", "8421", "8422", "8430"],
    "Educação": ["8511", "8512", "8520", "8531", "8532", "8541", "8542", "8591", "8592", "8599"],
    "Saúde Humana e Serviços Sociais": ["8610", "8621", "8622", "8630", "8690", "8691", "8692", "8699"],
    "Artes, Cultura, Esportes e Recreação": ["9001", "9002", "9003", "9004", "9101", "9102", "9103", "9200", "9311", "9312", "9319", "9321", "9329"],
    "Outras Atividades de Serviços": ["9601", "9602", "9603", "9604", "9605", "9606", "9609"],
    "Organismos Internacionais e Outras Instituições Extraterritoriais": ["9901", "9902", "9903", "9904"],
}

# Função de mapeamento
def mapear_ramo(codigo):
    for categoria, codigos in ramo_dict.items():
        if codigo in codigos:
            return categoria
    return "outros"  # Caso o código não esteja no dicionário

# Registrar a função como UDF
mapear_ramo_udf = udf(mapear_ramo, StringType())

# Aplicar a UDF ao DataFrame
df_combinado_final = df_combinado_final.withColumn(
    "ramo_da_empresa",
    mapear_ramo_udf(col("ramo").substr(1, 4))
)



### Salvando o Dataframe em arquivos Parquet

In [None]:


# Caminho onde será salvo o arquivo
output_path = "Arquivo_Final"

# Criar a pasta se não existir
if not os.path.exists(output_path):
    os.makedirs(output_path)

# Verificar se o DataFrame não está vazio antes de salvar
if df_combinado_final is not None and df_combinado_final.count() > 0:
    # Salvar no formato Parquet
    df_combinado_final.write.mode("overwrite").parquet(output_path, compression="snappy")
    print(f"Arquivo salvo com sucesso em: {output_path}")
else:
    print("⚠️ O DataFrame está vazio ou não foi definido.")


Arquivo salvo com sucesso em: Arquivo_Final


# Carregando o arquivo Parquet de acordo com a região escolhida

Aqui estamos carregando os arquivos parquet salvos e transformando em um dataset no pandas para poder ser manipulado

OBS: Existe um campo uf_filtro, que é uma variável aonde você pode filtrar pelo o UF do estado que você deseja buscar as informações (EX: MG, SP, GO). Caso você queira puxar as informações de todos os estados é só deixar o campo vazio. (EX: uf_filtro = "" )

In [None]:
import glob
import pandas as pd
from tabulate import tabulate

# Caminho da pasta onde os arquivos Parquet estão armazenados
pasta = '/content/Arquivo_Final'  # Ajuste o caminho para o seu diretório

# Variável uf_filtro (defina a UF que você deseja filtrar ou deixe em branco para trazer todos)
uf_filtro = ''  # Exemplo: 'mg', pode deixar como '' (vazio) para trazer todos os UFs

# Garantir que o uf_filtro seja em maiúsculas
uf_filtro = uf_filtro.upper() if uf_filtro else ''

# Listar todos os arquivos Parquet na pasta
arquivos_parquet = glob.glob(pasta + '/*.parquet')

# Verifique se a lista de arquivos Parquet não está vazia
if arquivos_parquet:
    # Carregar todos os arquivos Parquet em um único DataFrame
    df = pd.concat([pd.read_parquet(arquivo) for arquivo in arquivos_parquet], ignore_index=True)

    # Filtrar os dados pela coluna 'uf', se uf_filtro não estiver vazio
    if uf_filtro:
        df = df[df['uf'] == uf_filtro]

    # Exibir o DataFrame com tabulate para visualização mais bonita no terminal
    print(tabulate(df.head(), headers='keys', tablefmt='pretty'))
else:
    print("Nenhum arquivo Parquet encontrado na pasta.")


+---+---------+-----------------------------+---------------+--------------------+-------------------------+----------------------+-------------+-----------------------+---------+-----------------+----------------+--------+-------------+-------------------+----------+----+------+-----------+------+-----------+---------+----------+-------+-------------------+------------------------+---------------+----------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------+----------------+-------+----------------------+------------------------+-------------------------+---------------+----------------------------------+----------------+------------------------+-------------------------+--------------------+-------------------------------+--------------+-----------+-----------------------------------------------+-----------+---------------+--------------------