<a href="https://colab.research.google.com/github/maxMitsuya/classificacao_empresas/blob/main/Projeto_Classificacao_Empresas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Categorização de todas as empresas do Brasil para classificação de Good Places to work

Este é um projeto de ETL (Extract, Transform and Load) que visa realizar a coleta dos dados de todas as empresas do Brasil no site da receita federal, extrair os dados em formato .csv, realizar o tratamento desses dados e classificar segundo seu cnae fiscal.

## Coleta e Extração dos dados

In [None]:
# Manter a conexão do google drive ativa
from google.colab import output
output.eval_js('google.colab.kernel.proxyPort(5000)')

In [None]:
#importe das bibliotecas
import os #Manipulação dos arquivos
import requests #Requisição de páginas web
from bs4 import BeautifulSoup #WebScraping
from urllib.parse import urljoin #Join de urls
from concurrent.futures import ThreadPoolExecutor #Paralelismo de código
from google.colab import drive #Ativar o drive

In [None]:
def download_dados(url_base, data_coleta):

  # URL da página que contém os arquivos
  url = urljoin(url_base, data_coleta)

  # Diretório no Google Drive onde os arquivos serão salvos
  save_directory = "/content/drive/My Drive/" + data_coleta

  # Cria o diretório se ele não existir
  if not os.path.exists(save_directory):
      os.makedirs(save_directory)

  # Função para baixar um arquivo
  def download_file(file_url, file_name):
      print(f"Baixando {file_name}...")
      try:
          with requests.get(file_url, stream=True) as r:
              r.raise_for_status()
              with open(file_name, 'wb') as f:
                  for chunk in r.iter_content(chunk_size=8192):
                      f.write(chunk)
          print(f"{file_name} baixado com sucesso!")
      except Exception as e:
          print(f"Erro ao baixar {file_name}: {e}")

  # Faz a requisição para a página
  response = requests.get(url)
  response.raise_for_status()  # Verifica se a requisição foi bem-sucedida

  # Parseia o conteúdo HTML da página
  soup = BeautifulSoup(response.text, 'html.parser')

  # Lista para armazenar as URLs dos arquivos
  file_urls = []

  # Encontra todos os links na página
  for link in soup.find_all('a'):
      href = link.get('href')
      if href and href.endswith('.zip'):
          file_url = urljoin(url, href)
          file_name = os.path.join(save_directory, href.split('/')[-1])
          file_urls.append((file_url, file_name))

  # Baixa os arquivos em paralelo usando ThreadPoolExecutor
  with ThreadPoolExecutor(max_workers=5) as executor:  # Ajuste o número de workers conforme necessário
      executor.map(lambda args: download_file(*args), file_urls)

  print("Todos os arquivos foram baixados.")

In [None]:
# Obter a data do arquivo mais atual
from datetime import datetime, timedelta
def data(hoje):
  hoje = hoje
  mes_passado = hoje.replace(day=1) - timedelta(days=1)
  mes = mes_passado.month
  ano = mes_passado.year
  mes_str = '0' + str(mes) if mes < 10 else str(mes)
  data_coleta = f'{ano}' + '-' +mes_str + '/'
  return data_coleta

hoje = datetime.now()
data_coleta = data(hoje)

In [None]:
url_base = "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/"

#Baixa os dados usando a função download_dados()
download_dados(url_base, data_coleta)

## Extração dos dados

In [None]:
#import das bibliotecas
import zipfile

In [None]:
pastas_zipadas = "/content/drive/MyDrive/" + data_coleta #Local da pasta com os arquivos zipados
pasta_extraida = pastas_zipadas + data_coleta #Local onde os arquivos vão ser descompactados

In [None]:
os.makedirs(pasta_extraida, exist_ok=True) #Verifica se a pasta existe

In [None]:
#Cria uma lista com todos os arquivos zip dentro da pasta
arquivos_zip = [f for f in os.listdir(pastas_zipadas) if f.endswith(".zip")]

In [None]:
#Para cada arquivo dentro da pasta zipada, extrai o conteúdo e renomeia o arquivo com o nome do arquivo zip.
#Exemplo: arquivo zip = cnae.zip, arquivo extraído = F.K03200$Z.D50111.CNAECSV, arquivo renomeado = cnae.csv
for arquivo in arquivos_zip:
    zip_path = os.path.join(pastas_zipadas, arquivo)
    nome_zip = os.path.splitext(arquivo)[0]

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        arquivos_extraidos = zip_ref.namelist()
        zip_ref.extractall(pasta_extraida)  # Extrai todos os arquivos na pasta destino

        arquivo_original = os.path.join(pasta_extraida, arquivos_extraidos[0])  # Primeiro arquivo extraído
        novo_nome = os.path.join(pasta_extraida, f"{nome_zip}.csv")  # Novo nome do arquivo

        os.rename(arquivo_original, novo_nome)  # Renomeia o arquivo
        print(f"✔ {arquivo} extraído e renomeado para {nome_zip}.csv")

## Data Preparation

## Importações de bibliotecas necessárias

Este notebook realiza o processamento de dados do CNPJ brasileiro, convertendo arquivos CSV em Parquet e aplicando diversas transformações para facilitar análises posteriores.
Bibliotecas principais: PySpark para processamento distribuído e módulos do sistema para manipulação de arquivos.

In [None]:
import os
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col, concat, lit, broadcast, regexp_replace, substring
from pyspark.sql.functions import when, to_date, date_format, create_map, count, isnan
from pyspark.sql.functions import explode, split, collect_list
from itertools import chain
import time
import gc

Esta seção configura o ambiente de trabalho, definindo os diretórios necessários para armazenar os arquivos originais e processados. O Google Drive é montado para facilitar o acesso persistente aos dados entre sessões.

In [None]:
# =============== CONFIGURAÇÕES E INICIALIZAÇÃO ===============

# Montar o Google Drive
drive.mount('/content/drive')

# Definir a pasta compartilhada onde os arquivos CSV estão localizados
pasta_compartilhada = '/content/drive/MyDrive/arquivos_cnpj/2025-01'

# Definir pasta para salvar arquivos Parquet
pasta_parquet = os.path.join(pasta_compartilhada, 'parquet')

# Criar diretório para arquivos Parquet se não existir
if not os.path.exists(pasta_parquet):
    os.makedirs(pasta_parquet)

# Mudar para o diretório de trabalho
os.chdir(pasta_compartilhada)

Mounted at /content/drive


