# Transform Phase - PySpark Data Processing

In [None]:
# Default run parameters. When papermill executes this notebook with explicit
# parameters it injects a cell after this one that overrides these names.
execution_date: str = '2025-10-31'  # run date; appears to be YYYY-MM-DD, used in input lookup and output file names
input_dir: str = '/opt/airflow/data/raw'  # passed to load_raw_data as the raw-data location
output_dir: str = '/opt/airflow/data/processed'  # directory that receives the processed CSV outputs

In [None]:
import sys
sys.path.append('/opt/airflow')

from src.transform import (
    get_spark_session, 
    load_raw_data, 
    process_player_stats, 
    aggregate_hero_performance, 
    save_spark_df
)
from src.utils import setup_logging
from pyspark.sql import functions as F

# Configure the 'transform' logger (writing under /opt/airflow/logs) and
# announce the run both in the log and in the notebook output.
logger = setup_logging('transform', '/opt/airflow/logs')
# Lazy %-style args: the message is only formatted if the record is emitted.
logger.info('Starting PySpark transformation for %s', execution_date)
print(f'⚙️ Starting PySpark transformation for {execution_date}')

In [None]:
# Initialize Spark and load the raw data for execution_date.
# Per the unpacking below, load_raw_data hands back the matches DataFrame,
# the hero stats DataFrame and the SparkSession it used.
print('Initializing Spark session...')
matches_df, hero_stats_df, spark = load_raw_data(input_dir, execution_date)

# count() is a Spark action; run it once and reuse the result for log + output.
match_count = matches_df.count()
logger.info('Loaded %d matches into Spark', match_count)
print(f'✓ Loaded {match_count} matches into Spark')

# Ask Spark for the actual UI address rather than assuming localhost:4040:
# Spark binds 4041, 4042, ... when 4040 is already taken, and uiWebUrl is
# None when the UI is disabled (spark.ui.enabled=false). Note the host part
# is the driver's host name, which inside a container may differ from the
# address reachable from outside.
ui_url = spark.sparkContext.uiWebUrl
if ui_url:
    print(f'✓ Spark UI available at {ui_url}')
else:
    print('Spark UI is disabled for this session')

In [None]:
# Print the schema of the loaded matches DataFrame, followed by a
# three-row preview of a few identifying columns.
print('\nMatches Schema:')
matches_df.printSchema()

match_preview_cols = ['match_id', 'radiant_win', 'duration']
print('\nSample match data:')
matches_df.select(*match_preview_cols).show(n=3)

In [None]:
# Derive per-player statistics from the raw matches.
# player_stats is consumed by several Spark actions in this notebook (count
# and show here, the hero aggregation, and the CSV save later). Caching it
# keeps Spark from recomputing the full lineage from raw data for each one.
print('\nProcessing player-level statistics...')
player_stats = process_player_stats(matches_df).cache()
player_count = player_stats.count()  # first action also materializes the cache
logger.info('Processed %d player-level records', player_count)
print(f'✓ Processed {player_count} player records')

print('\nSample player stats:')
player_stats.select(
    'match_id', 'hero_id', 'kills', 'deaths', 'assists',
    'kda', 'gpm', 'player_won',
).show(5, truncate=False)

In [None]:
# Aggregate the player-level rows into per-hero metrics. hero_stats_df is
# passed along too, presumably to supply hero metadata such as
# localized_name (confirm in src.transform.aggregate_hero_performance).
# Cached because it is counted, shown, and saved (three separate actions).
print('\nAggregating hero performance metrics...')
hero_performance = aggregate_hero_performance(player_stats, hero_stats_df).cache()
hero_count = hero_performance.count()
logger.info('Aggregated %d hero stats', hero_count)
print(f'✓ Aggregated {hero_count} hero stats')

print('\nTop 10 heroes by win rate:')
# Spark does not guarantee an order among rows with equal win_rate, so
# break ties by sample size and then by name for a deterministic top 10.
hero_performance.orderBy(
    F.desc('win_rate'), F.desc('games_played'), F.asc('localized_name'),
).select(
    'localized_name', 'games_played', 'win_rate',
    'avg_kda', 'avg_gpm',
).show(10, truncate=False)

In [None]:
# Save both processed outputs for execution_date, then shut Spark down.
print('\nSaving processed data...')
player_stats_path = f'{output_dir}/player_stats_{execution_date}.csv'
hero_perf_path = f'{output_dir}/hero_performance_{execution_date}.csv'

try:
    save_spark_df(player_stats, player_stats_path)
    save_spark_df(hero_performance, hero_perf_path)
finally:
    # This is the final cell: release the SparkSession (driver resources,
    # UI port) as soon as the work is done instead of relying on kernel
    # shutdown, and do it even when a save raises. The exception, if any,
    # still propagates so the papermill run is marked as failed.
    spark.stop()

logger.info('Saved processed data to %s and %s', player_stats_path, hero_perf_path)
print(f'✓ Saved player stats to: {player_stats_path}')
print(f'✓ Saved hero performance to: {hero_perf_path}')
print('\n✓ PySpark transformation complete!')