In [None]:
import os
import pandas as pd
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import random
import copy

# Preprocess

In [None]:
directory = './csvFiles'

In [None]:
csv_files = [file for file in os.listdir(directory) if file.endswith('.csv')]

In [None]:
# 파일 목록을 랜덤하게 섞습니다.
random.seed(42)  # 재현 가능한 결과를 위해 시드 설정
random.shuffle(csv_files)

In [None]:
# 첫 번째 파일에서 컬럼 이름을 가져옵니다.
first_file_path = os.path.join(directory, csv_files[0])
first_df = pd.read_csv(first_file_path)

In [None]:
separator_token = 999
separator = pd.DataFrame({col: separator_token for col in first_df.columns}, index=[0])

In [None]:
df_list = []
for file in csv_files:
    file_path = os.path.join(directory, file)
    df = pd.read_csv(file_path)
    df = df.iloc[300:-100]
    df = df.apply(lambda x: float(f"{x:.2f}") if isinstance(x, (int, float)) else x)
    df_list.append(df)
    df_list.append(separator) 

In [None]:
combined_df = pd.concat(df_list, ignore_index=True)

In [None]:
combined_df

In [None]:
len(combined_df)

In [None]:
rotation_columns = [col for col in combined_df.columns if 'Rot' in col]
rotation_df = combined_df[rotation_columns]

# -180~180 사이로 정규화
normalize_angle = lambda x: x if x == 999 else (x - 360) if x > 180 else (x + 360) if x < -180 else x
rotation_df = rotation_df.apply(lambda col: col.apply(normalize_angle))

In [None]:
# -180 ~ 180 범위를 벗어나는 값이 있는지 확인
num_values_out_of_range = rotation_df.apply(lambda col: col.apply(lambda x: x != 999 and (x > 180 or x < -180))).sum().sum()

# 결과 확인
if num_values_out_of_range > 0:
    print(f"범위를 벗어나는 값의 수: {num_values_out_of_range}")
else:
    print("범위를 벗어나는 값이 없습니다.")

In [None]:
rotation_df.to_csv('./rotation_df.csv', index=False)

# Transformer Learning

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
from sklearn.model_selection import train_test_split
import math
from tqdm import tqdm

In [None]:
# 변환된 데이터를 DataFrame으로 변환
rotation_df = pd.DataFrame(rotation_df, columns=rotation_df.columns)

In [None]:
rotation_df

In [None]:
train = rotation_df.iloc[:897757]
test = rotation_df.iloc[897757:]

In [None]:
X_train, X_val = train_test_split(train, test_size=0.2, random_state=42, shuffle=False)

In [None]:
batch_size = 128
n_input = 30  # Sequence length
n_features = 63  # Number of features
output_units = (21 * 3)  # Output shape
head_size = 256  # Size of attention head
num_heads = 7  # Number of attention heads
ff_dim = 512  # Hidden layer size in feed forward network inside transformer
num_blocks = 4  # Number of transformer blocks
mlp_units = [512, 256, 128]  # Size of the dense layers of the final classifier
dropout_rate = 0.3 


In [None]:
class TimeseriesDataset(Dataset):
    def __init__(self, data, sequence_length):
        self.data = data
        self.sequence_length = sequence_length

    def __len__(self):
        return len(self.data) - self.sequence_length + 1

    def __getitem__(self, index):
        x = self.data[index:index+self.sequence_length] # : 입력 시퀀스 (30개의 데이터)
        y = self.data[index+self.sequence_length-1] # : 예측 시퀀스 (1개의 데이터)
        return torch.from_numpy(x).float(), torch.from_numpy(y).float()

# 데이터셋과 데이터 로더를 생성
train_dataset = TimeseriesDataset(X_train.values, n_input)
val_dataset = TimeseriesDataset(X_val.values, n_input)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# 데이터 로더에서 첫 번째 배치를 가져와서 형태를 확인
first_batch = next(iter(train_loader))
X, y = first_batch

print("X batch shape:", X.shape)
print("y batch shape:", y.shape)

### Transformer Model

