In [18]:
import os
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import StepLR

import numpy as np
import random

In [19]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True
    
seed_everything(12)

In [20]:
class skeleton_LSTM(nn.Module):
    def __init__(self, feature_dim, output_dim):
        super(skeleton_LSTM, self).__init__()
        
        self.feature_dim = feature_dim
        self.output_dim = output_dim
        self.lstm1 = nn.LSTM(input_size=self.feature_dim, hidden_size=128, num_layers=1, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=256, num_layers=1, batch_first=True)
        self.lstm3 = nn.LSTM(input_size=256, hidden_size=512, num_layers=1, batch_first=True)
        self.fc1 = nn.Linear(512,256)
        self.fc2 = nn.Linear(256,output_dim)
        
    def forward(self, x):
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x, (hn, cn) = self.lstm3(x)
        x = F.relu(self.fc1(x[:,-1,:]))
        embedding = self.fc2(x)
        
        return embedding


In [21]:
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset

angle = [['left_biceps', 'left_forearm'],
         ['right_biceps', 'right_forearm'],
         ['between_shoulders', 'left_body'],
         ['between_shoulders', 'right_body'],
         ['between_shoulders', 'rigth_neck'],
         ['between_shoulders', 'left_neck'],
         ['between_pelvis','left_thigh'],
         ['between_pelvis','right_thigh'],
         ['right_thigh','right_calf'],
         ['left_thigh','left_calf'],
         ['right_body','right_thigh'],
         ['left_body','left_thigh']
        ]
         

body_parts = {'left_biceps': [11, 13],
              'left_forearm': [13, 15],
              'right_biceps': [12, 14],
              'right_forearm': [14, 16],
              'between_shoulders': [11, 12],
              'left_body': [11, 23],
              'right_body': [12, 24],
              'between_pelvis': [23, 24],
              'left_thigh': [23, 25],
              'left_calf': [25, 27],
              'right_thigh': [24, 26],
              'right_calf': [26, 28],
              'left_neck': [9, 11],
              'rigth_neck': [10, 12]}


def calculate_angles(matrix1, matrix2):
    dot_product = np.einsum('ij,ij->i', matrix1, matrix2)
    norm1 = np.linalg.norm(matrix1, axis=1)
    norm2 = np.linalg.norm(matrix2, axis=1)
    cos_theta = dot_product / (norm1 * norm2)
    angles = np.arccos(np.clip(cos_theta, -1.0, 1.0))
    return angles


def make_df_angle(path):
    df = pd.read_csv(path)
    df_angle = pd.DataFrame()

    for body_parts1, body_parts2 in angle:
        body_parts1_vec = body_parts[body_parts1]
        body_parts2_vec = body_parts[body_parts2]

        # 벡터 계산
        vec_mat1 = df.iloc[:, body_parts1_vec[0]*3+1:body_parts1_vec[0]*3+4].values - df.iloc[:, body_parts1_vec[1]*3+1:body_parts1_vec[1]*3+4].values
        vec_mat2 = df.iloc[:, body_parts2_vec[0]*3+1:body_parts2_vec[0]*3+4].values - df.iloc[:, body_parts2_vec[1]*3+1:body_parts2_vec[1]*3+4].values

        angles = calculate_angles(vec_mat1, vec_mat2)
        df_angle[f'{body_parts1}_{body_parts2}'] = angles
        
    df_angle = df_angle.replace([np.inf, -np.inf], 0.0)
    df_angle = df_angle.fillna(0.0)


    return df_angle



