In [1]:
# sgan 모델에 대한 궤적 시각화

In [1]:
import argparse
import os
import sys
import torch
import matplotlib.pyplot as plt
import numpy as np


In [None]:
_path = os.getcwd()
print(f"현재 디렉토리: {_path}")

_path = _path.split("/")[:-2]
_path = "/".join(_path)
print(f"MMT4 디렉토리(_path):{_path}")

현재 디렉토리: /home/ngnadmin/dev/ngn_2024/MMT4/MMT/vis/sgan
MMT3 디렉토리(_path):/home/ngnadmin/dev/ngn_2024/MMT4/MMT


In [3]:
sys.path.append(_path) # MMT3 디렉토리를 path 추가

In [4]:
plt.style.use("seaborn-dark")
###### loader check ######
from mmt.data.loader import data_loader
###### model check #######
from mmt.models_eth.mmt_noscene import TrajectoryGenerator
from mmt.utils import (
    int_tuple,
    relative_to_abs,
    get_dset_path,
)
from mmt.losses import(
    displacement_error,
    final_displacement_error,
    l2_loss
)

In [5]:
from typing import Tuple
class CreateArg():
    def __init__(self):
        
        self.num_samples = 20 # type=int

        # Dataset options
        self.dataset_name = 'sdd_dC'
        self.delim = '\t'
        self.loader_num_workers = 0 # 4 -> 1
        self.obs_len = 8
        self.pred_len = 8                   ############################### pred_len check !!!
        self.skip = 1
        # Optimization
        self.batch_size = 8                 ################################  batch_size check !!! (scene encoder 있는 경우 1)
        self.num_iterations = 1000 # 
        self.num_epochs = 50 #                      
        # Model Options
        self.embedding_dim = 64
        self.num_layers = 1
        self.dropout = 0.0
        self.batch_norm = 0 
        self.mlp_dim = 1024
        self.state_type = 2
                                    
        # Generator Options
        self.encoder_h_dim_g = 64
        self.decoder_h_dim_g = 128
        self.noise_dim : Tuple[int] = (0, 0) # default=None # type=int_tuple
        self.noise_type = 'gaussian'
        self.noise_mix_type = 'ped'
        self.clipping_threshold_g = 0 # type=float
        self.g_learning_rate = 5e-4 # type=float 
        self.g_steps = 1

        # Pooling Options
        self.pooling_type = 'pool_net' 
        self.pool_every_timestep = 1 # type=bool_flag

        # Pool Net Option
        self.bottleneck_dim = 1024 # type=int

        # Discriminator Options
        self.d_type = 'local' # type=str
        self.encoder_h_dim_d = 64 # type=int
        self.d_learning_rate = 5e-4 # type=float
        self.d_steps = 2 # type=int        
        self.clipping_threshold_d = 0 # type=float  

        # Loss Options
        self.l2_loss_weight = 0 # type=float 
        self.best_k = 1 # type=int 

        # Output
        # 궤적 이미지 출력 폴더
        self.output_dir = os.getcwd()+ '/fig/obs8/1'
        self.print_every = 5 # type=int
        self.checkpoint_every = 100 # type=int
        self.checkpoint_start_from = None
        self.restore_from_checkpoint = 1 # type=int
        self.num_samples_check = 5000 # type=int        

        # Misc
        self.use_gpu = 1 # type=int
        self.timing = 0 # type=int
        self.gpu_num = "0" # type=str   

        # 시각화 할 모델 경로 (체크포인트)
        self.restore_path = "/home/ngnadmin/dev/ngn_2024/MMT4/MMT/scripts_sdd/output/noscene/state0/obs8/1/lstm_0_with_model.pt"


args = CreateArg() 

In [6]:
# 주요 변수 확인
print(f"restore_model_path: {args.restore_path}")
print(f"img output_dir: {args.output_dir}")
print(f"batch_size: {args.batch_size}")

restore_model_path: /home/ngnadmin/dev/ngn_2024/MMT4/MMT/scripts_sdd/output/noscene/state0/obs8/1/lstm_0_with_model.pt
img output_dir: /home/ngnadmin/dev/ngn_2024/MMT4/MMT/vis/sgan/fig/obs8/1
batch_size: 8


In [7]:
# 오류를 기반으로 모델 여러 출력 중 가장 좋은 경로를 선택
def evaluate_helper(error, seq_start_end, model_output_traj, model_output_traj_best):
    error = torch.stack(error, dim=1) # 모델 출력 에러를 나타내는 텐서들 리스트 스택
    for (start, end) in seq_start_end:
        start = start.item()
        end = end.item()
        _error = error[start:end] # error 텐서에서 해당 시퀀스의 시퀀스 에러 부분을 추출
        _error = torch.sum(_error, dim=0) # 각 경로에 대한 총 에러 계산산
        min_index = _error.min(0)[1].item() # 총 에러가 가장 작은 경로 인덱스 구하기기
        model_output_traj_best[:, start:end, :] = model_output_traj[min_index][ # 가장 작은 에러를 가진 경로를 해당 시퀀스에 복사
            :, start:end, :
        ]
    return model_output_traj_best 

