In [1]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

from sklearn.preprocessing import MinMaxScaler
from scipy.signal import find_peaks
from tqdm import tqdm

import torch.functional as F
import torch.optim as optim
import torch.nn as nn
import numpy as np
import torch

import matplotlib.pyplot as plt
import pickle
import gzip
import os

PEAK_THRESHOLD = 390
DATA_PATH = "./data/exported/1min/"

# Data Load

## Sitting

In [None]:
# Sitting data load
with gzip.open(DATA_PATH + "sitting_ecg.pkl", "rb") as f:
    sitting_ecg = pickle.load(f)

with gzip.open(DATA_PATH + "sitting_acc.pkl", "rb") as f:
    sitting_acc = pickle.load(f)
sitting_acc = sitting_acc - np.mean(sitting_acc)

In [None]:
fig, ax = plt.subplots(len(sitting_acc), 1, figsize=(20, 10))
for idx in range(len(sitting_acc)):
    peaks, _ = find_peaks(sitting_acc[idx], height=PEAK_THRESHOLD)
    np.diff(peaks)

    ax[idx].plot(sitting_acc[idx])
    ax[idx].plot(peaks, sitting_acc[idx][peaks], "x")
plt.show()

In [None]:
peak_list = list()
for idx in range(len(sitting_acc)):
    peaks, _ = find_peaks(sitting_acc[idx], height=PEAK_THRESHOLD)
    if peaks == []:
        peak_list.append(np.array([0]))
    peak_list.append(len(peaks))

X_sitting = sitting_ecg
y_sitting = peak_list

## Walking

In [None]:
# Walking data load
with gzip.open(DATA_PATH + "walking_ecg.pkl", "rb") as f:
    walking_ecg = pickle.load(f)

with gzip.open(DATA_PATH + "walking_acc.pkl", "rb") as f:
    walking_acc = pickle.load(f)
walking_acc = walking_acc - np.mean(walking_acc)

In [None]:
fig, ax = plt.subplots(len(walking_acc), 1, figsize=(20, 20))
for idx in range(len(walking_acc)):
    peaks, _ = find_peaks(walking_acc[idx], height=PEAK_THRESHOLD)
    np.diff(peaks)

    ax[idx].plot(walking_acc[idx])
    ax[idx].plot(peaks, walking_acc[idx][peaks], "x")
plt.show()

In [None]:
peak_list = list()
for idx in range(len(walking_acc)):
    peaks, _ = find_peaks(walking_acc[idx], height=PEAK_THRESHOLD)
    if peaks == []:
        peak_list.append(np.array([0]))
    peak_list.append(len(peaks))

X_walking = walking_ecg
y_walking = peak_list

In [None]:
print(y_walking)

## Running

In [None]:
# Running data load
with gzip.open(DATA_PATH + "running_ecg.pkl", "rb") as f:
    running_ecg = pickle.load(f)

with gzip.open(DATA_PATH + "running_acc.pkl", "rb") as f:
    running_acc = pickle.load(f)
running_acc = running_acc - np.mean(running_acc)

In [None]:
fig, ax = plt.subplots(len(running_acc), 1, figsize=(20, 10))
for idx in range(len(running_acc)):
    peaks, _ = find_peaks(running_acc[idx], height=PEAK_THRESHOLD)
    np.diff(peaks)

    ax[idx].plot(running_acc[idx])
    ax[idx].plot(peaks, running_acc[idx][peaks], "x")
plt.show()

In [None]:
peak_list = list()
for idx in range(len(running_acc)):
    peaks, _ = find_peaks(running_acc[idx], height=PEAK_THRESHOLD)
    if peaks == []:
        peak_list.append(np.array([0]))
    peak_list.append(len(peaks))

X_running = running_ecg
y_running = peak_list

# X, y data Split

In [None]:
BATCH_SIZE = 8

In [None]:
X = np.concatenate((X_walking, X_running, X_sitting))
y = np.concatenate((y_walking, y_running, y_sitting))

scaler = MinMaxScaler()
X = scaler.fit_transform(X) ** 2

print(f"""X shape: {X.shape}
y shape: {y.shape}""")

In [None]:
# Plotting the data
fig, ax = plt.subplots(3, 1, figsize=(20, 10))

ax[0].title.set_text(y[0])
ax[0].plot(X[0])

ax[1].title.set_text(y[1])
ax[1].plot(X[1])

ax[2].title.set_text(y[2])
ax[2].plot(X[2])
plt.show()

In [None]:
with gzip.open('./data//exported/X.pkl', 'wb') as f:
    pickle.dump(X, f)

