# Iniciando estado do notebook
---

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

# Carregamento de Dados
---

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [None]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string]

In [None]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



### Montando nosso drive

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
empresas_path = '/content/drive/MyDrive/Disciplinas/FundamentosDeBigData/modulo-pyspark/empresas'
empresas = spark.read.csv(empresas_path, sep=';', inferSchema=True)
estab_path = '/content/drive/MyDrive/Disciplinas/FundamentosDeBigData/modulo-pyspark/estabelecimentos'
estabelecimentos = spark.read.csv(estab_path, sep=';', inferSchema=True)
socios_path = '/content/drive/MyDrive/Disciplinas/FundamentosDeBigData/modulo-pyspark/socios'
socios = spark.read.csv(socios_path, sep=';', inferSchema=True)

# Sanitização dos Dados
---

### Renomeando as colunas do DataFrame

In [7]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [8]:
for index, colName in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f"_c{index}", colName)

In [9]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [10]:
for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

In [11]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [12]:
for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)

## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [13]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [14]:
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [15]:
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [16]:
estabelecimentos = estabelecimentos\
    .withColumn(
        "data_situacao_cadastral",
        f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_de_inicio_atividade",
        f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        "data_da_situacao_especial",
        f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
    )

In [None]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [17]:
socios = socios\
    .withColumn(
        "data_de_entrada_sociedade",
        f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
    )

In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


# Seleções e consultas
---