In [18]:
def get_generator(checkpoint):
    # n_units = (
    #     [args.traj_lstm_hidden_size]
    #     + [int(x) for x in args.hidden_units.strip().split(",")]
    #     + [args.graph_lstm_hidden_size]
    # )
    # n_heads = [int(x) for x in args.heads.strip().split(",")]
    generator = TrajectoryGenerator(
        obs_len=args.obs_len,
        pred_len=args.pred_len, 
        state_type=args.state_type,
        embedding_dim=args.embedding_dim,
        encoder_h_dim=args.encoder_h_dim_g,
        decoder_h_dim=args.decoder_h_dim_g,
        mlp_dim=args.mlp_dim,
        num_layers=args.num_layers,
        noise_dim=args.noise_dim,
        noise_type=args.noise_type,
        noise_mix_type=args.noise_mix_type,
        pooling_type=args.pooling_type,
        pool_every_timestep=args.pool_every_timestep,
        dropout=args.dropout,
        bottleneck_dim=args.bottleneck_dim,
        # neighborhood_size=args.neighborhood_size,
        # grid_size=args.grid_size,
        batch_norm=args.batch_norm
    )
    generator.load_state_dict(checkpoint["g_state"])
    generator.cuda()
    generator.eval()
    return generator

In [9]:
def cal_ade_fde(pred_traj_gt, pred_traj_fake):
    ade = displacement_error(pred_traj_fake, pred_traj_gt, mode="raw")
    fde = final_displacement_error(pred_traj_fake[-1], pred_traj_gt[-1], mode="raw")
    return ade, fde

