# **Keras를 활용한 Trajectory Prediction**

## **Google Drive Mount 하기**

In [None]:
# Mount Google Drive into the Colab filesystem; the dataset archive is later
# read from under /content/gdrive/MyDrive.
from google.colab import drive

drive.mount("/content/gdrive")

## **사용할 라이브러리 불러오기**

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

## **데이터 읽어오기**

### **Bounding Box에서 얻은 좌표를 기반으로 작성된 데이터셋**

In [None]:
!unzip "/content/gdrive/MyDrive/LSTMTracking/dataset/HCMC-vehicle-dataset.zip"

In [None]:
# Load the extracted CSV; index_col=0 makes the file's first column the DataFrame index.
data = pd.read_csv("/content/HCMC-vehicle-dataset.csv", index_col=0)

## **전처리 진행하기**

In [None]:
data.head(10)

### **One-Hot Encoding 적용**

In [None]:
# One-hot encode the "Class_name" column; astype("int") converts the indicator
# columns to integer 0/1 values.
classNameEncoded = pd.get_dummies(data["Class_name"]).astype("int")

In [None]:
# The raw "Class_name" column is replaced by its one-hot encoding, so remove it.
data = data.drop(columns=["Class_name"])

In [None]:
# Append the one-hot columns to the right of the remaining columns (axis=1 is column-wise).
data = pd.concat([data, classNameEncoded], axis=1)

In [None]:
data.head(10)

### **데이터 전반적으로 살펴 보기**

In [None]:
data.info()

In [None]:
data.describe()

## **Train / Test Dataset 분리**

In [None]:
# Columns the models learn to predict. The order matters: later cells slice the
# target/prediction arrays positionally ([:2] for the X pair, [2:] for the Y pair).
targetCol = ["X_max_8", "X_min_8", "Y_max_8", "Y_min_8"]

In [None]:
# Features are every column except the targets and the "Track_ID" / "arrival" columns.
# NOTE(review): `input` shadows the Python builtin; the name is kept because
# later cells reference it.
input = data.drop(columns=[*targetCol, "Track_ID", "arrival"])
target = data.loc[:, targetCol]

In [None]:
input.head(10)

In [None]:
target.head(10)

### **Scikit-Learn 라이브러리 사용**

In [None]:
# Hold out 10% of the rows as the test split; random_state fixes the shuffle.
xTrain, xTest, yTrain, yTest = train_test_split(
    input,
    target,
    test_size=0.1,
    random_state=42,
)

In [None]:
print(f"Input Train Data Shape : {xTrain.shape}")
print(f"Target Train Data Shape : {yTrain.shape}")

In [None]:
timesteps = xTrain.shape[1]

In [None]:
# Convert the DataFrames to NumPy arrays in the shapes the models consume:
# features become (rows, timesteps, 1), i.e. every feature column is treated as
# one step carrying a single value; targets become (rows, 4).
xTrain = xTrain.to_numpy().reshape(len(xTrain), timesteps, 1)
xTest = xTest.to_numpy().reshape(len(xTest), timesteps, 1)
yTrain = yTrain.to_numpy().reshape(len(yTrain), 4)
yTest = yTest.to_numpy().reshape(len(yTest), 4)

In [None]:
print(f"Input Train Data Shape : {xTrain.shape}")
print(f"Target Train Data Shape : {yTrain.shape}")

## **Keras 라이브러리를 활용한 Sequential Data Analysis 진행**

### **하이퍼파라미터 설정**

In [None]:
# Training hyperparameters shared by every model in this notebook.
opt = dict(
    batchSize=128,
    optimizer="adam",
    loss="mean_squared_error",
    epoch=50,
)

### **Tensorflow 및 Keras 라이브러리 불러오기**

In [None]:
from keras.layers import Input, Dense, LSTM, Conv1D, MaxPooling1D, Flatten, GRU
from keras.models import Sequential, Model, load_model

### **LSTM 모델 제작 / 훈련 / 추론 진행**

In [None]:
inputShape = (xTrain.shape[1], xTrain.shape[2])

In [None]:
# Single-layer LSTM regressor: 50 recurrent units followed by a 4-unit linear
# head, one output per target column.
model = Sequential([
    Input(shape=inputShape),
    LSTM(units=50),
    Dense(units=4),
])

In [None]:
model.compile(loss=opt["loss"], optimizer=opt["optimizer"])

In [None]:
model.summary()

