STEP. 01. Importing the required libraries to implement traditional data augmentation techniques.

In [1]:
import numpy as np
import os
import pandas as pd
from imblearn.over_sampling import SMOTE
import glob
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix,f1_score
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import warnings


In [None]:
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)

STEP. 02. IMPLEMENTING DATA AUGMENTATION TECHNIQUES.

I HAVE CONSIDERED SEVEN TECHNIQUES:
1. JITTER
2. RESAMPLE
3. TIME WARPING
4. MAGNITUDE WARPING
5. CROPPING
6. PERMUTATION
7. ROTATION


1. JITTER DATA AUGMENTATION TECHNIQUES.

In [2]:
def UCI_Jitter_Tech(UCI_Data_Jitter, sigma=0.01):
    """Add zero-mean Gaussian noise to every numeric cell of a DataFrame.

    Parameters
    ----------
    UCI_Data_Jitter : pandas.DataFrame
        Input data; only its numeric columns are used.
    sigma : float, default 0.01
        Standard deviation of the noise.

    Returns
    -------
    pandas.DataFrame
        The numeric columns of the input with noise added (non-numeric
        columns are not part of the result).
    """
    Numeric_Part = UCI_Data_Jitter.select_dtypes(include=[np.number])
    # One independent noise draw per cell, taken from NumPy's global RNG.
    Gaussian_Noise = np.random.normal(loc=0, scale=sigma, size=Numeric_Part.shape)
    return Numeric_Part + Gaussian_Noise

In [3]:
def UCI_Resample_Data(UCI_Data_Resample, Resample_New_Length):
    """Linearly resample every numeric column to ``Resample_New_Length`` rows.

    Each column is treated as a sequence sampled at positions
    ``0 .. len-1`` and is re-evaluated with ``np.interp`` at
    ``Resample_New_Length`` evenly spaced positions over the same span.

    Parameters
    ----------
    UCI_Data_Resample : pandas.DataFrame
        Input data; only its numeric columns are used.
    Resample_New_Length : int
        Number of rows in the resampled output.

    Returns
    -------
    pandas.DataFrame
        Float frame with ``Resample_New_Length`` rows, the numeric column
        names of the input, and a fresh ``0..n-1`` index. If the input has
        no rows or no numeric columns, an (empty) copy of the numeric part
        is returned unchanged.
    """
    UCI_Integer_Values = UCI_Data_Resample.select_dtypes(include=[np.number])
    if UCI_Integer_Values.empty:
        # Nothing to interpolate.
        return UCI_Integer_Values.copy()

    Source_Length = len(UCI_Integer_Values)
    # Both grids are identical for every column, so build them once.
    Source_Positions = np.arange(Source_Length)
    Target_Positions = np.linspace(0, Source_Length - 1, Resample_New_Length)

    Source_Values = UCI_Integer_Values.to_numpy(dtype=float)
    Resampled_Columns = [
        np.interp(Target_Positions, Source_Positions, Source_Values[:, Column_Position])
        for Column_Position in range(Source_Values.shape[1])
    ]
    # Assemble the frame explicitly instead of going through DataFrame.apply:
    # apply relabels the row index with the column names whenever the new
    # length happens to equal the number of columns.
    return pd.DataFrame(np.column_stack(Resampled_Columns), columns=UCI_Integer_Values.columns)


