### Task

In this competition, you are given a dataset containing the handwritten trajectories of numerals, each with eight $x-y$ coordinates. You must design a recurrent network to recognize the trajectories into their corresponding numerals.


### Descriptions:
* $A$: an RNN cell (or LSTM or GRU cell) which contains $N_h$ hidden neurons inside;
* $O$: the output layer containing 10 neurons to output the predicted likelihood of the 10 numerals. Find the one with the largest likelihood as the recognition numeral; In this problem, we only need to output the recognized numeral at the time step that all of the eight coordinates are received, that is the $8^{th}$ time step. Therefore, it is unnecessary to output the intermediate results during the first seven time steps;
* $W_y$: the hidden-to-output connection weight matrix;
* Loss function: you can use the cross-entropy to train the network.

In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd

In [2]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [71]:
# Hyperparameters
num_classes = 10
num_epochs = 10
batch_size = 64
learning_rate = 0.001

input_size = 2  # Adjusted to match the number of coordinates (x, y)
sequence_length = 8  # Adjusted to match the number of coordinates in each trajectory
hidden_size = 512  # Adjusted to increase the number of hidden neurons
num_layers = 3  # Adjusted to increase the number of layers
dropout_prob = 0.5 # Adjusted to randomly drop out 50% of the input units during training

In [4]:
train_in = "train_in.csv"
train_out = "train_out.csv"
test_in = "test_in.csv"

In [6]:
class CustomDataset(Dataset):

    def __init__(self, x, y=None):
        # read with numpy
        x = np.loadtxt(x, delimiter=',', dtype=np.float32, skiprows=1)  # Change dtype to float32
        if y is not None:
            y = np.loadtxt(y, delimiter=',', dtype=np.float32, skiprows=1)  # Change dtype to float32
        self.n_samples = x.shape[0]

        # here the first column is the serial no, the rest are the feature coordinates
        self.x_data = torch.from_numpy(x[:, 1:])  # Convert to torch.float32
        if y is not None:
            self.y_data = torch.from_numpy(y[:, [1]])  # Convert to torch.float32
        else:
            self.y_data = None

    # support indexing such that dataset[i] can be used to get i-th sample
    def __getitem__(self, index):
        if self.y_data is not None and len(self.y_data) > 0:
            return self.x_data[index], self.y_data[index]
        else:
            return self.x_data[index]

    # we can call len(dataset) to return the size
    def __len__(self):
        return self.n_samples


In [70]:
# create Dataset
train_dataset = CustomDataset(train_in, train_out)
test_dataset = CustomDataset(test_in)

# get first sample and unpack

first_data = train_dataset[0]
features, labels = first_data
print(features, labels)

tensor([ 47., 100.,  27.,  81.,  57.,  37.,  26.,   0.,   0.,  23.,  56.,  53.,
        100.,  90.,  40.,  98.]) tensor([8.])


In [8]:
train_dataset, val_dataset = train_test_split(train_dataset, test_size=0.2, random_state=42)

In [9]:
# Data Loader
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [10]:
import math
total_samples = len(train_loader)
n_iterations = math.ceil(total_samples/4)
print(total_samples, n_iterations)

94 24


In [None]:
len(train_dataset)/64

In [64]:
# Fully connected neural network with one hidden layer
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_prob):
        super(RNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        # self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        # -> x needs to be: (batch_size, seq, input_size)
        
        # or:
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        # self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(hidden_size, num_classes)
        # self.activation = nn.ReLU()  # Add an activation function (e.g., ReLU)
        
    def forward(self, x):
        # Set initial hidden states (and cell states for LSTM)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        # c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        
        # x: (n, 28, 28), h0: (2, n, 128)
        
        # Forward propagate RNN
        out, _ = self.gru(x, h0)  
        # or:
        # out, _ = self.lstm(x, (h0,c0))  
        
        # out: tensor of shape (batch_size, seq_length, hidden_size)
        # out: (n, 28, 128)
        
        # Decode the hidden state of the last time step
        out = out[:, -1, :]
        # out: (n, 128)
        out = self.dropout(out)
        out = self.fc(out)
        # out = self.activation(out)  # Apply the activation function to the output
        # out: (n, 10)
        return out


In [None]:
# Fully connected neural network with one hidden layer
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.activation = nn.ReLU()  # Add an activation function (e.g., ReLU)
        
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        out, _ = self.rnn(x, h0)
        out = out[:, -1, :]
        out = self.fc(out)
        out = self.activation(out)  # Apply the activation function to the output
        return out

In [65]:
model = RNN(input_size, hidden_size, num_layers, num_classes, dropout_prob).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  

In [66]:
# Training loop
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (features, labels) in enumerate(train_loader):
        # Reshape input data
        features = features.view(-1, sequence_length, input_size)
        
        # Convert labels to indices and apply one-hot encoding
        labels_indices = torch.squeeze(labels).long()
        labels_one_hot = torch.eye(num_classes)[labels_indices]

        # Forward pass, backward pass, and optimization
        outputs = model(features)
        loss = criterion(outputs, labels_one_hot)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print training progress
        if (i+1) % 64 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                  .format(epoch+1, num_epochs, i+1, total_step, loss.item()))

    # Print epoch loss
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))
        
    
    # Validation
    model.eval()
    with torch.no_grad():
        total_val_loss = 0
        correct = 0
        total = 0
        for features, labels in val_loader:
            # Reshape input data
            features = features.view(-1, sequence_length, input_size)

            # Convert labels to indices and apply one-hot encoding
            labels_indices = torch.squeeze(labels).long()
            labels_one_hot = torch.eye(num_classes)[labels_indices]

            outputs = model(features)
            val_loss = criterion(outputs, labels_one_hot)
            total_val_loss += val_loss.item()
            
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels_indices).sum().item()
        
        accuracy = 100 * correct / total
        avg_val_loss = total_val_loss / len(val_loader)
        print('Epoch [{}/{}], Validation Loss: {:.4f}, Validation Accuracy: {:.2f}%'
              .format(epoch+1, num_epochs, avg_val_loss, accuracy))

