In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Define the convolutional part of the architecture
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=6, out_channels=32, kernel_size=5, padding=2)
        self.pool1 = nn.MaxPool1d(kernel_size=2)
        self.conv2 = nn.Conv1d(
            in_channels=32, out_channels=64, kernel_size=3, padding=1
        )
        self.pool2 = nn.MaxPool1d(kernel_size=2)
        self.conv3 = nn.Conv1d(
            in_channels=64, out_channels=128, kernel_size=3, padding=1
        )

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        return x


# Define the recurrent part of the architecture
class RecurrentNet(nn.Module):
    def __init__(self):
        super(RecurrentNet, self).__init__()
        self.lstm = nn.LSTM(input_size=128, hidden_size=128, batch_first=True)
        self.fc = nn.Linear(128, 1)

    def forward(self, x):
        self.lstm.flatten_parameters()
        x, _ = self.lstm(x)
        x = torch.sigmoid(self.fc(x))
        return x


# Combine convolutional and recurrent parts into one model
class EndToEndModel(nn.Module):
    def __init__(self):
        super(EndToEndModel, self).__init__()
        self.conv_net = ConvNet()
        self.recurrent_net = RecurrentNet()

    def forward(self, x):
        x = self.conv_net(x)
        x = x.permute(
            0, 2, 1
        )  # Prepare for LSTM: (batch_size, sequence_length, features)
        x = self.recurrent_net(x)
        return x

In [34]:
# Generate a random input tensor with the same shape as the expected input
random_input = torch.randn(1, 6, 4).to(device)

# Set the model to evaluation mode
model.eval()

# Perform a forward pass with the random input
with torch.no_grad():
    output = model(random_input)

# Print the output and its shape
print("Output:", output)
print("Output shape:", output.shape)

Output: tensor([[[0.4866]]], device='cuda:0')
Output shape: torch.Size([1, 1, 1])


In [3]:
import pickle as pkl
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Load the dataset
with open("dataset/FIC/FIC.pkl", "rb") as fh:
    dataset = pkl.load(fh)

# Access the processed signals and corresponding labels
signals_proc = dataset["signals_proc"]
bite_gt = dataset["bite_gt"]

In [4]:
class FICDataset(Dataset):
    def __init__(self, signals_proc, bite_gt, window_length, window_step, epsilon):
        self.data = []
        self.labels = []
        self.window_length = window_length
        self.window_step = window_step
        self.epsilon = epsilon

        for signal, bites in zip(signals_proc, bite_gt):
            num_samples = signal.shape[0]
            for start in range(0, num_samples - window_length, window_step):
                end = start + window_length
                # Extract window
                window = signal[start:end, 1:]  # Skip timestamps
                self.data.append(window)

                # Label based on proximity to bite events
                label = 0  # Default to negative
                for bite_start, bite_end in bites:
                    if abs(bite_end - signal[end - 1, 0]) <= epsilon:
                        label = 1
                        break
                self.labels.append(label)

        self.data = np.array(self.data)
        self.data = self.data.reshape(-1, 6, self.window_length)
        self.labels = np.array(self.labels)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(
            self.labels[idx], dtype=torch.float32
        )

In [5]:
# Pre-processing
# 1. Already aligned: processed measurements (in the proc list) have a constant sampling rate of 100 Hz and the accelerometer/gyroscope measurements are aligned with each other.
# 2. Already mirrored: all sensor streams are transformed in such a way that reflects all participants wearing the smartwatch at the same hand with the same orientation, thusly achieving data uniformity.
# 3. Remove the acceleration component due to the Earth's gravitational field.

In [6]:
# Parameters
window_length = 500
window_step = 5
epsilon = 1e-1  # Labeling threshold in seconds
batch_size = 32

# Create dataset
dataset = FICDataset(signals_proc, bite_gt, window_length, window_step, epsilon)

# Split into train and test sets (e.g., 80-20 split)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size]
)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [7]:
# import matplotlib.pyplot as plt
# import matplotlib.patches as patches
# import torch