In [4]:
def UCI_Time_Warp_Data(UCI_Data_Time_Warp, sigma=0.2):
    """Warp the row axis of the numeric columns with a random, smooth distortion.

    A random step (mean 1, standard deviation ``sigma``) is drawn per row;
    the normalised cumulative sum of those steps is used as the x-grid on
    which the original values sit, and every column is then re-evaluated
    with ``np.interp`` on an evenly spaced grid over ``[0, 1]``.

    Parameters
    ----------
    UCI_Data_Time_Warp : pandas.DataFrame
        Input data; only its numeric columns are used.
    sigma : float, default 0.2
        Standard deviation of the per-row random steps.

    Returns
    -------
    pandas.DataFrame
        Float frame with the same number of rows and the numeric column
        names of the input, on a fresh ``0..n-1`` index. An empty frame is
        returned when there are no rows or no numeric columns.
    """
    # Only the numeric columns are warped.
    UCI_Interger_Values = UCI_Data_Time_Warp.select_dtypes(include=[np.number])
    if UCI_Interger_Values.empty:
        return pd.DataFrame(columns=UCI_Interger_Values.columns)

    Num_Rows = len(UCI_Interger_Values)
    Random_Steps = np.random.normal(loc=1.0, scale=sigma, size=Num_Rows)
    # np.interp needs an increasing x-grid; a negative draw (possible for
    # large sigma or very long inputs) would otherwise make the cumulative
    # sum go backwards and the interpolation meaningless.
    Random_Steps = np.maximum(Random_Steps, np.finfo(float).eps)
    warp = np.cumsum(Random_Steps)
    warp = warp / warp[-1]

    Query_Points = np.linspace(0, 1, Num_Rows)
    Source_Values = UCI_Interger_Values.to_numpy(dtype=float)
    # Build all columns first and create the frame once; inserting columns
    # one at a time fragments the DataFrame.
    Warped_Columns = [
        np.interp(Query_Points, warp, Source_Values[:, Column_Position])
        for Column_Position in range(Source_Values.shape[1])
    ]
    return pd.DataFrame(np.column_stack(Warped_Columns), columns=UCI_Interger_Values.columns)


In [5]:
def UCI_Magnitude_Warp_Data(UCI_Data_Magnitude_Warp, sigma=0.2):
    """Scale every numeric cell by an independent random factor centred on 1.

    Parameters
    ----------
    UCI_Data_Magnitude_Warp : pandas.DataFrame
        Input data; only its numeric columns are used.
    sigma : float, default 0.2
        Standard deviation of the multiplicative factors.

    Returns
    -------
    pandas.DataFrame
        The numeric columns of the input multiplied element-wise by the
        random factors.
    """
    Numeric_Part = UCI_Data_Magnitude_Warp.select_dtypes(include=[np.number])
    # One factor per cell, drawn from N(1, sigma) via NumPy's global RNG.
    Scale_Factors = np.random.normal(loc=1.0, scale=sigma, size=Numeric_Part.shape)
    return Numeric_Part * Scale_Factors

In [6]:
def UCI_Crop_Data(UCI_Data_Crop, crop_fraction=0.1):
    """Return a random contiguous window of rows from the numeric columns.

    Parameters
    ----------
    UCI_Data_Crop : pandas.DataFrame
        Input data; only its numeric columns are used.
    crop_fraction : float, default 0.1
        Fraction of the rows to keep, between 0 and 1 inclusive. The window
        length is ``int(len(data) * crop_fraction)``.

    Returns
    -------
    pandas.DataFrame
        The selected rows of the numeric columns, re-indexed from 0.

    Raises
    ------
    ValueError
        If ``crop_fraction`` is outside ``[0, 1]``.
    """
    if not 0 <= crop_fraction <= 1:
        raise ValueError(f"crop_fraction must be between 0 and 1, got {crop_fraction!r}")

    UCI_Interger_Value = UCI_Data_Crop.select_dtypes(include=[np.number])
    Total_Rows = len(UCI_Interger_Value)
    UCI_Num_To_Crop = int(Total_Rows * crop_fraction)
    # randint's upper bound is exclusive: the "+ 1" makes the last possible
    # window reachable and keeps crop_fraction=1.0 (or an empty frame) from
    # asking for an empty range.
    start = np.random.randint(0, Total_Rows - UCI_Num_To_Crop + 1)
    UCI_Cropped_Data = UCI_Interger_Value.iloc[start:start + UCI_Num_To_Crop].reset_index(drop=True)
    return UCI_Cropped_Data

