# Começando o Trabalho
---

## Apache Spark - Introdução

### [Apache Spark](https://spark.apache.org/)

Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

### [PySpark](https://spark.apache.org/docs/latest/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

## Utilizando o Spark no Windows

[fonte](https://spark.apache.org/docs/latest/api/python/getting_started/install.html)

#### Passo 1 - Instalando o Java

O PySpark requer a instalação do Java na versão 7 ou superior. Obtenha a versão mais recente clicando [aqui](https://www.java.com/pt-BR/download/). Para verificar a versão que está instalada em sua máquina execute a seguinte linha de código no seu *prompt*:

```
java -version
```

#### Passo 2 - Instalando o Python

O Python deve ser instalado em sua versão 2.6 ou superior. Para obter a versão mais recente clique [aqui](https://www.python.org/downloads/windows/). Para verificar a versão do Python que está instalada em sua máquina digite o seguinte comando em seu *prompt*:

```
python --version
```

#### Passo 3 - Instalando o Apache Spark 

Selecione a versão mais estável clicando [aqui](http://spark.apache.org/downloads.html). Na criação deste projeto utilizamos a versão do Spark **3.1.2** e como tipo de pacote selecionamos **Pre-built for Apache Hadoop 2.7**.

Para instalar o Apache Spark não é necessário executar um instalador, basta descomprimir os arquivos em uma pasta de sua escolha.

<font color=red>Obs.: certifique-se de que o caminho onde os arquivos do Spark foram armazenados não contenham espaços (ex.: **"C:\spark\spark-3.1.2-bin-hadoop2.7"**).</font>

Para testar o funcionamento do Spark execute os comandos abaixo em seu *prompt* de comando. Esses comandos assumem que você extraiu os arquivos do Spark na pasta **"C:\spark\"**.

```
cd C:\spark\spark-3.1.2-bin-hadoop2.7
```

```
bin\pyspark
```

O comando acima inicia o *shell* do PySpark que permite trabalhar interativamente com o Spark.

Para sair basta digitar `exit()` e logo depois presionar *Enter*. Para voltar ao *prompt* pressione *Enter* novamente.

#### Passo 4 - Instalando o findspark

```
pip install findspark
```

#### Passo 5 - Instalando o winutils

Os arquivos do Spark não incluem o utilitário **winutils.exe** que é utilizado pelo Spark no Windows. Se não informar onde o Spark deve procurar este utilitário, veremos alguns erros no console e também não conseguiremos executar *scripts* Python utilizando o utilitário `spark-submit`.

Faça o [download](https://github.com/steveloughran/winutils) para a versão do Hadoop para a qual sua instalação do Spark foi construída. Em nosso exemplo foi utilizada a [versão 2.7](https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin). Faça o *download* apenas do arquivo **winutils.exe**.

Crie a pasta **"hadoop\bin"** dentro da pasta que contém os arquivos do Spark (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**) e copie o arquivo **winutils.exe** para dentro desta pasta.

Crie duas variáveis de ambiente no seu Windows. A primeira chamada **SPARK_HOME** que aponta para a pasta onde os arquivos Spark foram armazenados (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**). A segunda chamada **HADOOP_HOME** que aponta para **%SPARK_HOME%\hadoop** (assim podemos modificar **SPARK_HOME** sem precisar alterar **HADOOP_HOME**).

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

2022-06-08 08:14:22 WARN  Utils:66 - Your hostname, laura-ThinkPad-E470 resolves to a loopback address: 127.0.1.1; using 10.0.0.84 instead (on interface wlp5s0)
2022-06-08 08:14:22 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2022-06-08 08:14:23 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark

## Acessando o [Spark UI](https://spark.apache.org/docs/latest/web-ui.html) (Google Colab)

[Site ngrok](https://ngrok.com)

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [8]:
data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']

In [9]:
df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string]

In [7]:
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



                                                                                

In [8]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
> 
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
> 
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Montando nosso drive

In [None]:
from google.colab import drive # Quando no google colab, ajuda a pegar os dados do drive;
drive.mount('/content/drive')

### Carregando os dados das empresas

In [10]:
import zipfile

In [13]:
# Nesse caso, eu tinha um arquivo zip com vários arquivos csv, a partir desse comando tudo contudo no zip foi 
# extraído para uma nova pasta chamada 'empresas' e agora dá para acessar cada um dos csv.
zipfile.ZipFile('empresas.zip', 'r').extractall() # extractall recebe como argumento o caminho onde a pasta é salva

In [2]:
# E como mágica, só passando o nome da pasta, a função lê todos os arquivos e ainda junta...
empresas = spark.read.csv('empresas', sep = ";", inferSchema = True)

                                                                                

In [16]:
empresas.count()

                                                                                

4585679

In [17]:
empresas.limit(10).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,
5,8416,ELETRICA RUBI LTDA,2062,49,0,5,
6,8992,SHIROMA VEICULOS LTDA.,2062,49,0,5,
7,9091,CONTATOS BAR E LANCHONETE LTDA,2062,49,0,5,
8,9614,ANTONIA APARECIDA DE SOUZA ULIANA,2135,50,0,5,
9,9896,DORACY CORAT DA COSTA,2135,50,0,5,


In [20]:
empresas.schema.fields

[StructField(_c0,IntegerType,true),
 StructField(_c1,StringType,true),
 StructField(_c2,IntegerType,true),
 StructField(_c3,IntegerType,true),
 StructField(_c4,StringType,true),
 StructField(_c5,IntegerType,true),
 StructField(_c6,StringType,true)]

## Faça como eu fiz: Estabelecimentos e Sócios

### Carregando os dados dos estabelecimentos

In [23]:
zipfile.ZipFile('estabelecimentos.zip', 'r').extractall()

In [3]:
estabelecimentos = spark.read.csv('estabelecimentos', sep = ";", inferSchema = True)

                                                                                

In [30]:
estabelecimentos.count()

2022-06-06 17:04:29 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.


                                                                                

4836219

### Carregando os dados dos sócios

In [27]:
zipfile.ZipFile('socios.zip', 'r').extractall()

In [4]:
socios = spark.read.csv('socios', sep = ";", inferSchema = True)

                                                                                

In [31]:
socios.count()

2046430

# Manipulando os Dados
---

## Operações básicas

### Renomeando as colunas do DataFrame

In [5]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [6]:
for index, colName in enumerate(empresasColNames): # o enumerate adiciona índices para a lista
    empresas = empresas.withColumnRenamed(f"_c{index}", colName) # só dá para mudar um por vez, então é isso...

In [17]:
empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [7]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [8]:
for index, colName in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

In [9]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [10]:
for index, colName in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f"_c{index}", colName)

## Analisando os dados

[Data Types](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types)

In [22]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [13]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [25]:
# O padrão do spark de decimal é o ponto, e aqui estamos fazendo a troca
empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace(f.col('capital_social_da_empresa'), ',', '.'))
# Agora é possível transformar o tipo da variável que foi identificada como string como double
empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html)

In [27]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

[Stage 6:>                                                          (0 + 4) / 4]                                                                                

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [28]:
# Para transformar as colunas no tipo de data, é necessário que a coluna esteja em string
# Para identificar o padrão das datas é preciso seguir as siglas definidas pelo próprio spark, de letra e tamanhp
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd')) 
df.show()

+----------+
|      data|
+----------+
|2020-09-24|
|2020-10-22|
|2021-02-15|
+----------+



In [26]:
estabelecimentos = estabelecimentos\
    .withColumn(
        'data_situacao_cadastral',
        f.to_date(f.col('data_situacao_cadastral').cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        'data_de_inicio_atividade',
        f.to_date(f.col('data_de_inicio_atividade').cast(StringType()), 'yyyyMMdd')
    )\
    .withColumn(
        'data_da_situacao_especial',
        f.to_date(f.col('data_da_situacao_especial').cast(StringType()), 'yyyyMMdd')
    )

estabelecimentos.printSchema

<bound method DataFrame.printSchema of DataFrame[cnpj_basico: int, cnpj_ordem: int, cnpj_dv: int, identificador_matriz_filial: int, nome_fantasia: string, situacao_cadastral: int, data_situacao_cadastral: date, motivo_situacao_cadastral: int, nome_da_cidade_no_exterior: string, pais: int, data_de_inicio_atividade: date, cnae_fiscal_principal: int, cnae_fiscal_secundaria: string, tipo_de_logradouro: string, logradouro: string, numero: string, complemento: string, bairro: string, cep: int, uf: string, municipio: int, ddd_1: string, telefone_1: string, ddd_2: string, telefone_2: string, ddd_do_fax: int, fax: string, correio_eletronico: string, situacao_especial: string, data_da_situacao_especial: date]>

In [27]:
socios = socios\
    .withColumn(
        'data_de_entrada_sociedade',
        f.to_date(f.col('data_de_entrada_sociedade').cast(StringType()), 'yyyyMMdd'))

# Seleções e consultas
---

## Selecionando informações
 
[DataFrame.select(*cols)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [37]:
empresas.select('*').show(5, truncate=False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [40]:
empresas\
    .select('cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica')\
    .show(5, truncate=False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|
+-----------+--------------------------------------------------------------------------------------------+-----------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                  |2062             |
|4820       |REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E TABELIONATO E REGISTRO DE CONSTRATOS MARITIMOS|3034             |
|5347       |ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS                                                |2135             |
|6846       |BADU E FILHOS TECIDOS LTDA                                                                  |2062             |


In [45]:
socios\
    .select('faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada_sociedade'))\
    .show(5, truncate=False)

+------------+------------------------+
|faixa_etaria|ano_de_entrada_sociedade|
+------------+------------------------+
|7           |1994                    |
|7           |1994                    |
|8           |1994                    |
|5           |1994                    |
|8           |1994                    |
+------------+------------------------+
only showing top 5 rows



## Faça como eu fiz

In [46]:
data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [47]:
df \
    .select(
        f.concat_ws( # Concatena várias colunas e o separador utilizado é o primeiro argumento;
            ', ', 
            f.substring_index('nome', ' ', -1), # extrai todos caracteres da string até primeiro ' ' (ao contrário)
            f.substring_index('nome', ' ', 1)   # extrai todos caracteres da string até primeiro ' ' 
        ).alias('ident'), 
        'idade') \
    .show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



## Identificando valores nulos

In [48]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data']) # Aqui são valores inteiros
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [49]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [50]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data']) # Os pontos fazem ser float
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [51]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [52]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data']) # Aqui já são strings
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [53]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [58]:
socios.select('pais', 'nome_do_representante').limit(5).toPandas() # No do cara tava diferente

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [57]:
socios.select('pais', 'nome_do_representante').limit(5).show()



+----+---------------------+
|pais|nome_do_representante|
+----+---------------------+
|null|                 null|
|null|                 null|
|null|                 null|
|null|                 null|
|null|                 null|
+----+---------------------+



                                                                                

In [60]:
socios\
    .select([\ # Selecionar
             f.count(\ # A contagem
                     f.when(f.isnull(c), 1)).alias(c) \ # de onde tem valor nulo
                             for c in socios.columns\ # Para todas as colunas
            ]).toPandas()

                                                                                

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,0,0,208,1234,0,1,2038255,0,1995432,0,0


In [62]:
socios.na.fill(0).limit(5).toPandas() # Substituindo os NA, de acordo com o tipo da variável

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [63]:
socios.na.fill('-').limit(5).toPandas() # Substituindo os NA, de acordo com o tipo da variável

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [67]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada')\
    .limit(10).toPandas()

                                                                                

Unnamed: 0,nome_do_socio_ou_razao_social,faixa_etaria,ano_de_entrada
0,JOAO FRANCISCO DE AMORIM JUNCAL,3,
1,NAIR YOKO HIRAI TAKAKI,7,1900.0
2,MARIA SILENE BEZERRA DE AGUIAR,8,1900.0
3,JOSE NELSON VIEIRA CAMPOS,6,1901.0
4,VALMAR CARDOSO DE SANTANA,5,1901.0
5,ANTERO DA SILVA RAMALHO CRUZ,9,1918.0
6,MILTON MARCHETTI FILHO,7,1919.0
7,MARCIA REGINA KLEFENS MARCHETTI,7,1919.0
8,PAULO ALCANTARA DO AMARAL,6,1922.0
9,WANDA GASPE,8,1937.0


In [68]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy('ano_de_entrada', ascending = False)\
    .limit(10).toPandas()

                                                                                

Unnamed: 0,nome_do_socio_ou_razao_social,faixa_etaria,ano_de_entrada
0,JAQUELINE MARIA GIACHINI ENGELMANN,4,2021
1,GUILLERMO ECHEGARAY INDA,6,2021
2,ELTON ROBERTO DA SILVA,5,2021
3,FRANCISCO ANIOMARIO REIS BARBOSA,6,2021
4,MARCIO AURELIO LEITE,7,2021
5,JULIANA GABRIELA VIEIRA PASSOS,2,2021
6,CLAUDIA MIDORI ARIE,3,2021
7,MARCELO SOARES SANTOS,5,2021
8,NORBERTO AGUIAR TOMAZ,7,2021
9,MATHEUS ROCHA SOARES,3,2021


In [70]:
socios\
    .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .orderBy(['ano_de_entrada', 'faixa_etaria'], asceding = [True, False])\
    .limit(10).toPandas()

                                                                                

Unnamed: 0,nome_do_socio_ou_razao_social,faixa_etaria,ano_de_entrada
0,JOAO FRANCISCO DE AMORIM JUNCAL,3,
1,NAIR YOKO HIRAI TAKAKI,7,1900.0
2,MARIA SILENE BEZERRA DE AGUIAR,8,1900.0
3,VALMAR CARDOSO DE SANTANA,5,1901.0
4,JOSE NELSON VIEIRA CAMPOS,6,1901.0
5,ANTERO DA SILVA RAMALHO CRUZ,9,1918.0
6,MILTON MARCHETTI FILHO,7,1919.0
7,MARCIA REGINA KLEFENS MARCHETTI,7,1919.0
8,PAULO ALCANTARA DO AMARAL,6,1922.0
9,JULIE DE AZEVEDO E SA MULLER CARIOBA,7,1937.0


## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [71]:
empresas\
    .where('capital_social_da_empresa == 50')\
    .limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,17350147,ERIK MARCELO DOS SANTOS 42107848858,2135,50,50.0,1,
1,17833214,ALEXANDRE MACHADO LIMA 73750123772,2135,50,50.0,1,
2,20860830,YASMIN MOURA DA FONSECA 13457709793,2135,50,50.0,1,
3,22242856,JOAO CESAR MESSIAS 08707149883,2135,50,50.0,1,
4,23238540,EVERTON ROBERTO DA SILVA 42101963809,2135,50,50.0,1,


In [72]:
socios\
    .filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\ # Começa com
    .filter(socios.nome_do_socio_ou_razao_social.endswith('DIAS'))\ # Termina com
    .limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,24615559,2,RODRIGO BENASSI DIAS,***390078**,5,2016-04-18,,***000000**,,0,5
1,9201777,2,RODRIGO RUDIBERTO DIAS,***668809**,22,2010-05-20,,***000000**,,0,3
2,10479364,2,RODRIGO AURELIANO DIAS,***233809**,49,2013-08-30,,***000000**,,0,3
3,11165614,2,RODRIGO SIMOES LEMOS DIAS,***303326**,22,2019-12-19,,***000000**,,0,5
4,18007795,2,RODRIGO GEORGE DIAS,***581078**,49,2013-04-25,,***000000**,,0,5


## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html)

In [11]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

[Stage 6:>                                                          (0 + 4) / 4]                                                                                

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


In [16]:
df\
   .where(f.upper(df.data).like('%RESTAURANTE%'))\
   .show(truncate = False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



In [17]:
df\
   .where(f.upper(df.data).like('RESTAURANTE%'))\
   .show(truncate = False)

+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+



In [18]:
df\
   .where(f.upper(df.data).like('%RESTAURANTE'))\
   .show(truncate = False)

+----------------+
|data            |
+----------------+
|Joca Restaurante|
+----------------+



In [20]:
empresas\
    .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
    .filter(f.upper(f.col('razao_social_nome_empresarial')).like('%RESTAURANTE%'))\
    .limit(10).toPandas()

Unnamed: 0,razao_social_nome_empresarial,natureza_juridica,porte_da_empresa,capital_social_da_empresa
0,RESTAURANTE IMIGRANTE PORTUGUES LTDA.,2062,5,0
1,MORAIS & CARVALHO RESTAURANTE E PIZZARIA LTDA,2062,1,0
2,BAR E RESTAURANTE PAGANOTTO LTDA,2062,5,0
3,RODRIGUES & RODRIGUES RESTAURANTE LTDA,2062,5,0
4,TEXAS RANCH BAR RESTAURANTE PRODUCOES ARTISTIC...,2062,1,0
5,V V SANTOS RESTAURANTE BAR E ATIV DESPORTIVAS ...,2062,1,0
6,BAR E RESTAURANTE CASA DA QUINTA LTDA,2062,1,500000
7,DON MUGO RESTAURANTE LTDA,2062,1,1000000
8,MARIA ROZA DOS SANTOS- BAR E RESTAURANTE,2135,5,0
9,GERACAO DE OURO - BAR E RESTAURANTE LTDA,2062,5,4000000


# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando os dados

In [29]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupby('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada')\
    .show()



+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+





In [30]:
empresas\
    .groupby('porte_da_empresa')\
    .agg(
        f.avg('capital_social_da_empresa').alias('capital_social_medio'),
        f.count('cnpj_basico').alias('frequência')
    )\
    .orderBy('porte_da_empresa')\
    .show()



+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequência|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313507006|   3129043|
|               3|   2601001.767709269|    115151|
|               5|   708660.4208249792|   1335500|
+----------------+--------------------+----------+





In [31]:
empresas\
    .select('capital_social_da_empresa')\
    .summary()\
    .show()

# .summary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max') - Também dá para escolher as informações;



+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542677|
| stddev|      2.111869149053794E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



                                                                                

In [32]:
empresas\
    .select('capital_social_da_empresa')\
    .describe()\
    .show()



+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542677|
| stddev|      2.111869149053794E8|
|    min|                      0.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



                                                                                

In [45]:
socios\
    .select('nome_do_socio_ou_razao_social')\
    .summary()\
    .show()



+-------+-----------------------------+
|summary|nome_do_socio_ou_razao_social|
+-------+-----------------------------+
|  count|                      2046222|
|   mean|                         null|
| stddev|                         null|
|    min|         '' PROTENDE '' SI...|
|    25%|                         null|
|    50%|                         null|
|    75%|                         null|
|    max|         iDtrust Tecnologi...|
+-------+-----------------------------+



                                                                                

In [44]:
socios\
    .select('nome_do_socio_ou_razao_social')\
    .describe()\
    .show()



+-------+-----------------------------+
|summary|nome_do_socio_ou_razao_social|
+-------+-----------------------------+
|  count|                      2046222|
|   mean|                         null|
| stddev|                         null|
|    min|         '' PROTENDE '' SI...|
|    max|         iDtrust Tecnologi...|
+-------+-----------------------------+



                                                                                

## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [46]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

In [47]:
produtos.join(impostos, 'cat', how = 'inner')\ # Mostra o que tem em ambos
    .sort('id')\
    .show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+





In [49]:
produtos.join(impostos, 'cat', how = 'left')\
    .sort('id')\
    .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+





In [50]:
produtos.join(impostos, 'cat', how = 'right')\
    .sort('id')\
    .show()



+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



                                                                                

In [51]:
produtos.join(impostos, 'cat', how = 'outer')\
    .sort('id')\
    .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+



In [52]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how = 'inner')
empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [57]:
freq = empresas_join\
    .select(
        'cnpj_basico', 
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupby('data_de_inicio')\
    .agg(f.count('cnpj_basico').alias('frequencia'))\
    .orderBy('data_de_inicio')

freq.toPandas()

                                                                                

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [58]:
freq.union(     # Embaixo do banco freq, vai ser adicionado mais linhas de outro banco,
    freq.select(   # Criando o segundo banco
        f.lit('Total').alias('data_de_inicio'),    # Que possui o valor 'Total' para a variável 'data_de_inicio'
        f.sum(freq.frequencia).alias('frequencia') # E a soma de todas frequências na variável 'frequencia'
    )
).show()

                                                                                

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [59]:
empresas.createOrReplaceTempView('empresasView')

In [60]:
spark.sql('SELECT * FROM empresasView').show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [61]:
spark\
    .sql("""
        SELECT *
        FROM empresasView
        WHERE capital_social_da_empresa =50
    """).toPandas()

                                                                                

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,17350147,ERIK MARCELO DOS SANTOS 42107848858,2135,50,50.0,1,
1,17833214,ALEXANDRE MACHADO LIMA 73750123772,2135,50,50.0,1,
2,20860830,YASMIN MOURA DA FONSECA 13457709793,2135,50,50.0,1,
3,22242856,JOAO CESAR MESSIAS 08707149883,2135,50,50.0,1,
4,23238540,EVERTON ROBERTO DA SILVA 42101963809,2135,50,50.0,1,
...,...,...,...,...,...,...,...
6546,39717910,JANAINA CRISTINA GUEDES DE NOVAIS 40789379856,2135,50,50.0,1,
6547,40134726,JOSE VENITH 44292147949,2135,50,50.0,1,
6548,40983141,VALERIO ANTONIO CRISPIM DA CRUZ 53025776668,2135,50,50.0,1,
6549,41238271,MARIANGELA RAMOS PIMENTA 73179787600,2135,50,50.0,1,


In [63]:
spark\
    .sql("""
        SELECT porte_da_empresa, MEAN(capital_social_da_empresa) AS Media 
        FROM empresasView
        GROUP BY porte_da_empresa
    """).toPandas()

                                                                                

Unnamed: 0,porte_da_empresa,Media
0,,8.354219
1,1.0,339994.5
2,3.0,2601002.0
3,5.0,708660.4


In [64]:
empresas_join.createOrReplaceTempView('empresasJoinView')

In [65]:
freq = spark\
    .sql("""
        SELECT YEAR(data_de_inicio_atividade) AS data_de_inicio, COUNT(cnpj_basico) as count
        FROM empresasJoinView
        WHERE YEAR(data_de_inicio_atividade) >= 2010
        GROUP BY data_de_inicio
        ORDER BY data_de_inicio
    """)

freq.toPandas()

                                                                                

Unnamed: 0,data_de_inicio,count
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [66]:
freq.createOrReplaceTempView('freqView')

In [67]:
spark\
    .sql("""
        SELECT *
        FROM freqView
        UNION ALL
        SELECT 'Total' AS data_de_inicio, SUM(count) AS count
        FROM freqView
    """).toPandas()

                                                                                

Unnamed: 0,data_de_inicio,count
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [69]:
empresas.write.csv(
    path = 'empresas/csv',
    mode = 'overwrite', # sobscreve o arquivo se já tiver outro
    sep = ';',
    header = True
)

                                                                                

## Faça como eu fiz

In [70]:
socios.write.csv(
    path = 'socios/csv',
    mode = 'overwrite', # sobscreve o arquivo se já tiver outro
    sep = ';',
    header = True
)

                                                                                

In [71]:
estabelecimentos.write.csv(
    path = 'estabelecimentos/csv',
    mode = 'overwrite', # sobscreve o arquivo se já tiver outro
    sep = ';',
    header = True
)

                                                                                

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [82]:
empresas.write.parquet(
    path = 'empresas/parquet',
    mode = 'overwrite' # sobscreve o arquivo se já tiver outro
)

                                                                                

In [73]:
empresas_parquet = spark.read.parquet( # Lê bem mais rápido
    'empresas/parquet'
)

In [74]:
empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [None]:
socios.write.parquet(
    path = 'socios/parquet',
    mode = 'overwrite' # sobscreve o arquivo se já tiver outro
)

In [None]:
estabelecimentos.write.parquet(
    path = 'estabelecimentos/parquet',
    mode = 'overwrite' # sobscreve o arquivo se já tiver outro
)

## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

In [75]:
empresas.coalesce(1).write.csv( # diminui o número de partições
    path = 'empresas/csv-unico',
    mode = 'overwrite', 
    sep = ';',
    header = True
)

                                                                                

In [83]:
empresas.write.parquet( # Lê bem mais rápido
    path = 'empresas/parquet-partitionBy',
    mode = 'overwrite',
    partitionBy = 'porte_da_empresa'
)

                                                                                

In [84]:
# Para encerrar a seção Spark
spark.stop()