## Configuração otimizada da sessão Spark para lidar com grandes volumes de dados

Parâmetros foram ajustados para equilibrar o uso de memória, paralelismo e performance.
Os parâmetros abaixo foram especificamente configurados para:
- Melhorar a distribuição da carga de trabalho entre executores;
- Otimizar operações de join e shuffle que são intensivas em recursos;
- Habilitar otimizações adaptativas do Spark para melhor desempenho em consultas complexas.

In [None]:
# =============== OTIMIZAÇÃO DA SESSÃO SPARK ===============

# Criar uma sessão Spark com configurações otimizadas
spark = SparkSession.builder \
    .appName('Análise de Dados CNPJ') \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "100") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.2") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "1g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .getOrCreate()

# Log de configurações
print("Configurações Spark:")
for conf in sorted(spark.sparkContext.getConf().getAll()):
    print(conf)

Configurações Spark:
('spark.app.id', 'local-1742999352474')
('spark.app.name', 'Análise de Dados CNPJ')
('spark.app.startTime', '1742999352344')
('spark.app.submitTime', '1742993255832')
('spark.default.parallelism', '100')
('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar

## Definição dos cabeçalhos e estruturas para cada tipo de arquivo

Os arquivos CSV da base de dados do CNPJ seguem uma estrutura específica definida pela Receita Federal.
Esta seção mapeia cada coluna para seu respectivo nome, facilitando o entendimento dos dados e, também define o mapeamento de categorias CNAE para classificação setorial das empresas.

In [None]:
# =============== DEFINIÇÃO DE CABEÇALHOS E SCHEMAS ===============

# Definir os cabeçalhos dos arquivos CSV para os DataFrames
cnae_header = ['ID_Cnae', 'Cnae']
motivo_header = ['ID_Motivo', 'Motivo']
municipios_header = ['ID_Municipio', 'Municipio']
natureza_header = ['ID_Natureza', 'Natureza']
paises_header = ['ID_Pais', 'Pais']
qualificacoes_header = ['ID_Qualificacao', 'Qualificacao']
simples_header = ['CNPJ_Basico', 'Opcao_pelo_Simples', 'Data_de_Opcao_pelo_Simples',
                 'Data_de_Exclusao_do_Simples', 'Opcao_pelo_MEI', 'Data_de_Opcao_pelo_MEI',
                 'Data_de_Exclusao_do_MEI']
empresas_header = ['CNPJ_Basico', 'Razao_Social_Nome_Empresarial', 'Codigo_Natureza_Juridica',
                  'Qualificacao_do_Responsavel', 'Capital_Social_da_Empresa', 'Porte_da_Empresa',
                  'Ente_Federativo_Responsavel']
estabelecimentos_header = ['CNPJ_Basico', 'CNPJ_Ordem', 'CNPJ_Dv', 'Identificador_Matriz_Filial',
                          'Nome_Fantasia', 'Situacao_Cadastral', 'Data_Situacao_Cadastral',
                          'Motivo_Situacao_Cadastral', 'Nome_da_Cidade_no_Exterior', 'Pais',
                          'Data_de_Inicio_Atividade', 'Cnae_Fiscal_Principal', 'Cnae_Fiscal_Secundaria',
                          'Tipo_de_Logradouro', 'Logradouro', 'Numero', 'Complemento', 'Bairro', 'Cep',
                          'UF', 'Municipio', 'DDD_1', 'Telefone_1', 'DDD_2', 'Telefone_2', 'DDD_do_Fax',
                          'Fax', 'Correio_Eletronico', 'Situacao_Especial', 'Data_da_Situacao_Especial']
socios_header = ['CNPJ_Basico', 'Identitificador_de_Socio', 'Nome_do_Socio_ou_Razao_Social',
                'CNPJ_ou_CPF_do_Socio', 'Qualificacao_do_Socio', 'Data_de_Entrada_na_Sociedade',
                'Pais_Socio', 'Representante_Legal', 'Nome_do_Representante',
                'Qualificacao_do_Representante_Legal', 'Faixa_Etaria']

# Colunas com datas
date_simples = ['Data_de_Opcao_pelo_Simples', 'Data_de_Exclusao_do_Simples',
               'Data_de_Opcao_pelo_MEI', 'Data_de_Exclusao_do_MEI']
date_estabelecimentos = ['Data_Situacao_Cadastral', 'Data_de_Inicio_Atividade',
                        'Data_da_Situacao_Especial']
date_socios = ['Data_de_Entrada_na_Sociedade']

# Dicionário de categorias Cnae
cnae_map = {
    '6311': 'Tecnologia e Inovação (TI e Software)',
    '6312': 'Tecnologia e Inovação (TI e Software)',
    '7810': 'Plataformas de emprego e recrutamento',
    '6319': 'Plataformas de emprego e recrutamento',
    '6422': 'Bancos',
    '6423': 'Bancos',
    '6424': 'Bancos',
    '6431': 'Financeiras',
    '6432': 'Financeiras',
    '6433': 'Financeiras',
    '6440': 'Financeiras',
    '4789': 'Conteúdo sensível (sex shop e acompanhantes)',
    '9609': 'Conteúdo sensível (sex shop e acompanhantes)',
    '4771': 'Moda',
    '7410': 'Moda',
    '4776': 'Pet',
    '7500': 'Pet',
    '5111': 'Transporte',
    '5112': 'Transporte',
    '5229': 'Logística e Armazenagem',
    '5320': 'Logística e Armazenagem',
    '7020': 'Consultoria e Treinamento Profissional',
    '8599': 'Consultoria e Treinamento Profissional',
    '62': 'Tecnologia e Inovação (TI e Software)',
    '73': 'Agências de Marketing e Publicidade',
    '68': 'Setor imobiliário',
    '92': 'Apostas',
    '96': 'Beleza e Estética',
    '85': 'Educação',
    '65': 'Seguradoras',
    '14': 'Moda',
    '47': 'Comércio e varejo',
    '86': 'Saúde',
    '79': 'Turismo e Lazer',
    '93': 'Turismo e Lazer',
    '51': 'Empresas aéreas',
    '41': 'Construção e Infraestrutura',
    '01': 'Agronegócio e Pecuária',
    '82': 'Serviços Administrativos e de Apoio',
    '49': 'Transporte',
    '50': 'Transporte',
    '29': 'Veículos',
    '45': 'Veículos',
    '10': 'Alimentos e Bebidas',
    '11': 'Alimentos e Bebidas'
}

## Conjunto de funções auxiliares para as operações de ETL (Extração, Transformação e Carga)

Estas funções encapsulam operações complexas e repetitivas do processo, incluindo:
- Monitoramento de memória para evitar problemas de OOM (Out of Memory);
- Definição de schemas para leitura estruturada dos CSVs;
- Conversão e persistência dos dados em formato Parquet (mais eficiente para análises);
- Operações de join otimizadas para grandes volumes de dados;
- Transformações de tipos de dados (datas, valores monetários, etc.);
- Funções para análise de qualidade dos dados (nulos, duplicatas).

In [None]:
# =============== FUNÇÕES AUXILIARES ===============

def log_memory_usage():
    """Registra o uso de memória atual."""
    from psutil import virtual_memory
    mem = virtual_memory()
    print(f"Memória: {mem.percent}% usada ({mem.used / 1024 / 1024 / 1024:.2f} GB de {mem.total / 1024 / 1024 / 1024:.2f} GB)")

def Schema(header, integers):
    """
    Cria um schema para leitura de CSV.
    Args:
        header (list): Lista de nomes das colunas.
        integers (list): Colunas que devem ser inteiras.
    Returns:
        StructType: Schema do DataFrame.
    """
    schema = StructType([
        StructField(col, StringType()) if col not in integers else StructField(col, IntegerType())
        for col in header
    ])
    return schema

def Carregar_Dados_Parquet(csv_path, schema, header, parquet_path):
    """
    Carrega dados de CSV, aplica schema e salva em formato Parquet.

    Args:
        csv_path: Nome do arquivo CSV
        schema: Schema do DataFrame
        header: Lista com nomes das colunas
        parquet_path: Caminho para salvar o arquivo Parquet

    Returns:
        DataFrame carregado do Parquet
    """
    try:
        # Verifica se o arquivo Parquet já existe
        if not os.path.exists(parquet_path):
            print(f"Convertendo {csv_path} para Parquet...")
            start_time = time.time()

            # Lê o CSV com o schema definido
            df = spark.read.csv(
                csv_path,
                header=False,
                sep=';',
                schema=schema,
                encoding='latin1'
            ).toDF(*header)

            # Salva em formato Parquet com compressão Snappy (bom equilíbrio entre compressão e velocidade)
            df.write.option("compression", "snappy").parquet(parquet_path)

            elapsed = time.time() - start_time
            print(f"Conversão concluída em {elapsed:.2f} segundos")

        # Carrega do Parquet
        return spark.read.parquet(parquet_path)
    except Exception as e:
        print(f"Erro ao processar {csv_path}: {str(e)}")
        raise

def Carregar_Dados_Multiplos_Parquet(csv_paths, schema, header, parquet_base_path):
    """
    Carrega múltiplos arquivos CSV, aplica schema e salva em formato Parquet.

    Args:
        csv_paths: Lista de nomes de arquivos CSV
        schema: Schema do DataFrame
        header: Lista com nomes das colunas
        parquet_base_path: Diretório base para salvar os arquivos Parquet

    Returns:
        DataFrame unificado carregado do Parquet
    """
    try:
        # Verifica se o diretório Parquet existe
        if not os.path.exists(parquet_base_path):
            os.makedirs(parquet_base_path)

            # Processa cada arquivo CSV
            for i, csv_path in enumerate(csv_paths):
                if not os.path.exists(csv_path):
                    print(f"Arquivo {csv_path} não encontrado. Pulando...")
                    continue

                parquet_path = f"{parquet_base_path}/parte_{i}.parquet"

                print(f"Convertendo {csv_path} para Parquet ({i+1}/{len(csv_paths)})...")
                start_time = time.time()

                try:
                    # Lê o CSV com o schema definido
                    df = spark.read.csv(
                        csv_path,
                        header=False,
                        sep=';',
                        schema=schema,
                        encoding='latin1'
                    ).toDF(*header)

                    # Reparticionamento para melhorar performance de escrita
                    df = df.repartition(10)

                    # Salva em formato Parquet
                    df.write.option("compression", "snappy").parquet(parquet_path)

                    elapsed = time.time() - start_time
                    print(f"Conversão de {csv_path} concluída em {elapsed:.2f} segundos")

                except Exception as e:
                    print(f"Erro ao processar {csv_path}: {str(e)}")

        # Carrega todos os arquivos Parquet como um único DataFrame
        return spark.read.parquet(f"{parquet_base_path}/*")
    except Exception as e:
        print(f"Erro ao processar múltiplos arquivos: {str(e)}")
        raise

def Join_Dataframes(df_left, df_right, column_left, column_right):
    """
    Realiza um left join entre dois DataFrames e remove a coluna redundante.
    Substitui a coluna com código pela sua descrição.

    Args:
        df_left: DataFrame à esquerda (DataFrame a ser unido).
        df_right: DataFrame à direita (DataFrame com os dados a serem unidos).
        column_left: Nome da coluna no DataFrame à esquerda para realizar o join.
        column_right: Nome da coluna no DataFrame à direita para realizar o join.

    Returns:
        DataFrame resultante do left join com a coluna redundante removida.
    """
    # Conta o número de registros no DataFrame esquerdo para detecção de possíveis problemas
    count_before = df_left.count()

    # Renomeia temporariamente a coluna para evitar ambiguidade
    column_left_temp = column_left + '_'
    df_left = df_left.withColumnRenamed(column_left, column_left_temp)

    # Verificar se df_right é pequeno o suficiente para broadcast
    right_count = df_right.count()
    if right_count < 10000:  # Limite arbitrário, ajuste conforme necessário
        df_right_broadcast = broadcast(df_right)
    else:
        df_right_broadcast = df_right
        print(f"Tabela secundária muito grande para broadcast: {right_count} registros")

    # Realiza o left join
    df = df_left.join(df_right_broadcast, df_left[column_left_temp] == df_right_broadcast[column_right], 'left')

    # Remove a coluna redundante
    df = df.drop(df_right_broadcast[column_right])

    # Verifica a coluna a ser usada para substituição
    value_col = df_right.columns[1] if len(df_right.columns) > 1 else None

    if value_col:
        # Substitui os valores da coluna temporária
        df = df.withColumn(column_left_temp, col(value_col))

        # Remove a coluna original do DataFrame da direita
        df = df.drop(value_col)

    # Renomeia a coluna temporária de volta ao nome original
    df = df.withColumnRenamed(column_left_temp, column_left)

    # Verifica se o número de linhas mudou após o join (para diagnóstico)
    count_after = df.count()
    if count_before != count_after:
        print(f"AVISO: Diferença no número de registros após join em {column_left}: antes={count_before}, depois={count_after}")

    return df

def Join_Dataframes_Duplicated(df_left, df_right, column_name):
    """
    Realiza um left join entre dois DataFrames e remove a coluna redundante da direita.
    Suporta junção em colunas com o mesmo nome nos dois DataFrames.

    Args:
        df_left: DataFrame à esquerda (principal).
        df_right: DataFrame à direita (dados complementares).
        column_name: Nome da coluna em comum usada para junção.

    Returns:
        DataFrame resultante do left join com a coluna duplicada removida.
    """
    # Conta o número de registros no DataFrame esquerdo para detecção de possíveis problemas
    count_before = df_left.count()

    # Renomeia temporariamente a coluna da direita para evitar conflito
    temp_column_right = column_name + "_right"
    df_right = df_right.withColumnRenamed(column_name, temp_column_right)

    # Seleciona apenas as colunas necessárias do DataFrame direito para reduzir o volume de dados
    cols_to_keep = [c for c in df_right.columns if c != temp_column_right]
    df_right_reduced = df_right.select([temp_column_right] + cols_to_keep)

    # Aplica broadcast apenas se o DataFrame direito for pequeno o suficiente
    right_count = df_right_reduced.count()
    if right_count < 10000:  # Limite arbitrário, ajuste conforme necessário
        df_right_broadcast = broadcast(df_right_reduced)
    else:
        # Para DataFrames maiores, podemos reparticionar para melhorar a performance do join
        df_right_broadcast = df_right_reduced.repartition(50, temp_column_right)
        print(f"Tabela secundária muito grande para broadcast: {right_count} registros. Reparticionando.")

    # Realiza o left join
    df = df_left.join(df_right_broadcast, df_left[column_name] == df_right_broadcast[temp_column_right], 'left')

    # Remove a coluna temporária após a junção
    df = df.drop(temp_column_right)

    # Verifica se o número de linhas mudou após o join (para diagnóstico)
    try:
        count_after = df.count()
        if count_before != count_after:
            print(f"AVISO: Diferença no número de registros após join em {column_name}: antes={count_before}, depois={count_after}")
    except Exception as e:
        print(f"Não foi possível verificar contagem após join: {str(e)}")

    return df

def Time_Convert(df, date_array):
    """
    Converte as colunas de data para o tipo de dado correto.

    Args:
        df: O DataFrame.
        date_array: Uma lista com os nomes das colunas de data.

    Returns:
        O DataFrame com as colunas de data convertidas.
    """
    for col_name in date_array:
        if col_name in df.columns:
            df = df.withColumn(col_name, to_date(col(col_name), 'yyyyMMdd'))
            df = df.withColumn(col_name, date_format(col(col_name), 'dd-MM-yyyy'))
    return df

def contar_nulos(df, max_cols=10):
    """
    Conta valores nulos em cada coluna do DataFrame, limitando o número de colunas exibidas.

    Args:
        df: O DataFrame.
        max_cols: Número máximo de colunas a serem exibidas.
    """
    # Usando .select para criar uma projeção em vez de coletar todos os dados
    null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])

    # Convertendo para um formato mais fácil de exibir
    null_counts_row = null_counts.collect()[0]
    null_dict = {c: null_counts_row[c] for c in df.columns}

    # Ordenando por número de nulos (decrescente)
    sorted_nulls = sorted(null_dict.items(), key=lambda x: x[1], reverse=True)

    # Exibindo os resultados
    print("Contagem de valores nulos por coluna:")
    for col_name, null_count in sorted_nulls[:max_cols]:
        print(f"{col_name}: {null_count}")

    if len(sorted_nulls) > max_cols:
        print(f"... e mais {len(sorted_nulls) - max_cols} colunas")

