<a href="https://colab.research.google.com/github/muhaiminsk/vr-activity-classification/blob/main/VR_Activity_classf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tensorflow
!pip install keras
!pip install keras-tuner



In [6]:
#First round of training



from google.colab import files
import io
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import numpy as np
import tensorflow as tf
from datetime import datetime
import kerastuner as kt

# Check if GPU is available
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    print(f"GPU available: {gpus[0]}")
else:
    print("No GPU available. Using CPU.")

# Upload the training dataset
print("Upload the training dataset:")
uploaded_training = files.upload()

# Read the training dataset
df_train = pd.read_csv(io.BytesIO(list(uploaded_training.values())[0]))

# Select specified columns for training
required_columns = ['Time', 'No. of Packets', 'No. of Bits',
                    'Avg. Packet Length (Bytes)', 'Avg. Inter-packet Arrival Time', 'Activity']
for col in required_columns:
    if col not in df_train.columns:
        raise ValueError(f"Missing column in training dataset: {col}")
df_train = df_train[required_columns]

# Data Preprocessing
label_encoder_activity = LabelEncoder()
df_train['Activity'] = label_encoder_activity.fit_transform(df_train['Activity'])
activity_mapping = dict(zip(label_encoder_activity.classes_, range(len(label_encoder_activity.classes_))))
print("Activity Mapping:", activity_mapping)

# Separate features and targets
X_train = df_train.drop(['Activity'], axis=1)
y_train_activity = df_train['Activity']

# Standardize numerical features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# Upload the test dataset
print("\nUpload the test dataset:")
uploaded_test = files.upload()

# Read the test dataset
df_test = pd.read_csv(io.BytesIO(list(uploaded_test.values())[0]))

# Select specified columns for testing
for col in required_columns:
    if col not in df_test.columns:
        raise ValueError(f"Missing column in test dataset: {col}")
df_test = df_test[required_columns]

# Data Preprocessing for the test set
df_test['Activity'] = label_encoder_activity.transform(df_test['Activity'])

# Separate features and targets for the test set
X_test = df_test.drop(['Activity'], axis=1)
y_test_activity = df_test['Activity']

# Standardize numerical features for the test set
X_test_scaled = scaler.transform(X_test)

