<a href="https://colab.research.google.com/github/williambrunos/Pyspark/blob/main/Alura/Aula_1/aula0-projeto_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Começando o Trabalho
---

## Apache Spark - Introdução

### [Apache Spark](https://spark.apache.org/)

Apache Spark é uma plataforma de computação em *cluster* que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo *MapReduce*, mas projetada para ser rápida para consultas interativas e algoritmos iterativos.

O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados.

O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes:

- Preparação de dados
- Modelos de machine learning
- Análise de dados em tempo real

### [PySpark](https://spark.apache.org/docs/latest/api/python/index.html)

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### Spark SQL e DataFrame

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### Spark Streaming

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### Spark MLlib

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### Spark Core

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

## Utilizando o Spark no Windows

[fonte](https://spark.apache.org/docs/latest/api/python/getting_started/install.html)

#### Passo 1 - Instalando o Java

O PySpark requer a instalação do Java na versão 7 ou superior. Obtenha a versão mais recente clicando [aqui](https://www.java.com/pt-BR/download/). Para verificar a versão que está instalada em sua máquina execute a seguinte linha de código no seu *prompt*:

```
java -version
```

#### Passo 2 - Instalando o Python

O Python deve ser instalado em sua versão 2.6 ou superior. Para obter a versão mais recente clique [aqui](https://www.python.org/downloads/windows/). Para verificar a versão do Python que está instalada em sua máquina digite o seguinte comando em seu *prompt*:

```
python --version
```

#### Passo 3 - Instalando o Apache Spark 

Selecione a versão mais estável clicando [aqui](http://spark.apache.org/downloads.html). Na criação deste projeto utilizamos a versão do Spark **3.1.2** e como tipo de pacote selecionamos **Pre-built for Apache Hadoop 2.7**.

Para instalar o Apache Spark não é necessário executar um instalador, basta descomprimir os arquivos em uma pasta de sua escolha.

<font color=red>Obs.: certifique-se de que o caminho onde os arquivos do Spark foram armazenados não contenham espaços (ex.: **"C:\spark\spark-3.1.2-bin-hadoop2.7"**).</font>

Para testar o funcionamento do Spark execute os comandos abaixo em seu *prompt* de comando. Esses comandos assumem que você extraiu os arquivos do Spark na pasta **"C:\spark\"**.

```
cd C:\spark\spark-3.1.2-bin-hadoop2.7
```

```
bin\pyspark
```

O comando acima inicia o *shell* do PySpark que permite trabalhar interativamente com o Spark.

Para sair basta digitar `exit()` e logo depois presionar *Enter*. Para voltar ao *prompt* pressione *Enter* novamente.

#### Passo 4 - Instalando o findspark

```
pip install findspark
```

#### Passo 5 - Instalando o winutils

Os arquivos do Spark não incluem o utilitário **winutils.exe** que é utilizado pelo Spark no Windows. Se não informar onde o Spark deve procurar este utilitário, veremos alguns erros no console e também não conseguiremos executar *scripts* Python utilizando o utilitário `spark-submit`.

Faça o [download](https://github.com/steveloughran/winutils) para a versão do Hadoop para a qual sua instalação do Spark foi construída. Em nosso exemplo foi utilizada a [versão 2.7](https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin). Faça o *download* apenas do arquivo **winutils.exe**.

Crie a pasta **"hadoop\bin"** dentro da pasta que contém os arquivos do Spark (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**) e copie o arquivo **winutils.exe** para dentro desta pasta.

Crie duas variáveis de ambiente no seu Windows. A primeira chamada **SPARK_HOME** que aponta para a pasta onde os arquivos Spark foram armazenados (em nosso exemplo **"C:\spark\spark-3.1.2-bin-hadoop2.7"**). A segunda chamada **HADOOP_HOME** que aponta para **%SPARK_HOME%\hadoop** (assim podemos modificar **SPARK_HOME** sem precisar alterar **HADOOP_HOME**).

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [4]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .master('local[*]') \
#     .appName("Iniciando com Spark") \
#     .getOrCreate()

In [5]:
# spark

Se você executou o código anterior, pode perceber que o link destinado à **SPARK UI** não o direciona a nenhum site, gerando um erro. Isto ocorre por conta que o spark UI se destina a visualização de jobs do pyspark por meio de uma interface (UI) que roda localmente.

De acordo com a documentação do Spark UI:

"Apache Spark provides a suite of web user interfaces (UIs) that you can use to monitor the status and resource consumption of your Spark cluster."

[Link documentação spark UI](https://spark.apache.org/docs/latest/web-ui.html)

No entanto, como o código está sendo executado em um ambiente google colab, o servidor "local" está em algum servidor na nuvem, não nos permitindo acessar links "locais". Asism, precisamos criar um tunelamento seguro entre este serviço em nuvem e criar uma url a ser disponibilizada na web utilizando ``ngrok``.

## Acessando o [Spark UI](https://spark.apache.org/docs/latest/web-ui.html) (Google Colab)

Primeiro, configuramos a sessão spark para escutar a porta 4050

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

Em seguida instalamos o ngrok

In [7]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


Validamos o ngrok com nosso token

In [8]:
get_ipython().system_raw('./ngrok authtoken YOUR AUTH TOKEN')
get_ipython().system_raw('./ngrok http 4040 &')

Infelizmente não consegui implementar este passo para a spark ui :p

In [9]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


[Site ngrok](https://ngrok.com)

## DataFrames com Spark


### Interfaces Spark

Existem três interfaces principais do Apache Spark que você deve conhecer: Resilient Distributed Dataset, DataFrame e Dataset.

- **Resilient Distributed Dataset**: A primeira abstração do Apache Spark foi o Resilient Distributed Dataset (RDD). É uma interface para uma sequência de objetos de dados que consiste em um ou mais tipos localizados em uma coleção de máquinas (um cluster). Os RDDs podem ser criados de várias maneiras e são a API de “nível mais baixo” disponível. Embora esta seja a estrutura de dados original do Apache Spark, você deve se concentrar na API DataFrame, que é um superconjunto da funcionalidade RDD. A API RDD está disponível nas linguagens Java, Python e Scala.

- **DataFrame**: Trata-se de um conceito similar ao DataFrame que você pode estar familiarizado como o pacote pandas do Python e a linguagem R . A API DataFrame está disponível nas linguagens Java, Python, R e Scala.

- **Dataset**: uma combinação de DataFrame e RDD. Ele fornece a interface digitada que está disponível em RDDs enquanto fornece a conveniência do DataFrame. A API Dataset está disponível nas linguagens Java e Scala.

Em muitos cenários, especialmente com as otimizações de desempenho incorporadas em DataFrames e Datasets, não será necessário trabalhar com RDDs. Mas é importante entender a abstração RDD porque:

- O RDD é a infraestrutura subjacente que permite que o Spark seja executado com tanta rapidez e forneça a linhagem de dados.

- Se você estiver mergulhando em componentes mais avançados do Spark, pode ser necessário usar RDDs.

- As visualizações na Spark UI fazem referência a RDDs.

In [10]:
data = [('Zeca','35', 'M'), ('Eva', '29', 'F')]
colNames = ['Nome', 'Idade', 'Sexo']

df = spark.createDataFrame(data, colNames)
df

DataFrame[Nome: string, Idade: string, Sexo: string]

In [11]:
df.show()

+----+-----+----+
|Nome|Idade|Sexo|
+----+-----+----+
|Zeca|   35|   M|
| Eva|   29|   F|
+----+-----+----+



## Projeto

Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
> 
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
> 
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Montando nosso drive

Subimos as amostras zipadas para nosso drive e iremos trabalhar com ele a partir de agora:

In [12]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Carregando os dados das empresas

In [13]:
# SÓ PRECISA EXECUTAR UMA VEZ!
# import zipfile 

# zipfile.ZipFile('/content/drive/MyDrive/curso-spark/empresas.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')
# zipfile.ZipFile('/content/drive/MyDrive/curso-spark/estabelecimentos.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')
# zipfile.ZipFile('/content/drive/MyDrive/curso-spark/socios.zip', 'r').extractall('/content/drive/MyDrive/curso-spark')

In [14]:
path = '/content/drive/MyDrive/curso-spark/empresas'

df_empresas = spark.read.csv(path, sep=';', inferSchema=True)
print(f'Número de registros no dataframe de empresas {df_empresas.count()}')

Número de registros no dataframe de empresas 4585679


## Faça como eu fiz: Estabelecimentos e Sócios

### Carregando os dados dos estabelecimentos

In [15]:
path = '/content/drive/MyDrive/curso-spark/estabelecimentos'

df_estabelecimentos = spark.read.csv(path, sep=';', inferSchema=True)
print(f'Número de registros no dataframe de estabelecimentos {df_estabelecimentos.count()}')

Número de registros no dataframe de estabelecimentos 4836219


### Carregando os dados dos sócios

In [16]:
path = '/content/drive/MyDrive/curso-spark/socios'

df_socios = spark.read.csv(path, sep=';', inferSchema=True)
print(f'Número de registros no dataframe de sócios {df_socios.count()}')

Número de registros no dataframe de sócios 2046430


# Manipulando os Dados
---

## Operações básicas

In [17]:
df_empresas.limit(5).show()

+----+--------------------+----+---+-------+---+----+
| _c0|                 _c1| _c2|_c3|    _c4|_c5| _c6|
+----+--------------------+----+---+-------+---+----+
| 306|FRANCAMAR REFRIGE...|2240| 49|   0,00|  1|null|
|1355|BRASILEIRO & OLIV...|2062| 49|   0,00|  5|null|
|4820|REGISTRO DE IMOVE...|3034| 32|   0,00|  5|null|
|5347|ROSELY APARECIDA ...|2135| 50|   0,00|  5|null|
|6846|BADU E FILHOS TEC...|2062| 49|4000,00|  1|null|
+----+--------------------+----+---+-------+---+----+



### Renomeando as colunas do DataFrame

In [18]:
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [19]:
for idx, col_name in enumerate(empresasColNames):
  df_empresas = df_empresas.withColumnRenamed(f'_c{idx}', col_name)
df_empresas.show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       null|
|       5347|         ROSELY APARE

In [20]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [21]:
for idx, col_name in enumerate(estabsColNames):
  df_estabelecimentos = df_estabelecimentos.withColumnRenamed(f'_c{idx}', col_name)
df_estabelecimentos.show(5)

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------

In [22]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [23]:
for idx, col_name in enumerate(sociosColNames):
  df_socios = df_socios.withColumnRenamed(f'_c{idx}', col_name)
df_socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|                 19940725|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

## Analisando os dados

[Data Types](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types)

### Dados de estabelecimentos

In [24]:
df_estabelecimentos.show(5)

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------

In [25]:
df_estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

### Dados de empresas

In [26]:
df_empresas.show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       null|
|       5347|         ROSELY APARE

In [27]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Dados de Sócios

In [28]:
df_socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|                 19940725|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [29]:
df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [30]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [31]:
df_empresas.show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0,00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0,00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0,00|               5|                       null|
|       5347|         ROSELY APARE

In [32]:
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



Perceba que nos dados de **empresas** temos uma coluna chamada ``capital_social_da_empresa``, que deveria possuir dados do tipo Double e com o separador de casas decimais como sendo o '.', e não a ','.

**obs**: para substituir um ponto na função ``regexp_replace`` precisamos passar a string ``\.``.

Podemos substituir tal caractere utilizando regex e a função ``withColumn``:

In [33]:
df_empresas = df_empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))
df_empresas.show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                     0.00|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                     0.00|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                     0.00|               5|                       null|
|       5347|         ROSELY APARE

Agora sim podemos converter a coluna em questão para o tipo Double:

In [34]:
df_empresas = df_empresas.withColumn('capital_social_da_empresa', df_empresas['capital_social_da_empresa'].cast(DoubleType()))
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html)

In [35]:
df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,20200924
1,20201022
2,20210215


In [36]:
df = df.withColumn('data', f.to_date(df.data.cast(StringType()), 'yyyyMMdd'))
df.show(5)

+----------+
|      data|
+----------+
|2020-09-24|
|2020-10-22|
|2021-02-15|
+----------+



### Convertendo datas de estabelecimentos

In [37]:
df_estabelecimentos.show(5)

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------

In [38]:
df_estabelecimentos = df_estabelecimentos.withColumn(
    'data_situacao_cadastral', f.to_date(df_estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyMMdd')
)\
.withColumn(
    'data_de_inicio_atividade', f.to_date(df_estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
)\
.withColumn(
    'data_da_situacao_especial', f.to_date(df_estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
)

### Convertendo datas de sócios

In [48]:
df_socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|                 19940725|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [49]:
df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [50]:
df_socios = df_socios\
              .withColumn('data_de_entrada_sociedade', f.to_date(df_socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd'))
df_socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

In [52]:
df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



# Seleções e consultas
---

## Selecionando informações
 
[DataFrame.select(*cols)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.select.html)

### Para dados de sócios

In [55]:
df_socios.show(5, truncate=False)

+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social  |cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|411        |2                     |LILIANA PATRICIA GUASTAVINO    |***678188**         |22                   |1994-07-25               |null|***000000**        |null                 |0                                  |7           |
|411        |2                     |CRISTINA HUNDERTMARK        

In [58]:
df_socios\
  .select('*').show(5, truncate=False)

+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social  |cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-------------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|411        |2                     |LILIANA PATRICIA GUASTAVINO    |***678188**         |22                   |1994-07-25               |null|***000000**        |null                 |0                                  |7           |
|411        |2                     |CRISTINA HUNDERTMARK        

In [61]:
df_socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada_sociedade')).show(5, truncate=False)

+-------------------------------+------------+------------------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada_sociedade|
+-------------------------------+------------+------------------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994                    |
|CRISTINA HUNDERTMARK           |7           |1994                    |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994                    |
|EDUARDO BERRINGER STEPHAN      |5           |1994                    |
|HANNE MAHFOUD FADEL            |8           |1994                    |
+-------------------------------+------------+------------------------+
only showing top 5 rows



## Faça como eu fiz

Repita este procedimento para o **DataFrame de estabelecimentos** selecionando as colunas nome_fantasia e municipio. Crie para esta seleção as colunas `ano_de_inicio_atividade` e `mes_de_inicio_atividade` que devem conter, *respectivamente*, o **ano** e o **mês** de início da atividade do estabelecimento. Estas duas últimas colunas devem ser criadas a partir da coluna `data_de_inicio_atividade`.

Como suporte utilize a documentação do Spark na [seção Functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions)

In [62]:
df_estabelecimentos.show(5, truncate=False)

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+----------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|nome_fantasia    |situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|logradouro            |numero|complemento|bairro            |cep    |uf |municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax|fax |correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+----------

In [63]:
df_estabelecimentos\
  .select('cnpj_basico', 
          'cnpj_ordem', 
          'nome_fantasia', 
          f.year('data_de_inicio_atividade').alias('ano_inicio_atividade'), 
          f.month('data_de_inicio_atividade').alias('mes_inicio_atividade'))\
  .show(5, truncate=False)

+-----------+----------+-----------------+--------------------+--------------------+
|cnpj_basico|cnpj_ordem|nome_fantasia    |ano_inicio_atividade|mes_inicio_atividade|
+-----------+----------+-----------------+--------------------+--------------------+
|1879       |1         |PIRAMIDE M. C.   |1994                |5                   |
|2818       |1         |null             |1994                |5                   |
|3110       |1         |null             |1994                |5                   |
|3733       |1         |null             |1994                |5                   |
|4628       |3         |EMBROIDERY & GIFT|1995                |5                   |
+-----------+----------+-----------------+--------------------+--------------------+
only showing top 5 rows



## Identificando valores nulos

In [39]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [40]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



In [41]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [42]:
df.show()

+----+
|data|
+----+
| 1.0|
| 2.0|
| 3.0|
| NaN|
+----+



In [43]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['data'])
df.toPandas()

Unnamed: 0,data
0,1.0
1,2.0
2,3.0
3,


In [44]:
df.show()

+----+
|data|
+----+
|   1|
|   2|
|   3|
|null|
+----+



Agora, vamos realizar uma contagem de valores nulos presentes em cada coluna do dataframe de ``sócios``.

In [68]:
count_of_nulls_by_columns_array = [f.count(f.when(f.isnull(column), 1)).alias(column) for column in df_socios.columns]
df_socios.select(count_of_nulls_by_columns_array).show(5, truncate=False)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais   |representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|0          |0                     |208                          |1234                |0                    |1                        |2038255|0                  |1995432              |0                                  |0           |
+-----------+----------------------+------------------------

Agora, teremos que preencher estes valores ``null`` de acordo com o tipo da coluna na qual este dado pertence.

In [69]:
df_socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



Pegaremos apenas as colunas com maiores quantidades de valores `null` para trabalharmos melhor, que são: `pais: integer` e `nome_do_representante: string`. 

In [86]:
def replace_null_values_based_on_dtypes(spark_data_frame):
  """
  This function must replace the null values on spark dataframe
  received as argument based on the column dtype. If the column
  type is integer, then the null value will be replaced with 0,
  or a empty string "" otherwise.
  """

  for column_name, column_type in spark_data_frame.dtypes:
    if column_type is 'integer':
      spark_data_frame.withColumn(column_name, spark_data_frame.select(column_name).na.fill(0))
    else:
      spark_data_frame.withColumn(column_name, spark_data_frame.select(column_name).na.fill(''))

  return spark_data_frame

In [87]:
replace_null_values_based_on_dtypes(df_socios)

AssertionError: ignored

## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [89]:
df_socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada_sociedade'))\
  .orderBy('ano_de_entrada_sociedade', ascending=False)\
  .show(truncate=False)

+---------------------------------+------------+------------------------+
|nome_do_socio_ou_razao_social    |faixa_etaria|ano_de_entrada_sociedade|
+---------------------------------+------------+------------------------+
|ELENICE SOUZA DA LUZ             |3           |2021                    |
|MAGDA DE ALBUQUERQUE SILVA BARROS|7           |2021                    |
|JOSE DE RIBAMAR SILVA FILHO      |6           |2021                    |
|JOSE ALCEU DO ROSARIO JUNIOR     |5           |2021                    |
|NILSON SOARES DA SILVA           |5           |2021                    |
|ROBERTA BENELLI                  |4           |2021                    |
|CLAUDIMIR ROBERTO FERREIRA       |5           |2021                    |
|LEILANE CRISTINA CARRIJO CLAUSING|4           |2021                    |
|JOSE ANTONIO DE LIMA             |7           |2021                    |
|BENILDES BARBOSA RODRIGUES       |8           |2021                    |
|EVANDO LIMONGI DE RESENDE        |5  

In [91]:
df_socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada_sociedade'))\
  .orderBy(['faixa_etaria', 'ano_de_entrada_sociedade'], ascending=[False, True])\
  .show(truncate=False)

+------------------------------------+------------+------------------------+
|nome_do_socio_ou_razao_social       |faixa_etaria|ano_de_entrada_sociedade|
+------------------------------------+------------+------------------------+
|ANTERO DA SILVA RAMALHO CRUZ        |9           |1918                    |
|NELSON FALDINI                      |9           |1937                    |
|ALFREDO MARINHO DE QUEIROZ          |9           |1939                    |
|ALDHEMAR DOS SANTOS FERREIRA        |9           |1941                    |
|PAULO BRASIL FERREIRA VELLOSO       |9           |1941                    |
|EDGAR ERTHAL                        |9           |1941                    |
|JOAO GRICZYNSKI                     |9           |1942                    |
|OSORIO DE MORAES FILHO              |9           |1942                    |
|ROBERTO DE MORAES                   |9           |1942                    |
|GLAUCIA DE MORAES                   |9           |1942                    |

### Exercício - orderBy

In [92]:
data = [
    ('CARMINA RABELO', 4, 2010), 
    ('HERONDINA PEREIRA', 6, 2009), 
    ('IRANI DOS SANTOS', 12, 2010), 
    ('JOAO BOSCO DA FONSECA', 3, 2009), 
    ('CARLITO SOUZA', 1, 2010), 
    ('WALTER DIAS', 9, 2009), 
    ('BRENO VENTUROSO', 1, 2009), 
    ('ADELINA TEIXEIRA', 5, 2009), 
    ('ELIO SILVA', 7, 2010), 
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [93]:
df_empresas\
  .where('capital_social_da_empresa == 50')\
  .show(5, truncate=False)

+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial       |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|17350147   |ERIK MARCELO DOS SANTOS 42107848858 |2135             |50                         |50.0                     |1               |null                       |
|17833214   |ALEXANDRE MACHADO LIMA 73750123772  |2135             |50                         |50.0                     |1               |null                       |
|20860830   |YASMIN MOURA DA FONSECA 13457709793 |2135             |50                         |50.0                     |1               |null                 

In [99]:
df_socios\
  .select('nome_do_socio_ou_razao_social')\
  .filter(df_socios.nome_do_socio_ou_razao_social.startswith('WILLIAM'))\
  .filter(df_socios.nome_do_socio_ou_razao_social.endswith('LIMA'))\
  .show(truncate=False)

+-------------------------------+
|nome_do_socio_ou_razao_social  |
+-------------------------------+
|WILLIAM DIEGO CIPRIANO LIMA    |
|WILLIAM JOSE LIMA              |
|WILLIAM PINHEIRO LIMA          |
|WILLIAM GUIMARAES LIMA         |
|WILLIAM MELANDI DE LIMA        |
|WILLIAM COSTA LIMA             |
|WILLIAM FERNANDO DE LIMA       |
|WILLIAM ROSA LIMA              |
|WILLIAM DE ALMEIDA LIMA        |
|WILLIAM JAMES NOGUEIRA LIMA    |
|WILLIAM ZACARIAS DE LIMA       |
|WILLIAM EUZEBIO DE LIMA        |
|WILLIAM FERREIRA DE LIMA       |
|WILLIAM COELHO TEIXEIRA DE LIMA|
|WILLIAM VEIGA DE LIMA          |
|WILLIAM BARBOSA LIMA           |
|WILLIAM RODRIGUES DE LIMA      |
|WILLIAM VIANA LIMA             |
|WILLIAM FERNANDES DE LIMA      |
|WILLIAM DE SOUSA LIMA          |
+-------------------------------+
only showing top 20 rows



## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html)

**OBS**: O comando LIKE só pode ser utilizado em objetos do tipo columns do pyspark, como uma Series do pandas. Assim, ele deve sempre ser precedido por algo como `filter` ou `where`.

In [104]:
df = spark.createDataFrame([('RESTAURANTE DO RUI',), ('Juca restaurantes ltda',), ('Joca Restaurante',)], ['data'])
df.toPandas()

Unnamed: 0,data
0,RESTAURANTE DO RUI
1,Juca restaurantes ltda
2,Joca Restaurante


Pegando somente os dados que possuem **resturante** no nome, independente de como seja:

In [102]:
df\
  .where(f.upper(df.data).like('%RESTAURANTE%'))\
  .show(5, truncate=False)

+----------------------+
|data                  |
+----------------------+
|RESTAURANTE DO RUI    |
|Juca restaurantes ltda|
|Joca Restaurante      |
+----------------------+



Pegando os dados que possuam **resturante** no final do nome:

In [106]:
df\
  .where(f.upper(df.data).like('%RESTAURANTE'))\
  .show(5, truncate=False)

+----------------+
|data            |
+----------------+
|Joca Restaurante|
+----------------+



Pegando os dados que possuam **resturante** no início do nome:

In [107]:
df\
  .where(f.upper(df.data).like('RESTAURANTE%'))\
  .show(5, truncate=False)

+------------------+
|data              |
+------------------+
|RESTAURANTE DO RUI|
+------------------+



Realizando o mesmo exercício para os dados de empresa:

In [108]:
df_empresas.columns

['cnpj_basico',
 'razao_social_nome_empresarial',
 'natureza_juridica',
 'qualificacao_do_responsavel',
 'capital_social_da_empresa',
 'porte_da_empresa',
 'ente_federativo_responsavel']

In [110]:
df_empresas\
  .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa')\
  .where(f.upper(df_empresas.razao_social_nome_empresarial).like('%RESTAURANTE%'))\
  .show(20, truncate=False)

+-------------------------------------------------------+-----------------+----------------+
|razao_social_nome_empresarial                          |natureza_juridica|porte_da_empresa|
+-------------------------------------------------------+-----------------+----------------+
|RESTAURANTE IMIGRANTE PORTUGUES LTDA.                  |2062             |5               |
|MORAIS & CARVALHO RESTAURANTE E PIZZARIA LTDA          |2062             |1               |
|BAR E RESTAURANTE PAGANOTTO LTDA                       |2062             |5               |
|RODRIGUES & RODRIGUES RESTAURANTE LTDA                 |2062             |5               |
|TEXAS RANCH BAR RESTAURANTE PRODUCOES ARTISTICAS E CULT|2062             |1               |
|V V SANTOS RESTAURANTE BAR E ATIV DESPORTIVAS LTDA     |2062             |1               |
|BAR E RESTAURANTE CASA DA QUINTA LTDA                  |2062             |1               |
|DON MUGO RESTAURANTE LTDA                              |2062         

# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) | 
[avg](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.avg.html) | 
[collect_list](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.collect_list.html) | 
[collect_set](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.collect_set.html) | 
[countDistinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.countDistinct.html) | 
[count](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.count.html) | 
[grouping](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.grouping.html) | 
[first](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.first.html) | 
[last](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.last.html) | 
[kurtosis](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.kurtosis.html) | 
[max](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.max.html) | 
[min](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.min.html) | 
[mean](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.mean.html) | 
[skewness](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.skewness.html) | 
[stddev ou stddev_samp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.stddev.html) | 
[stddev_pop](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) | 
[sum](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.sum.html) | 
[sumDistinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) | 
[variance ou var_samp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.variance.html) | 
[var_pop](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando os dados

## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html)

## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

## Faça como eu fiz

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)