## Importacao das bibliotecas e chamada do spark

In [3]:
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import DoubleType, StringType
from typing import Union, List

spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('Iniciando com spark')\
                    .getOrCreate()

## Leitura dos arquivos

In [4]:
socios           = spark.read.csv('/Users/tuanymariah/Documents/estudos_pyspark/data/socios', sep = ';', inferSchema = True)
empresas         = spark.read.csv('/Users/tuanymariah/Documents/estudos_pyspark/data/empresas', sep = ';', inferSchema = True)
estabelecimentos = spark.read.csv('/Users/tuanymariah/Documents/estudos_pyspark/data/estabelecimentos', sep = ';', inferSchema = True)

## Manipulacao dos dados

In [5]:
print('printando o dataframde socios')
socios.limit(5).show(truncate=False)

print('printando o dataframde empresas')
empresas.limit(5).show(truncate=False)

print('printando o dataframde estabelecimentos')
estabelecimentos.limit(5).show(truncate=False)

printando o dataframde socios
+-----+---+-------------------------------+-----------+---+--------+----+-----------+----+---+----+
|_c0  |_c1|_c2                            |_c3        |_c4|_c5     |_c6 |_c7        |_c8 |_c9|_c10|
+-----+---+-------------------------------+-----------+---+--------+----+-----------+----+---+----+
|411  |2  |LILIANA PATRICIA GUASTAVINO    |***678188**|22 |19940725|null|***000000**|null|0  |7   |
|411  |2  |CRISTINA HUNDERTMARK           |***637848**|28 |19940725|null|***000000**|null|0  |7   |
|5813 |2  |CELSO EDUARDO DE CASTRO STEPHAN|***786068**|49 |19940516|null|***000000**|null|0  |8   |
|5813 |2  |EDUARDO BERRINGER STEPHAN      |***442348**|49 |19940516|null|***000000**|null|0  |5   |
|14798|2  |HANNE MAHFOUD FADEL            |***760388**|49 |19940609|null|***000000**|null|0  |8   |
+-----+---+-------------------------------+-----------+---+--------+----+-----------+----+---+----+

printando o dataframde empresas
+----+-------------------------------

## Renomeando colunas do dataframe

In [6]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
estabsColNames   = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
sociosColNames   = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

def column_renaming(list_cols: list, df:DataFrame) -> DataFrame:
    for index, colName in enumerate(list_cols,):
        df = df.withColumnRenamed(f"_c{index}", colName)
    
    return df

empresas = column_renaming(empresasColNames, empresas)
estabelecimentos = column_renaming(estabsColNames, estabelecimentos)
socios = column_renaming(sociosColNames, socios)

print('printando o dataframde socios')
socios.limit(5).show(truncate=False)

print('printando o dataframde empresas')
empresas.limit(5).show(truncate=False)

print('printando o dataframde estabelecimentos')
estabelecimentos.limit(5).show(truncate=False)

printando o dataframde socios
+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social  |cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|411        |2                     |LILIANA PATRICIA GUASTAVINO    |***678188**         |22                   |19940725                 |null|***000000**        |null                 |0                                  |7           |
|411        |2                    

## Analisando os dados após as renomeação das colunas

In [7]:
empresas.printSchema()
estabelecimentos.printSchema()
socios.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- c

## Convertendo colunas
### String -> Double

In [8]:
def convert_double(cols: List[str], df:DataFrame) -> DataFrame:
    for col in cols:
        df = df\
            .withColumn(
                col,
               F.regexp_replace(F.col(col), ',','.').cast(DoubleType())
                        
            )
    return df
empresas = convert_double(cols = ['capital_social_da_empresa'], df = empresas)
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### String -> Date

In [9]:
def convert_date(cols: List[str], df:DataFrame) -> DataFrame:
    for col in cols:
        df = df\
            .withColumn(
                col,
                F.to_date(F.col(col).cast(StringType()), 'yyyyMMdd')
                        
            )
    return  df
    

estabelecimentos = convert_date(cols = ['data_de_inicio_atividade', 'data_situacao_cadastral', 'data_da_situacao_especial'], df = estabelecimentos )  
socios = convert_date(cols = ['data_de_entrada_sociedade'], df = socios )  


## Selecoes e consultas

primeiro irei realizar a selecao de consultas utilizando o *select* do pyspark, após isso criarei uma coluna utilizando o *withColumns*

In [10]:
socios\
    .select(F.col('cnpj_basico'), 
            F.col('data_de_entrada_sociedade'),
            F.year(F.col('data_de_entrada_sociedade')).alias('ano_entrada_sociedade')
            )\
    .show(5, truncate=False)

+-----------+-------------------------+---------------------+
|cnpj_basico|data_de_entrada_sociedade|ano_entrada_sociedade|
+-----------+-------------------------+---------------------+
|411        |1994-07-25               |1994                 |
|411        |1994-07-25               |1994                 |
|5813       |1994-05-16               |1994                 |
|5813       |1994-05-16               |1994                 |
|14798      |1994-06-09               |1994                 |
+-----------+-------------------------+---------------------+
only showing top 5 rows



