In [9]:
import os
import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, sum as _sum, desc, count, lit
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

In [10]:
# MinIO / S3A connection settings.
# Each value can be overridden through an environment variable so credentials do
# not have to live in the notebook; the fallbacks are the local development defaults.
# NOTE(review): drop the credential fallbacks once every environment sets the variables.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")
BUCKET_NAME = os.environ.get("MINIO_BUCKET", "datalake")

# Paths: source CSV and root prefix under which every pipeline writes its output.
INPUT_PATH = f"s3a://{BUCKET_NAME}/dataset.csv"
BASE_OUTPUT_PATH = f"s3a://{BUCKET_NAME}"


def get_spark_session():
    """Build (or reuse) the SparkSession used by the ETL, wired to MinIO via S3A.

    Pulls in the hadoop-aws package and points the S3A filesystem at
    ``MINIO_ENDPOINT`` with path-style access, SSL disabled and static
    access/secret-key credentials.

    Returns:
        SparkSession: the session named "Sales_Analysis_ETL_Final".
    """
    print("Iniciando Spark .")

    # Applied in insertion order, exactly as the settings are listed here.
    s3a_settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": MINIO_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": ACCESS_KEY,
        "spark.hadoop.fs.s3a.secret.key": SECRET_KEY,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    }

    session_builder = SparkSession.builder.appName("Sales_Analysis_ETL_Final")
    for setting_key, setting_value in s3a_settings.items():
        session_builder = session_builder.config(setting_key, setting_value)
    return session_builder.getOrCreate()

In [11]:
def load_data(spark):
    """Read the source CSV from MinIO and cache it for the downstream pipelines.

    The file at ``INPUT_PATH`` is read with a header row and inferred column
    types. The DataFrame is cached because every pipeline runs several Spark
    actions against it (collects, writes, counts). Without the cache, each of
    those actions would re-read and re-parse the CSV from S3A.

    Args:
        spark: active SparkSession configured for S3A access.

    Returns:
        DataFrame: the cached source data, already materialized by ``count()``.
    """
    print(f"\nLendo dados de origem: {INPUT_PATH}")
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")  # column types are inferred from the data
        .csv(INPUT_PATH)
        .cache()
    )

    # count() is the first action on the cached frame, so it also populates the cache.
    print(f"Registros carregados: {df.count()}")
    return df

In [None]:
# Pipeline 1: profit/sales statistics per sub-category

In [4]:
def pipeline_1_stats(df):
    """Profit and sales statistics per sub-category (Pipeline 1).

    For each ``Sub-Category`` this computes the mean and the standard deviation
    (Spark ``stddev``) of ``Profit`` and ``Sales``. It also adds
    ``DiffMean = MeanProfit - StdProfit`` and orders by ``DiffMean`` descending.
    The result is written as Parquet (overwrite) to
    ``{BASE_OUTPUT_PATH}/pipeline1/statistics``.

    Args:
        df: source DataFrame with ``Sub-Category``, ``Profit`` and ``Sales`` columns.

    Returns:
        tuple: ``("Pipeline1", output_path, row_count)``.
    """
    print("Pipeline 1: Estatísticas (Média/Desvio Padrão)...")

    # One aggregation yields all four metrics in a single shuffle. Aggregating
    # twice and equi-joining on "Sub-Category" costs an extra join. It would also
    # drop the std values of a null Sub-Category, because null keys never match.
    df_final = (
        df.groupBy("Sub-Category")
        .agg(
            mean("Profit").alias("MeanProfit"),
            mean("Sales").alias("MeanSales"),
            stddev("Profit").alias("StdProfit"),
            stddev("Sales").alias("StdSales"),
        )
        .withColumn("DiffMean", col("MeanProfit") - col("StdProfit"))
        .orderBy(desc("DiffMean"))
    )

    path = f"{BASE_OUTPUT_PATH}/pipeline1/statistics"
    df_final.write.mode("overwrite").parquet(path)
    return ("Pipeline1", path, df_final.count())

In [None]:
# Pipeline 2: top customers, mean sales per sub-category pivoted by region

