In [0]:
# 03_data_transformation_and_curation.ipynb - Célula 1: Configuração da SparkSession

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, expr, from_unixtime, to_timestamp, array_contains, when, concat_ws, to_date, sum as spark_sum, count as spark_count, avg as spark_avg

print("Bibliotecas importadas com sucesso.")

# A SparkSession já é automaticamente fornecida e configurada pelo Databricks.
# Ela já herda o acesso ao S3 via a IAM Role do seu CloudFormation.
spark = SparkSession.builder.appName("TFTDataTransformationAndCuratio").getOrCreate()

print("SparkSession configurada.")

In [0]:
# 03_data_transformation_and_curation.ipynb - Célula 2: Carregar Dados Brutos do Unity Catalog

from pyspark.sql.functions import col

# Nome completo da sua tabela no Unity Catalog
UNITY_CATALOG_RAW_TABLE_NAME = "tft_analytics.raw.tft_matches_raw"

print(f"Carregando dados da tabela '{UNITY_CATALOG_RAW_TABLE_NAME}'...")

# Carrega o DataFrame a partir da tabela do Unity Catalog
df_raw_spark = spark.read.table(UNITY_CATALOG_RAW_TABLE_NAME)

print(f"Dados carregados. Total de registros: {df_raw_spark.count()}")
print("\nEsquema inicial dos dados brutos:")
df_raw_spark.printSchema()

In [0]:
# 03_data_transformation_and_curation.ipynb - Célula 3: Transformação e Achatamento dos Dados (SOLUÇÃO COM selectExpr)

from pyspark.sql.functions import col, explode, from_unixtime, to_date, when, concat_ws # 'transform' NÃO será usado diretamente aqui

print("Iniciando o achatamento e transformação dos dados...")

# 1. Achatar (Flatten) as informações de partida e explodir a lista de participantes
df_flattened_participants = df_raw_spark.select(
    col("metadata.match_id").alias("match_id"),
    from_unixtime(col("info.game_datetime") / 1000).alias("game_datetime_utc"),
    to_date(from_unixtime(col("info.game_datetime") / 1000)).alias("game_date_utc"),
    col("info.game_length").alias("game_length_seconds"),
    col("info.game_version").alias("game_version"),
    col("info.queue_id").alias("queue_id"),
    col("info.tft_set_core_name").alias("tft_set_name"),
    col("info.tft_set_number").alias("tft_set_number"),
    explode(col("info.participants")).alias("participant_data")
)

# 2. Extrair detalhes específicos de cada participante e usar selectExpr para transformar arrays
# USAMOS SELECTEXPR AQUI PARA TODAS AS COLUNAS, INCLUINDO AS TRANSFORMAÇÕES DE ARRAY
df_transformed = df_flattened_participants.selectExpr(
    "match_id",
    "game_datetime_utc",
    "game_date_utc",
    "game_length_seconds",
    "game_version",
    "queue_id",
    "tft_set_name",
    "tft_set_number",
    "participant_data.puuid AS puuid",
    "participant_data.placement AS placement",
    "participant_data.gold_left AS gold_left",
    "participant_data.last_round AS last_round",
    "participant_data.level AS level",
    "participant_data.players_eliminated AS players_eliminated",
    "participant_data.total_damage_to_players AS total_damage_to_players",
    "participant_data.riotIdGameName AS riot_id_game_name",
    "participant_data.riotIdTagline AS riot_id_tagline",
    "participant_data.time_eliminated AS time_eliminated_seconds",
    # SOLUÇÃO PARA ARRAY DE STRUCT COM TRANSFORM: Usar a função SQL TRANSFORM
    "CONCAT_WS(',', TRANSFORM(participant_data.traits, t -> t.name)) AS traits_activated_names",
    "CONCAT_WS(',', TRANSFORM(participant_data.units, u -> u.character_id)) AS units_used_ids"
)