In [None]:
# Train the LSTM model.
# NOTE(review): the test split doubles as validation data here, so "val_loss"
# is measured on the same rows that are used for the final evaluation.
history = model.fit(
    xTrain,
    yTrain,
    epochs=opt["epoch"],
    batch_size=opt["batchSize"],
    validation_data=(xTest, yTest),
)

#### **LSTM 모델 훈련 결과 시각화**

In [None]:
history.history

In [None]:
trainLoss, testLoss = history.history["loss"], history.history["val_loss"]

In [None]:
# Per-epoch loss curves of the most recent fit; the x axis is the epoch index.
plt.plot(np.arange(opt["epoch"]), trainLoss, label="Train Loss")
plt.plot(np.arange(opt["epoch"]), testLoss, label="Test Loss")
plt.gca().set(
    xlabel="Epoch",
    ylabel="MSE Loss",
    title="[MSE] Train Loss vs. Test Loss",
)
plt.legend(loc="best")
plt.show()

#### **LSTM 모델 추론 진행**

In [None]:
model.evaluate(xTest, yTest, batch_size=opt["batchSize"])

In [None]:
model.save("LSTM.keras")

### **GRU 모델 제작 / 훈련 / 추론 진행**

In [None]:
inputShape = (xTrain.shape[1], xTrain.shape[2])

In [None]:
# Single-layer GRU regressor: 50 recurrent units followed by a 4-unit linear
# head, one output per target column.
model = Sequential([
    Input(shape=inputShape),
    GRU(units=50),
    Dense(units=4),
])

In [None]:
model.compile(loss=opt["loss"], optimizer=opt["optimizer"])

In [None]:
model.summary()

In [None]:
# Train the GRU model.
# NOTE(review): the test split doubles as validation data here, so "val_loss"
# is measured on the same rows that are used for the final evaluation.
history = model.fit(
    xTrain,
    yTrain,
    epochs=opt["epoch"],
    batch_size=opt["batchSize"],
    validation_data=(xTest, yTest),
)

#### **GRU 모델 훈련 결과 시각화**

In [None]:
history.history

In [None]:
trainLoss, testLoss = history.history["loss"], history.history["val_loss"]

In [None]:
# Per-epoch loss curves of the most recent fit; the x axis is the epoch index.
plt.plot(np.arange(opt["epoch"]), trainLoss, label="Train Loss")
plt.plot(np.arange(opt["epoch"]), testLoss, label="Test Loss")
plt.gca().set(
    xlabel="Epoch",
    ylabel="MSE Loss",
    title="[MSE] Train Loss vs. Test Loss",
)
plt.legend(loc="best")
plt.show()

#### **GRU 모델 추론 진행**

In [None]:
model.evaluate(xTest, yTest, batch_size=opt["batchSize"])

In [None]:
model.save("GRU.keras")

### **1-D Convolution을 조합한 LSTM 모델 제작 / 훈련 / 추론 진행**

In [None]:
inputShape = (xTrain.shape[1], 1)

In [None]:
# Functional-API graph: Conv1D feature extractor -> max pooling -> LSTM -> 4-unit head.
inputLayer = Input(shape=inputShape)
cnnLayer = Conv1D(filters=32, kernel_size=3, activation="relu")(inputLayer)
cnnLayer = MaxPooling1D(pool_size=2)(cnnLayer)  # pool_size=2 pools over pairs of steps
lstmLayer = LSTM(units=50)(cnnLayer)
outputLayer = Dense(units=4)(lstmLayer)

In [None]:
modelLSTM = Model(inputs=inputLayer, outputs=outputLayer)

In [None]:
modelLSTM.compile(loss=opt["loss"], optimizer=opt["optimizer"])

In [None]:
modelLSTM.summary()

In [None]:
# Train the Conv1D + LSTM model.
# NOTE(review): the test split doubles as validation data here, so "val_loss"
# is measured on the same rows that are used for the final evaluation.
history = modelLSTM.fit(
    xTrain,
    yTrain,
    epochs=opt["epoch"],
    batch_size=opt["batchSize"],
    validation_data=(xTest, yTest),
)

#### **1-D Convolution을 조합한 LSTM 모델 훈련 결과 시각화**

In [None]:
history.history

In [None]:
trainLoss, testLoss = history.history["loss"], history.history["val_loss"]

