In [0]:
%pyspark
import os

from pybaseball import statcast
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    count,
    isnan,
    mean,
    monotonically_increasing_id,
    stddev,
    when,
)
from pyspark.sql.types import NumericType


# Start (or attach to) the Spark session for this notebook.
# The master URL may be overridden with the SPARK_MASTER_URL environment
# variable; the default is the cluster master used so far.
# NOTE(review): under Zeppelin's %pyspark a `spark` session usually exists
# already; getOrCreate() then returns it and the master set here has no effect.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://10.139.0.30:7077")

spark = (
    SparkSession.builder
    .appName("Baseball")
    .master(SPARK_MASTER_URL)
    .getOrCreate()
)

# Sanity-check that the SparkContext is alive.
spark_ctx = spark.sparkContext
print("Spark UI:", spark_ctx.uiWebUrl)
print("Master:", spark_ctx.master)
# getExecutorInfos() hands back a Java array via py4j; printing it directly
# typically shows only a JVM object reference, so report its size instead.
executor_infos = spark_ctx._jsc.sc().statusTracker().getExecutorInfos()
print("Executors:", len(executor_infos))


In [1]:
%pyspark
# Load the raw Statcast parquet export into a Spark DataFrame and preview it.
# TODO: absolute local path; move to a configurable location.
RAW_PARQUET_PATH = "/home/travis/statcast_2015_2024.parquet"

raw_df = spark.read.format("parquet").load(RAW_PARQUET_PATH)
raw_df.show(5)

In [2]:
%pyspark
# Add a surrogate primary key ("id") so filtered subsets can be re-joined to
# the full set of attributes later on.
# NOTE(review): monotonically_increasing_id() values are unique and increasing
# but not consecutive, and they depend on how raw_df is partitioned. If the
# later join on `id` must be exact across recomputations, consider caching `df`.
df = raw_df.withColumn("id", monotonically_increasing_id())

In [3]:
%pyspark
# Drop the Statcast columns whose names carry the "_deprecated" suffix.
columns_to_drop = [
    f"{base}_deprecated"
    for base in ("spin_rate", "break_angle", "break_length", "tfs", "tfs_zulu")
]

cleaned_df = df.drop(*columns_to_drop)

cleaned_df.printSchema()

In [4]:
%pyspark
# Identify numeric columns and count NaN values in each.

# Every numeric column (any Spark NumericType: int, bigint, float, double,
# smallint, tinyint, decimal). The synthetic "id" key is excluded because it
# is a row identifier, not a measurement to clean or screen for outliers.
numeric_cols = [
    field.name
    for field in cleaned_df.schema.fields
    if isinstance(field.dataType, NumericType) and field.name != "id"
]

# Per-column count of NaN values (NULLs are not counted here).
nan_counts = cleaned_df.select(
    [count(when(isnan(col(c)), c)).alias(c) for c in numeric_cols]
)

nan_counts.show()


In [5]:
%pyspark

# Drop numeric columns whose share of missing values (NULL or NaN) exceeds
# MAX_MISSING_FRACTION of all rows.
MAX_MISSING_FRACTION = 0.5

# Count total rows in DataFrame
total_rows = cleaned_df.count()

# Inspect only columns that still exist. Without this, re-running the cell after
# a drop would reference removed columns and raise AnalysisException.
present_numeric_cols = [c for c in numeric_cols if c in cleaned_df.columns]

# Count NaN and NULL values for each numeric column (one pass over the data).
if present_numeric_cols:
    nan_counts = cleaned_df.select([
        count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c)
        for c in present_numeric_cols
    ]).collect()[0].asDict()
else:
    nan_counts = {}

# Identify columns above the missing-value threshold.
threshold = MAX_MISSING_FRACTION * total_rows
columns_to_drop = [
    col_name for col_name, nan_count in nan_counts.items() if nan_count > threshold
]