In [None]:
# Positional Encoding 정의
class PositionalEncoding(nn.Module):
    def __init__(self, n_features, n_input):
        super(PositionalEncoding, self).__init__() # 상속받은 nn.Module 클래스의 __init__() 메서드 호출
        pe = torch.zeros(n_input, n_features) # 가장 큰 시퀀스 길이인 max_len을 기준으로 모두 0으로 채워진 크기가 (max_len, n_features)인 텐서 생성
        position = torch.arange(0, n_input, dtype=torch.float).unsqueeze(1) # position 텐서 생성

        # div_term을 계산하는 방식 수정
        div_term = torch.exp(torch.arange(0, n_features, 2).float() * (-math.log(10000.0) / n_features)) # div_term 계산
        
        # div_term의 길이를 n_features의 절반으로 조정
        div_term = div_term.repeat_interleave(2)[:n_features] # div_term 텐서 생성

        pe[:, 0::2] = torch.sin(position * div_term[0::2]) # 짝수 인덱스에는 sin 함수 적용
        pe[:, 1::2] = torch.cos(position * div_term[1::2]) # 홀수 인덱스에는 cos 함수 적용
        pe = pe.unsqueeze(0) # pe = [bs, seq_len, n_feautres]
        self.register_buffer('pe', pe) # pe 텐서를 모델의 버퍼로 등록

    def forward(self, x):
        # print(self.pe.shape)
        x = x + self.pe[:, :x.size(1), :] # 입력에 위치 인코딩을 더함
        return x

# Transformer Block 정의
class TransformerBlock(nn.Module):
    def __init__(self, n_features, num_heads, ff_dim, dropout):
        super(TransformerBlock, self).__init__()
        self.attention = nn.MultiheadAttention(n_features, num_heads, dropout=dropout)
        self.ffn = nn.Sequential(
            nn.Linear(n_features, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, n_features)
        )
        self.norm1 = nn.LayerNorm(n_features)
        self.norm2 = nn.LayerNorm(n_features)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        attention_output, _ = self.attention(x, x, x)
        x = self.norm1(x + self.dropout(attention_output))
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_output))
        return x
# 모델 정의
class TransformerModel(nn.Module):
    def __init__(self, n_features, num_heads, ff_dim, num_blocks, mlp_units, dropout, n_input):
        super(TransformerModel, self).__init__()
        self.pos_encoder = PositionalEncoding(n_features, n_input)
        self.transformer_blocks = nn.ModuleList([TransformerBlock(n_features, num_heads, ff_dim, dropout) for _ in range(num_blocks)])
        
        self.layers = nn.Sequential()
        # 첫 번째 nn.Linear 층의 입력 차원을 n_features로 설정
        self.layers.add_module("dense_0", nn.Linear(n_features, mlp_units[0]))
        self.layers.add_module("relu_0", nn.ReLU())
        self.layers.add_module("dropout_0", nn.Dropout(dropout))
        self.layers.add_module("norm_0", nn.LayerNorm(mlp_units[0]))

        # 이후 층들에 대한 설정
        for i in range(1, len(mlp_units)):
            self.layers.add_module(f"dense_{i}", nn.Linear(mlp_units[i-1], mlp_units[i]))
            self.layers.add_module(f"relu_{i}", nn.ReLU())
            self.layers.add_module(f"dropout_{i}", nn.Dropout(dropout))
            self.layers.add_module(f"norm_{i}", nn.LayerNorm(mlp_units[i]))

        # 최종 출력 층
        self.out = nn.Linear(mlp_units[-1], n_features)

    def forward(self, x):
        x = self.pos_encoder(x)
        for block in self.transformer_blocks:
            x = block(x)
        x = torch.mean(x, dim=1)
        x = self.layers(x)
        return self.out(x)

model = TransformerModel(n_features, num_heads, ff_dim, num_blocks, mlp_units, dropout_rate, n_input)


In [None]:
pos_encoder = PositionalEncoding(n_features, n_input)
x = torch.rand(128, 30, 63)
y = pos_encoder(x)

In [None]:
column_order = [
    'm_avg_PelvisRotX', 'm_avg_PelvisRotY', 'm_avg_PelvisRotZ',
    'm_avg_L_HipRotX', 'm_avg_L_HipRotY', 'm_avg_L_HipRotZ',
    'm_avg_L_KneeRotX', 'm_avg_L_KneeRotY', 'm_avg_L_KneeRotZ',
    'm_avg_L_AnkleRotX', 'm_avg_L_AnkleRotY', 'm_avg_L_AnkleRotZ',
    'm_avg_L_FootRotX', 'm_avg_L_FootRotY', 'm_avg_L_FootRotZ',
    'm_avg_R_HipRotX', 'm_avg_R_HipRotY', 'm_avg_R_HipRotZ',
    'm_avg_R_KneeRotX', 'm_avg_R_KneeRotY', 'm_avg_R_KneeRotZ',
    'm_avg_R_AnkleRotX', 'm_avg_R_AnkleRotY', 'm_avg_R_AnkleRotZ',
    'm_avg_R_FootRotX', 'm_avg_R_FootRotY', 'm_avg_R_FootRotZ',
    'm_avg_Spine1RotX', 'm_avg_Spine1RotY', 'm_avg_Spine1RotZ',
    'm_avg_Spine2RotX', 'm_avg_Spine2RotY', 'm_avg_Spine2RotZ',
    'm_avg_L_CollarRotX', 'm_avg_L_CollarRotY', 'm_avg_L_CollarRotZ',
    'm_avg_L_ShoulderRotX', 'm_avg_L_ShoulderRotY', 'm_avg_L_ShoulderRotZ',
    'm_avg_L_ElbowRotX', 'm_avg_L_ElbowRotY', 'm_avg_L_ElbowRotZ',
    'm_avg_L_WristRotX', 'm_avg_L_WristRotY', 'm_avg_L_WristRotZ',
    'm_avg_NeckRotX', 'm_avg_NeckRotY', 'm_avg_NeckRotZ',
    'm_avg_HeadRotX', 'm_avg_HeadRotY', 'm_avg_HeadRotZ',
    'm_avg_R_CollarRotX', 'm_avg_R_CollarRotY', 'm_avg_R_CollarRotZ',
    'm_avg_R_ShoulderRotX', 'm_avg_R_ShoulderRotY', 'm_avg_R_ShoulderRotZ',
    'm_avg_R_ElbowRotX', 'm_avg_R_ElbowRotY', 'm_avg_R_ElbowRotZ',
    'm_avg_R_WristRotX', 'm_avg_R_WristRotY', 'm_avg_R_WristRotZ'
]

