# Model Training #

Based on the model comparison results, we selected Random Forest as the classifier to train in this notebook.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline


# Spark session used for feature aggregation and model training.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.instances", "4")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.sql.shuffle.partitions", "200")
    .appName("ModelTraining")
    .getOrCreate()
)

# Load CSV datasets: the raw sensor readings and the per-booking label table.
raw_df = spark.read.csv("data/filtered_raw_data.csv", header=True, inferSchema=True)
label_df = spark.read.csv("data/label_table.csv", header=True, inferSchema=True)

# Compute the Euclidean magnitude of the accelerometer and gyroscope readings.
sensor_df = raw_df.withColumn(
    "accel_mag",
    F.sqrt(
        F.col("acceleration_x") ** 2
        + F.col("acceleration_y") ** 2
        + F.col("acceleration_z") ** 2
    ),
).withColumn(
    "gyro_mag",
    F.sqrt(
        F.col("gyro_x") ** 2
        + F.col("gyro_y") ** 2
        + F.col("gyro_z") ** 2
    ),
)

print(f"The raw data contains {sensor_df.count()} records.")

# Aggregate per booking: one row of summary statistics for each bookingid.
aggregated_df = sensor_df.groupBy("bookingid").agg(
    F.mean("speed").alias("avg_speed"),
    F.stddev("speed").alias("std_speed"),

    F.mean("accel_mag").alias("avg_accel_mag"),
    F.max("accel_mag").alias("max_accel_mag"),
    F.stddev("accel_mag").alias("std_accel_mag"),

    F.mean("gyro_mag").alias("avg_gyro_mag"),
    F.stddev("gyro_mag").alias("std_gyro_mag"),

    F.mean("acceleration_x").alias("avg_accel_x"),
    F.stddev("acceleration_x").alias("std_accel_x"),
    F.max("acceleration_x").alias("max_accel_x"),

    F.mean("acceleration_y").alias("avg_accel_y"),
    F.stddev("acceleration_y").alias("std_accel_y"),
    F.max("acceleration_y").alias("max_accel_y"),

    F.mean("acceleration_z").alias("avg_accel_z"),
    F.stddev("acceleration_z").alias("std_accel_z"),
    F.max("acceleration_z").alias("max_accel_z"),

    F.mean("gyro_x").alias("avg_gyro_x"),
    F.stddev("gyro_x").alias("std_gyro_x"),

    F.mean("gyro_y").alias("avg_gyro_y"),
    F.stddev("gyro_y").alias("std_gyro_y"),

    F.mean("gyro_z").alias("avg_gyro_z"),
    F.stddev("gyro_z").alias("std_gyro_z"),

    F.mean("accuracy").alias("avg_accuracy"),
    F.stddev("accuracy").alias("std_accuracy"),

    # Largest `second` value seen for the booking
    # (presumably the trip length in seconds -- confirm against the data dictionary).
    F.max("second").alias("second"),
)

# Left join keeps every aggregated booking, including any without a label row.
labeled_df = aggregated_df.join(label_df, "bookingid", "left")

print(f"The labeled data contains {labeled_df.count()} records.")

# Drop bookings whose label is missing BEFORE filling nulls: a blanket
# fillna(0.0) would otherwise turn a missing label into class 0 and the model
# would be trained on fabricated negatives. The remaining nulls are in the
# aggregated feature columns (e.g. a standard deviation that could not be
# computed) and are replaced with 0.0 so the assembler can consume them.
df = labeled_df.dropna(subset=["label"]).fillna(0.0)

The raw data contains 1613554 records.
The labeled data contains 20000 records.


In [2]:
# Aggregated per-booking columns used as model inputs.
# Keep this order stable: the list is passed to the VectorAssembler and later
# zipped against the model's feature importances, so position == feature.
feature_cols = [
    "std_gyro_z", "std_accel_y", "std_accel_z", "max_accel_x",
    "avg_speed", "std_accel_x", "max_accel_mag", "std_speed",
    "std_gyro_mag", "std_gyro_x", "max_accel_z", "avg_gyro_mag",
    "std_accel_mag", "second",
]

In [3]:
# Create vector assembler to transform the features into a vector column
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# Persist the prepared rows before splitting. The split DataFrames are lazy, so
# without this every later action (counts, fit, transform) re-runs the upstream
# read/aggregate/join, and the train/test membership is only consistent if that
# recomputation yields identical rows each time.
df.cache()

# Split the data into training and test sets (80% training, 20% testing)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Count each split once and reuse the numbers for reporting and validation.
train_count = train_df.count()
test_count = test_df.count()

print(f"Training set count: {train_count}")
print(f"Test set count: {test_count}")

# The two splits together should account for every input row.
assert train_count + test_count == df.count(), "train/test split does not cover df exactly"

Training set count: 16047
Test set count: 3953


In [4]:
# Random forest hyperparameters, grouped so they can be scanned and tuned in one place.
rf_params = {
    "labelCol": "label",
    "featuresCol": "features",
    "numTrees": 50,
    "maxDepth": 8,
    "seed": 42,
}
rf = RandomForestClassifier(**rf_params)

# Two-stage pipeline: assemble the feature vector, then fit the forest on it.
pipeline = Pipeline(stages=[assembler, rf])

# Fit on the training split only; the test split is scored with the fitted pipeline.
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

In [5]:
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Accuracy on its own can look healthy when one class dominates the labels, so
# also report precision/recall-based metrics. The same evaluator is reused and
# only its metric is overridden per call via the param map.
for extra_metric in ("f1", "weightedPrecision", "weightedRecall"):
    extra_score = evaluator.evaluate(predictions, {evaluator.metricName: extra_metric})
    print(f"{extra_metric}: {extra_score}")

Accuracy: 0.7773842651151025


In [6]:
# Report how much each input feature contributed to the fitted forest.
# The classifier is the last pipeline stage; its importances are paired
# positionally with feature_cols.
rf_model = model.stages[-1]
feature_importance = rf_model.featureImportances

feature_importance_list = []
for name, weight in zip(feature_cols, feature_importance):
    feature_importance_list.append((name, float(weight)))

# Highest importance first.
sorted_feature_importance = sorted(
    feature_importance_list,
    key=lambda pair: pair[1],
    reverse=True,
)

print("Feature Importances:")
for name, weight in sorted_feature_importance:
    print(f"{name}: {weight}")

Feature Importances:
second: 0.42133536055145576
avg_speed: 0.09553662462937278
std_accel_y: 0.05134347397656492
max_accel_mag: 0.050333961732172014
std_speed: 0.048158490088526795
std_accel_z: 0.045270011983094306
avg_gyro_mag: 0.04198473287920141
std_accel_mag: 0.04162121780978348
max_accel_x: 0.040539157384469636
std_gyro_z: 0.04007635607373611
std_accel_x: 0.0362881291112121
max_accel_z: 0.031196803243405853
std_gyro_mag: 0.02966809726775355
std_gyro_x: 0.026647583269251155