def contar_duplicatas(df, keys=None):
    """
    Conta registros duplicados no DataFrame com base em chaves específicas.

    Args:
        df: O DataFrame.
        keys: Lista de colunas para verificar duplicatas. Se None, usa todas as colunas.
    """
    if keys is None:
        keys = df.columns

    try:
        # Contagem de duplicatas (mais eficiente que groupBy + filter)
        duplicates = df.groupBy(keys).count().filter("count > 1")
        dup_count = duplicates.count()

        print(f"Encontradas {dup_count} chaves duplicadas.")

        if dup_count > 0 and dup_count <= 5:
            # Mostrar exemplo de duplicatas
            print("Exemplos de duplicatas:")
            duplicates.show(5, truncate=False)
    except Exception as e:
        print(f"Erro ao contar duplicatas: {str(e)}")

def process_chunk(data_chunk, safe_mode=True):
    """
    Processa um subconjunto de dados para evitar sobrecarga do driver.

    Args:
        data_chunk: DataFrame a ser processado
        safe_mode: Se True, limita operações potencialmente perigosas

    Returns:
        DataFrame processado
    """
    try:
        print(f"Processando chunk com {data_chunk.count()} registros")

        # Cria coluna de CNPJ completo
        data_chunk = data_chunk.withColumn(
            "CNPJ",
            concat(col("CNPJ_Basico"), lit("/"), col("CNPJ_Ordem"), lit("-"), col("CNPJ_Dv"))
        )

        # Remove colunas redundantes
        data_chunk = data_chunk.drop("CNPJ_Basico", "CNPJ_Ordem", "CNPJ_Dv")

        # Em modo seguro, não realiza todas as transformações
        if not safe_mode:
            # Transformações adicionais aqui
            pass

        return data_chunk
    except Exception as e:
        print(f"Erro ao processar chunk: {str(e)}")
        return None

