**Big Data Analysis Coursework 2**

**Harth Motion Sensor Data**

**Author: Sandor Kanda**

**Source:** HARTH CSV files on HDFS (`hdfs:///user/skand001/harth_2/`)

In [1]:
import importlib.util
import datetime

# Function to check if a module is available
def is_module_available(module_name):
    """Return True if ``module_name`` can be imported in this kernel.

    Uses ``importlib.util.find_spec``, which locates the module's spec
    without importing the module itself (dotted names still import the
    parent package).

    Args:
        module_name: Absolute module name, e.g. ``"pyspark"`` or
            ``"pyspark.ml"``.

    Returns:
        bool: True when an import spec is found, False otherwise.
    """
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # For a dotted name, find_spec imports the parent package first and
        # raises (rather than returning None) when that parent is missing.
        return False

# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

# Check if 'pyspark' is installed; if it is not, install it into the same
# interpreter that runs this kernel. A shell `!pip` uses whichever `pip` is
# first on PATH, which may belong to a different Python environment.
if not is_module_available('pyspark'):
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark"])
    # Let the running import system notice the freshly installed package.
    importlib.invalidate_caches()
else:
    print("pyspark is already installed.")

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

Start Time: 2024-05-07 20:24:26.813276
pyspark is already installed.
Time Spent: 0 minutes, 0 seconds


In [2]:
# Import all necessary libraries
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, PCA
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col, count, mean, stddev, min, max, when
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Correlation

import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

In [3]:
# 

In [4]:
# Build the SparkSession for this notebook. getOrCreate() hands back an
# already-running session if one exists, otherwise it starts a new one
# (and with it the underlying SparkContext).
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

spark = (
    SparkSession.builder
    .appName("harth")
    .getOrCreate()
)

# Low-level handle, kept for RDD-style APIs and for stopping the context.
sc = spark.sparkContext

# Report how long session start-up took.
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

Start Time: 2024-05-07 20:24:28.269813
Time Spent: 0 minutes, 32 seconds


In [5]:
# spark.stop()

In [None]:
# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

# HDFS directory holding the HARTH CSV files; the commented line is the
# local-filesystem alternative.
folder_path = 'hdfs:///user/skand001/harth_2/'
# folder_path = './harth_2/'

# Load every CSV in the directory into a single DataFrame. Each file's first
# line is treated as a header. inferSchema makes Spark take an extra pass over
# the data to choose column types.
combined_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(f"{folder_path}*.csv")
)

# Report how long the read took.
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

Start Time: 2024-05-07 20:25:01.346754


In [None]:
# 20% random sample (no replacement, seed 42), later converted to pandas for
# plotting. The Spark-side EDA works on the full data via the `spark_data` alias.
sampled_df = combined_df.sample(fraction=0.2, withReplacement=False, seed=42)
spark_data = combined_df  # same DataFrame object, not a copy
spark_data.show()

# Exploratory Data Analysis (EDA)

In [None]:
# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

# Univariate Analysis: summary statistics for every column
spark_data.describe().show()

# Bivariate Analysis: pack the six back/thigh sensor axes into one vector
# column so Spark ML's Correlation and PCA utilities can consume them.
input_cols = ["back_x", "back_y", "back_z", "thigh_x", "thigh_y", "thigh_z"]
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
feature_df = assembler.transform(spark_data)

# Correlation matrix over the full dataset (Spark's default method is
# Pearson). It is reshaped into a small labelled DataFrame with one row per
# axis, for display.
corr_matrix = Correlation.corr(feature_df, "features").first()[0]
corr_matrix_array = corr_matrix.toArray()
corr_df = spark.createDataFrame(
    [
        (axis_name, *(float(v) for v in matrix_row))
        for axis_name, matrix_row in zip(input_cols, corr_matrix_array)
    ],
    ["Column", *input_cols],
)

print("Correlation Matrix:")
corr_df.show(truncate=False)

# Dimensionality Reduction: project the raw (unscaled) feature vectors onto
# their first two principal components.
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(feature_df)
pca_df = pca_model.transform(feature_df)
pca_df.show()

# Report how long the EDA took.
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
# Convert the sampled Spark DataFrame to a Pandas DataFrame for visualization,
# then draw a scatter matrix of the six sensor axes coloured by label.
# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

# Upper bound on the rows drawn in the pairplot. A 20% sample of the full
# dataset can hold far more points than seaborn renders in reasonable time.
# The grid has 6 x 6 panels, and seaborn draws a per-class KDE on the diagonal
# by default when `hue` is set. So a fixed-seed random subset is plotted.
PAIRPLOT_MAX_ROWS = 50_000

# The full pandas sample is kept under this name because the correlation
# heatmap cell reads it.
sampled_data = sampled_df.toPandas()

