In [1]:
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset
import numpy as np
import torch.nn as nn
import random
from torch.utils.data import DataLoader

In [None]:
# Pick the compute device once: CUDA when a GPU is visible, otherwise the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
if not use_cuda:
    print("GPU not available, using CPU instead.")
else:
    print("Using GPU:", torch.cuda.get_device_name(0))

In [3]:
## Path of Dataset
# Directory prefix of the training/validation data. File names are appended to
# it by plain string concatenation (e.g. data_path + 'index.txt'), so the
# trailing slash is required.
data_path = './train_valid_set/'

In [4]:
## definition of dataset
class MyDataset(Dataset):
    """In-memory dataset that pairs float32 sample tensors with their labels.

    Args:
        data: Array-like of samples. It is converted to a single NumPy array
            and cast to float32, so all samples must share the same shape.
        labels: Indexable collection of labels; stored exactly as passed in.
    """

    def __init__(self, data,labels):
        self.labels = labels
        # Cast to float32 first, then expose the result as a PyTorch tensor.
        samples = np.array(data).astype(np.float32)
        self.data = torch.from_numpy(samples)

    def __len__(self):
        # The dataset size follows the labels, not the data tensor.
        return len(self.labels)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair stored at ``index``."""
        return self.data[index], self.labels[index]

In [None]:
## Network definition, adapted from the reference TCN implementation (https://github.com/LOCUSLAB/tcn)
import torch
import torch.nn as nn
from torch.nn.utils.parametrizations import weight_norm
import torch.nn.functional as F

class Chomp1d(nn.Module):
    """Drop the last ``chomp_size`` positions along the final axis of a 3-D tensor.

    Args:
        chomp_size: Number of trailing positions to remove. ``0`` leaves the
            input length untouched.
    """

    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """Return ``x`` without its last ``chomp_size`` steps, as a contiguous tensor."""
        if self.chomp_size == 0:
            # ``x[:, :, :-0]`` is ``x[:, :, :0]`` -- an empty tensor -- so a zero
            # chomp (e.g. kernel_size == 1, hence padding == 0) must bypass the slice.
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    """Residual block made of two dilated, weight-normalised 1-D convolutions.

    Each convolution is followed by ``Chomp1d(padding)`` (removing ``padding``
    trailing steps), a ReLU and dropout. The block output is
    ``relu(net(x) + res)`` where ``res`` is ``x`` itself, or a 1x1 convolution
    of ``x`` when the channel counts differ.

    Args:
        n_inputs: Number of input channels.
        n_outputs: Number of output channels.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        dilation: Convolution dilation.
        padding: Padding passed to both convolutions and chomped off afterwards.
        dropout: Dropout probability applied after each ReLU.
    """

    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        # The layers are registered both as attributes and inside ``self.net``;
        # this is kept as-is so existing checkpoints keep their state_dict keys.
        self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1,
                                 self.conv2, self.chomp2, self.relu2, self.dropout2)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        self.init_weights()

    @staticmethod
    def _reset_weight(module, std=0.01):
        """Redraw ``module.weight`` from N(0, std**2), parametrized or not."""
        # Local import keeps this block self-contained.
        from torch.nn.utils import parametrize

        fresh = torch.empty_like(module.weight).normal_(0, std)
        if parametrize.is_parametrized(module, "weight"):
            # With the parametrization-based weight_norm, ``module.weight`` is
            # recomputed from its underlying tensors on every access, so an
            # in-place ``module.weight.data.normal_()`` only mutates a temporary.
            # Assigning to the attribute instead goes through the
            # parametrization's ``right_inverse`` and updates the real parameters.
            module.weight = fresh
        else:
            with torch.no_grad():
                module.weight.copy_(fresh)

    def init_weights(self):
        """Initialise all convolution weights from N(0, 0.01**2)."""
        self._reset_weight(self.conv1)
        self._reset_weight(self.conv2)
        if self.downsample is not None:
            self._reset_weight(self.downsample)

    def forward(self, x):
        """Apply the two-convolution stack and add the residual connection."""
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)


class TemporalConvNet(nn.Module):
    """Stack of ``TemporalBlock`` layers whose dilation doubles at every level.

    Args:
        num_inputs: Channel count of the input fed to the first block.
        num_channels: Output channel count of each level, in order.
        kernel_size: Kernel size shared by every block.
        dropout: Dropout probability shared by every block.
    """

    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super().__init__()
        # Channel widths seen by consecutive blocks: the input width first,
        # then the output width of every level.
        widths = [num_inputs, *num_channels]
        blocks = []
        for level, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
            dilation = 2 ** level
            blocks.append(
                TemporalBlock(c_in, c_out, kernel_size, stride=1, dilation=dilation,
                              padding=(kernel_size - 1) * dilation, dropout=dropout)
            )

        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        """Run ``x`` through every block in order."""
        return self.network(x)
    
class TCN(nn.Module):
    """Sequence classifier: a ``TemporalConvNet`` plus a linear head on the last step.

    Args:
        input_size: Number of input channels (features per time step).
        output_size: Number of output classes.
        num_channels: Output channel count of each temporal level.
        kernel_size: Kernel size of the temporal convolutions.
        dropout: Dropout probability of the temporal blocks.
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size=kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, inputs):
        """Classify a batch of sequences.

        ``inputs`` is laid out as (N, L_in, C_in); axes 1 and 2 are swapped
        here because the convolutions work on (N, C, L). Only the features of
        the final time step feed the linear head. Returns log-probabilities of
        shape (N, output_size).
        """
        channels_first = inputs.transpose(1, 2)
        features = self.tcn(channels_first)
        last_step = features[:, :, -1]
        logits = self.linear(last_step)
        return F.log_softmax(logits, dim=1)