## Inicialização dos schemas para cada tipo de arquivo

Define quais campos devem ser tratados como inteiros vs. strings.

Estes schemas garantem que os dados sejam lidos de forma correta e consistente.

In [None]:
# =============== INICIALIZAÇÃO DE SCHEMAS ===============

# Definir schemas para cada DataFrame
schema_cnae = Schema(cnae_header, [])
schema_motivos = Schema(motivo_header, [])
schema_municipios = Schema(municipios_header, [])
schema_naturezas = Schema(natureza_header, [])
schema_paises = Schema(paises_header, [])
schema_qualificacoes = Schema(qualificacoes_header, [])
schema_simples = Schema(simples_header, [])
schema_empresas = Schema(empresas_header, ['Porte_da_Empresa'])
schema_estabelecimentos = Schema(estabelecimentos_header, ['Identificador_Matriz_Filial', 'Situacao_Cadastral'])
schema_socios = Schema(socios_header, ['Identitificador_de_Socio', 'Faixa_Etaria'])

## Processo de carregamento e conversão de dados

Esta etapa é dividida em dois blocos principais:
1. Carregamento de tabelas de referência (menores);
2. Carregamento das tabelas principais de empresas, estabelecimentos e sócios;

Os arquivos são convertidos de CSV para Parquet apenas na primeira execução.
Em execuções subsequentes, os dados são lidos diretamente do formato Parquet mais eficiente, economizando tempo de processamento

