<a href="https://colab.research.google.com/github/lis-r-barreto/ufs-ensef-2025-minicurso-processamento-big-data-spark/blob/main/processamento_big_data_spark_parte_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fundamentos de PySpark e Processamento de Dados

## 1. Introdução ao Apache Spark

O Apache Spark é um poderoso motor de processamento de dados em larga escala e análise unificada, projetado para ser rápido e flexível. Construído sobre o conceito de Resilient Distributed Datasets (RDDs), o Spark oferece APIs em Java, Scala, Python e R, além de um otimizador de consulta de alto desempenho (Catalyst) e um motor de execução distribuído (Tungsten).

Ele se destaca por sua capacidade de realizar processamento em memória, o que o torna significativamente mais rápido que abordagens baseadas em disco, como o Hadoop MapReduce, para muitos tipos de cargas de trabalho. O Spark é amplamente utilizado para processamento de Big Data, ETL, Machine Learning, processamento de grafos e computação em tempo real.

![](https://imgs.search.brave.com/nz4X42G5-k9Xm04sqxSKLSnZ68PVGuDspIsj-z10XO8/rs:fit:860:0:0:0/g:ce/aHR0cHM6Ly9kMS5h/d3NzdGF0aWMuY29t/L0RhdGElMjBMYWtl/L3doYXQtaXMtYXBh/Y2hlLXNwYXJrLmIz/YTMwOTkyOTY5MzZk/ZjU5NWQ5YTdkMzYx/MGYxYTc3ZmYwNzQ5/ZGYuUE5H)

### 1.1. Arquitetura do Spark: processamento distribuído

A arquitetura do Apache Spark é baseada em um modelo de processamento distribuído que permite lidar com grandes volumes de dados em paralelo. Os principais componentes dessa arquitetura são:

*   **Driver Program:** O processo principal que executa a função `main()` do seu programa e cria o `SparkContext` ou `SparkSession`. Ele coordena as operações paralelas e distribui as tarefas entre os *executors*.
*   **Cluster Manager:** Responsável por gerenciar os recursos do cluster (como máquinas virtuais ou contêineres). Exemplos incluem o YARN, Mesos, Kubernetes ou o *standalone scheduler* do próprio Spark. Ele aloca recursos para a aplicação Spark.
*   **Worker Nodes:** Máquinas no cluster que executam as tarefas atribuídas pelo Driver Program. Cada *worker node* pode ter um ou mais *executors*.
*   **Executors:** Processos que rodam nos *worker nodes* e são responsáveis por executar as tarefas individuais (partições de dados) que compõem uma aplicação Spark. Eles armazenam dados em cache e retornam os resultados para o Driver Program.

A comunicação entre o Driver Program e os Executors é fundamental para o processamento distribuído. O Driver divide a aplicação em *stages* e *tasks*, que são então enviadas para os Executors para execução.


![](https://imgs.search.brave.com/fFJLDPStnmxviZcN6lSFK0vOORvnKcpWVqAWp3zDhH4/rs:fit:860:0:0:0/g:ce/aHR0cHM6Ly9tZWRp/YS5kYXRhY2FtcC5j/b20vY21zL2FkXzRu/eGZ5ZG96cmg3LXp6/dHAwMWE4dmJtdG01/ZmlycWllMGpjbHpw/YzZocjh2NXFsZDJp/a2ExZnZfaGt1dWRu/bW03emZtYWp4cmY2/OHgyZW40LXhycHpt/aWdoYm90LWw4Z3dr/dnJ6ZnFsc25wbXNl/azhneHF2czE3c3Ro/emt5bnd4YnRwMTl6/MmY4LnBuZw)

### 1.2. Estruturas de Dados no Spark

O Apache Spark trabalha com diferentes abstrações de dados, cada uma com suas características e casos de uso. As principais estruturas de dados são:

1.  **Resilient Distributed Datasets (RDDs):** A abstração original do Spark. RDDs são coleções de objetos imutáveis e distribuídos que podem ser operados em paralelo. Eles oferecem controle de baixo nível sobre o processamento e são ideais para dados não estruturados ou quando você precisa de controle preciso sobre as partições.

2.  **DataFrames:** Introduzidos no Spark 1.3, DataFrames são coleções distribuídas de dados organizados em colunas nomeadas. Eles são conceitualmente equivalentes a tabelas em bancos de dados relacionais ou data frames em R/Python. DataFrames fornecem otimizações de performance através do Catalyst Optimizer e são mais fáceis de usar com dados estruturados ou semiestruturados.

3.  **Datasets:** Adicionados no Spark 1.6, Datasets combinam as vantagens dos RDDs (segurança de tipo em tempo de compilação) com as otimizações de DataFrames. Eles são uma coleção de objetos JVM tipados e distribuídos. Datasets são primariamente úteis em Scala e Java devido à verificação de tipo em tempo de compilação. Em PySpark, Datasets e DataFrames são essencialmente a mesma API.


![](https://lam-tran.dev/assets/images/RDD-Dataframe-Dataset-bcdbc9781335a0251713276723599867.svg)

## 2. Configurações Iniciais

In [1]:
!pip --quiet install pyspark kagglehub pyngrok

In [2]:
import kagglehub # Para baixar datasets do Kaggle
from pyspark.sql import SparkSession # Para criar e gerenciar a sessão Spark
from pyspark.sql.functions import col, count, sum, when # Funções úteis para manipulação de DataFrames
from plotly.subplots import make_subplots # Para criar gráficos com múltiplos subplots
import plotly.graph_objects as go # Para construir figuras e gráficos Plotly
from pyngrok import ngrok # Para criar um túnel público para a Spark UI

## 3. Importação dos dados

**PaySim** é um dataset sintético de transações financeiras móveis, gerado a partir de dados reais, que simula operações e fraudes para pesquisa em detecção de fraudes. Inclui tipos, valores, saldos e marcações de fraude em 30 dias de transações.

Disponível no Kaggle: https://www.kaggle.com/datasets/ealaxi/paysim1

### 3.1. Descrição dos Campos do Dataset
Abaixo estão os campos do dataset PaySim, com explicações resumidas em português sobre o que cada um representa.**

---

| Campo              | Descrição                                                                                   |
|--------------------|-------------------------------------------------------------------------------------------------------|
| **step**           | Unidade de tempo (1 passo = 1 hora). Total de 744 passos (30 dias de simulação).                      |
| **type**           | Tipo de transação: CASH-IN, CASH-OUT, DEBIT, PAYMENT ou TRANSFER.                                     |
| **amount**         | Valor da transação na moeda local.                                                                    |
| **nameOrig**       | Identificador do cliente que iniciou a transação.                                                     |
| **oldbalanceOrg**  | Saldo do cliente antes da transação.                                                                  |
| **newbalanceOrig** | Saldo do cliente após a transação.                                                                    |
| **nameDest**       | Identificador do destinatário da transação.                                                           |
| **oldbalanceDest** | Saldo do destinatário antes da transação (não disponível para comerciantes, identificados por "M").    |
| **newbalanceDest** | Saldo do destinatário após a transação (não disponível para comerciantes, identificados por "M").      |
| **isFraud**        | Indica se a transação é fraudulenta (1 para fraude, 0 para normal).                                   |
| **isFlaggedFraud** | Indica se a transação foi sinalizada como tentativa ilegal (1 para sinalizada, 0 para não sinalizada).|

In [3]:
path = kagglehub.dataset_download("ealaxi/paysim1")

print("Path to dataset files:", path)

csv_file = f"{path}/PS_20174392719_1491204439457_log.csv"
print("CSV file:", csv_file)

Path to dataset files: /kaggle/input/paysim1
CSV file: /kaggle/input/paysim1/PS_20174392719_1491204439457_log.csv


## 4. Configurando a Sessão Spark

Primeiramente, criamos a sessão spark a partir do comando abaixo:

In [4]:
spark = (
    SparkSession.builder
    .master("local[*]")  # Use todos os núcleos disponíveis na máquina local
    .appName("ColabOptimized")  # Define um nome para a aplicação Spark
    .config("spark.ui.port", "4050")  # Configura a porta para a interface web do Spark UI
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # Habilita a conversão rápida de dados entre Spark e Pandas DataFrames usando Apache Arrow
    .config("spark.sql.repl.eagerEval.enabled", "true")  # Permite que os DataFrames sejam exibidos automaticamente no console
    .config("spark.driver.memory", "4g")  # Define a quantidade máxima de memória para o driver Spark
    .config("spark.sql.shuffle.partitions", "8")  # Define o número de partições para operações de shuffle; um número menor pode ser melhor para dados pequenos/médios
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # Define o serializador a ser usado; Kryo é geralmente mais rápido que o padrão Java
    .config("spark.kryoserializer.buffer.max", "2000")  # Define o tamanho máximo do buffer do serializador Kryo em megabytes
    .config("spark.driver.maxResultSize", "1g")  # Define o tamanho máximo dos resultados coletados do executor para o driver
    .config("spark.dynamicAllocation.enabled","true") # Habilita alocação dinâmica de recursos (executors)
    .config("spark.dynamicAllocation.minExecutors","2") # Define o número mínimo de executors quando a alocação dinâmica está habilitada
    .config("spark.dynamicAllocation.maxExecutors","20") # Define o número máximo de executors quando a alocação dinâmica está habilitada
    .config("spark.sql.adaptive.enabled", "true") # Habilita a execução adaptativa de queries, que pode otimizar planos de execução em tempo de execução
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") # Habilita a coalescência de partições na execução adaptativa para reduzir o número de partições
    .getOrCreate()
)

E logo após, fazemos criamos o dataframe a partir da leitura do csv:

In [5]:
try:
  df_csv = spark.read.csv(
      csv_file,
      header=True,
      inferSchema=True
  )
  print("Dataframe criado com sucesso!")
  print("Schema do dataframe:")
  df_csv.printSchema()
  print("Primeiras 5 linhas do dataframe:")
  df_csv.show(5)
except Exception as e:
  print(e)

Dataframe criado com sucesso!
Schema do dataframe:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

Primeiras 5 linhas do dataframe:
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     1

## 5. Conversão de Tipos de Arquivo

Se você estiver trabalhando com um grande conjunto de dados, dê preferências a formatos de dados colunares que favoreçam o processamento, como o parquet. Parquet otimiza consultas, reduz espaço e acelera o processamento em big data com Spark


In [6]:
# Define o caminho de saída para o arquivo Parquet em um diretório gravável
parquet_output_path = "/content/paysim.parquet"

# Escreve o DataFrame no formato Parquet
df_csv.write.parquet(parquet_output_path, mode="overwrite")

print(f"Arquivo CSV convertido para Parquet e salvo em: {parquet_output_path}")

df = spark.read.parquet(parquet_output_path)

Arquivo CSV convertido para Parquet e salvo em: /content/paysim.parquet


In [7]:
# Estatísticas descritivas para colunas numéricas
df.describe().show()

+-------+------------------+--------+------------------+-----------+-----------------+------------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|    oldbalanceOrg|    newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+-----------------+------------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|          6362620|           6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354912955|       NULL| 833883.104074466| 855113.6685785623|       NULL|1100701.6665196575|1224996.3982019091|0.001290820448180152| 2.51468734577894E-6|
| stddev|1

In [8]:
# Contagem de transações por tipo
df.groupBy("type").agg(count("*").alias("qtd_transacoes")).show()

+--------+--------------+
|    type|qtd_transacoes|
+--------+--------------+
| PAYMENT|       2151495|
|   DEBIT|         41432|
| CASH_IN|       1399284|
|CASH_OUT|       2237500|
|TRANSFER|        532909|
+--------+--------------+



In [9]:
# Proporção de transações fraudulentas
total = df.count()
fraudes = df.filter(col("isFraud") == 1).count()
print(f"Proporção de fraudes: {fraudes/total:.4%}")

Proporção de fraudes: 0.1291%


In [10]:
# Estatísticas dos valores das transações
df.select("amount").describe().show()

+-------+------------------+
|summary|            amount|
+-------+------------------+
|  count|           6362620|
|   mean|179861.90354912955|
| stddev| 603858.2314629173|
|    min|               0.0|
|    max|     9.244551664E7|
+-------+------------------+



In [11]:
# Estatísticas dos saldos antes e depois da transação para o originador
df.select("oldbalanceOrg", "newbalanceOrig").describe().show()

+-------+-----------------+------------------+
|summary|    oldbalanceOrg|    newbalanceOrig|
+-------+-----------------+------------------+
|  count|          6362620|           6362620|
|   mean| 833883.104074466| 855113.6685785623|
| stddev|2888242.673037513|2924048.5029542376|
|    min|              0.0|               0.0|
|    max|    5.958504037E7|     4.958504037E7|
+-------+-----------------+------------------+



In [12]:
# Contagem de transações sinalizadas como fraude
df.groupBy("isFlaggedFraud").count().show()

+--------------+-------+
|isFlaggedFraud|  count|
+--------------+-------+
|             1|     16|
|             0|6362604|
+--------------+-------+



In [13]:
# Conta valores nulos em cada coluna
df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]).show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [14]:
# Clientes que mais iniciaram transações
df.groupBy("nameOrig").count().orderBy(col("count").desc()).show(5)

+-----------+-----+
|   nameOrig|count|
+-----------+-----+
| C724452879|    3|
|C1677795071|    3|
|C2098525306|    3|
|C1784010646|    3|
|C1902386530|    3|
+-----------+-----+
only showing top 5 rows



## 6. Conversão pra Pandas DataFrame

In [15]:
# Contagem de transações por tipo
transaction_counts = df.groupBy("type").agg(count("*").alias("qtd_transacoes")).toPandas()

# Criar gráfico de barras usando Plotly
fig = go.Figure(data=[go.Bar(x=transaction_counts['type'], y=transaction_counts['qtd_transacoes'])])

# Atualizar layout
fig.update_layout(title_text="Contagem de Transações por Tipo",
                  xaxis_title="Tipo de Transação",
                  yaxis_title="Quantidade de Transações")

fig.show()

## 7. Acessando a Spark UI e History Server

Para monitorar suas aplicações Spark e visualizar detalhes sobre as tarefas, *stages* e uso de recursos, você pode acessar a Spark UI. No Google Colab, como o ambiente não é diretamente acessível, utilizamos o serviço de tunelamento **ngrok** para expor a interface web do Spark (geralmente na porta 4040 ou 4050) para um endereço público na internet.

Para configurar o ngrok:

1.  **Obtenha seu token de autenticação ngrok:**
    *   Acesse o site do ngrok: [https://dashboard.ngrok.com/get-started/your-authtoken](https://dashboard.ngrok.com/get-started/your-authtoken)
    *   Crie uma conta (se ainda não tiver uma) e faça login.
    *   Encontre seu token de autenticação na página.
2.  **Configure o token no Colab:**
    *   Armazene seu token obtido no passo 1 na variável `NGROK_AUTHTOKEN` na célula de código abaixo.
3.  **Crie o túnel:**
    *   Execute a célula de código que utiliza `ngrok.connect()` para criar o túnel. O código imprimirá um link público que você pode usar para acessar a Spark UI no seu navegador.

In [18]:
NGROK_AUTHTOKEN = "" # Substitua pelo seu token de autenticação ngrok
ngrok.set_auth_token(NGROK_AUTHTOKEN) # Configura o token de autenticação ngrok

In [19]:
# Abrir túnel na porta 4050 (Spark UI)
public_url = ngrok.connect(4050)
print(f"Spark UI disponível em: {public_url}")

Spark UI disponível em: NgrokTunnel: "https://526fcd5e12d3.ngrok-free.app" -> "http://localhost:4050"