## Selecionando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [None]:
empresas\
    .select('*')\
    .show(10, truncate=False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [None]:
socios.select(f.year('data_de_entrada_sociedade')\
              .alias('ano_de_entrada')).show()

+--------------+
|ano_de_entrada|
+--------------+
|          1994|
|          1994|
|          1994|
|          1994|
|          1994|
|          1994|
|          1997|
|          2009|
|          1994|
|          1994|
|          1994|
|          1994|
|          1994|
|          1996|
|          1996|
|          1994|
|          1994|
|          1994|
|          2009|
|          1982|
+--------------+
only showing top 20 rows



In [None]:
empresas\
    .select('natureza_juridica', 'data_entrada_sociedade', 'capital_social_da_empresa')\
    .show(5)

+-----------------+----------------+-------------------------+
|natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-----------------+----------------+-------------------------+
|             2240|               1|                      0.0|
|             2062|               5|                      0.0|
|             3034|               5|                      0.0|
|             2135|               5|                      0.0|
|             2062|               1|                   4000.0|
+-----------------+----------------+-------------------------+
only showing top 5 rows



In [None]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .show(5, truncate=False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



## Faça como eu fiz

In [None]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [None]:
visao_data_atv = estabelecimentos.select('nome_fantasia',
                        'municipio',
                        'data_de_inicio_atividade',
                        f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'),
                        f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))
# select() # Seleções no DataFrame de entrada
# f.year() # Coleta ano de uma data
# f.month() # Coleta mes de uma data
# .alias() # Renomeação de coluna

In [None]:
visao_data_atv.select('municipio').show()

+---------+
|municipio|
+---------+
|     7107|
|     7107|
|     7107|
|     7107|
|     7075|
|     7075|
|     7075|
|     7075|
|     7107|
|     7107|
|     6163|
|     7107|
|     6117|
|     6607|
|     7079|
|     7107|
|     7107|
|     6001|
|     7145|
|     9295|
+---------+
only showing top 20 rows



## Identificando valores nulos

In [None]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [None]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [None]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [None]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [None]:
socios.limit(5).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [None]:
# Abrindo a list comprenhension
# lista = [f.count(f.when(f.isnull(c), True)).alias(c) for c in socios.columns]

lista = []
for c in socios.columns:
  lista.append(f.count(f.when(f.isnull(c), 1)).alias(c))
lista

[Column<'count(CASE WHEN (cnpj_basico IS NULL) THEN 1 END) AS `cnpj_basico`'>,
 Column<'count(CASE WHEN (identificador_de_socio IS NULL) THEN 1 END) AS `identificador_de_socio`'>,
 Column<'count(CASE WHEN (nome_do_socio_ou_razao_social IS NULL) THEN 1 END) AS `nome_do_socio_ou_razao_social`'>,
 Column<'count(CASE WHEN (cnpj_ou_cpf_do_socio IS NULL) THEN 1 END) AS `cnpj_ou_cpf_do_socio`'>,
 Column<'count(CASE WHEN (qualificacao_do_socio IS NULL) THEN 1 END) AS `qualificacao_do_socio`'>,
 Column<'count(CASE WHEN (data_de_entrada_sociedade IS NULL) THEN 1 END) AS `data_de_entrada_sociedade`'>,
 Column<'count(CASE WHEN (pais IS NULL) THEN 1 END) AS `pais`'>,
 Column<'count(CASE WHEN (representante_legal IS NULL) THEN 1 END) AS `representante_legal`'>,
 Column<'count(CASE WHEN (nome_do_representante IS NULL) THEN 1 END) AS `nome_do_representante`'>,
 Column<'count(CASE WHEN (qualificacao_do_representante_legal IS NULL) THEN 1 END) AS `qualificacao_do_representante_legal`'>,
 Column<'count(C

In [None]:
lista_de_colunas = ['data_de_entrada_sociedade', 'data_de_entrada_sociedade']
socios.select([f.year(c).alias(c) for c in lista_de_colunas]).show()
# socios.select(lista).show()

+-------------------------+-------------------------+
|data_de_entrada_sociedade|data_de_entrada_sociedade|
+-------------------------+-------------------------+
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1997|                     1997|
|                     2009|                     2009|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1994|                     1994|
|                     1996|                     1996|
|                     1996|                     1996|
|                     1994| 

In [None]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [None]:
socios.na.fill(0).orderBy('pais', ascending=False).show(truncate=False)

+-----------+----------------------+---------------------------------------------+--------------------+---------------------+-------------------------+----+-------------------+------------------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social                |cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante         |qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+---------------------------------------------+--------------------+---------------------+-------------------------+----+-------------------+------------------------------+-----------------------------------+------------+
|26764428   |1                     |KINOX INVESTMENTS LTD.                       |33423049000106      |37                   |2020-07-31               |866 |***468087**        |ROBERTO DUQUE ESTRADA DE SOUSA|17                    

In [None]:
socios.na.fill('-').show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                    -|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [None]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending=False)\
    .show(5, False)

+----------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social           |faixa_etaria|ano_de_entrada|
+----------------------------------------+------------+--------------+
|KASSIANO RODRIGO KICHILESKI             |4           |2021          |
|LEONARDO MENNA BARRETO LARANJA GONCALVES|5           |2021          |
|MANOEL ADRIANO COSTA BARBOSA            |6           |2021          |
|ANTONOALDO GRANGEON TRANCOSO NEVES      |5           |2021          |
|MARIA SUELY DE MOURA                    |5           |2021          |
+----------------------------------------+------------+--------------+
only showing top 5 rows



In [None]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending=[False, False])\
    .show(10, False)

+-------------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social        |faixa_etaria|ano_de_entrada|
+-------------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA      |9           |2021          |
|RENILDE DAS GRACAS MAIA              |9           |2021          |
|DORIS PEREIRA GOMES JAZRA            |9           |2021          |
|MARIA JOSE DOMINGUES BONATO          |9           |2021          |
|ZELIA MARIA CAMARA RODRIGUES DA SILVA|9           |2021          |
|JOSE DA SILVA                        |9           |2021          |
|DEMOSTENES JACOB HUHN PINTO          |9           |2021          |
|NADIR BICHARA CHUAHY                 |9           |2021          |
|DEIA DA CUNHA BECK PINTO             |9           |2021          |
|REYNALDO FIORIO                      |9           |2021          |
+-------------------------------------+------------+--------------+
only showing top 10 rows



## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [None]:
empresas\
    .where("capital_social_da_empresa>1000000000").show(truncate=False)

+-----------+------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                           |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|6352117    |REBOUCAS E CIA LTDA.                                                    |2062             |49                         |2.9202702116E10          |3               |null                       |
|21199157   |TECHNO-CELLS INDUSTRIA DE SEMICONDUTORES SOLARES ES LTDA.               |2062             |5                          |3.0159349E9              |5               |null     

In [None]:
socios\
    .select("nome_do_socio_ou_razao_social").show(truncate=False)

+-------------------------------+
|nome_do_socio_ou_razao_social  |
+-------------------------------+
|LILIANA PATRICIA GUASTAVINO    |
|CRISTINA HUNDERTMARK           |
|CELSO EDUARDO DE CASTRO STEPHAN|
|EDUARDO BERRINGER STEPHAN      |
|HANNE MAHFOUD FADEL            |
|CLOD ASSAD FADEL               |
|WALKYRIA ALGARVES              |
|SEBASTIAO JADIR TEIXEIRA NUNES |
|JOSE JOAO ADAMO                |
|ROSEMARY CANTUARIA AFONSO ADAMO|
|MARCOS AURELIO MOTTA           |
|EDVAN CANDIDO ALENCAR          |
|JAIME MOURE COLINO             |
|SANDRA APARECIDA CRUZ LEONE    |
|LEIDA MARQUES PEREIRA VICENTE  |
|DONZILIA FERREIRA REGO         |
|ISRAEL AMERICANO REGO          |
|VALERIA RIBEIRO TEIXEIRA       |
|CARLOS RAFAEL DO LIVRAMENTO    |
|IVAN MARQUEZ DE MOURA          |
+-------------------------------+
only showing top 20 rows



In [None]:
socios\
    .select("nome_do_socio_ou_razao_social")\
    .where(socios.nome_do_socio_ou_razao_social.startswith("Lucas".upper()))\
    .where(socios.nome_do_socio_ou_razao_social.endswith("dos Santos".upper()))\
    .limit(10)\
    .toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,LUCAS PEREIRA DOS SANTOS
1,LUCAS RODRIGUES DOS SANTOS
2,LUCAS DA SILVA PRUDENTE DOS SANTOS
3,LUCAS MENDES DOS SANTOS
4,LUCAS AURELIO ARAUJO DOS SANTOS
5,LUCAS BERGAMIM DOS SANTOS
6,LUCAS BORGES DOS SANTOS
7,LUCAS ROGERIO FAZAN DOS SANTOS
8,LUCAS PEREIRA DOS SANTOS
9,LUCAS WESLEY SILVA DOS SANTOS


# Atividade 2
Para aqueles que não fizeram



# Atividade 3
Considerando como entrada os DataFrames `socios` e `empresas`.

1. Criar um DataFrame contendo as empresas que possuem sócios que ingressaram a mais de 50 anos na empresa.
DICA: utilizar select() e where().

2. A partir do DataFrame do passo 1, crie uma lista contendo o cnpj das empresas com sócios antigos.
DICA: utilize select() seguido da operação collect() para extrair uma lista contendo os cnpj.

Solução esperada
>>> [Row(cnpj_basico=1414), Row(cnpj_basico=2323), Row(cnpj_basico=1616), ...]

3. Transforme a lista do passo 2 em uma lista contendo apenas o número do cnpj em cada elemento.

Solução esperada
>>> [1414,2323,1616,...]

4. Filtrar do DataFrame `empresas` apenas as empresas com os cnpj da lista do passo 3.  
Dica: utilize a operação `isin()` dentro do `where()` de forma que a lista de cnpj sirva como filtro.  

5. Verificar o capital social da empresa das empresas que possuem os sócios antigos.

6. Devolver dois Dataframes
  - membros_antigos: contendo uma coluna com os nomes dos socios mais antigos
  - capital_empresas_membros_antigos: contendo todas as colunas de `empresas` com a filtragem aplicada


