In [0]:
from pyspark.sql import functions as F

In [0]:
task_name = ""
# start_dt = dbutils.jobs.taskValues.get(taskKey=task_name, key="start_date")
# end_dt = dbutils.jobs.taskValues.get(taskKey=task_name, key="end_date")
continue_flag = dbutils.jobs.taskValues.get(taskKey=task_name, key="continue_downstream", default="no")

if continue_flag == "no":
    dbutils.notebook.exit("No new data to aggregate.")

In [0]:
silver_df = spark.read.table("mlb.02_silver.statcast_enrich")

In [0]:
silver_df.display()

## Yearly Pitcher Performance by Pitch Type

In [0]:
in_zone_labels = ["high-left", "high-center", "high-right", "middle-left", "middle-center", "middle-right", "low-left", "low-center", "low-right"]

In [0]:
enriched_df = silver_df.select(
    "*",
    F.year("date").alias("season"),
    # Outcome Flags (Strict matching to avoid double_play overlap)
    (F.col("events") == "single").alias("is_single"),
    (F.col("events") == "double").alias("is_double"),
    (F.col("events") == "triple").alias("is_triple"),
    (F.col("events") == "home_run").alias("is_hr"),
    (F.col("events").isin("strikeout", "strikeout_double_play")).alias("is_k"),
    (F.col("events").isin("walk", "intent_walk")).alias("is_bb"),
    
    # Swing & Whiff Flags
    F.col("description").contains("swinging").alias("is_whiff"),
    F.col("description").rlike("swinging_strike|foul|hit_into_play").alias("is_swing"),
    
    # Contact Quality Flags
    (F.col("launch_speed_angle") == 6).alias("is_barrel"),
    (F.col("launch_speed_angle") <= 2).alias("is_poor_contact")
)

pitcher_pitch_yearly_gold_df = enriched_df.groupBy(
    "season", "pitcher", "fielding_team", "pitch_name"
).agg(
    F.count("idx_game_pitch").alias("pitch_count"),
    
    # Physical Metrics
    F.round(F.avg("pitcher_release_speed_kmh"), 2).alias("avg_velocity"),
    F.round(F.avg("spin_axis_degree"), 2).alias("avg_spin_axis"),
    F.round(F.avg("pitch_ball_move_x_cm_catcher_view"), 2).alias("avg_break_x"),
    F.round(F.avg("pitch_ball_move_z_cm_catcher_view"), 2).alias("avg_break_z"),
    
    # Count Metrics
    F.sum(F.when(F.col("pitch_zone_catcher_view").isin(in_zone_labels), 1).otherwise(0)).alias("zone_in_count"),
    F.sum(F.coalesce(F.col("is_whiff").cast("int"), F.lit(0))).alias("whiff_count"),
    F.sum(F.coalesce(F.col("is_swing").cast("int"), F.lit(0))).alias("swing_count"),
    F.sum(F.coalesce(F.col("is_single").cast("int"), F.lit(0))).alias("single_count"),
    F.sum(F.coalesce(F.col("is_double").cast("int"), F.lit(0))).alias("double_count"),
    F.sum(F.coalesce(F.col("is_triple").cast("int"), F.lit(0))).alias("triple_count"),
    F.sum(F.coalesce(F.col("is_hr").cast("int"), F.lit(0))).alias("hr_count"),

    F.sum(F.when(F.col("launch_speed_angle") == "Barrel", 1).otherwise(0)).alias("barrel_count"),
    
    # 'count Weak, Topped, Under as poor_contact
    F.sum(F.when(F.col("launch_speed_angle").isin("Weak", "Topped", "Under"), 1).otherwise(0)).alias("poor_contact_count")
)

In [0]:
pitcher_pitch_yearly_gold_df.display()

In [0]:
pitcher_pitch_yearly_gold_df = pitcher_pitch_yearly_gold_df.select(
    "*",
    F.round(F.sqrt(F.pow("avg_break_x", 2) + F.pow("avg_break_z", 2)), 2).alias("total_break"),
    F.round(F.when(F.col("swing_count") > 0, F.col("whiff_count") / F.col("swing_count")).otherwise(0), 5).alias("whiff_rate"),
    F.round(F.when(F.col("pitch_count") > 0, F.col("barrel_count") / F.col("pitch_count")).otherwise(0), 5).alias("barrel_rate"),
    F.round(F.when(F.col("pitch_count") > 0, F.col("poor_contact_count") / F.col("pitch_count")).otherwise(0), 5).alias("poor_contact_rate"),
    F.round(F.when(F.col("pitch_count") > 0, F.col("zone_in_count") / F.col("pitch_count")).otherwise(0), 5).alias("zone_rate"),
    F.round(F.when(F.col("pitch_count") > 0, 
           ((F.col("single_count") + F.col("double_count")*2 + F.col("triple_count")*3 + F.col("hr_count")*4) / F.col("pitch_count"))
          ).otherwise(0), 5).alias("slugging_per_pitch")
)

In [0]:
pitcher_pitch_yearly_gold_df.display()

In [0]:
pitcher_pitch_yearly_gold_df.write.mode("overwrite").saveAsTable("mlb.03_gold.pitcher_yearly")