In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# Re-create the helper that starts the Spark session.
def create_spark_session():
    """Return a SparkSession configured for Delta Lake and MinIO over S3A.

    The MinIO endpoint and credentials are read from the environment
    variables ``MINIO_ENDPOINT``, ``MINIO_ROOT_USER`` and
    ``MINIO_ROOT_PASSWORD``.

    Returns:
        The session produced by ``SparkSession.builder ... .getOrCreate()``.

    Raises:
        RuntimeError: if any of the three variables is unset or empty.
    """
    required_vars = ('MINIO_ENDPOINT', 'MINIO_ROOT_USER', 'MINIO_ROOT_PASSWORD')
    settings = {name: os.getenv(name) for name in required_vars}

    # os.getenv returns None for an unset variable. Fail here with a clear
    # message instead of handing a missing value to the S3A configuration,
    # where it would only show up later as a hard-to-diagnose storage error.
    missing = [name for name in required_vars if not settings[name]]
    if missing:
        raise RuntimeError(
            'Missing required environment variable(s): ' + ', '.join(missing)
        )

    spark = (
        SparkSession.builder.appName('Silver Layer - Soccer Analysis')
        .enableHiveSupport()
        # Delta Lake SQL extension and catalog implementation.
        .config(
            'spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension'
        )
        .config(
            'spark.sql.catalog.spark_catalog',
            'org.apache.spark.sql.delta.catalog.DeltaCatalog',
        )
        # S3A filesystem settings pointing at the MinIO endpoint.
        .config(
            'spark.hadoop.fs.s3a.impl',
            'org.apache.hadoop.fs.s3a.S3AFileSystem',
        )
        .config('spark.hadoop.fs.s3a.path.style.access', 'true')
        .config('spark.hadoop.fs.s3a.endpoint', settings['MINIO_ENDPOINT'])
        .config('spark.hadoop.fs.s3a.access.key', settings['MINIO_ROOT_USER'])
        .config(
            'spark.hadoop.fs.s3a.secret.key', settings['MINIO_ROOT_PASSWORD']
        )
        .getOrCreate()
    )
    return spark


# Notebook-level session object; the cells below read and write through `spark`.
spark = create_spark_session()

In [3]:
# silver_country table
# Goal: expose the Bronze country rows under explicit column names, with a
# single row per country_id.

country_bronze_path = 's3a://bucket-bronze-zone/soccer/country/'
country_silver_path = 's3a://bucket-silver-zone/soccer/country/'

# Read the Bronze country table.
df_country = spark.read.format('delta').load(country_bronze_path)

# Rename id/name and drop rows that repeat a country_id.
country_columns = [
    col('id').alias('country_id'),
    col('name').alias('country_name'),
]
df_country_silver = df_country.select(*country_columns).dropDuplicates(
    ['country_id']
)

# Overwrite the Silver Delta table (this write is not partitioned).
country_writer = df_country_silver.write.format('delta').mode('overwrite')
country_writer.save(country_silver_path)

In [5]:
# silver_league table
# Goal: attach the country name to every league.

league_bronze_path = 's3a://bucket-bronze-zone/soccer/league/'
league_country_path = 's3a://bucket-silver-zone/soccer/country/'
league_silver_path = 's3a://bucket-silver-zone/soccer/league/'

# Leagues come from Bronze; countries are re-read from the Silver table.
df_league = spark.read.format('delta').load(league_bronze_path)
df_country_silver = spark.read.format('delta').load(league_country_path)

# Left join so a league is kept even when no country row matches.
same_country = df_league['country_id'] == df_country_silver['country_id']
league_columns = [
    df_league['id'].alias('league_id'),
    df_league['name'].alias('league_name'),
    df_country_silver['country_name'],
    # country_id exists on both sides; take it from the league side.
    df_league['country_id'],
]
df_league_silver = df_league.join(
    df_country_silver, on=same_country, how='left'
).select(*league_columns)

# Overwrite the Silver Delta table.
league_writer = df_league_silver.write.format('delta').mode('overwrite')
league_writer.save(league_silver_path)

In [6]:
# silver_team table
# Goal: keep only the identifying team columns, under clearer names.

team_bronze_path = 's3a://bucket-bronze-zone/soccer/team/'
team_silver_path = 's3a://bucket-silver-zone/soccer/team/'

df_team = spark.read.format('delta').load(team_bronze_path)

# Every Bronze column not listed here is dropped.
team_columns = [
    col('id').alias('team_id'),
    col('team_api_id'),
    col('team_long_name').alias('team_name'),
    col('team_short_name').alias('team_abbreviation'),
]
df_team_silver = df_team.select(*team_columns)

# Overwrite the Silver Delta table.
team_writer = df_team_silver.write.format('delta').mode('overwrite')
team_writer.save(team_silver_path)

In [7]:
# silver_match table
# Goal: add a total-goals figure to every match, attach league and country
# names, and store the result partitioned by season.

match_bronze_path = 's3a://bucket-bronze-zone/soccer/match/'
match_league_path = 's3a://bucket-silver-zone/soccer/league/'
match_silver_path = 's3a://bucket-silver-zone/soccer/match/'

df_match = spark.read.format('delta').load(match_bronze_path)
df_league_silver = spark.read.format('delta').load(match_league_path)

# Per-match total: home goals plus away goals.
matches_with_totals = df_match.withColumn(
    'total_goals', col('home_team_goal') + col('away_team_goal')
)

# Left join on league_id so matches without a matching league are kept.
matches_with_league = matches_with_totals.join(
    df_league_silver, on='league_id', how='left'
)

# Only `id` is renamed; the other columns keep their names.
passthrough_names = (
    'date',
    'season',
    'league_id',
    'league_name',
    'country_name',
    'home_team_goal',
    'away_team_goal',
    'total_goals',
)
match_columns = [col('id').alias('match_id')]
match_columns += [col(name) for name in passthrough_names]
df_match_silver = matches_with_league.select(*match_columns)

# Overwrite the Silver Delta table, partitioned by season (intended to make
# per-season analysis efficient).
match_writer = (
    df_match_silver.write.format('delta')
    .partitionBy('season')
    .mode('overwrite')
)
match_writer.save(match_silver_path)

In [8]:
# Stop the Spark session created at the top of the notebook.
spark.stop()