In [1]:
import os
import json
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy.interpolate import interp1d
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import f1_score, confusion_matrix
from scipy.spatial import distance
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
MODEL_INPUT = {
    'bbox_xyxy': False,
    'bbox_ratio': True,
    'bbox_class': True,
    'head_torso_speed': True,
    'aug': False
}
DR = 'D' # 프로젝트 폴더가 있는 드라이브
GRU_PT_PATH = DR + r':\project\CVProject\results\my_model_pt\best_fall_detection_gru_001_full.pt'
input_size = 22 + MODEL_INPUT['bbox_xyxy']*4 + MODEL_INPUT['bbox_ratio']*1 + MODEL_INPUT['bbox_class']*1 + MODEL_INPUT['head_torso_speed']*1

In [4]:
class FallDetectionGRU(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, output_size=3, dropout = 0.5):
        super(FallDetectionGRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True, dropout = dropout)
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.gru(x, h0)
        out = self.dropout(out[ : , -1, :])
        out = self.fc(out)
        return out

In [None]:
model = FallDetectionGRU(input_size=input_size)
model.load_state_dict(torch.load(GRU_PT_PATH))

In [None]:
LANDMARKS = [0, 11, 12, 15, 16, 23, 24, 25, 26, 27, 28]

def augment_sequence(sequence, factor=0.2):
    time_warped = []
    for landmark in sequence:
        x = np.arange(len(landmark))
        f = interp1d(x, landmark, kind='linear', axis=0)
        x_new = np.linspace(0, len(landmark) - 1, num=int(len(landmark) * (1 + factor)))
        time_warped.append(f(x_new))
    return np.array(time_warped)

class FallSequenceDataset(Dataset):
    def __init__(self, json_files, sequence_length=3):
        self.sequence_length = sequence_length
        self.sequences = []
        self.labels = []
        #self.scaler = StandardScaler()
        print(f"LANDMARKS length: {len(LANDMARKS)}")
    
        all_landmarks = []
        
        for json_file in json_files:
            print(f'Processing file: {json_file}')
            with open(json_file, 'r') as f:
                data = json.load(f)
            
            frames = list(data['pose_data'].values())
            
            for i in range(0, len(frames) - self.sequence_length + 1):
                sequence = frames[i:i+self.sequence_length]
                landmarks = []
                fall_frames = 0
                
                for frame in sequence:
                    frame_landmarks = []
                    for landmark in LANDMARKS:
                        frame_landmarks.extend([
                            frame[f'landmark_{landmark}']['x'],
                            frame[f'landmark_{landmark}']['y']
                        ])
                    landmarks.append(frame_landmarks)
                    if frame['class'] == 'Fall':
                        fall_frames += 1
                # 데이터 증강 적용
                if MODEL_INPUT['aug'] == True:
                    augmented_sequence = augment_sequence(landmarks)            
                    all_landmarks.extend(augmented_sequence)
                
                # 레이블 재정의
                if fall_frames == 0:
                    label = 0  # 비낙상
                elif fall_frames == self.sequence_length:
                    label = 2  # 완전 낙상
                else:
                    label = 1  # 낙상 위험
                
                self.sequences.append(augmented_sequence)
                self.labels.append(label)
        
        if self.sequences:
            print(f"Sample sequence shape: {self.sequences[0].shape}")
        else:
            print("No sequences were created.")

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        if idx >= len(self.sequences):
            raise IndexError(f"Index {idx} out of range. Dataset length: {len(self.sequences)}")
        sequence = self.sequences[idx]
        return torch.FloatTensor(sequence), torch.LongTensor([self.labels[idx]]).squeeze()

def calculate_metrics(model, data_loader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for sequences, labels in data_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    f1 = f1_score(all_labels, all_preds, average='weighted')
    cm = confusion_matrix(all_labels, all_preds)
    return f1, cm

In [None]:
# 데이터 로드 및 전처리
data_root = DR + r':\addition_yolobbox_json_6'
train_json_folder = f'{data_root}\\train'
val_json_folder = f'{data_root}\\val'  # 검증 데이터셋 폴더 경로
json_files = [os.path.join(json_folder, f) for f in os.listdir(json_folder) if f.endswith('.json')]
dataset = FallSequenceDataset(json_files)

# 데이터셋을 학습 및 검증 세트로 분할
train_indices, val_indices = train_test_split(range(len(dataset)), test_size=0.2)
train_dataset = torch.utils.data.Subset(dataset, train_indices)
val_dataset = torch.utils.data.Subset(dataset, val_indices)

# 데이터 로더 생성 전에 클래스 가중치 계산
class_weights = compute_class_weight('balanced', classes=np.unique(dataset.labels), y=dataset.labels)
class_weights = torch.FloatTensor(class_weights).to(device)

# 손실 함수에 가중치 적용
criterion = nn.CrossEntropyLoss(weight=class_weights)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

if len(dataset) > 0:
    sample_sequence, sample_label = dataset[0]
    input_size = sample_sequence.shape[1]
    print(f'input_size :{input_size}')
    model = FallDetectionGRU(input_size).to(device)
else:
    print("데이터 없음")
    exit()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001,  weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)
num_epochs = 500
best_loss = float('inf')
patience = 20 # 15 이후 다시 실행을 위해 50으로 변경
no_improve = 0

val_f1, val_cm = calculate_metrics(model, val_loader)
print(f'Val F1: {val_f1:.4f}')
print(f'Val CM:\n{val_cm}')