In [7]:
def UCI_Permutation_Data(UCI_Data_Permute,Permute_Num_Segments=4):
    """Cut the numeric rows into consecutive segments and shuffle the segments.

    Parameters
    ----------
    UCI_Data_Permute : pandas.DataFrame
        Input data; only its numeric columns are used.
    Permute_Num_Segments : int, default 4
        Number of consecutive segments. All segments have
        ``len(data) // Permute_Num_Segments`` rows except the last, which
        also takes the remainder.

    Returns
    -------
    pandas.DataFrame
        The numeric columns with whole segments in random order (rows inside
        a segment keep their order), re-indexed from 0.
    """
    Numeric_Part = UCI_Data_Permute.select_dtypes(include=[np.number])
    Total_Rows = len(Numeric_Part)
    Segment_Length, Leftover_Rows = divmod(Total_Rows, Permute_Num_Segments)

    # Segment k starts at k * Segment_Length; the final boundary is the end
    # of the data so the last segment absorbs the leftover rows.
    Boundaries = [Segment_Index * Segment_Length for Segment_Index in range(Permute_Num_Segments)]
    Boundaries.append(Permute_Num_Segments * Segment_Length + Leftover_Rows)
    Segments = [
        Numeric_Part.iloc[Lower:Upper]
        for Lower, Upper in zip(Boundaries[:-1], Boundaries[1:])
    ]

    # In-place shuffle of the list of segments using NumPy's global RNG.
    np.random.shuffle(Segments)
    return pd.concat(Segments).reset_index(drop=True)

In [8]:
def UCI_Rotate_Data(UCI_Data_Rotated, angle=15):
    """Rotate the first two numeric columns as 2-D points; keep the rest as is.

    Each row's first two numeric values are treated as a row vector and
    right-multiplied by the 2x2 rotation matrix for ``angle``.

    Parameters
    ----------
    UCI_Data_Rotated : pandas.DataFrame
        Input data; only its numeric columns are used. At least two numeric
        columns are required.
    angle : float, default 15
        Rotation angle in degrees.

    Returns
    -------
    pandas.DataFrame
        Copy of the numeric columns in which the first two columns hold the
        rotated values (as floats); all other columns are unchanged.

    Raises
    ------
    ValueError
        If the input has fewer than two numeric columns.
    """
    UCI_Interger_Value = UCI_Data_Rotated.select_dtypes(include=[np.number])
    if UCI_Interger_Value.shape[1] < 2:
        raise ValueError("UCI_Rotate_Data needs at least two numeric columns to rotate.")

    Angle_Radians = np.radians(angle)
    Cosine, Sine = np.cos(Angle_Radians), np.sin(Angle_Radians)
    rotation_matrix = np.array([[Cosine, -Sine],
                                [Sine, Cosine]])
    First_Two_Columns = UCI_Interger_Value.iloc[:, :2].to_numpy(dtype=float)
    Rotation_Augmented_Data = np.dot(First_Two_Columns, rotation_matrix)

    Rotate_Result = UCI_Interger_Value.copy()
    # Replace the two columns wholesale (by position) rather than writing
    # floats into them with iloc: for integer columns an in-place write
    # relies on silent upcasting, which pandas has deprecated.
    for Column_Position in range(2):
        Rotate_Result.isetitem(Column_Position, Rotation_Augmented_Data[:, Column_Position])
    return Rotate_Result

In [9]:
UCI_Cleaned_Dataset = pd.read_csv('./UCI HAR Dataset/Data/BaseFiles/UCI_Cleaned_Dataset.csv')
def saveDataToCSV(ModelsName):
    UCI_Cleaned_Dataset_Augmented = UCI_Cleaned_Dataset.copy()
    match ModelsName:
        case 'JITTER':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Jitter_Tech(UCI_Cleaned_Dataset, sigma=0.01))
        case 'RESAMPLING':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Resample_Data(UCI_Cleaned_Dataset, Resample_New_Length=100))
        case 'TIME WRAPPING':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Time_Warp_Data(UCI_Cleaned_Dataset_Augmented))
        case 'MAGNITUDE WRAPPING':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Magnitude_Warp_Data(UCI_Cleaned_Dataset))
        case 'CROP DATA':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Crop_Data(UCI_Cleaned_Dataset))
        case 'PERMUTATION':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Permutation_Data(UCI_Cleaned_Dataset) )
        case 'ROTATION':
            UCI_Cleaned_Dataset_Augmented.update(UCI_Rotate_Data(UCI_Cleaned_Dataset))
    UCI_Cleaned_Dataset_Augmented.to_csv('./UCI HAR Dataset/Data/Traditional/UCI_Cleaned_Dataset_Augmented_'+ModelsName+'.csv', index=False, float_format='%.2f')

In [None]:
# Write one augmented CSV per technique, in the same order as before.
for Technique_Name in (
    'JITTER',
    'RESAMPLING',
    'TIME WRAPPING',
    'MAGNITUDE WRAPPING',
    'CROP DATA',
    'PERMUTATION',
    'ROTATION',
):
    saveDataToCSV(Technique_Name)

