<a href="https://colab.research.google.com/github/jetsonai/Working-R-Ssaem/blob/main/LSTM/%5B3%5D_Trajectory_Prediction_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **PyTorch를 활용한 Vehicle Trajectory Prediction**

## **데이터 처리를 위한 라이브러리 불러오기**

In [None]:
import random

In [None]:
from io import open
from os import path
import pickle

In [None]:
import numpy as np
import scipy
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
from tqdm import tqdm

## **PyTorch 라이브러리 불러오기**

In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

## **데이터 읽어오기**

In [None]:
import gdown

# Google Drive id of the archive and the local file name it is saved under.
file_id = "12b_g_IYdHm8JGIKrQ7WU59MHlkuzdjWG"
output_file = "trajectory-prediction.zip"

# Download the archive to `output_file`.
gdown.download("https://drive.google.com/uc?id=" + file_id, output_file)

In [None]:
# IPython shell escape: extract the downloaded archive.
# NOTE(review): presumably this archive provides WholeVdata2.csv read below - confirm.
!unzip "/content/trajectory-prediction.zip"

## **데이터셋 살펴보기**

In [None]:
# Load the raw trajectory CSV into a DataFrame for inspection.
df = pd.read_csv("/content/WholeVdata2.csv")

In [None]:
# Print the column names, dtypes and non-null counts.
df.info()

In [None]:
# Show summary statistics of the numeric columns.
df.describe()

In [None]:
# Preview the first 10 rows.
df.head(10)

## **PyTorch Dataset 클래스 및 DataLoader 생성 함수 정의**

