# Pacotes

In [4]:
from functools import reduce
import os
import chardet
import logging
import glob
import secrets
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import regexp_replace, when,length,to_date,upper,lower,col,split,explode,coalesce,concat_ws,concat,lit,broadcast,regexp_extract,month,year,to_date
from pyspark.sql.functions import broadcast,expr,udf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from functools import reduce


In [5]:
# Inicializando a sessão do Spark
spark = SparkSession.builder.appName("LoadEstabelecimentos").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/19 17:05:33 INFO SparkEnv: Registering MapOutputTracker
24/02/19 17:05:33 INFO SparkEnv: Registering BlockManagerMaster
24/02/19 17:05:33 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/02/19 17:05:33 INFO SparkEnv: Registering OutputCommitCoordinator


In [8]:
# Definindo os schemas:
estabelecimentos = StructType([
        StructField("CNPJ_BASICO", StringType(), nullable=True),
        StructField("CNPJ_ORDEM", StringType(), nullable=True),
        StructField("CNPJ_DV", StringType(), nullable=True),
        StructField("MATRIZ_FILIAL", StringType(), nullable=True),
        StructField("NOME_FANTASIA", StringType(), nullable=True),
        StructField("SIT_CADASTRAL", IntegerType(), nullable=True),
        StructField("DT_SIT_CADASTRAL", StringType(), nullable=True),
        StructField("MOTIVO_CADASTRAL", StringType(), nullable=True),
        StructField("NOME_CIDADE_EXTERIOR", StringType(), nullable=True),
        StructField("PAIS", StringType(), nullable=True),
        StructField("DT_INICIO_ATIVIDADE", StringType(), nullable=True),
        StructField("CNAE_1", StringType(), nullable=True),
        StructField("CNAE_2", StringType(), nullable=True),
        StructField("TIPO_LOUGRADOURO", StringType(), nullable=True),
        StructField("LOGRADOURO", StringType(), nullable=True),
        StructField("NUMERO", IntegerType(), nullable=True),
        StructField("COMPLEMENTO", StringType(), nullable=True),
        StructField("BAIRRO", StringType(), nullable=True),
        StructField("CEP", IntegerType(), nullable=True),
        StructField("UF", StringType(), nullable=True),
        StructField("MUNICIPIO", StringType(), nullable=True),
        StructField("DDD1", StringType(), nullable=True),
        StructField("TEL1", StringType(), nullable=True),
        StructField("DDD2", StringType(), nullable=True),
        StructField("TEL2", StringType(), nullable=True),
        StructField("DDD_FAX", IntegerType(), nullable=True),
        StructField("FAX", IntegerType(), nullable=True),
        StructField("EMAIL", StringType(), nullable=True),
        StructField("SIT_ESPECIAL", StringType(), nullable=True),
        StructField("DT_SIT_ESPECIAL", StringType(), nullable=True)])

empresas = StructType([
        StructField("CNPJ", StringType(), nullable=True),
        StructField("NOME_EMPRESA", StringType(), nullable=True),
        StructField("COD_NAT_JURICA", StringType(), nullable=True),
        StructField("QUALIF_RESPONVAVEL", StringType(), nullable=True),
        StructField("CAP_SOCIAL", StringType(), nullable=True),
        StructField("PORTE", StringType(), nullable=True),
        StructField("ENTE_FEDERATIVO", StringType(), nullable=True)])

municipios = StructType([
        StructField("ID_MUNICPIO", StringType(), nullable=True),
        StructField("MUNICIPIO", StringType(), nullable=True)])

cnaes = StructType([
        StructField("COD_CNAE", StringType(), nullable=True),
        StructField("CNAE", StringType(), nullable=True)])
    
paises = StructType([
        StructField("COD_PAIS", StringType(), nullable=True),
        StructField("NM_PAIS", StringType(), nullable=True)])
    
qualificacoes = StructType([
        StructField("COD_QUALIFICACAO", StringType(), nullable=True),
        StructField("NM_QUALIFICACAO", StringType(), nullable=True)])

socios = StructType([
        StructField("CNPJ_BASICO", StringType(), nullable=True),
        StructField("IDENTIFICADOR_SOCIO", IntegerType(), nullable=True),
        StructField("NOME_SOCIO_RAZAO_SOCIAL", StringType(), nullable=True),
        StructField("CNPJ_CPF_SOCIO", StringType(), nullable=True),
        StructField("QUALIFICAÇAO_SOCIO", StringType(), nullable=True),
        StructField("DATA_ENTRADA_SOCIEDADE", StringType(), nullable=True),
        StructField("PAIS", StringType(), nullable=True),
        StructField("REPRESENTANTE_LEGAL", StringType(), nullable=True),
        StructField("NOME_REPRESENTANTE", StringType(), nullable=True),
        StructField("QUALIFICACAO_REPRESENTANTE_LEGAL", StringType(), nullable=True),
        StructField("FAIXA_ETARIA", StringType(), nullable=True)])