In [5]:
def pipeline_2_top_clients(df, top_n=20):
    """Mean sales per sub-category and region for the top customers (Pipeline 2).

    Selects the ``top_n`` customers with the highest total ``Sales`` over the
    whole dataset and keeps only their rows. It then pivots the mean ``Sales``
    per ``Sub-Category`` into one ``Sales_<Region>`` column per region. The
    result is written as Parquet (overwrite) to
    ``{BASE_OUTPUT_PATH}/pipeline2/top_clients_by_region``.

    Args:
        df: source DataFrame with ``Customer ID``, ``Sales``, ``Sub-Category``
            and ``Region`` columns.
        top_n: number of top customers to keep (default 20).

    Returns:
        tuple: ``("Pipeline2", output_path, row_count)``.
    """
    print(f"Pipeline 2: Top {top_n} Clientes por Região")

    # 1. Global top customers by total sales. "Customer ID" is a tie-breaker so
    #    limit() selects the same customers on every run when totals are equal.
    top_clients = (
        df.groupBy("Customer ID")
        .agg(_sum("Sales").alias("TotalSales"))
        .orderBy(desc("TotalSales"), col("Customer ID"))
        .limit(top_n)
    )
    top_ids = [row["Customer ID"] for row in top_clients.collect()]

    # 2. Restrict to those customers and pivot mean sales by region.
    df_pivot = (
        df.filter(col("Customer ID").isin(top_ids))
        .groupBy("Sub-Category")
        .pivot("Region")
        .agg(mean("Sales"))
    )

    # Prefix every pivoted region column with "Sales_" in one projection.
    # toDF takes plain names, so region values are never parsed as expressions.
    renamed = [c if c == "Sub-Category" else f"Sales_{c}" for c in df_pivot.columns]
    df_pivot = df_pivot.toDF(*renamed)

    path = f"{BASE_OUTPUT_PATH}/pipeline2/top_clients_by_region"
    df_pivot.write.mode("overwrite").parquet(path)
    return ("Pipeline2", path, df_pivot.count())

In [None]:
# Pipeline 3: Central region, top customers' sales per sub-category

In [6]:
def pipeline_3_central(df, spark, top_n=20):
    """Sales per sub-category for the top customers of the Central region (Pipeline 3).

    Keeps rows with ``Region == "Central"`` and picks the ``top_n`` customers
    with the highest total ``Sales`` in that region. It then pivots their summed
    ``Sales`` into one row per customer and one column per ``Sub-Category``.
    The result is written as Parquet (overwrite) to
    ``{BASE_OUTPUT_PATH}/pipeline3/central_sales``.

    Args:
        df: source DataFrame with ``Region``, ``Customer ID``, ``Sales`` and
            ``Sub-Category`` columns.
        spark: not used by this function; kept so existing callers keep working.
        top_n: number of top Central customers to keep (default 20).

    Returns:
        tuple: ``("Pipeline3", output_path, row_count)``.
    """
    print("Pipeline 3: Análise Região Central")

    # Top customers within the Central region. "Customer ID" is a tie-breaker so
    # limit() is deterministic when totals are equal.
    df_central = df.filter(col("Region") == "Central")
    top_central = (
        df_central.groupBy("Customer ID")
        .agg(_sum("Sales").alias("S"))
        .orderBy(desc("S"), col("Customer ID"))
        .limit(top_n)
    )
    ids_central = [row["Customer ID"] for row in top_central.collect()]

    # One row per selected customer, one column per Sub-Category with summed Sales.
    df_pivot = (
        df_central.filter(col("Customer ID").isin(ids_central))
        .groupBy("Customer ID")
        .pivot("Sub-Category")
        .agg(_sum("Sales"))
    )

    path = f"{BASE_OUTPUT_PATH}/pipeline3/central_sales"
    df_pivot.write.mode("overwrite").parquet(path)
    return ("Pipeline3", path, df_pivot.count())

In [None]:
# Pipeline 4: profitability (profit ratio) per region and customer

In [7]:
def pipeline_4_profitability(df):
    """Profit ratio per region and customer (Pipeline 4).

    Sums ``Sales`` and ``Profit`` per (``Region``, ``Customer ID``) and derives
    ``profitRatio = Profit / Sales``. Rows are ordered by ``profitRatio``
    descending, and the result is written as Parquet (overwrite) to
    ``{BASE_OUTPUT_PATH}/pipeline4/profitability``.

    Args:
        df: source DataFrame with ``Region``, ``Customer ID``, ``Sales`` and
            ``Profit`` columns.

    Returns:
        tuple: ``("Pipeline4", output_path, row_count)``.
    """
    print("Pipeline 4: Rentabilidade")

    df_prof = (
        df.groupBy("Region", "Customer ID")
        .agg(_sum("Sales").alias("Sales"), _sum("Profit").alias("Profit"))
        # Zero total sales yields a null ratio instead of a division. With ANSI
        # mode (spark.sql.ansi.enabled, default in newer Spark) x / 0 raises an error.
        .withColumn(
            "profitRatio",
            F.when(col("Sales") != 0, col("Profit") / col("Sales")),
        )
        .orderBy(desc("profitRatio"))
    )

    path = f"{BASE_OUTPUT_PATH}/pipeline4/profitability"
    df_prof.write.mode("overwrite").parquet(path)
    return ("Pipeline4", path, df_prof.count())

In [None]:
# Pipeline 5: unpivot sales/profit metrics into long format

