# Testing Apache Spark queries 

Disclaimer: O intuito deste trabalho é meramente aprender a utilizar o Apache Spark. Embora os dados utilizados tenham teor político, não há intenção de fazer qualquer tipo apologia, seja positiva ou negativa, a qualquer partido ou candidato.

O autor do trabalho declara não ter vínculo algum com nenhum partido político ou candidato.

Com intuito de negar quaisquer questionamentos, tentarei usar algumas estratégias:
- Sempre utilizar funções aleatórias para printar os dados
- Tentar trabalhar o problema de forma geral, sem focar em nenhum partido ou candidato
- Evitar apresentar nomes de candidatos ao menos que seja necessário

## Instantiating Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.types import (
    StructType, 
    StructField, 
    StringType, 
    IntegerType, 
    DateType,
    FloatType
)

In [2]:
spark = SparkSession.builder.appName("Test_queries").getOrCreate()



## Loading csv files from the folder

Bens declarados dos Candidatos: https://dados.gov.br/dataset/candidatos-2022

Brasil states: https://github.com/fititnt/gis-dataset-brasil

### Loading bens

In [3]:
df_bens_raw = (
    spark
    .read
    .option("header", "true")
    .option("encoding", 'latin1')
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .csv('./bem_candidato_2022/bem_candidato_2022_BRASIL.csv')
)

Filtrando as colunas usadas e transformando o valor do bem em float

In [4]:
df_bens = (
    df_bens_raw
    .select(
        [
            "SQ_CANDIDATO",
            "DS_TIPO_BEM_CANDIDATO",
            "DS_BEM_CANDIDATO",
            "VR_BEM_CANDIDATO"
        ]
    )
    .withColumn(
        'VR_BEM_CANDIDATO',
        F.regexp_replace('VR_BEM_CANDIDATO', ',', '.')
        .cast(FloatType())
    )
)

In [5]:
df_bens.show(5)

+------------+---------------------+--------------------+----------------+
|SQ_CANDIDATO|DS_TIPO_BEM_CANDIDATO|    DS_BEM_CANDIDATO|VR_BEM_CANDIDATO|
+------------+---------------------+--------------------+----------------+
|110001608768| Veículo automotor...|               Carro|         40000.0|
|200001608811| Fundos: Ações, Mú...|5.617,50 AÇÕES DA...|        50164.28|
|240001614377|              Terreno|TERRENO URBANO,  ...|        186200.0|
|240001614377| Caderneta de poup...|CADERNETA DE POUP...|         2367.39|
|210001647159| Depósito bancário...|SALDO APLICACAO F...|        22713.17|
+------------+---------------------+--------------------+----------------+
only showing top 5 rows



### Loading Candidatos

In [6]:
df_candidatos_raw = (
    spark
    .read
    .option("header", "true")
    .option("encoding", 'latin1')
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .csv('./consulta_cand_2022/consulta_cand_2022_BRASIL.csv')
)

In [7]:
df_deputados_fed = (
    df_candidatos_raw
    .filter(df_candidatos_raw['DS_SITUACAO_CANDIDATURA'] != 'INAPTO')
    .filter(df_candidatos_raw['CD_CARGO'] == '6') # Código do cargo de deputado federal
    .select(
        [
            "SG_UE",
            "NM_UE",
            "SQ_CANDIDATO",
            "NM_CANDIDATO",
            "SG_PARTIDO",
        ]
    )
)

Visualizando os dados ...

In [8]:
(
    df_deputados_fed
    .select( ["SG_UE", "SG_PARTIDO"] )
    .sample(False, 0.01)
    .show(10)
)

+-----+----------+
|SG_UE|SG_PARTIDO|
+-----+----------+
|   SC|      PSOL|
|   SP|       PSD|
|   CE|      REDE|
|   MG|       PDT|
|   PB|     UNIÃO|
|   MA|      AGIR|
|   MG|        PT|
|   SP|    AVANTE|
|   MG|        PT|
|   MG|       PSD|
+-----+----------+
only showing top 10 rows



## Realizando o Join entre os dois DataFrames

Unir as duas tabelas usando o campo `SQ_CANDIDATO` como chave

In [9]:
df_candidatos_bens = (
    df_deputados_fed
    .join(
        df_bens, 
        on='SQ_CANDIDATO', 
        how='left'
    )
    .filter(
        F.col('VR_BEM_CANDIDATO').isNotNull()
    )
)