In [22]:
class LandmarkDataset(Dataset):
    def __init__(self, segment_length):
        self.root_dir = '/media/baebro/NIPA_data/Train/landmarks'
        self.data = []
        self.labels = []
        self.label_to_indices = {}
        self.segment_length = segment_length

        # 디렉토리 탐색
        for dance_name in os.listdir(self.root_dir):
            dance_path = os.path.join(self.root_dir, dance_name)
            if os.path.isdir(dance_path):
                for csv_file in os.listdir(dance_path):
                    if csv_file.endswith(".csv") and not csv_file.endswith("_F.csv"):
                        file_path = os.path.join(dance_path, csv_file)
                        df_angle = make_df_angle(file_path)
                        
                        # 시퀀스를 segment_length 단위로 나누어 저장
                        for i in range(0, len(df_angle) - self.segment_length + 1, self.segment_length):
                            segment = df_angle.iloc[i:i + self.segment_length].values
                            self.data.append(segment)
                            self.labels.append(dance_name)

                            # 인덱스를 라벨에 매핑
                            if dance_name not in self.label_to_indices:
                                self.label_to_indices[dance_name] = []
                            self.label_to_indices[dance_name].append(len(self.data) - 1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        label = self.labels[idx]
        anchor = torch.tensor(self.data[idx], dtype=torch.float32)

        # Positive 쌍 생성
        positive_idx = np.random.choice(self.label_to_indices[label])
        while positive_idx == idx:
            positive_idx = np.random.choice(self.label_to_indices[label])
        pos = torch.tensor(self.data[positive_idx], dtype=torch.float32)

        # Negative 쌍 생성
        neg_label = np.random.choice([l for l in self.label_to_indices if l != label])
        negative_idx = np.random.choice(self.label_to_indices[neg_label])
        neg = torch.tensor(self.data[negative_idx], dtype=torch.float32)

        return anchor, pos, neg


In [23]:
class TripletContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.1):
        super(TripletContrastiveLoss, self).__init__()
        self.temperature = temperature

    def forward(self, anchor, positive, negative):
        # Normalize features
        anchor, positive, negative = F.normalize(anchor, dim=1), F.normalize(positive, dim=1), F.normalize(negative, dim=1)
        
        # Calculate similarities
        pos_sim = torch.exp(torch.sum(anchor * positive, dim=1) / self.temperature)  # Anchor-Positive similarity
        neg_sim = torch.exp(torch.sum(anchor * negative, dim=1) / self.temperature)  # Anchor-Negative similarity

        # Loss calculation: maximize anchor-positive similarity, minimize anchor-negative similarity
        loss = -torch.log(pos_sim / (pos_sim + neg_sim)).mean()
        return loss


In [24]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Subset

# 데이터셋과 데이터로더
dataset = LandmarkDataset()
train_indices, val_indices = train_test_split(range(len(dataset)), test_size=0.2, random_state=42)


# Subset을 사용하여 학습 및 검증 데이터셋 생성
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)

# DataLoader 생성
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False)

In [25]:
import torch
from torch.optim.lr_scheduler import StepLR
import numpy as np

# 하이퍼파라미터 설정
feature_dim = len(angle)
output_dim = 64
num_epochs = 100  # Early stopping 적용 시 더 큰 값을 설정해도 됩니다
learning_rate = 0.001
temperature = 0.1
patience = 10  # Early stopping patience 설정
min_delta = 0.001  # Validation loss가 감소하는 최소 값
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# 모델, 손실 함수, 옵티마이저, 스케줄러 초기화
model = skeleton_LSTM(feature_dim, output_dim).to(device)
criterion = TripletContrastiveLoss(temperature=temperature)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

# Early stopping 변수 초기화
best_val_loss = np.inf
epochs_no_improve = 0
early_stop = False

