# Data readin and pre-processing

In [None]:
import pandas as pd
import os
import numpy as np
from scipy import signal

def process_csv(file_path, freq1=20, freq2=450, fs=1000, process_method='bp'):
    """
    read the csv file and process the data
    :param file_path: the path of the csv file
    :param freq1: the lower bound of the bandpass filter
    :param freq2: the upper bound of the bandpass filter
    :param fs: the sampling rate
    :return: the processed data
    """

    df = pd.read_csv(file_path)
    
    data1 = df.iloc[:, 1]
    data2 = df.iloc[:, 2]
    if process_method == 'raw_data':
        processed_df = pd.DataFrame({
            'EMG1': data1,
            'EMG2': data2,
            'class': df.iloc[:, 3]
        })
    elif process_method == 'bp':
        # filter the data with a bandpass filter
        b, a = signal.butter(4, [freq1, freq2], 'bandpass', fs=fs)
        data1 = signal.filtfilt(b, a, data1)
        data2 = signal.filtfilt(b, a, data2)
        processed_df = pd.DataFrame({
        'EMG1': data1,
        'EMG2': data2,
        'class': df.iloc[:, 3]
    })
    elif process_method == 'sma':
        # SMA
        SMA1 = np.zeros(len(data1) - 50)
        SMA2 = np.zeros(len(data2) - 50)
        for i in range(len(data1) - 50):
            SMA1[i] = np.mean(data1[i:i + 51])
            SMA2[i] = np.mean(data2[i:i + 51])

        processed_df = pd.DataFrame({
            'EMG1': data1[50:],
            'EMG2': data2[50:],
            'SMA1': SMA1,
            'SMA2': SMA2,
            'class': df.iloc[50:, 3]
        })
    elif process_method == 'bp+rms':
        # filter the data with a bandpass filter
        b, a = signal.butter(4, [freq1, freq2], 'bandpass', fs=fs)
        data1_bp = signal.filtfilt(b, a, data1)
        data2_bp = signal.filtfilt(b, a, data2)

        # calculate the RMS
        RMS1 = np.zeros(len(data1_bp)-50)
        RMS2 = np.zeros(len(data2_bp)-50)
        
        for i in range(len(data1_bp)-50):
            RMS1[i] = np.sqrt(np.mean(data1_bp[i:i+51]**2))
            RMS2[i] = np.sqrt(np.mean(data2_bp[i:i+51]**2))
        
        processed_df = pd.DataFrame({
            'EMG1': RMS1,
            'EMG2': RMS2,
            'class': df.iloc[50:, 3]
        })
    elif process_method == 'bp+sma':
        # filter the data with a bandpass filter
        b, a = signal.butter(4, [freq1, freq2], 'bandpass', fs=fs)
        data1_bp = signal.filtfilt(b, a, data1)
        data2_bp = signal.filtfilt(b, a, data2)
        
        # SMA
        SMA1 = np.zeros(len(data1_bp) - 50)
        SMA2 = np.zeros(len(data2_bp) - 50)
        for i in range(len(data1_bp) - 50):
            SMA1[i] = np.mean(data1_bp[i:i + 51])
            SMA2[i] = np.mean(data2_bp[i:i + 51])
        processed_df = pd.DataFrame({
            'SMA1': SMA1,
            'SMA2': SMA2,
            'class': df.iloc[50:, 3]
        })
    elif process_method == 'bp+rms+sma':
        # filter the data with a bandpass filter
        b, a = signal.butter(4, [freq1, freq2], 'bandpass', fs=fs)
        data1_bp = signal.filtfilt(b, a, data1)
        data2_bp = signal.filtfilt(b, a, data2)

        # calculate the RMS
        RMS1 = np.zeros(len(data1_bp) - 50)
        RMS2 = np.zeros(len(data2_bp) - 50)

        for i in range(len(data1_bp) - 50):
            RMS1[i] = np.sqrt(np.mean(data1_bp[i:i + 51] ** 2))
            RMS2[i] = np.sqrt(np.mean(data2_bp[i:i + 51] ** 2))

        # SMA
        SMA1 = np.zeros(len(RMS1) - 50)
        SMA2 = np.zeros(len(RMS2) - 50)
        for i in range(len(RMS1) - 50):
            SMA1[i] = np.mean(RMS1[i:i + 51])
            SMA2[i] = np.mean(RMS2[i:i + 51])

        processed_df = pd.DataFrame({
            'SMA1': SMA1,
            'SMA2': SMA2,
            'class': df.iloc[100:, 3]
        })
    
    return processed_df

# Sliding windows

In [None]:
def sliding_window(data, window_size, stride):
    """
    对数据应用滑动窗口处理。
    
    参数:
    data (pd.DataFrame): 输入的数据。
    window_size (int): 窗口大小。
    stride (int): 滑动步幅。
    
    返回:
    np.ndarray: 滑动窗口处理后的数据。
    np.ndarray: 滑动窗口对应的标签。
    """
    windowed_data = []
    labels = []

    for i in range(0, len(data) - window_size + 1, stride):
        windowed = data.iloc[i:i + window_size, :-1].values  # Exclude label column and convert to NumPy array
        label_window = data.iloc[i:i + window_size, -1]  # Get all labels in the window
        labels.append(label_window.value_counts().idxmax())
        windowed_data.append(windowed)
        # if len(label_window.unique()) == 1:  # Check if all labels in the window are the same
        #     windowed_data.append(windowed)
        #     labels.append(label_window.iloc[0])  # Use the first label as the label for the window

    return np.array(windowed_data), np.array(labels)

In [None]:
# 主程序
folder_path = "./0527data/"
csv_files = [file for file in os.listdir(folder_path) if file.endswith('.csv')]

