<div style="position:relative; 
            color:#fff;
            display:grid;
            grid-template-columns: 1fr auto;
            align-items:center;
            border-radius:15px;
            background-color:#000;
            font-family:SF Pro Display, system-ui, sans-serif;
            letter-spacing:0px;
            padding-top:50px;
            padding-bottom:50px;
            padding-left:0px;
            padding-right:0;
            max-width:100%;
            overflow:hidden;
            ">

  <!-- Pseudo-elemento para imagem de fundo com opacidade reduzida -->
  <div style="content:''; 
              position:absolute;
              top:0;
              left:0;
              right:0;
              bottom:0;
              background-image:url('https://github.com/jobsrobson/Public-Databases/blob/main/election_header.png?raw=true');
              background-size:cover;
              background-position:center;
              opacity:0.8;
              filter: blur(0px);
              z-index:1;"></div>

  <!-- Texto à esquerda -->
  <div style="position:relative; z-index:2; text-align:left; padding-left:50px">
    <p style="font-size:200%; font-weight:900; margin:0;">
      Processamento de Dados em Streaming usando PySpark
    </p>
    <p style="font-size:120%; font-weight:bold; margin-top:10px; margin-bottom:0; color:#fff">
      Simulando a Contagem de Votos nas Eleições Americanas de 2024
    </p>
    <p style="font-size:100%; font-weight:normal; margin-top:10px; margin-bottom:0;color:#fff">
      Robson Ricardo Leite da Silva - 2212120015
    </p>
  </div>

</div>


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import col

# Inicializar a sessão Spark
spark = SparkSession.builder \
    .appName("VoteCountStreaming") \
    .getOrCreate()

spark

<h4 style="color:#3e007a"><b>Passo 1 - Geração de Dados</b></h4> 

A primeira etapa deste trabalho envolve a simulação dos votos registrados nas Eleições Americanas de 2024. Para isso, o script a seguir, desenvolvido com a biblioteca faker, gera até 50 arquivos JSON, cada um contendo informações detalhadas sobre os votos em cada um dos 50 estados americanos. Esses arquivos registram informações individuais para cada voto, incluindo um ID único, timestamp, estado, faixa etária do eleitor e o candidato escolhido.

Os arquivos JSON são criados a cada 5 segundos, simulando a chegada contínua de novos votos em tempo real, e são salvos na pasta vote_stream.

Para otimizar o uso de recursos computacionais, o script utiliza um dicionário com os 50 estados americanos e considera uma amostra de 0,5% das populações aptas a votar de cada estado. Cada arquivo JSON contém aproximadamente 20.971 votos.

Por fim, a execução do script ocorre em uma thread separada, garantindo que a geração de dados não interfira no processamento em streaming.


<div class="alert alert-block alert-warning"><b>️️⚠️ Os dados são gerados de forma aleatória, e os resultados finais não representam nenhuma projeção ou previsão real dos resultados eleitorais.</b></div>

In [3]:
# Importar o módulo threading para executar o script de geração de dados em um thread separado
import threading

# Função que encapsula o script de geração de dados
def start_vote_generation():
    from faker import Faker
    import json
    import random
    import time
    import os
    from datetime import datetime

    # Inicializar o Faker e definir candidatos
    fake = Faker()
    candidates = ["Donald J. Trump", "Kamala Harris", "Cornel West", "Jill Stein", "Chase Oliver", "Claudia De la Cruz"]
    weights = [6, 5, 4, 3, 2, 1] # Determina pesos para cada candidato, para simular uma distribuição de votos mais realista

    # Dicionário com cerca de 0,5% da população votante de cada estado (dados de 2020)
    state_voters = {
        "Alabama": 18000, "Alaska": 2750, "Arizona": 27500, "Arkansas": 10500,
        "California": 125000, "Colorado": 21000, "Connecticut": 12500, "Delaware": 3750,
        "Florida": 75000, "Georgia": 37500, "Hawaii": 5000, "Idaho": 6000,
        "Illinois": 47500, "Indiana": 24500, "Iowa": 11500, "Kansas": 10500,
        "Kentucky": 16500, "Louisiana": 15500, "Maine": 5000, "Maryland": 22500,
        "Massachusetts": 27500, "Michigan": 37500, "Minnesota": 20500, "Mississippi": 11000,
        "Missouri": 22500, "Montana": 4000, "Nebraska": 7000, "Nevada": 12500,
        "New Hampshire": 5500, "New Jersey": 32500, "New Mexico": 8000, "New York": 65000,
        "North Carolina": 35000, "North Dakota": 3000, "Ohio": 42500, "Oklahoma": 14000,
        "Oregon": 16000, "Pennsylvania": 47500, "Rhode Island": 4000, "South Carolina": 17500,
        "South Dakota": 3500, "Tennessee": 25000, "Texas": 85000, "Utah": 10000,
        "Vermont": 2500, "Virginia": 30000, "Washington": 29000, "West Virginia": 7000,
        "Wisconsin": 21500, "Wyoming": 2000
    }

    # Diretório onde os arquivos de votos serão salvos
    output_directory = "./vote_stream"
    os.makedirs(output_directory, exist_ok=True)
    state_vote_count = {state: 0 for state in state_voters}

    # Função para gerar votos aleatórios com pesos para candidatos
    def generate_vote(state):
        # Escolhe o candidato com base nos pesos
        candidate = random.choices(candidates, weights=weights, k=1)[0]
        vote = {
            "vote_id": fake.uuid4(),
            "timestamp": datetime.now().isoformat(),
            "state": state,
            "candidate": candidate,
            "age_group": random.choice(["18-25", "26-40", "41-60", "60+"]),
            "vote_count": 1
        }
        return vote

    # Função que simula um stream de votos, gerando arquivos .json e salvando-os no diretório de saída
    def simulate_vote_stream():
        file_counter = 0
        votes_per_file = 20971  # Número de votos por arquivo calculado para economizar recursos computacionais
        while any(state_vote_count[state] < state_voters[state] for state in state_voters):
            votes = []
            while len(votes) < votes_per_file:
                state = random.choice(list(state_voters.keys()))
                if state_vote_count[state] < state_voters[state]:
                    vote = generate_vote(state)
                    votes.append(vote)
                    state_vote_count[state] += 1

            file_name = os.path.join(output_directory, f"votes_{file_counter}.json")
            with open(file_name, "w") as f:
                json.dump(votes, f)
            # print(f"Gerado: {file_name} com {len(votos)} votos registrados.")
            file_counter += 1
            time.sleep(5)

    simulate_vote_stream()

