In [0]:
pip install xgboost

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as Fsum, isnan, when, create_map, lit
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from xgboost.spark import SparkXGBRegressor


In [0]:
# Initialize Spark Session
spark = SparkSession.builder.appName("PodcastListeningTimePrediction").getOrCreate()

In [0]:
ACCESS_KEY = ""
SECRET_KEY = ""
bucket_name = "podcasts5e4"

# Configure Spark for S3 access
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")

# S3 paths
train_path = f"s3a://{bucket_name}/train.csv"

# Load CSV files
train = spark.read.csv(train_path, header=True, inferSchema=True)

In [0]:
train = train.na.drop(subset=["Listening_Time_minutes"])

In [0]:
# Calculate medians by Genre
genre_median = train.groupBy("Genre").agg(
    F.expr("percentile_approx(Episode_Length_minutes, 0.5)").alias("Genre_Median")
)

# Join and impute in train
train = train.join(genre_median, on="Genre", how="left")
train = train.withColumn(
    "Episode_Length_minutes",
    F.when(
        F.col("Episode_Length_minutes").isNull(), F.col("Genre_Median")
    ).otherwise(F.col("Episode_Length_minutes"))
).drop("Genre_Median")

In [0]:
train = train.withColumn(
    "Guest_Present", F.when(F.col("Guest_Popularity_percentage").isNotNull(), 1).otherwise(0)
).withColumn(
    "Guest_Popularity_percentage", F.coalesce("Guest_Popularity_percentage", F.lit(0))
)

In [0]:
ads_mode_row = train.groupBy("Number_of_Ads") \
    .count() \
    .orderBy(F.desc("count")) \
    .first()
ads_mode = ads_mode_row["Number_of_Ads"]

train = train.withColumn(
    "Number_of_Ads",
    F.when(F.col("Number_of_Ads").isNull(), ads_mode).otherwise(F.col("Number_of_Ads"))
)

In [0]:
def print_missing_values(df, df_name="DataFrame"):
    print(f"\nMissing value check for {df_name}:")
    total_rows = df.count()
    for column in df.columns:
        missing = df.select(
            Fsum(when(col(column).isNull() | isnan(column), 1).otherwise(0)).alias("missing")
        ).collect()[0]["missing"]
        if missing > 0:
            print(f"  {column}: {missing} missing ({missing / total_rows:.2%})")
        else:
            print(f"  {column}: ✅ no missing values")

print_missing_values(train, "Train")


Missing value check for Train:
  Genre: ✅ no missing values
  id: ✅ no missing values
  Podcast_Name: ✅ no missing values
  Episode_Title: ✅ no missing values
  Episode_Length_minutes: ✅ no missing values
  Host_Popularity_percentage: ✅ no missing values
  Publication_Day: ✅ no missing values
  Publication_Time: ✅ no missing values
  Guest_Popularity_percentage: ✅ no missing values
  Number_of_Ads: ✅ no missing values
  Episode_Sentiment: ✅ no missing values
  Listening_Time_minutes: ✅ no missing values
  Guest_Present: ✅ no missing values


In [0]:
# Count frequencies and normalize
podcast_freq = train.groupBy("Podcast_Name").count()
total_train = train.count()
podcast_freq = podcast_freq.withColumn("Podcast_Name_Freq", F.col("count") / total_train).drop("count")

# Join back to get frequency feature
train = train.join(podcast_freq, on="Podcast_Name", how="left")
# Sanity check
train.select("Podcast_Name", "Podcast_Name_Freq").show(5)

+---------------+--------------------+
|   Podcast_Name|   Podcast_Name_Freq|
+---------------+--------------------+
|    Mind & Body|0.018185333333333335|
| Digital Digest| 0.02156133333333333|
|  Joke Junction|0.020098666666666667|
|  Fitness First|            0.025984|
|Mystery Matters|            0.021336|
+---------------+--------------------+
only showing top 5 rows



In [0]:
# Extract digits
train = train.withColumn("Episode_Number", F.regexp_extract("Episode_Title", r"(\d+)", 1).cast("float"))

# Fill missing with median from train
episode_number_median = train.approxQuantile("Episode_Number", [0.5], 0.01)[0]

