In [None]:
!pip install pyspark
!pip install xgboost
!pip install findspark

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBRegressor,SparkXGBClassifier
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier,OneVsRest
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from xgboost.spark import SparkXGBClassifier

In [None]:
!unzip har70.zip
folder_path = "har70/"
csv_files = os.listdir(folder_path)
df = spark.read.csv(os.path.join(folder_path, csv_files[0]), header=True, inferSchema=True)
include=['501.csv','502.csv','503.csv']
for filename in csv_files[1:]:
    if filename in include:
        df = df.union(spark.read.csv(os.path.join(folder_path, filename), header=True, inferSchema=True))
df.show()
print(f" total rows = {df.count()}")

In [None]:
from functools import reduce

# Number of consecutive samples combined into one feature vector.
window_size = 10
sensor_cols = ["back_x", "back_y", "back_z", "thigh_x", "thigh_y", "thigh_z"]

# Order samples by time. When rows carry a `source_file` tag, partition by it:
# - a window then never spans two files;
# - Spark avoids moving every row into a single partition, which an
#   un-partitioned Window.orderBy does.
if "source_file" in df.columns:
    window_spec = Window.partitionBy("source_file").orderBy("timestamp")
else:
    window_spec = Window.orderBy("timestamp")

# lag(col, i) is the value i rows *before* the current row. The window therefore
# holds the window_size preceding samples: lag{window_size} (oldest) ... lag1 (newest).
lagged_cols = [
    F.lag(col, i).over(window_spec).alias(f"{col}_lag{i}")
    for i in range(window_size, 0, -1)
    for col in ["timestamp", *sensor_cols, "label"]
]
# The first window_size rows (per partition) lack full history, give nulls, and are dropped.
lagged_df = df.select(*lagged_cols).na.drop()

# Keep only windows in which every sample has the same label. The condition is
# built from window_size, so it stays correct if the window size changes.
same_label = reduce(
    lambda acc, cond: acc & cond,
    (F.col(f"label_lag{i}") == F.col(f"label_lag{i + 1}") for i in range(1, window_size)),
    F.lit(True),
)
filtered_df = lagged_df.filter(same_label)

# Flatten each window's sensor readings (oldest first) into one feature vector.
vector_assembler = VectorAssembler(
    inputCols=[f"{col}_lag{i}" for i in range(window_size, 0, -1) for col in sensor_cols],
    outputCol="features",
)
lagged_df2 = vector_assembler.transform(filtered_df)
# All labels in a kept window are equal, so label_lag1 is the window's label.
final_df = lagged_df2.select("features", "label_lag1").withColumnRenamed("label_lag1", "label")
final_df.show(truncate=False)

In [None]:
# Use a fixed seed so the split is reproducible. cache() the input so the many
# actions below do not recompute it, which could change which rows land in each split.
# NOTE(review): consecutive windows share window_size - 1 samples. A random row-level
# split therefore puts near-duplicates in both sets and likely inflates test scores;
# splitting by file/time would be stricter.
(training, testing) = final_df.cache().randomSplit([0.7, 0.3], seed=42)

In [None]:
# Spark decision tree on the windowed features.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=7)
model_dt = dt.fit(training)
predictions = model_dt.transform(testing)
predictions.select("prediction", "label").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Override metricName for this call only, rather than reusing `accuracy` to hold F1.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)
print("F1-Score = %g " % f1)

# overwrite() so re-running the cell doesn't fail because 'dt' already exists.
model_dt.write().overwrite().save('dt')
new_dt = DecisionTreeClassificationModel.load('dt')

In [None]:
# Spark random forest. The seed makes bootstrap sampling and feature subsets reproducible.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, maxDepth=7, seed=42)
model_rf = rf.fit(training)
predictions = model_rf.transform(testing)
predictions.select("prediction", "label").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Override metricName for this call only, rather than reusing `accuracy` to hold F1.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)
print("F1-Score = %g " % f1)

# overwrite() so re-running the cell doesn't fail because 'rf' already exists.
model_rf.write().overwrite().save('rf')