criacao da coluna **ano_entrada_sociedade** utilizando o **withColumn**

In [11]:
socios = socios\
            .withColumn(
                'ano_entrada_sociedade', F.year(F.col('data_de_entrada_sociedade'))
            )

## Verificando valores nulos

In [12]:
def qtd_valores_nulos(df: DataFrame ) -> DataFrame:
    df = df.select([ F.count(F.when(F.isnull(col), 1)).alias(col) for col in df.columns])
    return df
print('Quantidade de valores nulos para estabelecimentos')
nulos_estabelecimento = qtd_valores_nulos(df = estabelecimentos)
nulos_estabelecimento.show()

print('Quantidade de valores nulos para empresas')
nulos_empresas = qtd_valores_nulos(df = empresas)
nulos_empresas.show()

print('Quantidade de valores nulos para socios')
nulos_socios = qtd_valores_nulos(df = socios)
nulos_socios.show()


Quantidade de valores nulos para estabelecimentos
+-----------+----------+-------+---------------------------+-------------+------------------+-----------------------+-------------------------+--------------------------+-------+------------------------+---------------------+----------------------+------------------+----------+------+-----------+------+-----+---+---------+-------+----------+-------+----------+----------+-------+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|   pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|logradouro|numero|complemento|bairro|  cep| uf|municipio|  ddd_1|telefone_1|  ddd_2|telefone_2|ddd_do_fax|    fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+

In [13]:
estabelecimentos.select(F.col('pais'),
                        F.col('uf'))\
                .where(F.col('pais').isNotNull())\
                .show(5)


+----+---+
|pais| uf|
+----+---+
| 351| EX|
|  23| EX|
| 845| EX|
| 137| EX|
| 580| EX|
+----+---+
only showing top 5 rows



In [14]:
estabelecimentos = estabelecimentos.withColumn('pais',
                                 F.when((F.col('uf').isin([
                                     'AC', 'AL', 'AM', 'AP', 'BA', 'CE', 'DF', 'ES', 'GO', 'MA', 'MG', 'MS',
                                     'MT', 'PA', 'PB', 'PE', 'PI', 'PR', 'RJ', 'RN', 'RO', 'RR', 'RS', 'SC',
                                     'SE', 'SP', 'TO']
                                 )) & F.col('pais').isNull(), '55')
                                 .otherwise(F.col('pais')))


In [15]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [16]:
df_all = empresas.join(estabelecimentos, on = 'cnpj_basico' , how = 'inner')

In [17]:
df_all.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo

A partir do select utilizando pyspark abaixo, podemos observar que algumas empresas possuem valores iguais a 0.0 no capital social da empresa,
vamos filtrar esses registros por valores acima de 100?

In [18]:
df_all.select(F.col('razao_social_nome_empresarial'), 
              F.col('nome_fantasia'),
              F.col('porte_da_empresa'),
              F.col('capital_social_da_empresa'))\
      .show(5,truncate = False)



+-------------------------------------------------------------+-------------+----------------+-------------------------+
|razao_social_nome_empresarial                                |nome_fantasia|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------------+-------------+----------------+-------------------------+
|PLANNER COMUNICACAO PUBLICIDADE E ASSESSORIA PROMOCIONAL LTDA|null         |5               |0.0                      |
|ROUPA VIVA SERVICOS DE LAVANDERIA LTDA                       |null         |5               |0.0                      |
|IN FOCO PRODUCOES FOTOGRAFICAS S/S LTDA                      |null         |1               |500.0                    |
|CLINICA ROMA S/C LTDA                                        |null         |5               |0.0                      |
|APRENDENDO A VIVER E CRESCER                                 |null         |5               |0.0                      |
+-------------------------------

In [19]:
df_all\
    .select(F.col('razao_social_nome_empresarial'), 
            F.col('nome_fantasia'),
            F.col('porte_da_empresa'),
            F.col('capital_social_da_empresa'))\
    .where(F.col('capital_social_da_empresa')>100)\
    .show(truncate = False)

+--------------------------------------------------+----------------------------------+----------------+-------------------------+
|razao_social_nome_empresarial                     |nome_fantasia                     |porte_da_empresa|capital_social_da_empresa|
+--------------------------------------------------+----------------------------------+----------------+-------------------------+
|IN FOCO PRODUCOES FOTOGRAFICAS S/S LTDA           |null                              |1               |500.0                    |
|CENTRAL LUB COMERCIO DE OLEO E REPRESENTACOES LTDA|null                              |1               |650000.0                 |
|AMERICO REGATIERI NETO REPRESENTACOES             |null                              |1               |5000.0                   |
|TECBATERY ESCOLA LIVRE DE MUSICA LTDA             |null                              |1               |5000.0                   |
|COMERCIAL FREITAS & VIEIRA LTDA                   |EMBALAGENS SERLUZ              