In [None]:
# =============== CARREGAMENTO DE DADOS ===============

print("\n\n======== CARREGANDO DADOS DE REFERÊNCIA ========")
# Carregar dados de tabelas de referência (geralmente tabelas pequenas)
df_cnae = Carregar_Dados_Parquet('Cnaes.csv', schema_cnae, cnae_header,
                                os.path.join(pasta_parquet, 'cnae.parquet'))
df_motivos = Carregar_Dados_Parquet('Motivos.csv', schema_motivos, motivo_header,
                                   os.path.join(pasta_parquet, 'motivos.parquet'))
df_municipios = Carregar_Dados_Parquet('Municipios.csv', schema_municipios, municipios_header,
                                      os.path.join(pasta_parquet, 'municipios.parquet'))
df_naturezas = Carregar_Dados_Parquet('Naturezas.csv', schema_naturezas, natureza_header,
                                     os.path.join(pasta_parquet, 'naturezas.parquet'))
df_paises = Carregar_Dados_Parquet('Paises.csv', schema_paises, paises_header,
                                  os.path.join(pasta_parquet, 'paises.parquet'))
df_qualificacoes = Carregar_Dados_Parquet('Qualificacoes.csv', schema_qualificacoes, qualificacoes_header,
                                         os.path.join(pasta_parquet, 'qualificacoes.parquet'))

print("\n\n======== CARREGANDO TABELAS PRINCIPAIS ========")
# Tabelas principais - verificar existência de arquivos
arquivos_empresas = [f'Empresas{i}.csv' for i in range(10) if os.path.exists(f'Empresas{i}.csv')]
arquivos_estabelecimentos = [f'Estabelecimentos{i}.csv' for i in range(10) if os.path.exists(f'Estabelecimentos{i}.csv')]
arquivos_socios = [f'Socios{i}.csv' for i in range(10) if os.path.exists(f'Socios{i}.csv')]

# Informações sobre arquivos encontrados
print(f"Encontrados {len(arquivos_empresas)} arquivos de Empresas")
print(f"Encontrados {len(arquivos_estabelecimentos)} arquivos de Estabelecimentos")
print(f"Encontrados {len(arquivos_socios)} arquivos de Socios")

# Carregar tabela simples
print("\nCarregando dados do Simples Nacional...")
df_simples = Carregar_Dados_Parquet('Simples.csv', schema_simples, simples_header,
                               os.path.join(pasta_parquet, 'simples.parquet'))

# Carregar dados de Empresas, Estabelecimentos e Socios
print("\nCarregando dados de Empresas...")
df_empresas = Carregar_Dados_Multiplos_Parquet(arquivos_empresas, schema_empresas, empresas_header,
                                         os.path.join(pasta_parquet, 'empresas'))

print("\nCarregando dados de Estabelecimentos...")
df_estabelecimentos = Carregar_Dados_Multiplos_Parquet(arquivos_estabelecimentos, schema_estabelecimentos,
                                              estabelecimentos_header,
                                              os.path.join(pasta_parquet, 'estabelecimentos'))

print("\nCarregando dados de Socios...")
df_socios = Carregar_Dados_Multiplos_Parquet(arquivos_socios, schema_socios, socios_header,
                                       os.path.join(pasta_parquet, 'socios'))





Encontrados 10 arquivos de Empresas
Encontrados 10 arquivos de Estabelecimentos
Encontrados 10 arquivos de Socios

Carregando dados do Simples Nacional...

Carregando dados de Empresas...

Carregando dados de Estabelecimentos...

Carregando dados de Socios...


## Implementação de amostragem controlada para validação e teste

Esta etapa reduz o volume de dados processados para uma fração do conjunto original.

Isso permite validar todas as transformações e joins de forma rápida antes de executar o processamento em escala completa.

A fração de amostragem pode ser ajustada conforme necessário

In [None]:
# =============== ESTATÍSTICAS E AMOSTRAGEM ===============
print("\n\n======== APLICANDO AMOSTRAGEM PARA VALIDAÇÃO ========")

# Parâmetro de controle para amostragem
MODO_AMOSTRA = True  # Altere para False para processamento completo
FRACAO_AMOSTRA = 0.05  # 5% do volume original

if MODO_AMOSTRA:
    print(f"Aplicando amostragem de {FRACAO_AMOSTRA*100}% aos dados...")
    # Salvando estatísticas antes da amostragem
    count_empresas_original = df_empresas.count()
    count_estabelecimentos_original = df_estabelecimentos.count()
    count_socios_original = df_socios.count()

    # Aplicando amostragem
    df_empresas = df_empresas.sample(fraction=FRACAO_AMOSTRA, seed=42)
    df_estabelecimentos = df_estabelecimentos.sample(fraction=FRACAO_AMOSTRA, seed=42)
    df_socios = df_socios.sample(fraction=FRACAO_AMOSTRA, seed=42)

    # Persistindo os DataFrames amostrados
    df_empresas.persist()
    df_estabelecimentos.persist()
    df_socios.persist()

    # Estatísticas após amostragem
    print(f"Empresas: {count_empresas_original:,} -> {df_empresas.count():,} registros")
    print(f"Estabelecimentos: {count_estabelecimentos_original:,} -> {df_estabelecimentos.count():,} registros")
    print(f"Sócios: {count_socios_original:,} -> {df_socios.count():,} registros")

    # Modificar pasta de saída para indicar que são dados de amostra
    pasta_parquet_saida = os.path.join(pasta_parquet, 'amostra')
    if not os.path.exists(pasta_parquet_saida):
        os.makedirs(pasta_parquet_saida)
else:
    print("Processando conjunto de dados completo...")
    pasta_parquet_saida = pasta_parquet