STEP. 03. Combining the outputs of the traditional data augmentation techniques into a single dataset for the following steps.

In [None]:
# Collect every per-technique CSV written by saveDataToCSV.
pattern = './UCI HAR Dataset/Data/Traditional/UCI_Cleaned_Dataset_Augmented_*.csv'
# glob returns matches in arbitrary, filesystem-dependent order. Sorting
# fixes the row order of the combined frame, which the seeded train/test
# split further down depends on for reproducibility.
files = sorted(glob.glob(pattern))
if not files:
    raise FileNotFoundError(f"No augmented CSV files match {pattern!r}; run the augmentation cell first.")
Combined_Traditional_Augmentated = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)
Combined_Traditional_Augmentated.to_csv('./UCI HAR Dataset/Data/Traditional/UCI_Cleaned_Dataset_Traditional_Augmented.csv', index=False, float_format='%.2f')
print(Combined_Traditional_Augmentated.shape)



STEP. 04. CHECKPOINTS TO PREPARE THE COMBINED TRADITIONAL AUGMENTED DATASET FOR MODEL TRAINING.
1. CHECKING FOR DUPLICATES AND NULL VALUES.
2. SPLITTING THE DATASET INTO TRAIN AND TEST.
3. CHECKING AND BALANCING THE DATA.

STEP. 04. 1. Checking for duplicates and null values, and removing duplicate rows.

In [None]:
# Count fully duplicated rows and missing cells in the combined dataset.
DuplicateValues = Combined_Traditional_Augmentated.duplicated().sum()
NullValues = Combined_Traditional_Augmentated.isna().sum().sum()
print(f"Number of duplicates in the augmented dataset: {DuplicateValues}")
print(f"Number of null values in the augmented dataset: {NullValues}")
if DuplicateValues != 0 or NullValues != 0:
    print("Duplicates or null values found.")
else:
    print("No duplicates or null values found.We are good to proceed!")

# Only duplicates are removed here; null values are reported but left in place.
Combined_Traditional_Augmentated = Combined_Traditional_Augmentated.drop_duplicates()
print(f"Shape after removing duplicates: {Combined_Traditional_Augmentated.shape}")

STEP. 04. 2. Splitting the data into an 80% training set and a 20% test set.

In [13]:
# Separate the features from the 'Activity' target column and integer-encode the target.
Traditional_Augmented_X_Value = Combined_Traditional_Augmentated.drop(columns=['Activity'])
Traditional_Augmented_Y_Value = Combined_Traditional_Augmentated['Activity']
label_encoder_augmented = LabelEncoder()
Traditional_Augmented_Y_Value_Encoded = label_encoder_augmented.fit_transform(Traditional_Augmented_Y_Value)

# Stratified split of the augmented data: 80% for training and 20% for test.
# NOTE(review): the split is made on the already combined augmented files, so
# augmented variants of the same source row can land in both train and test -
# confirm that this overlap is acceptable for the evaluation.
X_Augmented_train, X_Augmented_test, Y_Augmented_Train, Y_Augmented_test = train_test_split(
    Traditional_Augmented_X_Value, Traditional_Augmented_Y_Value_Encoded, test_size=0.2, random_state=42, stratify=Traditional_Augmented_Y_Value_Encoded)
non_numeric_columns = X_Augmented_train.select_dtypes(include=['object']).columns

# Integer-encode each object-typed feature column; every encoder is fitted on
# the training column only and then reused for the test column.
# NOTE(review): le.transform is expected to fail on a test value that never
# appears in the training column - verify this cannot happen for this data.
for i in non_numeric_columns:
    le = LabelEncoder()
    X_Augmented_train[i] = le.fit_transform(X_Augmented_train[i])
    X_Augmented_test[i] = le.transform(X_Augmented_test[i])
# Y_Augmented_Train already holds integer codes from label_encoder_augmented;
# this second encoder is fitted on those codes and is the one the balancing
# step uses for inverse_transform.
label_encoder = LabelEncoder()
Y_Augmented_Train_encoded = label_encoder.fit_transform(Y_Augmented_Train)


