In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, when
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id

In [32]:
spark = SparkSession.builder.appName('MIGRACAO_HIVE').getOrCreate()

In [33]:
df_total_votos_candidato = spark \
    .read \
    .orc('../data/processed/total_votos_candidato_completo.orc')
df_total_votos_candidato.show()

+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+
|NR_CANDIDATO|        NM_CANDIDATO| NM_URNA_CANDIDATO|DS_SIT_TOT_TURNO|TOTAL_VOTOS_CANDIDATO_CIDADE|TOTAL_VOTOS_CIDADE|PERCENTUAL_VOTOS_CIDADE|ANO_ELEICAO|CD_CARGO|CD_MUNICIPIO|CD_SIT_TOT_TURNO|
+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+
|        1322|ARLINDO CHIGNALIA...| ARLINDO CHINAGLIA|   ELEITO POR QP|                         351|              2102|                   16.7|       2018|       6|       61069|               2|
|        1019|     LEANDRO MOREIRA|LEANDRO MOREIRA LÊ|        SUPLENTE|                           0|              2102|                    0.0|       2018|       6|       61069|               5|
|        1720|EDUARDO NAN

In [34]:
df_total_votos_candidato.count()

4382775

In [35]:
df_total_votos_candidato = df_total_votos_candidato \
    .select(
        'NR_CANDIDATO',
        'NM_CANDIDATO',
        'NM_URNA_CANDIDATO',
        'DS_SIT_TOT_TURNO',
        'TOTAL_VOTOS_CANDIDATO_CIDADE',
        'TOTAL_VOTOS_CIDADE',
        'PERCENTUAL_VOTOS_CIDADE',
        'ANO_ELEICAO',
        'CD_CARGO',
        'CD_MUNICIPIO',
        'CD_SIT_TOT_TURNO'

    )

df_total_votos_candidato.show()

+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+
|NR_CANDIDATO|        NM_CANDIDATO| NM_URNA_CANDIDATO|DS_SIT_TOT_TURNO|TOTAL_VOTOS_CANDIDATO_CIDADE|TOTAL_VOTOS_CIDADE|PERCENTUAL_VOTOS_CIDADE|ANO_ELEICAO|CD_CARGO|CD_MUNICIPIO|CD_SIT_TOT_TURNO|
+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+
|        1322|ARLINDO CHIGNALIA...| ARLINDO CHINAGLIA|   ELEITO POR QP|                         351|              2102|                   16.7|       2018|       6|       61069|               2|
|        1019|     LEANDRO MOREIRA|LEANDRO MOREIRA LÊ|        SUPLENTE|                           0|              2102|                    0.0|       2018|       6|       61069|               5|
|        1720|EDUARDO NAN

In [36]:
lista_colunas = df_total_votos_candidato.columns

In [37]:
lista_colunas

['NR_CANDIDATO',
 'NM_CANDIDATO',
 'NM_URNA_CANDIDATO',
 'DS_SIT_TOT_TURNO',
 'TOTAL_VOTOS_CANDIDATO_CIDADE',
 'TOTAL_VOTOS_CIDADE',
 'PERCENTUAL_VOTOS_CIDADE',
 'ANO_ELEICAO',
 'CD_CARGO',
 'CD_MUNICIPIO',
 'CD_SIT_TOT_TURNO']

In [38]:
list_cidade = [ linha['CD_MUNICIPIO'] for linha in df_total_votos_candidato.select('CD_MUNICIPIO').distinct().collect() ]
list_cidade