In [None]:
class TrajectoryDataset(Dataset) :
  """Sliding-window trajectory dataset built from a per-frame vehicle CSV.

  Every sample is a pair ``(x, y)`` of float32 tensors:

  * ``x`` has shape (100, 30): the ego ``vel_x`` and ``vel_y`` followed by the
    28 CSV columns listed in ``_FEATURE_COLUMNS``.
  * ``y`` has shape (100, 4): ``vel_x``, ``vel_y``, ``Local_X``, ``Local_Y``
    taken 29 rows later than the matching row of ``x``.

  After loading, both are standardised with the per-column mean and standard
  deviation computed over all ``x`` windows. The statistics are kept on the
  instance as ``mn``, ``std`` and ``range`` (``range`` is all ones) so that
  predictions can be mapped back to the original units.

  Args:
    csv_path: path of the CSV file; it must contain ``Vehicle_ID`` and every
      column named in ``_FEATURE_COLUMNS``.

  Raises:
    ValueError: if no window could be extracted from the CSV.
  """

  # Ego state first, then (rX, rY, rAcc, angle) for each of the six neighbours.
  _FEATURE_COLUMNS = ["Local_X", "Local_Y", "v_Acc", "Angle",
                      "L_rX", "L_rY", "L_rAcc", "L_angle",
                      "F_rX", "F_rY", "F_rAcc", "F_angle",
                      "LL_rX", "LL_rY", "LL_rAcc", "LL_angle",
                      "LF_rX", "LF_rY", "LF_rAcc", "LF_angle",
                      "RL_rX", "RL_rY", "RL_rAcc", "RL_angle",
                      "RF_rX", "RF_rY", "RF_rAcc", "RF_angle"]

  _MISSING_THRESHOLD = 4000       # values above this (the 5000 placeholders) become 0
  _MAX_STEP_DISTANCE = 10         # largest allowed (Local_X, Local_Y) move between rows
  _FRAME_INTERVAL = 0.1           # divisor used to turn position differences into velocity
  _MIN_TRACK_LENGTH = 130         # vehicles with fewer rows are dropped
  _TARGET_SHIFT = 29              # labels are read this many rows after the inputs
  _WINDOW_LENGTH = 100            # rows per sample
  _SAMPLE_PROBABILITY = 0.2       # chance that a candidate window is kept
  _MAX_WINDOWS_PER_VEHICLE = 21   # cap on windows contributed by one vehicle

  def __init__(self, csv_path="/content/WholeVdata2.csv") :
    super().__init__()

    self.csv_path = csv_path

    # frames_x: list of (100, 30) windows, frames_y: list of (100, 4) windows.
    self.frames_x, self.frames_y = [], []

    self.load_data()
    self.norm_data()

  def __len__(self) :
    """Return the number of extracted windows."""
    return len(self.frames_x)

  def __getitem__(self, index) :
    """Return the ``(input_window, label_window)`` pair stored at ``index``."""
    return (self.frames_x[index], self.frames_y[index])

  @classmethod
  def _has_position_jump(cls, frame) :
    """Return True if (Local_X, Local_Y) moves more than the limit between two rows."""
    step = np.diff(frame[:, :2], axis=0)
    return bool(np.any(np.hypot(step[:, 0], step[:, 1]) > cls._MAX_STEP_DISTANCE))

  @classmethod
  def _velocity(cls, position) :
    """Differentiate a 1-D position series into a velocity series of equal length.

    Interior samples are the mean of the two adjacent finite differences; the
    first and last samples are linearly extrapolated from their neighbours.
    """
    step = (position[1:] - position[:-1]) / cls._FRAME_INTERVAL
    mid = (step[1:] + step[:-1]) / 2.0
    first = 2.0*step[0] - mid[0]
    last = 2.0*step[-1] - mid[-1]
    return np.concatenate(([first], mid, [last]))

  def _sample_windows(self, X, Y) :
    """Randomly keep some aligned 100-row windows of ``X`` / ``Y`` for one vehicle."""
    count = 0
    for i in range(X.shape[0] - self._WINDOW_LENGTH) :
      # The random draw comes first so the RNG stream does not depend on the cap.
      if random.random() > self._SAMPLE_PROBABILITY :
        continue

      if count >= self._MAX_WINDOWS_PER_VEHICLE :
        break

      self.frames_x.append(X[i:i+self._WINDOW_LENGTH, :])
      self.frames_y.append(Y[i:i+self._WINDOW_LENGTH, :])
      count += 1

  def load_data(self) :
    """Read the CSV and fill ``frames_x`` / ``frames_y`` with raw (un-normalised) windows."""
    # Import the submodule explicitly: a bare `import scipy` is not guaranteed
    # to make `scipy.signal` available.
    from scipy.signal import savgol_filter

    data_raw = pd.read_csv(self.csv_path)
    vehicle_ids = data_raw.Vehicle_ID.unique()
    max_veh_num = np.max(vehicle_ids)

    for vid in vehicle_ids :
      print(f"{vid} and {max_veh_num}")
      rows = data_raw.loc[data_raw.Vehicle_ID == vid, self._FEATURE_COLUMNS]
      # Work on a private, writable float copy of this vehicle's rows.
      frame = rows.to_numpy(dtype=np.float64, copy=True)

      # Drop short tracks before filtering: savgol_filter itself needs at least
      # `window_length` rows, and shorter tracks would be discarded anyway.
      if frame.shape[0] < self._MIN_TRACK_LENGTH :
        continue

      frame[frame > self._MISSING_THRESHOLD] = 0 # assign all 5000 to 0

      # Remove anomalies: vehicles whose Local_X / Local_Y is discontinuous.
      if self._has_position_jump(frame) :
        continue

      # Smooth every column (window size = 5, polynomial order = 3).
      frame = savgol_filter(frame, window_length=5, polyorder=3, axis=0)

      # Only the ego velocity is part of the feature vector.
      vel_x = self._velocity(frame[:, 0])
      vel_y = self._velocity(frame[:, 1])
      total_frame_data = np.column_stack((vel_x, vel_y, frame))

      # Inputs and labels are the same series, the labels shifted forward in time.
      X = total_frame_data[:-self._TARGET_SHIFT, :]
      Y = total_frame_data[self._TARGET_SHIFT:, :4]

      self._sample_windows(X, Y)

  def norm_data(self) :
    """Standardise all windows in place and store the statistics on the instance."""
    if not self.frames_x :
      raise ValueError(f"no trajectory windows could be extracted from {self.csv_path!r}")

    # Stack every row of every input window: (num_windows*100, num_features).
    A = torch.from_numpy(np.concatenate(self.frames_x, axis=0)).to(torch.float32)
    print("A:", A.shape)

    self.mn = torch.mean(A, dim=0)
    std = torch.std(A, dim=0)
    # A constant column has zero std; divide by 1 there instead of producing NaN/inf.
    self.std = torch.where(std > 0, std, torch.ones_like(std))
    # Kept for callers that de-normalise with `range*std`; it is always ones.
    self.range = torch.ones_like(self.std)

    self.frames_x = [(torch.tensor(item, dtype=torch.float32) - self.mn) / self.std
                     for item in self.frames_x]
    self.frames_y = [(torch.tensor(item, dtype=torch.float32) - self.mn[:4]) / self.std[:4]
                     for item in self.frames_y]