Epoch [1/10], Step [64/94], Loss: 0.1398
Epoch [1/10], Loss: 0.1640
Epoch [1/10], Validation Loss: 0.1492, Validation Accuracy: 94.86%
Epoch [2/10], Step [64/94], Loss: 0.0628
Epoch [2/10], Loss: 0.0549
Epoch [2/10], Validation Loss: 0.0455, Validation Accuracy: 98.47%
Epoch [3/10], Step [64/94], Loss: 0.0128
Epoch [3/10], Loss: 0.0097
Epoch [3/10], Validation Loss: 0.0436, Validation Accuracy: 98.53%
Epoch [4/10], Step [64/94], Loss: 0.0119
Epoch [4/10], Loss: 0.0117
Epoch [4/10], Validation Loss: 0.0817, Validation Accuracy: 97.20%
Epoch [5/10], Step [64/94], Loss: 0.0587
Epoch [5/10], Loss: 0.0194
Epoch [5/10], Validation Loss: 0.0838, Validation Accuracy: 98.07%
Epoch [6/10], Step [64/94], Loss: 0.0348
Epoch [6/10], Loss: 0.0021
Epoch [6/10], Validation Loss: 0.0297, Validation Accuracy: 99.27%
Epoch [7/10], Step [64/94], Loss: 0.1861
Epoch [7/10], Loss: 0.0048
Epoch [7/10], Validation Loss: 0.0555, Validation Accuracy: 98.27%
Epoch [8/10], Step [64/94], Loss: 0.0099
Epoch [8/10], 

In [67]:
loss.item()

0.0017893353942781687

In [68]:
# Make predictions on the test dataset
predictions = []
serial_numbers = []
serial_number = 1  # Initialize serial number

with torch.no_grad():
    for features in test_loader:
        # Reshape input data
        features = features.view(-1, sequence_length, input_size)
        features = features.to(device)
        outputs = model(features)
        _, predicted = torch.max(outputs.data, 1)
        predictions.extend(predicted.tolist())
        
        # Add serial numbers
        num_predictions = len(predicted)
        serial_numbers.extend(list(range(serial_number, serial_number + num_predictions)))
        
        # Update serial number for the next set of predictions
        serial_number += num_predictions

# Create a DataFrame with 'Serial No.' and 'Label' columns
output_df = pd.DataFrame({'Serial No.': serial_numbers, 'Label': predictions})

# Write the predictions to the output CSV file
output_df.to_csv('output6.csv', index=False)



In [69]:
# Save the model
# Model Architecture
# Hyperparameters
# num_classes = 10
# num_epochs = 10
# batch_size = 4
# learning_rate = 0.001

# input_size = 2  # Adjusted to match the number of coordinates (x, y)
# sequence_length = 8  # Adjusted to match the number of coordinates in each trajectory
# hidden_size = 512  # Adjusted to increase the number of hidden neurons
# num_layers = 5  # Adjusted to increase the number of layers
torch.save(model.state_dict(), 'model1.pth')

In [None]:
# Hyperparameters
# Model Architecture
# num_classes = 10
# num_epochs = 10
# batch_size = 64
# learning_rate = 0.001

# input_size = 2  # Adjusted to match the number of coordinates (x, y)
# sequence_length = 8  # Adjusted to match the number of coordinates in each trajectory
# hidden_size = 512  # Adjusted to increase the number of hidden neurons
# num_layers = 3  # Adjusted to increase the number of layers
# dropout_prob = 0.5 # Adjusted to randomly drop out 50% of the input units during training
torch.save(model.state_dict(), 'model2.pth')