# Aula 5: PySpark, utilizando Python em aplicações de Big Data

## Instalação do PySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=1aa8b909d8820b246de9e8abec20efdc7bde0ff8a7cce0dacaba3635a37ac826
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


Opcionamente, podemos utilizar o Ngrok para ter acesso a console do Spark para acompanhar a execução dos Jobs.

Para instalar, primeiro precisamos ter uma conta no [Ngrok](ngrok.com) e obter uma chave (authtoken) que fica no [Dashboard](https://dashboard.ngrok.com/get-started/your-authtoken)

Logo depois, instalar o Ngrok direto na instância do Colab.

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


Enviar os comandos abaixo para garantir que o Ngrok está utilizando o authtoken obtido do Dashboard e também assegurar que o tunelamento (do localhost) para a internet utilizará a porta 4050.

In [None]:
get_ipython().system_raw('./ngrok authtoken 3ivcC43VHTzC3XaFmS8iA_tpuzgfajEQ5WkBobes7E')
get_ipython().system_raw('./ngrok http 4050 &')

Abrir uma sessão do Spark, configurando a mesma porta do tunelamento do Ngrok.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master('local[*]') \
  .appName("Analise de dados de eleições") \
  .config('spark.ui.port', '4050') \
  .getOrCreate()

In [None]:
spark

Obter a URL do tunelamento para acessar a console do Spark (Spark UI).

In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://1dbc-35-230-173-5.ngrok-free.app","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://1dbc-35-230-173-5.ngrok-free.app","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


## Spark Dataframe

Para as atividades vamos utilizar o dataset das eleições de 2020 no Brasil para prefeito.

Os dados foram ajustados para serem utilizados no ambiente do Colab. Eles podem ser baixados diretamente o site to [TSE](https://dadosabertos.tse.jus.br/dataset/resultados-2020-boletim-de-urna) para consulta, caso seja necessário.

In [3]:
!git clone https://github.com/michelpf/dataset-brazil-elections-2020-mayor-1st-round

Cloning into 'dataset-brazil-elections-2020-mayor-1st-round'...
remote: Enumerating objects: 326, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Compressing objects: 100% (3/3), done.[K
remote: Total 326 (delta 0), reused 3 (delta 0), pack-reused 323[K
Receiving objects: 100% (326/326), 127.02 MiB | 23.40 MiB/s, done.
Resolving deltas: 100% (262/262), done.
Updating files: 100% (151/151), done.


Os dados estão dispostos (ou partionados) por Estado. Isto é, existem vários arquivos dentro de cada pasta representado por um Estado do país.

O PySpark, por padrão, é capaz de ler vários arquivos externos (CSV, Parquet, etc.) dentro de uma mesma pasta e concatená-los num mesmo dataframe.

Os parâmetros de leitura podem variar de arquivo para arquivo. Neste exemplos, para cobrir nosso conjunto de caracteres, vamos utilizar o encoding ISO-8859-1. Por padrão os arquivos são abertos utilizando o UTF-8.

In [4]:
dados = spark.read.option("encoding", "iso-8859-1").option("recursiveFileLookup","true").csv('dataset-brazil-elections-2020-mayor-1st-round/dataset', sep=';', header=True, inferSchema=True)

Exibição das colunas carregadas.

In [None]:
dados

DataFrame[DT_GERACAO: string, HH_GERACAO: timestamp, ANO_ELEICAO: int, CD_TIPO_ELEICAO: int, NM_TIPO_ELEICAO: string, CD_PLEITO: int, DT_PLEITO: string, NR_TURNO: int, CD_ELEICAO: int, DS_ELEICAO: string, SG_UF: string, CD_MUNICIPIO: int, NM_MUNICIPIO: string, NR_ZONA: int, NR_SECAO: int, NR_LOCAL_VOTACAO: int, CD_CARGO_PERGUNTA: int, DS_CARGO_PERGUNTA: string, NR_PARTIDO: int, SG_PARTIDO: string, NM_PARTIDO: string, DT_BU_RECEBIDO: string, QT_APTOS: int, QT_COMPARECIMENTO: int, QT_ABSTENCOES: int, CD_TIPO_URNA: int, DS_TIPO_URNA: string, CD_TIPO_VOTAVEL: int, DS_TIPO_VOTAVEL: string, NR_VOTAVEL: int, NM_VOTAVEL: string, QT_VOTOS: int, NR_URNA_EFETIVADA: int, CD_CARGA_1_URNA_EFETIVADA: string, CD_CARGA_2_URNA_EFETIVADA: string, CD_FLASHCARD_URNA_EFETIVADA: string, DT_CARGA_URNA_EFETIVADA: string, DS_CARGO_PERGUNTA_SECAO: string, DS_AGREGADAS: string, DT_ABERTURA: string, DT_ENCERRAMENTO: string, QT_ELEITORES_BIOMETRIA_NH: int, DT_EMISSAO_BU: string, NR_JUNTA_APURADORA: int, NR_TURMA_AP

Podemos exibir o schema detectado de outra forma.

In [None]:
dados.printSchema()

root
 |-- DT_GERACAO: string (nullable = true)
 |-- HH_GERACAO: timestamp (nullable = true)
 |-- ANO_ELEICAO: integer (nullable = true)
 |-- CD_TIPO_ELEICAO: integer (nullable = true)
 |-- NM_TIPO_ELEICAO: string (nullable = true)
 |-- CD_PLEITO: integer (nullable = true)
 |-- DT_PLEITO: string (nullable = true)
 |-- NR_TURNO: integer (nullable = true)
 |-- CD_ELEICAO: integer (nullable = true)
 |-- DS_ELEICAO: string (nullable = true)
 |-- SG_UF: string (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- NM_MUNICIPIO: string (nullable = true)
 |-- NR_ZONA: integer (nullable = true)
 |-- NR_SECAO: integer (nullable = true)
 |-- NR_LOCAL_VOTACAO: integer (nullable = true)
 |-- CD_CARGO_PERGUNTA: integer (nullable = true)
 |-- DS_CARGO_PERGUNTA: string (nullable = true)
 |-- NR_PARTIDO: integer (nullable = true)
 |-- SG_PARTIDO: string (nullable = true)
 |-- NM_PARTIDO: string (nullable = true)
 |-- DT_BU_RECEBIDO: string (nullable = true)
 |-- QT_APTOS: integer (nullable

Amostra das 20 primeiras linhas.
Para especificar o número de linhas a ser exibido, inclua o número dentro do comando ```show(#linhas)```.

In [None]:
dados.show()

+----------+-------------------+-----------+---------------+-----------------+---------+----------+--------+----------+--------------------+-----+------------+--------------+-------+--------+----------------+-----------------+-----------------+----------+------------+--------------------+-------------------+--------+-----------------+-------------+------------+------------+---------------+---------------+----------+--------------------+--------+-----------------+-------------------------+-------------------------+---------------------------+-----------------------+-----------------------+------------+-------------------+-------------------+-------------------------+-------------------+------------------+------------------+
|DT_GERACAO|         HH_GERACAO|ANO_ELEICAO|CD_TIPO_ELEICAO|  NM_TIPO_ELEICAO|CD_PLEITO| DT_PLEITO|NR_TURNO|CD_ELEICAO|          DS_ELEICAO|SG_UF|CD_MUNICIPIO|  NM_MUNICIPIO|NR_ZONA|NR_SECAO|NR_LOCAL_VOTACAO|CD_CARGO_PERGUNTA|DS_CARGO_PERGUNTA|NR_PARTIDO|  SG_PARTIDO

In [None]:
dados.show(10)

+----------+-------------------+-----------+---------------+-----------------+---------+----------+--------+----------+--------------------+-----+------------+--------------+-------+--------+----------------+-----------------+-----------------+----------+------------+--------------------+-------------------+--------+-----------------+-------------+------------+------------+---------------+---------------+----------+--------------------+--------+-----------------+-------------------------+-------------------------+---------------------------+-----------------------+-----------------------+------------+-------------------+-------------------+-------------------------+-------------------+------------------+------------------+
|DT_GERACAO|         HH_GERACAO|ANO_ELEICAO|CD_TIPO_ELEICAO|  NM_TIPO_ELEICAO|CD_PLEITO| DT_PLEITO|NR_TURNO|CD_ELEICAO|          DS_ELEICAO|SG_UF|CD_MUNICIPIO|  NM_MUNICIPIO|NR_ZONA|NR_SECAO|NR_LOCAL_VOTACAO|CD_CARGO_PERGUNTA|DS_CARGO_PERGUNTA|NR_PARTIDO|  SG_PARTIDO

Contagem do número de linhas.

In [None]:
dados.count()

3172300

Esquema dos dados que foram inferidos a partir das amostras. O PySpark, quando lemos uma fonte externa, aplica regras para detectar os tipos de dados. Nem sempre eles são os mesmos, por isso é importante revisarmos.

In [None]:
dados.printSchema()

root
 |-- DT_GERACAO: string (nullable = true)
 |-- HH_GERACAO: timestamp (nullable = true)
 |-- ANO_ELEICAO: integer (nullable = true)
 |-- CD_TIPO_ELEICAO: integer (nullable = true)
 |-- NM_TIPO_ELEICAO: string (nullable = true)
 |-- CD_PLEITO: integer (nullable = true)
 |-- DT_PLEITO: string (nullable = true)
 |-- NR_TURNO: integer (nullable = true)
 |-- CD_ELEICAO: integer (nullable = true)
 |-- DS_ELEICAO: string (nullable = true)
 |-- SG_UF: string (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- NM_MUNICIPIO: string (nullable = true)
 |-- NR_ZONA: integer (nullable = true)
 |-- NR_SECAO: integer (nullable = true)
 |-- NR_LOCAL_VOTACAO: integer (nullable = true)
 |-- CD_CARGO_PERGUNTA: integer (nullable = true)
 |-- DS_CARGO_PERGUNTA: string (nullable = true)
 |-- NR_PARTIDO: integer (nullable = true)
 |-- SG_PARTIDO: string (nullable = true)
 |-- NM_PARTIDO: string (nullable = true)
 |-- DT_BU_RECEBIDO: string (nullable = true)
 |-- QT_APTOS: integer (nullable

Neste caso, notamos algumas inconsistências:

* DT_GERACAO deveria ser Date e não String
* DT_PLEITO deveria ser Date e não String
* DT_CARGA_URNA_EFETIVADA deveria ser Timestamp (Data e Hora) e não String
* DT_ABERTURA deveria ser Timestamp (Data e Hora) e não String
* DT_ENCERRAMENTO deveria ser Timestamp (Data e Hora) e não String
* DT_EMISSAO_BU deveria ser Timestamp (Data e Hora) e não String
* DT_BU_RECEBIDO deveria ser Timestamp (Data e Hora) e não String

Para as conversões, precisamos utilizar outras funções da biblioteca do PySpart. Functions e Stringtype.

Converteremos um valor de String para Data utilizando uma máscara padrão.

Os padrões podem ser conferidos na [documentação](https://spark.apache.org/docs/3.1.2/sql-ref-datetime-pattern.html) do PySpark.

Neste caso, como temos uma data no formato brasileiro dia/mês/ano hora:minuto:segundo, utilizaremos os seguinte padrão: "dd/MM/yyyy HH:mm:ss".

In [5]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

O comando ```.withColumn``` irá retornar a coluna transformada, e assim podemos substituir no dataframe principal.

In [6]:
#05/10/2022
dados = dados.withColumn("DT_GERACAO", f.to_date(dados.DT_GERACAO.cast(StringType()), 'dd/MM/yyyy'))

#02/10/2022 18:45:30
dados = dados.withColumn("DT_PLEITO", f.to_date(dados.DT_PLEITO.cast(StringType()), 'dd/MM/yyyy'))
dados = dados.withColumn("DT_CARGA_URNA_EFETIVADA", f.to_timestamp(dados.DT_CARGA_URNA_EFETIVADA.cast(StringType()), 'dd/MM/yyyy HH:mm:ss'))
dados = dados.withColumn("DT_ABERTURA", f.to_timestamp(dados.DT_ABERTURA.cast(StringType()), 'dd/MM/yyyy HH:mm:ss'))
dados = dados.withColumn("DT_ENCERRAMENTO", f.to_timestamp(dados.DT_ENCERRAMENTO.cast(StringType()), 'dd/MM/yyyy HH:mm:ss'))
dados = dados.withColumn("DT_EMISSAO_BU", f.to_timestamp(dados.DT_EMISSAO_BU.cast(StringType()), 'dd/MM/yyyy HH:mm:ss'))
dados = dados.withColumn("DT_BU_RECEBIDO", f.to_timestamp(dados.DT_BU_RECEBIDO.cast(StringType()), 'dd/MM/yyyy HH:mm:ss'))

Confirmando se as operações foram realizadas.

In [None]:
dados.printSchema()

root
 |-- DT_GERACAO: date (nullable = true)
 |-- HH_GERACAO: timestamp (nullable = true)
 |-- ANO_ELEICAO: integer (nullable = true)
 |-- CD_TIPO_ELEICAO: integer (nullable = true)
 |-- NM_TIPO_ELEICAO: string (nullable = true)
 |-- CD_PLEITO: integer (nullable = true)
 |-- DT_PLEITO: date (nullable = true)
 |-- NR_TURNO: integer (nullable = true)
 |-- CD_ELEICAO: integer (nullable = true)
 |-- DS_ELEICAO: string (nullable = true)
 |-- SG_UF: string (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- NM_MUNICIPIO: string (nullable = true)
 |-- NR_ZONA: integer (nullable = true)
 |-- NR_SECAO: integer (nullable = true)
 |-- NR_LOCAL_VOTACAO: integer (nullable = true)
 |-- CD_CARGO_PERGUNTA: integer (nullable = true)
 |-- DS_CARGO_PERGUNTA: string (nullable = true)
 |-- NR_PARTIDO: integer (nullable = true)
 |-- SG_PARTIDO: string (nullable = true)
 |-- NM_PARTIDO: string (nullable = true)
 |-- DT_BU_RECEBIDO: timestamp (nullable = true)
 |-- QT_APTOS: integer (nullable 

Inspecionando os dados.

In [None]:
dados.show()

+----------+-------------------+-----------+---------------+-----------------+---------+----------+--------+----------+--------------------+-----+------------+--------------+-------+--------+----------------+-----------------+-----------------+----------+------------+--------------------+-------------------+--------+-----------------+-------------+------------+------------+---------------+---------------+----------+--------------------+--------+-----------------+-------------------------+-------------------------+---------------------------+-----------------------+-----------------------+------------+-------------------+-------------------+-------------------------+-------------------+------------------+------------------+
|DT_GERACAO|         HH_GERACAO|ANO_ELEICAO|CD_TIPO_ELEICAO|  NM_TIPO_ELEICAO|CD_PLEITO| DT_PLEITO|NR_TURNO|CD_ELEICAO|          DS_ELEICAO|SG_UF|CD_MUNICIPIO|  NM_MUNICIPIO|NR_ZONA|NR_SECAO|NR_LOCAL_VOTACAO|CD_CARGO_PERGUNTA|DS_CARGO_PERGUNTA|NR_PARTIDO|  SG_PARTIDO

In [None]:
dados.limit(20).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,DT_GERACAO,HH_GERACAO,ANO_ELEICAO,CD_TIPO_ELEICAO,NM_TIPO_ELEICAO,CD_PLEITO,DT_PLEITO,NR_TURNO,CD_ELEICAO,DS_ELEICAO,...,CD_FLASHCARD_URNA_EFETIVADA,DT_CARGA_URNA_EFETIVADA,DS_CARGO_PERGUNTA_SECAO,DS_AGREGADAS,DT_ABERTURA,DT_ENCERRAMENTO,QT_ELEITORES_BIOMETRIA_NH,DT_EMISSAO_BU,NR_JUNTA_APURADORA,NR_TURMA_APURADORA
0,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
1,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
2,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
3,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
4,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
5,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
6,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
7,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
8,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1
9,2020-11-18,2023-07-01 15:49:53,2020,0,Eleição Ordinária,304,2020-11-15,1,426,Eleições Municipais 2020,...,4F8FD6B9,2020-11-03 10:35:00,11 - 366,#NULO#,2020-11-15 07:00:00,2020-11-15 17:03:43,0,2020-11-15 17:06:12,-1,-1


## Análises com Dataframes

Seleção de dados, similar ao ```SELECT``` do SQL, definindo quais colunas deverá ser exibido.

Por exemplo, vamos exibar as colunas Estado, Município, Quantidade de Eleitores Aptos, e Quantidade de Comparecimento.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .show(30)



+-----+--------------+--------+-----------------+
|SG_UF|  NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+--------------+--------+-----------------+
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     413|              229|
|   RJ|RIO DE JANEIRO|     409|              213|
|   RJ|RIO DE JANEIRO|     409|              213|
|   RJ|RIO DE JANEIRO|     409|              213|
|   RJ|RIO DE JANEIRO|     409|              213|
|   RJ|RIO DE JANEIRO|     409|              213|
|   RJ|RIO DE JANEIRO|     409|              213|


Ordenando dados, pelo Estado.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .orderBy("SG_UF", ascending=True)\
  .show(20)

+-----+------------+--------+-----------------+
|SG_UF|NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------+--------+-----------------+
|   AC|  ACRELÂNDIA|     382|              277|
|   AC|  ACRELÂNDIA|     383|              291|
|   AC|  ACRELÂNDIA|     382|              277|
|   AC|  ACRELÂNDIA|     382|              277|
|   AC|  ACRELÂNDIA|     382|              277|
|   AC|  ACRELÂNDIA|     382|              277|
|   AC|  ACRELÂNDIA|     382|              277|
|   AC|  ACRELÂNDIA|     430|              350|
|   AC|  ACRELÂNDIA|     430|              350|
|   AC|  ACRELÂNDIA|     430|              350|
|   AC|  ACRELÂNDIA|     430|              350|
|   AC|  ACRELÂNDIA|     430|              350|
|   AC|  ACRELÂNDIA|     430|              350|
|   AC|  ACRELÂNDIA|     427|              338|
|   AC|  ACRELÂNDIA|     427|              338|
|   AC|  ACRELÂNDIA|     427|              338|
|   AC|  ACRELÂNDIA|     427|              338|
|   AC|  ACRELÂNDIA|     427|           

Ordenando dados, pelo Estado e por Muncípio, ordem decrescente.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .orderBy("SG_UF","NM_MUNICIPIO", ascending=False)\
  .show(20)

+-----+------------+--------+-----------------+
|SG_UF|NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------+--------+-----------------+
|   TO|     XAMBIOÁ|     202|              178|
|   TO|     XAMBIOÁ|     204|              174|
|   TO|     XAMBIOÁ|     202|              178|
|   TO|     XAMBIOÁ|     202|              178|
|   TO|     XAMBIOÁ|     202|              178|
|   TO|     XAMBIOÁ|     202|              178|
|   TO|     XAMBIOÁ|     202|              178|
|   TO|     XAMBIOÁ|     353|              297|
|   TO|     XAMBIOÁ|     353|              297|
|   TO|     XAMBIOÁ|     353|              297|
|   TO|     XAMBIOÁ|     353|              297|
|   TO|     XAMBIOÁ|     353|              297|
|   TO|     XAMBIOÁ|     353|              297|
|   TO|     XAMBIOÁ|     321|              274|
|   TO|     XAMBIOÁ|     321|              274|
|   TO|     XAMBIOÁ|     321|              274|
|   TO|     XAMBIOÁ|     321|              274|
|   TO|     XAMBIOÁ|     321|           

Para termos ordenações distintas para cada coluna, vamos precisar utilizar dentro do pacote de funções do PySpark, os métodos ```col``` para referenciar uma coluna e os referente a ordenação ```desc()``` e ```asc()```

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .orderBy(f.col("SG_UF").asc(), f.col("NM_MUNICIPIO").desc())\
  .show(20)

+-----+------------+--------+-----------------+
|SG_UF|NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------+--------+-----------------+
|   AC|      XAPURI|     210|              177|
|   AC|      XAPURI|     209|              157|
|   AC|      XAPURI|     210|              177|
|   AC|      XAPURI|     210|              177|
|   AC|      XAPURI|     210|              177|
|   AC|      XAPURI|     210|              177|
|   AC|      XAPURI|     207|              168|
|   AC|      XAPURI|     207|              168|
|   AC|      XAPURI|     207|              168|
|   AC|      XAPURI|     207|              168|
|   AC|      XAPURI|     207|              168|
|   AC|      XAPURI|     207|              168|
|   AC|      XAPURI|     213|              165|
|   AC|      XAPURI|     213|              165|
|   AC|      XAPURI|     213|              165|
|   AC|      XAPURI|     213|              165|
|   AC|      XAPURI|     213|              165|
|   AC|      XAPURI|     213|           

Filtrando dados considerando somente o Estado de São Paulo e o Município de São Paulo.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .where("`SG_UF`=='SP' AND NM_MUNICIPIO=='SÃO PAULO'")\
  .show(30)

+-----+------------+--------+-----------------+
|SG_UF|NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------+--------+-----------------+
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     339|              253|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|              350|
|   SP|   SÃO PAULO|     449|           

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .where("SG_UF!='SP' AND NM_MUNICIPIO LIKE 'SANTO%'")\
  .show(30)

+-----+--------------------+--------+-----------------+
|SG_UF|        NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+--------------------+--------+-----------------+
|   MG|SANTO ANTÔNIO DO ...|     508|              455|
|   MG|SANTO ANTÔNIO DO ...|     508|              455|
|   MG|SANTO ANTÔNIO DO ...|     508|              455|
|   MG|SANTO ANTÔNIO DO ...|     508|              455|
|   MG|SANTO ANTÔNIO DO ...|     508|              458|
|   MG|SANTO ANTÔNIO DO ...|     508|              458|
|   MG|SANTO ANTÔNIO DO ...|     508|              458|
|   MG|SANTO ANTÔNIO DO ...|     508|              458|
|   MG|SANTO ANTÔNIO DO ...|     303|              264|
|   MG|SANTO ANTÔNIO DO ...|     303|              264|
|   MG|SANTO ANTÔNIO DO ...|     303|              264|
|   MG|SANTO ANTÔNIO DO ...|     303|              264|
|   MG|SANTO ANTÔNIO DO ...|     455|              398|
|   MG|SANTO ANTÔNIO DO ...|     455|              398|
|   MG|SANTO ANTÔNIO DO ...|     455|           

O comando ```filter``` também pode ser utilizado, na verdade ele é um apelido do comando ```where```. Com ele podemos especificar mais condições de filtro, como ```==``` (igual), ```startsWith``` (começa com) ou ```endsWith``` (termina com).

Vmoa filtrar todas as cidades que terminam com "ANDRÉ".

> Colunas que tenha espaço no nome, informe com aspas simples, exemplo ```'column name'```.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .filter(dados.NM_MUNICIPIO.endswith("ANDRÉ"))\
  .filter(dados.SG_UF == "SP")\
  .show(30)

+-----+------------+--------+-----------------+
|SG_UF|NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------+--------+-----------------+
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     333|              235|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     336|              239|
|   SP| SANTO ANDRÉ|     335|           

Utilizando o operador de igualdade.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .filter(dados.NM_MUNICIPIO == "SÃO CAETANO DO SUL")\
  .show(30)

+-----+------------------+--------+-----------------+
|SG_UF|      NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------------+--------+-----------------+
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|  

Aplicando consultas com o comando Like. Nessa operação, o caracter coringa é o "%".
Vamos exibir as linhas que contém "CAETANO" no campo Município.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_APTOS", "QT_COMPARECIMENTO")\
  .filter(dados.NM_MUNICIPIO.like("%CAETANO%"))\
  .show(30)

+-----+------------------+--------+-----------------+
|SG_UF|      NM_MUNICIPIO|QT_APTOS|QT_COMPARECIMENTO|
+-----+------------------+--------+-----------------+
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     385|              292|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|     390|              271|
|   SP|SÃO CAETANO DO SUL|  

Operações com agrupamento.
Vamos contar, quantos votos foram obtidos por município no Estado de São Paulo.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO")\
  .where("SG_UF=='SP'")\
  .groupBy("NM_MUNICIPIO")\
  .count()\
  .show()

+--------------------+------+
|        NM_MUNICIPIO| count|
+--------------------+------+
|          PIRACICABA|  9658|
|              OSASCO| 10480|
|              SANTOS| 13233|
|            SOROCABA| 12571|
|  SÃO CAETANO DO SUL|  3332|
|           SÃO PAULO|253743|
|            CAMPINAS| 28295|
|            SALTINHO|   102|
|            BERTIOGA|  1083|
| RIO GRANDE DA SERRA|   547|
|      RIBEIRÃO PIRES|  1551|
|     ITAQUAQUECETUBA|  4151|
|         SANTO ANDRÉ| 12558|
|SÃO JOAQUIM DA BARRA|   835|
|SÃO JOSÉ DO RIO P...|   631|
|           PIQUEROBI|    28|
|            PRATÂNIA|    56|
|             MARÍLIA|  4503|
|         POTIRENDABA|   132|
|  ÁGUAS DE SÃO PEDRO|    60|
+--------------------+------+
only showing top 20 rows



In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_VOTOS")\
  .where("SG_UF=='SP'")\
  .groupBy("NM_MUNICIPIO")\
  .sum("QT_VOTOS")\
  .show()

+--------------------+-------------+
|        NM_MUNICIPIO|sum(QT_VOTOS)|
+--------------------+-------------+
|          PIRACICABA|       202203|
|              OSASCO|       407183|
|              SANTOS|       229006|
|            SOROCABA|       356477|
|  SÃO CAETANO DO SUL|       107565|
|           SÃO PAULO|      6354100|
|            CAMPINAS|       583284|
|            SALTINHO|         5419|
|            BERTIOGA|        32404|
| RIO GRANDE DA SERRA|        27481|
|      RIBEIRÃO PIRES|        68018|
|     ITAQUAQUECETUBA|       182221|
|         SANTO ANDRÉ|       404505|
|SÃO JOAQUIM DA BARRA|        27125|
|SÃO JOSÉ DO RIO P...|        29645|
|           PIQUEROBI|         2666|
|            PRATÂNIA|         3805|
|             MARÍLIA|       125732|
|         POTIRENDABA|        10279|
|  ÁGUAS DE SÃO PEDRO|         2939|
+--------------------+-------------+
only showing top 20 rows



Removendo a truncagem dos dados.

In [9]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_VOTOS")\
  .where("SG_UF=='SP'")\
  .groupBy("NM_MUNICIPIO")\
  .sum("QT_VOTOS")\
  .show(truncate=False)

+---------------------+-------------+
|NM_MUNICIPIO         |sum(QT_VOTOS)|
+---------------------+-------------+
|PIRACICABA           |202203       |
|OSASCO               |407183       |
|SANTOS               |229006       |
|SOROCABA             |356477       |
|SÃO CAETANO DO SUL   |107565       |
|SÃO PAULO            |6354100      |
|CAMPINAS             |583284       |
|SALTINHO             |5419         |
|BERTIOGA             |32404        |
|RIO GRANDE DA SERRA  |27481        |
|RIBEIRÃO PIRES       |68018        |
|ITAQUAQUECETUBA      |182221       |
|SANTO ANDRÉ          |404505       |
|SÃO JOAQUIM DA BARRA |27125        |
|SÃO JOSÉ DO RIO PARDO|29645        |
|PIQUEROBI            |2666         |
|PRATÂNIA             |3805         |
|MARÍLIA              |125732       |
|POTIRENDABA          |10279        |
|ÁGUAS DE SÃO PEDRO   |2939         |
+---------------------+-------------+
only showing top 20 rows



Obtendo a soma de votos por município em cada Estado, ordenando por Estado e Município.

In [None]:
dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_VOTOS")\
  .groupBy(["SG_UF", "NM_MUNICIPIO"])\
  .agg(
      f.sum("QT_VOTOS").alias("SUM_QT_VOTOS")
  )\
  .orderBy(["SG_UF", "NM_MUNICIPIO"], ascending=True)\
  .show(truncate=False)

+-----+--------------------+------------+
|SG_UF|NM_MUNICIPIO        |SUM_QT_VOTOS|
+-----+--------------------+------------+
|AC   |ACRELÂNDIA          |7093        |
|AC   |ASSIS BRASIL        |4918        |
|AC   |BRASILÉIA           |13522       |
|AC   |BUJARI              |7387        |
|AC   |CAPIXABA            |6108        |
|AC   |CRUZEIRO DO SUL     |44533       |
|AC   |EPITACIOLÂNDIA      |9447        |
|AC   |FEIJÓ               |16218       |
|AC   |JORDÃO              |4308        |
|AC   |MANOEL URBANO       |5716        |
|AC   |MARECHAL THAUMATURGO|8616        |
|AC   |MÂNCIO LIMA         |10964       |
|AC   |PLÁCIDO DE CASTRO   |9823        |
|AC   |PORTO ACRE          |10026       |
|AC   |PORTO WALTER        |5658        |
|AC   |RIO BRANCO          |186769      |
|AC   |RODRIGUES ALVES     |9303        |
|AC   |SANTA ROSA DO PURUS |2992        |
|AC   |SENA MADUREIRA      |22382       |
|AC   |SENADOR GUIOMARD    |14140       |
+-----+--------------------+------

Podemos obter o sumário dos dados, que inclui média, desvio padrão, e distribuição por quartis.

Vamos obter o sumário do dataset filtrado e agrupado utilizado anteriormente.

In [None]:
dados_sumario = dados\
  .select("SG_UF", "NM_MUNICIPIO", "QT_VOTOS")\
  .groupBy(["SG_UF", "NM_MUNICIPIO"])\
  .agg(
      f.sum("QT_VOTOS").alias("SUM_QT_VOTOS")
  )\
  .orderBy(["SG_UF", "NM_MUNICIPIO"], ascending=True)

In [None]:
dados_sumario\
  .select("SUM_QT_VOTOS")\
  .summary()\
  .show()

+-------+------------------+
|summary|      SUM_QT_VOTOS|
+-------+------------------+
|  count|              5568|
|   mean|20415.827047413793|
| stddev|111251.85240125858|
|    min|               930|
|    25%|              3939|
|    50%|              7299|
|    75%|             14799|
|    max|           6354100|
+-------+------------------+



Uniões e junções.

Vamos utilizar o seguinte dataframe abaixo para relacionarmos a ideologia de cada partido como complemento da contagem dos votos.

In [10]:
partido_ideologia = spark.createDataFrame(
    [
        ("PROS", "CENTRO"),
        ("REPUBLICANOS", "DIRETA"),
        ("PSDB", "CENTRO"),
        ("PSOL", "ESQUERDA"),
        ("CIDADANIA", "CENTRO"),
        ("PRTB", "DIRETA"),
        ("PCO", "ESQUERDA"),

        ("DC", "CENTRO"),
        ("PMB", "CENTRO"),
        ("PSB", "CENTRO"),
        ("PSTU", "ESQUERDA"),
        ("PT", "ESQUERDA"),
        ("PMN", "CENTRO"),
        ("SOLIDARIEDADE", "CENTRO"),
        ("MDB", "CENTRO"),
        ("PTC", "DIRETA"),

        ("PDT", "CENTRO"),
        ("PV", "CENTRO"),
        ("PSC", "DIRETA")

    ],
    ['PARTIDO', 'IDEOLOGIA']
)

partido_ideologia.show(truncate=False)

+-------------+---------+
|PARTIDO      |IDEOLOGIA|
+-------------+---------+
|PROS         |CENTRO   |
|REPUBLICANOS |DIRETA   |
|PSDB         |CENTRO   |
|PSOL         |ESQUERDA |
|CIDADANIA    |CENTRO   |
|PRTB         |DIRETA   |
|PCO          |ESQUERDA |
|DC           |CENTRO   |
|PMB          |CENTRO   |
|PSB          |CENTRO   |
|PSTU         |ESQUERDA |
|PT           |ESQUERDA |
|PMN          |CENTRO   |
|SOLIDARIEDADE|CENTRO   |
|MDB          |CENTRO   |
|PTC          |DIRETA   |
|PDT          |CENTRO   |
|PV           |CENTRO   |
|PSC          |DIRETA   |
+-------------+---------+



In [12]:
votacao_partido_estado = dados\
  .select("SG_UF", "SG_PARTIDO", "QT_VOTOS")\
  .where("SG_PARTIDO != '#NULO#'")\
  .groupBy(["SG_UF", "SG_PARTIDO"])\
  .agg(
      f.sum("QT_VOTOS").alias("SUM_QT_VOTOS"),
      f.count("QT_VOTOS").alias("COUNT_QT_VOTOS")
  )\
  .orderBy(["SG_UF", "SG_PARTIDO"], ascending=True)\

votacao_partido_estado.show(truncate=False)

+-----+-------------+------------+--------------+
|SG_UF|SG_PARTIDO   |SUM_QT_VOTOS|COUNT_QT_VOTOS|
+-----+-------------+------------+--------------+
|AC   |AVANTE       |2294        |653           |
|AC   |DEM          |7277        |122           |
|AC   |MDB          |79811       |1580          |
|AC   |PC do B      |5997        |130           |
|AC   |PDT          |11348       |167           |
|AC   |PL           |186         |17            |
|AC   |PP           |146612      |1376          |
|AC   |PROS         |9201        |138           |
|AC   |PSB          |41108       |767           |
|AC   |PSC          |1509        |575           |
|AC   |PSD          |12956       |174           |
|AC   |PSDB         |36555       |941           |
|AC   |PSL          |13477       |399           |
|AC   |PSOL         |284         |90            |
|AC   |PT           |33948       |1111          |
|AC   |PTB          |1124        |106           |
|AC   |REPUBLICANOS |1063        |47            |


In [13]:
ideologia_partido_estado = votacao_partido_estado.join(partido_ideologia, partido_ideologia.PARTIDO == votacao_partido_estado.SG_PARTIDO, how="INNER")
ideologia_partido_estado.show(truncate=False)

+-----+----------+------------+--------------+-------+---------+
|SG_UF|SG_PARTIDO|SUM_QT_VOTOS|COUNT_QT_VOTOS|PARTIDO|IDEOLOGIA|
+-----+----------+------------+--------------+-------+---------+
|AP   |PROS      |8937        |132           |PROS   |CENTRO   |
|AC   |PROS      |9201        |138           |PROS   |CENTRO   |
|RR   |PROS      |5021        |101           |PROS   |CENTRO   |
|MT   |PROS      |68700       |1604          |PROS   |CENTRO   |
|AM   |PROS      |3309        |250           |PROS   |CENTRO   |
|TO   |PROS      |39837       |779           |PROS   |CENTRO   |
|BA   |PROS      |88047       |2874          |PROS   |CENTRO   |
|PA   |PROS      |15848       |359           |PROS   |CENTRO   |
|SE   |PROS      |102         |48            |PROS   |CENTRO   |
|RO   |PROS      |6679        |268           |PROS   |CENTRO   |
|AL   |PROS      |5199        |164           |PROS   |CENTRO   |
|MA   |PROS      |31647       |2345          |PROS   |CENTRO   |
|RS   |PROS      |4135   

In [None]:
ideologia_partido_estado.printSchema()

root
 |-- SG_UF: string (nullable = true)
 |-- SG_PARTIDO: string (nullable = true)
 |-- SUM_QT_VOTOS: long (nullable = true)
 |-- PARTIDO: string (nullable = true)
 |-- IDEOLOGIA: string (nullable = true)



In [None]:
ideologia_partido_estado\
  .select("SG_UF", "IDEOLOGIA", "SUM_QT_VOTOS")\
  .where("SG_UF == 'SP'")\
  .groupBy(["SG_UF", "IDEOLOGIA"])\
  .agg(
      f.sum("SUM_QT_VOTOS").alias("QT_VOTOS")
  )\
  .orderBy(["SG_UF", "IDEOLOGIA"], ascending=True)\
  .show(truncate=False)

+-----+---------+--------+
|SG_UF|IDEOLOGIA|QT_VOTOS|
+-----+---------+--------+
|SP   |CENTRO   |9110189 |
|SP   |DIRETA   |1706364 |
|SP   |ESQUERDA |2851290 |
+-----+---------+--------+



## Spark SQL

Utilizar consultas SQL com o Spark.
Maneira mais fácil para as pessoas que já tem experiência com SQL, podendo utilizar a mesma sintaxe, incluindo seleção, filtro e agrupamentos.

Inicialmente precisamos criar uma View que será utilizada como o nome da tabela para as operações.

In [14]:
dados.createOrReplaceTempView("votacaoView")

Agora podemos referenciá-la em todos os comandos, utilizando a mesma sintaxe do SQL.

In [16]:
query = spark\
    .sql("""
        SELECT DT_GERACAO, ANO_ELEICAO, NM_TIPO_ELEICAO, CD_PLEITO
            FROM votacaoView
            WHERE SG_UF='SP'
    """)

query.show(truncate=False)

+----------+-----------+-----------------+---------+
|DT_GERACAO|ANO_ELEICAO|NM_TIPO_ELEICAO  |CD_PLEITO|
+----------+-----------+-----------------+---------+
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304      |
|2020-11-18|2020       |Eleição Ordinária|304 

Buscar os top 10 municípios cuja abstenção foi maior que 25%.

In [23]:
query = spark\
    .sql("""
        SELECT SG_UF, NM_MUNICIPIO, (SUM(QT_APTOS)-SUM(QT_COMPARECIMENTO))/SUM(QT_APTOS) AS PORC_ABST
            FROM votacaoView
            GROUP BY SG_UF, NM_MUNICIPIO
            HAVING (SUM(QT_APTOS)-SUM(QT_COMPARECIMENTO))/SUM(QT_APTOS) > 0.25
            ORDER BY PORC_ABST DESC
            LIMIT 10
    """)

query.show(truncate=False)

+-----+---------------+-------------------+
|SG_UF|NM_MUNICIPIO   |PORC_ABST          |
+-----+---------------+-------------------+
|AP   |OIAPOQUE       |0.3806900153409276 |
|MG   |RIO VERMELHO   |0.36941430000769643|
|MG   |MINAS NOVAS    |0.36925768786523794|
|MG   |NOVO CRUZEIRO  |0.3502751045531992 |
|RO   |BURITIS        |0.3455590738049762 |
|SP   |MOCOCA         |0.335387323943662  |
|SP   |BARRETOS       |0.334445628589518  |
|MS   |CHAPADÃO DO SUL|0.3331181407359587 |
|MG   |POÇOS DE CALDAS|0.33307975035014065|
|MT   |ARIPUANÃ       |0.3330401282679375 |
+-----+---------------+-------------------+



Exibir a votação total por partido "DEM" em cada Estado do país.

In [26]:
query_votos_municipio = spark\
    .sql("""
        SELECT SG_UF, SG_PARTIDO, NM_MUNICIPIO, SUM(QT_VOTOS) AS QT_VOTOS
            FROM votacaoView
            WHERE SG_PARTIDO IN ('DEM')
            GROUP BY SG_UF, SG_PARTIDO, NM_MUNICIPIO
            ORDER BY QT_VOTOS DESC
    """)

query_votos_municipio.show(100, truncate=False)

+-----+----------+-----------------------+--------+
|SG_UF|SG_PARTIDO|NM_MUNICIPIO           |QT_VOTOS|
+-----+----------+-----------------------+--------+
|RJ   |DEM       |RIO DE JANEIRO         |974804  |
|BA   |DEM       |SALVADOR               |779408  |
|PR   |DEM       |CURITIBA               |499821  |
|PE   |DEM       |RECIFE                 |200551  |
|SC   |DEM       |FLORIANÓPOLIS          |126144  |
|RJ   |DEM       |VOLTA REDONDA          |85673   |
|MA   |DEM       |SÃO LUÍS               |83138   |
|MG   |DEM       |RIBEIRÃO DAS NEVES     |72679   |
|RJ   |DEM       |SÃO JOÃO DE MERITI     |71730   |
|PA   |DEM       |SANTARÉM               |71594   |
|BA   |DEM       |CAMAÇARI               |68927   |
|AP   |DEM       |MACAPÁ                 |59511   |
|RJ   |DEM       |RESENDE                |54880   |
|SP   |DEM       |BRAGANÇA PAULISTA      |53730   |
|SP   |DEM       |BAURU                  |53299   |
|MG   |DEM       |CONTAGEM               |52371   |
|PI   |DEM  

In [30]:
query_votos_municipio.createOrReplaceTempView("votos_municipioView")

In [31]:
query_votos_municipio = spark\
    .sql("""
        SELECT SG_UF, SG_PARTIDO, SUM(QT_VOTOS) AS QT_VOTOS
            FROM votos_municipioView
            GROUP BY SG_UF, SG_PARTIDO
            ORDER BY QT_VOTOS DESC
    """)

query_votos_municipio.show(10, truncate=False)

+-----+----------+--------+
|SG_UF|SG_PARTIDO|QT_VOTOS|
+-----+----------+--------+
|BA   |DEM       |1582139 |
|RJ   |DEM       |1370214 |
|MG   |DEM       |856913  |
|SP   |DEM       |845659  |
|PR   |DEM       |805878  |
|GO   |DEM       |471704  |
|PE   |DEM       |379586  |
|MA   |DEM       |287778  |
|SC   |DEM       |202139  |
|PB   |DEM       |202089  |
+-----+----------+--------+
only showing top 10 rows



Vamos correlacionar 2 tabelas. Uma com as regiões do Brasil para entendermos a média de votações nelas.

In [34]:
regiao_estado = spark.createDataFrame(
    [
        ("Norte", "AM"),
        ("Norte", "AC"),
        ("Norte", "RO"),
        ("Norte", "RR"),
        ("Norte", "AM"),
        ("Norte", "PA"),
        ("Norte", "TO"),

        ("Nordeste", "MA"),
        ("Nordeste", "PI"),
        ("Nordeste", "RN"),
        ("Nordeste", "CE"),
        ("Nordeste", "PA"),
        ("Nordeste", "BA"),
        ("Nordeste", "PE"),
        ("Nordeste", "AL"),
        ("Nordeste", "SE"),

        ("Centro-Oeste", "GO"),
        ("Centro-Oeste", "MT"),
        ("Centro-Oeste", "MS"),
        ("Centro-Oeste", "DF"),

        ("Sudeste", "MG"),
        ("Sudeste", "ES"),
        ("Sudeste", "RJ"),
        ("Sudeste", "SP"),

        ("Sul", "SC"),
        ("Sul", "PR"),
        ("Sul", "RS"),

    ],
    ['REGIAO', 'ESTADO']
)

In [35]:
regiao_estado.show()

+------------+------+
|      REGIAO|ESTADO|
+------------+------+
|       Norte|    AM|
|       Norte|    AC|
|       Norte|    RO|
|       Norte|    RR|
|       Norte|    AM|
|       Norte|    PA|
|       Norte|    TO|
|    Nordeste|    MA|
|    Nordeste|    PI|
|    Nordeste|    RN|
|    Nordeste|    CE|
|    Nordeste|    PA|
|    Nordeste|    BA|
|    Nordeste|    PE|
|    Nordeste|    AL|
|    Nordeste|    SE|
|Centro-Oeste|    GO|
|Centro-Oeste|    MT|
|Centro-Oeste|    MS|
|Centro-Oeste|    DF|
+------------+------+
only showing top 20 rows



In [36]:
regiao_estado.createOrReplaceTempView("regiao_estadoView")

In [37]:
query_votos_municipio = spark\
    .sql("""
        SELECT r.REGIAO, v.SG_PARTIDO, SUM(v.QT_VOTOS) as QT_VOTOS
            FROM votacaoView v
            INNER JOIN regiao_estadoView r
            ON v.SG_UF = r.ESTADO
            GROUP BY v.SG_PARTIDO, r.REGIAO
            HAVING SG_PARTIDO IN ('DEM')
            ORDER BY QT_VOTOS DESC
    """)

query_votos_municipio.show(truncate=False)

+------------+----------+--------+
|REGIAO      |SG_PARTIDO|QT_VOTOS|
+------------+----------+--------+
|Sudeste     |DEM       |3160461 |
|Nordeste    |DEM       |2747437 |
|Sul         |DEM       |1167685 |
|Centro-Oeste|DEM       |777660  |
|Norte       |DEM       |446049  |
+------------+----------+--------+

