In [2]:
import numpy as np
import pandas as pd
import os
from scipy.signal import butter, filtfilt
import pywt
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model

In [4]:



# Function to apply wavelet denoising
def wavelet_denoising(data, wavelet='db4', level=1):
    denoised_data = pd.DataFrame()
    for column in data.columns:
        coeffs = pywt.wavedec(data[column], wavelet, level=level)
        sigma = np.median(np.abs(coeffs[-level])) / 0.6745
        uthresh = sigma * np.sqrt(2 * np.log(len(data)))
        coeffs[1:] = (pywt.threshold(i, value=uthresh, mode='soft') for i in coeffs[1:])
        denoised_signal = pywt.waverec(coeffs, wavelet)
        denoised_data[column] = denoised_signal[:len(data)]
    return denoised_data

# Function to apply low-pass and high-pass filters
def apply_filters(data, fs=1000):
    nyquist = 0.5 * fs

    # EMG (VM) filter parameters
    emg_low_cutoff = 0.5 / nyquist
    emg_high_cutoff = 50 / nyquist
    b_emg, a_emg = butter(1, [emg_low_cutoff, emg_high_cutoff], btype='band')

    # Angle (FM) filter parameters
    angle_low_cutoff = 0.1 / nyquist
    angle_high_cutoff = 20 / nyquist
    b_angle, a_angle = butter(1, [angle_low_cutoff, angle_high_cutoff], btype='band')

    data['VM'] = filtfilt(b_emg, a_emg, data['VM'])
    data['FM'] = filtfilt(b_angle, a_angle, data['FM'])
    return data


# Custom F1 Score Metric
class F1Score(Metric):
    def __init__(self, name='f1_score', **kwargs):
        super(F1Score, self).__init__(name=name, **kwargs)
        self.true_positives = self.add_weight(name='tp', initializer='zeros')
        self.false_positives = self.add_weight(name='fp', initializer='zeros')
        self.false_negatives = self.add_weight(name='fn', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(tf.math.round(y_pred), tf.float32)
        self.true_positives.assign_add(tf.reduce_sum(y_true * y_pred))
        self.false_positives.assign_add(tf.reduce_sum((1 - y_true) * y_pred))
        self.false_negatives.assign_add(tf.reduce_sum(y_true * (1 - y_pred)))

    def result(self):
        precision = self.true_positives / (self.true_positives + self.false_positives + K.epsilon())
        recall = self.true_positives / (self.true_positives + self.false_negatives + K.epsilon())
        f1 = 2 * precision * recall / (precision + recall + K.epsilon())
        return f1

# Squash Activation Function
def squash(x, axis=-1):
    s_squared_norm = K.sum(K.square(x), axis, keepdims=True) + K.epsilon()
    scale = K.sqrt(s_squared_norm) / (0.5 + s_squared_norm)
    return scale * x

# Softmax Function
def softmax(x, axis=-1):
    ex = K.exp(x - K.max(x, axis=axis, keepdims=True))
    return ex / K.sum(ex, axis=axis, keepdims=True)

# Capsule Layer
class Capsule(Layer):
    def __init__(self, num_capsule, dim_capsule, routings=3, share_weights=True, activation='squash', **kwargs):
        super(Capsule, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.share_weights = share_weights
        if activation == 'squash':
            self.activation = squash
        else:
            self.activation = layers.Activation(activation)

    def build(self, input_shape):
        super(Capsule, self).build(input_shape)
        input_dim_capsule = input_shape[-1]
        if self.share_weights:
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(1, input_dim_capsule, self.num_capsule * self.dim_capsule),
                                     initializer='glorot_uniform',
                                     trainable=True)
        else:
            input_num_capsule = input_shape[-2]
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(input_num_capsule, input_dim_capsule, self.num_capsule * self.dim_capsule),
                                     initializer='glorot_uniform',
                                     trainable=True)

    def call(self, u_vecs):
        if self.share_weights:
            u_hat_vecs = K.conv1d(u_vecs, self.W)
        else:
            u_hat_vecs = K.local_conv1d(u_vecs, self.W, [1], [1])

        batch_size = K.shape(u_vecs)[0]
        input_num_capsule = K.shape(u_vecs)[1]
        u_hat_vecs = K.reshape(u_hat_vecs, (batch_size, input_num_capsule, self.num_capsule, self.dim_capsule))
        u_hat_vecs = K.permute_dimensions(u_hat_vecs, (0, 2, 1, 3))
        b = K.zeros_like(u_hat_vecs[:,:,:,0])
        for i in range(self.routings):
            c = softmax(b, 1)
            o = tf.einsum('bin,binj->bij', c, u_hat_vecs)
            if K.backend() == 'theano':
                o = K.sum(o, axis=1)
            if i < self.routings - 1:
                o = K.l2_normalize(o, -1)
                b = tf.einsum('bij,binj->bin', o, u_hat_vecs)
                if K.backend() == 'theano':
                    b = K.sum(b, axis=1)
        return self.activation(o)

    def compute_output_shape(self, input_shape):
        return (None, self.num_capsule, self.dim_capsule)