# Drop columns with excessive missing values
cleaned_df = cleaned_df.drop(*columns_to_drop)

# Expose the result to %sql paragraphs, which query the view `cleaned_df`.
cleaned_df.createOrReplaceTempView("cleaned_df")

# Show the dropped columns
print(f"Dropped columns: {columns_to_drop}")




In [6]:
%sql

-- Preview ten rows of the cleaned data.
-- Requires a temp view named `cleaned_df`, registered from a %pyspark cell,
-- e.g. cleaned_df.createOrReplaceTempView("cleaned_df").
SELECT *
FROM cleaned_df
LIMIT 10

In [7]:
%pyspark
############################
# Identify & Remove Outliers
############################

# Tunables for the outlier rules below.
# NOTE(review): Statcast exports usually call pitch velocity `release_speed`;
# confirm that a `pitch_speed` column exists in the parquet schema.
SPEED_COL = "pitch_speed"
SPEED_MIN, SPEED_MAX = 40, 105   # plausible pitch-speed range (domain knowledge)
SIGMA_CUTOFF = 3                 # keep pitch speeds within mean +/- 3 std devs
IQR_MULTIPLIER = 1.5             # Tukey fences: [Q1 - k*IQR, Q3 + k*IQR]
IQR_RELATIVE_ERROR = 0.0         # 0.0 = exact quantiles (accurate but expensive)

# 1) Filter by domain knowledge for pitch speed.
df_no_outliers = cleaned_df.filter(
    (col(SPEED_COL) >= SPEED_MIN) & (col(SPEED_COL) <= SPEED_MAX)
)

# 2) Filter pitch speed by standard deviation.
stats = df_no_outliers.select(
    mean(col(SPEED_COL)).alias("mean_speed"),
    stddev(col(SPEED_COL)).alias("std_speed"),
).collect()[0]
mean_speed = stats["mean_speed"]
std_speed = stats["std_speed"]
# mean/stddev come back NULL when there are too few rows; skip the sigma
# filter then instead of failing on None arithmetic.
if mean_speed is not None and std_speed is not None:
    low_cutoff = mean_speed - SIGMA_CUTOFF * std_speed
    high_cutoff = mean_speed + SIGMA_CUTOFF * std_speed
    df_no_outliers = df_no_outliers.filter(
        (col(SPEED_COL) >= low_cutoff) & (col(SPEED_COL) <= high_cutoff)
    )
else:
    print(f"Skipping sigma filter on '{SPEED_COL}': not enough data.")

# 3) Reduce the number of columns to lighten the frame screened by IQR.
IQR_FEATURE_COLS = [
    "pitch_speed",
    "release_spin_rate",
    "plate_x",
    "plate_z",
    "release_extension",
    "arm_angle",
    "api_break_x_batter_in",
    "api_break_x_arm",
    "api_break_z_with_gravity",
    "hyper_speed",
    "delta_pitcher_run_exp",
    "estimated_slg_using_speedangle",
    "swing_length",
    "bat_speed",
    "spin_axis",
    "pitch_number",
    "launch_speed_angle",
    "iso_value",
    "babip_value",
    "woba_denom",
    "woba_value",
    "estimated_woba_using_speedangle",
    "estimated_ba_using_speedangle",
    "release_pos_y",
    "release_speed",
    "release_pos_x",
    "release_pos_z",
    "spin_dir",
    "zone",
    "pfx_x",
    "pfx_z",
    "hc_x",
    "hc_y",
    "vx0",
    "vy0",
    "vz0",
    "ax",
    "ay",
    "az",
    "sz_top",
    "sz_bot",
]

# Keep only numeric columns that are still present. Earlier cleaning may have
# dropped sparse columns, and selecting a missing one raises AnalysisException.
no_outlier_schema = df_no_outliers.schema
iqr_cols = [
    c for c in IQR_FEATURE_COLS
    if c in no_outlier_schema.names
    and isinstance(no_outlier_schema[c].dataType, NumericType)
]
skipped_cols = [c for c in IQR_FEATURE_COLS if c not in iqr_cols]
if skipped_cols:
    print(f"Columns missing or non-numeric, skipped for IQR: {skipped_cols}")