In [10]:
def pipeline_5_long_format(df):
    """Unpivot per-customer metrics into long format (Pipeline 5).

    Sums ``Sales`` and ``Profit`` per (``Region``, ``Customer ID``) and derives
    ``profitRatio``. Each row is then turned into three rows
    ``(Region, Customer ID, Metric, Value)`` with ``Metric`` in
    ``'Sales'``, ``'Profit'``, ``'ProfitRatio'``. The result is written as
    Parquet (overwrite) to ``{BASE_OUTPUT_PATH}/pipeline5/long_format``.

    Args:
        df: source DataFrame with ``Region``, ``Customer ID``, ``Sales`` and
            ``Profit`` columns.

    Returns:
        tuple: ``("Pipeline5", output_path, row_count)``.
    """
    print("Pipeline 5: Formato Longo (Stack)...")

    df_base = (
        df.groupBy("Region", "Customer ID")
        .agg(_sum("Sales").alias("Sales"), _sum("Profit").alias("Profit"))
        # Zero total sales yields a null ratio instead of a division, which
        # raises under ANSI mode.
        .withColumn(
            "profitRatio",
            F.when(col("Sales") != 0, col("Profit") / col("Sales")),
        )
    )

    # Turn the three metric columns into (Metric, Value) rows. stack() type-checks
    # the values that land in the same output column. Casting each to DOUBLE keeps
    # this working even if schema inference typed Sales or Profit as an integer.
    df_long = df_base.select(
        "Region",
        "Customer ID",
        F.expr(
            "stack(3, "
            "'Sales', CAST(Sales AS DOUBLE), "
            "'Profit', CAST(Profit AS DOUBLE), "
            "'ProfitRatio', CAST(profitRatio AS DOUBLE)) as (Metric, Value)"
        ),
    )

    path = f"{BASE_OUTPUT_PATH}/pipeline5/long_format"
    df_long.write.mode("overwrite").parquet(path)
    return ("Pipeline5", path, df_long.count())


In [None]:
# Saving execution logs

In [11]:
def save_logs(spark, logs, mode="overwrite"):
    """Persist execution metadata for the pipelines that ran.

    Writes one row per entry in ``logs`` to ``{BASE_OUTPUT_PATH}/logs`` as
    Parquet with columns ``pipeline``, ``path``, ``count`` and ``timestamp``.

    Args:
        spark: active SparkSession used to build the DataFrame.
        logs: iterable of ``(pipeline_name, output_path, row_count)`` tuples, as
            returned by the ``pipeline_*`` functions.
        mode: Spark write mode. ``"overwrite"`` (default) keeps only this run;
            ``"append"`` accumulates the history of previous runs.
    """
    print("\nSalvando Logs")
    schema = StructType([
        StructField("pipeline", StringType()),
        StructField("path", StringType()),
        StructField("count", LongType()),
        StructField("timestamp", TimestampType()),
    ])

    # A single timestamp for the whole run, so all records of one execution share it.
    run_ts = datetime.now()
    data = [
        (pipeline_name, output_path, row_count, run_ts)
        for pipeline_name, output_path, row_count in logs
    ]
    spark.createDataFrame(data, schema).write.mode(mode).parquet(f"{BASE_OUTPUT_PATH}/logs")

In [None]:
# Running the full ETL

In [12]:
if __name__ == "__main__":
    # Entry point: load the source data once, run every pipeline, then record
    # run metadata. The Spark session is always stopped, even on failure.
    spark = get_spark_session()

    try:
        spark.sparkContext.setLogLevel("ERROR")
        df_raw = load_data(spark)

        # Each pipeline returns (name, output_path, row_count); run in order.
        logs = [
            pipeline_1_stats(df_raw),
            pipeline_2_top_clients(df_raw),
            pipeline_3_central(df_raw, spark),
            pipeline_4_profitability(df_raw),
            pipeline_5_long_format(df_raw),
        ]

        # Metadata is written only after every pipeline succeeded.
        save_logs(spark, logs)
        print("\n✅ SUCESSO! Todos os pipelines foram executados e salvos no MinIO.")

    except Exception as e:
        print(f"Erro: {str(e)}")
        # Re-raise so the traceback is shown and the failure is not reported as
        # success (non-zero exit code as a script, failed cell in Jupyter).
        raise
    finally:
        spark.stop()

Iniciando Spark .

Lendo dados de origem: s3a://datalake/dataset.csv
Registros carregados: 9994
Pipeline 1: Estatísticas (Média/Desvio Padrão)...
Pipeline 2: Top 20 Clientes por Região
Pipeline 3: Análise Região Central
Pipeline 4: Rentabilidade
Pipeline 5: Formato Longo (Stack)...

Salvando Logs

✅ SUCESSO! Todos os pipelines foram executados e salvos no MinIO.
