# Começando o Trabalho
---

## Apache Spark - Introdução

### [Apache Spark](https://spark.apache.org/)

Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

### [PySpark](https://spark.apache.org/docs/3.1.2/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

## Utilizando o Spark no Windows

[fonte](https://spark.apache.org/docs/3.1.2/api/python/getting_started/install.html)

#### Passo 1 - Instalando o Java

O PySpark requer a instalação do Java na versão 7 ou superior. Obtenha a versão mais recente clicando [aqui](https://www.java.com/pt-BR/download/). Para verificar a versão que está instalada em sua máquina execute a seguinte linha de código no seu *prompt*:

```
java -version
```

#### Passo 2 - Instalando o Python

O Python deve ser instalado em sua versão 2.6 ou superior. Para obter a versão mais recente clique [aqui](https://www.python.org/downloads/windows/). Para verificar a versão do Python que está instalada em sua máquina digite o seguinte comando em seu *prompt*:

```
python --version
```

#### Passo 3 - Instalando o Apache Spark 

Selecione a versão mais estável clicando [aqui](http://spark.apache.org/downloads.html). Na criação deste projeto utilizamos a versão do Spark **3.1.2** e como tipo de pacote selecionamos **Pre-built for Apache Hadoop 2.7**.

Para instalar o Apache Spark não é necessário executar um instalador, basta descomprimir os arquivos em uma pasta de sua escolha.

<font color=red>Obs.: certifique-se de que o caminho onde os arquivos do Spark foram armazenados não contenham espaços (ex.: **"C:\spark\spark-3.1.2-bin-hadoop2.7"**).</font>

Para testar o funcionamento do Spark execute os comandos abaixo em seu *prompt* de comando. Esses comandos assumem que você extraiu os arquivos do Spark na pasta **"C:\spark\"**.

```
cd C:\spark\spark-3.1.2-bin-hadoop2.7
```

```
bin\pyspark
```

O comando acima inicia o *shell* do PySpark que permite trabalhar interativamente com o Spark.

Para sair basta digitar `exit()` e logo depois presionar *Enter*. Para voltar ao *prompt* pressione *Enter* novamente.

#### Passo 4 - Instalando o findspark

```
pip install findspark
```

#### Passo 5 - Instalando o winutils

Os arquivos do Spark não incluem o utilitário **winutils.exe** que é utilizado pelo Spark no Windows. Se não informar onde o Spark deve procurar este utilitário, veremos alguns erros no console e também não conseguiremos executar *scripts* Python utilizando o utilitário `spark-submit`.

Faça o [download](https://github.com/steveloughran/winutils) para a versão do Hadoop para a qual sua instalação do Spark foi construída. Em nosso exemplo foi utilizada a [versão 2.7](https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin). Faça o *download* apenas do arquivo **winutils.exe**.

Crie a pasta **"hadoop\bin"** dentro da pasta que contém os arquivos do Spark (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**) e copie o arquivo **winutils.exe** para dentro desta pasta.

Crie duas variáveis de ambiente no seu Windows. A primeira chamada **SPARK_HOME** que aponta para a pasta onde os arquivos Spark foram armazenados (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**). A segunda chamada **HADOOP_HOME** que aponta para **%SPARK_HOME%\hadoop** (assim podemos modificar **SPARK_HOME** sem precisar alterar **HADOOP_HOME**).

In [165]:
import os
from pathlib import Path

os.environ["SPARK_HOME"] = str(Path.home() / "spark")

In [166]:
import findspark
findspark.init()

In [167]:
from pyspark.sql import SparkSession

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [168]:
# instalar as dependências
#!apt-get update -qq
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
#!tar xf spark-3.1.2-bin-hadoop2.7.tgz
#!pip install -q findspark

In [169]:
#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [170]:
#import findspark
#findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [171]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [172]:
spark

## Acessando o [Spark UI](https://spark.apache.org/docs/3.1.2/web-ui.html) (Google Colab)

[Site ngrok](https://ngrok.com)

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [173]:
# Como criar um dataframe de forma simples

data = [('Zeca','35'), ('Eva', '29')]
colNames = ['Nome', 'Idade']
df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string]

In [174]:
df.show()

+----+-----+
|Nome|Idade|
+----+-----+
|Zeca|   35|
| Eva|   29|
+----+-----+



In [175]:
df.toPandas()

Unnamed: 0,Nome,Idade
0,Zeca,35
1,Eva,29


## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
> 
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
> 
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Montando nosso drive

In [176]:
import io
import os
from dotenv import load_dotenv
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

In [177]:
load_dotenv()

True

In [178]:
# Caminho para o arquivo da conta de serviço
SERVICE_ACCOUNT_FILE = os.getenv("SERVICE_ACCOUNT_PATH")

In [179]:
# Escopo de acesso ao Google Drive
SCOPES = ['https://www.googleapis.com/auth/drive']

In [180]:
# Autenticação
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES)


In [181]:
# Inicializa o serviço do Drive
service = build('drive', 'v3', credentials=credentials)

In [182]:
# Listar arquivos no Drive (compartilhados com a conta de serviço)
results = service.files().list(
    pageSize=10,
    fields="files(id, name)").execute()
items = results.get('files', [])

if not items:
    print('Nenhum arquivo encontrado.')
else:
    print('Arquivos disponíveis:')
    for item in items:
        print(f"{item['name']} (ID: {item['id']})")


Arquivos disponíveis:
estabelecimentos.zip (ID: 1yBgZTV2mtAo3RotMRpWDqbKlOx5O4rGw)
socios.zip (ID: 19Qi3hkvM57AzwtHNI7HaVxZNzBrcXnJb)
curso-spark (ID: 1fOqz6n7Eqmiya5Mvt24r_WMdp1lvVgav)
empresas.zip (ID: 1J03jKHzQhcT0CJj-wqEQq5CF9P9Jfw41)


### Carregando os dados das empresas

In [183]:
# ID do arquivo no Drive
file_id = '1J03jKHzQhcT0CJj-wqEQq5CF9P9Jfw41'

# Nome do arquivo e caminho onde o arquivo será salvo
file_name = 'empresas.zip'
output_path = f'data/zip/{file_name}'

# Garante que a pasta existe
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Download
request = service.files().get_media(fileId=file_id)
fh = io.FileIO(output_path, 'wb')
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
    status, done = downloader.next_chunk()
    print(f"Download {int(status.progress() * 100)}%.")


Download 100%.


In [184]:
import zipfile

In [None]:
# Caminho da pasta onde vou pegar o arquivo para deszipar
folder_path = 'data/zip/'

# Nome do arquivo que eu quero deszipar
file_name = 'empresas.zip'

# f-string completa
compiled_path = f'{folder_path}{file_name}'

# Pasta de destino do desempacotamento
path_folder_unpacking = 'data/unziped_data/'

In [186]:
# Garante que a pasta existe
os.makedirs(os.path.dirname(path_folder_unpacking), exist_ok=True)

In [187]:
zipfile.ZipFile(compiled_path, 'r').extractall(path_folder_unpacking)

In [188]:
path = 'data/unziped_data/empresas'
empresas = spark.read.csv(path, sep=';', inferSchema=True)

                                                                                

In [189]:
empresas.count()

4585679

## Faça como eu fiz: Estabelecimentos e Sócios

### Carregando os dados dos estabelecimentos

In [190]:
# ID do arquivo no Drive
file_id = '1yBgZTV2mtAo3RotMRpWDqbKlOx5O4rGw'

# Nome do arquivo e caminho onde o arquivo será salvo
file_name = 'estabelecimentos.zip'
output_path = f'data/zip/{file_name}'

# Garante que a pasta existe
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Download
request = service.files().get_media(fileId=file_id)
fh = io.FileIO(output_path, 'wb')
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
    status, done = downloader.next_chunk()
    print(f"Download {int(status.progress() * 100)}%.")

Download 33%.
Download 67%.
Download 100%.


In [191]:
# Caminho da pasta onde vou pegar o arquivo para deszipar
folder_path = 'data/zip/'

# Nome do arquivo que eu quero deszipar
file_name = 'estabelecimentos.zip'

# f-string completa
compiled_path = f'{folder_path}{file_name}'

# Pasta de destino do desempacotamento
path_folder_unpacking = 'data/unziped_data/'

In [192]:
# Garante que a pasta existe
os.makedirs(os.path.dirname(path_folder_unpacking), exist_ok=True)

In [193]:
zipfile.ZipFile(compiled_path, 'r').extractall(path_folder_unpacking)

In [194]:
path = 'data/unziped_data/estabelecimentos'
estabelecimentos = spark.read.csv(path, sep=';', inferSchema=True)

                                                                                

In [195]:
estabelecimentos.count()

4836219

### Carregando os dados dos sócios

In [196]:
# ID do arquivo no Drive
file_id = '19Qi3hkvM57AzwtHNI7HaVxZNzBrcXnJb'

# Nome do arquivo e caminho onde o arquivo será salvo
file_name = 'socios.zip'
output_path = f'data/zip/{file_name}'

# Garante que a pasta existe
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Download
request = service.files().get_media(fileId=file_id)
fh = io.FileIO(output_path, 'wb')
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
    status, done = downloader.next_chunk()
    print(f"Download {int(status.progress() * 100)}%.")

Download 100%.


In [197]:
# Caminho da pasta onde vou pegar o arquivo para deszipar
folder_path = 'data/zip/'

# Nome do arquivo que eu quero deszipar
file_name = 'socios.zip'

# f-string completa
compiled_path = f'{folder_path}{file_name}'

# Pasta de destino do desempacotamento
path_folder_unpacking = 'data/unziped_data/'

In [198]:
# Garante que a pasta existe
os.makedirs(os.path.dirname(path_folder_unpacking), exist_ok=True)

In [199]:
zipfile.ZipFile(compiled_path, 'r').extractall(path_folder_unpacking)

In [200]:
path = 'data/unziped_data/socios'
socios = spark.read.csv(path, sep=';', inferSchema=True)

                                                                                

In [201]:
socios.count()

2046430

### Verificando as quantidades dos três DataFrames

In [202]:
print('-' * 30)
print(" Info ".center(30, "*"))

print(f"\n{'DataFrame'.center(16, ' ')} | {'Quantidade'.center(10, ' ')}")
print('-' * 30)
print(f"{'Estabelecimentos'.ljust(16, ' ')} | {str(estabelecimentos.count()).rjust(10, ' ')}")
print(f"{'Empresas'.ljust(16, ' ')} | {str(empresas.count()).rjust(10, ' ')}")
print(f"{'Sócios'.ljust(16, ' ')} | {str(socios.count()).rjust(10, ' ')}\n")
print('*' * 30)
print('-' * 30)


------------------------------
************ Info ************

   DataFrame     | Quantidade
------------------------------
Estabelecimentos |    4836219
Empresas         |    4585679
Sócios           |    2046430

******************************
------------------------------


# Manipulando os Dados
---

## Operações básicas

In [203]:
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,400000,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0,1,


### Renomeando as colunas do DataFrame

In [204]:
# podemos perceber que as colunas não estão com os nomes que esperamos
empresas.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6']

In [205]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [206]:
for index, colNames in enumerate(empresasColNames):
    empresas = empresas.withColumnRenamed(f'_c{index}', colNames)

In [207]:
# agora acabamos de renomear, vamos verificar se foi de fato alterado
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,400000,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0,1,


In [208]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [209]:
estabelecimentos.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24',
 '_c25',
 '_c26',
 '_c27',
 '_c28',
 '_c29']

In [210]:
for index, colNames in enumerate(estabsColNames):
    estabelecimentos = estabelecimentos.withColumnRenamed(f'_c{index}', colNames)

In [211]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,306,1,48,1,,2,20051103,0,,,...,7107,,,,,,,,,
1,1355,1,3,1,,8,20081231,71,,,...,7231,,,,,,,,,
2,4820,1,51,1,CARTORIO FELIX DE SOUZA,2,19980728,0,,,...,9431,,,,,,,,,
3,5347,1,27,1,QUIOSQUE,8,20000105,1,,,...,6469,,,,,,,,,
4,6846,1,39,1,BADU,8,20080903,54,,,...,7107,,,,,,,,,


In [212]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [213]:
socios.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10']

In [214]:
for index, colNames in enumerate(sociosColNames):
    socios = socios.withColumnRenamed(f'_c{index}', colNames)

In [215]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


## Analisando os dados

[Data Types](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#data-types)

In [224]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,612,LAR DOS IDOSOS ASTROGILDO RIBEIRO,3999,16,0,5,
1,5951,DISTRIBUIDORA DE BEBIDAS CLAURITA LTDA,2062,49,0,5,
2,10428,C.R.P. & MASER. COMERCIAL E DESENHOS LTDA,2062,49,0,1,
3,11086,H. P. TEC COMERCIO E REPRESENTACAO LTDA,2062,49,400000,1,
4,11727,JUSTINO GOMES CINTRA,2135,50,0,1,


In [None]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



25/08/01 14:12:04 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 122076 ms exceeds timeout 120000 ms
25/08/01 14:12:04 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/01 14:12:04 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [226]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [None]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



25/08/01 14:12:35 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1240)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:296)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

In [None]:
estabelecimentos.limit(5).toPandas()

## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [216]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


# Seleções e consultas
---

## Selecionando informações
 
[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

## Faça como eu fiz

## Identificando valores nulos

In [217]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [218]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



In [219]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [220]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [221]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [222]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|NULL|
+----+



## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [223]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando os dados

## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

## Faça como eu fiz

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)