In [None]:
from utils.parser import load_config, PromptManager, get_all_sentences

# 1. Load the prompt configuration file.
config = load_config('assets/PIA_FSV/prompts/topk.json')
manager = PromptManager(config)
# 2. When only the flat list of sentences is needed, use the module-level helper.
sentences = get_all_sentences(config)
print("All sentences:", sentences)
# Get all sentences through the PromptManager instance instead.
all_sentences = manager.get_all_sentences()
print("\nAll sentences from manager:", all_sentences)

# Look up the detailed records for one specific sentence.
sentence = "typical"
details_list = manager.get_details_by_sentence(sentence)
# One record is printed per entry returned for the sentence.
for details in details_list:
    print(f"\nDetails for '{sentence}':")
    print(f"Event: {details['event']}")
    print(f"Status: {details['status']}")
    print(f"Indices: ({details['event_idx']}, {details['prompt_idx']})")

In [16]:
import numpy as np
# Sanity check: print the shape of one saved video-vector file.
print(np.load('assets/PIA_FSV/results/F/cat copy 2.npy').shape)

(20, 1, 512)


In [17]:
from devmacs_core.devmacs_core import DevMACSCore
from devmacs_core.utils.common.cal import scale_sim, loose_similarity
from utils.parser import load_config, PromptManager, get_all_sentences
# 1. Load the prompt configuration file and encode every sentence into a text vector.
config = load_config('assets/PIA_FSV/prompts/topk.json')
sentences = get_all_sentences(config)
macs = DevMACSCore()
t_v = macs.get_text_vector(sentences)
# Display the shape of the text-vector tensor as the cell output.
t_v.shape
# Disabled draft of the similarity call; `video_vector` is not defined in this cell.
# sim_score = loose_similarity(
# 	sequence_output=t_v,
# 	visual_output=video_vector,
# )


  state_dict = torch.load(self.config.model_path, map_location=self.config.device)


torch.Size([17, 512])

In [12]:
import os
import numpy as np
import torch
from typing import Dict, List, Tuple
from devmacs_core.devmacs_core import DevMACSCore
from devmacs_core.utils.common.cal import scale_sim, loose_similarity
from utils.parser import load_config, PromptManager, get_all_sentences