# Define a model builder for hyperparameter tuning
def model_builder(hp):
    model = Sequential()
    # Tune number of units in the first Dense layer
    hp_units = hp.Int('units', min_value=32, max_value=128, step=32)
    model.add(Dense(hp_units, activation='relu', input_dim=X_train_scaled.shape[1]))
    model.add(Dense(32, activation='relu'))
    model.add(Dense(len(label_encoder_activity.classes_), activation='softmax', name='activity_output'))
    # Tune the learning rate for the optimizer
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    model.compile(optimizer=Adam(learning_rate=hp_learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# Hyperparameter tuning
tuner = kt.Hyperband(model_builder,
                     objective='val_accuracy',
                     max_epochs=50,
                     factor=5,
                     directory='my_dir',
                     project_name='intro_to_kt')

# Search for the best hyperparameters
tuner.search(X_train_scaled, y_train_activity, epochs=50, validation_split=0.2)
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Train the model with the best hyperparameters
model = tuner.hypermodel.build(best_hps)
history = model.fit(X_train_scaled, y_train_activity, epochs=50, batch_size=32, validation_split=0.2)

# Evaluate the model on the testing set
y_pred_activity = model.predict(X_test_scaled)
y_pred_activity = y_pred_activity.argmax(axis=1)

# Save the test set with predictions to a CSV file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f'test_set_with_predictions_{timestamp}.csv'
df_test['Predicted Activity'] = label_encoder_activity.inverse_transform(y_pred_activity)
df_test.to_csv(output_file, index=False)

# Download the CSV file with predictions
files.download(output_file)

# Print accuracy of Activity vs predicted activity
accuracy_activity = sum(y_test_activity == y_pred_activity) / len(y_test_activity)
print(f'Accuracy of Activity vs Predicted Activity: {accuracy_activity * 100:.2f}%')


GPU available: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')
Upload the training dataset:


Saving merged_output (10).csv to merged_output (10) (1).csv
Activity Mapping: {'Ball Throwing': 0, 'No Activity': 1, 'Paused': 2, 'Talking': 3, 'Walking': 4}

Upload the test dataset:


Saving test_vrclass 2.csv to test_vrclass 2 (1).csv
Reloading Tuner from my_dir/intro_to_kt/tuner0.json


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/50
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 13ms/step - accuracy: 0.6722 - loss: 0.8410 - val_accuracy: 0.7562 - val_loss: 0.7283
Epoch 2/50
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.8173 - loss: 0.4715 - val_accuracy: 0.6271 - val_loss: 1.1561
Epoch 3/50
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8270 - loss: 0.4619 - val_accuracy: 0.6935 - val_loss: 0.9068
Epoch 4/50
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.8284 - loss: 0.4659 - val_accuracy: 0.6404 - val_loss: 1.0242
Epoch 5/50
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.8219 - loss: 0.4644 - val_accuracy: 0.5977 - val_loss: 1.1676
Epoch 6/50
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.8429 - loss: 0.4123 - val_accuracy: 0.7154 - val_loss: 0.8104
Epoch 7/50
[1m132/132[0m 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Accuracy of Activity vs Predicted Activity: 74.95%


In [3]:
#Trying SMOTE


import torch
from random import randint
import random
class SMOTE(object):
    def __init__(self,distance='euclidian',dims=512,k=5):
        super(SMOTE,self).__init__()
        self.newindex = 0
        self.k = k
        self.dims = dims
        self.distance_measure = distance

    def populate(self, N,i,nnarray,min_samples,k):
        while N:
            nn = randint(0, k-2)

            diff = min_samples[nnarray[nn]] - min_samples[i]
            gap = random.uniform(0,1)

            self.synthetic_arr[self.newindex,:] = min_samples[i] + gap * diff

            self.newindex += 1

            N -= 1
    def k_neighbors(self, euclid_distance, k):
        nearest_idx = torch.zeros((euclid_distance.shape[0],euclid_distance.shape[0]), dtype = torch.int64)

        idxs = torch.argsort(euclid_distance, dim=1)
        nearest_idx[:,:] = idxs

        return nearest_idx[:,1:k]

    def find_k(self,X,k):
        euclid_distance = torch.zeros((X.shape[0],X.shape[0]), dtype = torch.float32)

        for i in range(len(X)):
            dif = (X - X[i])**2
            dist = torch.sqrt(dif.sum(axis=1))
            euclid_distance[i] = dist

        return self.k_neighbors(euclid_distance,k)

    def generate(self, min_samples, N,k):
        """
            Returns (N/100) * n_minority_samples synthetic minority samples.
    		Parameters
    		----------
    		min_samples : Numpy_array-like, shape = [n_minority_samples, n_features]
    		    Holds the minority samples
    		N : percetange of new synthetic samples:
    		    n_synthetic_samples = N/100 * n_minority_samples. Can be < 100.
    		k : int. Number of nearest neighbours.
    		Returns
    		-------
    		S : Synthetic samples. array,
    		    shape = [(N/100) * n_minority_samples, n_features].
    	"""
        T = min_samples.shape[0]
        self.synthetic_arr = torch.zeros(int(N/100)*T,self.dims)
        N = int(N/100)
        if self.distance_measure == 'euclidian':
            indices = self.find_k(min_samples,k)
        for i in range(indices.shape[0]):
            self.populate(N, i, indices[i], min_samples, k)
        self.newindex = 0
        return self.synthetic_arr

    def fit_generate(self,X,y):
        #get occurence of each class
        occ = torch.eye(int(y.max()+1),int(y.max()+1))[y].sum(axis=0)
        #get the dominant class
        dominant_class = torch.argmax(occ)
        #get occurence of the dominant class
        n_occ = int(occ[dominant_class].item())
        for i in range(len(occ)):
            if i != dominant_class:
                #calculate the amount of synthetic data to generate
                N = (n_occ - occ[i]) * 100 / occ[i]
                candidates = X[y == i]
                xs = self.generate(candidates, N,self.k)
                X = torch.cat((X,xs))
                ys = torch.ones(xs.shape[0]) * i
                y = torch.cat((y,ys))
        return X,y





In [4]:
from google.colab import files
import io
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import numpy as np
import tensorflow as tf
from datetime import datetime
import kerastuner as kt
from keras_tuner import Hyperband
from tensorflow import keras
from keras_tuner import Hyperband
import numpy as np

# Check if GPU is available
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    print(f"GPU available: {gpus[0]}")
else:
    print("No GPU available. Using CPU.")

# Upload the training dataset
print("Upload the training dataset:")
uploaded_training = files.upload()

# Read the training dataset
df_train = pd.read_csv(io.BytesIO(list(uploaded_training.values())[0]))

# Select specified columns for training
required_columns = ['Time', 'No. of Packets', 'No. of Bits',
                    'Avg. Packet Length (Bytes)', 'Avg. Inter-packet Arrival Time', 'Activity']
for col in required_columns:
    if col not in df_train.columns:
        raise ValueError(f"Missing column in training dataset: {col}")
df_train = df_train[required_columns]

# Data Preprocessing
label_encoder_activity = LabelEncoder()
df_train['Activity'] = label_encoder_activity.fit_transform(df_train['Activity'])
activity_mapping = dict(zip(label_encoder_activity.classes_, range(len(label_encoder_activity.classes_))))
print("Activity Mapping:", activity_mapping)

# Separate features and targets
X_train = df_train.drop(['Activity'], axis=1)
y_train_activity = df_train['Activity']

# Standardize numerical features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# Upload the test dataset
print("\nUpload the test dataset:")
uploaded_test = files.upload()

# Read the test dataset
df_test = pd.read_csv(io.BytesIO(list(uploaded_test.values())[0]))

# Select specified columns for testing
for col in required_columns:
    if col not in df_test.columns:
        raise ValueError(f"Missing column in test dataset: {col}")
df_test = df_test[required_columns]

# Data Preprocessing for the test set
df_test['Activity'] = label_encoder_activity.transform(df_test['Activity'])

# Separate features and targets for the test set
X_test = df_test.drop(['Activity'], axis=1)
y_test_activity = df_test['Activity']

# Standardize numerical features for the test set
X_train_scaled = scaler.fit_transform(X_train)

# After scaling the training data, apply SMOTE to balance the dataset
# Convert scaled data to tensor for SMOTE
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_activity.values, dtype=torch.int64)

# Initialize SMOTE
smote = SMOTE(distance='euclidian', dims=X_train_tensor.shape[1], k=5)

# Generate synthetic samples and balance the dataset
X_train_smote, y_train_smote = smote.fit_generate(X_train_tensor, y_train_tensor)

# Convert tensors back to NumPy arrays for use with TensorFlow/Keras
X_train_balanced = X_train_smote.numpy()
y_train_balanced = y_train_smote.numpy()

# Ensure data consistency
print(f"Original dataset size: {len(X_train_scaled)}")
print(f"Balanced dataset size: {len(X_train_balanced)}")


def build_model(hp):
    model = keras.Sequential()
    model.add(
        keras.layers.Dense(
            units=hp.Int('units', min_value=32, max_value=256, step=32),
            activation='relu',
        )
    )
    model.add(keras.layers.Dense(1, activation='sigmoid'))
    model.compile(
        optimizer=keras.optimizers.Adam(
            hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
        ),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Initialize the tuner
tuner = Hyperband(
    build_model,
    objective='val_accuracy',
    max_epochs=50,
    factor=5,
    directory='my_dir',
    project_name='intro_to_kt',
)

# Ensure X_train_balanced and y_train_balanced are ready before this line
tuner.search(X_train_balanced, y_train_balanced, epochs=50, validation_split=0.2)
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Train the model with the best hyperparameters
model = tuner.hypermodel.build(best_hps)
history = model.fit(X_train_balanced, y_train_balanced, epochs=50, batch_size=32, validation_split=0.2)

Trial 23 Complete [00h 00m 06s]
val_accuracy: 0.0

Best val_accuracy So Far: 0.0
Total elapsed time: 00h 02m 01s
Epoch 1/50
[1m285/285[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 5ms/step - accuracy: 0.4891 - loss: -51.0951 - val_accuracy: 0.0000e+00 - val_loss: -1292.4376
Epoch 2/50
[1m285/285[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 3ms/step - accuracy: 0.5055 - loss: -886.7494 - val_accuracy: 0.0000e+00 - val_loss: -5469.4131
Epoch 3/50
[1m285/285[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.4979 - loss: -2964.9543 - val_accuracy: 0.0000e+00 - val_loss: -12247.5215
Epoch 4/50
[1m285/285[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.5031 - loss: -5994.4937 - val_accuracy: 0.0000e+00 - val_loss: -21366.2344
Epoch 5/50
[1m285/285[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.4949 - loss: -10278.2998 - val_accuracy: 0.0000e+00 - val_loss: -32617.5801
Epoch 6/50
[1m2

In [7]:
from google.colab import files
import io
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from imblearn.over_sampling import SMOTE
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import tensorflow as tf
from datetime import datetime
import kerastuner as kt

# Check GPU availability
gpus = tf.config.list_physical_devices('GPU')
print("GPU Available:", bool(gpus))

# Load and preprocess data
def load_data(uploaded_file):
    df = pd.read_csv(io.BytesIO(uploaded_file))
    required_columns = ['Time', 'No. of Packets', 'No. of Bits',
                        'Avg. Packet Length (Bytes)', 'Avg. Inter-packet Arrival Time', 'Activity']
    df = df[required_columns]
    return df

# Load training data
print("Upload training dataset:")
uploaded_train = files.upload()
df_train = load_data(list(uploaded_train.values())[0])

# Preprocessing
le = LabelEncoder()
df_train['Activity'] = le.fit_transform(df_train['Activity'])
X_train = df_train.drop('Activity', axis=1)
y_train = df_train['Activity']

# Scale data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# Apply SMOTE
sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X_train_scaled, y_train)

# Load test data
print("\nUpload test dataset:")
uploaded_test = files.upload()
df_test = load_data(list(uploaded_test.values())[0])
df_test['Activity'] = le.transform(df_test['Activity'])
X_test = df_test.drop('Activity', axis=1)
y_test = df_test['Activity']
X_test_scaled = scaler.transform(X_test)

# Model Building
def model_builder(hp):
    model = Sequential()
    model.add(Dense(
        units=hp.Int('units', 64, 256, step=64),
        activation='relu',
        input_shape=(X_res.shape[1],)
    ))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(len(le.classes_), activation='softmax'))

    model.compile(
        optimizer=Adam(learning_rate=hp.Choice('lr', [1e-3, 5e-4, 1e-4])),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# Hyperparameter Tuning
tuner = kt.Hyperband(
    model_builder,
    objective='val_accuracy',
    max_epochs=50,
    directory='tuning',
    project_name='activity_recognition'
)

# Search and train
tuner.search(X_res, y_res, epochs=50, validation_split=0.2, batch_size=32)

# Best model
best_model = tuner.get_best_models(num_models=1)[0]

# Evaluation
test_loss, test_acc = best_model.evaluate(X_test_scaled, y_test)
print(f"\nTest Accuracy: {test_acc*100:.2f}%")

# Save predictions
df_test['Predicted'] = le.inverse_transform(best_model.predict(X_test_scaled).argmax(axis=1))
df_test.to_csv('predictions.csv', index=False)
files.download('predictions.csv')

Trial 11 Complete [00h 00m 05s]
val_accuracy: 0.19106191396713257

Best val_accuracy So Far: 0.691676914691925
Total elapsed time: 00h 01m 06s


  saveable.load_own_variables(weights_store.get(inner_path))


[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 26ms/step - accuracy: 0.9121 - loss: 0.2949

Test Accuracy: 82.97%
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>