# 3. Adicionar colunas de indicadores de vitória/top 4
df_transformed = df_transformed.withColumn(
    "is_winner",
    when(col("placement") == 1, True).otherwise(False)
).withColumn(
    "is_top_4",
    when(col("placement") <= 4, True).otherwise(False)
)

print("\nTransformação e achatamento concluídos. Schema do DataFrame transformado:")
df_transformed.printSchema()

# --- CONTAR TODOS OS REGISTROS TRANSFORMADOS ---
print(f"Total de registros (participantes) no DataFrame transformado: {df_transformed.count()}")
# -----------------------------------------------

print("\nExemplo das primeiras 10 linhas do DataFrame transformado:")
display(df_transformed.limit(10))

In [0]:
print(f"Total de registros (participantes) no DataFrame transformado: {df_transformed.count()}")

In [0]:
# 03_data_transformation_and_curation.ipynb - Célula 4: Salvar Dados Transformados como Tabela Delta Lake

# Você já tem o esquema 'curated' criado no 'tft_analytics' Catalog.
# Se não criou ainda, volte e crie o esquema 'tft_analytics.curated' no Unity Catalog.

# SUBSTITUA PELO NOME REAL E EXATO DO SEU BUCKET S3
S3_BUCKET_NAME = "tft-data-pipeline-towwers" # O mesmo bucket usado para os dados raw

# Caminho de destino no S3 para a tabela Delta transformados
# Criaremos uma subpasta 'curated/tft_player_performance_delta/' no seu bucket
S3_CURATED_DELTA_PATH = f"s3a://{S3_BUCKET_NAME}/curated/tft_player_performance_delta/"

# Nome completo da tabela Delta no Unity Catalog (catalog.schema.table)
UNITY_CATALOG_DELTA_TABLE_NAME = "tft_analytics.curated.tft_player_performance_delta"

print(f"Iniciando gravação do DataFrame transformado como tabela Delta em: {S3_CURATED_DELTA_PATH}...")

try:
    # Salva o DataFrame df_transformed como uma tabela Delta no S3
    # O modo 'overwrite' substitui os dados existentes na pasta.
    df_transformed.write.format("delta") \
                        .mode("overwrite") \
                        .save(S3_CURATED_DELTA_PATH)
    print(f"\nDados transformados salvos com sucesso como arquivos Delta em: {S3_CURATED_DELTA_PATH}")

    # Cria (ou recria) a Tabela Externa no Unity Catalog apontando para o Delta Lake
    print(f"Tentando criar/recriar tabela externa '{UNITY_CATALOG_DELTA_TABLE_NAME}' no Unity Catalog...")
    
    # Registra a tabela Delta no Unity Catalog usando o caminho do S3.
    df_transformed.write.format("delta") \
                        .option("path", S3_CURATED_DELTA_PATH) \
                        .mode("overwrite") \
                        .saveAsTable(UNITY_CATALOG_DELTA_TABLE_NAME)

    print(f"Tabela externa '{UNITY_CATALOG_DELTA_TABLE_NAME}' criada/atualizada com sucesso no Unity Catalog!")

    # Verificação rápida da tabela Delta recém-criada
    print("\nVerificação (query SQL na tabela Delta):")
    spark.sql(f"SELECT COUNT(*) FROM {UNITY_CATALOG_DELTA_TABLE_NAME}").show()
    spark.sql(f"SELECT * FROM {UNITY_CATALOG_DELTA_TABLE_NAME} LIMIT 5").show(truncate=False)

except Exception as e:
    print(f"\nERRO ao salvar dados ou criar tabela Delta: {e}")
    print("Verifique:")
    print(f"  - O nome do bucket S3 '{S3_BUCKET_NAME}' está correto e exato.")
    print(f"  - As permissões da IAM Role permitem escrita no S3 para o caminho '{S3_CURATED_DELTA_PATH}'.")
    print(f"  - Se você tem as permissões 'CREATE TABLE' e 'USE SCHEMA' no esquema 'tft_analytics.curated'.")
    print(f"  - Detalhes do erro: {e}")