# Iniciando estado do notebook
---

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

# Carregamento de Dados
---

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [None]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string]

In [None]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



### Montando nosso drive

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
%cd '/content/drive/MyDrive/GRANDES_DADOS'

Mounted at /content/drive
/content/drive/MyDrive/GRANDES_DADOS


In [None]:
empresas_path = '/content/drive/MyDrive/GRANDES_DADOS/empresas/'
empresas = spark.read.csv(empresas_path, sep=';', inferSchema=True)
estab_path = '/content/drive/MyDrive/GRANDES_DADOS/estabelecimentos'
estabelecimentos = spark.read.csv(estab_path, sep=';', inferSchema=True)
socios_path = '/content/drive/MyDrive/GRANDES_DADOS/socios'
socios = spark.read.csv(socios_path, sep=';', inferSchema=True)

# Sanitização dos Dados
---

### Renomeando as colunas do DataFrame

In [None]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [None]:
for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", colName)

In [None]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [None]:
for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

In [None]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [None]:
for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)

## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [None]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [None]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [None]:
estabelecimentos = estabelecimentos\
    .withColumn(
        "data_situacao_cadastral",
        f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_de_inicio_atividade",
        f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_da_situacao_especial",
        f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

In [None]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [None]:
socios = socios\
    .withColumn(
        "data_de_entrada_sociedade",
        f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )

In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


# Seleções e consultas
---

## Selecionando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [None]:
empresas\
    .select('*')\
    .show(10, truncate=False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [None]:
socios.select(f.year('data_de_entrada_sociedade')\
              .alias('ano_de_entrada')).show()

+--------------+
|ano_de_entrada|
+--------------+
|          1994|
|          1994|
|          1994|
|          1994|
|          1994|
|          1994|
|          1997|
|          2009|
|          1994|
|          1994|
|          1994|
|          1994|
|          1994|
|          1996|
|          1996|
|          1994|
|          1994|
|          1994|
|          2009|
|          1982|
+--------------+
only showing top 20 rows



In [None]:
empresas\
    .select('natureza_juridica', 'capital_social_da_empresa')\
    .show(5)

+-----------------+-------------------------+
|natureza_juridica|capital_social_da_empresa|
+-----------------+-------------------------+
|             2240|                      0.0|
|             2062|                      0.0|
|             3034|                      0.0|
|             2135|                      0.0|
|             2062|                   4000.0|
+-----------------+-------------------------+
only showing top 5 rows



In [None]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .show(5, truncate=False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



## Faça como eu fiz

In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
visao_data_atv = estabelecimentos.select('nome_fantasia',
                        'municipio',
                        'data_de_inicio_atividade',
                        f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),
                        f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))
# select() # Seleções no DataFrame de entrada
# f.year() # Coleta ano de uma data
# f.month() # Coleta mes de uma data
# .alias() # Renomeação de coluna

In [None]:
visao_data_atv.select('municipio').show()

+---------+
|municipio|
+---------+
|     7107|
|     7107|
|     7107|
|     7107|
|     7075|
|     7075|
|     7075|
|     7075|
|     7107|
|     7107|
|     6163|
|     7107|
|     6117|
|     6607|
|     7079|
|     7107|
|     7107|
|     6001|
|     7145|
|     9295|
+---------+
only showing top 20 rows



## Identificando valores nulos

In [None]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [None]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [None]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [None]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [None]:
# Abrindo a list comprenhension
# lista = [f.count(f.when(f.isnull(c), True)).alias(c) for c in socios.columns]

lista = []
for c in socios.columns:
  lista.append(f.count(f.when(f.isnull(c), 1)).alias(c))
lista

[Column<'count(CASE WHEN (cnpj_basico IS NULL) THEN 1 END) AS `cnpj_basico`'>,
 Column<'count(CASE WHEN (identificador_de_socio IS NULL) THEN 1 END) AS `identificador_de_socio`'>,
 Column<'count(CASE WHEN (nome_do_socio_ou_razao_social IS NULL) THEN 1 END) AS `nome_do_socio_ou_razao_social`'>,
 Column<'count(CASE WHEN (cnpj_ou_cpf_do_socio IS NULL) THEN 1 END) AS `cnpj_ou_cpf_do_socio`'>,
 Column<'count(CASE WHEN (qualificacao_do_socio IS NULL) THEN 1 END) AS `qualificacao_do_socio`'>,
 Column<'count(CASE WHEN (data_de_entrada_sociedade IS NULL) THEN 1 END) AS `data_de_entrada_sociedade`'>,
 Column<'count(CASE WHEN (pais IS NULL) THEN 1 END) AS `pais`'>,
 Column<'count(CASE WHEN (representante_legal IS NULL) THEN 1 END) AS `representante_legal`'>,
 Column<'count(CASE WHEN (nome_do_representante IS NULL) THEN 1 END) AS `nome_do_representante`'>,
 Column<'count(CASE WHEN (qualificacao_do_representante_legal IS NULL) THEN 1 END) AS `qualificacao_do_representante_legal`'>,
 Column<'count(C

In [None]:
lista_de_colunas = ['data_de_entrada_sociedade', 'data_de_entrada_sociedade']
socios.select([f.year(c).alias(c) for c in lista_de_colunas]).show()
# socios.select(lista).show()

+-------------------------+-------------------------+
|data_de_entrada_sociedade|data_de_entrada_sociedade|
+-------------------------+-------------------------+
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1997|                     1997|
|                     2009|                     2009|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1996|                     1996|
|                     1996|                     1996|
|                     1994| 

In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
socios.na.fill(0).orderBy('pais', ascending=False).show(truncate=False)

+-----------+----------------------+---------------------------------------------+--------------------+---------------------+-------------------------+----+-------------------+------------------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social                |cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante         |qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+---------------------------------------------+--------------------+---------------------+-------------------------+----+-------------------+------------------------------+-----------------------------------+------------+
|26764428   |1                     |KINOX INVESTMENTS LTD.                       |33423049000106      |37                   |2020-07-31               |866 |***468087**        |ROBERTO DUQUE ESTRADA DE SOUSA|17                    

In [None]:
socios.na.fill('-').show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                    -|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [None]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, False)

+-----------------------------+------------+--------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_de_entrada|
+-----------------------------+------------+--------------+
|JOSE HUMBERTO PAIVA          |6           |2021          |
|BENILDES BARBOSA RODRIGUES   |8           |2021          |
|MARCELO MOCELIN              |5           |2021          |
|ROBERTA BENELLI              |4           |2021          |
|EDUARDO DE ANDRADE PEIXOTO   |5           |2021          |
+-----------------------------+------------+--------------+
only showing top 5 rows



In [None]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
    .show(10, False)

+---------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social    |faixa_etaria|ano_de_entrada|
+---------------------------------+------------+--------------+
|ANTONIO TAVARES DE ANDRADE       |9           |2021          |
|ANNA MARIA TELLES FERREIRA SANTOS|9           |2021          |
|ANTONIA DE SOUSA VIEIRA          |9           |2021          |
|AURA MARIA DE ANDRADE            |9           |2021          |
|SONIA MARQUES SAMAJA             |9           |2021          |
|CARLOS ERANE DE AGUIAR           |9           |2021          |
|MATILDE CONCEICAO DE JESUS       |9           |2021          |
|MANUEL TAVARES DE SOUSA          |9           |2021          |
|ALBERTO DE BANDOS MENDES         |9           |2021          |
|EMERSON AZEVEDO                  |9           |2021          |
+---------------------------------+------------+--------------+
only showing top 10 rows



## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [None]:
empresas\
    .where("capital_social_da_empresa>1000000000").show(truncate=False)

+-----------+------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                           |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|6352117    |REBOUCAS E CIA LTDA.                                                    |2062             |49                         |2.9202702116E10          |3               |null                       |
|21199157   |TECHNO-CELLS INDUSTRIA DE SEMICONDUTORES SOLARES ES LTDA.               |2062             |5                          |3.0159349E9              |5               |null     

In [None]:
socios\
    .select("nome_do_socio_ou_razao_social").show(truncate=False)

+-------------------------------+
|nome_do_socio_ou_razao_social  |
+-------------------------------+
|LILIANA PATRICIA GUASTAVINO    |
|CRISTINA HUNDERTMARK           |
|CELSO EDUARDO DE CASTRO STEPHAN|
|EDUARDO BERRINGER STEPHAN      |
|HANNE MAHFOUD FADEL            |
|CLOD ASSAD FADEL               |
|WALKYRIA ALGARVES              |
|SEBASTIAO JADIR TEIXEIRA NUNES |
|JOSE JOAO ADAMO                |
|ROSEMARY CANTUARIA AFONSO ADAMO|
|MARCOS AURELIO MOTTA           |
|EDVAN CANDIDO ALENCAR          |
|JAIME MOURE COLINO             |
|SANDRA APARECIDA CRUZ LEONE    |
|LEIDA MARQUES PEREIRA VICENTE  |
|DONZILIA FERREIRA REGO         |
|ISRAEL AMERICANO REGO          |
|VALERIA RIBEIRO TEIXEIRA       |
|CARLOS RAFAEL DO LIVRAMENTO    |
|IVAN MARQUEZ DE MOURA          |
+-------------------------------+
only showing top 20 rows



In [None]:
socios\
    .select("nome_do_socio_ou_razao_social")\
    .where(socios.nome_do_socio_ou_razao_social.startswith("Lucas".upper()))\
    .where(socios.nome_do_socio_ou_razao_social.endswith("dos Santos".upper()))\
    .limit(10)\
    .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,LUCAS PEREIRA DOS SANTOS
1,LUCAS RODRIGUES DOS SANTOS
2,LUCAS DA SILVA PRUDENTE DOS SANTOS
3,LUCAS MENDES DOS SANTOS
4,LUCAS AURELIO ARAUJO DOS SANTOS
5,LUCAS BERGAMIM DOS SANTOS
6,LUCAS BORGES DOS SANTOS
7,LUCAS ROGERIO FAZAN DOS SANTOS
8,LUCAS PEREIRA DOS SANTOS
9,LUCAS WESLEY SILVA DOS SANTOS


# Atividade 2



In [None]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType, DoubleType, DateType

In [None]:
porte = empresas.sort("porte_da_empresa", ascending=False)
porte.limit(10).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,25644,JOSE LUIZ ARAO,2135,50,0.0,5,
1,85894,AGROPECUARIA E INDUSTRIA MOURA LTDA,2062,49,0.0,5,
2,27848,EZERTAM INDUSTRIA E COMERCIO DE CONFECCAO LTDA,2062,49,0.0,5,
3,12610,MAQTALIA PRESTACAO DE SERVICOS LTDA,2062,49,0.0,5,
4,28630,INDUSTRIA E COMERCIO DE FUNDOS DA ALTA NOROEST...,2062,49,0.0,5,
5,18744,JOAQUIM ANDRADE BAURU,2135,50,0.0,5,
6,34811,CONDOMINIO EDIFICIO TAPAJOS,3085,19,0.0,5,
7,87165,WILSON F ERDMANN,2135,50,0.0,5,
8,62049,AILTON NUNES,2135,50,0.0,5,
9,411,PEDREIRA SC LTDA,2240,28,0.0,5,


In [None]:
faixa_etaria_socios = socios.approxQuantile("faixa_etaria", [0.25, 0.5, 0.75], 0.1)
faixa_etaria_socios

[5.0, 6.0, 7.0]

# Atividade 3

Considerando como entrada os DataFrames `socios` e `empresas`.

1. Criar um DataFrame contendo as empresas que possuem sócios que ingressaram a mais de 50 anos na empresa.
DICA: utilizar select() e where().

2. A partir do DataFrame do passo 1, crie uma lista contendo o cnpj das empresas com sócios antigos.
DICA: utilize select() seguido da operação collect() para extrair uma lista contendo os cnpj.

Solução esperada
>>> [Row(cnpj_basico=1414), Row(cnpj_basico=2323), Row(cnpj_basico=1616), ...]

3. Transforme a lista do passo 2 em uma lista contendo apenas o número do cnpj em cada elemento.

Solução esperada
>>> [1414,2323,1616,...]

4. Filtrar do DataFrame `empresas` apenas as empresas com os cnpj da lista do passo 3.  
Dica: utilize a operação `isin()` dentro do `where()` de forma que a lista de cnpj sirva como filtro.  

5. Verificar o capital social da empresa das empresas que possuem os sócios antigos.

6. Devolver dois Dataframes
  - membros_antigos: contendo uma coluna com os nomes dos socios mais antigos
  - capital_empresas_membros_antigos: contendo todas as colunas de `empresas` com a filtragem aplicada


In [None]:
socios_50anos = socios\
.select('cnpj_basico','nome_do_socio_ou_razao_social',f.year('data_de_entrada_sociedade').alias('ano_entrada'))\
.where('ano_entrada<=1973')

In [None]:
var = socios_50anos.select('cnpj_basico').collect()

In [None]:
lista_cnpjs = []
for linha in var:
  lista_cnpjs.append(linha.cnpj_basico)

print(lista_cnpjs, len(lista_cnpjs))

[7256928, 7256928, 7923816, 7923816, 17418401, 28676161, 28854982, 32508681, 33342890, 33863218, 42275818, 46105938, 46105938, 57565186, 57565186, 62799291, 62799291, 75806703, 75806703, 79115416, 82983115, 88337365, 88644851, 88644851, 96201868, 96201868, 97445415, 14269437, 14269437, 16242562, 16242562, 16800831, 16800831, 19213172, 19213172, 21528047, 30096374, 30096374, 33933870, 43913912, 60812963, 60812963, 62986898, 76621499, 78356045, 9428541, 9428541, 9428541, 17384553, 17384553, 17384553, 23640196, 23640196, 23640196, 23640196, 23640196, 23640196, 33016825, 33216565, 33216565, 33216565, 33437492, 33437492, 44179992, 44217354, 45185352, 45185352, 45421690, 45518990, 45518990, 49042732, 49042732, 58156563, 62091897, 62091897, 76599901, 76599901, 76730076, 76730076, 87866273, 92711134, 16671604, 16671604, 18161232, 21993068, 24986481, 29884947, 33992298, 34267047, 34290023, 44497584, 50945765, 50945765, 60943826, 61444246, 75938860, 79118758, 80562739, 87155354, 96306683, 172024

In [None]:
#Compressão de listas (list comprehension)
lista_cnpjs = [linha.cnpj_basico for linha in var]

In [None]:
empresas_filtrado_lista_cnpjs = empresas[empresas.cnpj_basico.isin(lista_cnpjs)]

In [None]:
empresas_filtrado_lista_cnpjs

DataFrame[cnpj_basico: int, razao_social_nome_empresarial: string, natureza_juridica: int, qualificacao_do_responsavel: int, capital_social_da_empresa: double, porte_da_empresa: int, ente_federativo_responsavel: string]

In [None]:
capital_empresas_membros_antigos = empresas_filtrado_lista_cnpjs.select('capital_social_da_empresa')

In [None]:
membros_antigos = socios_50anos.select('nome_do_socio_ou_razao_social')

In [None]:
capital_empresas_membros_antigos.show(50,truncate=False)

+-------------------------+
|capital_social_da_empresa|
+-------------------------+
|0.0                      |
|0.0                      |
|0.0                      |
|100000.0                 |
|0.0                      |
|5300000.0                |
|4100000.0                |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|1000.0                   |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|50000.0                  |
|0.0                      |
|100000.0                 |
|0.0                      |
|0.0                      |
|0.0                      |
|6000.0                   |
|140000.0                 |
|0.0                      |
|0.0                      |
|0.0                

In [None]:
empresas.filter(f.upper(empresas['razao_social_nome_empresarial']).like('%LAN HOUSE%')).count()

307

#Agregação

In [None]:
capital_empresas_membros_antigos.agg(f.max(capital_empresas_membros_antigos['capital_social_da_empresa'])\
                                      .alias('capital_maximo')).show(5)

+---------------+
| capital_maximo|
+---------------+
|2.74294810565E9|
+---------------+



In [None]:
empresas.groupBy(empresas.porte_da_empresa.alias('porte'))\
.avg('capital_social_da_empresa').show(5)

+-----+------------------------------+
|porte|avg(capital_social_da_empresa)|
+-----+------------------------------+
| null|              8.35421888053467|
|    1|            339994.53313506936|
|    3|            2601001.7677092673|
|    5|             708660.4208249798|
+-----+------------------------------+



#Atividade Sala

In [None]:
!pip install --upgrade pandas

Collecting pandas
  Downloading pandas-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m36.2 MB/s[0m eta [36m0:00:00[0m
Collecting tzdata>=2022.1 (from pandas)
  Downloading tzdata-2023.3-py2.py3-none-any.whl (341 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m341.8/341.8 kB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tzdata, pandas
  Attempting uninstall: pandas
    Found existing installation: pandas 1.5.3
    Uninstalling pandas-1.5.3:
      Successfully uninstalled pandas-1.5.3
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
lida 0.0.10 requires fastapi, which is not installed.
lida 0.0.10 requires kaleido, which is not installed.
lida 0.0.10 requires python-multipart, which is not instal

In [None]:
import pandas as pd

data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [None]:
df = df.toPandas()
# Sumarização com todas as estatísticas
summary_df = df.describe(include='all')

# Exibindo o DataFrame de sumarização
print("Sumarização:")
print(summary_df)

Sumarização:
          nome     materia       nota    status
count       18          18  18.000000        18
unique       6           3        NaN         2
top     CARLOS  MATEMÁTICA        NaN  APROVADO
freq         3           6        NaN        14
mean       NaN         NaN   7.166667       NaN
std        NaN         NaN   2.915476       NaN
min        NaN         NaN   1.000000       NaN
25%        NaN         NaN   7.000000       NaN
50%        NaN         NaN   8.000000       NaN
75%        NaN         NaN   9.000000       NaN
max        NaN         NaN  10.000000       NaN


In [None]:
# Contagem de alunos aprovados e reprovados
count_df = df['status'].value_counts().reset_index()
count_df.columns = ['status', 'count']

# Exibindo o DataFrame de contagem
print("\nContagem de Alunos APROVADOS e REPROVADOS:")
print(count_df)


Contagem de Alunos APROVADOS e REPROVADOS:
      status  count
0   APROVADO     14
1  REPROVADO      4


#Atividade 4

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Passo 0: Separar empresas com e sem capital social
empresas_condicao = f.when(empresas["capital_social_da_empresa"] == 0, "empresas sem capital social").otherwise("empresas com capital social")
empresas_separadas = empresas.withColumn("grupo", empresas_condicao)

# Passo 1: Separar empresas com capital social
empresas_com_capital = empresas_separadas.filter("grupo = 'empresas com capital social'")

# Passo 2: Ordenar pelo capital social e adicionar índices
windowSpec = Window.orderBy("capital_social")
empresas_capital_ordenado_idx = empresas_com_capital.withColumn("idx", f.monotonically_increasing_id())

# Passo 3: Identificar os índices dos grupos percentis
percentis = empresas_capital_ordenado_idx.approxQuantile("capital_social_da_empresa", [0.25, 0.5, 0.75], 0.0)

# Adicionando os índices de percentis aos DataFrames
empresas_capital_ordenado_idx = empresas_capital_ordenado_idx.withColumn(
    "grupo",
    f.when(empresas_capital_ordenado_idx["capital_social_da_empresa"] <= percentis[0], "grupo1")
    .when(empresas_capital_ordenado_idx["capital_social_da_empresa"] <= percentis[1], "grupo2")
    .when(empresas_capital_ordenado_idx["capital_social_da_empresa"] <= percentis[2], "grupo3")
    .otherwise("grupo4")
)

# Passo 4: Sumarizar os grupos
sumario_capital_empresas = (
    empresas_capital_ordenado_idx.groupBy("grupo")
    .agg(
        f.avg("capital_social_da_empresa").alias("capital_medio"),
        f.count("capital_social_da_empresa").alias("numero_de_empresas")
    )
)

# Exibindo os resultados
sumario_capital_empresas.show()

+------+------------------+------------------+
| grupo|     capital_medio|numero_de_empresas|
+------+------------------+------------------+
|grupo3| 10395.86204163233|            467185|
|grupo1| 482.8745954591196|            935369|
|grupo4|3181855.6211296236|            723355|
|grupo2| 3722.786498275767|            768747|
+------+------------------+------------------+



#Atividade 5

In [None]:
empresas.limit(15).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,
5,8416,ELETRICA RUBI LTDA,2062,49,0.0,5,
6,8992,SHIROMA VEICULOS LTDA.,2062,49,0.0,5,
7,9091,CONTATOS BAR E LANCHONETE LTDA,2062,49,0.0,5,
8,9614,ANTONIA APARECIDA DE SOUZA ULIANA,2135,50,0.0,5,
9,9896,DORACY CORAT DA COSTA,2135,50,0.0,5,


In [None]:
import pandas as pd

# Supondo que você já tenha o DataFrame empresas
# Substitua 'natureza_juridica' e 'capital_social_da_empresa' pelos nomes reais das colunas em seu DataFrame

# Crie o DataFrame empresas_por_setor
setor1 = empresas[empresas['razao_social_nome_empresarial'].like('%COMERCIO%')]
setor2 = empresas[empresas['razao_social_nome_empresarial'].like('%LANCHONETE%')]
setor3 = empresas[empresas['razao_social_nome_empresarial'].like('%ELETRICA%')]

# Crie um DataFrame com as informações desejadas
empresas_por_setor = pd.DataFrame(columns=['setor', 'capital_medio', 'numero_de_empresas', 'capital_total_do_grupo'])

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum, countDistinct

# Assuming setor1, setor2, setor3 are PySpark DataFrames
setores = [setor1, setor2, setor3]

# Create an empty DataFrame with the desired schema
empresas_por_setor = pd.DataFrame(columns=['setor', 'capital_medio', 'numero_de_empresas', 'capital_total_do_grupo'])

for i, setor_df in enumerate(setores, start=1):
    setor_nome = f'setor{i}'

    # Calculate average, count distinct, and sum using agg
    agg_result = setor_df.agg(
        avg('capital_social_da_empresa').alias('capital_medio'),
        countDistinct('qualificacao_do_responsavel').alias('numero_de_empresas'),
        sum('capital_social_da_empresa').alias('capital_total_do_grupo')
    ).collect()[0]

    row = pd.Series({'setor': setor_nome, 'capital_medio': agg_result['capital_medio'], 'numero_de_empresas': agg_result['numero_de_empresas'], 'capital_total_do_grupo': agg_result['capital_total_do_grupo']})
    empresas_por_setor = pd.concat([empresas_por_setor, pd.DataFrame([row])], ignore_index=True)

# Visualize the DataFrame empresas_por_setor
empresas_por_setor


Unnamed: 0,setor,capital_medio,numero_de_empresas,capital_total_do_grupo
0,setor1,252356.4,22,49094920000.0
1,setor2,16490.46,10,242063400.0
2,setor3,1133800.0,12,6759714000.0
