<a href="https://colab.research.google.com/github/mijanr/TimeSeries/blob/master/Handwriting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [180]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder

from sktime.datasets import load_UCR_UEA_dataset
from tslearn.datasets import UCR_UEA_datasets

#1d interpolation
from scipy.interpolate import interp1d

#tensorboard
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter('runs/ChlorineConcentration')

In [181]:
dataset_name = 'Handwriting'

In [182]:
#load dataset using tslearn
data_loader = UCR_UEA_datasets()
X_train, y_train, X_test, y_test  = data_loader.load_dataset(dataset_name)

In [183]:
X_train.shape, y_train.shape, X_test.shape, y_test.shape

((150, 152, 3), (150,), (850, 152, 3), (850,))

In [184]:
#encode labels
le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_test = le.transform(y_test)

In [185]:
#interpolate data
length = 64
def interpolate(X, length):
    X_interpolated = []
    for i in range(X.shape[0]):
        x = np.linspace(0, 1, X.shape[1])
        f = interp1d(x, X[i], axis=0)
        xnew = np.linspace(0, 1, length)
        ynew = f(xnew)
        X_interpolated.append(ynew)
    return np.array(X_interpolated)

In [186]:
X_interpolated_train = interpolate(X_train, length)
X_interpolated_test = interpolate(X_test, length)

In [187]:
#torch tensor
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

In [188]:
#normalize data
X_train = (X_train - X_train.min()) / (X_train.max() - X_train.min())
X_test = (X_test - X_test.min()) / (X_test.max() - X_test.min())

In [189]:
#datasets
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
#dataloaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [190]:
X_train.shape, y_train.shape, X_test.shape, y_test.shape

(torch.Size([150, 152, 3]),
 torch.Size([150]),
 torch.Size([850, 152, 3]),
 torch.Size([850]))

In [191]:
#CNN model
class CNN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(CNN, self).__init__()
        self.sequential_1 = nn.Sequential(
            nn.Conv1d(in_channels=input_size, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(16),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            #global average pooling
            nn.AdaptiveAvgPool1d(1)
        )
        self.sequential_2 = nn.Sequential(
            nn.LazyLinear(out_features=128),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(in_features=128, out_features=num_classes)
        )
    def forward(self, x):
        #swap axes
        x = x.permute(0, 2, 1)
        x = self.sequential_1(x)
        x = x.reshape(x.shape[0], -1)
        x = self.sequential_2(x)
        return x
#LSM model
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    def forward(self, x):
        #set initial states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        #forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))
        #decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out

In [192]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [193]:
#cnn model
input_size = X_train.shape[-1]
num_classes = len(np.unique(y_train))
model = CNN(input_size, num_classes).to(device)
# lstm model
# input_size = X_train.shape[-1]
# hidden_size = 128
# num_layers = 3
# num_classes = len(np.unique(y_train))
# model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)

In [194]:
#feed a sample through the model
for data, target in train_loader:
    data = data.to(device)
    target = target.to(device)
    output = model(data)
    print(output.shape)
    break

torch.Size([64, 26])


In [195]:
#optimizer and loss
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [196]:
#training
n_epochs = 1000
for epoch in range(n_epochs):
    for data, target in train_loader:
        data = data.to(device)
        target = target.to(device)
        #forward
        output = model(data)
        loss = criterion(output, target)
        #backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    if epoch%100==0:
        print(f'Epoch: {epoch+1}, Loss: {loss.item():.4f}')

Epoch: 1, Loss: 3.2658
Epoch: 101, Loss: 1.9968
Epoch: 201, Loss: 1.2423
Epoch: 301, Loss: 0.5987
Epoch: 401, Loss: 0.2918
Epoch: 501, Loss: 0.1751
Epoch: 601, Loss: 0.4864
Epoch: 701, Loss: 0.0995
Epoch: 801, Loss: 0.2643
Epoch: 901, Loss: 0.1742


In [197]:
#testing
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    for data, target in test_loader:
        data = data.to(device)
        target = target.to(device)
        output = model(data)
        #value, index
        _, predictions = torch.max(output, 1)
        n_samples += target.shape[0]
        n_correct += (predictions == target).sum().item()
    acc = 100.0 * n_correct / n_samples
    print(f'Accuracy: {acc:.2f}')

Accuracy: 37.29
