# Experimento de performance e Avaliação de Consumo do Fabric com o Pyspark

Com este experimento simples, estou usando dados de diesel em dois cenários:
- Série histórica de preços de 4 anos totalizando mais de 700mb em csv;
- Arquivo simples de menos de 1mb com informações geofísicas;

A transformação significava trazer de csv consolidado para delta (pyspark) e fazer umas transformações simples.

Configuração Starter pool Oitimizado para memória tamanho médio 10 nodes. Spark 3.5

In [9]:
# Código PySpark com o cenário da série histórica de 700mb

import time
import psutil
from contextlib import contextmanager
from pyspark.sql.functions import regexp_replace, col

class DetailedMetrics:
   def __init__(self):
       self.measures = {}
       
   @contextmanager
   def measure_step(self, step_name):
       start = time.time()
       start_mem = psutil.Process().memory_info().rss
       start_io = psutil.disk_io_counters()
       
       try:
           yield
       finally:
           end = time.time()
           end_mem = psutil.Process().memory_info().rss
           end_io = psutil.disk_io_counters()
           
           self.measures[step_name] = {
               'time': end - start,
               'memory': (end_mem - start_mem) / 1024 / 1024,
               'io_read': end_io.read_bytes - start_io.read_bytes,
               'io_write': end_io.write_bytes - start_io.write_bytes
           }
           print(f"\nMétricas para {step_name}:")
           print(f"Tempo: {self.measures[step_name]['time']:.2f}s")
           print(f"Memória: {self.measures[step_name]['memory']:.2f} MB")
           print(f"IO Leitura: {self.measures[step_name]['io_read'] / 1024 / 1024:.2f} MB")
           print(f"IO Escrita: {self.measures[step_name]['io_write'] / 1024 / 1024:.2f} MB")

metrics = DetailedMetrics()

# Leitura
with metrics.measure_step("PySpark - Leitura"):
   df_2021_01 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2021_01.csv")
   df_2022_01 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2022_01.csv")
   df_2022_02 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2022_02.csv")
   df_2023_01 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2023_01.csv")
   df_2023_02 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2023_02.csv")
   df_2024_01 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2024_01.csv")
   df_2024_02 = spark.read.option("delimiter", ";").option("header", True).csv("Files/diesel/serie_historica_csv/historico_diesel_2024_02.csv")
   df_concatenado = df_2021_01.union(df_2022_02).union(df_2022_01).union(df_2023_01).union(df_2023_02).union(df_2024_01).union(df_2024_02)

# Transformação
with metrics.measure_step("PySpark - Transformação"):
   df_processado = df_concatenado.dropDuplicates() \
       .fillna({"Valor de Compra": "0", "Complemento": "SEM COMPLEMENTO"}) \
       .withColumn("Valor de Venda", regexp_replace(col("Valor de Venda"), ",", ".").cast("double")) \
       .withColumn("Valor de Compra", regexp_replace(col("Valor de Compra"), ",", ".").cast("double")) \
       .withColumn("Margem", col("Valor de Venda") - col("Valor de Compra")) \
       .withColumnRenamed("Regiao - Sigla", "Regiao") \
       .withColumnRenamed("Estado - Sigla", "Estado")

# Escrita
with metrics.measure_step("PySpark - Escrita"):
   df_processado.write.mode("overwrite").parquet('Files/diesel/serie_historica_parquet/diesel_4anos_pyspark')

StatementMeta(, c8da7e14-c036-44b5-9472-52716eccb7c8, 11, Finished, Available, Finished)


Métricas para PySpark - Leitura:
Tempo: 1.82s
Memória: 0.00 MB
IO Leitura: 0.45 MB
IO Escrita: 1.66 MB

Métricas para PySpark - Transformação:
Tempo: 0.06s
Memória: 0.00 MB
IO Leitura: 0.02 MB
IO Escrita: 0.02 MB

Métricas para PySpark - Escrita:
Tempo: 17.03s
Memória: 0.00 MB
IO Leitura: 12.15 MB
IO Escrita: 21.50 MB


In [7]:
# Código PySpark com o cenário da série pequena menor que 1mb
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time
import psutil
import os

start_time = time.time()
initial_memory = get_memory_usage()

# Ler arquivo sem cabeçalho
df_raw = spark.read.option("delimiter", ";").option("header", False).csv('Files/diesel/pequenos/tabela-programas-geofisicos.csv')

# Pegar a segunda linha para usar como header
header = df_raw.collect()[1]

# Criar novo DataFrame pulando as duas primeiras linhas
df = spark.read.option("delimiter", ";").option("header", False).csv('Files/diesel/pequenos/tabela-programas-geofisicos.csv').filter(f"_c0 != '{header[0]}'")

# Renomear colunas
for idx, name in enumerate(header):
   if name is not None:
       df = df.withColumnRenamed(f"_c{idx}", name)

# Transformações
df = df.select(
   col("Nome").alias("id_programa"),
   col("Categoria"),
   when(col("Natureza") == 'Não-Exclusivo', 'NAO_EXCLUSIVO').otherwise('EXCLUSIVO').alias("tipo_natureza"),
   to_date(col("Inicio")).alias("data_inicio"),
   to_date(col("Término Real")).alias("data_termino"),
   col("Tecnologia"),
   col("Bacia"),
   coalesce(col("Operadora"), lit("NAO_INFORMADO")).alias("operadora"),
   months_between(to_date(col("Término Real")), to_date(col("Inicio"))).alias("duracao_meses"),
   when(col("Bacia") == "Santos", "BACIA_SANTOS").when(col("Bacia") == "Campos", "BACIA_CAMPOS").otherwise("OUTRAS").alias("categoria_bacia"),
   when(col("Tecnologia") == "Sísmica 3D", "SISMICA_3D")
   .when(col("Tecnologia") == "Magnetometria", "MAGNETOMETRIA")
   .when(col("Tecnologia") == "Ocean Bottom Nodes", "OBN")
   .otherwise("OUTROS").alias("tipo_tecnologia")
)

query_time = time.time() - start_time

df.write.format("delta").mode("overwrite").save("Files/diesel/pequenos/programas_geofisicos_benchmark_spark")

total_time = time.time() - start_time
peak_memory = get_memory_usage() - initial_memory

print(f"Métricas de Performance:")
print(f"Tempo total: {total_time:.2f} segundos")
print(f"Tempo query: {query_time:.2f} segundos")
print(f"Memória utilizada: {peak_memory:.2f} MB")
print(f"Linhas processadas: {df.count()}")

StatementMeta(, c8da7e14-c036-44b5-9472-52716eccb7c8, 9, Finished, Available, Finished)

Métricas de Performance:
Tempo total: 6.55 segundos
Tempo query: 0.82 segundos
Memória utilizada: 3.61 MB
Linhas processadas: 3602
