In [27]:
# !pip install tsgm

In [28]:
import tensorflow as tf
import os
import glob
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, LSTM, LayerNormalization, Dense, Attention, MultiHeadAttention, Lambda
from tensorflow.keras.models import Model, Sequential
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras import backend as K
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sns
# import tsgm

In [29]:
# !gdown --fuzzy https://drive.google.com/file/d/1F9uinZY-eG4x9dNsUOOtAAZMYq6p945U/view?usp=drive_link
# !unzip -qq "ASL-Sensor-Dataglove-Dataset.zip" -d glove_data
# !echo "Unzip successfully"

In [30]:
class TimeSeriesDataset:
    def __init__(self, root_dir, feature_names=[]):
        self.data = self.load_data(root_dir, feature_names)
        self.merge_flex_sensors()

    def load_data(self, root_dir, feature_names):
        data = []

        for individual_dir in sorted(os.listdir(root_dir)):
            individual_path = os.path.join(root_dir, individual_dir)
            for class_dir in sorted(os.listdir(individual_path)):
                class_path = os.path.join(individual_path, class_dir)
                if os.path.isdir(class_path):
                    for file in glob.glob(os.path.join(class_path, "*.csv")):
                        df = pd.read_csv(file, usecols=feature_names)
                        class_name = os.path.splitext(os.path.basename(file))[0]
                        df["class"] = class_name
                        data.append(df)


        # Concatenate all data frames into a single data frame
        data = pd.concat(data, ignore_index=True)
        return data
    
    def merge_flex_sensors(self):
        if 'flex_4' in self.data.columns and 'flex_5' in self.data.columns:
            # Define a small epsilon value to avoid division by zero
            epsilon = 1e-10

            # Convert flex sensor values to conductance (1/R), handling zero values
            conductance_4 = 1 / (self.data['flex_4'] + epsilon)
            conductance_5 = 1 / (self.data['flex_5'] + epsilon)
            
            # Sum the conductances
            total_conductance = conductance_4 + conductance_5
            
            # Convert back to resistance, handling very large values
            self.data['flex_4'] = np.where(
                total_conductance > epsilon,
                1 / total_conductance,
                np.finfo(float).max  # Use maximum float value for near-zero conductance
            )
            
            # Drop original columns
            self.data = self.data.drop(columns=['flex_5'])

In [31]:
root_dir = "glove_data/"
feature_names = [
    "flex_1", "flex_2", "flex_3", "flex_4",
    "GYRx", "GYRy", "GYRz"
]

dataset = TimeSeriesDataset(root_dir, feature_names).data
# dataset = dataset.sort_values(by=["class"])

# filter_classes = ["deaf", "fine", "good", "goodbye", "hello"]
# dataset = dataset[dataset["class"].isin(filter_classes)]

x_data, y_data = dataset.iloc[:, :-1].values, dataset.iloc[:, -1].values

scaler = StandardScaler()
x_data = scaler.fit_transform(x_data)

label_encoder = LabelEncoder()
y_data = label_encoder.fit_transform(y_data)

timesteps = 150
n_features = 7
num_classes = len(np.unique(y_data))

num_samples = len(y_data) // timesteps

x_data = x_data[:num_samples * timesteps].reshape((num_samples, timesteps, n_features))
y_data = y_data[:num_samples * timesteps:timesteps]


In [32]:
# aug_model = tsgm.models.augmentations.GaussianNoise()
# x_data_aug = aug_model.generate(x_data, n_samples=x_data.shape[0], variance=0.2)

# x_data = np.concatenate((x_data, x_data_aug), axis=0)
# y_data = np.concatenate((y_data, y_data), axis=0)


In [33]:
print(x_data.shape)
print(y_data.shape)

(10000, 150, 7)
(10000,)


In [34]:
def positional_encoding(length, depth):
    depth = int(depth)
    positions = np.arange(length)[:, np.newaxis]     # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :]/depth   # (1, depth)

    angle_rates = 1 / (10000**depths)                # (1, depth)
    angle_rads = positions * angle_rates             # (pos, depth)

    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1)

    return tf.cast(pos_encoding, dtype=tf.float32)

class AddPositionalEncoding(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        _, seq_len, d_model = input_shape
        self.pos_encoding = positional_encoding(seq_len, d_model)

    def call(self, inputs):
        # Ensure positional encoding has the same shape as the input
        return inputs + self.pos_encoding[:tf.shape(inputs)[1], :tf.shape(inputs)[2]]

    def compute_output_shape(self, input_shape):
        return input_shape

    def get_config(self):
        config = super().get_config()
        return config

def create_model(timesteps, n_features, num_classes):
    inputs = Input(shape=(timesteps, n_features))

    x = Conv1D(filters=64, kernel_size=5, activation='relu')(inputs)
    x = MaxPooling1D(pool_size=2)(x)

    x = Conv1D(filters=64, kernel_size=5, activation='relu')(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = Conv1D(filters=64, kernel_size=5, activation='relu')(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = Conv1D(filters=64, kernel_size=5, activation='relu')(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = LSTM(units=128, return_sequences=True)(x)
    x = LSTM(units=128, return_sequences=True)(x)

    x = AddPositionalEncoding()(x)

    # MultiHeadAttention layer
    attn_output = MultiHeadAttention(num_heads=4, key_dim=128)(x, x, x)
    x = LayerNormalization()(attn_output + x)

    x = Dense(units=128, activation='relu')(x)

    # Global Attention layer
    attn = Attention()([x, x])
    x = LayerNormalization()(attn + x)

    # Global average pooling to reduce sequence dimension
    x = tf.keras.layers.GlobalAveragePooling1D()(x)

    outputs = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=inputs, outputs=outputs)
    return model

In [35]:
model = create_model(timesteps, n_features, num_classes)
model.summary()

# checkpoint = tf.keras.callbacks.ModelCheckpoint('best_model_weights.h5', monitor='val_loss', save_best_only=True, mode='min', verbose=1)
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(x_data, y_data, epochs=30, batch_size=64, validation_split = 0.2, callbacks=[early_stopping])

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 150, 7)]             0         []                            
                                                                                                  
 conv1d_8 (Conv1D)           (None, 146, 64)              2304      ['input_3[0][0]']             
                                                                                                  
 max_pooling1d_8 (MaxPoolin  (None, 73, 64)               0         ['conv1d_8[0][0]']            
 g1D)                                                                                             
                                                                                                  
 conv1d_9 (Conv1D)           (None, 69, 64)               20544     ['max_pooling1d_8[0][0]'

<keras.src.callbacks.History at 0x1e0a8b80b10>

In [36]:
if not os.path.exists("model"):
    os.makedirs("model")
model.save("models/SADeepConvLSTM_TransferLearning.h5")

  saving_api.save_model(


In [37]:
# y_pred = model.predict(x_test)
# y_pred = np.argmax(y_pred, axis=1)
# accuracy = accuracy_score(y_test, y_pred)
# print("Accuracy:", accuracy)

In [38]:
# cm = confusion_matrix(y_test, y_pred)
# disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(label_encoder.classes_))
# disp.plot(cmap=plt.cm.Blues)
# plt.show()

In [39]:
# print(classification_report(y_test, y_pred))