# The builtin `min` is shadowed by `pyspark.sql.functions.min` (imports
# cell), so the cap is applied with a conditional expression instead.
n_plot_rows = len(sampled_data) if len(sampled_data) <= PAIRPLOT_MAX_ROWS else PAIRPLOT_MAX_ROWS
plot_data = sampled_data.sample(n=n_plot_rows, random_state=42)

# Make the label categorical so seaborn uses a discrete palette with one
# legend entry per class, rather than a continuous colour scale.
# NOTE(review): assumes `label` is inferred as a numeric activity code — confirm.
plot_data = plot_data.assign(label=plot_data["label"].astype("category"))

# A scatter matrix of the sampled data
sns.pairplot(plot_data, vars=["back_x", "back_y", "back_z", "thigh_x", "thigh_y", "thigh_z"], hue="label")
plt.show()

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
# Pairwise correlations (pandas default: Pearson) between the six sensor axes,
# computed on the driver-side pandas sample and drawn as an annotated heatmap.
corr = sampled_data[["back_x", "back_y", "back_z", "thigh_x", "thigh_y", "thigh_z"]].corr()

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", cbar=True, ax=ax)
ax.set_title("Correlation Matrix")
plt.show()

# Data Pre-Processing

In [None]:
# Feature columns: x/y/z axes of the back sensor followed by the thigh sensor,
# packed into the single "features" vector column that Spark ML estimators read.
input_cols = [f"{sensor}_{axis}" for sensor in ("back", "thigh") for axis in "xyz"]
assembler = VectorAssembler(outputCol="features", inputCols=input_cols)
assembled_data = assembler.transform(combined_df)

In [None]:
# Encode the raw "label" column as numeric class indices in "indexedLabel",
# which is the target the classifiers train on. Spark's default ordering gives
# the most frequent label index 0.0. The indexer is fitted on the whole
# dataset, before the split.
indexer = StringIndexer(outputCol="indexedLabel", inputCol="label")
indexed_data = (
    indexer
    .fit(assembled_data)
    .transform(assembled_data)
)

In [None]:
# Initialize the classifiers
# NOTE: the "Model Building" section below redefines rf/dt/lr (and adds an
# MLP); those are the instances that get fitted. gbt_classifier is not fitted
# in the cells below.

# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

rf_classifier = RandomForestClassifier(labelCol="indexedLabel", 
                                       featuresCol="features", 
                                       numTrees=10)

dt_classifier = DecisionTreeClassifier(labelCol="indexedLabel", 
                                       featuresCol="features")

# Like the tree models above, train on the StringIndexer output rather than
# the raw "label" column.
lr_classifier = LogisticRegression(labelCol="indexedLabel", 
                                   featuresCol="features")

# Spark's GBTClassifier supports binary labels only. Fitting it fails when
# indexedLabel has more than two distinct values. It is kept defined for
# reference.
gbt_classifier = GBTClassifier(labelCol="indexedLabel", 
                               featuresCol="features", 
                               maxIter=10)

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
# 70 / 15 / 15 split: take 70% for training, then halve the remaining 30% into
# validation and test sets. Both splits are seeded for repeatability.
# NOTE(review): a row-level random split can put neighbouring readings from the
# same recording into both train and test sets, which may inflate scores. A
# split by recording/subject may be more appropriate — TODO confirm.
train_data, val_test_data = indexed_data.randomSplit(weights=[0.7, 0.3], seed=42)
val_data, test_data = val_test_data.randomSplit(weights=[0.5, 0.5], seed=42)

In [None]:
# Number of classes that ended up in the training split
distinct_label_count = train_data.select("indexedLabel").dropDuplicates().count()
print("Total number of distinct labels:", distinct_label_count)

In [None]:
# Collect the distinct class indices present in the training split (order is not guaranteed)
all_labels = train_data.select("indexedLabel").dropDuplicates().collect()
print("All distinct labels:", [row["indexedLabel"] for row in all_labels])

In [None]:
# Class balance in the training split, most frequent label first
(
    train_data
    .groupBy("indexedLabel")
    .count()
    .orderBy(col("count").desc())
    .show()
)

# Model Building

In [None]:
# Define the classifiers
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

rf_classifier = RandomForestClassifier(labelCol="indexedLabel", 
                                       featuresCol="features", 
                                       numTrees=10)

dt_classifier = DecisionTreeClassifier(labelCol="indexedLabel", 
                                       featuresCol="features")

lr_classifier = LogisticRegression(labelCol="indexedLabel", 
                                   featuresCol="features")

