In [4]:
import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc, abs, count, max as spark_max, min as spark_min, stddev
from pyspark.sql.window import Window

# Helper that builds the Spark session used by this job
def create_spark_session() -> SparkSession:
    """Build (or reuse) the SparkSession for the Gold-layer soccer ETL.

    The session is named "ETL Gold - Soccer Analytics", has Hive support
    enabled, registers the Delta Lake SQL extension and catalog, and maps
    both the ``s3a`` and ``s3minio`` filesystem schemes to Hadoop's
    ``S3AFileSystem`` implementation.

    Returns:
        The session produced by ``SparkSession.builder.getOrCreate()``.
    """
    # (key, value) pairs applied to the builder in this exact order.
    settings = (
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
        ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
        ("spark.hadoop.fs.s3minio.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    )

    builder = SparkSession.builder.appName("ETL Gold - Soccer Analytics").enableHiveSupport()
    for key, value in settings:
        builder = builder.config(key, value)

    return builder.getOrCreate()

# Initialize Spark: module-level session used by the reads further down
spark = create_spark_session()

# MinIO (S3A) connection options passed to the Delta readers/writers.
# Credentials come from the MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment
# variables so real secrets do not have to be committed with the code; the
# fallbacks keep the previous hard-coded values, so an environment that does
# not define the variables behaves exactly as before.
minio_delta_options = {
    "fs.s3a.access.key": os.environ.get("MINIO_ACCESS_KEY", "admin"),
    "fs.s3a.secret.key": os.environ.get("MINIO_SECRET_KEY", "senhasegura"),
    "fs.s3a.endpoint": "http://minio:9000",
    # Address buckets as <endpoint>/<bucket> rather than <bucket>.<endpoint>.
    "fs.s3a.path.style.access": "true",
    # The endpoint above is plain HTTP, so TLS is switched off.
    "fs.s3a.connection.ssl.enabled": "false",
}

### ============================================
###  Part 1: Player Attributes analyses
### ============================================

# Read the Silver player_attributes Delta table
silver_player_attr_path = "s3a://bucket-silver-zone/soccer/player_attributes"
df_player_attr = (
    spark.read
    .format("delta")
    .options(**minio_delta_options)
    .load(silver_player_attr_path)
)

# Drop rows missing any of the columns the ranking below depends on
df_player_attr = df_player_attr.dropna(subset=["player_api_id", "overall_rating", "potential"])

# Number each player's rows, highest (overall_rating, potential) first, so that
# exactly one row per player_api_id can be kept
windowSpec = Window.partitionBy("player_api_id").orderBy(desc("overall_rating"), desc("potential"))
df_ranked = df_player_attr.withColumn("rank", F.row_number().over(windowSpec))

# Keep only the best entry per player
df_unique = df_ranked.filter(col("rank") == 1).drop("rank")

# Top 10 players.
# player_api_id is the last sort key: df_unique holds one row per player, so
# the ordering is total and limit(10) always picks the same rows. Without it,
# players tied on (overall_rating, potential) at the cut-off could be chosen
# differently each time this lazy DataFrame is evaluated (once by show() here,
# and again by any later action on top_players).
top_players = (
    df_unique
    .select("player_api_id", "overall_rating", "potential")
    .orderBy(desc("overall_rating"), desc("potential"), col("player_api_id"))
    .limit(10)
)

print("Top 10 jogadores com maior overall_rating e potential:")
top_players.show(truncate=False)

# Attribute columns whose averages make up the aggregation table
atributos = [
    "curve",
    "vision",
    "agility",
    "balance",
    "jumping",
    "marking",
    "stamina",
    "volleys",
    "crossing",
    "strength",
    "dribbling",
    "finishing",
    "long_shots",
    "shot_power",
    "positioning",
    "acceleration",
    "ball_control",
    "long_passing",
    "sprint_speed",
    "interceptions",
    "short_passing",
]

# One avg(<column>) expression per attribute, aliased as avg_<column>
avg_exprs = [F.avg(F.col(nome)).alias(f"avg_{nome}") for nome in atributos]

# Global (ungrouped) aggregation, so the result is a single row.
# NOTE(review): this averages over df_player_attr, i.e. every row rather than
# one row per player -- confirm that weighting is intended.
agg_attrs = df_player_attr.select(*atributos).agg(*avg_exprs)

print("Médias dos atributos técnicos e físicos:")
agg_attrs.show(truncate=False)

# Gold-layer base path for the player analytics tables
gold_base_player = "s3a://bucket-gold-zone/soccer/player_analytics"

# Persist both player outputs as Delta tables, overwriting data and schema.
# Each entry is (DataFrame, sub-directory under the base path, message printed
# once the write has finished); the top players are written first.
for player_df, table_dir, done_msg in (
    (top_players, "top_players", "Top 10 jogadores gravados com sucesso na camada Gold."),
    (agg_attrs, "agg_attributes", "Tabela de agregação de atributos gravada com sucesso na camada Gold."),
):
    writer = player_df.write.format("delta").mode("overwrite")
    writer = writer.option("overwriteSchema", "true").options(**minio_delta_options)
    writer.save(f"{gold_base_player}/{table_dir}")
    print(done_msg)

### ============================================
###  Part 2: Match analyses
### ============================================

# Read the Silver Match Delta table
silver_match_path = "s3a://bucket-silver-zone/soccer/Match"
match_reader = spark.read.format("delta").options(**minio_delta_options)
df_match = match_reader.load(silver_match_path)

# Every season-level metric below aggregates over the same grouping
matches_by_season = df_match.groupBy("season")

# 1. Number of matches per season
total_matches = matches_by_season.agg(F.count("*").alias("total_matches"))
print("\n\nTotal de partidas por temporada:")
total_matches.show()

# 2. Standard deviation of home and away goals per season
stddev_goals = matches_by_season.agg(
    F.stddev("home_team_goal").alias("stddev_home_goals"),
    F.stddev("away_team_goal").alias("stddev_away_goals"),
)
print("Desvio padrão de gols por temporada:")
stddev_goals.show()

# 3. Highest and lowest home/away goal counts per season
max_min_goals = matches_by_season.agg(
    F.max("home_team_goal").alias("max_home_goals"),
    F.min("home_team_goal").alias("min_home_goals"),
    F.max("away_team_goal").alias("max_away_goals"),
    F.min("away_team_goal").alias("min_away_goals"),
)
print("Máximo e mínimo de gols por temporada:")
max_min_goals.show()

# 4. Average absolute goal difference per season
abs_goal_gap = F.abs(F.col("home_team_goal") - F.col("away_team_goal"))
goal_diff = matches_by_season.agg(F.avg(abs_goal_gap).alias("avg_goal_difference"))
print("Diferença média de gols por temporada:")
goal_diff.show()

# 5. Average goals per team, split by role (home side / away side)

# Average goals scored by each team when playing at home
home_goals = (
    df_match
    .groupBy("home_team_api_id")
    .agg(F.avg("home_team_goal").alias("avg_home_goals"))
)
print("Média de gols por time (mandante):")
home_goals.show()

# Average goals scored by each team when playing away
away_goals = (
    df_match
    .groupBy("away_team_api_id")
    .agg(F.avg("away_team_goal").alias("avg_away_goals"))
)
print("Média de gols por time (visitante):")
away_goals.show()

# Gold-layer base path for the match analytics tables
gold_base_match = "s3a://bucket-gold-zone/soccer/match_analytics"

# Table name (also its sub-directory under the base path) -> DataFrame to
# persist. Dict insertion order fixes the order in which tables are written.
gold_match_tables = {
    "total_matches": total_matches,
    "stddev_goals": stddev_goals,
    "max_min_goals": max_min_goals,
    "goal_diff": goal_diff,
    "avg_home_goals": home_goals,
    "avg_away_goals": away_goals,
}

# Write each table to the Gold layer as Delta, overwriting data and schema
for name, df in gold_match_tables.items():
    path = f"{gold_base_match}/{name}"
    writer = df.write.format("delta").mode("overwrite")
    writer = writer.option("overwriteSchema", "true").options(**minio_delta_options)
    writer.save(path)
    print(f"Tabela analítica '{name}' gravada com sucesso na camada Gold no MinIO!")

# Stop the Spark session
spark.stop()


Top 10 jogadores com maior overall_rating e potential:
+-------------+--------------+---------+
|player_api_id|overall_rating|potential|
+-------------+--------------+---------+
|30981        |94            |97       |
|30717        |93            |93       |
|30829        |93            |93       |
|30893        |93            |93       |
|39989        |92            |93       |
|39854        |92            |92       |
|30743        |91            |95       |
|30626        |91            |93       |
|30955        |91            |93       |
|30657        |91            |92       |
+-------------+--------------+---------+

Médias dos atributos técnicos e físicos:
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+----------------+----------------+------------------+---