In [None]:
column_names = [
    'm_avg_L_WristRotX', 'm_avg_L_WristRotY', 'm_avg_L_WristRotZ',
    'm_avg_HeadRotX', 'm_avg_HeadRotY', 'm_avg_HeadRotZ',
    'm_avg_R_WristRotX', 'm_avg_R_WristRotY', 'm_avg_R_WristRotZ'
]

weighted_columns_indices = [column_order.index(name) for name in column_names]
print(weighted_columns_indices)

In [None]:
class CustomLoss(nn.Module):
    def __init__(self, weighted_columns_indices, weight_for_weighted_columns):
        super(CustomLoss, self).__init__()
        self.weighted_columns_indices = torch.tensor(weighted_columns_indices)
        self.weight_for_weighted_columns = weight_for_weighted_columns

    def forward(self, y_true, y_pred):
        # y_pred의 마지막 feature를 rot_diff_category로 분리
        y_pred_values = y_pred[:, :]

        # MSE 계산
        mse = F.mse_loss(y_true[:, :], y_pred_values, reduction='none')
        mse = mse.mean(axis=-1)

        # 특정 joint rotation에 대한 가중치 적용
        weighted_mse = y_pred_values[:, self.weighted_columns_indices]
        weighted_mse = (weighted_mse ** 2) * self.weight_for_weighted_columns
        mse += weighted_mse.mean(axis=-1)

        return mse.mean()  # 전체 배치에 대한 평균 손실 반환

# 가중치를 적용할 열 인덱스와 가중치 값
weighted_columns_indices = [42, 43, 44, 48, 49, 50, 60, 61, 62] 
weight_for_weighted_columns = 2.0

# CustomLoss 인스턴스 생성
custom_loss_instance = CustomLoss(
    weighted_columns_indices=weighted_columns_indices,
    weight_for_weighted_columns=weight_for_weighted_columns,
)

In [None]:
# 최적화기와 손실 함수
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = custom_loss_instance

In [None]:
# 설정
epochs = 50
patience = 7  # Early Stopping patience
best_loss = np.inf
early_stopping_counter = 0

In [None]:
# Learning Rate Scheduler 설정
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.2, patience=7, min_lr=0.00001)

In [None]:
# 사용 가능한 GPU 목록을 출력
available_gpus = torch.cuda.device_count()
print("Available GPUs:", available_gpus)

# 현재 장치를 출력 (GPU 사용 가능시 CUDA 장치, 그렇지 않으면 CPU)
current_device = torch.cuda.current_device() if torch.cuda.is_available() else 'CPU'
print("Current device:", torch.cuda.get_device_name(current_device) if torch.cuda.is_available() else current_device)

In [None]:
model = model.to(current_device)

In [None]:
summary(model, input_size=(n_input, n_features))

In [None]:
# 훈련 루프
train_losses = []
val_losses = []