# # Visualize data from DataLoader
# def visualize_dataloader(data_loader, window_length):
#     for batch_idx, (data, labels) in enumerate(data_loader):
#         # Data is shaped as (batch_size, channels, window_length)
#         # Labels are shaped as (batch_size,)
#         for sample_idx in range(data.shape[0]):
#             window = data[sample_idx].numpy()  # Shape: (channels, window_length)
#             label = labels[sample_idx].item()

#             # Extract accelerometer and gyroscope signals
#             acc = window[:3, :]  # First 3 channels
#             gyr = window[3:, :]  # Next 3 channels
#             t = range(window_length)  # Simulated time axis for window

#             # Plot accelerometer and gyroscope signals
#             max_acc = max(abs(acc.min()), abs(acc.max()))
#             max_gyr = max(abs(gyr.min()), abs(gyr.max()))

#             fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
#             fig.suptitle(
#                 f"Sample {batch_idx}-{sample_idx}, Label: {label}", fontsize=16
#             )

#             ax1.plot(t, acc[0, :], label="Acc X")
#             ax1.plot(t, acc[1, :], label="Acc Y")
#             ax1.plot(t, acc[2, :], label="Acc Z")
#             ax1.set_ylabel("Accelerometer (g)")
#             ax1.set_ylim(-max_acc, max_acc)
#             ax1.legend()
#             ax1.grid()

#             ax2.plot(t, gyr[0, :], label="Gyr X")
#             ax2.plot(t, gyr[1, :], label="Gyr Y")
#             ax2.plot(t, gyr[2, :], label="Gyr Z")
#             ax2.set_ylabel("Gyroscope (°/s)")
#             ax2.set_xlabel("Time (samples)")
#             ax2.set_ylim(-max_gyr, max_gyr)
#             ax2.legend()
#             ax2.grid()

#             # Mark the presence of a "bite" event
#             if label == 1:
#                 ax1.add_patch(
#                     patches.Rectangle(
#                         (0, -max_acc),
#                         window_length,
#                         2 * max_acc,
#                         edgecolor="red",
#                         facecolor="pink",
#                         alpha=0.3,
#                         label="Bite event",
#                     )
#                 )
#                 ax2.add_patch(
#                     patches.Rectangle(
#                         (0, -max_gyr),
#                         window_length,
#                         2 * max_gyr,
#                         edgecolor="red",
#                         facecolor="pink",
#                         alpha=0.3,
#                     )
#                 )

#             plt.legend()
#             plt.show()


# # Visualize a few samples from the train DataLoader
# visualize_dataloader(train_loader, window_length)

In [10]:
import torch.nn as nn
import torch.optim as optim

# Initialize model, loss, and optimizer
model = EndToEndModel()
criterion = nn.BCELoss()  # Binary Cross-Entropy Loss
optimizer = optim.RMSprop(model.parameters(), lr=1e-3)

# Training loop
num_epochs = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

best_f1 = 0.0
best_model_path = "model/best_model.pth"

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        outputs = outputs.squeeze()  # Remove extra dimension

        # Compute loss
        loss = criterion(outputs, labels)
        loss.backward()

        # Update weights
        optimizer.step()
        running_loss += loss.item()

    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}")

    # Evaluate on the test set with F1 score
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = (outputs.squeeze() > 0.5).float()  # Threshold at 0.5
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)
    tp = np.sum(all_preds * all_labels)
    fp = np.sum(all_preds * (1 - all_labels))
    fn = np.sum((1 - all_preds) * all_labels)
    precision = tp / (tp + fp + 1e-6)
    recall = tp / (tp + fn + 1e-6)
    f1 = 2 * precision * recall / (precision + recall + 1e-6)
    print(f"F1 Score: {f1}")

    # Save the best model
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), best_model_path)

ValueError: Using a target size (torch.Size([32])) that is different to the input size (torch.Size([32, 125])) is deprecated. Please ensure they have the same size.