[71510,
 66672,
 61867,
 61344,
 66613,
 65013,
 71498,
 63592,
 62812,
 61654,
 64459,
 72052,
 63835,
 70491,
 61786,
 66699,
 63630,
 66435,
 65196,
 61158,
 65099,
 64351,
 65714,
 67253,
 71692,
 61514,
 64637,
 66931,
 62456,
 61077,
 69558,
 65234,
 66656,
 61395,
 61573,
 61611,
 61298,
 69779,
 67652,
 69493,
 70211,
 63975,
 63495,
 70718,
 63037,
 62103,
 61581,
 61620,
 66214,
 70530,
 62154,
 70270,
 62375,
 70394,
 66494,
 64599,
 71099,
 62383,
 71218,
 63673,
 61760,
 67490,
 70033,
 63690,
 65676,
 72230,
 63150,
 63193,
 68411,
 63452,
 62120,
 67059,
 70190,
 71633,
 63819,
 67350,
 63797,
 64211,
 66095,
 68470,
 70556,
 67555,
 66915,
 61352,
 66710,
 67873,
 69973,
 69159,
 61964,
 68233,
 61727,
 71439,
 64297,
 64831,
 64777,
 64998,
 65030,
 67032,
 67393,
 65595,
 63657,
 68896,
 62650,
 68373,
 65536,
 68713,
 68314,
 68535,
 71897,
 64718,
 72192,
 71730,
 68292,
 69310,
 62790,
 61662,
 64270,
 66311,
 72150,
 67210,
 71536,
 66753,
 71358,
 67610,
 61140,


In [39]:
df_total_votos_candidato = df_total_votos_candidato.withColumn('ID_LINHA', monotonically_increasing_id()) 

In [40]:
df_total_votos_candidato.show()

+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+--------+
|NR_CANDIDATO|        NM_CANDIDATO| NM_URNA_CANDIDATO|DS_SIT_TOT_TURNO|TOTAL_VOTOS_CANDIDATO_CIDADE|TOTAL_VOTOS_CIDADE|PERCENTUAL_VOTOS_CIDADE|ANO_ELEICAO|CD_CARGO|CD_MUNICIPIO|CD_SIT_TOT_TURNO|ID_LINHA|
+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+--------+
|        1322|ARLINDO CHIGNALIA...| ARLINDO CHINAGLIA|   ELEITO POR QP|                         351|              2102|                   16.7|       2018|       6|       61069|               2|       0|
|        1019|     LEANDRO MOREIRA|LEANDRO MOREIRA LÊ|        SUPLENTE|                           0|              2102|                    0.0|       2018|       6|       61069|       

In [41]:
numero_linha_particao = 1000
df_total_votos_candidato = df_total_votos_candidato.withColumn('PARTICAO_ID', (df_total_votos_candidato['ID_LINHA'] / numero_linha_particao).cast('integer'))
df_total_votos_candidato.show()

+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+--------+-----------+
|NR_CANDIDATO|        NM_CANDIDATO| NM_URNA_CANDIDATO|DS_SIT_TOT_TURNO|TOTAL_VOTOS_CANDIDATO_CIDADE|TOTAL_VOTOS_CIDADE|PERCENTUAL_VOTOS_CIDADE|ANO_ELEICAO|CD_CARGO|CD_MUNICIPIO|CD_SIT_TOT_TURNO|ID_LINHA|PARTICAO_ID|
+------------+--------------------+------------------+----------------+----------------------------+------------------+-----------------------+-----------+--------+------------+----------------+--------+-----------+
|        1322|ARLINDO CHIGNALIA...| ARLINDO CHINAGLIA|   ELEITO POR QP|                         351|              2102|                   16.7|       2018|       6|       61069|               2|       0|          0|
|        1019|     LEANDRO MOREIRA|LEANDRO MOREIRA LÊ|        SUPLENTE|                           0|              2102|                 

In [42]:
particoes = df_total_votos_candidato.select('PARTICAO_ID').distinct().collect()

In [43]:
particoes.sort(reverse=True)

In [44]:
len(particoes)

4389

In [45]:
for chave, particao in enumerate(particoes):
    particao_id = particao['PARTICAO_ID']
    particao_df = df_total_votos_candidato.filter(df_total_votos_candidato['PARTICAO_ID'] == particao_id).drop('PARTICAO_ID', 'ID_LINHA')
    particao_df.write \
    .parquet(f'../data/migracao_hive/total_votos_candidados/total_votos_candidados_{particao_id}.orc') 

                                                                                

- Total Votos Partido

In [46]:
df_total_votos_partido = spark \
    .read \
    .orc('../data/processed/total_votos_partido_completo.orc')
df_total_votos_partido.show()

+----------+--------------------+----------------+-------------------+--------------------------+-----------------------+-----------+--------+------------+-------------+----------------+
|NR_PARTIDO|          NM_PARTIDO|DS_SIT_TOT_TURNO|TOTAL_VOTOS_PARTIDO|TOTAL_VOTOS_PARTIDO_CIDADE|PERCENTUAL_VOTOS_CIDADE|ANO_ELEICAO|CD_CARGO|CD_MUNICIPIO|   SG_PARTIDO|CD_SIT_TOT_TURNO|
+----------+--------------------+----------------+-------------------+--------------------------+-----------------------+-----------+--------+------------+-------------+----------------+
|        17|Partido Social Li...|        SUPLENTE|                 43|                      7319|                   0.59|       2018|       6|       61042|          PSL|               5|
|        36|Partido Trabalhis...|      NÃO ELEITO|                  0|                      7319|                    0.0|       2018|       6|       61042|          PTC|               4|
|        13|Partido dos Traba...|ELEITO POR MÉDIA|               

In [47]:
numero_linha_particao = 1000
df_total_votos_partido = df_total_votos_partido.withColumn('ID_LINHA', monotonically_increasing_id()) 
df_total_votos_partido = df_total_votos_partido.withColumn('PARTICAO_ID', (df_total_votos_partido['ID_LINHA'] / numero_linha_particao).cast('integer'))
df_total_votos_partido.show()

+----------+--------------------+----------------+-------------------+--------------------------+-----------------------+-----------+--------+------------+-------------+----------------+--------+-----------+
|NR_PARTIDO|          NM_PARTIDO|DS_SIT_TOT_TURNO|TOTAL_VOTOS_PARTIDO|TOTAL_VOTOS_PARTIDO_CIDADE|PERCENTUAL_VOTOS_CIDADE|ANO_ELEICAO|CD_CARGO|CD_MUNICIPIO|   SG_PARTIDO|CD_SIT_TOT_TURNO|ID_LINHA|PARTICAO_ID|
+----------+--------------------+----------------+-------------------+--------------------------+-----------------------+-----------+--------+------------+-------------+----------------+--------+-----------+
|        17|Partido Social Li...|        SUPLENTE|                 43|                      7319|                   0.59|       2018|       6|       61042|          PSL|               5|       0|          0|
|        36|Partido Trabalhis...|      NÃO ELEITO|                  0|                      7319|                    0.0|       2018|       6|       61042|          PTC

In [48]:
particoes = df_total_votos_partido.select('PARTICAO_ID').distinct().collect()

                                                                                

In [50]:
for chave, particao in enumerate(particoes):
    particao_id = particao['PARTICAO_ID']
    particao_df = df_total_votos_partido.filter(df_total_votos_partido['PARTICAO_ID'] == particao_id).drop('PARTICAO_ID', 'ID_LINHA')
    particao_df.write \
    .format('orc').save(f'../data/migracao_hive/total_votos_partido/total_votos_partido_{particao_id}.orc') 

                                                                                

- Desempenho Partido

In [51]:
df_desempenho_candidatos = spark \
    .read \
    .orc('../data/processed/desempenho_candidatos.orc')
df_desempenho_candidatos.show()

+--------------------+-----------------+----------------+----------------+-------------------+------------+
|        NM_CANDIDATO|NM_URNA_CANDIDATO|TOTAL_VOTOS_2018|TOTAL_VOTOS_2022|VARIACAO_PERCENTUAL|CD_MUNICIPIO|
+--------------------+-----------------+----------------+----------------+-------------------+------------+
|ALEX SPINELLI MAN...|     ALEX MANENTE|               1|               1|              100.0|       61000|
| CRISTIANY DE CASTRO|   DRA. CRISTIANY|               1|               2|               50.0|       61000|
|GILDEVANIO ILSO D...|        GIL DINIZ|              14|               5|              280.0|       61000|
|MARCOS ROBERTO DA...|   MARCOS DAMASIO|               7|               5|              140.0|       61000|
|MARTA MARIA FREIR...|      MARTA COSTA|             139|              39|             356.41|       61000|
|ORLANDO SILVA DE ...|    ORLANDO SILVA|               6|              10|               60.0|       61000|
|PAULO ROBERTO FIO...|    PA

In [52]:
df_desempenho_candidatos = df_desempenho_candidatos.withColumn('ID_LINHA', monotonically_increasing_id()) 
numero_linha_particao = 1000
df_desempenho_candidatos = df_desempenho_candidatos.withColumn('PARTICAO_ID', (df_desempenho_candidatos['ID_LINHA'] / numero_linha_particao).cast('integer'))
df_desempenho_candidatos.show()

+--------------------+-----------------+----------------+----------------+-------------------+------------+--------+-----------+
|        NM_CANDIDATO|NM_URNA_CANDIDATO|TOTAL_VOTOS_2018|TOTAL_VOTOS_2022|VARIACAO_PERCENTUAL|CD_MUNICIPIO|ID_LINHA|PARTICAO_ID|
+--------------------+-----------------+----------------+----------------+-------------------+------------+--------+-----------+
|ALEX SPINELLI MAN...|     ALEX MANENTE|               1|               1|              100.0|       61000|       0|          0|
| CRISTIANY DE CASTRO|   DRA. CRISTIANY|               1|               2|               50.0|       61000|       1|          0|
|GILDEVANIO ILSO D...|        GIL DINIZ|              14|               5|              280.0|       61000|       2|          0|
|MARCOS ROBERTO DA...|   MARCOS DAMASIO|               7|               5|              140.0|       61000|       3|          0|
|MARTA MARIA FREIR...|      MARTA COSTA|             139|              39|             356.41|   

In [53]:
particoes = df_desempenho_candidatos.select('PARTICAO_ID').distinct().collect()

In [54]:
len(particoes)

88

In [55]:
for chave, particao in enumerate(particoes):
    particao_id = particao['PARTICAO_ID']
    particao_df = df_desempenho_candidatos.filter(df_desempenho_candidatos['PARTICAO_ID'] == particao_id).drop('PARTICAO_ID', 'ID_LINHA')
    nome_arquivo = f'../data/processed/desempenho_candidato/df_desempenho_candidato_{particao_id}.csv'
    particao_df.write \
    .format('orc').save(f'../data/migracao_hive/df_desempenho_candidatos/df_desempenho_candidatos_{particao_id}.orc') 

In [None]:
spark.stop()