# Save the train/test features and their integer-encoded labels to CSV.
X_Augmented_train.to_csv('./UCI HAR Dataset/Data/Traditional/UCI_Traditional_Train_Augmented.csv', index=False, float_format='%.2f')
X_Augmented_test.to_csv('./UCI HAR Dataset/Data/Traditional/UCI_Traditional_Test_Augmented.csv', index=False, float_format='%.2f')
pd.DataFrame(Y_Augmented_Train, columns=['Activity']).to_csv('./UCI HAR Dataset/Data/Traditional/UCI_Traditional_Train_Labels_Augmented.csv', index=False)
pd.DataFrame(Y_Augmented_test, columns=['Activity']).to_csv('./UCI HAR Dataset/Data/Traditional/UCI_Traditional_Test_Labels_Augmented.csv', index=False)

STEP. 04. 3. Data Balancing (checking the class distribution and balancing the training set)

In [None]:
# Balance the classes of the training split with SMOTE (the test split is untouched).
UCI_Trad_Smote = SMOTE(random_state=42)
X_train_balanced, y_train_balanced_encoded = UCI_Trad_Smote.fit_resample(
    X_Augmented_train,
    Y_Augmented_Train_encoded,
)
# Map the resampled codes back through the encoder fitted on the training labels.
y_train_balanced = label_encoder.inverse_transform(y_train_balanced_encoded)
y_train_balanced_df = pd.DataFrame(y_train_balanced, columns=['Activity'])

# Persist the balanced features and their labels.
X_train_balanced.to_csv(
    './UCI HAR Dataset/Data/Traditional/UCI_Traditional_Train_Balanced_Agumented.csv',
    index=False,
    float_format='%.2f',
)
y_train_balanced_df.to_csv(
    './UCI HAR Dataset/Data/Traditional/UCI_Traditional_Train_Labels_Balanced.csv',
    index=False,
)
# Per-class row counts before balancing (training labels) ...
Y_Augmented_Train_series = pd.Series(Y_Augmented_Train)
Imbalance_Distribution_DataFrame = Y_Augmented_Train_series.value_counts().to_frame().reset_index()
Imbalance_Distribution_DataFrame.columns = ['ACTIVITY', 'HEADCOUNTS']
# ... and after balancing.
Balanced_Distribution_DataFrame = y_train_balanced_df['Activity'].value_counts().to_frame().reset_index()
Balanced_Distribution_DataFrame.columns = ['ACTIVITY', 'HEADCOUNTS']


def _Bordered_Distribution_Table(Distribution_Frame, Caption_Text):
    """Return a Styler for the frame with cell borders, 2-digit precision and a caption."""
    Border_Rule = [{'selector': 'th, td', 'props': [('border', '1px solid black')]}]
    return (
        Distribution_Frame.style
        .set_table_styles(Border_Rule)
        .format(precision=2)
        .set_caption(Caption_Text)
    )


Imbalance_Distribution_Display = _Bordered_Distribution_Table(Imbalance_Distribution_DataFrame, "IMBALANCE DISTRIBUTION")
Balanced_Distribution_Display = _Bordered_Distribution_Table(Balanced_Distribution_DataFrame, "BALANCED DISTRIBUTION")
display(Imbalance_Distribution_Display, Balanced_Distribution_Display)

# Side-by-side count plots: training labels before (left) and after (right) balancing.
fig, axs = plt.subplots(1, 2, figsize=(16, 8))
Distribution_Panels = (
    (axs[0], Y_Augmented_Train_series, 'IMBALANCED DISTRIBUTION'),
    (axs[1], y_train_balanced_df['Activity'], 'BALANCED DISTRIBUTION'),
)
for Panel_Axis, Panel_Labels, Panel_Title in Distribution_Panels:
    sns.countplot(x=Panel_Labels, ax=Panel_Axis)
    Panel_Axis.set_title(Panel_Title, fontsize=16)
    Panel_Axis.set_xlabel('ACTIVITIES', fontsize=14)
    Panel_Axis.set_ylabel('HEADCOUNTS', fontsize=14)
    Panel_Axis.tick_params(axis='x', rotation=45, labelsize=12)

plt.tight_layout()
plt.show()

STEP. 05. TRAINING THE MODELS