In [None]:
## Load data and create a model
def load_data(file_path):
    """Read a whitespace-separated numeric text file into a float array.

    Every non-empty line becomes one row. Fields may be separated by any run
    of whitespace, and blank lines (such as a trailing newline at the end of
    the file) are skipped rather than raising ``ValueError``.

    Args:
        file_path: Path of the text file to read.

    Returns:
        ``np.ndarray`` holding one row per non-empty line.

    Raises:
        ValueError: If a field cannot be parsed as a float.
    """
    rows = []
    with open(file_path, 'r') as f:
        for line in f:
            # split() without arguments collapses repeated whitespace and
            # yields [] for a blank line, unlike split(' ').
            fields = line.split()
            if not fields:
                continue
            rows.append([float(x) for x in fields])
    return np.array(rows)

def pad_sample(sample, min_len):
    """Return the first ``min_len`` entries of ``sample``.

    Despite the name nothing is padded: the sample is only sliced, so a sample
    shorter than ``min_len`` keeps its original length.
    """
    return sample[:min_len]

def normalize_sample(sample):
    """Min-max scale the first three columns of ``sample`` to [0, 1], in place.

    A column whose values are all equal has zero range; it is set to 0 instead
    of being divided by zero (which would fill it with NaN).

    Args:
        sample: 2-D array with at least three columns. It is modified in place.

    Returns:
        The same ``sample`` object, after scaling.
    """
    for dim in range(3):
        channel = sample[:, dim]
        min_val = np.min(channel)
        span = np.max(channel) - min_val
        if span == 0:
            # Constant channel: (x - min) / 0 is undefined, map it to 0.
            sample[:, dim] = 0
        else:
            sample[:, dim] = (channel - min_val) / span
    return sample

# Read the index file: every non-empty line is "<sample file> <integer label>".
data = []
labels = []
with open(data_path+'index.txt', 'r') as f:
    for line in f:
        # split() tolerates repeated whitespace; blank lines (e.g. a trailing
        # newline) are skipped instead of raising IndexError.
        fields = line.split()
        if not fields:
            continue
        file_path = data_path+fields[0]
        label = int(fields[1])
        sample_data = load_data(file_path)
        data.append(sample_data)
        labels.append(label)

if not data:
    raise ValueError("no samples listed in " + data_path + "index.txt")

# operation of train set
# Truncate every sample to the shortest length so they stack into one array.
min_len = min(len(sample) for sample in data)
data = [pad_sample(sample, min_len) for sample in data]
# Shift the labels so the smallest one becomes class 0. The conversion to an
# array is explicit rather than relying on list - numpy_scalar broadcasting.
labels = np.asarray(labels)
min_val = labels.min()
max_val = labels.max()
labels = labels - min_val

X_train, X_valid, y_train, y_valid = train_test_split(data, labels, test_size = 0.2 , random_state = 0)

## count the num of diff labels in dataset
label_counts = {}
for label_value in labels:
    label_counts[label_value] = label_counts.get(label_value, 0) + 1
print(label_counts)

print(len(X_train))
print(len(X_valid))

### create dataset
train_dataset = MyDataset(X_train,y_train)
valid_dataset = MyDataset(X_valid,y_valid)

from torch.utils.data import DataLoader
from torch.optim import Adam, AdamW
import torch.nn.functional as F
from sklearn.metrics import f1_score
from PiH_utils import checkpoint

# Seed every RNG before the loaders and the model are created, so shuffling
# and weight initialisation are reproducible.
np.random.seed(4)
torch.manual_seed(6)
random.seed(5)

# Parameters
learning_rate = 5e-5
num_epochs = 800
num_classes = 7  # must cover every shifted label, i.e. 0 .. max_val - min_val
batch_size = 8

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# Model instantiation
model = TCN(3, num_classes, [16, 32, 64], kernel_size=4, dropout=0.01).to(device)
loss_function = nn.CrossEntropyLoss()  # Assuming a classification problem
optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0)


