In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torch.optim import Adam
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import tensorflow as tf
import math
from torch.utils.data import TensorDataset, DataLoader
import seaborn as sns
import os
import shap

# 시드 고정
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # if you are using multi-GPU.
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

set_seed(42)

In [2]:
data = pd.read_csv('/home/aibig25/hong_sj/trb/num.csv')
data = data.fillna(0)

unique_ids = data['sequence_ID'].unique()
train_ids, test_ids = train_test_split(unique_ids, test_size=41, random_state=42)
train_data = data[data['sequence_ID'].isin(train_ids)]
test_data = data[data['sequence_ID'].isin(test_ids)]

independent_vars = data.columns.difference(['center_x', 'center_y','center_x_ma','center_y_ma', 'ID', 'frame', "LC"])
dependent_vars = ['center_y_ma']

scaler = MinMaxScaler()

train_data[independent_vars] = scaler.fit_transform(train_data[independent_vars])
test_data[independent_vars] = scaler.transform(test_data[independent_vars])

X_train = train_data[independent_vars]
y_train = train_data[dependent_vars]

X_test = test_data[independent_vars]
y_test = test_data[dependent_vars]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [3]:
# 입력 및 예측 시퀀스 길이 정의
input_sequence_length = 120
output_sequence_length = 90

def create_sequences(data, input_sequence_length, output_sequence_length):
    X = []
    y = []

    for i in range(len(data) - input_sequence_length - output_sequence_length + 1):
        X.append(data.iloc[i:(i + input_sequence_length)][independent_vars].values)
        y.append(data.iloc[(i + input_sequence_length):(i + input_sequence_length + output_sequence_length)][dependent_vars].values)
    
    return np.array(X), np.array(y)

X_train, y_train = create_sequences(train_data, input_sequence_length, output_sequence_length)
X_test, y_test = create_sequences(test_data, input_sequence_length, output_sequence_length)

In [4]:
# 데이터셋을 텐서로 변환
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))

# 데이터 로더 생성
train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=10, shuffle=False)

In [5]:
class TrajectoryTransformer(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_encoder_layers, num_decoder_layers, output_dim):
        super(TrajectoryTransformer, self).__init__()
        self.model_dim = model_dim
        
        self.encoder = nn.Linear(input_dim, model_dim)
        self.pos_encoder = PositionalEncoding(model_dim)
        self.tgt_linear = nn.Linear(1, model_dim)
        
        self.transformer = nn.Transformer(
            d_model=model_dim,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=model_dim * 4,
            dropout=0.1
        )
        
        self.decoder = nn.Linear(model_dim, output_dim)

    def forward(self, src, tgt):
        src = self.encoder(src)
        src = src * math.sqrt(self.model_dim)
        src = self.pos_encoder(src.permute(1, 0, 2))

        tgt = tgt.squeeze(-1)
        original_shape = tgt.shape
        tgt = tgt.reshape(-1, 1)
        tgt = self.tgt_linear(tgt)
        tgt = tgt.view(original_shape[0], original_shape[1], -1)
        tgt = tgt * math.sqrt(self.model_dim)
        tgt = self.pos_encoder(tgt.permute(1, 0, 2))

        output = self.transformer(src, tgt)
        output = self.decoder(output.permute(1, 0, 2))

        return output

class PositionalEncoding(nn.Module):
    def __init__(self, model_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, model_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, model_dim, 2).float() * (-math.log(10000.0) / model_dim))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', self.encoding)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

In [6]:
# 모델 초기화
input_dim = len(independent_vars)
output_dim = len(dependent_vars)
model_dim = 512
num_heads = 4
num_encoder_layers = 3
num_decoder_layers = 3

model = TrajectoryTransformer(input_dim, model_dim, num_heads, num_encoder_layers, num_decoder_layers, output_dim)

device = torch.device("cuda")
model.to(device)

# 옵티마이저와 손실 함수
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

In [7]:
# 예시 입력 데이터 (배치 크기: 1, 시간 단계: 120, 변수: 24)
example_src = torch.tensor(X_train[:10], dtype=torch.float32)
example_tgt = torch.tensor(y_train[:10], dtype=torch.float32)

In [17]:
import shap

# SHAP 분석
def model_predict(input_data):
    model.eval()
    # 전체 시퀀스 길이에 맞게 데이터 크기 조정
    num_samples = input_data.shape[0] // len(independent_vars)
    if num_samples == 0:
        return np.zeros((0, output_sequence_length))  # 입력 데이터가 너무 작을 경우 예측값을 0으로 반환
    input_tensor = torch.tensor(input_data.reshape(num_samples, input_sequence_length, len(independent_vars)), dtype=torch.float32).to(device)
    with torch.no_grad():
        dummy_tgt = torch.zeros((input_tensor.size(0), output_sequence_length, 1)).to(device)
        predictions = model(input_tensor, dummy_tgt)
    # 예측 결과를 2차원으로 변환하여 반환
    return predictions.cpu().numpy().reshape(num_samples * output_sequence_length, -1)

# SHAP Explainer 준비
# 2차원으로 입력 데이터를 변환하여 준비
background_data = X_train[:1200].reshape(-1, len(independent_vars))  # 배경 데이터로 사용할 데이터셋
background_data_summary = shap.kmeans(background_data, 50)  # K-means를 사용하여 50개의 샘플로 요약

explainer = shap.KernelExplainer(model_predict, background_data_summary.data)

# SHAP 값 계산
X_test_sample = X_test[:input_sequence_length*10].reshape(-1, len(independent_vars))  # 10개의 시퀀스를 선택
shap_values = explainer.shap_values(X_test_sample, nsamples=100)

# SHAP 값 시각화 (예: 첫 번째 샘플의 60번째 시점)
shap.initjs()
shap.force_plot(explainer.expected_value, shap_values[59], X_test_sample[59])

# 전체 시퀀스에 대한 SHAP 값 요약 플롯
shap.summary_plot(shap_values, X_test_sample)


Provided model function fails when applied to the provided data set.


ValueError: cannot reshape array of size 1250 into shape (2,120,25)