In [33]:
from chardet.universaldetector import UniversalDetector
import dask.dataframe as dd
import numpy as np
import pandas as pd
import codecs
import os

In [34]:
caminho_municipio = r'/home/danielbarbosa/seget/notebooks/dados_abertos_cnpj/arquivos/dados/2023-05/F.K03200$Z.D30513.MUNICCSV'

detector = UniversalDetector()
with codecs.open(caminho_municipio, 'rb') as arquivo:
    for linha in arquivo:
        detector.feed(linha)
        break

detector.close()
encoding = detector.result['encoding']

In [35]:
df = pd.read_csv(caminho_municipio, 
                        encoding=encoding, 
                        sep=';',
                        header=None,
                        names=['numero', 'municipio'])

num_marica = df.loc[df['municipio'].str.contains('MARICA', case=False, na=False)]
# num_marica.head()

In [None]:
# Definir diretório dos arquivos
diretorio_estabelecimento = r'/home/danielbarbosa/seget/notebooks/dados_abertos_cnpj/arquivos/dados/2023-05'

# Criar uma lista para armazenar os DataFrames do Dask
dados_estabelecimento = []

for arquivo_empresas in os.listdir(diretorio_estabelecimento):
    if arquivo_empresas.endswith('.ESTABELE'):
        caminho_arquivo = os.path.join(diretorio_estabelecimento, arquivo_empresas)

        print(f"📂 Carregando arquivo {arquivo_empresas} com Dask...")

        # Ler o arquivo diretamente com Dask (Lazy Loading) sem dtype_backend
        df_dask = dd.read_csv(
            caminho_arquivo,
            sep=";",
            encoding="ISO-8859-1",
            assume_missing=True,  # Melhor para lidar com dados incompletos
            blocksize="100MB",  # Define blocos de 100MB para leitura
            dtype={"codigo_municipio": np.float64}  # Define tipo explicitamente para evitar erros
        )

        # Verificar nomes das colunas para garantir que "codigo_municipio" está correto
        print(f"📌 Colunas detectadas: {df_dask.columns}")

        # Aplicar filtro apenas se a coluna "codigo_municipio" existir
        if 'codigo_municipio' in df_dask.columns:
            df_dask_filtrado = df_dask[df_dask["codigo_municipio"] == 5853]
            print(f"✅ Filtro aplicado: Apenas município 5853 ({arquivo_empresas})")
        else:
            print(f"⚠️ Aviso: Coluna 'codigo_municipio' não encontrada no arquivo {arquivo_empresas}, pulando filtro.")
            df_dask_filtrado = df_dask  # Continua com todos os dados se a coluna não existir

        # Adicionar à lista de DataFrames Dask
        dados_estabelecimento.append(df_dask_filtrado)

# Concatenar todos os DataFrames do Dask
empresas_consolidado = dd.concat(dados_estabelecimento)

# Exibir amostra dos dados sem carregar tudo na memória
print(empresas_consolidado.head())

# Salvar em um novo arquivo CSV filtrado (de forma otimizada)
empresas_consolidado.to_csv("empresas_consolidado_dask.csv", single_file=True)

print("✅ Processamento concluído com Dask! Apenas estabelecimentos do município 5853 foram mantidos.")