# Inicio


In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Analise Empresas") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# ESTABELECIMENTOS

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType

In [None]:
schema = StructType() \
      .add("_c0",StringType(),True) \
      .add("_c1",StringType(),True) \
      .add("_c2",StringType(),True) \
      .add("_c3",IntegerType(),True) \
      .add("_c4",StringType(),True) \
      .add("_c5",StringType(),True) \
      .add("_c6",StringType(),True) \
      .add("_c7",StringType(),True) \
      .add("_c8",StringType(),True) \
      .add("_c9",StringType(),True) \
      .add("_c10",StringType(),True) \
      .add("_c11",StringType(),True) \
      .add("_c12",StringType(),True) \
      .add("_c13",StringType(),True) \
      .add("_c14",StringType(),True) \
      .add("_c15",StringType(),True) \
      .add("_c16",StringType(),True) \
      .add("_c17",StringType(),True) \
      .add("_c18",StringType(),True) \
      .add("_c19",StringType(),True) \
      .add("_c20",StringType(),True) \
      .add("_c21",StringType(),True) \
      .add("_c22",StringType(),True) \
      .add("_c23",StringType(),True) \
      .add("_c24",StringType(),True) \
      .add("_c25",StringType(),True) \
      .add("_c26",StringType(),True) \
      .add("_c27",StringType(),True) \
      .add("_c28",StringType(),True) \
      .add("_c29",StringType(),True) 

In [None]:
path = '/content/drive/MyDrive/Dados/K3241.K03200Y*'
estabelecimentos = spark.read.csv(path, sep=';', schema = schema)

In [None]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [None]:
estabelecimentos.printSchema()

In [None]:
for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)
    
#estabelecimentos.columns

In [None]:
estabelecimentos.show(5)

# CIDADES

In [None]:
path = '/content/drive/MyDrive/Dados/F.K03200$Z.D21217.MUNIC.CSV'
cidades = spark.read.csv(path, sep=';', inferSchema=True)

In [None]:
cidadesColNames = ['municipio', 'cidade']

In [None]:
for index, colName in enumerate(cidadesColNames):
    cidades = cidades.withColumnRenamed(f"_c{index}", colName)
    
cidades.columns

In [None]:
cidades.show(5)

# EMPRESAS

In [None]:
schema = StructType() \
      .add("_c0",StringType(),True) \
      .add("_c1",StringType(),True) \
      .add("_c2",StringType(),True) \
      .add("_c3",StringType(),True) \
      .add("_c4",IntegerType(),True) \
      .add("_c5",StringType(),True) \
      .add("_c6",StringType(),True)

In [None]:
path = '/content/drive/MyDrive/Dados/empresas/'
empresas = spark.read.csv(path, sep=';', schema = schema)

In [None]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [None]:
for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", colName)

empresas.columns

In [None]:
empresas.show(5)

# AJUSTE DOS DADOS (JUNÇÕES, FILTROS)

In [None]:
from pyspark.sql import functions as f

In [None]:
dados = estabelecimentos.join(cidades, 'municipio', how='inner')

In [None]:
dados = dados.join(empresas, 'cnpj_basico', how='inner')

In [None]:
## Juntando cnpj em numero unico

dados = dados.withColumn('cnpj', 
                    f.concat(f.col('cnpj_basico'), f.col('cnpj_ordem'), f.col('cnpj_dv') ))

In [None]:
dados = dados.select('*').where("uf == 'RS'")

In [None]:
dados.show(5)

In [None]:
dados.write.parquet(
    path='/content/drive/MyDrive/Dados/parquet-rs-total',
    mode='overwrite'
)

# LER ARQUIVOS PARQUET (após filtragem inicial)

In [None]:
from pyspark.sql import functions as f

In [None]:
path = '/content/drive/MyDrive/Dados/parquet-rs-total/'
dados = spark.read.parquet(path, sep=';', inferSchema = True)

In [None]:
dados = dados.select('cnpj','razao_social_nome_empresarial', 'nome_fantasia', 'situacao_cadastral', 'cnae_fiscal_principal', 'cidade', 'capital_social_da_empresa', 'porte_da_empresa' )

In [None]:
grupo1 = ['PORTO ALEGRE','CANOAS','GRAVATAI','VIAMAO','CACHOEIRINHA','ALVORADA','GUAIBA','SAPUCAIA','SAO LEOPOLDO', 'ESTEIO', 'NOVO HAMBURGO']
grupo2 = ['PASSO FUNDO','ERECHIM','SOLEDADE','TAPEJARA','CARAZINHO','MARAU']

In [None]:
dados.printSchema()

In [None]:
dados.createOrReplaceTempView("dados")

In [None]:
dadosG1 = spark\
    .sql("""
        SELECT *
            FROM dados
            WHERE cidade IN ('PORTO ALEGRE','CANOAS','GRAVATAI','VIAMAO','CACHOEIRINHA','ALVORADA','GUAIBA','SAPUCAIA','SAO LEOPOLDO', 'ESTEIO', 'NOVO HAMBURGO')
    """)

In [None]:
dadosG2 = spark\
    .sql("""
        SELECT *
            FROM dados
            WHERE cidade IN ('PASSO FUNDO','ERECHIM','SOLEDADE','TAPEJARA','CARAZINHO','MARAU')
    """)

In [None]:
dadosG1.count()

In [None]:
dadosG2.count()

In [None]:
dadosG1.write.parquet(
    path='/content/drive/MyDrive/Dados/dados-grupo1',
    mode='overwrite'
)

In [None]:
dadosG2.write.parquet(
    path='/content/drive/MyDrive/Dados/dados-grupo2',
    mode='overwrite'
)