In [None]:
# Per-epoch loss curves of the most recent fit; the x axis is the epoch index.
plt.plot(np.arange(opt["epoch"]), trainLoss, label="Train Loss")
plt.plot(np.arange(opt["epoch"]), testLoss, label="Test Loss")
plt.gca().set(
    xlabel="Epoch",
    ylabel="MSE Loss",
    title="[MSE] Train Loss vs. Test Loss",
)
plt.legend(loc="best")
plt.show()

#### **1-D Convolution을 조합한 LSTM 모델 추론 진행**

In [None]:
modelLSTM.evaluate(xTest, yTest, batch_size=opt["batchSize"])

In [None]:
modelLSTM.save("Conv-LSTM.keras")

### **1-D Convolution을 조합한 GRU 모델 제작 / 훈련 / 추론 진행**

In [None]:
inputShape = (xTrain.shape[1], 1)

In [None]:
# Functional-API graph: Conv1D feature extractor -> max pooling -> GRU -> 4-unit head.
inputLayer = Input(shape=inputShape)
cnnLayer = Conv1D(filters=32, kernel_size=3, activation="relu")(inputLayer)
cnnLayer = MaxPooling1D(pool_size=2)(cnnLayer)  # pool_size=2 pools over pairs of steps
gruLayer = GRU(units=50)(cnnLayer)
outputLayer = Dense(units=4)(gruLayer)

In [None]:
modelGRU = Model(inputs=inputLayer, outputs=outputLayer)

In [None]:
modelGRU.compile(loss=opt["loss"], optimizer=opt["optimizer"])

In [None]:
modelGRU.summary()

In [None]:
# Train the Conv1D + GRU model.
# NOTE(review): the test split doubles as validation data here, so "val_loss"
# is measured on the same rows that are used for the final evaluation.
history = modelGRU.fit(
    xTrain,
    yTrain,
    epochs=opt["epoch"],
    batch_size=opt["batchSize"],
    validation_data=(xTest, yTest),
)

#### **1-D Convolution을 조합한 GRU 모델 훈련 결과 시각화**

In [None]:
history.history

In [None]:
trainLoss, testLoss = history.history["loss"], history.history["val_loss"]

In [None]:
# Per-epoch loss curves of the most recent fit; the x axis is the epoch index.
plt.plot(np.arange(opt["epoch"]), trainLoss, label="Train Loss")
plt.plot(np.arange(opt["epoch"]), testLoss, label="Test Loss")
plt.gca().set(
    xlabel="Epoch",
    ylabel="MSE Loss",
    title="[MSE] Train Loss vs. Test Loss",
)
plt.legend(loc="best")
plt.show()

#### **1-D Convolution을 조합한 GRU 모델 추론 진행**

In [None]:
modelGRU.evaluate(xTest, yTest, batch_size=opt["batchSize"])

In [None]:
modelGRU.save("Conv-GRU.keras")

### **LSTM과 GRU 추론 결과 비교**

In [None]:
predLSTM = modelLSTM.predict(xTest)

In [None]:
predGRU = modelGRU.predict(xTest)

In [None]:
# Side-by-side look at the first ten X-min predictions of both models.
# targetCol is ordered [X_max_8, X_min_8, Y_max_8, Y_min_8], so column 0 is
# X max; look the column up by name so the "X Min" labels match the data shown.
xMinCol = targetCol.index("X_min_8")
pd.DataFrame({
    "Pred X Min (LSTM)": predLSTM[:, xMinCol],
    "Pred X Min (GRU)": predGRU[:, xMinCol],
    "Target X Min": yTest[:, xMinCol],
}).head(10)

## **Scikit-Learn을 활용한 모델 성능 비교**

In [None]:
from sklearn.metrics import mean_absolute_percentage_error, mean_absolute_error
from sklearn.metrics import mean_squared_error, r2_score

### **Symmetric Mean Absolute Percentage Error (SMAPE)**
*   https://computer-nerd-coding.tistory.com/31




In [None]:
def computeSMAPE(actual, predicted):
    """Return the Symmetric Mean Absolute Percentage Error, in percent.

    SMAPE = mean(|predicted - actual| / ((|predicted| + |actual|) / 2)) * 100

    Args:
        actual: Ground-truth values; any array-like (list, tuple, ndarray, ...).
        predicted: Predicted values; array-like broadcastable against ``actual``.

    Returns:
        The SMAPE rounded to two decimal places.
    """
    # Always convert: the previous conditional conversion ended in a stray
    # trailing comma, so any non-ndarray input failed to unpack.
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)

    numerator = np.abs(predicted - actual)
    denominator = (np.abs(predicted) + np.abs(actual)) / 2
    # A zero denominator means both values are exactly 0, i.e. a perfect
    # prediction; count that term as 0 instead of letting 0/0 turn the mean into NaN.
    ratio = np.divide(
        numerator,
        denominator,
        out=np.zeros_like(denominator),
        where=denominator != 0,
    )
    return round(np.mean(ratio) * 100, 2)