class EventDetector:
    """Turn per-frame video vectors into per-event alarm decisions.

    Text vectors for every prompt sentence are computed once at construction.
    Each frame vector is then compared against them and, for every event in
    ``config['PROMPT_CFG']``, the top-k most similar prompts of that event
    vote on whether the frame is an alarm.
    """

    def __init__(self, config: Dict, macs):
        """
        Args:
            config: Parsed prompt configuration (the object returned by
                ``load_config``). A path to the configuration file is also
                accepted and loaded with ``load_config``.
            macs: Object exposing ``get_text_vector(sentences)``.
        """
        # The rest of the class indexes config['PROMPT_CFG'], so a bare path must be parsed first.
        if isinstance(config, (str, os.PathLike)):
            config = load_config(config)
        self.config = config
        self.macs = macs
        self.prompt_manager = PromptManager(config)
        # Fetch every sentence and precompute the text vectors once.
        self.sentences = get_all_sentences(config)
        self.text_vectors = macs.get_text_vector(self.sentences)

    def process_video_vectors(self, base_dir: str) -> Dict:
        """Process every ``.npy`` file found in the sub-directories of ``base_dir``.

        Returns:
            Dict: ``{category_dir_name: {video_name: per-frame results}}`` where
            ``video_name`` is the file name without its extension.
        """
        results = {}

        # Sorted so that the result order does not depend on the filesystem.
        for category in sorted(os.listdir(base_dir)):
            category_path = os.path.join(base_dir, category)
            if not os.path.isdir(category_path):
                continue

            results[category] = {}
            for file in sorted(os.listdir(category_path)):
                if file.endswith('.npy'):
                    video_name = os.path.splitext(file)[0]
                    file_path = os.path.join(category_path, file)
                    results[category][video_name] = self._process_single_vector(file_path)

        return results

    def _process_single_vector(self, vector_path: str) -> Dict:
        """Load one saved video-vector file and compute alarms for each frame.

        Returns:
            Dict: ``{frame_idx: alarms}`` with ``alarms`` as produced by
            ``_calculate_alarms``.
        """
        video_vector = np.load(vector_path)

        # Use the GPU when one is available instead of failing on CPU-only machines.
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Loop-invariant: move the text vectors to the device once, not once per frame.
        text_vectors = self.text_vectors.to(device)

        frame_results = {}
        for frame_idx, vector in enumerate(video_vector):
            # Assumes each frame entry has a leading axis of size 1, e.g. (1, 512) -> (512,).
            v = vector.squeeze(0)
            v = torch.from_numpy(v).unsqueeze(0).to(device)  # (512,) -> (1, 512)
            sim_scores = loose_similarity(
                sequence_output=text_vectors,
                visual_output=v.unsqueeze(1)  # (1, 512) -> (1, 1, 512)
            )
            frame_results[frame_idx] = self._calculate_alarms(sim_scores)

        return frame_results

    def _calculate_alarms(self, sim_scores: torch.Tensor) -> Dict:
        """Compute the alarm state of every event from one frame's similarity scores.

        Args:
            sim_scores: Similarity of a single frame against every sentence.
                It is flattened first, so (N,), (N, 1) and (1, N) layouts are
                all treated as N per-sentence scores.

        Returns:
            Dict: {
                event_name: {
                    'alarm': 0 or 1,          # 0: Normal, 1: Alarm
                    'scores': [top-k scores],
                    'top_k_types': ['normal' | 'abnormal', ...]
                }
            }
        """
        # torch.topk works on the last axis; a column-shaped score matrix would
        # otherwise be ranked along an axis of size 1.
        scores = sim_scores.reshape(-1)
        event_alarms = {}

        for event_config in self.config['PROMPT_CFG']:
            event = event_config['event']
            top_k = event_config['top_candidates']
            threshold = event_config['alert_threshold']

            event_prompts = self._get_event_prompts(event)
            prompt_types = event_prompts['types']
            index_tensor = torch.as_tensor(
                event_prompts['indices'], dtype=torch.long, device=scores.device
            )
            event_scores = scores[index_tensor]

            k = min(top_k, event_scores.numel())
            top_k_values, top_k_indices = torch.topk(event_scores, k)

            # top_k_indices are positions inside event_scores, which line up with prompt_types.
            top_k_types = [prompt_types[i] for i in top_k_indices.tolist()]
            abnormal_count = top_k_types.count('abnormal')

            event_alarms[event] = {
                'alarm': 1 if abnormal_count >= threshold else 0,  # 0: Normal, 1: Alarm
                'scores': top_k_values.tolist(),
                'top_k_types': top_k_types
            }

        return event_alarms

    def _get_event_prompts(self, event: str) -> Dict:
        """Return the global sentence indices and statuses of one event's prompts.

        The indices address the flattened sentence list (and therefore the
        text vectors / similarity scores), not positions within the event.

        NOTE(review): this assumes get_all_sentences flattens the config in
        PROMPT_CFG order with 'normal' prompts before 'abnormal' ones for each
        event — confirm against utils.parser.

        Returns:
            Dict: ``{'indices': [...], 'types': [...]}`` with one entry per prompt.
        """
        indices = []
        types = []
        offset = 0  # number of prompts seen so far across all events

        for event_config in self.config['PROMPT_CFG']:
            is_target = event_config['event'] == event
            prompts = event_config.get('prompts', {})
            for status in ('normal', 'abnormal'):
                count = len(prompts.get(status, []))
                if is_target:
                    indices.extend(range(offset, offset + count))
                    types.extend([status] * count)
                # Advance past this group even when it belongs to another event.
                offset += count

        return {'indices': indices, 'types': types}

# Usage example:
macs = DevMACSCore()

# EventDetector indexes config['PROMPT_CFG'], so pass the parsed config rather than the file path.
config = load_config('assets/PIA_FSV/prompts/topk.json')
detector = EventDetector(config, macs)
# NOTE(review): an earlier cell loads vectors from 'assets/PIA_FSV/results' (with an "s") — confirm which directory is intended.
results = detector.process_video_vectors('assets/PIA_FSV/result')

# Print the alarm state of every event for every frame of every video.
for category, videos in results.items():
    print(f"\nCategory: {category}")
    for video_name, frames in videos.items():
        print(f"\nVideo: {video_name}")
        for frame_idx, alarms in frames.items():
            print(f"\nFrame {frame_idx}:")
            for event, status in alarms.items():
                print(f"{event}: {status['alarm']}")

  state_dict = torch.load(self.config.model_path, map_location=self.config.device)