In [38]:
socios_50anos = socios\
.select('cnpj_basico','nome_do_socio_ou_razao_social',f.year('data_de_entrada_sociedade').alias('ano_entrada'))\
.where('ano_entrada<=1973')

In [43]:
var = socios_50anos.select('cnpj_basico').collect()

list

In [53]:
lista_cnpjs = []
for linha in var:
  lista_cnpjs.append(linha.cnpj_basico)

print(lista_cnpjs, len(lista_cnpjs))

[7256928, 7256928, 7923816, 7923816, 17418401, 28676161, 28854982, 32508681, 33342890, 33863218, 42275818, 46105938, 46105938, 57565186, 57565186, 62799291, 62799291, 75806703, 75806703, 79115416, 82983115, 88337365, 88644851, 88644851, 96201868, 96201868, 97445415, 14269437, 14269437, 16242562, 16242562, 16800831, 16800831, 19213172, 19213172, 21528047, 30096374, 30096374, 33933870, 43913912, 60812963, 60812963, 62986898, 76621499, 78356045, 9428541, 9428541, 9428541, 17384553, 17384553, 17384553, 23640196, 23640196, 23640196, 23640196, 23640196, 23640196, 33016825, 33216565, 33216565, 33216565, 33437492, 33437492, 44179992, 44217354, 45185352, 45185352, 45421690, 45518990, 45518990, 49042732, 49042732, 58156563, 62091897, 62091897, 76599901, 76599901, 76730076, 76730076, 87866273, 92711134, 16671604, 16671604, 18161232, 21993068, 24986481, 29884947, 33992298, 34267047, 34290023, 44497584, 50945765, 50945765, 60943826, 61444246, 75938860, 79118758, 80562739, 87155354, 96306683, 172024

In [None]:
#Compressão de listas (list comprehension)
lista_cnpjs = [linha.cnpj_basico for linha in var]

In [69]:
empresas_filtrado_lista_cnpjs = empresas[empresas.cnpj_basico.isin(lista_cnpjs)]

In [71]:
empresas_filtrado_lista_cnpjs

DataFrame[cnpj_basico: int, razao_social_nome_empresarial: string, natureza_juridica: int, qualificacao_do_responsavel: int, capital_social_da_empresa: double, porte_da_empresa: int, ente_federativo_responsavel: string]

In [73]:
capital_empresas_membros_antigos = empresas_filtrado_lista_cnpjs.select('capital_social_da_empresa')

In [74]:
membros_antigos = socios_50anos.select('nome_do_socio_ou_razao_social')

In [78]:
capital_empresas_membros_antigos.show(50,truncate=False)

+-------------------------+
|capital_social_da_empresa|
+-------------------------+
|0.0                      |
|0.0                      |
|0.0                      |
|100000.0                 |
|0.0                      |
|5300000.0                |
|4100000.0                |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|1000.0                   |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|0.0                      |
|50000.0                  |
|0.0                      |
|100000.0                 |
|0.0                      |
|0.0                      |
|0.0                      |
|6000.0                   |
|140000.0                 |
|0.0                      |
|0.0                      |
|0.0                

# Aula 14/11
1. Revisão
2. LIKE
3. Agregações
4. Junções

In [80]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['nome empresa'])
df.toPandas()

Unnamed: 0,nome empresa
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [96]:
df.filter(f.upper(df['nome empresa']).like('%RESTAURANTE%')).show(5,truncate=False)

+----------------------+
|nome empresa          |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [108]:
empresas.filter(f.upper(empresas['razao_social_nome_empresarial']).like('%LAN HOUSE%')).count()

307

# Agregação

# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) |
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) |
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) |
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) |
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) |
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) |
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) |
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) |
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) |
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) |
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) |
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) |
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) |
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) |
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) |
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) |
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) |
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) |
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) |
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

In [110]:
capital_empresas_membros_antigos.printSchema()

root
 |-- capital_social_da_empresa: double (nullable = true)