In [10]:
df_candidatos_bens.sample(False, 0.01).show(5)

+------------+-----+--------------+--------------------+------------+---------------------+--------------------+----------------+
|SQ_CANDIDATO|SG_UE|         NM_UE|        NM_CANDIDATO|  SG_PARTIDO|DS_TIPO_BEM_CANDIDATO|    DS_BEM_CANDIDATO|VR_BEM_CANDIDATO|
+------------+-----+--------------+--------------------+------------+---------------------+--------------------+----------------+
|190001619493|   RJ|RIO DE JANEIRO|OTAVIO SANTOS SIL...|       UNIÃO| Depósito bancário...|Conta corrente - ...|          793.96|
|230001602809|   RR|       RORAIMA|    DUDA BRITO RAMOS|         MDB|                 Casa|Rua Edson Vieira ...|        120000.0|
| 10001619994|   AC|          ACRE|GERALDO ISRAEL MI...|REPUBLICANOS| Quotas ou quinhõe...|26% DAS QUOTAS DO...|         98039.0|
|250001612101|   SP|     SÃO PAULO|        MÁRCIA ROCHA|   CIDADANIA| Jóia, quadro, obj...| ESCULTURA ARTISTICA|          4000.0|
|250001643749|   SP|     SÃO PAULO|LEANDRO CSEIMAN D...|         PDT|              Terreno

# Queries

Existem duas formas principais de realizar queries no apache Spark aqui no Python: Funcional e via SQL.
Essa diferença é meramente de sintaxe e escolha pessoal, pois o Spark executa as queries da mesma forma.

Como eu tenho mais familiaridade com o Pandas, para mim, é mais natural utilizar o encadeamento de funções.

**Top 10 candidatos a Deputado Federal com maior patrimônio**

In [11]:
# Nomes e patrimonio dos Candidatos mais ricos
patrimonio_candidatos = (
    df_candidatos_bens
    .groupBy("SQ_CANDIDATO")
    .agg(
        F.sum("VR_BEM_CANDIDATO").alias("VR_BEM_CANDIDATO")
    )
    .join(
        df_deputados_fed,
        on='SQ_CANDIDATO',
        how='left'
    )
    .orderBy(
        F.desc("VR_BEM_CANDIDATO")
    )
    .select(
        [
            "NM_CANDIDATO",
            "SG_PARTIDO",
            "SG_UE",
            "VR_BEM_CANDIDATO",
        ]
    )
    .withColumn(
        "VR_BEM_CANDIDATO",
        F.format_number(
            F.col("VR_BEM_CANDIDATO") / 1e6, 2
        )
    )
)

patrimonio_candidatos.show(10, truncate=False)

+----------------------------------+----------+-----+----------------+
|NM_CANDIDATO                      |SG_PARTIDO|SG_UE|VR_BEM_CANDIDATO|
+----------------------------------+----------+-----+----------------+
|EUNÍCIO LOPES DE OLIVEIRA         |MDB       |CE   |158.18          |
|RUY ADRIANO BORGES MUNIZ          |AVANTE    |MG   |158.02          |
|JOSE GOMES FERREIRA FILHO         |PP        |DF   |128.64          |
|JADYEL SILVA ALENCAR              |PV        |PI   |107.55          |
|PABLO HENRIQUE COSTA MARÇAL       |PROS      |SP   |88.44           |
|ALEX DOS SANTOS GARCIA            |PSC       |RN   |80.01           |
|ADRIANA MANGABEIRA WANDERLEY      |PSD       |DF   |77.06           |
|NEWTON BONIN                      |UNIÃO     |PR   |76.12           |
|HERCILIO ARAUJO DINIZ FILHO       |MDB       |MG   |65.90           |
|PAULO ROBERTO ROQUE ANTONIO KHOURI|NOVO      |DF   |65.29           |
+----------------------------------+----------+-----+----------------+
only s

**Valor total declarado**

In [12]:
total_candidatos_bens = (
    df_candidatos_bens
    .agg(
        F.sum("VR_BEM_CANDIDATO").alias("TOTAL_BEM_CANDIDATO_BILHOES"),
        F.countDistinct("SQ_CANDIDATO").alias("TOTAL_CANDIDATOS")
    )
    .withColumn(
        "TOTAL_BEM_CANDIDATO_BILHOES",
        F.format_number(
            F.col("TOTAL_BEM_CANDIDATO_BILHOES") / 1e9, 2
        )
    )
)