# Iniciar o script de geração de dados em um thread separado
threading.Thread(target=start_vote_generation).start()

<br><br>

<h4 style="color:#3e007a"><b>Passo 2 - Configuração do Spark e Inicialização do Streaming</b></h4> 

Nesta etapa, o ambiente Spark é configurado para processar os dados em streaming. O código a seguir define o esquema dos dados de entrada e inicia o streaming de dados a partir da pasta ```vote_stream```.

In [4]:
# Define o schema dos dados para corresponder ao formato JSON
schema = StructType() \
    .add("vote_id", StringType()) \
    .add("timestamp", StringType()) \
    .add("state", StringType()) \
    .add("candidate", StringType()) \
    .add("age_group", StringType()) \
    .add("vote_count", IntegerType())

In [5]:
# Configura a leitura em streaming do diretório monitorado "data_stream"
vote_stream_df = spark.readStream \
    .schema(schema) \
    .json("./vote_stream")

<br><br>

<h4 style="color:#3e007a"><b>Passo 3 - Processamento dos Dados em Streaming</b></h4> 

Nesta última etapa, os votos são processados em streaming para calcular a contagem de votos de cada candidato em tempo real. O código a seguir realiza as seguintes operações:

1. Agrupa os votos por candidato.
2. Calcula e exibe a contagem de votos para cada candidato.


In [None]:
from pyspark.sql.functions import col

# Inicializar uma variável global para armazenar o total acumulado
accumulated_votes = None

# Definindo uma função para processar cada micro-batch e manter a contagem acumulada
def process_batch(batch_df, batch_id):
    global accumulated_votes  # Referencia a variável global
    
    # Contagem de votos por candidato no micro-batch atual
    vote_counts = batch_df.groupBy("candidate").sum("vote_count").withColumnRenamed("sum(vote_count)", "batch_total")
    
    # Se o acumulador ainda não tem dados, inicialize com o primeiro micro-batch
    if accumulated_votes is None:
        accumulated_votes = vote_counts.withColumnRenamed("batch_total", "total_votes")
    else:
        # Caso contrário, faça um join para atualizar o acumulado
        accumulated_votes = accumulated_votes.join(vote_counts, "candidate", "outer") \
                                             .na.fill(0) \
                                             .withColumn("total_votes", col("total_votes") + col("batch_total")) \
                                             .select("candidate", "total_votes")
    
    # Mostrar a contagem acumulada no console
    print(f"Total acumulado até o micro-batch {batch_id}:")
    accumulated_votes.show()

# Usando foreachBatch para processar cada micro-batch e aplicar a função de contagem
query = vote_stream_df.writeStream \
    .outputMode("append") \
    .foreachBatch(process_batch) \
    .start()

# Executa a query por até 150 segundos (somente para fins de demonstração)
query.awaitTermination(150)


Total acumulado até o micro-batch 0:
+------------------+-----------+
|         candidate|total_votes|
+------------------+-----------+
|     Kamala Harris|       9959|
|      Chase Oliver|       3970|
|   Donald J. Trump|      12029|
|Claudia De la Cruz|       1976|
|        Jill Stein|       5983|
|       Cornel West|       8025|
+------------------+-----------+

Total acumulado até o micro-batch 1:
+------------------+-----------+
|         candidate|total_votes|
+------------------+-----------+
|     Kamala Harris|      14970|
|      Chase Oliver|       5976|
|   Donald J. Trump|      17922|
|Claudia De la Cruz|       2979|
|        Jill Stein|       9040|
|       Cornel West|      12026|
+------------------+-----------+

Total acumulado até o micro-batch 2:
+------------------+-----------+
|         candidate|total_votes|
+------------------+-----------+
|     Kamala Harris|      19906|
|      Chase Oliver|       8014|
|   Donald J. Trump|      24105|
|Claudia De la Cruz|       39

In [None]:
# PARAR A GERAÇÃO DE DADOS
import os
def stop():
    print("Stopping vote generation")
    os._exit(0)
    return

stop()