with gzip.open('./data//exported/y.pkl', 'wb') as f:
    pickle.dump(y, f)

In [None]:
with gzip.open('./data//exported/X.pkl', 'rb') as f:
    X = pickle.load(f)

with gzip.open('./data//exported/y.pkl', 'rb') as f:
    y = pickle.load(f)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

print(f"""X_train shape: {X_train.shape}
y_train shape: {y_train.shape}
X_test shape: {X_test.shape}
y_test shape: {y_test.shape}""")

In [None]:
class ECGDataset(torch.utils.data.Dataset):
    def __init__(self, X, y):
        self.X = np.array(X).astype(np.float32)
        self.y = np.array(y).astype(np.float32)

    def __getitem__(self, idx):
        X = self.X[idx]
        y = self.y[idx]

        return X, y

    def __len__(self):
        return len(self.X)
    
train_dataset = ECGDataset(X_train, y_train)
test_dataset = ECGDataset(X_test, y_test)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Model

In [None]:
device = torch.device("mps")
EPOCH = 100

In [None]:
class Model(nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__()
        self.nn_layers = nn.ModuleList()
        self.nn_layers.append(self.input_block(1, 32, 3, 1, 1)).to(device)
        self.nn_layers.append(self.lfem_block(32, 32, 3, 1, 1)).to(device)
        self.nn_layers.append(self.lfem_block(32, 64, 3, 1, 1)).to(device)
        self.nn_layers.append(self.lfem_block(64, 128, 3, 1, 1)).to(device)
        self.nn_layers.append(self.lfem_block(128, 256, 3, 1, 1)).to(device)
        self.nn_layers.append(self.lfem_block(256, 256, 3, 1, 1)).to(device)

        self.gru = nn.GRU(256, 256, 6, batch_first=True).to(device)
        self.fc = nn.Linear(256, 1).to(device)

    def input_block(self, in_channels, out_channels, kernel_size, stride, padding):
        x = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
        ).to(device)

        return x

    def lfem_block(self, in_channels, out_channels, kernel_size, stride, padding):
        x = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding),
            nn.Conv1d(out_channels, out_channels, kernel_size, stride, padding),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),

            nn.Conv1d(out_channels, out_channels, 1, stride, padding),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
        ).to(device)

        return x
 
    def forward(self, x):
        x = self.input_block(1, 32, 3, 1, 1)(x)

        x = self.nn_layers[1](x)
        x = self.nn_layers[2](x)
        x = self.nn_layers[3](x)
        x = self.nn_layers[4](x)
        x = self.nn_layers[5](x)

        x = x.permute(0, 2, 1).to(device)
        x, _ = self.gru(x)
        x = x[:, -1, :].to(device)

        x = self.fc(x).to(device)
        x = nn.ReLU()(x).to(device)

        return x

In [None]:
model = Model().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adagrad(model.parameters(), lr=0.04)

In [None]:
print(model)

# Model Train

In [None]:
model.train()

train_loss, val_loss = [], []
for epoch in range(EPOCH):
    running_loss = 0.0
    for i, data in enumerate(train_dataloader):
        inputs, labels = data
        inputs = inputs.reshape(-1, 1, 15000).to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    train_loss.append(running_loss / len(train_dataloader))

    model.eval()
    with torch.no_grad():
        running_loss = 0.0
        for i, data in enumerate(test_dataloader):
            inputs, labels = data
            inputs = inputs.unsqueeze(1).to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()

        val_loss.append(running_loss / len(test_dataloader))
    
    model.train()
    print(f"Epoch: {epoch + 1} / {EPOCH} | Train loss: {train_loss[-1]:.5f} | Val loss: {val_loss[-1]:.5f}")

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(train_loss, label="Train loss")
plt.plot(val_loss, label="Val loss")
plt.legend()
plt.show()

In [None]:
model.eval()
with torch.no_grad():
    for i, data in enumerate(test_dataloader):
        inputs, labels = data
        inputs = inputs.unsqueeze(1).to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        print(f"Loss: {loss.item():.5f}")

        output_x_range = np.linspace(0, len(outputs.cpu().numpy()), len(outputs.cpu().numpy()))
        labels_x_range = np.linspace(0, len(labels.cpu().numpy()), len(labels.cpu().numpy()))

        plt.figure(figsize=(10, 5))
        plt.plot(output_x_range, outputs.cpu().numpy() * 2, label="Predicted", color="red", marker="o")
        plt.plot(labels_x_range, labels.cpu().numpy(), label="True", color="blue", marker="*")
        plt.legend()
        plt.show()
        break