# Continuar com as transformações normalmente...



Aplicando amostragem de 5.0% aos dados...
Empresas: 60,959,932 -> 3,050,055 registros
Estabelecimentos: 64,017,368 -> 3,200,591 registros
Sócios: 25,162,928 -> 1,259,042 registros


## Limpeza dos dados

In [None]:
df_empresas = df_empresas.na.fill({
    'Porte_da_Empresa': 0,
    'Razao_Social_Nome_Empresarial': 'Não Informado',
    })

In [None]:
df_estabelecimentos = df_estabelecimentos.na.fill({
    'Nome_Fantasia': 'Não Informado',
    'Tipo_de_Logradouro': 'Não Informado',
    'Logradouro': 'Não Informado',
    'Numero': 'SN',
    'Complemento': 'Não Informado',
    'Bairro': 'Não Informado',
    'Cep': 'Não Informado',
    'UF': 'Não Informado',
    'ddd_1': 0,
    'Telefone_1': 0,
    'ddd_2': 0,
    'Telefone_2': 0,
    'ddd_do_fax': 0,
    'Fax': 0,
    'Correio_Eletronico': 'Não Informado'

})

In [None]:
df_socios = df_socios.na.fill({'Nome_do_Socio_ou_Razao_Social': 'Não Informado',
                               'CNPJ_ou_CPF_do_Socio': 'Não Informado',
                               'Nome_do_Representante': 'Não Informado'

})

In [None]:
#Removendo duplicatas
df_emrpesas = df_empresas.dropDuplicates()
df_estabelecimentos = df_estabelecimentos.dropDuplicates()
df_socios = df_socios.dropDuplicates()

## Transformações iniciais nos dados carregados

Esta etapa realiza:
1. Conversão de formatos de data para o padrão dd-MM-yyyy;
2. Transformação de valores monetários (como capital social) para o tipo adequado;
3. Criação de classificações setoriais baseadas nos códigos CNAE;

Estas transformações preparam os dados para análises posteriores e facilitam a integração das diferentes tabelas.

In [None]:
# =============== TRANSFORMAÇÃO DE DADOS ===============
print("\n\n======== TRANSFORMANDO DADOS ========")

# Estatísticas para debug
print(f"\nRegistros de Empresas: {df_empresas.count():,}")
print(f"Registros de Estabelecimentos: {df_estabelecimentos.count():,}")
print(f"Registros de Socios: {df_socios.count():,}")
print(f"Registros de Simples: {df_simples.count():,}")

print("\nAplicando conversões de data...")
# Converter colunas de data para o tipo de dado correto
df_simples = Time_Convert(df_simples, date_simples)
df_estabelecimentos = Time_Convert(df_estabelecimentos, date_estabelecimentos)
df_socios = Time_Convert(df_socios, date_socios)

print("\nConvertendo Capital Social para float...")
# Transformar a Coluna 'Capital_Social_da_Empresa' em FloatType
df_empresas = df_empresas.withColumn(
    'Capital_Social_da_Empresa',
    regexp_replace(col('Capital_Social_da_Empresa'), "\\.", "")  # Remove pontos de milhar
).withColumn(
    "Capital_Social_da_Empresa",
    regexp_replace(col('Capital_Social_da_Empresa'), ",", ".")  # Substitui vírgula por ponto
).withColumn(
    'Capital_Social_da_Empresa',
    col('Capital_Social_da_Empresa').cast(FloatType())  # Converte para FloatType
)





Registros de Empresas: 3,050,055
Registros de Estabelecimentos: 3,200,591
Registros de Socios: 1,259,041
Registros de Simples: 41,720,964

Aplicando conversões de data...

Convertendo Capital Social para float...


## Processo de enriquecimento e integração dos dados

Esta etapa conecta as tabelas principais com as tabelas de referência para:
1. Substituir códigos por suas descrições textuais (tornando os dados mais legíveis);
2. Enriquecer os registros com informações adicionais;
3. Transformar valores numéricos em categorias mais significativas.

A estratégia implementada realiza os joins em etapas, priorizando as tabelas menores primeiro, seguido de limpeza de memória para evitar problemas de OOM.

In [None]:
# =============== JOINS E INTEGRAÇÃO DE DADOS ===============
print("\n\n======== REALIZANDO JOINS ENTRE TABELAS ========")

# Step 1: Primeiro aplicar os joins nas tabelas de referência (menores)
# Isso enriquece os dados antes de fazer joins maiores
print("\nAplicando joins com tabelas de referência...")

try:
    # Joins para tabela de estabelecimentos
    print("  - Enriquecendo estabelecimentos com dados de países...")
    df_estabelecimentos = Join_Dataframes(df_estabelecimentos, df_paises, 'Pais', 'ID_Pais')

    print("  - Enriquecendo estabelecimentos com dados de municípios...")
    df_estabelecimentos = Join_Dataframes(df_estabelecimentos, df_municipios, 'Municipio', 'ID_Municipio')

    print("  - Enriquecendo estabelecimentos com motivos de situação cadastral...")
    df_estabelecimentos = Join_Dataframes(df_estabelecimentos, df_motivos, 'Motivo_Situacao_Cadastral', 'ID_Motivo')

    # Joins para tabela de empresas
    print("  - Enriquecendo empresas com naturezas jurídicas...")
    df_empresas = Join_Dataframes(df_empresas, df_naturezas, 'Codigo_Natureza_Juridica', 'ID_Natureza')

    print("  - Enriquecendo empresas com qualificações dos responsáveis...")
    df_empresas = Join_Dataframes(df_empresas, df_qualificacoes, 'Qualificacao_do_Responsavel', 'ID_Qualificacao')

    # Joins para tabela de sócios
    print("  - Enriquecendo sócios com qualificações...")
    df_socios = Join_Dataframes(df_socios, df_qualificacoes, 'Qualificacao_do_Socio', 'ID_Qualificacao')

    print("  - Enriquecendo sócios com qualificações dos representantes legais...")
    df_socios = Join_Dataframes(df_socios, df_qualificacoes, 'Qualificacao_do_Representante_Legal', 'ID_Qualificacao')

    print("  - Enriquecendo sócios com dados de países...")
    df_socios = Join_Dataframes(df_socios, df_paises, 'Pais_Socio', 'ID_Pais')

except Exception as e:
    print(f"ERRO durante joins com tabelas de referência: {str(e)}")
    # Continuar com os dados que temos, mesmo em caso de erro parcial