train = train.withColumn(
    "Episode_Number", F.when(F.col("Episode_Number").isNull(), episode_number_median).otherwise(F.col("Episode_Number"))
)
# Sanity check
train.select("Episode_Title", "Episode_Number").show(5)


+-------------+--------------+
|Episode_Title|Episode_Number|
+-------------+--------------+
|   Episode 98|          98.0|
|   Episode 26|          26.0|
|   Episode 16|          16.0|
|   Episode 45|          45.0|
|   Episode 86|          86.0|
+-------------+--------------+
only showing top 5 rows



In [0]:
# Create mapping dictionaries
day_map = {'Monday': 1, 'Tuesday': 2, 'Wednesday': 3, 'Thursday': 4,
           'Friday': 5, 'Saturday': 6, 'Sunday': 7}
time_map = {'Morning': 1, 'Afternoon': 2, 'Evening': 3, 'Night': 4}
sentiment_map = {'Negative': 0, 'Neutral': 1, 'Positive': 2}

def map_column(df, colname, mapping):
    mapping_expr = create_map(*[lit(x) for kv in mapping.items() for x in kv])
    return df.withColumn(colname, mapping_expr[col(colname)])

train = map_column(train, "Publication_Day", day_map)

train = map_column(train, "Publication_Time", time_map)

train = map_column(train, "Episode_Sentiment", sentiment_map)

# Sanity check
train.select("Publication_Day", "Publication_Time", "Episode_Sentiment").show(5)


+---------------+----------------+-----------------+
|Publication_Day|Publication_Time|Episode_Sentiment|
+---------------+----------------+-----------------+
|              4|               4|                2|
|              6|               2|                0|
|              2|               3|                0|
|              1|               1|                2|
|              1|               2|                1|
+---------------+----------------+-----------------+
only showing top 5 rows



In [0]:
train = train.drop("Podcast_Name", "Episode_Title")

In [0]:
indexer = StringIndexer(inputCol="Genre", outputCol="Genre_Idx")
genre_indexer_model = indexer.fit(train)

train = genre_indexer_model.transform(train)

In [0]:
train = train.drop("Genre")

In [0]:
train.groupBy('Genre_Idx').count().show()

+---------+-----+
|Genre_Idx|count|
+---------+-----+
|      8.0|62743|
|      0.0|87606|
|      7.0|63385|
|      1.0|86256|
|      4.0|81453|
|      3.0|82461|
|      2.0|85059|
|      6.0|71416|
|      5.0|80521|
|      9.0|49100|
+---------+-----+



In [0]:
# Split into training and validation sets
train_split, val_split = train.randomSplit([0.8, 0.2], seed=42)

# Define feature columns by excluding the target
feature_cols = [col for col in train.columns if col != "Listening_Time_minutes"]

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [0]:
# XGBoost Regressor
xgb = SparkXGBRegressor(
    features_col="features",
    label_col="Listening_Time_minutes",
    prediction_col="prediction",
    objective='reg:squarederror',
    num_workers=2
)

pipeline = Pipeline(stages=[assembler, xgb])

# Cross-validator
paramGrid = ParamGridBuilder() \
    .addGrid(xgb.max_depth, [4, 6]) \
    .addGrid(xgb.learning_rate, [0.1, 0.2]) \
    .addGrid(xgb.n_estimators, [50, 100]) \
    .build()

evaluator = RegressionEvaluator(
    labelCol="Listening_Time_minutes",
    predictionCol="prediction",
    metricName="rmse"
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

# Train and evaluate
cv_model = cv.fit(train_split)

2025-04-24 14:04:18,752 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 50}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-04-24 14:04:18,810 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'reg:squarederror', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-04-24 14:05:31,319 INFO XGBoost-PySpark: _fit Finished xgboost training!
2025-04-24 14:05:33,452 INFO XGBoost-PySpark: _fit Finished xgboost training!
2025-04-24 14:06:53,838 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.2, 'max_depth': 4, 'objective': 'r

In [0]:
val_predictions = cv_model.transform(val_split)
val_rmse = evaluator.evaluate(val_predictions)
print(f"📊 Validation RMSE: {val_rmse:.4f}")

📊 Validation RMSE: 13.0539