freq1 = 100
freq2 = 400
window_size = 3000
stride = 200
method = 'raw_data'


# 初始化滑动窗口处理后的数据和标签列表
windowed_data_all = []
labels_all = []

# 逐个文件处理数据并应用滑动窗口
for file in csv_files:
    file_path = os.path.join(folder_path, file)
    processed_df = process_csv(file_path, freq1, freq2, process_method=method)
    # windowed_data, labels = sliding_window(processed_df.drop('time', axis=1), window_size, stride) 
    windowed_data, labels = sliding_window(processed_df, window_size, stride)
    windowed_data_all.append(windowed_data)
    labels_all.append(labels)

# 将所有滑动窗口处理后的数据和标签合并
windowed_data_all = np.concatenate(windowed_data_all, axis=0)
labels_all = np.concatenate(labels_all, axis=0)
labels_all = labels_all - 1

# 示例: 显示滑动窗口处理后的数据和标签的形状
print("滑动窗口处理后的数据形状:", windowed_data_all.shape)
print("滑动窗口处理后的标签形状:", labels_all.shape)
print(windowed_data_all[0:1])
print(labels_all[0:5])

# Model: CNN

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(windowed_data_all, labels_all
                                                    , test_size=0.20, random_state=42)
print(X_train.shape)
print(X_train[0])

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Input

def CNN_model(input_shape, num_classes):
    model = Sequential()

    model.add(Input(shape=input_shape))

    # Convolutional layers
    model.add(Conv1D(32, kernel_size=3, activation='relu', input_shape=input_shape))
    model.add(MaxPooling1D(pool_size=2))

    model.add(Conv1D(64, kernel_size=3, activation='relu'))
    model.add(MaxPooling1D(pool_size=2))

    # Flattening layers
    model.add(Flatten())

    # Full connected layers
    model.add(Dense(128, activation='relu'))
    model.add(Dense(num_classes, activation='softmax'))

    # compile the model
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model

In [None]:
# define the CNN model
input_shape = X_train.shape[1:]  # set input size
num_classes = len(np.unique(y_train))   # set class size

model = CNN_model(input_shape, num_classes)

In [None]:
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test))

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# make predictions on training data
y_pred_train = model.predict(X_train)
y_pred_train = np.argmax(y_pred_train, axis=1)  # Convert from one-hot encoding to tags

# make predictions on test data
y_pred_test = model.predict(X_test)
y_pred_test = np.argmax(y_pred_test, axis=1)  # Convert from one-hot encoding to tags

# calculate accuracy
accuracy_training = accuracy_score(y_train, y_pred_train)
accuracy_test = accuracy_score(y_test, y_pred_test)

# calculate F1 score
f1_training = f1_score(y_train, y_pred_train, average='weighted')
f1_test = f1_score(y_test, y_pred_test, average='weighted')

print("Training Accuracy:", accuracy_training)
print("Test Accuracy:", accuracy_test)
print("Training F1 Score:", f1_training)
print("Test F1 Score:", f1_test)

#write in an csv to save the result with frequency1, frequency2, window_size, stride, processing method
#dont overwrite the file
#we need to write in a new row

import csv

with open('./models/result.csv', mode='a') as file:
    writer = csv.writer(file)
    writer.writerow([freq1, freq2, window_size, stride, method, accuracy_training, accuracy_test, f1_training, f1_test])

In [None]:
import matplotlib.pyplot as plt
# training and testing loss graph
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Test Loss')
plt.legend()
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_test, y_pred_test)
plt.figure(figsize=(6, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=np.unique(y_test), yticklabels=np.unique(y_test))
plt.title('Confusion Matrix SW')
plt.xlabel('Predicted Labels SW')
plt.ylabel('Real Labels SW')
plt.show()

In [None]:
from sklearn.metrics import classification_report

# accuracy and F1 score of classes
rapor = classification_report(y_test, y_pred_test, target_names=[str(i) for i in np.unique(y_test)])
print(rapor)

# Save models

In [None]:
# 輸出 SavedModel 的格式來輸出模型
model_name = 'EMG_classification_newdata_'+ str(method) + '_' +str(freq1)+'_'+str(freq2)+'_'+str(window_size)+'.keras'
model.save('./models/' + model_name)

# Predict hidden

In [None]:
import tensorflow as tf
new_model = tf.keras.models.load_model('./models/' + model_name)

# Show the model architecture
new_model.summary()

In [None]:
folder_path = "./0520hidden/"
csv_files = [file for file in os.listdir(folder_path) if file.endswith('.csv')]


# 初始化滑动窗口处理后的数据和标签列表
data_hidden = []
labels_hidden = []

# 逐个文件处理数据并应用滑动窗口
for file in csv_files:
    file_path = os.path.join(folder_path, file)
    processed_df = process_csv(file_path, freq1, freq2, process_method=method)
    windowed_data, labels = sliding_window(processed_df, window_size, stride)
    data_hidden.append(windowed_data)
    labels_hidden.append(labels)

# 将所有滑动窗口处理后的数据和标签合并
data_hidden = np.concatenate(data_hidden, axis=0)
labels_hidden = np.concatenate(labels_hidden, axis=0)

print(data_hidden.shape)

In [None]:
label_pred = new_model.predict(data_hidden)
label_pred = np.argmax(label_pred, axis=1)  # Convert from one-hot encoding to tags


# calculate accuracy
hidden_accuracy = accuracy_score(labels_hidden, label_pred)

# calculate F1 score
hidden_1 = f1_score(labels_hidden, label_pred, average='weighted')
print("Training Accuracy:", hidden_accuracy)
print("Training F1 Score:", hidden_1)