# Attention Mechanism
def attention_block(inputs, filters):
    attention = layers.Conv2D(filters, (1, 1), activation='relu', padding='same')(inputs)
    attention = layers.Conv2D(filters, (1, 1), activation='sigmoid', padding='same')(attention)
    return layers.multiply([inputs, attention])

# Residual Block
def residual_block(inputs, filters):
    x = layers.Conv2D(filters, (3, 3), padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.add([x, inputs])
    x = layers.ReLU()(x)
    return x

def preprocess_data(data):
    # Apply wavelet denoising
    data = wavelet_denoising(data)
    
    # Apply filters
    data = apply_filters(data)
    
    return data

def convert_to_image(data):
    # Plot VM and FM signals
    plt.figure(figsize=(6, 4))

    # Plot VM signal
    plt.subplot(2, 1, 1)
    plt.plot(data['VM'], color='blue')
    plt.grid(True)
    plt.ylim(-0.02, 0.02)  # Adjust these limits based on the range of your EMG data

    # Plot FM signal
    plt.subplot(2, 1, 2)
    plt.plot(data['FM'], color='red')
    plt.grid(True)
    plt.ylim(-70, 70)  # Adjust these limits based on the range of your angle data

    # Adjust layout to prevent overlap
    plt.tight_layout()

    # Resize the figure to 200x200 pixels
    plt.gcf().set_size_inches(2, 2)  # 2 inches by 2 inches for 200x200 pixels at 100 DPI

    # Save the plot as an image
    image_path = 'temp_image.png'
    plt.savefig(image_path, dpi=100)  # DPI of 100 for 200x200 pixels
    plt.close()  # Close the figure to free up memory
    
    return image_path

def predict_class(data):
    # Preprocess the data
    preprocessed_data = preprocess_data(data)
    
    # Convert the preprocessed data into an image
    image_path = convert_to_image(preprocessed_data)
    
    # Load the trained model
    model = load_model('model.h5', custom_objects={'F1Score': F1Score, 'Capsule': Capsule})

    # Load and preprocess the image for prediction
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=(200, 200))
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    
    # Predict the class
    predictions = model.predict(img_array)
    class_index = np.argmax(predictions)
    classes = {0: 'sitting', 1: 'standing', 2: 'gait'}
    predicted_class = classes[class_index]
    
    return predicted_class


In [26]:
import pandas as pd

# Read the CSV file with correct header names
df = pd.read_csv(r"D:\jhansi_emg\internship_2024_may\sitting_test.txt", header=None, names=['VM', 'FM'],sep='  ')

# Display the first few rows of the DataFrame
print(df.shape)
df.head()


(5849, 2)


  after removing the cwd from sys.path.


Unnamed: 0,VM,FM
0,-0.0863,42.4
1,-0.0968,42.2
2,-0.093,41.8
3,-0.0728,41.6
4,-0.0248,41.3


In [29]:
df[:800].shape

(800, 2)

In [None]:
# Call the prediction function
predicted_class = predict_class(df[:800].shape)

print("Predicted class:", predicted_class)

## temp

In [None]:
from prediction_utils import predict_class

# Example input data
input_data = pd.DataFrame({
    'VM': [0.1, 0.2, 0.3, ..., 0.9, 1.0],
    'FM': [-10, -8, -6, ..., 6, 8]  # Example angle values
})

# Call the prediction function
predicted_class = predict_class(input_data)

print("Predicted class:", predicted_class)