# The MLP's output layer needs one unit per class. StringIndexer assigns
# indices in [0, numLabels), so the number of distinct indexed labels is the
# class count. Deriving it keeps the network valid for any label set instead
# of relying on a hard-coded size.
num_classes = indexed_data.select("indexedLabel").distinct().count()
layers = [len(input_cols), 128, 64, num_classes]
mlp = MultilayerPerceptronClassifier(layers=layers, 
                                     labelCol="indexedLabel", 
                                     featuresCol="features")

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
# Wrap each estimator in its own single-stage Pipeline so every model is
# fitted and applied through the same Pipeline / PipelineModel interface.
pipeline_rf, pipeline_dt, pipeline_lr, pipeline_mlp = (
    Pipeline(stages=[estimator])
    for estimator in (rf_classifier, dt_classifier, lr_classifier, mlp)
)

In [None]:
# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

# Cache the training split *before* fitting. Each estimator, and the later
# training-set evaluation, makes its own passes over train_data. Without a
# cache, each pass recomputes it from the CSV read through assembly, indexing
# and splitting.
train_data.cache()

# Fit the models (each pipeline is trained exactly once)
rf_model = pipeline_rf.fit(train_data)
dt_model = pipeline_dt.fit(train_data)
lr_model = pipeline_lr.fit(train_data)
mlp_model = pipeline_mlp.fit(train_data)

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
# Start time
from tqdm import tqdm
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedRecall")

# (display name, evaluator) pairs. The names become the keys of every
# per-model metrics dict built below, in this order.
metric_evaluators = [
    ("Accuracy", evaluator_accuracy),
    ("F1-score", evaluator_f1),
    ("Precision", evaluator_precision),
    ("Recall", evaluator_recall),
]


def _evaluate_model(model, data):
    """Score one fitted model on ``data`` with every evaluator in ``metric_evaluators``.

    The prediction DataFrame is cached while the metrics are computed, so the
    model's transform runs once per call instead of once per metric. The cache
    is released afterwards.

    Args:
        model: Fitted model / PipelineModel exposing ``transform``.
        data: Spark DataFrame containing the ``features`` and
            ``indexedLabel`` columns.

    Returns:
        dict: metric display name -> score.
    """
    predictions = model.transform(data).cache()
    try:
        return {name: evaluator.evaluate(predictions) for name, evaluator in metric_evaluators}
    finally:
        predictions.unpersist()


fitted_models = [rf_model, dt_model, lr_model, mlp_model]

# Evaluate the models on the training set
print("Evaluating models on the training set...")
rf_train_metrics, dt_train_metrics, lr_train_metrics, mlp_train_metrics = [
    _evaluate_model(fitted, train_data) for fitted in tqdm(fitted_models)
]

# Evaluate the models on the validation set
print("Evaluating models on the validation set...")
rf_val_metrics, dt_val_metrics, lr_val_metrics, mlp_val_metrics = [
    _evaluate_model(fitted, val_data) for fitted in tqdm(fitted_models)
]

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
import datetime

# Start time
start_time = datetime.datetime.now()
print(f"Start Time: {start_time}")

# Training / validation metric dicts for every model, keyed by display name.
# The comparison plot below reads this structure.
model_history = {
    display_name: {"Training": train_scores, "Validation": val_scores}
    for display_name, train_scores, val_scores in (
        ("Random Forest", rf_train_metrics, rf_val_metrics),
        ("Decision Tree", dt_train_metrics, dt_val_metrics),
        ("Logistic Regression", lr_train_metrics, lr_val_metrics),
        ("Multi-Layer Perceptron", mlp_train_metrics, mlp_val_metrics),
    )
}

for model_name, metrics in model_history.items():
    print(f"--- {model_name} Metrics ---")
    for metric_type, value in metrics.items():
        if not isinstance(value, dict):
            # Scalar entry: a single line
            print(f"{metric_type}: {value}")
            continue
        # Nested dict (Training / Validation): a header, then one indented
        # line per metric
        print(f"{metric_type}:")
        for sub_key, sub_value in value.items():
            print(f"  {sub_key}: {sub_value}")
    print()  # Blank line for separation

# End and Duration Calculation
end_time = datetime.datetime.now()
duration = end_time - start_time
minutes, seconds = divmod(duration.total_seconds(), 60)
print(f"Time Spent: {int(minutes)} minutes, {int(seconds)} seconds")

In [None]:
# One panel per metric, each comparing training vs validation scores across models.
models = list(model_history.keys())
metrics = ["Accuracy", "F1-score", "Precision", "Recall"]

fig, axs = plt.subplots(len(metrics), 1, figsize=(10, 20), sharex=True)

for ax, metric in zip(axs, metrics):
    for split in ("Training", "Validation"):
        scores = [model_history[model][split][metric] for model in models]
        ax.plot(models, scores, marker='o', label=f"{split} {metric}")
    ax.set_ylabel(metric)
    ax.legend()
    ax.grid(True)

axs[-1].set_xlabel("Model")
fig.suptitle("Model Performance Comparison")
plt.tight_layout()
plt.show()

In [None]:
# sc.stop()