In [1]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

In [3]:
pathEmpresas = "empresas"
pathEstabelecimentos = "estabelecimentos"
pathSocios = "socios"

In [4]:
empresas = spark.read.csv(pathEmpresas, sep=';', inferSchema=True)
estabelecimentos = spark.read.csv(pathEstabelecimentos, sep=';', inferSchema=True)
socios = spark.read.csv(pathSocios, sep=';', inferSchema=True)

In [5]:
empresas.count()

4585679

In [6]:
estabelecimentos.count()

4836219

In [7]:
socios.count()

2046430

In [8]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


## Manipulação de Dataframes
### Renomeando as colunas

In [9]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [10]:
# Código do curso

# for index, colName in enumerate(empresasColNames):
#     empresas = empresas.withColumnRenamed(f"_c{index}", colName)

# for index, colName in enumerate(estabsColNames):
#     estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

# for index, colName in enumerate(sociosColNames):
#     socios = socios.withColumnRenamed(f"_c{index}", colName)

# Melhor forma
empresas = empresas.toDF(*empresasColNames)
estabelecimentos = estabelecimentos.toDF(*estabsColNames)
socios = socios.toDF(*sociosColNames)

In [11]:
empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [12]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [13]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [14]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [15]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [16]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [17]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [18]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))

In [19]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

In [20]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [21]:
estabelecimentos = estabelecimentos.withColumn(
        "data_situacao_cadastral",
        f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    ).withColumn(
        "data_de_inicio_atividade",
        f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    ).withColumn(
        "data_da_situacao_especial",
        f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

In [22]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [23]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [24]:
socios = socios.withColumn(
        "data_de_entrada_sociedade",
        f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )

## Seleções e consultas

In [25]:
empresas.select('*').show(5, False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |NULL                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [26]:
empresas.select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa').show(5, False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2240             |1               |0.0                      |
|2062             |5               |0.0                      |
|3034             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |4000.0                   |
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [27]:
socios.select(
    'nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('data_entrada')
).show(5, False)

+-------------------------------+------------+------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|data_entrada|
+-------------------------------+------------+------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994        |
|CRISTINA HUNDERTMARK           |7           |1994        |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994        |
|EDUARDO BERRINGER STEPHAN      |5           |1994        |
|HANNE MAHFOUD FADEL            |8           |1994        |
+-------------------------------+------------+------------+
only showing top 5 rows



In [28]:
estabelecimentos.select(
    'nome_fantasia', 'municipio',
    f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),
    f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'),
).show(5, False)

+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |1994                   |5                      |
|NULL             |7107     |1994                   |5                      |
|NULL             |7107     |1994                   |5                      |
|NULL             |7107     |1994                   |5                      |
|EMBROIDERY & GIFT|7075     |1995                   |5                      |
+-----------------+---------+-----------------------+-----------------------+
only showing top 5 rows



## Identificando valores nulos

In [31]:
estabelecimentos.limit(2).show()

+-----------+----------+-------+---------------------------+--------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+---------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial| nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|     logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------+---------------

In [34]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [35]:
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [37]:
socios.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


## Ordenando os valores

In [44]:
socios.select(
    "nome_do_socio_ou_razao_social", "faixa_etaria", f.year('data_de_entrada_sociedade').alias('ano_de_entrada')
).show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [48]:
socios.select(
    "nome_do_socio_ou_razao_social", "faixa_etaria", f.year('data_de_entrada_sociedade').alias('ano_de_entrada')
).orderBy(['faixa_etaria', 'ano_de_entrada'], ascending=[False, False]).show(15, False)

+------------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social             |faixa_etaria|ano_de_entrada|
+------------------------------------------+------------+--------------+
|FRANCISCO PIMPINATO                       |9           |2021          |
|ANDRE PAUL GABAY                          |9           |2021          |
|SYLVIA ZELINDA CATTANI FANALI BARROS      |9           |2021          |
|HARUE YAMAMOTO                            |9           |2021          |
|JOSE DA SILVA                             |9           |2021          |
|ZELIA FERREIRA DA SILVA                   |9           |2021          |
|YARA CABRAL PINTO                         |9           |2021          |
|EUDOXIO PONTES DE SOUZA                   |9           |2021          |
|FRANCISCO ALFREDO LOBO JUNGER             |9           |2021          |
|ROBERTO CARDOSO DE ALMEIDA AMORIM         |9           |2021          |
|LUZINETE DANTAS DE OLIVEIRA RODRIGUES     |9      

In [49]:
empresas.where(
    "capital_social_da_empresa==50"
).show(5, False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |NULL                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |NULL                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |NULL                 

In [50]:
socios.select(
    "nome_do_socio_ou_razao_social"
).filter(
    socios.nome_do_socio_ou_razao_social.startswith("ANTONIO")
).filter(
    socios.nome_do_socio_ou_razao_social.endswith("ALVES")
).limit(10).toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,ANTONIO ELICIO SANTOS ALVES
1,ANTONIO DE PADUA ALVES
2,ANTONIO EDUARDO ROCHA ALVES
3,ANTONIO ALCINO ALVES
4,ANTONIO MASCARENHAS ALVES
5,ANTONIO MARCELO ARAUJO GONCALVES
6,ANTONIO VILMAR PEREIRA ALVES
7,ANTONIO RODRIGUES GONCALVES
8,ANTONIO JOSE ALVES
9,ANTONIO APARECIDO GONCALVES


In [None]:
spark.stop()