## Spark com Python
### Apresentando a Ferramenta

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# cria variaveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
#instalar as dependências [findspark]
#!pip install -q findspark

## 1. SparkSession

In [4]:
#importa e [findspark]
import findspark
findspark.init()

In [5]:
#O ponto de entrada para programar o Spark com a API Dataset e DataFrame.
#Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet.
#Para criar uma SparkSession, use o seguinte padrão de construtor:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

## ngrok

In [6]:
#Logo depois, fazemos o download e extração dos arquivos do ngrok:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [7]:
!pip install pyngrok
from pyngrok import ngrok

Collecting pyngrok
  Downloading pyngrok-7.1.6-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.1.6


In [8]:
#No próximo conjunto de comandos, devemos configurar o nosso Authtoken obtido no site do ngrok.

#get_ipython().system_raw('./ngrok authtoken 2icn9Kzzz3QPQDhsaeROzH7HdWW_21bpLyk4T9tFrfPVN87w6')
#get_ipython().system_raw('./ngrok http 4050 &')

!ngrok authtoken "2icyQfr7W7Yw3ne0swTY6APrGl0_87BP89iDQ7a58M5wuBQTv"

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [9]:
!ngrok config add-authtoken "2icyQfr7W7Yw3ne0swTY6APrGl0_87BP89iDQ7a58M5wuBQTv"

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [10]:
get_ipython().system_raw('./ngrok http 4050 &')

In [11]:
import time
time.sleep(10)

!curl -s http://localhost:4040/api/tunnels
#spark

## 2. DataFrames com Spark

In [12]:
#criar um DataFrame de maneira simples
rawData = [('Juciandro', '26'), ('Henry', '3')] #lista com tuplas
colName = ['Name', 'Idade']

In [13]:
df = spark.createDataFrame(rawData, colName)
df

DataFrame[Name: string, Idade: string]

In [14]:
df.show()

+---------+-----+
|     Name|Idade|
+---------+-----+
|Juciandro|   26|
|    Henry|    3|
+---------+-----+



In [15]:
df.toPandas()

Unnamed: 0,Name,Idade
0,Juciandro,26
1,Henry,3


In [16]:
#Exemplos de criação de dataFrames simples

# 1.
# data = [('Zeca','35'), ('Eva', '29')]
# colNames = ['Nome', 'Idade']
# df = spark.createDataFrame(data, colNames)
# df.toPandas()
# 2.
# data = [{'Nome': 'Zeca', 'Idade': '35'}, {'Nome': 'Eva', 'Idade': '29'}]
# df = spark.createDataFrame(data)
# df.toPandas()
# 3.
# import pandas as pd
# data = [{'Nome': 'Zeca', 'Idade': '35'}, {'Nome': 'Eva', 'Idade': '29'}]
# pandas_df = pd.DataFrame(data)
# df = spark.createDataFrame(pandas_df)
# df.toPandas()

## Projeto
Nosso projeto consiste em ler, manipular, tratar e salvar um conjunto de dados volumosos utilizando como ferramenta o Spark.

## Carregamento de dados

### Dados Públicos CNPJ
#### Receita Federal

> [Empresas](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/empresas.zip)
>
> [Estabelecimentos](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/estabelecimentos.zip)
>
> [Sócios](https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/socios.zip)

[Fonte original dos dados](https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj)

---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)

### Montando nosso drive

In [18]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Carregando os dados das empresas

In [19]:
#é uma biblioteca padrão do Python, para realizarmos essas extrações.
import zipfile

In [20]:
#Ao extrair um arquivo, precisamos passá-lo para outro lugar, então acrescentamos o método extractall() que deve receber como parâmetro o caminho da pasta "curso_spark".
zipfile.ZipFile('/content/drive/MyDrive/curso_spark/empresas.zip', 'r').extractall('/content/drive/MyDrive/curso_spark')

In [21]:
#Na linha seguinte, chamaremos a função que fará a leitura dos arquivos CSV presentes nesta pasta e os transformará em um dataframe.
#Para isso, criaremos uma variável empresa na qual chamaremos o spark e o método de leitura .read.csv(), que receberá path como parâmetro.
#Passaremos, também, um separador como parâmetro de .read.csv(). Como no site da Receita Federal o separador é ponto e vírgula, aqui também será este, portanto, sep=';'.
#Por último, passamos o parâmetro inferSchema=True a fim de que ele identifique o tipo de dado - string, número, entre outros.
pathEmpresas = '/content/drive/MyDrive/curso_spark/empresas'
empresas = spark.read.csv(pathEmpresas, sep=';', inferSchema=True)

In [22]:
print("Total Empresas: ", empresas.count())

Total Empresas:  4585679


### Carregando os dados dos Socios

In [23]:
zipfile.ZipFile('/content/drive/MyDrive/curso_spark/socios.zip', 'r').extractall('/content/drive/MyDrive/curso_spark')

In [24]:
pathSocios = '/content/drive/MyDrive/curso_spark/socios'
socios = spark.read.csv(pathSocios, sep=';', inferSchema=True)

In [25]:
print("Total Socios: ", socios.count())

Total Socios:  2046430


### Carregando os dados dos Estabelecimentos

In [26]:
zipfile.ZipFile('/content/drive/MyDrive/curso_spark/estabelecimentos.zip', 'r').extractall('/content/drive/MyDrive/curso_spark')

In [27]:
pathEstabelecimentos = '/content/drive/MyDrive/curso_spark/estabelecimentos'
estabelecimentos = spark.read.csv(pathEstabelecimentos, sep=';', inferSchema=True)

In [28]:
print("Total Estabelecimentos: ", estabelecimentos.count())

Total Estabelecimentos:  4836219


## Manipulando os Dados

### Operações Básicas

