## Projeto Spark

In [1]:
#caso não tenha path nas variaveis de ambiente
import os
#os.environ["SPARK_HOME"] = "C:\spark\spark-3.3.0-bin-hadoop27"


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession


In [4]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port','4050') \
    .getOrCreate()

In [5]:
spark

In [6]:
data = [('Zeca', '35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']


In [7]:
df = spark.createDataFrame(data, colNames)


In [8]:
df.show()


+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [9]:
import pandas as pd




In [10]:
df.toPandas()


Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

In [11]:
import zipfile

In [12]:
#zipfile.ZipFile('./Formacao-Spark-Python-Alura/data/download/empresas.zip','r').extractall('./Formacao-Spark-Python-Alura/data/raw/')

In [13]:
path_empresas = 'D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/raw/empresas'
empresas = spark.read.csv(path_empresas, sep=';', inferSchema=True)

In [14]:
empresas.count()

4585679

In [15]:
#zipfile.ZipFile('./Formacao-Spark-Python-Alura/data/download/estabelecimentos.zip','r').extractall('./Formacao-Spark-Python-Alura/data/raw/')
#zipfile.ZipFile('./Formacao-Spark-Python-Alura/data/download/socios.zip','r').extractall('./Formacao-Spark-Python-Alura/data/raw/')

In [16]:
path_estabelecimentos = 'D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/raw/estabelecimentos'
path_socios = 'D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/raw/socios'
estabelecimentos = spark.read.csv(path_estabelecimentos, sep=';', inferSchema=True)
socios = spark.read.csv(path_socios, sep=';', inferSchema=True)

In [17]:
estabelecimentos.count()

4836219

In [18]:
socios.count()

2046430

In [19]:
empresas.limit(5)

DataFrame[_c0: int, _c1: string, _c2: int, _c3: int, _c4: string, _c5: int, _c6: string]

In [20]:
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [21]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [22]:
for item in enumerate(empresasColNames):
        print(item)

(0, 'cnpj_basico')
(1, 'razao_social_nome_empresarial')
(2, 'natureza_juridica')
(3, 'qualificacao_do_responsavel')
(4, 'capital_social_da_empresa')
(5, 'porte_da_empresa')
(6, 'ente_federativo_responsavel')


In [23]:
for index, colName in enumerate(empresasColNames):
    empresas =empresas.withColumnRenamed(f"_c{index}", colName)

empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [24]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [25]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [26]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [27]:
for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(
        f"_c{index}", colName)

estabelecimentos.columns


['cnpj_basico',
 'cnpj_ordem',
 'cnpj_dv',
 'identificador_matriz_filial',
 'nome_fantasia',
 'situacao_cadastral',
 'data_situacao_cadastral',
 'motivo_situacao_cadastral',
 'nome_da_cidade_no_exterior',
 'pais',
 'data_de_inicio_atividade',
 'cnae_fiscal_principal',
 'cnae_fiscal_secundaria',
 'tipo_de_logradouro',
 'logradouro',
 'numero',
 'complemento',
 'bairro',
 'cep',
 'uf',
 'municipio',
 'ddd_1',
 'telefone_1',
 'ddd_2',
 'telefone_2',
 'ddd_do_fax',
 'fax',
 'correio_eletronico',
 'situacao_especial',
 'data_da_situacao_especial']

In [28]:
for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)

socios.columns


['cnpj_basico',
 'identificador_de_socio',
 'nome_do_socio_ou_razao_social',
 'cnpj_ou_cpf_do_socio',
 'qualificacao_do_socio',
 'data_de_entrada_sociedade',
 'pais',
 'representante_legal',
 'nome_do_representante',
 'qualificacao_do_representante_legal',
 'faixa_etaria']

In [29]:

estabelecimentos.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [30]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [31]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f


In [32]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [33]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,10000000,1,