### **저장된 모델 불러오기**

In [None]:
# Reload the Conv1D + LSTM model saved earlier as "Conv-LSTM.keras".
# NOTE(review): the save call used a relative path, so this absolute path only
# matches when the notebook's working directory is /content; confirm.
modelLSTM = load_model("/content/Conv-LSTM.keras")

In [None]:
# Reload the Conv1D + GRU model saved earlier as "Conv-GRU.keras".
# NOTE(review): the save call used a relative path, so this absolute path only
# matches when the notebook's working directory is /content; confirm.
modelGRU = load_model("/content/Conv-GRU.keras")

In [None]:
predLSTM, predGRU = modelLSTM.predict(xTest), modelGRU.predict(xTest)

In [None]:
print(f"LSTM Prediciton Shape : {predLSTM.shape}")
print(f"GRU Prediciton Shape : {predGRU.shape}")

In [None]:
# Flatten the prediction matrices to 1-D so the metrics below are computed
# over all four target columns at once.
predLSTM = np.ravel(predLSTM)
predGRU = np.ravel(predGRU)

In [None]:
print(f"LSTM Prediciton Shape : {predLSTM.shape}")
print(f"GRU Prediciton Shape : {predGRU.shape}")

In [None]:
yTestFlatten = yTest.reshape(-1)

In [None]:
print(f"Test Dataset Shape : {yTestFlatten.shape}")

### **Metric 계산하기**

In [None]:
# Report every metric for the Conv1D + LSTM predictions, one line per metric.
print("[Metric] 1-D Conv LSTM")
for metricLabel, metricFn in (
    ("SMAPE", computeSMAPE),
    ("MSE", mean_squared_error),
    ("MAE", mean_absolute_error),
    ("R2 Score", r2_score),
):
    print(f"{metricLabel} : {metricFn(yTestFlatten, predLSTM)}")

In [None]:
# Report every metric for the Conv1D + GRU predictions, one line per metric.
print("[Metric] 1-D Conv GRU")
for metricLabel, metricFn in (
    ("SMAPE", computeSMAPE),
    ("MSE", mean_squared_error),
    ("MAE", mean_absolute_error),
    ("R2 Score", r2_score),
):
    print(f"{metricLabel} : {metricFn(yTestFlatten, predGRU)}")

In [None]:
# Collect the same four metrics per model so they can be tabulated side by side.
resultLSTM = {
    "SMAPE": computeSMAPE(yTestFlatten, predLSTM),
    "MSE": mean_squared_error(yTestFlatten, predLSTM),
    "MAE": mean_absolute_error(yTestFlatten, predLSTM),
    "R2 Score": r2_score(yTestFlatten, predLSTM),
}
resultGRU = {
    "SMAPE": computeSMAPE(yTestFlatten, predGRU),
    "MSE": mean_squared_error(yTestFlatten, predGRU),
    "MAE": mean_absolute_error(yTestFlatten, predGRU),
    "R2 Score": r2_score(yTestFlatten, predGRU),
}

In [None]:
pd.DataFrame({"LSTM":resultLSTM, "GRU":resultGRU})

## **Trajectory 시각화**

In [None]:
# Predict again: the earlier prediction arrays were flattened to 1-D for the
# metrics, while the trajectory cells below index them as [row, column].
predLSTM, predGRU = modelLSTM.predict(xTest), modelGRU.predict(xTest)

In [None]:
print(f"LSTM Prediciton Shape : {predLSTM.shape}")
print(f"GRU Prediciton Shape : {predGRU.shape}")

In [None]:
print(f"Input Test Dataset Shape : {xTest.shape}")

In [None]:
# Box-centre trajectory of one test sample. All three trajectories (target,
# LSTM, GRU) share the observed history; only the final point, appended in the
# following cells, differs between them.
targetCoordX, targetCoordY = [], []
lstmCoordX, lstmCoordY = [], []
gruCoordX, gruCoordY = [], []