In [None]:
def get_dataloader(opt, csv_path="/content/WholeVdata2.csv") :
  """
  Build the trajectory dataset and wrap it in train / valid / test DataLoaders.

  The dataset is also pickled to "Dataset.pickle". It is split at random into
  roughly 70 % train, 10 % valid and 20 % test samples.

  opt      : dict; only opt["batch_size"] is read here
  csv_path : CSV file handed to TrajectoryDataset

  returns (train_dataloader, valid_dataloader, test_dataloader, dataset)
  """
  # Build the dataset and keep a pickled copy on disk
  dataset = TrajectoryDataset(csv_path)
  with open("Dataset.pickle", "wb") as output :
    pickle.dump(dataset, output)

  # Subset sizes: whatever is left after train and test goes to valid
  total = len(dataset)
  num_train = int(total*0.7)
  num_test = int(total*0.9) - num_train
  num_valid = int(total - num_test - num_train)
  train, valid, test = torch.utils.data.random_split(dataset, [num_train, num_valid, num_test])

  # Only the training loader shuffles and drops the incomplete last batch
  batch_size = opt["batch_size"]
  train_dataloader = DataLoader(train, batch_size=batch_size, shuffle=True, drop_last=True)
  valid_dataloader, test_dataloader = (
    DataLoader(subset, batch_size=batch_size, shuffle=False, drop_last=False)
    for subset in (valid, test)
  )

  return train_dataloader, valid_dataloader, test_dataloader, dataset

## **Trajectory LSTM Model 클래스 정의**

