# Dia 1/2 (07/08/2025)

In [1]:
!pip --quiet install pyspark kagglehub pyngrok

In [2]:
import kagglehub # Para baixar datasets do Kaggle
from pyspark.sql import SparkSession # Para criar e gerenciar a sessão Spark
from pyspark.sql.functions import col, count, sum, when # Funções úteis para manipulação de DataFrames
from plotly.subplots import make_subplots # Para criar gráficos com múltiplos subplots
import plotly.graph_objects as go # Para construir figuras e gráficos Plotly
from pyngrok import ngrok # Para criar um túnel público para a Spark UI

In [3]:
path = kagglehub.dataset_download("ealaxi/paysim1")

print("Path to dataset files:", path)

csv_file = f"{path}/PS_20174392719_1491204439457_log.csv"
print("CSV file:", csv_file)


Path to dataset files: /kaggle/input/paysim1
CSV file: /kaggle/input/paysim1/PS_20174392719_1491204439457_log.csv


In [4]:
spark = (
    SparkSession.builder
    .master("local[*]")  # Use todos os núcleos disponíveis na máquina local
    .appName("ColabOptimized")  # Define um nome para a aplicação Spark
    .config("spark.ui.port", "4050")  # Configura a porta para a interface web do Spark UI
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # Habilita a conversão rápida de dados entre Spark e Pandas DataFrames usando Apache Arrow
    .config("spark.sql.repl.eagerEval.enabled", "true")  # Permite que os DataFrames sejam exibidos automaticamente no console
    .config("spark.driver.memory", "4g")  # Define a quantidade máxima de memória para o driver Spark
    .config("spark.sql.shuffle.partitions", "8")  # Define o número de partições para operações de shuffle; um número menor pode ser melhor para dados pequenos/médios
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # Define o serializador a ser usado; Kryo é geralmente mais rápido que o padrão Java
    .config("spark.kryoserializer.buffer.max", "2000")  # Define o tamanho máximo do buffer do serializador Kryo em megabytes
    .config("spark.driver.maxResultSize", "1g")  # Define o tamanho máximo dos resultados coletados do executor para o driver
    .config("spark.dynamicAllocation.enabled","true") # Habilita alocação dinâmica de recursos (executors)
    .config("spark.dynamicAllocation.minExecutors","2") # Define o número mínimo de executors quando a alocação dinâmica está habilitada
    .config("spark.dynamicAllocation.maxExecutors","20") # Define o número máximo de executors quando a alocação dinâmica está habilitada
    .config("spark.sql.adaptive.enabled", "true") # Habilita a execução adaptativa de queries, que pode otimizar planos de execução em tempo de execução
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") # Habilita a coalescência de partições na execução adaptativa para reduzir o número de partições
    .getOrCreate()
)

In [5]:
try:
  df_csv = spark.read.csv(
      csv_file,
      header=True,
      inferSchema=True
  )
  print("Dataframe criado com sucesso!")
  print("Schema do dataframe:")
  df_csv.printSchema()
  print("Primeiras 5 linhas do dataframe:")
  df_csv.show(5)
except Exception as e:
  print(e)

Dataframe criado com sucesso!
Schema do dataframe:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

Primeiras 5 linhas do dataframe:
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     1

In [6]:
# Define o caminho de saída para o arquivo Parquet em um diretório gravável
parquet_output_path = "/content/paysim.parquet"

# Escreve o DataFrame no formato Parquet
df_csv.write.parquet(parquet_output_path, mode="overwrite")

print(f"Arquivo CSV convertido para Parquet e salvo em: {parquet_output_path}")

df = spark.read.parquet(parquet_output_path)

Arquivo CSV convertido para Parquet e salvo em: /content/paysim.parquet


In [7]:
df.describe().show()

+-------+------------------+--------+------------------+-----------+-----------------+------------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|    oldbalanceOrg|    newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+-----------------+------------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|          6362620|           6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354912955|       NULL| 833883.104074466| 855113.6685785623|       NULL|1100701.6665196575|1224996.3982019091|0.001290820448180152| 2.51468734577894E-6|
| stddev|1

In [8]:
# Proporção de transações fraudulentas
total = df.count()
fraudes = df.filter(col("isFraud") == 1).count()
print(f"Proporção de fraudes: {fraudes/total:.4%}")

Proporção de fraudes: 0.1291%


In [9]:
# Estatísticas dos valores das transações
df.select("amount").describe().show()

+-------+------------------+
|summary|            amount|
+-------+------------------+
|  count|           6362620|
|   mean|179861.90354912955|
| stddev| 603858.2314629173|
|    min|               0.0|
|    max|     9.244551664E7|
+-------+------------------+



In [10]:
# Estatísticas dos saldos antes e depois da transação para o originador
df.select("oldbalanceOrg", "newbalanceOrig").describe().show()

+-------+-----------------+------------------+
|summary|    oldbalanceOrg|    newbalanceOrig|
+-------+-----------------+------------------+
|  count|          6362620|           6362620|
|   mean| 833883.104074466| 855113.6685785623|
| stddev|2888242.673037513|2924048.5029542376|
|    min|              0.0|               0.0|
|    max|    5.958504037E7|     4.958504037E7|
+-------+-----------------+------------------+



In [11]:
# Contagem de transações sinalizadas como fraude
df.groupBy("isFlaggedFraud").count().show()

+--------------+-------+
|isFlaggedFraud|  count|
+--------------+-------+
|             1|     16|
|             0|6362604|
+--------------+-------+



In [12]:
# Conta valores nulos em cada coluna
df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]).show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [13]:
# Clientes que mais iniciaram transações
df.groupBy("nameOrig").count().orderBy(col("count").desc()).show(5)

+-----------+-----+
|   nameOrig|count|
+-----------+-----+
| C724452879|    3|
|C1677795071|    3|
|C2098525306|    3|
|C1784010646|    3|
|C1902386530|    3|
+-----------+-----+
only showing top 5 rows



In [14]:
# Contagem de transações por tipo
transaction_counts = df.groupBy("type").agg(count("*").alias("qtd_transacoes")).toPandas()

# Criar gráfico de barras usando Plotly
fig = go.Figure(data=[go.Bar(x=transaction_counts['type'], y=transaction_counts['qtd_transacoes'])])

# Atualizar layout
fig.update_layout(title_text="Contagem de Transações por Tipo",
                  xaxis_title="Tipo de Transação",
                  yaxis_title="Quantidade de Transações")

fig.show()

In [15]:
NGROK_AUTHTOKEN = "310Do004ERavHMI0lbX6VjKCKXo_2ER4gDSrQZZtTHxAYngM5" # Substitua pelo seu token de autenticação ngrok
ngrok.set_auth_token(NGROK_AUTHTOKEN) # Configura o token de autenticação ngrok



In [16]:
# Abrir túnel na porta 4050 (Spark UI)
public_url = ngrok.connect(4050)
print(f"Spark UI disponível em: {public_url}")

Spark UI disponível em: NgrokTunnel: "https://67d35c67a9e9.ngrok-free.app" -> "http://localhost:4050"
