In [1]:
import pandas as pd
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

import pickle
from torch.utils.data import random_split

In [2]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cpu')

In [3]:
with open('p4/train.pkl', 'rb') as train_file:
    dataset = pickle.load(train_file)

with open('p4/test_no_target.pkl', 'rb') as test_file:
    testset = pickle.load(test_file)
    
# print(dataset)
# print(testset)

In [4]:
print(type(dataset))
for i in range(3):
    print(len(dataset[i][0]))


<class 'list'>
4756
5322
592


In [5]:
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

pad = 0

def pad_collate(batch, pad_value=pad):
    xx, yy = zip(*batch)
    x_lens = [len(x) for x in xx]
    y_lens = [1 for y in yy]

    # xx_pad = pad_sequence(xx, batch_first=True, padding_value=pad_value)
    # yy_pad = pad_sequence(yy, batch_first=True, padding_value=pad_value)

    xx_pad = pad_sequence([torch.tensor(x) for x in xx], batch_first=True, padding_value=pad_value)
    yy_pad = pad_sequence([torch.tensor([y]) for y in yy], batch_first=True, padding_value=pad_value)

    return xx_pad, yy_pad, x_lens, y_lens

In [6]:
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
trainset, valset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(trainset, batch_size=5, shuffle=True, collate_fn=pad_collate)
val_loader = DataLoader(valset, batch_size=5, shuffle=True, collate_fn=pad_collate)

test_loader = DataLoader(testset, batch_size=5, shuffle=False, drop_last=False, collate_fn=pad_collate)

In [7]:
print(next(iter(train_loader)))

(tensor([[-1., -1., -1.,  ..., 67., 67., -1.],
        [88., 12., 80.,  ...,  0.,  0.,  0.],
        [34., 50., 90.,  ...,  0.,  0.,  0.],
        [ 0., 12., 92.,  ...,  0.,  0.,  0.],
        [-1., -1., -1.,  ...,  0.,  0.,  0.]], dtype=torch.float64), tensor([[1],
        [1],
        [0],
        [0],
        [3]]), [340, 240, 200, 84, 332], [1, 1, 1, 1, 1])


In [8]:
class LSTMClassifier(nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.linear = nn.Linear(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)
        
    def init_hidden(self, batch_size):
        hidden = torch.zeros(self.num_layers, batch_size, self.hidden_size)
        state = torch.zeros(self.num_layers, batch_size, self.hidden_size)
        return hidden, state
    
    def forward(self, x, hidden):
        x = torch.transpose(x, 0, 1)
        all_outputs, hidden = self.lstm(x, hidden)
        out = all_outputs[-1] # We are interested only in the last output
        x = self.fc(out)
        return x, hidden
    
model = LSTMClassifier(1, 20, 2, 5).to(device)
model

LSTMClassifier(
  (lstm): LSTM(1, 20, num_layers=2)
  (linear): Linear(in_features=20, out_features=20, bias=True)
  (fc): Linear(in_features=20, out_features=5, bias=True)
)

In [9]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fun = nn.CrossEntropyLoss()

hidden_sizes = [50, 100, 500, 1000]
num_layers = [2, 3, 4, 5, 8]

for h_s in hidden_sizes:
    for n_l in num_layers:
        model = LSTMClassifier(1, h_s, n_l, 5).to(device)
        for epoch in range(101):
            for x, labels, _, _ in train_loader:
                x = x.to(device).unsqueeze(2).float()
                labels = labels.to(device).squeeze(1)
                hidden, state = model.init_hidden(x.size(0))
                hidden, state = hidden.to(device), state.to(device) 
                preds, last_hidden = model(x, (hidden,state))
                preds = preds.squeeze(1)
                optimizer.zero_grad() 
                loss = loss_fun(preds, labels)
                loss.backward()
                optimizer.step()
        
            if epoch % 20 == 0:
                print(f"Epoch: {epoch}, loss: {loss.item():.3}")

        file_path = f'model_hs_{h_s}_nl_{n_l}.pt'
        torch.save(model.state_dict(), file_path)

Epoch: 0, loss: 1.72
Epoch: 20, loss: 1.57
Epoch: 40, loss: 1.6
Epoch: 60, loss: 1.57
Epoch: 80, loss: 1.52
Epoch: 100, loss: 1.57
Epoch: 0, loss: 1.72


In [None]:
# loading model
# model.load_state_dict(torch.load(file_path))

<All keys matched successfully>

In [None]:
model.eval()

total_correct = 0
total_samples = 0

with torch.no_grad():
    for x, labels, _, _ in val_loader:
        x = x.to(device).unsqueeze(2).float()
        labels = labels.to(device).squeeze(1)
        hidden, state = model.init_hidden(x.size(0))
        hidden, state = hidden.to(device), state.to(device) 
        preds, last_hidden = model(x, (hidden,state))
        preds = preds.squeeze(1)

        predicted_labels = torch.argmax(preds)
        correct = (predicted_labels == labels).sum().item()
        total_correct += correct
        total_samples += labels.size(0)

accuracy = total_correct / total_samples * 100
print(f"Accuracy on test set: {accuracy:.2f}%")

Accuracy on test set: 32.14%


In [None]:
import csv

with open('poniedzialek_grunwald_rozkosz.csv', 'w', newline='\n') as file:
    csv_writer = csv.writer(file)

    for x, labels, _, _ in test_loader:
        x = x.to(device).unsqueeze(2).float()
        labels = labels.to(device).squeeze(1)
        hidden, state = model.init_hidden(x.size(0))
        hidden, state = hidden.to(device), state.to(device) 
        preds, last_hidden = model(x, (hidden,state))
        preds = preds.squeeze(1)

        for pred in preds:
            csv_writer.writerow([pred])

with open('poniedzialek_grunwald_rozkosz.csv', 'r') as file:
    lines = file.readlines()

with open('poniedzialek_grunwald_rozkosz.csv', 'w') as file:
    file.writelines(lines[:-1])