FOR THIS PROJECT I AM SELECTING THE FOUR MODELS BEST SUITED TO THE STUDY OF DATA AUGMENTATION TECHNIQUES.

01. RANDOM FOREST
02. SUPPORT VECTOR MACHINE (SVM)
03. CONVOLUTIONAL NEURAL NETWORK (CNN)
04. LONG SHORT-TERM MEMORY (LSTM)

In [15]:
def trainAndtestdatasheet(modelsName, UCI_X_Train, UCI_X_Test):
    """Add the extra axis a Keras model needs to 2-D train/test feature tables.

    Parameters
    ----------
    modelsName : str
        Model name, compared case-insensitively. For 'LSTM' the new axis is
        inserted at position 1, giving ``(rows, 1, features)``; for any other
        name it is appended, giving ``(rows, features, 1)``.
    UCI_X_Train, UCI_X_Test : array-like
        2-D training and test features (DataFrame or ndarray).

    Returns
    -------
    tuple of numpy.ndarray
        The reshaped ``(train, test)`` arrays.
    """
    # Use the arguments that were passed in; the earlier version ignored them
    # and always read the notebook-level X_train_balanced / X_Augmented_test.
    New_Axis = 1 if modelsName.upper() == 'LSTM' else -1
    UCI_X_Train = np.expand_dims(UCI_X_Train, axis=New_Axis)
    UCI_X_Test = np.expand_dims(UCI_X_Test, axis=New_Axis)
    return UCI_X_Train, UCI_X_Test


In [16]:
#Consolidating all results in single csv(accuracy,F1 Score)
def getAccuracyAndF1Score(Traditional,ModelName,Accuracy,F1Score):
    """Append one result row (class, model, accuracy, F1) to the shared results CSV.

    The results folder is created if needed. The first call creates
    ``results.csv`` with a header row and prints a confirmation; later calls
    append a row without a header.
    """
    os.makedirs('./UCI HAR Dataset/results', exist_ok=True)

    file_path = './UCI HAR Dataset/results/results.csv'
    Is_New_File = not os.path.exists(file_path)

    Result_Row = pd.DataFrame([{
        "Traditional Class": Traditional,
        "Model": ModelName,
        "Accuracy": Accuracy,
        "F1 Score": F1Score,
    }])
    # Header only when the file is being created; otherwise append.
    Result_Row.to_csv(file_path, mode='w' if Is_New_File else 'a', header=Is_New_File, index=False)
    if Is_New_File:
        print(f"File {file_path} created and results logged.")

