# ML Data Preparation - Steam Games Dataset

## Objective
Prepare the preprocessed dataset for machine learning:
- Handle null values
- Encode categorical variables (developers, publishers, genres, categories)
- Scale/normalize numerical features
- Split into train/test sets

## Dataset
- **Input:** `../archive1/games_march2025_ml_ready.csv` (from preprocessing notebook)
- **Output:** ML-ready features and train/test splits


In [17]:
import warnings
warnings.filterwarnings('ignore')

from IPython.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))


In [18]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, isnull, regexp_replace, split, size, array, lit
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation

# Initialize Spark Session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("MLDataPreparation") \
    .getOrCreate()

# Set log level to reduce output noise
spark.sparkContext.setLogLevel("WARN")

print("Spark session created successfully!")
print(f"Spark version: {spark.version}")


Spark session created successfully!
Spark version: 4.1.0


## Step 1: Load Preprocessed Data


In [None]:
# Read preprocessed CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("escape", '"') \
    .option("multiLine", "true") \
    .option("quote", '"') \
    .csv("../archive1/games_march2025_ml_ready.csv")

print("Dataset loaded:")
print(f"Total number of games: {df.count():,}")
print(f"Number of columns: {len(df.columns)}")
print("\nSchema:")
df.printSchema()

# Show sample
print("\nSample data:")
df.show(5, truncate=50)


Dataset loaded:
Total number of games: 11,889
Number of columns: 31

Schema:
root
 |-- appid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- required_age: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- dlc_count: integer (nullable = true)
 |-- windows: integer (nullable = true)
 |-- mac: integer (nullable = true)
 |-- linux: integer (nullable = true)
 |-- achievements: integer (nullable = true)
 |-- recommendations: integer (nullable = true)
 |-- supported_languages: string (nullable = true)
 |-- full_audio_languages: string (nullable = true)
 |-- developers: string (nullable = true)
 |-- publishers: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- positive: integer (nullable = true)
 |-- negative: integer (nullable = true)
 |-- estimated_owners: string (nullable = true)
 |-- average_playtime_forever: integer (nullable = true)
 |-- average_play

## Step 2: Handle Null Values

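Check every column for nulls, then fill any that are found. The check below reports no nulls in this dataset, so the fill step acts as a safeguard rather than changing any values.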


In [20]:
# Check null values before handling
print("="*80)
print("NULL VALUES BEFORE HANDLING")
print("="*80)
total_rows = df.count()
null_summary = []
for col_name in df.columns:
    null_count = df.filter(col(col_name).isNull()).count()
    null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0
    if null_count > 0:
        null_summary.append((col_name, null_count, null_pct))

if null_summary:
    print(f"{'Column':<30} {'Null Count':<15} {'Null %':<15}")
    print("-"*80)
    for col_name, null_count, null_pct in sorted(null_summary, key=lambda x: x[1], reverse=True):
        print(f"{col_name:<30} {null_count:<15,} {null_pct:<15.2f}%")
else:
    print("No null values found!")


NULL VALUES BEFORE HANDLING
No null values found!


In [21]:
# Handle null values
# Strategy:
# - Numerical columns (including price): fill with 0
# - Categorical columns: fill with "Unknown"
# - name: fill with "Unknown Game"
# - release_date is left untouched

df_cleaned = df

# Fill numerical nulls with 0
numerical_cols_to_fill = [
    'dlc_count', 'required_age', 'achievements', 'recommendations',
    'positive', 'negative', 'average_playtime_forever', 'average_playtime_2weeks',
    'median_playtime_forever', 'median_playtime_2weeks', 'discount', 'peak_ccu',
    'pct_pos_total', 'num_reviews_total', 'pct_pos_recent', 'num_reviews_recent'
]

for col_name in numerical_cols_to_fill:
    if col_name in df_cleaned.columns:
        df_cleaned = df_cleaned.withColumn(
            col_name,
            when(col(col_name).isNull(), 0).otherwise(col(col_name))
        )

# Fill price nulls with 0 (free games)
if 'price' in df_cleaned.columns:
    df_cleaned = df_cleaned.withColumn(
        'price',
        when(col('price').isNull(), 0.0).otherwise(col('price'))
    )

# Fill categorical nulls with "Unknown"
categorical_cols_to_fill = [
    'developers', 'publishers', 'categories', 'genres',
    'supported_languages', 'full_audio_languages', 'tags', 'estimated_owners'
]

for col_name in categorical_cols_to_fill:
    if col_name in df_cleaned.columns:
        df_cleaned = df_cleaned.withColumn(
            col_name,
            when(col(col_name).isNull(), "Unknown").otherwise(col(col_name))
        )

# Fill name nulls
if 'name' in df_cleaned.columns:
    df_cleaned = df_cleaned.withColumn(
        'name',
        when(col('name').isNull(), "Unknown Game").otherwise(col('name'))
    )

print("Null values handled!")


Null values handled!


In [22]:
# Verify null values after handling
print("\n" + "="*80)
print("NULL VALUES AFTER HANDLING")
print("="*80)
total_rows = df_cleaned.count()
null_summary_after = []
for col_name in df_cleaned.columns:
    null_count = df_cleaned.filter(col(col_name).isNull()).count()
    if null_count > 0:
        null_summary_after.append((col_name, null_count))

if null_summary_after:
    print(f"{'Column':<30} {'Null Count':<15}")
    print("-"*80)
    for col_name, null_count in null_summary_after:
        print(f"{col_name:<30} {null_count:<15,}")
else:
    print("All null values have been handled!")



NULL VALUES AFTER HANDLING
All null values have been handled!


## Step 3: Encode Categorical Variables

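Each categorical column (developers, publishers, genres, categories) stores a list-like string such as `['Valve']`. The first entry is extracted into a `*_main` column, which StringIndexer then maps to a numerical index.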


In [23]:
# Parse list-like strings and extract first/main value for encoding
# For example: "['Valve']" -> "Valve", "['Publisher1', 'Publisher2']" -> "Publisher1"

def extract_first_value(value_str):
    """Extract first value from string representation of list"""
    if value_str is None or value_str == "Unknown" or value_str == "":
        return "Unknown"
    try:
        # Remove brackets and quotes, get first value
        value_str = str(value_str).strip()
        if value_str.startswith('[') and value_str.endswith(']'):
            value_str = value_str[1:-1]
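        # Split by comma and keep the first entry.
        # Caveat: a name that itself contains a comma is cut short,
        # e.g. "['KRAFTON, Inc.']" -> "KRAFTON".
        values = [v.strip().strip("'\"") for v in value_str.split(',') if v.strip()]
        return values[0] if values else "Unknown"
    except Exception:
        return "Unknown"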

# Apply extraction to categorical columns
# (StringType is already imported in the setup cell)
from pyspark.sql.functions import udf

extract_first_udf = udf(extract_first_value, StringType())

categorical_cols = ['developers', 'publishers', 'genres', 'categories']
df_encoded = df_cleaned

for col_name in categorical_cols:
    if col_name in df_encoded.columns:
        df_encoded = df_encoded.withColumn(
            f"{col_name}_main",
            extract_first_udf(col(col_name))
        )

print("Categorical columns extracted:")
for col_name in categorical_cols:
    if f"{col_name}_main" in df_encoded.columns:
        print(f"  - {col_name} -> {col_name}_main")
        df_encoded.select(col_name, f"{col_name}_main").show(5, truncate=False)


Categorical columns extracted:
  - developers -> developers_main
+--------------------+----------------+
|developers          |developers_main |
+--------------------+----------------+
|['Valve']           |Valve           |
|['PUBG Corporation']|PUBG Corporation|
|['Valve']           |Valve           |
|['Rockstar North']  |Rockstar North  |
|['Ubisoft Montreal']|Ubisoft Montreal|
+--------------------+----------------+
only showing top 5 rows
  - publishers -> publishers_main
+------------------+---------------+
|publishers        |publishers_main|
+------------------+---------------+
|['Valve']         |Valve          |
|['KRAFTON, Inc.'] |KRAFTON        |
|['Valve']         |Valve          |
|['Rockstar Games']|Rockstar Games |
|['Ubisoft']       |Ubisoft        |
+------------------+---------------+
only showing top 5 rows
  - genres -> genres_main
+----------------------------------------------------------------+-----------+
|genres                                                
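
The `publishers` sample above shows `['KRAFTON, Inc.']` reduced to `KRAFTON`, because the extraction splits on every comma. One possible alternative, assuming the column values are valid Python list literals (as the samples suggest), is to parse them with `ast.literal_eval`. This is only a sketch; the function name `first_list_value` is hypothetical and not part of the notebook.

```python
import ast


def first_list_value(value_str, default="Unknown"):
    """Return the first element of a list-like string such as "['Valve']".

    Assumes the string is a Python list literal; anything that does not
    parse as a non-empty list or tuple falls back to `default`.
    """
    if value_str is None:
        return default
    try:
        parsed = ast.literal_eval(str(value_str).strip())
    except (ValueError, SyntaxError, TypeError):
        return default
    if isinstance(parsed, (list, tuple)) and parsed:
        return str(parsed[0])
    return default


print(first_list_value("['KRAFTON, Inc.']"))
print(first_list_value("['Publisher1', 'Publisher2']"))
print(first_list_value("Unknown"))
```

If this approach suits the data, the function could be wrapped in a UDF in the same way as `extract_first_value`.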

In [24]:
# Use StringIndexer to encode categorical variables
# StringIndexer converts string categories to numerical indices

indexers = []
indexed_cols = []

categorical_cols_to_index = ['developers_main', 'publishers_main', 'genres_main', 'categories_main']

for col_name in categorical_cols_to_index:
    if col_name in df_encoded.columns:
        indexer = StringIndexer(
            inputCol=col_name,
            outputCol=f"{col_name}_indexed",
            handleInvalid="keep"  # Keep unknown values as a separate category
        )
        indexers.append(indexer)
        indexed_cols.append(f"{col_name}_indexed")

# Fit and transform
print("Fitting StringIndexers...")
for indexer in indexers:
    df_encoded = indexer.fit(df_encoded).transform(df_encoded)

print(f"\nEncoded {len(indexed_cols)} categorical columns:")
for col_name in indexed_cols:
    print(f"  - {col_name}")

# Show sample of encoded values
print("\nSample encoded values:")
sample_cols = ['developers_main', 'developers_main_indexed', 'publishers_main', 'publishers_main_indexed']
existing_sample_cols = [c for c in sample_cols if c in df_encoded.columns]
if existing_sample_cols:
    df_encoded.select(existing_sample_cols).show(10, truncate=False)


Fitting StringIndexers...

Encoded 4 categorical columns:
  - developers_main_indexed
  - publishers_main_indexed
  - genres_main_indexed
  - categories_main_indexed

Sample encoded values:
+-----------------+-----------------------+-----------------+-----------------------+
|developers_main  |developers_main_indexed|publishers_main  |publishers_main_indexed|
+-----------------+-----------------------+-----------------+-----------------------+
|Valve            |24.0                   |Valve            |81.0                   |
|PUBG Corporation |5246.0                 |KRAFTON          |337.0                  |
|Valve            |24.0                   |Valve            |81.0                   |
|Rockstar North   |700.0                  |Rockstar Games   |115.0                  |
|Ubisoft Montreal |19.0                   |Ubisoft          |5.0                    |
|Valve            |24.0                   |Valve            |81.0                   |
|Re-Logic         |5596.0           

Traceback (most recent call last):
  File "/home/ismail/pyspark_env/lib64/python3.14/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
    code = worker(sock, authenticated)
  File "/home/ismail/pyspark_env/lib64/python3.14/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
    outfile.flush()
    ~~~~~~~~~~~~~^^
BrokenPipeError: [Errno 32] Broken pipe
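
The indices in the sample output (for example `Valve` -> `24.0`) follow from StringIndexer's default ordering, which ranks labels by descending frequency so that the most common label gets index 0. The pure-Python sketch below imitates that ordering and the `handleInvalid="keep"` behavior of sending unseen labels to one extra index. It illustrates the idea and is not Spark code; `fit_label_index` and `index_label` are hypothetical names, the publisher list is toy data, and the alphabetical tie-break is an assumption based on Spark 3.x behavior.

```python
from collections import Counter


def fit_label_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; break ties alphabetically (assumed).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def index_label(label, mapping):
    """Look up a label; every unseen label shares one extra index."""
    return mapping.get(label, float(len(mapping)))


# Toy data, not taken from the dataset
toy_publishers = ["Ubisoft", "Valve", "Ubisoft", "Rockstar Games", "Ubisoft", "Valve"]
mapping = fit_label_index(toy_publishers)
print(mapping)
print(index_label("Some New Studio", mapping))
```

Because the index reflects frequency rank rather than any meaningful order, models that interpret feature values numerically may read structure into it that is not there; tree-based models are generally less sensitive to this.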


## Step 4: Prepare Numerical Features

Select the numerical columns and the indexed categorical columns that form the model input, and confirm that none of them contain nulls.


In [25]:
# Select numerical features for ML
numerical_features = [
    'price', 'dlc_count', 'required_age',
    'achievements', 'recommendations', 'positive', 'negative',
    'average_playtime_forever', 'median_playtime_forever',
    'discount', 'peak_ccu', 'pct_pos_total', 'num_reviews_total',
    'pct_pos_recent', 'num_reviews_recent',
    'windows', 'mac', 'linux'
]

# Add indexed categorical features
all_features = numerical_features + indexed_cols

# Filter to only existing columns
existing_features = [f for f in all_features if f in df_encoded.columns]

print(f"Selected {len(existing_features)} features for ML:")
for i, feat in enumerate(existing_features, 1):
    print(f"{i:2d}. {feat}")

# Check for any nulls in feature columns
print("\nChecking for nulls in feature columns:")
for feat in existing_features:
    null_count = df_encoded.filter(col(feat).isNull()).count()
    if null_count > 0:
        print(f"  {feat}: {null_count} nulls")
    else:
        print(f"  {feat}: No nulls")


Selected 22 features for ML:
 1. price
 2. dlc_count
 3. required_age
 4. achievements
 5. recommendations
 6. positive
 7. negative
 8. average_playtime_forever
 9. median_playtime_forever
10. discount
11. peak_ccu
12. pct_pos_total
13. num_reviews_total
14. pct_pos_recent
15. num_reviews_recent
16. windows
17. mac
18. linux
19. developers_main_indexed
20. publishers_main_indexed
21. genres_main_indexed
22. categories_main_indexed

Checking for nulls in feature columns:
  price: No nulls
  dlc_count: No nulls
  required_age: No nulls
  achievements: No nulls
  recommendations: No nulls
  positive: No nulls
  negative: No nulls
  average_playtime_forever: No nulls
  median_playtime_forever: No nulls
  discount: No nulls
  peak_ccu: No nulls
  pct_pos_total: No nulls
  num_reviews_total: No nulls
  pct_pos_recent: No nulls
  num_reviews_recent: No nulls
  windows: No nulls
  mac: No nulls
  linux: No nulls
  developers_main_indexed: No nulls
  publishers_main_indexed: No nulls
  genres_

## Step 5: Scale/Normalize Numerical Features

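Assemble the numerical features into a vector and standardize them with StandardScaler (zero mean, unit standard deviation). MinMaxScaler is imported as an alternative but is not used. Note that the scaler is fit on the full dataset before the split in Step 6, so test rows contribute to the scaling statistics.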
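
As a rough illustration of what standardization computes, the sketch below applies z = (x - mean) / std in plain Python, with the statistics taken from training values only and then reused for test values. Spark's StandardScaler uses the corrected sample standard deviation, which `statistics.stdev` also computes. The function names and the toy prices are hypothetical and not part of the notebook.

```python
import statistics


def fit_standardizer(values):
    """Return (mean, sample standard deviation) of the given values."""
    return statistics.mean(values), statistics.stdev(values)


def standardize(values, mean, std):
    """Apply z = (x - mean) / std; a zero std maps every value to 0.0."""
    if std == 0:
        return [0.0 for _ in values]
    return [(x - mean) / std for x in values]


train_prices = [0.0, 0.0, 3.99, 9.99, 19.99, 59.99]  # toy values
test_prices = [4.99, 29.99]                           # toy values

# Fit on training values only, then reuse the same statistics for test values
mean, std = fit_standardizer(train_prices)
train_scaled = standardize(train_prices, mean, std)
test_scaled = standardize(test_prices, mean, std)

print(f"train mean after scaling: {statistics.mean(train_scaled):.6f}")
print(f"train std after scaling:  {statistics.stdev(train_scaled):.6f}")
print([round(z, 4) for z in test_scaled])
```

Fitting on training values alone keeps test rows from influencing the scaling statistics; the test values are scaled with the training mean and standard deviation, so they will not generally have mean 0 and std 1 themselves.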


In [26]:
# Separate numerical and categorical features
numerical_features_clean = [f for f in numerical_features if f in df_encoded.columns]
categorical_features_clean = [f for f in indexed_cols if f in df_encoded.columns]

print(f"Numerical features to scale: {len(numerical_features_clean)}")
print(f"Categorical features (already indexed): {len(categorical_features_clean)}")

# Create feature vector using VectorAssembler
# First assemble numerical features
assembler_numerical = VectorAssembler(
    inputCols=numerical_features_clean,
    outputCol="numerical_features",
    handleInvalid="skip"
)

df_features = assembler_numerical.transform(df_encoded)

# Scale numerical features using StandardScaler
scaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="scaled_numerical_features",
    withStd=True,
    withMean=True
)

scaler_model = scaler.fit(df_features)
df_features = scaler_model.transform(df_features)

print("\nNumerical features scaled using StandardScaler")

# Show sample
print("\nSample of scaled features:")
df_features.select("numerical_features", "scaled_numerical_features").show(3, truncate=False)


Numerical features to scale: 18
Categorical features (already indexed): 4

Numerical features scaled using StandardScaler

Sample of scaled features:
+--------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|numerical_features                                                                                                  |scaled_numerical_features                                                                                                                                                                                                                                                         

In [27]:
# Quick comparison: Original vs Scaled Features
print("="*80)
print("SCALING COMPARISON: Original vs Scaled Features")
print("="*80)

# Show a few example features before and after scaling
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col as spark_col

# Extract scaled values from vector
df_scaled_check = df_features.withColumn(
    "scaled_array", 
    vector_to_array("scaled_numerical_features")
)

# Show comparison for first 3 numerical features
print("\nSIDE-BY-SIDE COMPARISON (First 3 Features):")
print("="*80)

for i in range(min(3, len(numerical_features_clean))):
    feature_name = numerical_features_clean[i]
    print(f"\n{feature_name}:")
    print("-" * 60)
    
    # Original values
    original_vals = df_encoded.select(feature_name).limit(5).collect()
    print("  Original values (first 5):")
    for row in original_vals:
        val = row[feature_name]
        print(f"    {val}")
    
    # Scaled values
    scaled_col = spark_col("scaled_array")[i].alias(f"{feature_name}_scaled")
    scaled_vals = df_scaled_check.select(scaled_col).limit(5).collect()
    print("  Scaled values (first 5):")
    for row in scaled_vals:
        val = row[f"{feature_name}_scaled"]
        print(f"    {val:.4f}")

# Show statistics
print("\nSTATISTICS COMPARISON:")
print("="*80)
print("\nBEFORE SCALING - Sample features statistics:")
df_encoded.select(numerical_features_clean[:3]).describe().show()

print("\nAFTER SCALING - Mean and Std should be ~0 and ~1:")
# Check mean and std of first scaled feature
from pyspark.sql.functions import mean as spark_mean, stddev as spark_stddev
scaled_stats = df_scaled_check.select(
    spark_mean(spark_col("scaled_array")[0]).alias("mean"),
    spark_stddev(spark_col("scaled_array")[0]).alias("std")
).collect()

if scaled_stats:
    print("  First feature after scaling:")
    print(f"    Mean: {scaled_stats[0].mean:.6f} (should be ~0)")
    print(f"    Std:  {scaled_stats[0].std:.6f} (should be ~1)")

print("\nScaling verification complete!")


SCALING COMPARISON: Original vs Scaled Features

SIDE-BY-SIDE COMPARISON (First 3 Features):

price:
------------------------------------------------------------
  Original values (first 5):
    0.0
    0.0
    0.0
    0.0
    3.99
  Scaled values (first 5):
    -0.9618
    -0.9618
    -0.9618
    -0.9618
    -0.6555

dlc_count:
------------------------------------------------------------
  Original values (first 5):
    1
    0
    2
    0
    9
  Scaled values (first 5):
    -0.0441
    -0.0692
    -0.0191
    -0.0692
    0.1562

required_age:
------------------------------------------------------------
  Original values (first 5):
    0
    0
    0
    17
    17
  Scaled values (first 5):
    -0.2393
    -0.2393
    -0.2393
    4.2973
    4.2973

STATISTICS COMPARISON:

BEFORE SCALING - Sample features statistics:
+-------+------------------+------------------+------------------+
|summary|             price|         dlc_count|      required_age|
+-------+------------------+---------

In [28]:
# Combine scaled numerical features with categorical features
# Create final feature vector
final_feature_cols = ["scaled_numerical_features"] + categorical_features_clean

assembler_final = VectorAssembler(
    inputCols=final_feature_cols,
    outputCol="features",
    handleInvalid="skip"
)

df_ml_ready = assembler_final.transform(df_features)

print("Final feature vector created!")
print(f"Feature vector size: {len(numerical_features_clean)} scaled numerical + {len(categorical_features_clean)} categorical")

# Show sample
print("\nSample of final features:")
df_ml_ready.select("features").show(3, truncate=False)


Final feature vector created!
Feature vector size: 18 scaled numerical + 4 categorical

Sample of final features:
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------

Traceback (most recent call last):
  File "/home/ismail/pyspark_env/lib64/python3.14/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
    code = worker(sock, authenticated)
  File "/home/ismail/pyspark_env/lib64/python3.14/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
    outfile.flush()
    ~~~~~~~~~~~~~^^
BrokenPipeError: [Errno 32] Broken pipe


## Step 6: Split into Train/Test Sets

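Split the dataset into training and test sets using an 80/20 ratio and a fixed seed. `randomSplit` assigns rows at random, so the resulting proportions are approximate (80.5% / 19.5% here).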


In [13]:
# Split into train and test sets
# 80% training, 20% testing
train_df, test_df = df_ml_ready.randomSplit([0.8, 0.2], seed=42)

# Cache before counting so the counts below materialize the cache
# and later ML steps can reuse it
train_df.cache()
test_df.cache()

# Count each split once instead of recomputing inside every print
train_count = train_df.count()
test_count = test_df.count()
total_count = train_count + test_count

print("="*80)
print("DATASET SPLIT")
print("="*80)
print(f"Training set: {train_count:,} samples ({train_count/total_count*100:.1f}%)")
print(f"Test set: {test_count:,} samples ({test_count/total_count*100:.1f}%)")
print(f"Total: {total_count:,} samples")

print("\n Datasets cached for faster ML operations")


DATASET SPLIT
Training set: 9,576 samples (80.5%)
Test set: 2,313 samples (19.5%)
Total: 11,889 samples

 Datasets cached for faster ML operations
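
The realized split of 80.5% / 19.5% rather than exactly 80/20 is expected: `randomSplit` decides each row's side with a random draw, so only the expected proportions match the weights. The pure-Python simulation below illustrates that behavior under this assumption. It does not reproduce Spark's random number generator, so its counts will differ from the ones above; `bernoulli_split` is a hypothetical helper.

```python
import random


def bernoulli_split(n_rows, train_weight, seed):
    """Assign each row to train or test with one independent uniform draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row_id in range(n_rows):
        (train if rng.random() < train_weight else test).append(row_id)
    return train, test


train_ids, test_ids = bernoulli_split(n_rows=11_889, train_weight=0.8, seed=42)
total = len(train_ids) + len(test_ids)
print(f"train: {len(train_ids):,} ({len(train_ids) / total:.1%})")
print(f"test:  {len(test_ids):,} ({len(test_ids) / total:.1%})")
```

Every row lands in exactly one split, but the train share fluctuates around 80% from seed to seed.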


## Step 7: Save ML-Ready Data

Save the prepared datasets for ML model training.


In [None]:
# Select key columns for saving (including features)
columns_to_save = ['appid', 'name'] + existing_features + ['features']

# Filter to existing columns
columns_to_save = [c for c in columns_to_save if c in df_ml_ready.columns]

# Save training set
print("Saving training set...")
train_output = "../archive1/train_ml_ready.parquet"
train_df.select(columns_to_save).write.mode("overwrite").parquet(train_output)
print(f"Training set saved: {train_output}")

# Save test set
print("Saving test set...")
test_output = "../archive1/test_ml_ready.parquet"
test_df.select(columns_to_save).write.mode("overwrite").parquet(test_output)
print(f"Test set saved: {test_output}")

print("\n" + "="*80)
print("ML DATA PREPARATION COMPLETE!")
print("="*80)
print(f"\nTraining set: {train_output}")
print(f"Test set: {test_output}")
print(f"\nFeatures prepared: {len(existing_features)}")
print(f"  - Numerical (scaled): {len(numerical_features_clean)}")
print(f"  - Categorical (indexed): {len(categorical_features_clean)}")
print("\nReady for ML model training!")


Saving training set...


25/12/25 18:12:23 WARN DAGScheduler: Broadcasting large task binary with size 1395.8 KiB
25/12/25 18:12:24 WARN DAGScheduler: Broadcasting large task binary with size 1395.8 KiB


Training set saved: archive1/train_ml_ready.parquet
Saving test set...
Test set saved: archive1/test_ml_ready.parquet

ML DATA PREPARATION COMPLETE!

Training set: archive1/train_ml_ready.parquet
Test set: archive1/test_ml_ready.parquet

Features prepared: 22
  - Numerical (scaled): 18
  - Categorical (indexed): 4

Ready for ML model training!


## Summary

### ML Preparation Steps Completed:

1. **Handled null values:**
   - Numerical columns: Filled with 0
   - Categorical columns: Filled with "Unknown"
   - Price: Filled with 0.0 (free games)

2. **Encoded categorical variables:**
   - Extracted main value from list-like strings (developers, publishers, genres, categories)
   - Used StringIndexer to convert to numerical indices
   - Kept unseen labels as a separate category (`handleInvalid="keep"`)

3. **Scaled numerical features:**
   - Used StandardScaler (mean=0, std=1)
   - Applied to all 18 numerical features, with the scaler fit on the full dataset before the split

4. **Created feature vectors:**
   - Combined scaled numerical features with indexed categorical features
   - Created final "features" column ready for ML

5. **Split into train/test:**
   - 80% training set
   - 20% test set
   - Both cached for faster access

6. **Saved ML-ready data:**
   - Training set: `../archive1/train_ml_ready.parquet`
   - Test set: `../archive1/test_ml_ready.parquet`

### Next Steps:
- Load the parquet files for ML model training
- Use the "features" column as input for ML algorithms
- Choose appropriate ML algorithms (classification, regression, clustering, etc.)


In [29]:
# Clean up
spark.stop()
print("Spark session stopped.")


Spark session stopped.