In [None]:
# Spark GBTClassifier is binary-only, so it is wrapped in OneVsRest for the multiclass labels.
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
ovr = OneVsRest(classifier=gbt, labelCol="label")
model_gbt = ovr.fit(training)
predictions = model_gbt.transform(testing)
predictions.select("prediction", "label").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Override metricName for this call only, rather than reusing `accuracy` to hold F1.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)
print("F1-Score = %g " % f1)

# overwrite() so re-running the cell doesn't fail because 'gbt' already exists.
model_gbt.write().overwrite().save('gbt')

In [None]:
# Multinomial logistic regression.
# NOTE(review): maxIter=10 may stop before convergence; confirm the objective history.
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, family="multinomial")
model_lr = lr.fit(training)
predictions = model_lr.transform(testing)
predictions.select("prediction", "label").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Override metricName for this call only, rather than reusing `accuracy` to hold F1.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)
print("F1-Score = %g " % f1)

# overwrite() so re-running the cell doesn't fail because 'lr' already exists.
model_lr.write().overwrite().save('lr')

In [None]:
# XGBoost through xgboost.spark, wrapped in OneVsRest (one binary model per class).
# NOTE(review): SparkXGBClassifier can fit multiclass labels itself. The OneVsRest
# wrapper may be unnecessary and multiplies training time; consider fitting `classifier` directly.
classifier = SparkXGBClassifier(
    features_col="features",
    label_col="label",
    num_workers=2,
)
ovr = OneVsRest(classifier=classifier, labelCol="label")
model = ovr.fit(training)
predictions = model.transform(testing)
predictions.select("prediction", "label").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Override metricName for this call only, rather than reusing `accuracy` to hold F1.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = %g " % accuracy)
print("F1-Score = %g " % f1)

# overwrite() so re-running the cell doesn't fail because 'xgb' already exists.
model.write().overwrite().save('xgb')

Using scikit-learn ML algorithms

In [None]:
# Collect the (features, label) rows onto the driver as a pandas DataFrame.
# NOTE(review): assumes the windowed dataset fits in driver memory.
pandas_df = final_df.select("features", "label").toPandas()

In [None]:
# Convert each Spark ML vector to a dense float array explicitly. toArray() works
# for both DenseVector and SparseVector, whereas np.stack on the raw vector objects
# relies on element-by-element sequence indexing. Result: one row per window.
X = np.stack([vector.toArray() for vector in pandas_df['features']])
y = pandas_df['label'].to_numpy()

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier as DCT
from sklearn.metrics import accuracy_score, classification_report

In [None]:
# Seeded 80/20 train/test split for the scikit-learn models
# (note: the Spark models above use a 70/30 split).
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.2,
)

In [None]:
# Fix random_state. scikit-learn's tree randomly permutes features at each split
# (even with splitter="best"), so an unseeded tree can differ between runs.
clf = DCT(random_state=42)

# Train the classifier on the training data
clf.fit(X_train, y_train)

# Make predictions on the testing data
y_pred = clf.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)
print(accuracy)
print(report)

In [None]:
from sklearn.linear_model import LogisticRegression as LR
from sklearn.pipeline import make_pipeline

# Standardise the features inside a pipeline, so the scaler is fitted on the
# training split only. Also raise max_iter: lbfgs' default of 100 iterations can
# stop before convergence on unscaled inputs (ConvergenceWarning, under-fitted model).
clf_lr = make_pipeline(StandardScaler(), LR(max_iter=1000))

# Train the classifier on the training data
clf_lr.fit(X_train, y_train)

# Make predictions on the testing data
y_pred_lr = clf_lr.predict(X_test)

# Evaluate the model
accuracy_lr = accuracy_score(y_test, y_pred_lr)
report_lr = classification_report(y_test, y_pred_lr)

print("Logistic Regression:")
print(f"Accuracy: {accuracy_lr:.2f}")
print("Classification Report:\n", report_lr)

In [None]:
from sklearn.ensemble import RandomForestClassifier as RFC

# Seeded 10-tree random forest: fit on the training split, score on the test split.
clf_rf = RFC(random_state=42, n_estimators=10)
clf_rf.fit(X_train, y_train)
y_pred_rf = clf_rf.predict(X_test)