In [133]:
empresas\
.groupBy(empresas.porte_da_empresa.alias('porte'))\
.agg(f.avg('capital_social_da_empresa').alias('media')).show()

+-----+------------------+
|porte|             media|
+-----+------------------+
| null|  8.35421888053467|
|    1|339994.53313506936|
|    3|2601001.7677092673|
|    5| 708660.4208249798|
+-----+------------------+



In [141]:
from datetime import datetime, date
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(nome='joao', a=1, idade=4, c='GFG1', d=date(2000, 8, 1),
        e=datetime(2000, 8, 1, 12, 0)),

    Row(nome='maria', a=2, idade=8, c='GFG2', d=date(2000, 6, 2),
        e=datetime(2000, 6, 2, 12, 0)),

    Row(nome='helena', a=4, idade=5, c='GFG3', d=date(2000, 5, 3),
        e=datetime(2000, 5, 3, 12, 0))
])



DataFrame[summary: string, nome: string, a: string, idade: string, c: string]

In [146]:
empresas\
.select('capital_social_da_empresa').summary('mean','max','min').show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|   mean|        503694.5478542675|
|    max|         3.22014670262E11|
|    min|                      0.0|
+-------+-------------------------+



In [144]:
df.summary('mean','max').show()

+-------+-----+------------------+-----------------+----+
|summary| nome|                 a|            idade|   c|
+-------+-----+------------------+-----------------+----+
|   mean| null|2.3333333333333335|5.666666666666667|null|
|    max|maria|                 4|                8|GFG3|
+-------+-----+------------------+-----------------+----+



# Atividade 4


Considerando como entrada os DataFrames `estabelecimentos` e `empresas`.

0. Utilize a função `when()` para criar um DataFrame que separe as empresas em dois grupos empresas sem capital social (quando o capital social = 0) e empresas com capital social.

DICA3: A função `when()` é útil para criar uma seleção com alguma condição:
```python
from pyspark.sql import functions as f
df = spark.createDataFrame([('joao',15), ('mariana',26), ('helena',30)], ['nome','idade'])
df.select(f.when(df["idade"] > 18, 'adulto').otherwise('crianca').alias("fase")).show()
```

1. Separe do seu DataFrame do passo 1 aquelas empresas que possuem capital social e guarde o novo DataFrame em `empresas_com_capital`.

2. A partir de `empresas_com_capital`, ordene pelo capital social e utilize a função `monotonically_increasing_id()` para criar uma nova coluna com os indices em um novo DataFrame chamado `empresas_capital_ordenado_idx`.

> DICA: inclua uma coluna contendo um índice incremental para identificar a empresas de maneira ordenada com `monotonically_increasing_id()`.

> Obs: A função `monotonically_increasing_id()` é utilizada para quando você precisa criar uma coluna com IDs únicos que crescem monotonicamente, esses IDs não possuem números consecutivos, porém eles não se repetem e são crescentes (o que é necessário para filtrar dentro dos grupos de percentil).

3. A partir do DataFrame `empresas_capital_ordenado_idx`, utilize a função `summary()` seguida de `when()` para agrupar as empresas que possuem capital social dentro do grupo de 25%, 50% (retirando as empresas que caíram no 25%), 75% (retirando as empresas que caíram no 25% e 50%), 100% percentil (retirando as empresas que caíram no 25%, 50% e 75%). Nomeie esses grupos como `grupo1`, `grupo2`, `grupo3`, `grupo4`.

DICA: Para conseguir aplicar o `when()` você deve ter o id que identifica a empresa que inicia cada percentil. Para encontrar os ids dos grupos de percentil basta aplicar o summary() em `empresas_capital_ordenado_idx`.

# Atividade 5


1. A partir do DataFrame `empresas`, crie um DataFrame `empresas_por_setor` que possui todas as empresas que são de um tipo específico (p.exemplo "mercado", "boate", "igreja") considere três setores. Nomeie esses grupos como `setor1`, `setor2`, `setor3`.

DICA: utilizar o método `like()`.