Category: F

Video: cat copy 2

Frame 0:
D: 0
S: 0
V: 0
F: 0

Frame 1:
D: 0
S: 0
V: 0
F: 0

Frame 2:
D: 0
S: 0
V: 0
F: 0

Frame 3:
D: 0
S: 0
V: 0
F: 0

Frame 4:
D: 0
S: 0
V: 0
F: 0

Frame 5:
D: 0
S: 0
V: 0
F: 0

Frame 6:
D: 0
S: 0
V: 0
F: 0

Frame 7:
D: 0
S: 0
V: 0
F: 0

Frame 8:
D: 0
S: 0
V: 0
F: 0

Frame 9:
D: 0
S: 0
V: 0
F: 0

Frame 10:
D: 0
S: 0
V: 0
F: 0

Frame 11:
D: 0
S: 0
V: 0
F: 0

Frame 12:
D: 0
S: 0
V: 0
F: 0

Frame 13:
D: 0
S: 0
V: 0
F: 0

Frame 14:
D: 0
S: 0
V: 0
F: 0

Frame 15:
D: 0
S: 0
V: 0
F: 0

Frame 16:
D: 0
S: 0
V: 0
F: 0

Frame 17:
D: 0
S: 0
V: 0
F: 0

Frame 18:
D: 0
S: 0
V: 0
F: 0

Frame 19:
D: 0
S: 0
V: 0
F: 0

Video: cat

Frame 0:
D: 0
S: 0
V: 0
F: 0

Frame 1:
D: 0
S: 0
V: 0
F: 0

Frame 2:
D: 0
S: 0
V: 0
F: 0

Frame 3:
D: 0
S: 0
V: 0
F: 0

Frame 4:
D: 0
S: 0
V: 0
F: 0

Frame 5:
D: 0
S: 0
V: 0
F: 0

Frame 6:
D: 0
S: 0
V: 0
F: 0

Frame 7:
D: 0
S: 0
V: 0
F: 0

Frame 8:
D: 0
S: 0
V: 0
F: 0

Frame 9:
D: 0
S: 0
V: 0
F: 0

Frame 10:
D: 0
S: 0
V: 0
F: 0

Frame 11:
D: 0

In [14]:
import json
import os
import numpy as np
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, confusion_matrix
def load_label_json(json_path):
    """Read a UTF-8 encoded label JSON file and return its parsed content."""
    with open(json_path, 'r', encoding='utf-8') as label_file:
        label_data = json.load(label_file)
    return label_data
def convert_timestamp_to_frames(timestamp, fps=25, total_frames=None):
    """Convert a (start, end) timestamp into frame indices.

    Args:
        timestamp: Sequence whose first two items are the start and end times;
            each is multiplied by ``fps`` and truncated with ``int``.
        fps: Frames per unit of time used by ``timestamp``.
        total_frames: When given, both indices are capped at ``total_frames - 1``.
            When None (the default), no upper cap is applied.

    Returns:
        Tuple ``(start_frame, end_frame)``; neither index is ever negative.
    """
    start_frame = int(timestamp[0] * fps)
    end_frame = int(timestamp[1] * fps)

    # The default total_frames=None used to crash on `None - 1`; only cap when a length is known.
    if total_frames is not None:
        last_frame = total_frames - 1
        start_frame = min(start_frame, last_frame)
        end_frame = min(end_frame, last_frame)

    # A negative index would silently address frames from the end of the array when used in a slice.
    return max(start_frame, 0), max(end_frame, 0)

def create_frame_level_ground_truth(label_data, category):
    """Build a per-frame 0/1 array marking the frames labelled with ``category``.

    The array has ``label_data['video_info']['total_frame']`` entries. Every clip
    whose category matches sets its frame range (end frame included) to 1.
    The frame rate defaults to 25 when ``video_info`` has no ``fps`` entry.
    """
    video_info = label_data['video_info']
    total_frames = video_info['total_frame']
    fps = video_info.get('fps', 25)
    ground_truth = np.zeros(total_frames)

    for clip in label_data['clips']:
        # The clip payload is the first value stored in the clip mapping.
        clip_data = list(clip.values())[0]
        if clip_data['category'] != category:
            continue

        first_frame, last_frame = convert_timestamp_to_frames(
            clip_data['timestamp'], fps=fps, total_frames=total_frames
        )
        ground_truth[first_frame:last_frame + 1] = 1

    return ground_truth