# OTIMIZAÇÃO: Processar dados de CNAE secundário apenas se necessário
if False:  # Desativado para reduzir complexidade e consumo de memória
    try:
        print("\nProcessando CNAEs secundários...")
        df_estabelecimentos_exploded = df_estabelecimentos.withColumn(
            "Cnae_Fiscal_Secundaria",
            explode(split("Cnae_Fiscal_Secundaria", ","))
        )

        # Realizar Join com CNAE secundário
        df_joined = df_estabelecimentos_exploded.join(
            df_cnae,
            df_estabelecimentos_exploded["Cnae_Fiscal_Secundaria"] == df_cnae["ID_Cnae"],
            "left"
        )

        # Reagrupar dados após o join
        df_final = df_joined.groupBy(df_estabelecimentos_exploded.columns).agg(
            collect_list("Cnae").alias("Cnae_Fiscal_Secundaria")
        )

        # Usar df_final em vez de df_estabelecimentos para joins seguintes
        df_estabelecimentos = df_final
    except Exception as e:
        print(f"AVISO: Processamento de CNAEs secundários falhou: {str(e)}")
        print("Continuando com os CNAEs principais apenas.")

# Step 3: Limpar memória e cache para maximizar recursos disponíveis
spark.catalog.clearCache()
gc.collect()
log_memory_usage()




Aplicando joins com tabelas de referência...
  - Enriquecendo estabelecimentos com dados de países...
  - Enriquecendo estabelecimentos com dados de municípios...
  - Enriquecendo estabelecimentos com motivos de situação cadastral...
  - Enriquecendo empresas com naturezas jurídicas...
  - Enriquecendo empresas com qualificações dos responsáveis...
  - Enriquecendo sócios com qualificações...
  - Enriquecendo sócios com qualificações dos representantes legais...
  - Enriquecendo sócios com dados de países...
Memória: 78.1% usada (9.58 GB de 12.67 GB)


## Processamento final, análise e persistência dos resultados

Esta etapa executa:
1. Seleção de colunas mais relevantes para reduzir volume de dados;
2. Joins incrementais entre as tabelas principais;
3. Criação do CNPJ completo formatado;
4. Reparticionamento e salvamento otimizado dos dados processados;
5. Visualização segura de uma amostra dos resultados;
6. Análise exploratória básica com contagens e distribuições;

O código inclui tratamento de erros e mecanismos de recuperação para garantir que dados parciais possam ser salvos mesmo em caso de falhas.

In [None]:
# =============== PROCESSAMENTO DO DATASET FINAL ===============
print("\n\n======== PROCESSANDO DATASET FINAL ========")


# ABORDAGEM OTIMIZADA: Realizar joins incrementais e escolher colunas específicas

# 1. Selecionar apenas colunas necessárias de cada tabela
# Isso reduz significativamente o volume de dados sendo transferido

if MODO_AMOSTRA:
  cols_estabelecimentos = [
        'CNPJ_Basico', 'CNPJ_Ordem', 'CNPJ_Dv', 'Identificador_Matriz_Filial',
        'Situacao_Cadastral', 'UF', 'Municipio', 'Cnae_Fiscal_Principal'
  ]
  cols_empresas = ['CNPJ_Basico', 'Razao_Social_Nome_Empresarial', 'Porte_da_Empresa', 'Capital_Social_da_Empresa']
  cols_simples = ['CNPJ_Basico', 'Opcao_pelo_Simples']
  cols_socios = ['CNPJ_Basico', 'Identitificador_de_Socio', 'Faixa_Etaria']

  df_estabelecimentos_reduced = df_estabelecimentos.select(cols_estabelecimentos)
  df_empresas_reduced = df_empresas.select(cols_empresas)
  df_simples_reduced = df_simples.select(cols_simples)
  df_socios_reduced = df_socios.select(cols_socios)

  df_estabelecimentos = df_estabelecimentos_reduced
  df_empresas = df_empresas_reduced
  df_simples = df_simples_reduced
  df_socios = df_socios_reduced

# 2. Realizar joins progressivos, salvando resultados intermediários
print("\nRealizando join entre estabelecimentos e empresas...")
df_estemp = Join_Dataframes_Duplicated(df_estabelecimentos, df_empresas, 'CNPJ_Basico')
df_estemp = Join_Dataframes_Duplicated(df_estemp, df_socios, 'CNPJ_Basico')

# Checkpoint para dados intermediários (salvar e recarregar para melhor gerenciamento de memória)
checkpoint_path = os.path.join(pasta_parquet, 'checkpoint_estemp.parquet')
df_estemp.write.mode('overwrite').option("compression", "snappy").parquet(checkpoint_path)

# Liberação de memória dos DataFrames originais que não são mais necessários
df_estabelecimentos = None
df_empresas = None
df_socios = None
gc.collect()

# Recarregar do checkpoint
df_estemp = spark.read.parquet(checkpoint_path)

print("\nRealizando join com dados do Simples Nacional...")
df_combined = Join_Dataframes_Duplicated(df_estemp, df_simples, 'CNPJ_Basico')

# Liberar mais memória
df_estemp = None
df_simples = None
gc.collect()

# 3. Criar coluna CNPJ completo
print("\nCriando coluna de CNPJ completo...")
df_final = df_combined.withColumn(
     "CNPJ",
     concat(col("CNPJ_Basico"), lit("/"), col("CNPJ_Ordem"), lit("-"), col("CNPJ_Dv"))
)

# 4. Remover colunas redundantes
df_final = df_final.drop("CNPJ_Basico", "CNPJ_Ordem", "CNPJ_Dv")


# 5. Salvar resultado final em Parquet
print("\nSalvando DataFrame final em formato Parquet...")
output_path = os.path.join(pasta_parquet, 'cnpj_completo_otimizado.parquet')

# Reparticionamento para otimizar a escrita
df_final = df_final.repartition(50)

# Salvar com compressão Snappy
df_final.write.mode('overwrite').option("compression", "snappy").parquet(output_path)

print(f"\nDados salvos com sucesso em: {output_path}")

# Liberar recursos
spark.stop()
print("\nSessão Spark encerrada. Processamento concluído.")




Realizando join entre estabelecimentos e empresas...
Tabela secundária muito grande para broadcast: 3050055 registros. Reparticionando.
Tabela secundária muito grande para broadcast: 1259041 registros. Reparticionando.
AVISO: Diferença no número de registros após join em CNPJ_Basico: antes=3200591, depois=3208500