2. A partir do DataFrame `empresas_por_setor` crie um novo DataFrame contendo três colunas `setor`, `capital_medio`, `numero_de_empresas`, `capital_total_do_grupo`. Cada coluna contém os seguintes dados:

- `setor`: nome do setor escolhido,
- `capital_medio`: media do capital capita_social_da_empresa de cada setor,
- `numero_de_empresas`: contador de empresas dentro de cada setor,
- `capital_total_do_grupo`: somatório dos capitais das empresas do grupo

Exemplo de como utilizar o `agg()` em um DataFrame agrupado:
```python
empresas\
.select('cnpj_basico','porte_da_empresa', 'capital_social_da_empresa')\
.groupBy('setor')\
.agg(
  f.avg('capital_social_da_empresa').alias('capital_medio'),
  f.count('cnpj_basico').alias('frequencia')
)\
.orderBy('capital_social_da_empresa', ascending=True)\
.show()
```

Exemplo do uso do f.avg() e do f.count()

In [183]:
empresas_medio = empresas\
.limit(100).select('cnpj_basico','porte_da_empresa', 'capital_social_da_empresa')\
.groupBy('porte_da_empresa')\
.agg(
  f.avg('capital_social_da_empresa').alias('capital_medio'),
  f.count('cnpj_basico').alias('frequencia'),
)\
.orderBy('capital_medio', ascending=True)
empresas_medio.show()

+----------------+-------------+----------+
|porte_da_empresa|capital_medio|frequencia|
+----------------+-------------+----------+
|               3|          0.0|         1|
|               5|       156.25|        64|
|               1|       3600.0|        35|
+----------------+-------------+----------+



Exemplo do `when()`

In [151]:
df = spark.createDataFrame([('joao',15), ('mariana',26), ('helena',30)], ['nome','idade'])
df.select(f.when(df["idade"] > 18, 'adulto').otherwise('crianca').alias("fase")).show()

+-------+
|   fase|
+-------+
|crianca|
| adulto|
| adulto|
+-------+



In [192]:
# Exemplo com criação de somatório
df = spark.createDataFrame(
[(2, "A" , "A2" , 2500),
(2, "A" , "A11" , 3500),
(2, "A" , "A12" , 5500),
(4, "B" , "B25" , 7600),
(4, "B", "B26" ,5600),
(5, "C" , "c25" ,2658),
(5, "C" , "c27" , 1100),
(5, "C" , "c28" , 1200)],
['parent', 'group' , "brand" , "usage"])


# Group by and sum to get the totals
totals = df.groupBy(['group','parent'])\
.agg(f.sum('usage').alias('usage'))\
.withColumn('brand', f.lit('Total'))
# create a temp variable to sort
totals = totals.withColumn('sort_id', f.lit(2))
df = df.withColumn('sort_id', f.lit(1))
df.unionByName(totals).sort(['group','sort_id']).drop('sort_id').show()
totals.show()

+------+-----+-----+-----+
|parent|group|brand|usage|
+------+-----+-----+-----+
|     2|    A|  A12| 5500|
|     2|    A|  A11| 3500|
|     2|    A|   A2| 2500|
|     2|    A|Total|11500|
|     4|    B|  B25| 7600|
|     4|    B|  B26| 5600|
|     4|    B|Total|13200|
|     5|    C|  c25| 2658|
|     5|    C|  c28| 1200|
|     5|    C|  c27| 1100|
|     5|    C|Total| 4958|
+------+-----+-----+-----+

+-----+------+-----+-----+-------+
|group|parent|usage|brand|sort_id|
+-----+------+-----+-----+-------+
|    A|     2|11500|Total|      2|
|    B|     4|13200|Total|      2|
|    C|     5| 4958|Total|      2|
+-----+------+-----+-----+-------+



In [186]:
totals.show()

+-----+------+-----+
|group|parent|usage|
+-----+------+-----+
|    A|     2|11500|
|    B|     4|13200|
|    C|     5| 4958|
+-----+------+-----+