for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    # 훈련 데이터 로더에 대한 프로그레스 바 추가
    train_progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}', unit='batch')

    for data, target in train_progress_bar:
        data, target = data.to(current_device), target.to(current_device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * data.size(0)
        # 프로그레스 바 업데이트
        train_progress_bar.set_postfix({'train_loss': loss.item()})

    train_losses.append(train_loss / len(train_loader.dataset))

    model.eval()
    val_loss = 0.0
    # 검증 데이터 로더에 대한 프로그레스 바 추가
    val_progress_bar = tqdm(val_loader, desc=f'Validation Epoch {epoch+1}/{epochs}', unit='batch')

    with torch.no_grad():
        for data, target in val_progress_bar:
            data, target = data.to(current_device), target.to(current_device)
            output = model(data)
            loss = criterion(output, target)
            val_loss += loss.item() * data.size(0)
            # 프로그레스 바 업데이트
            val_progress_bar.set_postfix({'val_loss': loss.item()})

    val_losses.append(val_loss / len(val_loader.dataset))
    print(f'Epoch {epoch+1} \t Training Loss: {train_loss:.6f} \t Validation Loss: {val_loss:.6f}')
    
    # Learning Rate Scheduler 업데이트
    scheduler.step(val_loss)

    # Early Stopping
    if val_loss < best_loss:
        best_loss = val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        torch.save(model.state_dict(), 'best_model.pth')
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= patience:
            print("Early stopping")
            break

# 가장 좋은 모델 가중치 로드
model.load_state_dict(best_model_wts)

In [None]:
torch.save(model.state_dict(), 'final_model_Transformer.pth')

In [None]:
plt.plot(train_losses, label='Training Loss', color='blue')
plt.plot(val_losses, label='Validation Loss', color='orange')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Test with Real Data

In [None]:
test_df = pd.read_csv('./B01_TransformData_FinalAvatar_20230922_171230.csv').iloc[300:-100]

In [None]:
test_df.head()

In [None]:
test_rotation_columns = [col for col in test_df.columns if 'Rot' in col]
test_rotation_df = test_df[test_rotation_columns]

In [None]:
test_rotation_df

In [None]:
#-180~180 사이로 정규화
def normalize_angle(x):
    x = np.where(x > 180, x - 360, x)
    x = np.where(x < -180, x + 360, x)
    return x
test_df[test_rotation_columns] = test_df[test_rotation_columns].apply(normalize_angle)

In [None]:
test_df = test_df[test_rotation_columns].map(lambda x: float(f"{x:.2f}") if isinstance(x, (int, float)) else x)

In [None]:
test_df

In [None]:
# -180 ~ 180 범위를 벗어나는 값이 있는지 확인
num_values_out_of_range = (test_df > 180).sum().sum() + (test_df < -180).sum().sum()

# 결과 확인
if num_values_out_of_range > 0:
    print(f"범위를 벗어나는 값의 수: {num_values_out_of_range}")
else:
    print("범위를 벗어나는 값이 없습니다.")

In [None]:
test_df.shape

In [None]:
# 결과를 새로운 CSV 파일로 저장합니다.
test_df.to_csv('./test_df.csv', index=False)

In [None]:
# 모델을 평가 모드로 설정
model.eval()

# 예측 값을 넣을 빈 리스트
test_predictions = []

# 훈련 데이터셋에서 마지막 입력 개수의 값을 가져온 후
current_batch = torch.from_numpy(test_df[-n_input:].values.astype(np.float32)).reshape((1, n_input, n_features))

# 모델이 사용하는 디바이스를 확인하고 데이터를 해당 디바이스로 옮깁니다.
current_device = 'cuda' if torch.cuda.is_available() else 'cpu'
current_batch = current_batch.to(current_device)

# 예측 과정 반복
with torch.no_grad():  # 그래디언트 계산을 비활성화
    for i in range(1):
        # 현재 배치에서 다음 포인트를 예측
        current_pred = model(current_batch).cpu().numpy()[0]  # 마지막 시퀀스 포인트 예측
        current_pred = np.array([normalize_angle(y) for y in current_pred])  # 예측값 정규화

        # 예측된 마지막 프레임을 리스트에 추가
        test_predictions.append(current_pred)

        # 새로운 배치 생성: 마지막 시퀀스 제외하고 예측값 추가
        current_batch = np.roll(current_batch.cpu().numpy(), -1, axis=1)
        current_batch[:, -1, :] = current_pred
        current_batch = torch.from_numpy(current_batch).to(current_device)

In [None]:
test_predictions

In [None]:
test_predictions_array = np.array(test_predictions)

In [None]:
# 변환된 리스트를 데이터프레임으로 변환합니다.
test_predictions = pd.DataFrame(test_predictions_array)

# test_predictions 데이터프레임을 CSV 파일로 저장합니다.
test_predictions.to_csv('./test_predictions.csv', index=True)

In [None]:
test_predictions

In [None]:
# input 데이터(test_df)의 마지막 30 프레임과 
last_inputs_df = test_df.iloc[-30:][column_order].reset_index(drop=True)
test_predictions_df = pd.DataFrame(test_predictions_array, columns=column_order)

test_combined_df = pd.concat([last_inputs_df, test_predictions_df], ignore_index=True)

In [None]:
test_combined_df

In [None]:
test_combined_df.to_csv('./test_combined_df.csv', index=True)