In [34]:
empresas = empresas.withColumn(
    'capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(5).toPandas()


Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,


In [35]:
empresas.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [36]:
empresas = empresas.withColumn(
    'capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()


Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,4519,DANIELA DA SILVA CRUZ,2135,50,0.0,5,
1,8638,JOAO DOS SANTOS FAGUNDES,2135,50,0.0,5,
2,11748,PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO...,2062,49,0.0,1,
3,12027,L G SORVETERIA LTDA,2062,49,0.0,5,
4,13289,ANDREIA CRISTINA DELSIN EIRELI,2305,65,100000.0,1,


In [37]:
empresas.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [38]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,19950331,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,20150209,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,20181219,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,20040123,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [39]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

In [40]:
estabelecimentos = estabelecimentos \
    .withColumn("data_situacao_cadastral", 
                f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    ) \
    .withColumn("data_de_inicio_atividade",
                f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    ) \
    .withColumn("data_da_situacao_especial",
                f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

estabelecimentos.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [41]:
estabelecimentos.limit(5).toPandas()


Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,4519,1,48,1,GIRAFFAS,8,1995-03-31,1,,,...,6219,,,,,,,,,
1,8638,1,79,1,AGROPECUARIA FAGUNDES,8,2015-02-09,73,,,...,7255,,,,,,,,,
2,11748,1,90,1,,4,2018-12-19,63,,,...,7097,,,,,,,,,
3,12027,1,2,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,13289,1,83,1,JS MATERIAIS DE CONSTRUCAO,2,2004-01-23,0,,,...,6915,19.0,35811286.0,,,,,CONTATO@LEONECONTABIL.COM.BR,,


In [42]:
socios.limit(5).toPandas()


Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [43]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [44]:
socios = socios.withColumn("data_de_entrada_sociedade", f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd'))


socios.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [45]:
socios.limit(5).toPandas()


Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [46]:
empresas\
    .select('*')\
    .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                      0.0|               5|                       null|
|       8638|         JOAO DOS SANTOS F...|             2135|                         50|                      0.0|               5|                       null|
|      11748|         PANIFICADORA E CO...|             2062|                         49|                      0.0|               1|                       null|
|      12027|          L G SORVETE

In [47]:
empresas\
    .select('*')\
    .show(5,False)


+-----------+---------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                      |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+---------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|4519       |DANIELA DA SILVA CRUZ                              |2135             |50                         |0.0                      |5               |null                       |
|8638       |JOAO DOS SANTOS FAGUNDES                           |2135             |50                         |0.0                      |5               |null                       |
|11748      |PANIFICADORA E CONFEITARIA CONFIANCA RIO PRETO LTDA|2062             |49

In [48]:
empresas\
    .select('natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .show(5,False)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|2135             |5               |0.0                      |
|2135             |5               |0.0                      |
|2062             |1               |0.0                      |
|2062             |5               |0.0                      |
|2305             |1               |100000.0                 |
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [49]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .show(5, False)


+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [50]:
estabelecimentos.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [51]:
estabelecimentos\
    .select('nome_fantasia', 'municipio', f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'), 
    f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
    .show(5)


+--------------------+---------+-----------------------+-----------------------+
|       nome_fantasia|municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+--------------------+---------+-----------------------+-----------------------+
|            GIRAFFAS|     6219|                   1994|                      5|
|AGROPECUARIA FAGU...|     7255|                   1994|                      5|
|                null|     7097|                   1994|                      5|
|                null|     7107|                   1994|                      6|
|JS MATERIAIS DE C...|     6915|                   1994|                      6|
+--------------------+---------+-----------------------+-----------------------+
only showing top 5 rows



In [52]:
data1 = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames1 = ['nome', 'idade']
df1 = spark.createDataFrame(data1, colNames1)
df1.show(truncate=False)


+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [53]:
df1 \
    .select(
        f.concat_ws(
            ', ',
            f.substring_index('nome', ' ', -1),
            f.substring_index('nome', ' ', 1)
        ).alias('ident'),
        'idade') \
    .show(truncate=False)


+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



In [54]:
socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show()


+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [55]:
socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [56]:
socios.na.fill('-').limit(5).toPandas()


Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


In [57]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, False)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|ANGELICA FLORENCIO DA SILVA  |5           |2021          |
|IVANOR ROZZA                 |6           |2021          |
|LUARA CABRAL DE CARVALHO     |4           |2021          |
|FERNANDO ALIPRANDINI         |6           |2021          |
|TALLES GONCALVES MOURA       |4           |2021          |
+-----------------------------+------------+--------------+
only showing top 5 rows



In [58]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
    .show(5, False)


+-------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social        |faixa_etaria|ano_de_entrada|
+-------------------------------------+------------+--------------+
|YARA CABRAL PINTO                    |9           |2021          |
|JOSE LUIZ DE AMORIM CARRAO           |9           |2021          |
|LUZINETE DANTAS DE OLIVEIRA RODRIGUES|9           |2021          |
|JOSE CURADO ADORNO                   |9           |2021          |
|ROSA NEMER RIBEIRO                   |9           |2021          |
+-------------------------------------+------------+--------------+
only showing top 5 rows



In [59]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [60]:
df\
    .select('*')\
    .orderBy(['ano', 'mes'], ascending=[False, False])\
    .show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|IRANI DOS SANTOS     |12 |2010|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
|CARMINA RABELO       |4  |2010|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|HERONDINA PEREIRA    |6  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
+---------------------+---+----+



In [61]:
empresas\
            .where("capital_social_da_empresa==50")\
            .show(5, False)

+-----------+--------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|14715041   |LIDIANE MARIA DO NASCIMENTO 14542418707     |2135             |50                         |50.0                     |1               |null                       |
|20601885   |CRISTIANO AKIHITO BORDIN 04370949955        |2135             |50                         |50.0                     |1               |null                       |
|23661983   |VITOR ALOISIO DO NASCIMENTO GUIA 12663882739|2135             |50                         |50.0            

In [62]:
socios\
    .select("nome_do_socio_ou_razao_social")\
    .filter(socios.nome_do_socio_ou_razao_social.startswith("RODRIGO"))\
    .filter(socios.nome_do_socio_ou_razao_social.endswith("DIAS"))\
    .limit(10)\
    .toPandas()


Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO BENASSI DIAS
1,RODRIGO RUDIBERTO DIAS
2,RODRIGO AURELIANO DIAS
3,RODRIGO SIMOES LEMOS DIAS
4,RODRIGO GEORGE DIAS
5,RODRIGO AUGUSTO FELICIO DIAS
6,RODRIGO FERNANDES DIAS
7,RODRIGO GARRIDO DIAS
8,RODRIGO OLIVEIRA DIAS
9,RODRIGO GONCALVES DIAS


In [63]:
socios\
    .select("nome_do_socio_ou_razao_social")\
    .filter(socios.nome_do_socio_ou_razao_social.endswith("NOBRE"))\
    .limit(10)\
    .toPandas()


Unnamed: 0,nome_do_socio_ou_razao_social
0,VAGNER NOBRE
1,VALDIR NOBRE
2,CLAUDERLEY SANTANA NOBRE
3,JOAO CARLOS DE OLIVEIRA NOBRE
4,ARLEIDE DE CARVALHO NOBRE
5,LUCAS POTI NOBRE
6,FRANCISCO DE CARVALHO NOBRE
7,LUCAS BERTHOLDO NOBRE
8,MARCELO BASTOS NOBRE
9,JEZALIA SISARA NOBRE


In [64]:
socios\
    .select("nome_do_socio_ou_razao_social")\
    .filter(socios.nome_do_socio_ou_razao_social.contains("NOBRE"))\
    .limit(10)\
    .toPandas()


Unnamed: 0,nome_do_socio_ou_razao_social
0,CARLOS ALBERTO NOBREGA
1,HELIO SERGIO NOBREGA NELSON
2,VANESSA SILVERIO NOBREGA
3,LUIZ ANTONIO DA COSTA NOBREGA
4,ADEMAR NOBREGA MATHIAS
5,OSNI NOBRE DA SILVA
6,VAGNER NOBRE
7,VALDIR NOBRE
8,CHARLES PATRICIO NOBREGA DE ANDRADE
9,MARCIO JEAN NOBRE DE OLIVEIRA


In [65]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [66]:
df\
    .filter(df.mes <= 6)\
    .filter(df.ano = 2009)\
    .show(truncate=False)

SyntaxError: expression cannot contain assignment, perhaps you meant "=="? (4085509072.py, line 3)

In [67]:
df\
    .filter("mes<=6")\
    .filter("ano=2009")\
    .show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [68]:
df\
    .filter((df.mes <= 6) & (df.ano == 2009))\
    .show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [69]:
df\
    .filter(df.mes <= 6)\
    .filter(df.ano==2009)\
    .show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



In [70]:
df = spark.createDataFrame(
    [('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()


Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [71]:
df\
    .where(f.upper(df.data).like('%RESTAURANTE%'))\
    .show(truncate=False)


+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [72]:
df\
    .where(f.upper(df.data).like('RESTAURANTE%'))\
    .show(truncate=False)


+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+



In [73]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTE%'))\
    .show(15, False)


+-----------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                        |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------------------------------------------+-----------------+----------------+-------------------------+
|PRAIA MAR RESTAURANTE LTDA                           |2062             |5               |20000.0                  |
|MORENO'S RESTAURANTE E LANCHONETE LTDA               |2062             |1               |0.0                      |
|RESTAURANTE E PETISQUERIA MANE CANTO LTDA            |2062             |5               |0.0                      |
|J.C.DE OLIVEIRA RESTAURANTE                          |2135             |5               |0.0                      |
|RESTAURANTE E LANCHONETE NOVAES LTDA                 |2062             |1               |0.0                      |
|RESTAURANTE GALEAO 2420 LTDA                         |2062     

In [74]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%JURISDATA%'))\
    .show(15, False)


+-----------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                  |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------------------------------------+-----------------+----------------+-------------------------+
|JURISDATA JURISPRUDENCIA E INFORMATICA LTDA M E|2062             |5               |0.0                      |
+-----------------------------------------------+-----------------+----------------+-------------------------+



In [75]:
empresas\
    .select('*')\
    .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%JURISDATA%'))\
    .limit(10).toPandas()


Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,31911381,JURISDATA JURISPRUDENCIA E INFORMATICA LTDA M E,2062,49,0.0,5,


In [76]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)


+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [77]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada', ascending=True)\
    .show()


+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [78]:
empresas\
    .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg("capital_social_da_empresa").alias("capital_social_medio"),
        f.count("cnpj_basico").alias("frequencia")
    )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show()


+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|   339994.5331350705|   3129043|
|               3|  2601001.7677092687|    115151|
|               5|   708660.4208249793|   1335500|
+----------------+--------------------+----------+



In [79]:
empresas\
    .select("capital_social_da_empresa")\
    .summary()\
    .show()
# .summary("count","mean","stddev","min","25%","50%","75%","max")


+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542674|
| stddev|     2.1118691490537742E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



In [80]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()


+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [81]:
df = df.withColumn('status', f.when(
    df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()


+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [82]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df = df.withColumn('status', f.when(
    df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()


+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [83]:
df\
    .select('nota')\
    .summary()\
    .show()


+-------+------------------+
|summary|              nota|
+-------+------------------+
|  count|                18|
|   mean| 7.166666666666667|
| stddev|2.9154759474226504|
|    min|                 1|
|    25%|                 7|
|    50%|                 8|
|    75%|                 9|
|    max|                10|
+-------+------------------+



In [84]:
df\
    .groupBy('status')\
    .count()\
    .orderBy('status', ascending=True)\
    .show()


+---------+-----+
|   status|count|
+---------+-----+
| APROVADO|   14|
|REPROVADO|    4|
+---------+-----+



In [85]:
df\
    .select('nota')\
    .describe()\
    .show()


+-------+------------------+
|summary|              nota|
+-------+------------------+
|  count|                18|
|   mean| 7.166666666666667|
| stddev|2.9154759474226504|
|    min|                 1|
|    max|                10|
+-------+------------------+



In [86]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)


In [87]:
produtos.toPandas()


Unnamed: 0,id,cat,prod
0,1,Bebidas,Água mineral
1,2,Limpeza,Sabão em pó
2,3,Frios,Queijo
3,4,Bebidas,Refrigerante
4,5,Pet,Ração para cães


In [88]:
impostos.toPandas()


Unnamed: 0,cat,tax
0,Bebidas,0.15
1,Limpeza,0.05
2,Frios,0.065
3,Carnes,0.08


In [89]:
produtos.join(impostos, 'cat', how='inner')\
    .sort('id')\
    .show()


+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [90]:
produtos.join(impostos, 'cat', how='left')\
    .sort('id')\
    .show()


+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+



In [91]:
produtos.join(impostos, 'cat', how='right')\
    .sort('id')\
    .show()


+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [92]:
produtos.join(impostos, 'cat', how='outer')\
    .sort('id')\
    .show()


+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+



In [93]:
empresas.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [94]:
socios.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [95]:
estabelecimentos.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [96]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')
empresas_join.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [97]:
freq = empresas_join\
    .select(
        'cnpj_basico',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)


In [98]:
freq.toPandas()

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [99]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()


+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



In [100]:
idades = spark.createDataFrame(
    [
        ('CARLOS', 15),
        ('IVO', 14),
        ('MÁRCIA', 16),
        ('LEILA', 17),
        ('LETÍCIA', 14)
    ],
    ['nomes', 'idades']
)

notas = spark.createDataFrame(
    [
        ('CARLOS', 10),
        ('MÁRCIA', 1),
        ('LEILA', 10),
        ('BRENO', 7),
        ('LETÍCIA', 9)
    ],
    ['nomes', 'notas']
)


In [101]:
idades.toPandas()

Unnamed: 0,nomes,idades
0,CARLOS,15
1,IVO,14
2,MÁRCIA,16
3,LEILA,17
4,LETÍCIA,14


In [102]:
notas.toPandas()

Unnamed: 0,nomes,notas
0,CARLOS,10
1,MÁRCIA,1
2,LEILA,10
3,BRENO,7
4,LETÍCIA,9


In [103]:
idades.join(notas, 'nomes', how="outer")\
    .sort('nomes')\
    .show()


+-------+------+-----+
|  nomes|idades|notas|
+-------+------+-----+
|  BRENO|  null|    7|
| CARLOS|    15|   10|
|    IVO|    14| null|
|  LEILA|    17|   10|
|LETÍCIA|    14|    9|
| MÁRCIA|    16|    1|
+-------+------+-----+



In [104]:
empresas.createOrReplaceTempView("empresasView")


In [105]:
spark.sql("SELECT * FROM empresasView").show(5)


+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|       4519|         DANIELA DA SILVA ...|             2135|                         50|                      0.0|               5|                       null|
|       8638|         JOAO DOS SANTOS F...|             2135|                         50|                      0.0|               5|                       null|
|      11748|         PANIFICADORA E CO...|             2062|                         49|                      0.0|               1|                       null|
|      12027|          L G SORVETE

In [106]:
spark\
    .sql("""
        SELECT * 
            FROM empresasView 
            WHERE capital_social_da_empresa = 50
    """)\
    .show(5)


+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|   14715041|         LIDIANE MARIA DO ...|             2135|                         50|                     50.0|               1|                       null|
|   20601885|         CRISTIANO AKIHITO...|             2135|                         50|                     50.0|               1|                       null|
|   23661983|         VITOR ALOISIO DO ...|             2135|                         50|                     50.0|               1|                       null|
|   23714726|         JOSELINA PAN

In [107]:
spark\
    .sql("""
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media 
            FROM empresasView 
            GROUP BY porte_da_empresa
    """)\
    .show(5)


+----------------+------------------+
|porte_da_empresa|             Media|
+----------------+------------------+
|            null|  8.35421888053467|
|               1| 339994.5331350705|
|               3|2601001.7677092687|
|               5| 708660.4208249793|
+----------------+------------------+



In [108]:
empresas_join.createOrReplaceTempView("empresasJoinView")


In [109]:
freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
            FROM empresasJoinView 
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
    """)

freq\
    .show()


+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [110]:
freq.createOrReplaceTempView("freqView")


In [111]:
spark\
    .sql("""
        SELECT *
            FROM freqView
        UNION ALL
        SELECT 'Total' AS data_de_inicio, SUM(count) AS count
            FROM freqView
    """)\
    .show()


+--------------+-------+
|data_de_inicio|  count|
+--------------+-------+
|          2010| 154159|
|          2011| 172677|
|          2012| 232480|
|          2013| 198424|
|          2014| 202276|
|          2015| 212523|
|          2016| 265417|
|          2017| 237292|
|          2018| 275435|
|          2019| 325922|
|          2020| 400654|
|          2021| 153275|
|         Total|2830534|
+--------------+-------+



In [112]:
empresas_join.createOrReplaceTempView("empresasJoinView")

freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) AS count
            FROM empresasJoinView 
            WHERE YEAR(data_de_inicio_atividade) >= 2010
            GROUP BY data_de_inicio
            ORDER BY data_de_inicio
    """)

freq\
    .show()


+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [113]:
empresas_join\
    .select(f.year(empresas_join.data_de_inicio_atividade).alias('data_de_inicio'))\
    .where("data_de_inicio >= 2010")\
    .groupBy('data_de_inicio')\
    .count()\
    .orderBy('data_de_inicio')\
    .show()


+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [116]:
empresas.write.csv(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/empresa/csv',
    mode='overwrite',
    sep=';',
    header=True
)


In [None]:
spark.version

'3.3.0'

In [119]:
empresas2 = spark.read.csv(
    'D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/empresa/csv',
    sep=';',
    inferSchema=True,
    header=True
)


In [120]:
empresas2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [121]:
estabelecimentos.write.csv(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/estabelecimentos/csv',
    mode='overwrite',
    sep=';',
    header=True
)


In [122]:
socios.write.csv(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/socios/csv',
    mode='overwrite',
    sep=';',
    header=True
)


In [123]:
empresas.write.parquet(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/empresa/parquet',
    mode='overwrite'
)


In [124]:
empresas_parquet = spark.read.parquet(
    'D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/empresa/parquet'
)


In [125]:
empresas_parquet.printSchema()


root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [130]:
# F = f
date_cols = [item[0]
             for item in estabelecimentos.dtypes if item[1].startswith('date')]
for date_col in date_cols:
    estabelecimentos = estabelecimentos.withColumn(date_col,
                                                   f.when(
                                                       f.col(
                                                           date_col) <= '1900-01-01',
                                                       f.to_date(f.lit('1900-01-01'), 'yyyy-MM-dd'))
                                                   .otherwise(f.col(date_col)))


In [131]:
spark.conf.set(
    'spark.sql.legacy.parquet.datetimeRebaseModeInRead', 'CORRECTED')


In [132]:
estabelecimentos.write.parquet(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/estabelecimentos/parquet',
    mode='overwrite'
)


In [129]:
socios.write.parquet(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/socios/parquet',
    mode='overwrite'
)


In [134]:
empresas.coalesce(1).write.csv(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)


In [136]:
empresas.write.parquet(
    path='D:/ONEDRIVE/EDUCACAO/FORMACAO_SPARK_PYTHON_ALURA/Formacao-Spark-Python-Alura/data/processed/empresas/parquet-partitionBy',
    mode='overwrite',
    partitionBy='porte_da_empresa'
)


In [137]:
spark.stop()