def expand_predictions_to_frames(predictions, total_frames, time_sampling=15):
    """Spread sampled predictions over the real video frames.

    Prediction ``i`` is written to frames ``i * time_sampling`` up to (but not
    including) ``(i + 1) * time_sampling``, cut off at ``total_frames``.
    Frames not covered by any prediction stay 0.
    """
    expanded = np.zeros(total_frames)

    for sample_idx, value in enumerate(predictions):
        window_start = sample_idx * time_sampling
        # The last window must not run past the end of the video.
        window_end = min(window_start + time_sampling, total_frames)
        expanded[window_start:window_end] = value

    return expanded
   
def calculate_f1_for_video(prediction_results, label_path, category):
    """Compute the frame-level F1 score of one video for ``category``.

    ``prediction_results`` maps a sampled frame index to a per-event result
    whose ``[category]['alarm']`` entry is the prediction. Debug information is
    printed before the score is returned.
    """
    label_data = load_label_json(label_path)
    total_frames = label_data['video_info']['total_frame']

    # Frame-level ground truth built from the label file.
    ground_truth = create_frame_level_ground_truth(label_data, category)

    # Collect the sampled alarms in dict order, then expand them to real frames.
    alarms = [per_frame[category]['alarm'] for per_frame in prediction_results.values()]
    predictions = np.array(alarms)
    frame_predictions = expand_predictions_to_frames(predictions, total_frames)

    # 15 is the default time_sampling of expand_predictions_to_frames.
    mapped_frames = [sample_idx * 15 for sample_idx in range(len(predictions))]
    truth_positive = np.where(ground_truth == 1)[0]
    predicted_positive = np.where(frame_predictions == 1)[0]

    debug_lines = [
        f"\nDebug for {os.path.basename(label_path)}:",
        f"Prediction indices: {list(prediction_results.keys())}",
        f"Mapped to frames: {mapped_frames}",
        f"Raw predictions: {predictions}",
        f"Ground truth non-zero frames: {truth_positive}",
        f"Prediction non-zero frames: {predicted_positive}",
    ]
    for line in debug_lines:
        print(line)

    return f1_score(ground_truth, frame_predictions, zero_division=0)

def calculate_metrics_for_video(prediction_results, label_path, category):
    """Compute every frame-level metric of one video for ``category``.

    Returns:
        Dict with the keys 'f1', 'accuracy', 'precision', 'recall' and
        'specificity'. Debug information, frame counts and the confusion
        matrix cells are printed along the way.
    """
    label_data = load_label_json(label_path)
    total_frames = label_data['video_info']['total_frame']

    ground_truth = create_frame_level_ground_truth(label_data, category)
    alarms = [per_frame[category]['alarm'] for per_frame in prediction_results.values()]
    predictions = np.array(alarms)
    frame_predictions = expand_predictions_to_frames(predictions, total_frames)

    # 15 is the default time_sampling of expand_predictions_to_frames.
    mapped_frames = [sample_idx * 15 for sample_idx in range(len(predictions))]
    truth_positive = np.where(ground_truth == 1)[0]
    predicted_positive = np.where(frame_predictions == 1)[0]

    # Debug output followed by the frame counts.
    report_lines = [
        f"\nDebug for {os.path.basename(label_path)}:",
        f"Prediction indices: {list(prediction_results.keys())}",
        f"Mapped to frames: {mapped_frames}",
        f"Raw predictions: {predictions}",
        f"Ground truth non-zero frames: {truth_positive}",
        f"Prediction non-zero frames: {predicted_positive}",
        "\nFrame counts:",
        f"Total frames: {total_frames}",
        f"Actual event frames: {len(truth_positive)}",
        f"Predicted event frames: {len(predicted_positive)}",
    ]
    for line in report_lines:
        print(line)

    # Both labels are passed explicitly so the matrix has cells for 0 and 1
    # even when only one class is present.
    cm = confusion_matrix(ground_truth, frame_predictions, labels=[0, 1])
    if cm.size == 4:
        tn, fp, fn, tp = cm.ravel()
    else:
        tn, fp, fn, tp = 0, 0, 0, 0

    f1 = f1_score(ground_truth, frame_predictions, zero_division=0)
    accuracy = accuracy_score(ground_truth, frame_predictions)
    precision = precision_score(ground_truth, frame_predictions, zero_division=0)
    recall = recall_score(ground_truth, frame_predictions, zero_division=0)
    # Specificity is defined as 0 when there are no actual negatives.
    negatives = tn + fp
    specificity = tn / negatives if negatives > 0 else 0

    metrics = {
        'f1': f1,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'specificity': specificity
    }

    print("\nMetrics:")
    for cell_name, cell_value in (
        ("True Negatives", tn),
        ("False Positives", fp),
        ("False Negatives", fn),
        ("True Positives", tp),
    ):
        print(f"{cell_name}: {cell_value}")

    return metrics