simples = StructType([
        StructField("CNPJ_BASICO", StringType(), nullable=True),
        StructField("OPCAO_PELO_SIMPLES", StringType(), nullable=True),
        StructField("DATA_OPCAO_PELO_SIMPLES", StringType(), nullable=True),
        StructField("DATA_EXCLUSAO_SIMPLES", StringType(), nullable=True),
        StructField("OPÇAO_PELO_MEI", StringType(), nullable=True),
        StructField("DATA_OPCAO_PELO_MEI", StringType(), nullable=True),
        StructField("DATA_EXCLUSAO_MEI", StringType(), nullable=True)])

naturezas = StructType([
        StructField("COD_NAT_JURICA", StringType(), nullable=True),
        StructField("NAT_JURICA", StringType(), nullable=True)])
    
motivos = StructType([
        StructField("COD_MOTIVO", StringType(), nullable=True),
        StructField("NM_MOTIVO", StringType(), nullable=True)])
    
# Definindo os schemas:
schemas = {"Estabelecimentos": estabelecimentos,
            "Empresas": empresas,
            "Municipios": municipios,
            "Cnaes": cnaes,
            "Socios": socios,
            "Simples": simples,
            "Naturezas": naturezas,
            "Qualificacoes": qualificacoes,
            "Motivos": motivos,
            "Paises": paises}

# Mapeamento dos encodings
encodings = {
    'Empresas': 'ascii',
    'Naturezas': 'ISO-8859-1',
    'Qualificacoes': 'ISO-8859-1',
    'Estabelecimentos': 'ascii',
    'Paises': 'ISO-8859-1',
    'Municipios': 'ascii',
    'Cnaes': 'ISO-8859-1',
    'Motivos': 'ascii',
    'Simples': 'ascii'
}

def read_data(spark, schema_name, base_path, encoding):
    file_pattern = f"{base_path}/*"
    
    # Utilizando o Spark para ler os arquivos do GCS
    df = (spark.read.format("csv")
          .option("sep", ";")
          .option("header", "false")
          .option('quote', '"')
          .option("escape", '"')
          .option("encoding", encoding)
          .schema(schemas[schema_name])
          .load(file_pattern))
    
    return df

# Armazenando os dados no bq

## Estabelecimentos

In [15]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Estabelecimentos/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_estabelecimentos = read_data(spark, "estabelecimentos", base_path, encodings['Estabelecimentos'])

df_estabelecimentos.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('estabelecimentos')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Empresas

In [18]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Empresas/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_empresas = read_data(spark, "Empresas", base_path, encodings['Empresas'])

df_empresas.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('Empresas')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Naturezas

In [21]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Naturezas/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_naturezas = read_data(spark, "Naturezas", base_path, encodings['Naturezas'])

df_naturezas.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('naturezas')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Qualificacoes

In [22]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Qualificacoes/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_qualificacoes = read_data(spark, "Qualificacoes", base_path, encodings['Qualificacoes'])

df_qualificacoes.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('qualificacoes')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Paises

In [23]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Paises/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_paises = read_data(spark, "Paises", base_path, encodings['Paises'])

df_paises.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('paises')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Municipios

In [24]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Municipios/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_municipios = read_data(spark, "Municipios", base_path, encodings['Municipios'])

df_municipios.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('municipios')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Cnaes

In [25]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Cnaes/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_cnaes = read_data(spark, "Cnaes", base_path, encodings['Cnaes'])

df_cnaes.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('cnaes')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Motivos

In [26]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Motivos/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_motivos = read_data(spark, "Motivos", base_path, encodings['Motivos'])

df_motivos.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('motivos')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

                                                                                

## Simples

In [None]:
# Caminho base dos arquivos CSV no GCS
base_path = "gs://projeto-dados-receita-federal/Simples/arquivo_deszipado"

# Lendo os dados dos estabelecimentos e criando um DataFrame Spark
df_motivos = read_data(spark, "Simples", base_path, encodings['Simples'])


df_motivos.write.format('bigquery') \
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")\
.option('table', 'receitafederal.'+str('simples')) \
.option("temporaryGcsBucket","projeto-dados-receita-federal/temp_bq") \
.mode("overwrite") \
.save()

[Stage 1:>                                                         (0 + 2) / 18]