# 학습 루프
for epoch in range(num_epochs):
    if early_stop:
        break

    model.train()
    epoch_loss = 0.0
    for batch_idx, (anchor, pos, neg) in enumerate(train_dataloader):
        optimizer.zero_grad()
        
        # 데이터를 장치로 이동
        anchor, pos, neg = anchor.to(device), pos.to(device), neg.to(device)
        
        # 모델에 통과하여 임베딩 생성
        anchor_emb = model(anchor)
        pos_emb = model(pos)
        neg_emb = model(neg)

        # Contrastive Loss 계산
        loss = criterion(anchor_emb, pos_emb, neg_emb)
        epoch_loss += loss.item()

        loss.backward()
        optimizer.step()
    
    scheduler.step()  # 학습률 조정

    # 에폭마다 평균 손실을 기록
    avg_train_loss = epoch_loss / len(train_dataloader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_train_loss:.4f}')

    # Validation loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_idx, (anchor, pos, neg) in enumerate(val_dataloader):
            anchor, pos, neg = anchor.to(device), pos.to(device), neg.to(device)
            
            # 임베딩 생성
            anchor_emb = model(anchor)
            pos_emb = model(pos)
            neg_emb = model(neg)
            
            # Validation loss 계산
            loss = criterion(anchor_emb, pos_emb, neg_emb)
            val_loss += loss.item()
    
    avg_val_loss = val_loss / len(val_dataloader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {avg_val_loss:.4f}')

    # Early Stopping 체크
    if avg_val_loss < best_val_loss - min_delta:
        best_val_loss = avg_val_loss
        epochs_no_improve = 0  # Improvement이 있으면 카운트 리셋
    else:
        epochs_no_improve += 1

    if epochs_no_improve >= patience:
        print("Early stopping triggered!")
        early_stop = True

print("Training complete!")


cuda
Epoch [1/100], Training Loss: 0.5617
Epoch [1/100], Validation Loss: 0.6931
Epoch [2/100], Training Loss: 0.6931
Epoch [2/100], Validation Loss: 0.6931
Epoch [3/100], Training Loss: 0.6931
Epoch [3/100], Validation Loss: 0.6931
Epoch [4/100], Training Loss: 0.6903
Epoch [4/100], Validation Loss: 0.6931
Epoch [5/100], Training Loss: 0.6894
Epoch [5/100], Validation Loss: 0.6138
Epoch [6/100], Training Loss: 0.6603
Epoch [6/100], Validation Loss: 0.6931
Epoch [7/100], Training Loss: 0.6931
Epoch [7/100], Validation Loss: 0.6931
Epoch [8/100], Training Loss: 0.6931
Epoch [8/100], Validation Loss: 0.6931
Epoch [9/100], Training Loss: 0.6931
Epoch [9/100], Validation Loss: 0.6931
Epoch [10/100], Training Loss: 0.6931
Epoch [10/100], Validation Loss: 0.6931
Epoch [11/100], Training Loss: 0.6931
Epoch [11/100], Validation Loss: 0.6931
Epoch [12/100], Training Loss: 0.6931
Epoch [12/100], Validation Loss: 0.6931
Epoch [13/100], Training Loss: 0.6931
Epoch [13/100], Validation Loss: 0.6931

In [9]:
# 유클리디언 거리 계산
model.eval()

origin_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Hurricane Venus (보아)/landmarks_3d_L.csv'
pos_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Hurricane Venus (보아)/landmarks_3d_P.csv'
neg_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/View (샤이니)/landmarks_3d_P.csv'

origin = make_df_angle(origin_path)
pos = make_df_angle(pos_path)
neg = make_df_angle(neg_path)

origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
pos_input = torch.tensor(pos.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
neg_input = torch.tensor(neg.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = model(origin_input)
pos_emb = model(pos_input)
neg_emb = model(neg_input)

pos_dist = torch.pow(F.pairwise_distance(origin_emb, pos_emb), 2)
neg_dist = torch.pow(F.pairwise_distance(origin_emb, neg_emb), 2)
print(pos_dist)
print(neg_dist)
neg_dist/pos_dist

tensor([0.0042], device='cuda:0', grad_fn=<PowBackward0>)
tensor([0.0126], device='cuda:0', grad_fn=<PowBackward0>)


tensor([2.9700], device='cuda:0', grad_fn=<DivBackward0>)

In [10]:
# 정규화 후, 유클리디언 거리 계산
model.eval()

origin_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Hurricane Venus (보아)/landmarks_3d_L.csv'
pos_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Hurricane Venus (보아)/landmarks_3d_P.csv'
neg_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/ID Peace B (보아)/landmarks_3d_P.csv'

origin = make_df_angle(origin_path)
pos = make_df_angle(pos_path)
neg = make_df_angle(neg_path)

origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
pos_input = torch.tensor(pos.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
neg_input = torch.tensor(neg.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = F.normalize(model(origin_input), dim=1)
pos_emb = F.normalize(model(pos_input), dim=1)
neg_emb = F.normalize(model(neg_input), dim=1)

pos_dist = torch.pow(F.pairwise_distance(origin_emb, pos_emb), 2)
neg_dist = torch.pow(F.pairwise_distance(origin_emb, neg_emb), 2)
print(pos_dist)
print(neg_dist)
neg_dist/pos_dist

tensor([0.0683], device='cuda:0', grad_fn=<PowBackward0>)
tensor([0.0971], device='cuda:0', grad_fn=<PowBackward0>)


tensor([1.4211], device='cuda:0', grad_fn=<DivBackward0>)

In [13]:
# 코사인 유사도 계산

model.eval()

origin_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Hurricane Venus (보아)/landmarks_3d_L.csv'
pos_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Hurricane Venus (보아)/landmarks_3d_P.csv'
neg_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/ID Peace B (보아)/landmarks_3d_P.csv'

origin = make_df_angle(origin_path)
pos = make_df_angle(pos_path)
neg = make_df_angle(neg_path)

origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
pos_input = torch.tensor(pos.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
neg_input = torch.tensor(neg.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = F.normalize(model(origin_input), dim=1)
pos_emb = F.normalize(model(pos_input), dim=1)
neg_emb = F.normalize(model(neg_input), dim=1)

pos_sim = torch.sum(origin_emb * pos_emb, dim=1)
neg_sim = torch.sum(origin_emb * neg_emb, dim=1)

print(pos_sim)
print(neg_sim)


tensor([0.9658], device='cuda:0', grad_fn=<SumBackward1>)
tensor([0.9515], device='cuda:0', grad_fn=<SumBackward1>)


In [11]:
from time import time

model.eval()

start = time()
origin_path = '/kaggle/input/nipa-sample/sample_video/100 (슈퍼엠)/landmarks_3d_L.csv'
origin = make_df_angle(origin_path)
origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = F.normalize(model(origin_input),dim=1)
time()-start

0.02616405487060547