In [None]:
## Train the model
import copy  # local to this cell: used to snapshot the best model

ckp_epoch =  0
best_f1 = 0

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    # Batch variables get their own names so the dataset-level ``labels``
    # array from the loading cell is not overwritten.
    for batch_sequences, batch_labels in train_loader:
        batch_sequences = batch_sequences.to(device).float()
        # CrossEntropyLoss expects integer (long) class indices.
        batch_labels = batch_labels.long().to(device)

        optimizer.zero_grad()

        outputs = model(batch_sequences)

        loss = loss_function(outputs, batch_labels)
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+ckp_epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}')

    # Validation phase
    model.eval()
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for batch_sequences, batch_labels in val_loader:
            batch_sequences = batch_sequences.to(device).float()
            batch_labels = batch_labels.to(device)

            outputs = model(batch_sequences)

            # Predicted class = index of the largest output per sample.
            _, predicted = torch.max(outputs, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())

    f1 = f1_score(all_labels, all_predictions, average='weighted')
    if f1 >= best_f1:
        best_f1 = f1
        print(f"Best F1: {f1:.4f}")
        checkpoint(best_f1, epoch+ckp_epoch,model,"./model/")
        # Snapshot the weights: ``best_model = model`` would only alias the
        # live model, which keeps training and ends up as the last epoch.
        best_model = copy.deepcopy(model)
    print(f'Epoch [{epoch+ckp_epoch+1}/{num_epochs}], Test F1: {f1:.4f}')

In [None]:
#### Testing
real_path = './testing_set/'
real_data = []
real_labels = []
with open(real_path+'index.txt', 'r') as f:
    for line in f:
        # Same "<sample file> <integer label>" format as the training index;
        # blank lines and repeated whitespace are tolerated.
        fields = line.split()
        if not fields:
            continue
        file_path = real_path+fields[0]
        label = int(fields[1])
        sample_data = load_data(file_path)
        real_data.append(sample_data)
        real_labels.append(label)

# NOTE(review): the test samples are truncated to the shortest *test* sample,
# which may differ from the length used for training -- confirm this is intended.
min_len = min(len(sample) for sample in real_data)
real_data = [pad_sample(sample, min_len) for sample in real_data]
# Apply the label offset computed on the training data (min_val).
real_labels = np.asarray(real_labels) - min_val

real_dataset = MyDataset(real_data,real_labels)
real_loader = DataLoader(real_dataset, batch_size=1, shuffle=False)
print(len(real_labels))

model = TCN(3, num_classes, [16, 32, 64], kernel_size=4, dropout=0.01).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
# torch.load unpickles the file, so only load checkpoints you trust.
ckpt = torch.load('./model/model1230.pth.tar', map_location=device)
model.load_state_dict(ckpt['model'])
# To evaluate the in-memory model from the training cell instead: model = best_model
model.eval()

all_predictions = []
all_labels = []

with torch.no_grad():  # Turn off gradients
    for batch_sequences, batch_labels in real_loader:
        batch_sequences = batch_sequences.to(device).float()
        batch_labels = batch_labels.to(device).long()

        outputs = model(batch_sequences)

        _, predictions = torch.max(outputs, 1)

        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(batch_labels.cpu().numpy())

# List the samples of class 4 that were predicted as class 5.
for i in range(len(all_predictions)):
    if all_predictions[i] == 5 and all_labels[i] == 4:
        print(f"index = {i}")
        print(f"prediction = {all_predictions[i]}; label = {all_labels[i]}")

# Calculate metrics
from sklearn.metrics import confusion_matrix

# Passing the full class list keeps the matrix num_classes x num_classes even
# when some class never occurs in the test labels or predictions.
class_ids = list(range(num_classes))
cf = confusion_matrix(all_labels, all_predictions, labels=class_ids)
print(cf)
row_sum = np.sum(cf, axis=1, keepdims=True)
# Row-normalise; rows of classes without any test sample stay 0 instead of NaN.
cf = np.divide(cf, row_sum, out=np.zeros(cf.shape, dtype=float), where=row_sum != 0)

import seaborn as sns
xtick=['L3','L2','L1','M','R1','R2','R3']
ytick=['L3','L2','L1','M','R1','R2','R3']
labels_heat = ["{0:.2%}".format(value) for value in cf.flatten()]
labels_heat = np.asarray(labels_heat).reshape(cf.shape)
sns.heatmap(cf,fmt='', annot=labels_heat, cmap='Blues',xticklabels=xtick, yticklabels=ytick)

from sklearn.metrics import accuracy_score, classification_report
accuracy = accuracy_score(all_labels, all_predictions)
print(f'Accuracy: {accuracy:.4f}')
print(classification_report(all_labels, all_predictions))