Realizando join com dados do Simples Nacional...
Tabela secundária muito grande para broadcast: 41720964 registros. Reparticionando.

Criando coluna de CNPJ completo...

Salvando DataFrame final em formato Parquet...

Dados salvos com sucesso em: /content/drive/MyDrive/arquivos_cnpj/2025-01/parquet/cnpj_completo_otimizado.parquet

Sessão Spark encerrada. Processamento concluído.


## Recarregando os dados finais para visualização

In [None]:
# Criar uma sessão Spark com configurações otimizadas
spark = SparkSession.builder \
    .appName('Análise de Dados CNPJ') \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "100") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.2") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "1g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .getOrCreate()


In [None]:
df_final = spark.read.parquet(os.path.join(pasta_parquet, 'cnpj_completo_otimizado.parquet'))

In [None]:
df_final.printSchema()

root
 |-- Identificador_Matriz_Filial: integer (nullable = true)
 |-- Situacao_Cadastral: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Cnae_Fiscal_Principal: string (nullable = true)
 |-- Razao_Social_Nome_Empresarial: string (nullable = true)
 |-- Porte_da_Empresa: integer (nullable = true)
 |-- Capital_Social_da_Empresa: float (nullable = true)
 |-- Identitificador_de_Socio: integer (nullable = true)
 |-- Faixa_Etaria: integer (nullable = true)
 |-- Opcao_pelo_Simples: string (nullable = true)
 |-- CNPJ: string (nullable = true)



In [None]:
# Step 2: Transformar códigos para descrições em todas as tabelas
print("\nTransformando códigos para descrições...")

# Transformações para empresas
df_final = df_final.withColumn(
    'Porte_da_Empresa',
    when(col('Porte_da_Empresa') == 0, 'Não Informado')
    .when(col('Porte_da_Empresa') == 1, 'Micro Empresa')
    .when(col('Porte_da_Empresa') == 3, 'Empresa de Pequeno Porte')
    .when(col('Porte_da_Empresa') == 5, 'Demais')
    .otherwise('Não Informado')
)

# Transformações para estabelecimentos
df_final = df_final.withColumn(
    'Identificador_Matriz_Filial',
    when(col('Identificador_Matriz_Filial') == 1, 'Matriz')
    .when(col('Identificador_Matriz_Filial') == 2, 'Filial')
    .otherwise('Não Informado')
)

df_final = df_final.withColumn(
    'Situacao_Cadastral',
    when(col('Situacao_Cadastral') == 1, 'Nula')
    .when(col('Situacao_Cadastral') == 2, 'Ativa')
    .when(col('Situacao_Cadastral') == 3, 'Suspensa')
    .when(col('Situacao_Cadastral') == 4, 'Inapta')
    .when(col('Situacao_Cadastral') == 8, 'Baixada')
    .otherwise('Não Informado')
)

# Transformações para simples
df_final = df_final.withColumn(
    'Opcao_pelo_Simples',
    when(col('Opcao_pelo_Simples') == 'S', 'Sim')
    .when(col('Opcao_pelo_Simples') == 'N', 'Não')
    .when(col('Opcao_pelo_Simples') == 'EM BRANCO', 'Outros')
    .otherwise('Não Informado')
)

# Transformações para sócios
df_final = df_final.withColumn(
    'Identitificador_de_Socio',
    when(col('Identitificador_de_Socio') == 1, 'Pessoa Jurídica')
    .when(col('Identitificador_de_Socio') == 2, 'Pessoa Física')
    .when(col('Identitificador_de_Socio') == 3, 'Estrangeiro')
    .otherwise('Não Informado')
)

df_final = df_final.withColumn(
    'Faixa_Etaria',
    when(col('Faixa_Etaria') == 1, '0 a 12 anos')
    .when(col('Faixa_Etaria') == 2, '13 a 20 anos')
    .when(col('Faixa_Etaria') == 3, '21 a 30 anos')
    .when(col('Faixa_Etaria') == 4, '31 a 40 anos')
    .when(col('Faixa_Etaria') == 5, '41 a 50 anos')
    .when(col('Faixa_Etaria') == 6, '51 a 60 anos')
    .when(col('Faixa_Etaria') == 7, '61 a 70 anos')
    .when(col('Faixa_Etaria') == 8, '71 a 80 anos')
    .when(col('Faixa_Etaria') == 9, '>80 anos')
    .when(col('Faixa_Etaria') == 0, 'Não se aplica')
    .otherwise('Falso')
)


Transformando códigos para descrições...


In [None]:
df_final = df_final.fillna({
    'Razao_Social_Nome_Empresarial': 'Não Informado',
    'Capital_Social_da_Empresa': 0,
})

In [None]:
print("\nCriando coluna de Ramo baseada no CNAE...")
# Criar coluna 'Ramo' baseada no CNAE usando o mapeamento
mapping_expr = create_map([lit(x) for x in chain(*cnae_map.items())])

df_final = df_final.withColumn(
    'Ramo',
    when(mapping_expr.getItem(substring(col('Cnae_Fiscal_Principal'), 1, 4)).isNotNull(),
         mapping_expr.getItem(substring(col('Cnae_Fiscal_Principal'), 1, 4)))
    .when(mapping_expr.getItem(substring(col('Cnae_Fiscal_Principal'), 1, 2)).isNotNull(),
         mapping_expr.getItem(substring(col('Cnae_Fiscal_Principal'), 1, 2)))
    .otherwise('Outros')
)


Criando coluna de Ramo baseada no CNAE...


In [None]:
df_final.show(10)

+---------------------------+------------------+---+----------------+---------------------+-----------------------------+----------------+-------------------------+------------------------+------------+------------------+----------------+--------------------+
|Identificador_Matriz_Filial|Situacao_Cadastral| UF|       Municipio|Cnae_Fiscal_Principal|Razao_Social_Nome_Empresarial|Porte_da_Empresa|Capital_Social_da_Empresa|Identitificador_de_Socio|Faixa_Etaria|Opcao_pelo_Simples|            CNPJ|                Ramo|
+---------------------------+------------------+---+----------------+---------------------+-----------------------------+----------------+-------------------------+------------------------+------------+------------------+----------------+--------------------+
|                     Filial|           Baixada| BA|LAURO DE FREITAS|              4789099|                Não Informado|   Não informado|                      0.0|           Não Informado|       Falso|     Não Informado