In [1]:
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence
from tqdm import tqdm
import pandas as pd
import ast
import os

In [2]:
class LSTMModel(nn.Module):
    def __init__(self, input_size=2, hidden_size=64, output_size=2):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, lengths):
        packed_input = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)
        packed_output, (hn, cn) = self.lstm(packed_input)
        unpacked_output, _ = pad_packed_sequence(packed_output, batch_first=True)
        out = self.fc(hn[-1])
        return out

In [4]:
cityA = pd.read_csv("../../Datasets/Task 2/frequent_sequences_A.csv")
cityA = list(cityA['Pattern'])
cityA_sequence = []

for str_seq in cityA:
    tuple_seq = ast.literal_eval(str_seq)
    cityA_sequence.append(tuple_seq)

cityA_sequence
train_sequences, test_sequences = train_test_split(cityA_sequence, test_size=0.2, random_state=42)




In [5]:
def get_inputs_labels(dataset):
    inputs = []
    labels = []
    for seq in dataset:
        inputs.append(torch.tensor(seq[:-1], dtype=torch.float32))
        labels.append(torch.tensor(seq[-1], dtype=torch.float32))

    return inputs, labels

In [None]:
X_train, y_train = get_inputs_labels(train_sequences)

model = LSTMModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0

    for seq, target in tqdm(zip(X_train, y_train), total=len(X_train), desc=f"Epoch {epoch + 1}/{num_epochs}"):       
        seq_length = torch.tensor([len(seq)])  
        padded_seq = pad_sequence([seq], batch_first=True, padding_value=0).to(torch.float32)

        target = target.unsqueeze(0)  
        optimizer.zero_grad()

        output = model(padded_seq, seq_length)

        loss = criterion(output, target)
        epoch_loss += loss.item()

        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss / len(X_train):.4f}")
        
print("Training complete.")



In [None]:
torch.save(model.state_dict(), "lstm_model.pth")
print('saved to lstm_model.pth')

In [7]:
X_test, y_test = get_inputs_labels(test_sequences)

model = LSTMModel()
weights_path = os.path.join("weights", "lstm_model.pth")

model.load_state_dict(torch.load(weights_path))
model.eval()
predictions = []

with torch.no_grad():
    for seq in X_test:
        # Prepare sequence
        seq_length = torch.tensor([len(seq)])  # Sequence length
        padded_seq = pad_sequence([seq], batch_first=True).float()  # Convert to float if needed

        # Forward pass to get the prediction
        pred = model(padded_seq, seq_length)
        predictions.append(pred.squeeze().cpu())  # Append the prediction, removing extra dimensions

# Display predictions and compare to true targets
for i, (pred, true_target) in enumerate(zip(predictions, y_test)):
    print(f"Test Sequence {i+1}:", y_test[i])
    print(f"  Predicted next coordinate: {pred.numpy()}")
    print(f"  True next coordinate: {true_target.numpy()}")

Test Sequence 1: tensor([176.,  48.])
  Predicted next coordinate: [176.1075  45.214 ]
  True next coordinate: [176.  48.]
Test Sequence 2: tensor([ 56., 168.])
  Predicted next coordinate: [ 54.77323 168.20311]
  True next coordinate: [ 56. 168.]
Test Sequence 3: tensor([ 24., 111.])
  Predicted next coordinate: [ 18.27745 109.11273]
  True next coordinate: [ 24. 111.]
Test Sequence 4: tensor([ 97., 111.])
  Predicted next coordinate: [ 96.6445 110.6371]
  True next coordinate: [ 97. 111.]
Test Sequence 5: tensor([ 1., 33.])
  Predicted next coordinate: [ 1.5959778 34.46978  ]
  True next coordinate: [ 1. 33.]
Test Sequence 6: tensor([117., 101.])
  Predicted next coordinate: [117.64739  96.80162]
  True next coordinate: [117. 101.]
Test Sequence 7: tensor([183., 104.])
  Predicted next coordinate: [182.91556 103.50127]
  True next coordinate: [183. 104.]
Test Sequence 8: tensor([142.,  90.])
  Predicted next coordinate: [143.89758   88.020996]
  True next coordinate: [142.  90.]
Test

In [None]:
def predict_next_location(model, weights_path, input_sequence):
    # Convert the input sequence to a tensor and add the batch dimension
    input_tensor = torch.tensor(input_sequence).unsqueeze(0).float()  # Shape: [1, seq_length, input_size]
    seq_length = torch.tensor([len(input_sequence)])  # Sequence length

    # Load the model weights
    model.load_state_dict(torch.load(weights_path))
    model.eval()

    # Make a prediction
    with torch.no_grad():
        predicted_next = model(input_tensor, seq_length)
        predicted_next = predicted_next.squeeze().cpu().numpy()  # Remove batch dimension and convert to numpy

    return predicted_next.tolist()


In [28]:
sequence = [(176, 48)]


predict_next_location(model, weights_path, sequence)

FileNotFoundError: [Errno 2] No such file or directory: 'weights\\lstm_model.pth'