rowIndex = 2
# NOTE(review): assumes the first 32 feature columns are 8 steps x 4 box
# coordinates laid out as (X, X, Y, Y) per step; confirm against the CSV columns.
for i in range(8):
    xMin, xMax, yMin, yMax = xTest[rowIndex, 4*i:4*i+4, 0]
    centerX, centerY = (xMin+xMax)/2, (yMin+yMax)/2

    for coordsX, coordsY in (
        (targetCoordX, targetCoordY),
        (lstmCoordX, lstmCoordY),
        (gruCoordX, gruCoordY),
    ):
        coordsX.append(centerX)
        coordsY.append(centerY)

In [None]:
print(targetCoordX)
print(targetCoordY)

In [None]:
yTest.shape

In [None]:
# Final trajectory point from the ground truth: centre of the target box.
# Columns [:2] are the X pair and [2:] the Y pair (see targetCol order).
# Written as two statements so the cell does not echo a "(None, None)" tuple.
targetCoordX.append(np.mean(yTest[rowIndex, :2]))
targetCoordY.append(np.mean(yTest[rowIndex, 2:]))

In [None]:
print(targetCoordX)
print(targetCoordY)

In [None]:
# Final trajectory point from the LSTM model: centre of the predicted box.
# Columns [:2] are the X pair and [2:] the Y pair (see targetCol order).
# Written as two statements so the cell does not echo a "(None, None)" tuple.
lstmCoordX.append(np.mean(predLSTM[rowIndex, :2]))
lstmCoordY.append(np.mean(predLSTM[rowIndex, 2:]))

In [None]:
print(lstmCoordX)
print(lstmCoordY)

In [None]:
# Final trajectory point from the GRU model: centre of the predicted box.
# Columns [:2] are the X pair and [2:] the Y pair (see targetCol order).
# Written as two statements so the cell does not echo a "(None, None)" tuple.
gruCoordX.append(np.mean(predGRU[rowIndex, :2]))
gruCoordY.append(np.mean(predGRU[rowIndex, 2:]))

In [None]:
print(gruCoordX)
print(gruCoordY)

In [None]:
# Overlay the ground-truth trajectory with both models' predicted trajectories.
fig = plt.figure(figsize=(10, 5))
plt.plot(targetCoordX, targetCoordY, c="b", label="Target Trajectory")
plt.plot(lstmCoordX, lstmCoordY, c="g", label="LSTM Pred Trajectory")
plt.plot(gruCoordX, gruCoordY, c="r", label="GRU Pred Trajectory")
plt.gca().set(
    xlabel="X-Coordinate",
    ylabel="Y-Coordinate",
    title="Trajectory Prediction",
)
plt.legend(loc="best")
plt.show()

# **Keras를 활용한 개별 실습**

## **Scikit-Learn의 preprocessing에서 MinMaxScaler를 불러온 후 데이터 전처리를 진행하세요**
*   [Exercise 03번 참고](https://github.com/jetsonai/LSTMTracking/blob/main/03_Sequential_Data_Analysis/Exercise_03_Sequential_Data_Analysis_with_LSTM.ipynb)



In [None]:
# 코드를 입력해주세요.

## **1-D Convolution 기반 Multi-Layer LSTM 및 GRU 모델을 만드세요**
*   LSTM 및 GRU 인자 → return_sequences=True
*   Dropout을 추가하여 Overfitting을 방지해보세요



In [None]:
# LSTM 코드를 입력해주세요.

In [None]:
# GRU 코드를 입력해주세요.

### **1-D Convolution 기반 LSTM 모델 훈련 / 추론 진행**


In [None]:
# 모델 컴파일

In [None]:
# 모델 훈련

In [None]:
# 훈련 과정 시각화

In [None]:
# 모델 저장

### **1-D Convolution 기반 GRU 모델 훈련 / 추론 진행**


In [None]:
# 모델 컴파일

In [None]:
# 모델 훈련

In [None]:
# 훈련 과정 시각화

In [None]:
# 모델 저장

### **저장된 모델 불러오기**


In [None]:
# LSTM 모델 불러오기

In [None]:
# GRU 모델 불러오기

### **모델 추론 진행**


In [None]:
# LSTM 모델 추론

In [None]:
# GRU 모델 추론

### **Metric 계산**


In [None]:
# LSTM 모델 Metric 계산

In [None]:
# GRU 모델 Metric 계산

### **Trajectory 시각화**


#### **Inverse Transformation을 진행하고 시각화를 진행하세요**

In [None]:
# 모든 방법의 Trajectory 시각화