filtered_for_iqr = df_no_outliers.select("id", *iqr_cols)

# 4) Compute the IQR fence for each column and apply the fences cumulatively.
# One approxQuantile call returns [Q1, Q3] for every column in a single pass.
quartiles = (
    filtered_for_iqr.approxQuantile(iqr_cols, [0.25, 0.75], IQR_RELATIVE_ERROR)
    if iqr_cols
    else []
)

outliers_removed = filtered_for_iqr
for c, quantiles in zip(iqr_cols, quartiles):
    # approxQuantile returns an empty list for a column with no usable values.
    if len(quantiles) != 2:
        print(f"Skipping column '{c}' due to insufficient quantile data.")
        continue

    q1, q3 = quantiles
    iqr = q3 - q1
    if iqr == 0:
        # A zero IQR collapses the fence to `value == Q1` and would discard
        # every other value (e.g. for mostly-constant indicator columns).
        print(f"Skipping column '{c}': IQR is 0.")
        continue

    lower_bound = q1 - IQR_MULTIPLIER * iqr
    upper_bound = q3 + IQR_MULTIPLIER * iqr

    # Missing values (NULL/NaN) are not outliers. Keep those rows rather than
    # letting the range comparison silently drop them.
    outliers_removed = outliers_removed.filter(
        col(c).isNull() | isnan(col(c)) | col(c).between(lower_bound, upper_bound)
    )

# Show the cleaned DataFrame
outliers_removed.show(5)

In [8]:
%pyspark
# Re-attach the full set of cleaned attributes to the rows that survived
# outlier removal.
# Create temp SQL views so the join can be written in SQL.
outliers_removed.createOrReplaceTempView("removed_outliers")
cleaned_df.createOrReplaceTempView("cleaned")

# LEFT SEMI JOIN keeps each `cleaned` row whose id appears in `removed_outliers`
# and returns only the `cleaned` columns. Returning both sides would duplicate
# `id` and every projected feature column, which makes those names ambiguous
# downstream and is rejected by the parquet writer.
# NOTE(review): correctness relies on `id` values being identical in both views
# (they come from monotonically_increasing_id on the same lineage).
ready_for_index = spark.sql("""
SELECT b.*
FROM cleaned b
LEFT SEMI JOIN removed_outliers a
    ON b.id = a.id
""")

ready_for_index.show()

In [9]:
%pyspark

# Index the categorical pitch_type column.
# handleInvalid="keep" maps NULL (or unseen) labels to an extra index instead
# of raising an error at transform time, which is what the default "error" does.
pitch_type_indexer = StringIndexer(
    inputCol="pitch_type",
    outputCol="pitch_type_index",
    handleInvalid="keep",
)

# One-hot encode the indexed pitch_type into a vector column.
pitch_type_encoder = OneHotEncoder(
    inputCols=["pitch_type_index"],
    outputCols=["pitch_type_ohe"],
)

# Fit the indexer on ready_for_index and transform it.
indexed_df = pitch_type_indexer.fit(ready_for_index).transform(ready_for_index)

# Fit the encoder on the indexed data and transform it.
# model_data now holds the vector column 'pitch_type_ohe' for downstream modeling.
model_data = pitch_type_encoder.fit(indexed_df).transform(indexed_df)



In [10]:
%pyspark
# Save the model-ready data, including the one-hot 'pitch_type_ohe' column
# (model_data, not indexed_df, which lacks the encoding).
# NOTE(review): a relative path resolves against Spark's default filesystem /
# working directory; confirm this is the intended output location.
MODEL_DATA_PATH = "model_data.parquet"

model_data.write.mode("overwrite").parquet(MODEL_DATA_PATH)
