In [1]:
from tqdm import tqdm
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = 'cpu'

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item() # torch.eq() calculates where two tensors are equal
    acc = (correct / len(y_pred)) * 100 
    return acc

In [3]:
training_data = [pd.read_csv(f).drop(columns=['is_burst']).to_numpy() for f in ('./cycles/suj11_day1_F3.csv',
                                                                                './cycles/suj11_day1_F4.csv',
                                                                                './cycles/suj11_day1_C3.csv')]
df = pd.read_csv('./cycles/suj11_day1_C4.csv').drop(columns=['is_burst'])
columns = [
    'period',
    'time_peak', 'time_trough', 'volt_peak', 'volt_trough',
    'time_decay', 'time_rise', 'volt_decay', 'volt_rise',
    'volt_amp', 'time_rdsym', 'time_ptsym', 'band_amp', 
    'sample_last_zerox_decay',
    'spindle']
test_data = df[columns]

In [4]:
window_size    = 20
offset         = 5
sample_shape   = (20, 15)
new_samp_count = int(test_data.shape[0] / offset)
new_data       = np.zeros(shape=(new_samp_count, sample_shape[0], sample_shape[1]))
for i in range(new_samp_count):
    start = i*offset
    stop  = i*offset + window_size
    if test_data.iloc[start:stop].shape != sample_shape:
        continue
    new_data[i] = test_data.iloc[start:stop]

In [18]:
X = new_data
has_spindle, _ = np.where(pd.DataFrame(X[:,:,-1].sum(axis=1)) == 1)
y = np.zeros(shape=(new_samp_count))
y[has_spindle] = 1

X_tensor = torch.from_numpy(X).type(torch.double)
y_tensor = torch.from_numpy(y).type(torch.double)

In [68]:
X_train, X_test, y_train, y_test = train_test_split(X_tensor,
                                                    y_tensor,
                                                    test_size=0.3,
                                                    random_state=42)

train_data = list(zip(X_train, y_train))
test_data   = list(zip(X_test, y_test))

In [69]:
class SigClassifier(nn.Module):
    def __init__(self, input_shape, hidden_units):
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv1d(in_channels=input_shape,
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm1d(hidden_units),
            nn.Conv1d(in_channels=hidden_units,
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm1d(hidden_units),
            nn.ELU(),
            nn.AdaptiveAvgPool1d(output_size=16),
            nn.Dropout()
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv1d(in_channels=hidden_units,
                      out_channels=hidden_units,
                      kernel_size=3,
                      padding=1),
            nn.BatchNorm1d(hidden_units),
            nn.Conv1d(in_channels=hidden_units,
                      out_channels=hidden_units,
                      kernel_size=3,
                      padding=1),
            nn.BatchNorm1d(hidden_units),
            nn.ELU(),
            nn.AdaptiveAvgPool1d(output_size=16),
            nn.Dropout()
        )
        self.classifier = nn.Sequential(
            nn.Flatten(), 
            nn.Linear(in_features=hidden_units*16,
                      out_features=2),
            nn.Softmax()
        )


    def forward(self, x):
        x = self.conv_block_1(x)
        print(x.shape)
        x = self.conv_block_2(x)
        print(x.shape)
        x = self.classifier(x)
        return x

In [70]:
model = SigClassifier(input_shape=20, hidden_units=20).type(torch.LongTensor).to(device)

In [71]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(),
                            lr=0.1)

In [72]:
BATCH_SIZE = 32
# Turn datasets into iterables (batches)
train_dataloader = DataLoader(train_data, # dataset to turn into iterable
    batch_size=BATCH_SIZE, # how many samples per batch? 
    shuffle=True # shuffle data every epoch?
)

test_dataloader = DataLoader(test_data,
    batch_size=BATCH_SIZE,
    shuffle=False # don't necessarily have to shuffle the testing data
)

In [73]:
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_fn,
               device: torch.device = device):
    train_loss, train_acc = 0, 0
    for batch, (X, y) in enumerate(data_loader):
        # Send data to GPU
        X, y = X.type(torch.LongTensor), y.type(torch.LongTensor)
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss
        train_acc += accuracy_fn(y_true=y,
                                 y_pred=y_pred.argmax(dim=1)) # Go from logits -> pred labels

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

    # Calculate loss and accuracy per epoch and print out what's happening
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print(f"Train loss: {train_loss:.5f} | Train accuracy: {train_acc:.2f}%")

def test_step(data_loader: torch.utils.data.DataLoader,
              model: torch.nn.Module,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device = device):
    test_loss, test_acc = 0, 0
    model.eval() # put model in eval mode
    # Turn on inference context manager
    with torch.inference_mode(): 
        for X, y in data_loader:
            # Send data to GPU
            X, y = X.type(torch.LongTensor), y.type(torch.LongTensor)
            X, y = X.to(device), y.to(device)
            
            # 1. Forward pass
            test_pred = model(X)
            
            # 2. Calculate loss and accuracy
            test_loss += loss_fn(test_pred, y)
            test_acc += accuracy_fn(y_true=y,
                y_pred=test_pred.argmax(dim=1) # Go from logits -> pred labels
            )
        
        # Adjust metrics and print out
        test_loss /= len(data_loader)
        test_acc /= len(data_loader)
        print(f"Test loss: {test_loss:.5f} | Test accuracy: {test_acc:.2f}%\n")

In [74]:
epochs = 3
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n--------")
    train_step(data_loader=train_dataloader, 
        model=model, 
        loss_fn=loss_fn,
        optimizer=optimizer,
        accuracy_fn=accuracy_fn
    )
    test_step(data_loader=test_dataloader,
        model=model,
        loss_fn=loss_fn,
        accuracy_fn=accuracy_fn
    )

  0%|          | 0/3 [00:00<?, ?it/s]

Epoch: 0
--------





RuntimeError: isDifferentiableType(variable.scalar_type()) INTERNAL ASSERT FAILED at "..\\torch/csrc/autograd/functions/utils.h":65, please report a bug to PyTorch. 