In [29]:
# visualizar somente os cinco primeiros itens do dataframe. O método .toPandas() que deve nos trazer essa visualização um pouco mais semelhante à visualização que o Pandas nos traz.
empresas.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [30]:
#Criaremos uma variável empresasColNames, que diz respeito aos nomes das colunas de empresas. Nesta variável, passaremos os novos nomes das colunas em um array.
empresasColNames = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']

In [31]:
#loop for que deve percorrer o array e, para cada item, chamará a função enumerate(), que receberá a variável empresasColNames.

#for item in enumerate(empresasColNames):
#  print(item)

#index, colName, o que fará com que os índices da tupla sejam atribuídos à variável colName, ou seja, ao nome da coluna.

for index, colName in enumerate(empresasColNames):
  empresas = empresas.withColumnRenamed(f"_c{index}", colName)

#empresas.columns
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [32]:
sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']

In [33]:
for index, colName in enumerate(sociosColNames):
  socios = socios.withColumnRenamed(f"_c{index}", colName)

#socios.columns()
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [34]:
estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']

In [35]:
for index, colName in enumerate(estabsColNames):
  estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName)

#estabelecimentos.columns()
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


## Analisando os dados

[Data Types](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#data-types)

In [36]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,400000,1,


In [37]:
#imprime o esquema do dataframe no formato da árvore, juntamente com o nome da coluna e o tipo de dado
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: string (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [38]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,19940725,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,19940725,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,19940516,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,19940516,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,19940609,,***000000**,,0,8


In [39]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [40]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [41]:
estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: integer (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: integer (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: str

## Modificando os tipos de dados

[Functions](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions)

[withColumn](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html)

### Convertendo String ➔ Double

#### `StringType ➔ DoubleType`

In [42]:
#importação das classes DoubleType e StringType, que fazem parte do pacote SQL do PySpark.
#Pacote de funções que chamaremos de f

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

In [43]:
#Este método, por sua vez, recebe a variável onde será a feita a substituição - que é a mesma que a coluna - 'capital_social_da_empresa', o item que deve ser substituído ',' e o item pelo qual deve ser feita a substituição '.'.
#Ou seja, toda vez que a vírgula for encontrada em 'capital_social_da_empresa', será substituída pelo ponto.

empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.'))

In [44]:
empresas.limit(5).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,1,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,5,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,5,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,5,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,1,


In [45]:
#o método .cast(), que transforma string em double, e a classe DoubleType() como parâmetro.

empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType()))

In [46]:
empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [47]:
#Assinale a opção que indica os passos corretos para realizar essa transformação.
#Obs.: Considere df como uma DataFrame genérico que possui a coluna valor com a informação apresentada acima (“1.000.000,00”).

#df.withColumn(“valor”, f.regexp_replace(“valor”, “\.”, “”))
#df = df.withColumn(“valor”, f.regexp_replace(“valor”, “,”, “.”))
#df = df.withColumn(“valor”, df["valor"].cast(DoubleType()))

#Alternativa correta! Primeiro retiramos os separadores de milhar que são desnecessários, depois modificamos o separador decimal de “,” para “.” e por fim convertemos a string para double.

### Convertendo String ➔ Date

#### `StringType ➔ DateType`

[Datetime Patterns](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html)

In [48]:
df = spark.createDataFrame([(20240701,), (20240702,), (20240703,)], ['datas'])
df.printSchema()

root
 |-- datas: long (nullable = true)



In [49]:
df = df.withColumn("datas", f.to_date(df.datas.cast(StringType()), 'yyyyMMdd'))
df.printSchema()

root
 |-- datas: date (nullable = true)



In [50]:
df.toPandas()

Unnamed: 0,datas
0,2024-07-01
1,2024-07-02
2,2024-07-03


## Estabelecimentos

In [51]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,20011029,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,20081231,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,19971231,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,20081231,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,19980429,1,,,...,7075,,,,,,,,,


In [52]:
estabelecimentos = estabelecimentos\
  .withColumn(
      "data_situacao_cadastral",
      f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd')
  )\
  .withColumn(
      "data_de_inicio_atividade",
      f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd')
  )\
  .withColumn(
      "data_da_situacao_especial",
      f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd')
  )

estabelecimentos.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [53]:
estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [54]:
estabelecimentos\
        .select('data_situacao_cadastral', 'data_de_inicio_atividade', 'data_da_situacao_especial')\
        .show(5)

+-----------------------+------------------------+-------------------------+
|data_situacao_cadastral|data_de_inicio_atividade|data_da_situacao_especial|
+-----------------------+------------------------+-------------------------+
|             2001-10-29|              1994-05-09|                     null|
|             2008-12-31|              1994-05-12|                     null|
|             1997-12-31|              1994-05-12|                     null|
|             2008-12-31|              1994-05-13|                     null|
|             1998-04-29|              1995-05-09|                     null|
+-----------------------+------------------------+-------------------------+
only showing top 5 rows



In [55]:
socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: integer (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [56]:
socios = socios\
  .withColumn(
      "data_de_entrada_sociedade",
      f.to_date(socios.data_de_entrada_sociedade.cast(StringType()), 'yyyyMMdd')
  )

socios.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- identificador_de_socio: integer (nullable = true)
 |-- nome_do_socio_ou_razao_social: string (nullable = true)
 |-- cnpj_ou_cpf_do_socio: string (nullable = true)
 |-- qualificacao_do_socio: integer (nullable = true)
 |-- data_de_entrada_sociedade: date (nullable = true)
 |-- pais: integer (nullable = true)
 |-- representante_legal: string (nullable = true)
 |-- nome_do_representante: string (nullable = true)
 |-- qualificacao_do_representante_legal: integer (nullable = true)
 |-- faixa_etaria: integer (nullable = true)



In [57]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


# Seleções e consultas
---

## Selecionando informações

[DataFrame.select(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.select.html)

In [58]:
empresas\
  .select('*')\
  .show(5)

+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial|natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|        306|         FRANCAMAR REFRIGE...|             2240|                         49|                      0.0|               1|                       null|
|       1355|         BRASILEIRO & OLIV...|             2062|                         49|                      0.0|               5|                       null|
|       4820|         REGISTRO DE IMOVE...|             3034|                         32|                      0.0|               5|                       null|
|       5347|         ROSELY APARE

In [59]:
#O parametro "False" é para trazer os dados completos
empresas\
  .select('cnpj_basico', 'razao_social_nome_empresarial', 'capital_social_da_empresa')\
  .show(5, False)

+-----------+--------------------------------------------------------------------------------------------+-------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |capital_social_da_empresa|
+-----------+--------------------------------------------------------------------------------------------+-------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |0.0                      |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                  |0.0                      |
|4820       |REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E TABELIONATO E REGISTRO DE CONSTRATOS MARITIMOS|0.0                      |
|5347       |ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS                                                |0.0                      |
|6846       |BADU E FILHOS TECIDOS LTDA                              

In [60]:
#o ano em que o sócio entrou na sociedade
socios\
  .select('cnpj_basico', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .show(5, False)

+-----------+-------------------------------+--------------------+--------------+
|cnpj_basico|nome_do_socio_ou_razao_social  |cnpj_ou_cpf_do_socio|ano_de_entrada|
+-----------+-------------------------------+--------------------+--------------+
|411        |LILIANA PATRICIA GUASTAVINO    |***678188**         |1994          |
|411        |CRISTINA HUNDERTMARK           |***637848**         |1994          |
|5813       |CELSO EDUARDO DE CASTRO STEPHAN|***786068**         |1994          |
|5813       |EDUARDO BERRINGER STEPHAN      |***442348**         |1994          |
|14798      |HANNE MAHFOUD FADEL            |***760388**         |1994          |
+-----------+-------------------------------+--------------------+--------------+
only showing top 5 rows



In [61]:
estabelecimentos\
  .select('nome_fantasia', 'municipio', f.year('data_de_inicio_atividade').alias('ano_de_inicio_atividade'), f.month('data_de_inicio_atividade').alias('mes_de_inicio_atividade'))\
  .show(5, False)

+-----------------+---------+-----------------------+-----------------------+
|nome_fantasia    |municipio|ano_de_inicio_atividade|mes_de_inicio_atividade|
+-----------------+---------+-----------------------+-----------------------+
|PIRAMIDE M. C.   |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|null             |7107     |1994                   |5                      |
|EMBROIDERY & GIFT|7075     |1995                   |5                      |
+-----------------+---------+-----------------------+-----------------------+
only showing top 5 rows



In [62]:
#Exercicio

data = [
    ('GISELLE PAULA GUIMARAES CASTRO', 15),
    ('ELAINE GARCIA DE OLIVEIRA', 22),
    ('JOAO CARLOS ABNER DE LOURDES', 43),
    ('MARTA ZELI FERREIRA', 24),
    ('LAUDENETE WIGGERS ROEDER', 51)
]
colNames = ['nome', 'idade']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+------------------------------+-----+
|nome                          |idade|
+------------------------------+-----+
|GISELLE PAULA GUIMARAES CASTRO|15   |
|ELAINE GARCIA DE OLIVEIRA     |22   |
|JOAO CARLOS ABNER DE LOURDES  |43   |
|MARTA ZELI FERREIRA           |24   |
|LAUDENETE WIGGERS ROEDER      |51   |
+------------------------------+-----+



In [63]:
#A função concat_ws concatena (junta) várias colunas de strings em uma única coluna utilizando como separador a informação fornecida no primeiro argumento.
#Outra função que utilizamos foi a substring_index que divide uma string de acordo com um delimitador específico e retorna a substring de acordo com sua posição.
#Observe que quando queremos pegar a última ocorrência podemos utilizar o índice -1.

df \
    .select(
        f.concat_ws(
            ', ',
            f.substring_index('nome', ' ', -1),
            f.substring_index('nome', ' ', 1)
        ).alias('ident'),
        'idade') \
    .show(truncate=False)

+-----------------+-----+
|ident            |idade|
+-----------------+-----+
|CASTRO, GISELLE  |15   |
|OLIVEIRA, ELAINE |22   |
|LOURDES, JOAO    |43   |
|FERREIRA, MARTA  |24   |
|ROEDER, LAUDENETE|51   |
+-----------------+-----+



## Identificando valores nulos

In [64]:
df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['dados'])

df.toPandas()

Unnamed: 0,dados
0,1.0
1,2.0
2,3.0
3,


In [65]:
df.show()

+-----+
|dados|
+-----+
|    1|
|    2|
|    3|
| null|
+-----+



In [66]:
df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['dados'])

df.toPandas()

Unnamed: 0,dados
0,1.0
1,2.0
2,3.0
3,


In [67]:
df.show()

+-----+
|dados|
+-----+
|  1.0|
|  2.0|
|  3.0|
|  NaN|
+-----+



In [68]:
df = spark.createDataFrame([('1',), ('2',), ('3',), (None,)], ['dados'])

df.toPandas()

Unnamed: 0,dados
0,1.0
1,2.0
2,3.0
3,


In [69]:
df.show()

+-----+
|dados|
+-----+
|    1|
|    2|
|    3|
| null|
+-----+



In [70]:
socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


In [71]:
#Retorna um dataframe indicando a quantidade de vezes que ocorre valor nulo em cada coluna.
socios\
  .select([f.count(f.when(f.col(c).isNull(), 1)).alias(c) for c in socios.columns]).show()

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|   pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+-------+-------------------+---------------------+-----------------------------------+------------+
|          0|                     0|                          208|                1234|                    0|                        1|2038255|                  0|              1995432|                                  0|           0|
+-----------+----------------------+------------------------

In [72]:
# Este comando ordena que os valores nulos, ou seja na, sejam substituídos por 0

socios.na.fill(0).limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,0,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,0,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,0,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,0,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,0,***000000**,,0,8


In [73]:
socios.na.fill('-').limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,-,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,-,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,-,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,-,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,-,0,8


## Ordenando os dados

[DataFrame.orderBy(*cols, **kwargs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.orderBy.html)

In [74]:
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|LILIANA PATRICIA GUASTAVINO    |7           |1994          |
|CRISTINA HUNDERTMARK           |7           |1994          |
|CELSO EDUARDO DE CASTRO STEPHAN|8           |1994          |
|EDUARDO BERRINGER STEPHAN      |5           |1994          |
|HANNE MAHFOUD FADEL            |8           |1994          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [75]:
#retorna um dataframe em que a faixa etária e o ano estão ordenados de maneira decrescente.
socios\
  .select('nome_do_socio_ou_razao_social', 'faixa_etaria', f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
  .orderBy(['ano_de_entrada', 'faixa_etaria'], ascending = [False, False])\
  .show(5, False)

+-------------------------------+------------+--------------+
|nome_do_socio_ou_razao_social  |faixa_etaria|ano_de_entrada|
+-------------------------------+------------+--------------+
|MARIA RAIMUNDA DOS SANTOS LANZA|9           |2021          |
|MARIA JOSE DOMINGUES BONATO    |9           |2021          |
|DORIS PEREIRA GOMES JAZRA      |9           |2021          |
|NADIR BICHARA CHUAHY           |9           |2021          |
|RAIMUNDA TORRES MARTINS        |9           |2021          |
+-------------------------------+------------+--------------+
only showing top 5 rows



In [76]:
data = [
    ('CARMINA RABELO', 4, 2010),
    ('HERONDINA PEREIRA', 6, 2009),
    ('IRANI DOS SANTOS', 12, 2010),
    ('JOAO BOSCO DA FONSECA', 3, 2009),
    ('CARLITO SOUZA', 1, 2010),
    ('WALTER DIAS', 9, 2009),
    ('BRENO VENTUROSO', 1, 2009),
    ('ADELINA TEIXEIRA', 5, 2009),
    ('ELIO SILVA', 7, 2010),
    ('DENIS FONSECA', 6, 2010)
]
colNames = ['nome', 'mes', 'ano']
df = spark.createDataFrame(data, colNames)
df.show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|CARMINA RABELO       |4  |2010|
|HERONDINA PEREIRA    |6  |2009|
|IRANI DOS SANTOS     |12 |2010|
|JOAO BOSCO DA FONSECA|3  |2009|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
+---------------------+---+----+



In [77]:
#ssinale a alternativa que apresenta o código correto para ordenar este DataFrame dos alunos mais novos para os mais velhos.

df\
  .select('*')\
  .orderBy(['ano', 'mes'], ascending = [False, False])\
  .show(truncate = False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|IRANI DOS SANTOS     |12 |2010|
|ELIO SILVA           |7  |2010|
|DENIS FONSECA        |6  |2010|
|CARMINA RABELO       |4  |2010|
|CARLITO SOUZA        |1  |2010|
|WALTER DIAS          |9  |2009|
|HERONDINA PEREIRA    |6  |2009|
|ADELINA TEIXEIRA     |5  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
+---------------------+---+----+



## Filtrando os dados

[DataFrame.where(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.where.html) ou [DataFrame.filter(condition)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.filter.html)

In [78]:
#O método filter() nos permite filtrar linhas usando condicionais, enquanto where() funciona como um alias para o filter, ou seja, ao chamarmos um, também estaremos chamando o outro.
empresas\
  .where("capital_social_da_empresa == 100")\
  .show(5, False)

+-----------+-----------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial      |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-----------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|10938778   |REGINA CELIA VARJAO CECE           |2135             |50                         |100.0                    |1               |null                       |
|11037883   |ISAMARA CUSTODIO                   |2135             |50                         |100.0                    |1               |null                       |
|11276983   |LUIZ CARLOS CORREA DE OLIVEIRA     |2135             |50                         |100.0                    |1               |null                       

In [79]:
#o método .startswith() ("começa com", em português)
#o método endswith() ("termina com", em português)
socios\
  .select('nome_do_socio_ou_razao_social')\
  .filter(socios.nome_do_socio_ou_razao_social.startswith('RODRIGO'))\
  .filter(socios.nome_do_socio_ou_razao_social.endswith('FERNANDES'))\
  .limit(5).toPandas()

Unnamed: 0,nome_do_socio_ou_razao_social
0,RODRIGO CARDOSO FERNANDES
1,RODRIGO CAMARGO FERNANDES
2,RODRIGO SILVA FERNANDES
3,RODRIGO STRAFIT FERNANDES
4,RODRIGO LEAO FERNANDES


In [80]:
#Assinale as três alternativas com a utilização correta do método filter para realizar a seleção de apenas alunos nascidos no primeiro semestre de 2009.

#Note que podemos, neste caso, utilizar o encadeamento de métodos filter. Isto é equivalente a utilizar o operador lógico AND.
#Outro ponto interessante de ser destacado é que em uma query no formato string não existe diferença entre os operadores = e ==. Ambos podem ser utilizados para comparações neste tipo de query.

df\
    .filter("mes<=6")\
    .filter("ano=2009")\
    .show(truncate=False)

df\
    .filter("mes<=6 and ano==2009")\
    .show(truncate=False)

df\
    .filter((df.mes <= 6) & (df.ano == 2009))\
    .show(truncate=False)

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+

+---------------------+---+----+
|nome                 |mes|ano |
+---------------------+---+----+
|HERONDINA PEREIRA    |6  |2009|
|JOAO BOSCO DA FONSECA|3  |2009|
|BRENO VENTUROSO      |1  |2009|
|ADELINA TEIXEIRA     |5  |2009|
+---------------------+---+----+



## O comando LIKE

[Column.like(other)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.Column.like.html)

In [81]:
#tudo que contenha
df\
  .select(df.nome)\
  .where(f.upper(df.nome).like('%FONSECA%'))\
  .show(truncate=False)

#tudo que começa com
df\
  .select(df.nome)\
  .where(df.nome.like('JOAO%'))\
  .show(truncate=False)

#tudo que termina com
df\
  .select(df.nome)\
  .where(df.nome.like('%FONSECA'))\
  .show(truncate=False)

+---------------------+
|nome                 |
+---------------------+
|JOAO BOSCO DA FONSECA|
|DENIS FONSECA        |
+---------------------+

+---------------------+
|nome                 |
+---------------------+
|JOAO BOSCO DA FONSECA|
+---------------------+

+---------------------+
|nome                 |
+---------------------+
|JOAO BOSCO DA FONSECA|
|DENIS FONSECA        |
+---------------------+



In [82]:
#ambos fazem a mesma coisa
empresas\
  .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
  .filter(f.upper(empresas.razao_social_nome_empresarial).like('%RESTAURANTE%'))\
  .limit(5).toPandas()

empresas\
  .select('razao_social_nome_empresarial', 'natureza_juridica', 'porte_da_empresa', 'capital_social_da_empresa')\
  .filter(f.upper(empresas['razao_social_nome_empresarial']).like('%RESTAURANTE%'))\
  .show(5, False)

+-------------------------------------------------------+-----------------+----------------+-------------------------+
|razao_social_nome_empresarial                          |natureza_juridica|porte_da_empresa|capital_social_da_empresa|
+-------------------------------------------------------+-----------------+----------------+-------------------------+
|RESTAURANTE IMIGRANTE PORTUGUES LTDA.                  |2062             |5               |0.0                      |
|MORAIS & CARVALHO RESTAURANTE E PIZZARIA LTDA          |2062             |1               |0.0                      |
|BAR E RESTAURANTE PAGANOTTO LTDA                       |2062             |5               |0.0                      |
|RODRIGUES & RODRIGUES RESTAURANTE LTDA                 |2062             |5               |0.0                      |
|TEXAS RANCH BAR RESTAURANTE PRODUCOES ARTISTICAS E CULT|2062             |1               |0.0                      |
+-----------------------------------------------

In [83]:
#Assinale a alternativa que tem o código correto para selecionarmos apenas os alunos que têm seus nomes iniciados com a letra “C”.
df\
  .where(df.nome.like('C%'))\
  .show(truncate=False)

+--------------+---+----+
|nome          |mes|ano |
+--------------+---+----+
|CARMINA RABELO|4  |2010|
|CARLITO SOUZA |1  |2010|
+--------------+---+----+



# Agregações e Junções
---

[DataFrame.groupBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html)

[DataFrame.agg(*exprs)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.agg.html)

[DataFrame.summary(*statistics)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.summary.html)

> Funções:
[approx_count_distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) |
[avg](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.avg.html) |
[collect_list](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_list.html) |
[collect_set](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.collect_set.html) |
[countDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.countDistinct.html) |
[count](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.count.html) |
[grouping](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.grouping.html) |
[first](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.first.html) |
[last](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.last.html) |
[kurtosis](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.kurtosis.html) |
[max](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.max.html) |
[min](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.min.html) |
[mean](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.mean.html) |
[skewness](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.skewness.html) |
[stddev ou stddev_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev.html) |
[stddev_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.stddev_pop.html) |
[sum](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sum.html) |
[sumDistinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.sumDistinct.html) |
[variance ou var_samp](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.variance.html) |
[var_pop](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.var_pop.html)

## Sumarizando os dados

In [84]:
socios\
    .select(f.year('data_de_entrada_sociedade').alias('ano_de_entrada'))\
    .where('ano_de_entrada >= 2010')\
    .groupBy('ano_de_entrada')\
    .count()\
    .orderBy('ano_de_entrada', ascending=True)\
    .show()

+--------------+------+
|ano_de_entrada| count|
+--------------+------+
|          2010| 79337|
|          2011| 83906|
|          2012| 80101|
|          2013| 83919|
|          2014| 80590|
|          2015| 80906|
|          2016| 81587|
|          2017| 90221|
|          2018| 99935|
|          2019|118248|
|          2020|125927|
|          2021| 56316|
+--------------+------+



In [85]:
#a função agg(), de aggregate. Ou seja, faremos uma agregação, criando mais de um conjunto de estatísticas dentro do groupBy()
empresas\
    .select('cnpj_basico', 'porte_da_empresa', 'capital_social_da_empresa')\
    .groupBy('porte_da_empresa')\
    .agg(
        f.avg("capital_social_da_empresa").alias("capital_social_medio"),# ou mean(), é a mesma coisa
        f.count("cnpj_basico").alias("frequencia")
    )\
    .orderBy('porte_da_empresa', ascending=True)\
    .show()

+----------------+--------------------+----------+
|porte_da_empresa|capital_social_medio|frequencia|
+----------------+--------------------+----------+
|            null|    8.35421888053467|      5985|
|               1|  339994.53313506936|   3129043|
|               3|  2601001.7677092673|    115151|
|               5|   708660.4208249798|   1335500|
+----------------+--------------------+----------+



In [86]:
#outra forma de fazer sumarizações, que é o summary(), uma alternative ao describe().
#A partir do dataframe empresas, faremos uma selação do capital_social_da_empresa e chamaremos o método summary(), exibindo os resultado em seguida com o show().
empresas\
    .select("capital_social_da_empresa")\
    .summary()\
    .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|   mean|        503694.5478542675|
| stddev|     2.1118691490537405E8|
|    min|                      0.0|
|    25%|                      0.0|
|    50%|                   1000.0|
|    75%|                   7000.0|
|    max|         3.22014670262E11|
+-------+-------------------------+



In [87]:
empresas\
    .select("capital_social_da_empresa")\
    .summary('count', 'min', 'max', 'mean')\
    .show()

+-------+-------------------------+
|summary|capital_social_da_empresa|
+-------+-------------------------+
|  count|                  4585679|
|    min|                      0.0|
|    max|         3.22014670262E11|
|   mean|        503694.5478542675|
+-------+-------------------------+



##  A função `when`

In [88]:
data = [
    ('CARLOS', 'MATEMÁTICA', 7),
    ('IVO', 'MATEMÁTICA', 9),
    ('MÁRCIA', 'MATEMÁTICA', 8),
    ('LEILA', 'MATEMÁTICA', 9),
    ('BRENO', 'MATEMÁTICA', 7),
    ('LETÍCIA', 'MATEMÁTICA', 8),
    ('CARLOS', 'FÍSICA', 2),
    ('IVO', 'FÍSICA', 8),
    ('MÁRCIA', 'FÍSICA', 10),
    ('LEILA', 'FÍSICA', 9),
    ('BRENO', 'FÍSICA', 1),
    ('LETÍCIA', 'FÍSICA', 6),
    ('CARLOS', 'QUÍMICA', 10),
    ('IVO', 'QUÍMICA', 8),
    ('MÁRCIA', 'QUÍMICA', 1),
    ('LEILA', 'QUÍMICA', 10),
    ('BRENO', 'QUÍMICA', 7),
    ('LETÍCIA', 'QUÍMICA', 9)
]
colNames = ['nome', 'materia', 'nota']
df = spark.createDataFrame(data, colNames)
df.show()

+-------+----------+----+
|   nome|   materia|nota|
+-------+----------+----+
| CARLOS|MATEMÁTICA|   7|
|    IVO|MATEMÁTICA|   9|
| MÁRCIA|MATEMÁTICA|   8|
|  LEILA|MATEMÁTICA|   9|
|  BRENO|MATEMÁTICA|   7|
|LETÍCIA|MATEMÁTICA|   8|
| CARLOS|    FÍSICA|   2|
|    IVO|    FÍSICA|   8|
| MÁRCIA|    FÍSICA|  10|
|  LEILA|    FÍSICA|   9|
|  BRENO|    FÍSICA|   1|
|LETÍCIA|    FÍSICA|   6|
| CARLOS|   QUÍMICA|  10|
|    IVO|   QUÍMICA|   8|
| MÁRCIA|   QUÍMICA|   1|
|  LEILA|   QUÍMICA|  10|
|  BRENO|   QUÍMICA|   7|
|LETÍCIA|   QUÍMICA|   9|
+-------+----------+----+



In [89]:
df = df.withColumn('status', f.when(df.nota >= 7, "APROVADO").otherwise("REPROVADO"))
df.show()

+-------+----------+----+---------+
|   nome|   materia|nota|   status|
+-------+----------+----+---------+
| CARLOS|MATEMÁTICA|   7| APROVADO|
|    IVO|MATEMÁTICA|   9| APROVADO|
| MÁRCIA|MATEMÁTICA|   8| APROVADO|
|  LEILA|MATEMÁTICA|   9| APROVADO|
|  BRENO|MATEMÁTICA|   7| APROVADO|
|LETÍCIA|MATEMÁTICA|   8| APROVADO|
| CARLOS|    FÍSICA|   2|REPROVADO|
|    IVO|    FÍSICA|   8| APROVADO|
| MÁRCIA|    FÍSICA|  10| APROVADO|
|  LEILA|    FÍSICA|   9| APROVADO|
|  BRENO|    FÍSICA|   1|REPROVADO|
|LETÍCIA|    FÍSICA|   6|REPROVADO|
| CARLOS|   QUÍMICA|  10| APROVADO|
|    IVO|   QUÍMICA|   8| APROVADO|
| MÁRCIA|   QUÍMICA|   1|REPROVADO|
|  LEILA|   QUÍMICA|  10| APROVADO|
|  BRENO|   QUÍMICA|   7| APROVADO|
|LETÍCIA|   QUÍMICA|   9| APROVADO|
+-------+----------+----+---------+



In [90]:
df\
  .select('nota')\
  .summary('min', '25%', '50%', '75%', 'max')\
  .show()

+-------+----+
|summary|nota|
+-------+----+
|    min|   1|
|    25%|   7|
|    50%|   8|
|    75%|   9|
|    max|  10|
+-------+----+



In [91]:
df\
  .select('status')\
  .groupBy('status')\
  .count()\
  .orderBy('status', ascending=True)\
  .show(truncate = False)

+---------+-----+
|status   |count|
+---------+-----+
|APROVADO |14   |
|REPROVADO|4    |
+---------+-----+



## Juntando DataFrames - Joins

[DataFrame.join(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [92]:
produtos = spark.createDataFrame(
    [
        ('1', 'Bebidas', 'Água mineral'),
        ('2', 'Limpeza', 'Sabão em pó'),
        ('3', 'Frios', 'Queijo'),
        ('4', 'Bebidas', 'Refrigerante'),
        ('5', 'Pet', 'Ração para cães')
    ],
    ['id', 'cat', 'prod']
)

print('Produtos: ')

produtos.show()

impostos = spark.createDataFrame(
    [
        ('Bebidas', 0.15),
        ('Limpeza', 0.05),
        ('Frios', 0.065),
        ('Carnes', 0.08)
    ],
    ['cat', 'tax']
)

print('Impostos: ')

impostos.show()

Produtos: 
+---+-------+---------------+
| id|    cat|           prod|
+---+-------+---------------+
|  1|Bebidas|   Água mineral|
|  2|Limpeza|    Sabão em pó|
|  3|  Frios|         Queijo|
|  4|Bebidas|   Refrigerante|
|  5|    Pet|Ração para cães|
+---+-------+---------------+

Impostos: 
+-------+-----+
|    cat|  tax|
+-------+-----+
|Bebidas| 0.15|
|Limpeza| 0.05|
|  Frios|0.065|
| Carnes| 0.08|
+-------+-----+



In [93]:
#O argumento how está relacionado a como faremos essa junção, e existem quatro possibilidades principais: inner, left, right e outer.
#O argumento inner fará a interseção entre os dois conjuntos, e somente nos retornará os valores de cat que existem em ambos os dataframes.
#O sort(id), uma função parecida com o orderBy(), mas um pouco mais performática mas que não retorna classificações perfeitas, pois atua dentro de particições específicas dos dataframes.
produtos.join(impostos, 'cat', how='inner')\
  .sort('id')\
  .show()

#retorna o mesmo resultado
#produtos.join(impostos, 'cat', 'inner').sort('id').show()

+-------+---+------------+-----+
|    cat| id|        prod|  tax|
+-------+---+------------+-----+
|Bebidas|  1|Água mineral| 0.15|
|Limpeza|  2| Sabão em pó| 0.05|
|  Frios|  3|      Queijo|0.065|
|Bebidas|  4|Refrigerante| 0.15|
+-------+---+------------+-----+



In [94]:
#Retorna todas as categorias encontradas no dataframe da esquerda, mantendo aquelas que têm uma inserceção no da direita.
#Como não temos uma referência para a categoria Pet no dataframe impostos, o valor retornado na coluna tax será null.
#Além disso, a categoria Carnes, que não existe no dataframe produtos, é omitida.
produtos.join(impostos, 'cat', how='left')\
  .sort('id')\
  .show()

+-------+---+---------------+-----+
|    cat| id|           prod|  tax|
+-------+---+---------------+-----+
|Bebidas|  1|   Água mineral| 0.15|
|Limpeza|  2|    Sabão em pó| 0.05|
|  Frios|  3|         Queijo|0.065|
|Bebidas|  4|   Refrigerante| 0.15|
|    Pet|  5|Ração para cães| null|
+-------+---+---------------+-----+



In [95]:
produtos.join(impostos, 'cat', how='right')\
  .sort('id')\
  .show()

+-------+----+------------+-----+
|    cat|  id|        prod|  tax|
+-------+----+------------+-----+
| Carnes|null|        null| 0.08|
|Bebidas|   1|Água mineral| 0.15|
|Limpeza|   2| Sabão em pó| 0.05|
|  Frios|   3|      Queijo|0.065|
|Bebidas|   4|Refrigerante| 0.15|
+-------+----+------------+-----+



In [96]:
#Por fim, temos o argumento outer, que tratá todas as categorias em ambos os conjuntos, retornando null nas colunas sem correspondência.

produtos.join(impostos, 'cat', how='outer')\
  .sort('id')\
  .show()

+-------+----+---------------+-----+
|    cat|  id|           prod|  tax|
+-------+----+---------------+-----+
| Carnes|null|           null| 0.08|
|Bebidas|   1|   Água mineral| 0.15|
|Limpeza|   2|    Sabão em pó| 0.05|
|  Frios|   3|         Queijo|0.065|
|Bebidas|   4|   Refrigerante| 0.15|
|    Pet|   5|Ração para cães| null|
+-------+----+---------------+-----+



In [97]:
empresas_join = estabelecimentos.join(empresas, 'cnpj_basico', how='inner')

empresas_join.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- cnpj_ordem: integer (nullable = true)
 |-- cnpj_dv: integer (nullable = true)
 |-- identificador_matriz_filial: integer (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: integer (nullable = true)
 |-- data_situacao_cadastral: date (nullable = true)
 |-- motivo_situacao_cadastral: integer (nullable = true)
 |-- nome_da_cidade_no_exterior: string (nullable = true)
 |-- pais: integer (nullable = true)
 |-- data_de_inicio_atividade: date (nullable = true)
 |-- cnae_fiscal_principal: integer (nullable = true)
 |-- cnae_fiscal_secundaria: string (nullable = true)
 |-- tipo_de_logradouro: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- ddd_1: string (n

In [98]:
freq = empresas_join\
    .select(
        'cnpj_basico',
        f.year('data_de_inicio_atividade').alias('data_de_inicio')
    )\
    .where('data_de_inicio >= 2010')\
    .groupBy('data_de_inicio')\
    .agg(f.count("cnpj_basico").alias("frequencia"))\
    .orderBy('data_de_inicio', ascending=True)

In [99]:
freq.toPandas()

Unnamed: 0,data_de_inicio,frequencia
0,2010,154159
1,2011,172677
2,2012,232480
3,2013,198424
4,2014,202276
5,2015,212523
6,2016,265417
7,2017,237292
8,2018,275435
9,2019,325922


In [100]:
freq.union(
    freq.select(
        f.lit('Total').alias('data_de_inicio'),
        f.sum(freq.frequencia).alias('frequencia')
    )
).show()

+--------------+----------+
|data_de_inicio|frequencia|
+--------------+----------+
|          2010|    154159|
|          2011|    172677|
|          2012|    232480|
|          2013|    198424|
|          2014|    202276|
|          2015|    212523|
|          2016|    265417|
|          2017|    237292|
|          2018|    275435|
|          2019|    325922|
|          2020|    400654|
|          2021|    153275|
|         Total|   2830534|
+--------------+----------+



## SparkSQL

[SparkSession.sql(sqlQuery)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.sql.html)

Para saber mais sobre performance: [Artigo - Spark RDDs vs DataFrames vs SparkSQL](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547)

In [101]:
#Cria uma view temporária do dataframe, que será usada dentro das instruções SQL em nosso método.
empresas.createOrReplaceTempView('empresasView')

In [102]:
spark\
  .sql('SELECT *FROM empresasView')\
  .show(5, False)

+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial                                                               |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+--------------------------------------------------------------------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|306        |FRANCAMAR REFRIGERACAO TECNICA S/C LTDA                                                     |2240             |49                         |0.0                      |1               |null                       |
|1355       |BRASILEIRO & OLIVEIRA LTDA                                                                 

In [103]:
spark\
  .sql(
      """
      SELECT
        *
      FROM empresasView
      WHERE capital_social_da_empresa >= 100
      """
  )\
  .show(5, False)

+-----------+-------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|cnpj_basico|razao_social_nome_empresarial  |natureza_juridica|qualificacao_do_responsavel|capital_social_da_empresa|porte_da_empresa|ente_federativo_responsavel|
+-----------+-------------------------------+-----------------+---------------------------+-------------------------+----------------+---------------------------+
|6846       |BADU E FILHOS TECIDOS LTDA     |2062             |49                         |4000.0                   |1               |null                       |
|24205      |SUELY LEME MARI ANI 25086572800|2135             |50                         |1000.0                   |1               |null                       |
|58970      |TOTAL CAR VEICULOS LTDA        |2062             |49                         |20000.0                  |1               |null                       |
|70273      |VANDA DA 

In [104]:
spark\
    .sql("""
        SELECT
          porte_da_empresa,
            MEAN(capital_social_da_empresa) AS Media
          FROM empresasView
          GROUP BY porte_da_empresa
    """)\
    .show(5)

+----------------+------------------+
|porte_da_empresa|             Media|
+----------------+------------------+
|            null|  8.35421888053467|
|               1|339994.53313506936|
|               3|2601001.7677092673|
|               5| 708660.4208249798|
+----------------+------------------+



In [105]:
empresas_join.createOrReplaceTempView('empresasJoinView')

In [106]:
freq = spark\
    .sql("""
        SELECT
          YEAR(data_de_inicio_atividade) AS data_de_inicio,
          COUNT(cnpj_basico) AS count
        FROM empresasJoinView
        WHERE YEAR(data_de_inicio_atividade) >= 2010
        GROUP BY data_de_inicio
        ORDER BY data_de_inicio
    """)

freq\
    .show()

+--------------+------+
|data_de_inicio| count|
+--------------+------+
|          2010|154159|
|          2011|172677|
|          2012|232480|
|          2013|198424|
|          2014|202276|
|          2015|212523|
|          2016|265417|
|          2017|237292|
|          2018|275435|
|          2019|325922|
|          2020|400654|
|          2021|153275|
+--------------+------+



In [107]:
freq.createOrReplaceTempView("freqView")

In [108]:
spark\
    .sql("""
        SELECT
          *
        FROM freqView
        UNION ALL
        SELECT
          'Total' AS data_de_inicio,
          SUM(count) AS count
        FROM freqView
    """)\
    .show()

+--------------+-------+
|data_de_inicio|  count|
+--------------+-------+
|          2010| 154159|
|          2011| 172677|
|          2012| 232480|
|          2013| 198424|
|          2014| 202276|
|          2015| 212523|
|          2016| 265417|
|          2017| 237292|
|          2018| 275435|
|          2019| 325922|
|          2020| 400654|
|          2021| 153275|
|         Total|2830534|
+--------------+-------+



# Formas de Armazenamento
---

## Arquivos CSV

[property DataFrame.write](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.write.html)

[DataFrameWriter.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html)

In [110]:
empresas.write.csv(
    path = '/content/drive/MyDrive/curso_spark/empresas/csv', #onde vai salvar
    mode = 'overwrite', #se já existe, sobscreva
    sep = ';', #separador
    header = True, #salva com o cabeçalho
)

In [111]:
empresas2 = spark.read.csv(
    path = '/content/drive/MyDrive/curso_spark/empresas/csv',
    sep = ';',
    inferSchema = True,
    header = True
)

In [112]:
empresas2.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [113]:
estabelecimentos.write.csv(
    path = '/content/drive/MyDrive/curso_spark/estabelecimentos/csv',
    mode = 'overwrite',
    sep = ';',
    header = True
)

In [114]:
socios.write.csv(
    path = '/content/drive/MyDrive/curso_spark/socios/csv',
    mode = 'overwrite',
    sep = ';',
    header = True
)

## Arquivos PARQUET

[Apache Parquet](https://parquet.apache.org/)

[DataFrameWriter.parquet(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.parquet.html)

In [115]:
empresas.write.parquet(
    path = '/content/drive/MyDrive/curso_spark/empresas/parquet',
    mode = 'overwrite'
)

In [116]:
empresas_parquet  = spark.read.parquet(
    '/content/drive/MyDrive/curso_spark/empresas/parquet'
)

In [117]:
empresas_parquet.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [124]:
#estabelecimentos.write.parquet(
#    path = '/content/drive/MyDrive/curso_spark/estabelecimentos/parquet',
#    mode = 'overwrite'
#)

print('Arquivo com falha ao salvar!')

Arquivo com falha ao salvar!


In [122]:
socios.write.parquet(
    path = '/content/drive/MyDrive/curso_spark/socios/parquet',
    mode = 'overwrite'
)

## Particionamento dos dados

[DataFrameWriter.partitionBy(*cols)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.partitionBy.html)

In [127]:
#O método coalesce() somente diminui o número de partições, e é mais performático que o repartition(), que pode tanto diminuir quanto aumentar o número de partições - um procedimento mais pesado.
empresas.coalesce(1).write.csv(
    path='/content/drive/MyDrive/curso_spark/empresas/csv-unico',
    mode='overwrite',
    sep=';',
    header=True
)

In [126]:
empresas.write.parquet(
    path='/content/drive/MyDrive/curso_spark/empresas/parquet-partitionBy',
    mode='overwrite',
        partitionBy='porte_da_empresa'
)

In [128]:
#encerando sessão Spark

spark.stop()
