In [1]:
import dask.dataframe as dd
import pandas as pd

path_files = "data/raw/"

# Define os nomes das colunas das tabelas
col_empresas = ["CNPJ", "Razao_Social", "Natureza_Juridica", "Qualificacao_do_Responsavel", "Capital_Social", "Porte", "Ente_Federativo_Responsavel"]
col_estabelecimentos = ["CNPJ_Basico", "CNPJ_Ordem", "CNPJ_DV", "Identificador", "Nome_Fantasia", "Situacao_Cadastral", "Data_Situacao_Cadastral", 
                        "Motivo_Situacao_Cadastral", "Nome_Cidade_Exterior", "Pais", "Data_Inicio_Atividade", "CNAE_Fiscal_Principal", "CNAE_Fiscal_Secundaria",
                        "Tipo_Logradouro", "Logradouro", "Numero", "Complemento", "Bairro", "CEP", "UF", "Municipio", "DDD_1", "Telefone_1", "DDD_2", "Telefone_2", 
                        "DDD_Fax", "Fax", "Correio_Eletronico", "Situacao_Especial", "Data_Situacao_Especial"]

# Tratamento dos arquivos de Empresas
## Adicionando arquivos ao DataFrame

In [11]:
# Lê todos os arquivos passados como parâmetro, utilizando o dask
df_empresa = dd.read_csv(f'{path_files}*.EMPRECSV.zip', delimiter=';', encoding='latin1', low_memory=False, dtype=str, names=col_empresas, blocksize=None)

## Tratamento da coluna Capital Social

In [12]:
df_empresa['Capital_Social'] = df_empresa['Capital_Social'].map(lambda x: x.replace(',','.'))
df_empresa['Capital_Social'] = dd.to_numeric(df_empresa['Capital_Social'], errors='coerce')

## Preenchendo os valores NA da coluna Porte com 01 = Não informado

In [13]:
df_empresa['Porte'] = df_empresa['Porte'].fillna(value='01')

# Manipulação dos dados de Estabelecimentos

In [2]:
df_estab = dd.read_csv(f'{path_files}*.ESTABELE.zip', delimiter=';', encoding='latin1', dtype=str, low_memory=False, names=col_estabelecimentos, blocksize=None)

In [5]:
df_estab['Data_Situacao_Cadastral'] = df_estab['Data_Situacao_Cadastral'].astype('datetime64')
df_estab['Data_Inicio_Atividade'] = df_estab['Data_Inicio_Atividade'].astype('datetime64')


## Altera para 999 os valores nulos da coluna País

In [15]:
df_estab['Pais'] = df_estab['Pais'].fillna(value='999')

## Substitui os valores 0 do campo Data Situação Cadastral pelo valor do campo Data Inicio Atividade

In [None]:
df_estab['Data_Situacao_Cadastral'] = df_estab['Data_Situacao_Cadastral'].mask((df_estab['Data_Situacao_Cadastral'] == '0'), df_estab['Data_Inicio_Atividade'])

## Adicionando a coluna CNPJ concatenando as informações do CNPJ Basico, CNPJ Ordem e CNPJ DV

In [18]:
cnpjs = (df_estab["CNPJ_Basico"].astype(str).apply(lambda x: x.zfill(8), meta=('CNPJ_Basico', str)) + 
        df_estab["CNPJ_Ordem"].astype(str).apply(lambda x: x.zfill(4), meta=('CNPJ_Ordem', str)) + 
        df_estab["CNPJ_DV"].astype(str).apply(lambda x: x.zfill(2), meta=('CNPJ_DV', str)))

df_estab['CNPJ'] = cnpjs

# Salvando em parquet no Google Cloud Storage

In [19]:
import gcsfs
my_gs_bucket = 'dna_public_cnpj_files/parquet_files/'
token_json = 'auth/dados-publicos-326316-e16540376290.json'

In [20]:
def upload_dataframe_to_gcs(df, bucket, token, directory):
  try:
    df.to_parquet(f'gs://{bucket}{directory}', write_index=False, storage_options={'token': token})
  except Exception as e:
    raise e

In [21]:
df_estab = df_estab.repartition(npartitions=150)
df_dict = {'empresa': df_empresa, 'estabelecimento': df_estab}

In [None]:
# for df in df_dict:
#   upload_dataframe_to_gcs(df_dict[df], my_gs_bucket, token_json, df)