In [10]:
def plot_trajectory(args, loader, generator):
    ground_truth_input = [] # 관찰된 궤적
    all_model_output_traj = [] # 모델의 모든 예측 궤적
    ground_truth_output = [] # 실제 궤적을 저장하기 위한 리스트
    pic_cnt = 0
    with torch.no_grad():
        for batch in loader:
            batch = [tensor.cuda() for tensor in batch]
            (obs_traj, _, _,     
            pred_traj_gt, _, _,
            obs_traj_rel, pred_traj_gt_rel, 
            _, loss_mask, seq_start_end,  _
            ) = batch
            ade = []
            ground_truth_input.append(obs_traj)
            ground_truth_output.append(pred_traj_gt)
            model_output_traj = []
            model_output_traj_best = torch.ones_like(pred_traj_gt).cuda() # 동일한 shpae의 tensor를 1로 채워줌

            for _ in range(args.num_samples):

                pred_traj_fake_rel = generator(
                                    obs_traj, obs_traj_rel, seq_start_end,
                                  )

                pred_traj_fake_rel = pred_traj_fake_rel[-args.pred_len :] # 시퀀스 마지막 pred_len 길이 만큼 (예측한 부분)

                pred_traj_fake = relative_to_abs(pred_traj_fake_rel, obs_traj[-1]) #

                model_output_traj.append(pred_traj_fake) # 

                ade_, fde_ = cal_ade_fde(pred_traj_gt, pred_traj_fake)
                ade.append(ade_)
            model_output_traj_best = evaluate_helper(
                ade, seq_start_end, model_output_traj, model_output_traj_best   # 가장 작은 ADE 가진 예측 궤적을 선택하고 최적 궤적 텐서에 저장함
            )
            all_model_output_traj.append(model_output_traj_best)

            for (start, end) in seq_start_end:
                plt.figure(figsize=(20,15), dpi=100)
                ground_truth_input_x_piccoor = (    # 관찰된 궤적 x
                    obs_traj[:, start:end, :].cpu().numpy()[:, :, 0].T
                )
                ground_truth_input_y_piccoor = (  # 관찰된 궤적 y
                    obs_traj[:, start:end, :].cpu().numpy()[:, :, 1].T
                )
                ground_truth_output_x_piccoor = (   # 실제 궤적 x
                    pred_traj_gt[:, start:end, :].cpu().numpy()[:, :, 0].T
                )
                ground_truth_output_y_piccoor = (   # 실제 궤적 y
                    pred_traj_gt[:, start:end, :].cpu().numpy()[:, :, 1].T
                )
                model_output_x_piccoor = (          # 모델이 예측한 궤적 x
                    model_output_traj_best[:, start:end, :].cpu().numpy()[:, :, 0].T
                )
                model_output_y_piccoor = (           # 모델이 예측한 궤적 y
                    model_output_traj_best[:, start:end, :].cpu().numpy()[:, :, 1].T
                )
                # print("model_output_traj_best shape:", model_output_traj_best.shape) # (8, 71, 2)
                # print("model_output_traj_best values:", model_output_traj_best)

                # print("model_output_traj_best shape:", model_output_traj_best.shape) # (8, 71, 2)
                # print("model_output_traj_best values:", model_output_traj_best)

                # print("model_output_traj_best shape:", model_output_traj_best.shape) # (8, 71, 2)
                # print("model_output_traj_best values:", model_output_traj_best)
                
                # sys.exit()

                for i in range(ground_truth_output_x_piccoor.shape[0]): # 샘플 수

                    observed_line = plt.plot(                   # plot에 선을 그리는 함수: 관찰된 궤적을 플롯으로 그리기
                        ground_truth_input_x_piccoor[i, :], # i번째 샘플의 모든 관찰된 x 좌표
                        ground_truth_input_y_piccoor[i, :],
                        color="#FF5733", # 다홍색 // # "r-",   # 빨간 선
                        linewidth=4,
                        label="Observed Trajectory", # 범례에 이 선을 라벨링
                        alpha=0.7,
                    )[0]                                # plot 함수는 리스트를 반환, 첫 요소가 실제 그린 선을 나타내는 Line2D 객체
                    observed_line.axes.annotate(    # observed_line 객체의 축(axis)에 주석을 추가하는 함수, annotate 함수는 텍스트 주석과 함께 화살표 추가
                        "", # 주석으로 표시할 텍스트, 텍스트는 표시 않고, 화살표만 그리기
                        xytext=( # 화살표 시작점
                            ground_truth_input_x_piccoor[i, -2], # i번째 샘플의 두 번째 마지막 x 좌표
                            ground_truth_input_y_piccoor[i, -2],
                        ),
                        xy=( # 화살표 끝점점
                            ground_truth_input_x_piccoor[i, -1], # i번째 샘플의 마지막 x 좌표
                            ground_truth_input_y_piccoor[i, -1],
                        ),
                        arrowprops=dict(
                            arrowstyle="->", color=observed_line.get_color(), lw=1 # 화살표 스타일 지정: 머리가 점을 향하도록, color: 화살표 색상을 observed_line과 동일하게, 화살표 두께=1
                        ),
                        size=20, # 주석 텍스트 크기
                    )

                    ground_line = plt.plot(
                        np.append(
                            ground_truth_input_x_piccoor[i, -1], # np.append: 관찰된 궤적의 마지막 x 좌표와 실제 궤적의 모든 x 좌표를 하나의 배열로 결합
                            ground_truth_output_x_piccoor[i, :],
                        ),
                        np.append(
                            ground_truth_input_y_piccoor[i, -1],
                            ground_truth_output_y_piccoor[i, :],
                        ),
                        color="#00FFFF", # 형광 하늘색 // # "b-",
                        linewidth=4,
                        label="Ground Truth",
                        alpha=0.7,
                    )[0]

                    predict_line = plt.plot(
                        np.append( # 관찰된 궤적의 마지막 x 좌표와 모델이 예측한 궤적의 모든 x 좌표를 하나의 배열로 결합
                            ground_truth_input_x_piccoor[i, -1], # i번째 샘플의 관찰된 궤적의 마지막 x 좌표
                            model_output_x_piccoor[i, :], # i번째 샘플의 모델이 예측한 궤적의 모든 x 좌표
                        ),
                        np.append( # 예측된 궤적이 관찰된 궤적의 끝에서 시작하는 연속적인 궤적으로 표시
                            ground_truth_input_y_piccoor[i, -1],
                            model_output_y_piccoor[i, :],
                        ),
                        color="#ADFF2F", # 연두             // #"#ffff00", # 노랑
                        ls="--", # 선의 스타일을 점선으로 설정
                        linewidth=4,
                        label="Predicted Trajectory",
                        alpha=0.7,
                    )[0]

                plt.axis("off")
                plt.savefig(
                    args.output_dir+"/pic_{}.png".format(pic_cnt),
                    transparent=True # 투명으로 저장
                )
                plt.close()
                pic_cnt += 1

In [15]:
def main(args):
    checkpoint = torch.load(args.restore_path) # .pt 파일을 불러올 경로
    generator = get_generator(checkpoint)
    # path = get_dset_path(args.dataset_name, args.dset_type)
    dst_path = args.output_dir
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        print(f"Directory {dst_path} created.")
    else:
        print(f"Directory {dst_path} already exists.")
    path = '/home/ngnadmin/dev/ngn_2024/MMT4/MMT/mmt/datasets/sdd_dC/test2'
    _, loader = data_loader(args, path)
    plot_trajectory(args, loader, generator)

In [16]:
# 주요 변수 확인
print(f"불러올 모델 체크포인트 파일 경로: {args.restore_path}")
print(f"이미지가 저장될 위치: {args.output_dir}")
print(f"축제거")
print(f"투명배경")
# print(f"모델 궤적만 가시화")

불러올 모델 체크포인트 파일 경로: /home/ngnadmin/dev/ngn_2024/MMT4/MMT/scripts_sdd/output/noscene/state0/obs8/1/lstm_0_with_model.pt
이미지가 저장될 위치: /home/ngnadmin/dev/ngn_2024/MMT4/MMT/vis/sgan/fig/obs8/1
축제거
투명배경


In [17]:
main(args)

KeyboardInterrupt: 