In [20]:
def TrainingModelAndGeneratingReport(TrainingModelName,ModelsName):
    """Fit a model on the balanced training data, score it on the test split and show a report.

    Keras models ('LSTM' / 'CNN', matched case-insensitively) are trained on
    the reshaped tensors for 10 epochs with batch size 32, using the test
    split as validation data; any other model is fitted with the
    scikit-learn style ``fit`` / ``predict`` on the 2-D tables. Accuracy (in
    percent) and weighted F1 are appended to the results CSV, and the
    classification report and confusion matrix are displayed.

    Reads the notebook-level ``X_train_balanced``, ``y_train_balanced``,
    ``X_Augmented_test`` and ``Y_Augmented_test``.

    Parameters
    ----------
    TrainingModelName : estimator or compiled Keras model
        The (unfitted) model to train.
    ModelsName : str
        Display name of the model; also decides which training path is used.
    """
    # Route on the upper-cased name so this agrees with the case-insensitive
    # dispatch in callModels.
    Model_Key = ModelsName.upper()
    Is_Neural_Network = Model_Key in ('LSTM', 'CNN')

    if Is_Neural_Network:
        # Only the Keras models need the extra axis, so reshape only here.
        UCI_X_Train, UCI_X_Test = trainAndtestdatasheet(Model_Key, X_train_balanced, X_Augmented_test)
        TrainingModelName.fit(UCI_X_Train, y_train_balanced, epochs=10,batch_size=32,validation_data=(UCI_X_Test, Y_Augmented_test))
        UCI_Evaluation = TrainingModelName.evaluate(UCI_X_Test, Y_Augmented_test)
        # The network outputs one probability per class; take the most likely one.
        ModelPrediction = np.argmax(TrainingModelName.predict(UCI_X_Test), axis=-1)
    else:
        TrainingModelName.fit(X_train_balanced, y_train_balanced)
        ModelPrediction = TrainingModelName.predict(X_Augmented_test)

    # Metrics are computed the same way for both kinds of model.
    ModelAccuracy = accuracy_score(Y_Augmented_test, ModelPrediction) * 100
    ModelF1Score = f1_score(Y_Augmented_test, ModelPrediction, average='weighted')
    ModelClassificationReport = classification_report(Y_Augmented_test, ModelPrediction, output_dict=True)
    ModelClassificationDataFrame = pd.DataFrame(ModelClassificationReport).transpose()
    # For Keras models the accuracy row shows the value reported by evaluate().
    Reported_Accuracy = UCI_Evaluation[1] * 100 if Is_Neural_Network else ModelAccuracy
    ModelClassificationDataFrame.loc['accuracy'] = [Reported_Accuracy, '-', '-', '-']

    getAccuracyAndF1Score('Traditional Class',ModelsName,ModelAccuracy,ModelF1Score)
    ModelClassificationDataFrame = ModelClassificationDataFrame.round(2)
    ModelClassificationDataFrame.columns = [col.upper() for col in ModelClassificationDataFrame.columns]
    ModelClassificationDataFrame.index = [idx.upper() for idx in ModelClassificationDataFrame.index]

    #Displaying report
    Classification_Report_Display = ModelClassificationDataFrame.style.set_table_styles(
    [{'selector': 'th, td', 'props': [('border', '1px solid black')]}]
    ).format(precision=2)
    display(ModelsName+': Classification Report.')
    display(Classification_Report_Display)

    #Putting confusion matrix
    ModelConfusionmatrix= confusion_matrix(Y_Augmented_test, ModelPrediction)
    Class_Labels = np.unique(y_train_balanced)
    plt.figure(figsize=(10, 8))
    sns.heatmap(ModelConfusionmatrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=Class_Labels, yticklabels=Class_Labels)
    plt.title(ModelsName + ' UCI : Confusion Matrix')
    plt.xlabel('PREDICTED LABEL')
    plt.ylabel('TRUE LABEL')
    plt.show()

In [21]:
def callModels(ModelsName):
    print("\033[1;34m.....................................................!\033[0m")
    print('Running for Traditional '+ModelsName+'.....................>>>>>')
    print("\033[1;34m.....................................................!\033[0m")
    UCI_X_Train, UCI_X_Test = trainAndtestdatasheet(ModelsName,X_train_balanced, X_Augmented_test)
    match ModelsName.upper():
        case 'RANDOM FOREST':
            TrainingModelName = RandomForestClassifier(n_estimators=100, random_state=42)
            TrainingModelAndGeneratingReport(TrainingModelName,ModelsName)
            
        case 'SVM':
            TrainingModelName = SVC(kernel='linear', random_state=42)
            TrainingModelAndGeneratingReport(TrainingModelName,ModelsName)
        case 'LSTM':
            TrainingModelName = models.Sequential([
               layers.LSTM(64, input_shape=(UCI_X_Train.shape[1], UCI_X_Train.shape[2])),
               layers.Dense(128, activation='relu'),
               layers.Dense(len(np.unique(y_train_balanced)), activation='softmax')
            ])

            TrainingModelName.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            TrainingModelAndGeneratingReport(TrainingModelName,ModelsName)

        case 'CNN': 
            TrainingModelName = models.Sequential([
            layers.Conv1D(32, kernel_size=3, activation='relu', input_shape=(UCI_X_Train.shape[1], 1)),
            layers.MaxPooling1D(pool_size=2),
            layers.Conv1D(64, kernel_size=3, activation='relu'),
            layers.MaxPooling1D(pool_size=2),
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dense(len(np.unique(y_train_balanced)), activation='softmax')  # Assuming y is numeric with unique values
            ])
            # Compile with a smaller learning rate and Adam optimizer
            TrainingModelName.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
            TrainingModelAndGeneratingReport(TrainingModelName,ModelsName)
            
    

In [None]:
print("\033[1;32m.........................TRAINING TRADITIONAL............................!\033[0m")
# Train and report each model in turn.
for Model_To_Train in ('RANDOM FOREST', 'SVM', 'LSTM', 'CNN'):
    callModels(Model_To_Train)