def evaluate_category(results, label_dir, target_category):
    """Compute the metrics of every video of one category.

    Args:
        results: ``{category: {video_name: per-frame results}}``.
        label_dir: Directory holding ``<category>/<video_name>.json`` label files.
        target_category: Category to evaluate.

    Returns:
        Dict mapping each video name that has a label file to its metrics, plus
        an 'average' entry with the mean of every metric. Empty when the
        category has no predictions or none of its videos has a label file.
    """
    videos = results.get(target_category)
    if videos is None:
        # A requested category without predictions must not abort the whole evaluation.
        print(f"No predictions found for category {target_category}; skipping.")
        return {}

    all_metrics = {}
    for video_name, frames in videos.items():
        label_path = os.path.join(label_dir, target_category, f"{video_name}.json")
        # Videos without a label file are skipped.
        if not os.path.exists(label_path):
            continue
        all_metrics[video_name] = calculate_metrics_for_video(frames, label_path, target_category)

    # Average every metric over the evaluated videos.
    if all_metrics:
        # Snapshot taken before 'average' is inserted, so it never averages itself.
        per_video = list(all_metrics.values())
        all_metrics['average'] = {
            metric: np.mean([video_metrics[metric] for video_metrics in per_video])
            for metric in ['f1', 'accuracy', 'precision', 'recall', 'specificity']
        }

    return all_metrics
def evaluate_all_categories(results, label_dir, categories=('F', 'S', 'V')):
    """Evaluate every requested category and print its metrics.

    Args:
        results: ``{category: {video_name: per-frame results}}``.
        label_dir: Directory holding the label files.
        categories: Iterable of category names. The default is an immutable
            tuple so it cannot be altered between calls.

    Returns:
        Dict mapping each category to the result of ``evaluate_category``.
    """
    all_results = {}

    for category in categories:
        print(f"\nEvaluating category: {category}")
        metrics = evaluate_category(results, label_dir, category)

        print(f"\nResults for category {category}:")
        for video_name, video_metrics in metrics.items():
            # The 'average' entry gets its own heading; every other key is a video name.
            if video_name == 'average':
                print("\nAverage metrics:")
            else:
                print(f"\n{video_name}:")
            for metric_name, value in video_metrics.items():
                print(f"{metric_name}: {value:.3f}")

        all_results[category] = metrics

    return all_results


def get_average_metrics_per_category(evaluation_results):
    """Return ``{category: average metrics}`` for every category that has an 'average' entry."""
    return {
        category: metrics['average']
        for category, metrics in evaluation_results.items()
        if 'average' in metrics
    }

label_dir = 'assets/PIA_FSV/label'  # Directory that holds the label json files

# Run the evaluation; `results` must already exist from the detection cell.
categories = ['F', 'S', 'V']
evaluation_results = evaluate_all_categories(results, label_dir, categories)

# Keep only the per-category average entries.
average_metrics = get_average_metrics_per_category(evaluation_results)

# Print each category's averaged metrics with three decimals.
print("\nAverage Metrics per Category:")
for category, metrics in average_metrics.items():
    print(f"\n{category}:")
    for metric_name, value in metrics.items():
        print(f"{metric_name}: {value:.3f}")




Evaluating category: F

Debug for cat copy 2.json:
Prediction indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Mapped to frames: [0, 15, 30, 45, 60, 75, 90, 105, 120, 135, 150, 165, 180, 195, 210, 225, 240, 255, 270, 285]
Raw predictions: [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
Ground truth non-zero frames: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