In [13]:
total_candidatos_bens.collect()

[Row(TOTAL_BEM_CANDIDATO_BILHOES='7.21', TOTAL_CANDIDATOS=6366)]

**Valor total declarado de bens por UF**

In [14]:
valor_por_uf = (
    df_candidatos_bens
    .groupBy(
        "SG_UE"
    )
    # Aggregate sum and avg
    .agg(
        F.sum( F.col("VR_BEM_CANDIDATO") ).alias("TOTAL BENS"),
        F.avg( F.col("VR_BEM_CANDIDATO") ).alias("MEDIA BENS")
    )
    .orderBy(
        F.desc("TOTAL BENS")
    )
)

In [15]:
valor_por_uf.show(30, truncate=False)

+-----+--------------------+------------------+
|SG_UE|TOTAL BENS          |MEDIA BENS        |
+-----+--------------------+------------------+
|SP   |1.2143276856198936E9|205469.99756681785|
|MG   |9.504191711380489E8 |256523.39301971628|
|RJ   |5.998792442260875E8 |195209.64667298648|
|PR   |4.687364865754497E8 |190002.62933743402|
|DF   |4.3387237743134475E8|611949.756602743  |
|GO   |3.1266681836336124E8|234032.04967317457|
|CE   |2.976540537267532E8 |272327.5880391154 |
|BA   |2.840689506582558E8 |157378.92003227468|
|RS   |2.6503172541893113E8|130493.21783305323|
|SC   |2.4177128341094604E8|180696.02646558   |
|MA   |2.0494777125577286E8|170363.89963073388|
|PE   |2.028025139128284E8 |159561.3799471506 |
|PA   |2.0055216283283943E8|214494.29179982826|
|MT   |1.9994814255513492E8|292321.84584084054|
|RR   |1.7936015321644616E8|452929.6798395105 |
|PI   |1.626300498293633E8 |309182.60423833324|
|RN   |1.5322524850030357E8|212223.33587299663|
|AC   |1.2491514657306153E8|444537.88816

**Valor total declarado de bens por Tipo de bem**

In [16]:
from pyspark.sql.window import Window

In [17]:
valor_por_tipo_bem = (
    df_candidatos_bens
    .groupBy(
        "DS_TIPO_BEM_CANDIDATO"
    )
    .agg(
        F.sum( F.col("VR_BEM_CANDIDATO") ).alias("TOTAL BENS")
    )
    .orderBy(
        F.desc("TOTAL BENS")
    )
    .withColumn(
        "Percentual",
        # Trick to get the percentage of each row
        # https://stackoverflow.com/questions/48915834/how-to-calculate-percentage-of-each-row-in-pyspark
        F.col("TOTAL BENS") / F.sum(F.col("TOTAL BENS")).over(Window.partitionBy()) * 100
    )
)   

In [18]:
valor_por_tipo_bem.show(10, truncate=False)

+------------------------------------------------------------+--------------------+------------------+
|DS_TIPO_BEM_CANDIDATO                                       |TOTAL BENS          |Percentual        |
+------------------------------------------------------------+--------------------+------------------+
|Casa                                                        |1.0589040910269547E9|14.6958322559435  |
|Quotas ou quinhões de capital                               |9.503185969242871E8 |13.188845721200835|
|Apartamento                                                 |8.900710215289758E8 |12.352709314382224|
|Outros bens imóveis                                         |6.791679916829965E8 |9.425725109532399 |
|OUTROS BENS E DIREITOS                                      |5.823552953922777E8 |8.082125479509068 |
|Terreno                                                     |5.2137738439256346E8|7.235853225994051 |
|Outras participações societárias                            |4.374760876

## Saving answers to csv files

In [19]:
ANSWERS_PATH = './answers'

# Convert to Pandas
# and save to CSV

patrimonio_candidatos.toPandas().to_csv(
    f'{ANSWERS_PATH}/patrimonio_candidatos.csv',
    index=False
)

total_candidatos_bens.toPandas().to_csv(
    f'{ANSWERS_PATH}/total_candidatos_bens.csv',
    index=False
)

valor_por_uf.toPandas().to_csv(
    f'{ANSWERS_PATH}/valor_por_uf.csv',
    index=False
)

valor_por_tipo_bem.toPandas().to_csv(
    f'{ANSWERS_PATH}/valor_por_tipo_bem.csv',
    index=False
)


In [20]:
spark.stop()