In [None]:
class TrajectoryLSTM(nn.Module) :
  """Two parallel recurrent branches (LSTM and BiLSTM) followed by an MLP head.

  input_size  : number of features per time step
  target_size : number of values predicted per time step
  hidden_size : width of both recurrent branches (the BiLSTM uses hidden_size//2 per direction)
  num_layer   : stacked layers in each recurrent branch
  p           : dropout value passed to both recurrent branches
  """

  def __init__(self, input_size, target_size, hidden_size, num_layer, p) :
    super().__init__()

    # Recurrent branches (settings common to both are collected once)
    shared = dict(num_layers=num_layer, batch_first=True, dropout=p)
    self.lstm = nn.LSTM(hidden_size, hidden_size, bidirectional=False, **shared)
    self.bilstm = nn.LSTM(hidden_size, hidden_size//2, bidirectional=True, **shared)

    # Per-branch input projections
    self.input2lstm = nn.Linear(input_size, hidden_size)
    self.input2bilstm = nn.Linear(input_size, hidden_size)

    # Output head: hidden_size -> 128 -> 64 -> target_size
    self.fc0 = nn.Linear(hidden_size, 128)
    self.fc1 = nn.Linear(128, 64)
    self.fc2 = nn.Linear(64, target_size)

    # Skip connection from the raw input into the 64-wide layer
    self.input2output = nn.Linear(input_size, 64)

    self.act = nn.Tanh()

  def forward(self, input) :
    """Return one target_size vector for every time step of `input`."""
    uni_seq = self.lstm(self.input2lstm(input))[0]
    bi_seq = self.bilstm(self.input2bilstm(input))[0]

    # Sum the two branches, then run the head with the input skip connection
    hidden = self.act(self.fc0(uni_seq + bi_seq))
    hidden = self.act(self.fc1(hidden)) + self.input2output(input)

    return self.fc2(hidden)

## **훈련 및 모델 하이퍼파라미터 선정**

In [None]:
# Hyper-parameters shared by the cells below.
#   input_size / target_size : feature and label widths produced by TrajectoryDataset
#                              (2 velocity columns + 28 CSV columns, and 4 label columns)
#   hidden_size / num_layer / p : passed to TrajectoryLSTM
#   batch_size : batch size of all three DataLoaders
#   num_epoch / lr : number of training epochs and Adam learning rate
#   seed : value handed to fix_seed
opt = {"input_size":30, "target_size":4, "hidden_size":256, "num_layer":5, "p":0.1,
       "batch_size":128, "num_epoch":50, "lr":1e-3, "seed":42}

## **Seed 고정**

In [None]:
import random
import numpy as np

In [None]:
def fix_seed(seed) :
  """Seed Python, NumPy and PyTorch (CPU and CUDA) and switch cuDNN to deterministic mode."""
  seeders = (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
  )
  for seeder in seeders :
    seeder(seed)

  # Request deterministic cuDNN kernels and turn the benchmark auto-tuner off
  cudnn = torch.backends.cudnn
  cudnn.deterministic = True
  cudnn.benchmark = False

## **훈련 과정 요약을 위한 Average Meter 클래스 정의**

In [None]:
class AverageMeter(object) :
  """Tracks the latest value and the running weighted average of a metric.

  val   : most recent value passed to update
  sum   : sum of val*n over all updates since the last reset
  count : sum of n over all updates since the last reset
  avg   : sum / count
  """

  def __init__(self) :
    self.reset()

  def reset(self) :
    """Clear all statistics."""
    self.val = self.avg = self.sum = self.count = 0

  def update(self, val, n=1) :
    """Record `val` as observed `n` times and refresh the average."""
    self.val = val
    self.count += n
    self.sum += val*n
    self.avg = self.sum / self.count

## **Trajectory LSTM 모델 훈련**

### **사용 Device 정하기 (GPU 또는 CPU)**

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Report which device was selected.
print(f"Device Type : {device}")

### **DataLoader 인스턴스 생성**

In [None]:
# Build the dataset once and get the three loaders plus the dataset itself;
# its normalisation statistics are read again at inference time.
train_dataloader, valid_dataloader, test_dataloader, dataset = get_dataloader(opt, "/content/WholeVdata2.csv")

### **Trajectory LSTM 모델 인스턴스 생성**

In [None]:
# Fix all random seeds before the model weights are initialised.
# NOTE(review): when the cells run top to bottom the dataset and its random
# split were already created above, so they are not covered by this seed.
fix_seed(opt["seed"])

In [None]:
# Instantiate the network from the hyper-parameters in `opt` and move it to the selected device.
model = TrajectoryLSTM(opt["input_size"], opt["target_size"], opt["hidden_size"], opt["num_layer"], opt["p"]).to(device)

### **Trajectory LSTM 모델 파라미터 개수 계산**

In [None]:
# Count the elements of every parameter that requires a gradient.
num_param = sum(p.numel() for p in model.parameters() if p.requires_grad)

In [None]:
# Print the count with thousands separators.
print(f"Number of Trainable Parameters : {num_param:,}")

### **손실 함수 인스턴스 생성**

In [None]:
# Mean-squared-error loss used for both training and validation.
criterion = nn.MSELoss()

### **Optimizer 인스턴스 생성**

In [None]:
# Adam over all model parameters with the learning rate from `opt`.
optimizer = torch.optim.Adam(model.parameters(), lr=opt["lr"])

### **훈련 결과 저장을 위한 AverageMeter 인스턴스 생성**

In [None]:
# Running loss averages; the training loop resets them at the start of every epoch.
train_loss, valid_loss = AverageMeter(), AverageMeter()

### **훈련 결과 저장을 위한 Python List 인스턴스 생성**

In [None]:
# Per-epoch average losses, appended by the training loop and plotted afterwards.
train_loss_list, valid_loss_list = [], []

In [None]:
# Lowest validation loss seen so far; starting at +inf makes the first epoch always count as an improvement.
best_valid_loss = torch.inf

### **훈련 진행**

In [None]:
# Train for opt["num_epoch"] epochs. Each epoch runs one pass over the training
# loader, one pass over the validation loader, then writes checkpoints.
# The loss only looks at the last 30 time steps and at label columns 2:4.
for epoch in range(1, opt["num_epoch"]+1) :
  ########################################################################################################################################
  train_bar = tqdm(train_dataloader) # Create TQDM Instance
  train_loss.reset() # Reset AverageMeter Instance

  model.train() # Train Mode

  # `inputs` / `targets` avoid shadowing the builtin `input` at notebook scope
  for inputs, targets in train_bar :
    inputs, targets = inputs.to(device), targets.to(device) # Assign Device
    optimizer.zero_grad() # Set Gradient to 0
    pred = model(inputs) # Get Prediction
    loss = criterion(pred[:,-30:,2:4], targets[:,-30:,2:4]) # Compute Loss
    loss.backward() # Back-Propagation
    optimizer.step() # Update Weight

    # Weight by the real batch size so the epoch average is a per-sample mean
    train_loss.update(loss.item(), inputs.size(0))
    train_bar.set_description(desc=f"[{epoch}/{opt['num_epoch']}] [Train] < Loss:{train_loss.avg:.4f} >")

  train_loss_list.append(train_loss.avg)

  ########################################################################################################################################

  valid_bar = tqdm(valid_dataloader) # Create TQDM Instance
  valid_loss.reset() # Reset AverageMeter Instance

  model.eval() # Evaluation Mode

  with torch.no_grad() : # No gradients are needed for validation
    for inputs, targets in valid_bar :
      inputs, targets = inputs.to(device), targets.to(device) # Assign Device

      pred = model(inputs) # Get Prediction
      loss = criterion(pred[:,-30:,2:4], targets[:,-30:,2:4]) # Compute Loss

      # The last validation batch can be smaller than opt["batch_size"]
      valid_loss.update(loss.item(), inputs.size(0))
      valid_bar.set_description(desc=f"[{epoch}/{opt['num_epoch']}] [Valid] < Loss:{valid_loss.avg:.4f} >")

  valid_loss_list.append(valid_loss.avg)

  # Keep the weights with the lowest validation loss ...
  if valid_loss.avg < best_valid_loss :
    best_valid_loss = valid_loss.avg
    torch.save(model.state_dict(), "Best-LSTM.pth")

  # ... and always overwrite the most recent checkpoint
  torch.save(model.state_dict(), "Latest-LSTM.pth")

In [None]:
# Report the lowest validation loss reached during training.
print(f"Best Valid Loss : {best_valid_loss:.4f}")

## **Trajectory LSTM 모델 훈련 과정 시각화**

In [None]:
# Plot the per-epoch training and validation losses.
# The x-axis is derived from the recorded lists, not from opt["num_epoch"], so
# the plot still works after an interrupted or repeated training run. Epochs
# are numbered from 1, as in the training loop.
plt.plot(np.arange(1, len(train_loss_list)+1), train_loss_list, label="Train Loss")
plt.plot(np.arange(1, len(valid_loss_list)+1), valid_loss_list, label="Valid Loss")
plt.legend(loc="best")
plt.xlabel("Epoch")
plt.ylabel("MSE Loss")
plt.title("[Trajectory] Train Loss vs. Valid Loss")
plt.show()

## **Trajectory LSTM 모델 추론**

### **Best Model 불러오기**

In [None]:
# Load the best checkpoint onto the device the model lives on; map_location
# lets a checkpoint saved on a GPU be loaded in a CPU-only runtime.
weights = torch.load("/content/Best-LSTM.pth", map_location=device)
model.load_state_dict(weights, strict=True)

### **전처리에 사용한 통계값 불러오기**

In [None]:
# Normalisation statistics of the first four columns (the label columns),
# moved to the model's device; used below to map outputs back to raw units.
std = dataset.std[:4].to(device)
mn = dataset.mn[:4].to(device)
rg = dataset.range[:4].to(device)

### **Trajectory Model 추론 진행**

In [None]:
# One de-normalised NumPy array per test batch is appended to each list.
pred_list, target_list = [], []

In [None]:
# Import the submodule explicitly: a bare `import scipy` is not guaranteed to
# make `scipy.signal` available.
from scipy.signal import savgol_filter

test_bar = tqdm(test_dataloader) # Create TQDM Instance

model.eval() # Evaluation Mode

with torch.no_grad() : # No gradients are needed for inference
  # `inputs` / `targets` avoid shadowing the builtin `input` at notebook scope
  for inputs, targets in test_bar :
    inputs, targets = inputs.to(device), targets.to(device) # Assign Device

    # Undo the normalisation, then smooth the prediction along the time axis
    pred = (model(inputs)*(rg*std) + mn).cpu().numpy()
    pred = savgol_filter(pred, window_length=5, polyorder=2, axis=1)

    target = (targets*(rg*std) + mn).cpu().numpy()

    # Only the last 30 steps are kept as prediction; earlier steps are
    # replaced by the ground truth
    pred[:,:-30,:] = target[:,:-30,:]

    pred_list.append(pred)
    target_list.append(target)

### **추론 (예측) 결과 시각화**

In [None]:
plt.figure(figsize=(10, 5))

# Sample of the first test batch to draw; columns 2 and 3 are plotted as the X / Y coordinates.
index = 0
curves = (
  (pred_list[0], "r", "Prediction"),
  (target_list[0], "g", "Ground-Truth"),
)
for batch, color, name in curves :
  plt.plot(batch[index,:,2], batch[index,:,3], color, label=name)

plt.xlabel("Local X Coordinate")
plt.ylabel("Local Y Coordinate")
plt.title("Trajectory Prediction")
plt.legend(loc="best")
plt.show()

## **Trajectory LSTM 모델 구조를 바꾸어 가면서 성능을 올려보세요**

In [None]:
# Option Dictionary 입력

### **Trajectory LSTM Model 클래스 정의**

In [None]:
# 모델 구조 설계

### **Trajectory LSTM 모델 훈련 (MSE)**

#### **LSTM 모델 인스턴스 생성**

In [None]:
# 시드 고정

In [None]:
# 모델 인스턴스 생성

#### **LSTM 모델 파라미터 개수 계산**

In [None]:
# 모델 파라미터 계산

In [None]:
# 모델 파라미터 개수 출력

#### **손실 함수 인스턴스 생성**

In [None]:
# MSE 손실 함수 인스턴스 생성

#### **Optimizer 인스턴스 생성**

In [None]:
# Adam Optimizer 인스턴스 생성

#### **훈련 진행**

In [None]:
# 모델 훈련 코드 작성

### **Trajectory LSTM 모델 훈련 과정 시각화**

In [None]:
# 훈련 과정 시각화 코드 작성

### **모델 성능 평가**

#### **Best Model 불러오기**

In [None]:
# Best Model 불러오기

#### **전처리에 사용한 통계값 불러오기**

In [None]:
# 통계값 계산

#### **Trajectory Model 추론 진행**

In [None]:
# List 인스턴스 생성

In [None]:
# 모델 추론 진행

#### **추론 (예측) 결과 시각화**

In [None]:
# 예측 결과 시각화