In [None]:
import pickle as pkl
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense
from sklearn.model_selection import LeaveOneGroupOut

In [None]:
# Read the pickled dataset from ../data/FIC.pkl into `dataset`.
# NOTE: unpickling can execute arbitrary code, so only load a file you trust.
with open('../data/FIC.pkl', 'rb') as f:
    dataset = pkl.load(f)

In [None]:
# Pull out the three fields used below: the grouping ids, the processed
# signals and the bite annotations (later iterated/split side by side).
subject_ids, signals_proc, bite_gt = (
    dataset[field] for field in ('subject_id', 'signals_proc', 'bite_gt')
)

Model creation

In [None]:
def create_model(input_shape):
    """Build the uncompiled Conv1D + LSTM binary classifier.

    Three ReLU ``Conv1D`` layers (32, 64 and 128 filters with kernel sizes
    5, 3 and 3) are stacked, with a size-2 max-pooling after each of the first
    two. Their output feeds a 128-unit ``LSTM`` followed by a single sigmoid
    unit.

    Args:
        input_shape: Shape of one sample (no batch axis), forwarded to the
            first ``Conv1D`` layer, e.g. ``(None, 6)``.

    Returns:
        The assembled ``Sequential`` model; the caller is expected to compile it.
    """
    layers = [
        Conv1D(filters=32, kernel_size=5, activation='relu', input_shape=input_shape),
        MaxPooling1D(pool_size=2),
        Conv1D(filters=64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        Conv1D(filters=128, kernel_size=3, activation='relu'),
        # Deliberately no pooling after the third convolution.
        LSTM(128, activation='hard_sigmoid'),
        Dense(1, activation='sigmoid'),
    ]
    return Sequential(layers)

In [None]:
# Timestep axis left as None (variable length), 6 features per timestep.
model = create_model(input_shape=(None, 6))

In [None]:
# Binary classification set-up: cross-entropy loss, Adam optimizer, accuracy metric.
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Print the layer-by-layer overview of the model built above.
model.summary()

Sliding window and labeling

In [None]:
import numpy as np

def create_sliding_windows(signals, bite_gt, window_length, step, epsilon):
    """Cut every recording into fixed-length windows and label each window.

    A window is labelled 1 when the timestamp of its last sample lies within
    ``epsilon`` of the end time of any bite of the same recording, otherwise 0.

    Args:
        signals: Iterable of 2-D arrays, one per recording. Column 0 is read as
            the timestamp; all remaining columns are kept as features.
        bite_gt: Iterable parallel to ``signals``. Each item is a sequence of
            bites, where index 1 of a bite is read as its end time.
        window_length: Number of samples in each window.
        step: Number of samples between the starts of consecutive windows.
        epsilon: Largest absolute difference, in the timestamp's own units,
            between a window's end and a bite's end for a positive label.

    Returns:
        Tuple ``(X, y)``. ``X`` stacks the windows without their timestamp
        column, giving ``(n_windows, window_length, n_features)``; ``y`` holds
        the 0/1 labels. When no window fits, both are empty 1-D arrays.
    """
    X, y = [], []
    for signal, bites in zip(signals, bite_gt):
        # Collect the bite end times once per recording so each window can be
        # compared against all of them in a single vectorised step.
        bite_ends = np.array([bite[1] for bite in bites], dtype=float)

        # "+ 1" keeps the last full window: without it a recording whose length
        # is exactly window_length (or window_length + k * step) loses its
        # final window.
        for start in range(0, len(signal) - window_length + 1, step):
            window = signal[start:start + window_length, :]
            window_end_time = window[-1, 0]

            is_bite_end = np.any(np.abs(bite_ends - window_end_time) <= epsilon)

            X.append(window[:, 1:])  # drop the timestamp column
            y.append(int(is_bite_end))

    return np.array(X), np.array(y)


Data augmentation

In [None]:
def rotate_signal(signal, theta_x_hat, theta_z_hat):
    """Rotate 3-axis samples by a randomly chosen x/z rotation combination.

    One of four rotation matrices is picked uniformly at random with
    ``np.random``: ``Q_x``, ``Q_z``, ``Q_x @ Q_z`` or ``Q_z @ Q_x``.

    Args:
        signal: Array of shape ``(n_samples, 3)``.
        theta_x_hat: Rotation angle about the x axis in radians; a scalar or a
            single-element array.
        theta_z_hat: Rotation angle about the z axis in radians; a scalar or a
            single-element array.

    Returns:
        Array of shape ``(n_samples, 3)`` holding the rotated samples.

    Raises:
        ValueError: If an angle holds more than one element.
    """
    # Callers may pass shape-(1,) arrays (e.g. from np.random.normal(..., 1)).
    # Nesting such arrays inside the matrix literals below would make them
    # ragged, which NumPy rejects, so reduce the angles to plain floats first.
    theta_x = np.asarray(theta_x_hat, dtype=float).item()
    theta_z = np.asarray(theta_z_hat, dtype=float).item()

    cos_x, sin_x = np.cos(theta_x), np.sin(theta_x)
    cos_z, sin_z = np.cos(theta_z), np.sin(theta_z)

    Q_x = np.array([[1.0, 0.0, 0.0],
                    [0.0, cos_x, -sin_x],
                    [0.0, sin_x, cos_x]])

    Q_z = np.array([[cos_z, -sin_z, 0.0],
                    [sin_z, cos_z, 0.0],
                    [0.0, 0.0, 1.0]])

    # Randomly select one of the four transformations
    transformations = [Q_x, Q_z, np.dot(Q_x, Q_z), np.dot(Q_z, Q_x)]
    Q = transformations[np.random.randint(0, 4)]

    # Samples are rows, so right-multiplying by Q.T rotates every sample by Q.
    return np.dot(signal, Q.T)

def augment_data(X_batch, rotation_prob=0.5, angle_std_deg=10.0):
    """Randomly rotate the first three channels of a share of the samples.

    Each sample is rotated with probability ``rotation_prob``; otherwise it is
    passed through unchanged. Rotation angles about x and z are drawn from a
    zero-mean normal distribution.

    Args:
        X_batch: Iterable of 2-D arrays ``(timesteps, channels)``; the first
            three channels are rotated, any further channels are kept as-is.
        rotation_prob: Probability of rotating a given sample.
        angle_std_deg: Standard deviation of the rotation angles, in degrees.

    Returns:
        ``np.ndarray`` stacking the (possibly rotated) samples in input order.
    """
    augmented_batch = []
    for X in X_batch:
        if np.random.rand() < rotation_prob:
            # rotate_signal feeds its angles to np.cos/np.sin, which work in
            # radians, so convert. Drawing scalars (no size argument) also
            # avoids handing shape-(1,) arrays to the matrix construction.
            # NOTE(review): the std of 10 is taken to be degrees - as radians
            # it would wrap around several times; confirm against the
            # augmentation spec.
            theta_x_hat = np.deg2rad(np.random.normal(0, angle_std_deg))
            theta_z_hat = np.deg2rad(np.random.normal(0, angle_std_deg))
            augmented_signal = rotate_signal(X[:, :3], theta_x_hat, theta_z_hat)  # Apply to accelerometer data
            # NOTE(review): the gyroscope channels are re-attached unrotated;
            # confirm that is intended for a simulated sensor re-orientation.
            augmented_signal = np.concatenate((augmented_signal, X[:, 3:]), axis=1)  # Re-attach gyroscope data
        else:
            augmented_signal = X
        augmented_batch.append(augmented_signal)
    return np.array(augmented_batch)


In [None]:
# Show which GPU devices TensorFlow can see (an empty list means none).
tf.config.list_physical_devices('GPU')

In [None]:
# Peek at the second entry of the processed signals.
signals_proc[1]

Training setup

In [None]:
from tensorflow.keras.optimizers import RMSprop


def _as_object_array(items):
    """Pack ``items`` into a 1-D object array without stacking the elements."""
    packed = np.empty(len(items), dtype=object)
    for idx, item in enumerate(items):
        packed[idx] = item
    return packed


# Fresh model for this run; the timestep axis stays None so any window length fits.
model = create_model((None, 6))  # Replace 'None' with the actual timestep size if fixed
model.compile(loss='binary_crossentropy', optimizer=RMSprop(learning_rate=1e-3), metrics=['accuracy'])

# Training parameters
epochs = 5
batch_size = 128
window_length = 128  # samples per window
step = 64  # samples between consecutive window starts
epsilon = 0.5  # tolerance between window end and bite end, in timestamp units

# Keep the recordings in a 1-D object array so they can be selected with index
# arrays later on. A plain np.array(...) would try to stack them and raises on
# current NumPy when the recordings differ in length.
signals_proc = _as_object_array(signals_proc)
X, y = create_sliding_windows(signals_proc, bite_gt, window_length, step, epsilon)
if len(X) == 0:
    raise ValueError("No windows were produced; check window_length/step against the recording lengths.")

# Training loop
for epoch in range(epochs):
    # Shuffle and batch data
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    X_shuffled = X[indices]
    y_shuffled = y[indices]

    for i in range(0, len(X_shuffled), batch_size):
        X_batch = X_shuffled[i:i+batch_size]
        y_batch = y_shuffled[i:i+batch_size]

        # Augment data
        X_batch_augmented = augment_data(X_batch)

        # One gradient step on the augmented batch. Note that create_model
        # defines no dropout layers, so augmentation is the only regularisation.
        loss, accuracy = model.train_on_batch(X_batch_augmented, y_batch)
        print(f"Epoch {epoch+1}, Batch {i//batch_size+1}, Loss: {loss}, Accuracy: {accuracy}")


In [None]:
# Evaluation scores are collected here, one entry per held-out group.
results = []
# Splitter that holds out all samples of one group at a time.
logo = LeaveOneGroupOut()

In [None]:
# Leave-one-group-out evaluation: every subject id is held out once.
for train_index, test_index in logo.split(signals_proc, bite_gt, subject_ids):
    # Select recordings element-wise so this works for lists and arrays alike
    # (a plain list cannot be indexed with an index array).
    train_signals = [signals_proc[i] for i in train_index]
    train_bites = [bite_gt[i] for i in train_index]
    test_signals = [signals_proc[i] for i in test_index]
    test_bites = [bite_gt[i] for i in test_index]

    # The network consumes fixed-length windows with 0/1 labels, not whole
    # recordings and raw bite intervals, so window and label each fold using
    # the parameters from the training-setup cell.
    X_train, y_train = create_sliding_windows(train_signals, train_bites, window_length, step, epsilon)
    X_test, y_test = create_sliding_windows(test_signals, test_bites, window_length, step, epsilon)
    if len(X_train) == 0 or len(X_test) == 0:
        print("Skipping fold: no windows available for training or testing.")
        continue

    # Define and compile the model with RMSProp optimizer
    model = create_model(input_shape=X_train.shape[1:])
    model.compile(optimizer=RMSprop(learning_rate=1e-3), loss='binary_crossentropy', metrics=['accuracy'])

    # Train the model with a batch size of 128 and 5 epochs
    model.fit(X_train, y_train, batch_size=128, epochs=5)

    # Evaluate the model on the held-out group and keep its [loss, accuracy]
    scores = model.evaluate(X_test, y_test)
    results.append(scores)