accuracy_rf, report_rf = (
    accuracy_score(y_test, y_pred_rf),
    classification_report(y_test, y_pred_rf),
)
print("\nRandom Forest:")
print(f"Accuracy: {accuracy_rf:.2f}")
print("Classification Report:\n", report_rf)

In [None]:
from sklearn.ensemble import GradientBoostingClassifier

# Seeded 10-stage gradient-boosting baseline: fit on train, score on test.
clf_gbt = GradientBoostingClassifier(random_state=42, n_estimators=10)
clf_gbt.fit(X_train, y_train)
y_pred_gbt = clf_gbt.predict(X_test)

accuracy_gbt, report_gbt = (
    accuracy_score(y_test, y_pred_gbt),
    classification_report(y_test, y_pred_gbt),
)
print("Gradient Boosting Classifier:")
print(f"Accuracy: {accuracy_gbt:.2f}")
print("Classification Report:\n", report_gbt)

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.tree import DecisionTreeClassifier as DTC
from sklearn.metrics import accuracy_score, classification_report


# Seeded decision tree, so the CV scores and the final model are reproducible.
clf_dt = DTC(random_state=42)

# Define the hyperparameters to tune
param_grid = {
    'max_depth': [3, 5, 7],
}

# 3-fold cross-validated search over max_depth on the training split
grid_search = GridSearchCV(estimator=clf_dt, param_grid=param_grid, cv=3, scoring='accuracy')
grid_search.fit(X_train, y_train)

# Get the best hyperparameters
best_params = grid_search.best_params_

# With refit=True (the default), GridSearchCV has already refit the best configuration
# on all of X_train. Reuse it instead of training a separate, unseeded copy that
# could differ from the model that was selected.
best_clf_dt = grid_search.best_estimator_

# Make predictions on the testing data
y_pred_dt = best_clf_dt.predict(X_test)

# Evaluate the model
accuracy_dt = accuracy_score(y_test, y_pred_dt)
report_dt = classification_report(y_test, y_pred_dt)

print("Best Hyperparameters:", best_params)
print("Accuracy: {:.2f}".format(accuracy_dt))
print("Classification Report:\n", report_dt)


In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier as RFC
from sklearn.metrics import accuracy_score, classification_report
from sklearn.datasets import load_iris

# Seeded random forest, so the CV scores and the final model are reproducible.
clf_rf = RFC(random_state=42)

# Define the hyperparameters to tune
param_grid = {
    'n_estimators': [10, 20, 30],
    'max_depth': [3, 5, 7],
}

# 3-fold cross-validated search on the training split
grid_search = GridSearchCV(estimator=clf_rf, param_grid=param_grid, cv=3, scoring='accuracy')
grid_search.fit(X_train, y_train)

# Get the best hyperparameters
best_params_rf = grid_search.best_params_

# With refit=True (the default), GridSearchCV has already refit the best configuration
# on all of X_train. Reuse it instead of training a separate, unseeded forest that
# could differ from the model that was selected.
best_clf_rf = grid_search.best_estimator_

# Make predictions on the testing data
y_pred_rf = best_clf_rf.predict(X_test)

# Evaluate the model
accuracy_rf = accuracy_score(y_test, y_pred_rf)
report_rf = classification_report(y_test, y_pred_rf)

print("Best Hyperparameters:", best_params_rf)
print("Accuracy: {:.2f}".format(accuracy_rf))
print("Classification Report:\n", report_rf)

In [None]:
# X = np.stack(pandas_df['features'].to_numpy())
# y = pandas_df['label'].to_numpy()

In [None]:
# X = X.reshape((X.shape[0], X.shape[1], 1))


In [None]:
# from tensorflow.keras.utils import to_categorical
# y_encoded = to_categorical(y)


In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)


In [None]:
# model = Sequential()
# model.add(LSTM(units=50, input_shape=(X_train.shape[1], 1)))
# model.add(Dense(units=64, activation='relu'))
# model.add(Dense(units=9, activation='softmax'))

In [None]:
# model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))