In [1]:
import cv2
import torch
import numpy as np
import time
import json
import os
import matplotlib.pyplot as plt
from collections import deque
from agemodel import AgePredictor

In [None]:

def calculate_model_metrics(model, input_shape=(1, 3, 64, 64)):
    """Count the non-zero parameters and estimate the FLOPs of a model.

    Zero-valued weights (for example those left behind by pruning) are
    excluded from both the parameter count and the FLOP estimate.

    Args:
        model: Module exposing iterable ``features`` and ``classifier``
            attributes. Only ``Conv2d`` / ``MaxPool2d`` layers found directly
            in ``features`` and ``Linear`` layers found directly in
            ``classifier`` are considered; every other layer is treated as
            shape-preserving and free.
        input_shape: ``(batch, channels, height, width)`` used to trace the
            spatial size through ``features``.

    Returns:
        dict: ``{'params': int, 'flops': int}``. One multiply-accumulate is
        counted as two FLOPs.
    """

    def _pair(value):
        """Expand a scalar hyper-parameter to an ``(h, w)`` tuple."""
        return (value, value) if isinstance(value, int) else tuple(value)

    def _output_size(size, kernel, stride, padding, dilation, ceil_mode=False):
        """Output length along one axis, following the PyTorch shape formula."""
        span = size + 2 * padding - dilation * (kernel - 1) - 1
        if ceil_mode:
            out = -(-span // stride) + 1  # integer ceil division
            # PyTorch ignores a window that would start inside the right padding.
            if (out - 1) * stride >= size + padding:
                out -= 1
        else:
            out = span // stride + 1
        return max(out, 0)

    def _conv2d_output_hw(layer, h, w):
        """Spatial output size of a Conv2d layer for an ``h`` x ``w`` input."""
        if layer.padding == 'same':
            # 'same' padding keeps the spatial size unchanged.
            return h, w
        padding = (0, 0) if layer.padding == 'valid' else _pair(layer.padding)
        kernel = _pair(layer.kernel_size)
        stride = _pair(layer.stride)
        dilation = _pair(layer.dilation)
        return (
            _output_size(h, kernel[0], stride[0], padding[0], dilation[0]),
            _output_size(w, kernel[1], stride[1], padding[1], dilation[1]),
        )

    def _maxpool2d_output_hw(layer, h, w):
        """Spatial output size of a MaxPool2d layer for an ``h`` x ``w`` input."""
        kernel = _pair(layer.kernel_size)
        stride = _pair(layer.stride if layer.stride is not None else layer.kernel_size)
        padding = _pair(layer.padding)
        dilation = _pair(layer.dilation)
        return (
            _output_size(h, kernel[0], stride[0], padding[0], dilation[0], layer.ceil_mode),
            _output_size(w, kernel[1], stride[1], padding[1], dilation[1], layer.ceil_mode),
        )

    # Count only the parameters that are not exactly zero.
    total_params = sum(torch.count_nonzero(p).item() for p in model.parameters())

    batch_size, _, height, width = input_shape
    total_flops = 0

    for layer in model.features:
        if isinstance(layer, torch.nn.Conv2d):
            height, width = _conv2d_output_hw(layer, height, width)
            # Every non-zero weight contributes one MAC per output position.
            non_zero_weights = torch.count_nonzero(layer.weight).item()
            total_flops += batch_size * height * width * non_zero_weights * 2
        elif isinstance(layer, torch.nn.MaxPool2d):
            # Pooling costs nothing here but it does shrink the feature map.
            height, width = _maxpool2d_output_hw(layer, height, width)

    for layer in model.classifier:
        if isinstance(layer, torch.nn.Linear):
            # Every non-zero weight contributes one MAC per sample.
            non_zero_weights = torch.count_nonzero(layer.weight).item()
            total_flops += batch_size * non_zero_weights * 2

    return {
        'params': total_params,
        'flops': total_flops
    }

def gstreamer_pipeline(
    sensor_id=0,
    capture_width=640,
    capture_height=480,
    display_width=640,
    display_height=480,
    framerate=15,
    flip_method=0,
):
    """Build the GStreamer pipeline description used to open the camera.

    The pipeline starts at ``nvarguscamerasrc``, converts/flips the frames
    with ``nvvidconv`` and ends in an ``appsink`` delivering BGR frames.

    Args:
        sensor_id: Value for the source's ``sensor-id`` property.
        capture_width: Width requested from the camera source.
        capture_height: Height requested from the camera source.
        display_width: Width of the frames after ``nvvidconv``.
        display_height: Height of the frames after ``nvvidconv``.
        framerate: Capture frame rate, emitted as ``<framerate>/1``.
        flip_method: Value for ``nvvidconv``'s ``flip-method`` property.

    Returns:
        str: The pipeline elements joined with ``" ! "``.
    """
    capture_caps = ", ".join([
        "video/x-raw(memory:NVMM)",
        f"width=(int){capture_width}",
        f"height=(int){capture_height}",
        "format=(string)NV12",
        f"framerate=(fraction){framerate}/1",
    ])
    display_caps = ", ".join([
        "video/x-raw",
        f"width=(int){display_width}",
        f"height=(int){display_height}",
        "format=(string)BGRx",
    ])
    stages = [
        f"nvarguscamerasrc sensor-id={sensor_id}",
        capture_caps,
        f"nvvidconv flip-method={flip_method}",
        display_caps,
        "videoconvert",
        "video/x-raw, format=(string)BGR",
        "appsink",
    ]
    return " ! ".join(stages)


def get_age_group(age):
    """Map an age to its coarse group label.

    The value is first truncated with ``int()``; anything up to and
    including 70 is reported as "young group", everything above as
    "old group".
    """
    return "old group" if int(age) > 70 else "young group"

def benchmark_and_demo(model_path, duration=30):
    """Run the live camera demo for one checkpoint while benchmarking it.

    Frames are read from the GStreamer camera pipeline for up to ``duration``
    seconds (or until 'q' is pressed). Every detected face is resized to
    64x64, normalised and passed through the model; per-frame processing
    time and per-face inference time are collected.

    Args:
        model_path: Path of the checkpoint to load. The student architecture
            is selected when the path contains "student" (case-insensitive).
        duration: Maximum run time of the demo in seconds.

    Returns:
        dict | None: ``{'model_size': {'parameters', 'size_mb'}, 'flops',
        'speed': {'average_process_time_ms', 'average_inference_time_ms'}}``,
        or ``None`` when the camera could not be opened or no face was ever
        processed.

    Raises:
        RuntimeError: If the Haar cascade file could not be loaded.
    """
    print(f"\nRunning demo and benchmark for {os.path.basename(model_path)}...")

    # Load the model.
    # NOTE(security): torch.load unpickles the file, which can execute
    # arbitrary code. Only load checkpoints that come from a trusted source.
    checkpoint = torch.load(model_path, map_location='cpu')
    is_student = 'student' in model_path.lower()
    model = AgePredictor(is_student=is_student)

    # A checkpoint is either a wrapper dict holding 'model_state_dict'
    # (optionally with 'scale_params') or a bare state dict.
    default_scale = {'min_age': 0, 'max_age': 100}
    if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
        state_dict = checkpoint['model_state_dict']
        scale_params = checkpoint.get('scale_params', default_scale)
    else:
        state_dict = checkpoint
        scale_params = default_scale
    model.load_state_dict(state_dict)
    model.eval()

    # The model output is mapped linearly onto [min_age, max_age].
    min_age = scale_params['min_age']
    age_span = scale_params['max_age'] - min_age

    # Static model metrics (non-zero parameters and FLOPs).
    metrics = calculate_model_metrics(model)

    # Per-channel normalisation constants, hoisted out of the frame loop.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # Camera and face detector.
    face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
    cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER)

    processing_times = []
    inference_times = []

    try:
        if not cap.isOpened():
            # Without this check every read fails and the run silently ends.
            print("Error: could not open the camera through the GStreamer pipeline.")
            return None
        if face_cascade.empty():
            # An empty classifier would otherwise fail inside detectMultiScale.
            raise RuntimeError(
                "Could not load 'haarcascade_frontalface_default.xml'; "
                "make sure the file is in the working directory."
            )

        print("\nRunning real-time demo...")
        print(f"Collecting data for {duration} seconds...")
        print("Press 'q' to quit early")

        # perf_counter is monotonic and high resolution, unlike the wall clock.
        start_time = time.perf_counter()

        while time.perf_counter() - start_time < duration:
            ret, frame = cap.read()
            if not ret:
                break

            frame_start_time = time.perf_counter()

            # Face detection.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=5)

            # Overlay the elapsed time.
            elapsed_time = time.perf_counter() - start_time
            cv2.putText(frame, f"Time: {elapsed_time:.1f}/{duration}s", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            for (x, y, w, h) in faces:
                # Crop, resize to 64x64, convert BGR -> RGB, scale to [0, 1]
                # and normalise per channel; then HWC -> CHW plus batch axis.
                face_roi = cv2.resize(frame[y:y+h, x:x+w], (64, 64))
                face_rgb = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
                face_array = face_rgb.astype(np.float32) / 255.0
                face_normalized = (face_array - mean) / std
                face_tensor = torch.from_numpy(
                    face_normalized.transpose(2, 0, 1)
                ).float().unsqueeze(0)

                # Age prediction (only the forward pass is timed).
                inference_start = time.perf_counter()
                with torch.no_grad():
                    prediction = model(face_tensor)
                inference_times.append(time.perf_counter() - inference_start)

                predicted_age = prediction.item() * age_span + min_age
                age_group = get_age_group(predicted_age)

                # Draw the result.
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
                cv2.putText(frame, f"Group: {age_group}", (x, y-30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # Processing time covers detection plus all inferences of the
            # frame; display and key handling are excluded.
            processing_times.append(time.perf_counter() - frame_start_time)

            cv2.imshow('Age Prediction', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    finally:
        cap.release()
        cv2.destroyAllWindows()

    if not (processing_times and inference_times):
        print("No face was processed during the run; no results to report.")
        return None

    results = {
        'model_size': {
            'parameters': metrics['params'],
            # Assumes 4 bytes (float32) per counted parameter.
            'size_mb': metrics['params'] * 4 / (1024 * 1024)
        },
        'flops': metrics['flops'],
        'speed': {
            'average_process_time_ms': float(np.mean(processing_times)) * 1000,
            'average_inference_time_ms': float(np.mean(inference_times)) * 1000
        }
    }

    print(f"\n{os.path.basename(model_path)} Results:")
    print(f"Parameters: {results['model_size']['parameters']:,}")
    print(f"Model Size: {results['model_size']['size_mb']:.2f} MB")
    print(f"FLOPs: {results['flops']:,}")
    print(f"Average Process Time: {results['speed']['average_process_time_ms']:.2f} ms")
    print(f"Average Inference Time: {results['speed']['average_inference_time_ms']:.2f} ms")

    return results

def plot_comparison(results):
    """Plot parameter count, FLOPs and timings of the benchmarked models.

    Three bar charts are drawn side by side and written to
    'model_comparison_ts.png'.

    Args:
        results: Mapping of model name to a result dict with the keys
            ``model_size.parameters``, ``flops``,
            ``speed.average_process_time_ms`` and
            ``speed.average_inference_time_ms``. Entries whose value is
            ``None`` (runs that produced no measurements) are skipped.
    """
    # Failed runs are stored as None; indexing them would raise TypeError.
    valid = {name: res for name, res in results.items() if res}
    if not valid:
        print("No benchmark results to plot.")
        return

    names = list(valid)
    positions = np.arange(len(names))
    width = 0.35

    fig, (ax_params, ax_flops, ax_speed) = plt.subplots(1, 3, figsize=(15, 5))

    # 1. Model size
    ax_params.bar(positions, [valid[n]['model_size']['parameters'] for n in names])
    ax_params.set_title('Model Size (Parameters)')
    ax_params.set_ylabel('Number of Parameters')

    # 2. FLOPs
    ax_flops.bar(positions, [valid[n]['flops'] for n in names])
    ax_flops.set_title('Computational Cost (FLOPs)')
    ax_flops.set_ylabel('FLOPs')

    # 3. Speed: per-frame processing time next to per-face inference time
    process_times = [valid[n]['speed']['average_process_time_ms'] for n in names]
    inference_times = [valid[n]['speed']['average_inference_time_ms'] for n in names]
    ax_speed.bar(positions - width/2, process_times, width, label='Process Time')
    ax_speed.bar(positions + width/2, inference_times, width, label='Inference Time')
    ax_speed.set_title('Processing Speed')
    ax_speed.set_ylabel('Time (ms)')
    ax_speed.legend()

    for ax in (ax_params, ax_flops, ax_speed):
        ax.set_xticks(positions)
        ax.set_xticklabels(names, rotation=45)
        ax.grid(True)

    fig.tight_layout()
    fig.savefig('model_comparison_ts.png')
    # Close explicitly so repeated calls do not accumulate open figures.
    plt.close(fig)

def run_benchmarks():
    """Benchmark every configured checkpoint and persist the results.

    Each model is run through ``benchmark_and_demo``. A successful run is
    written to ``results_<name>.json``; all runs together are written to
    ``benchmark_results.json``.

    Returns:
        dict: Model name -> result dict, or ``None`` for a model whose
        checkpoint file is missing or whose run produced no measurements.
    """
    model_paths = {
        'Teacher': 'teacher_model_best (1).pth',
        'Student': 'student_model_best (1).pth',
        'Pruned (0.3)': 'age_model_pruned_30.pth',
        'Pruned (0.5)': 'age_model_pruned_50.pth',
        'Pruned (0.7)': 'age_model_pruned_70.pth'
    }

    results = {}
    print("\nStarting benchmark for each model...")

    for name, path in model_paths.items():
        print(f"\nTesting {name} model...")

        # A missing checkpoint must not abort the whole run and discard the
        # measurements already collected for the other models.
        if not os.path.isfile(path):
            print(f"Skipping {name}: checkpoint '{path}' not found.")
            results[name] = None
            continue

        # benchmark_and_demo prints the per-model summary itself, so it is
        # not repeated here.
        results[name] = benchmark_and_demo(path)

        if results[name]:
            # Save the individual result.
            with open(f'results_{name.lower()}.json', 'w') as f:
                json.dump(results[name], f, indent=4)

    if results:
        # Save the combined results.
        with open('benchmark_results.json', 'w') as f:
            json.dump(results, f, indent=4)

    return results

In [3]:
if __name__ == "__main__":
    # Entry point of the notebook: run the benchmark for every configured
    # model. `results` is bound at module scope and holds the mapping
    # returned by run_benchmarks().
    print("Starting real-time demo and benchmark comparison...")
    results = run_benchmarks()
    print("\nBenchmark completed! Results saved to files.")

Starting real-time demo and benchmark comparison...

Testing Teacher model...

Running demo and benchmark for teacher_model_best (1).pth...

Running real-time demo...
Press SPACE to capture benchmark sample
Press 'q' to quit

Benchmark sample 1/1:
  Process Time: 330.2ms
  Inference Time: 47.6ms
  Age Group: old group

Benchmark samples collected!

Teacher Model Results:
Parameters: 2,190,913
Model Size: 8.36 MB
FLOPs: 86,770,176
Average Process Time: 330.18 ms
Average Inference Time: 47.59 ms

Testing Student model...

Running demo and benchmark for student_model_best (1).pth...

Running real-time demo...
Press SPACE to capture benchmark sample
Press 'q' to quit

Benchmark sample 1/1:
  Process Time: 287.1ms
  Inference Time: 29.1ms
  Age Group: old group

Benchmark samples collected!

Student Model Results:
Parameters: 1,072,673
Model Size: 4.09 MB
FLOPs: 24,510,976
Average Process Time: 287.07 ms
Average Inference Time: 29.12 ms

Final Comparison Summary
Model      Size(MB)   Parame

In [None]:
feat: 연속 프레임 분석을 통한 안정성 평가 개선

- 단일 프레임 벤치마크를 30초 연속 평가 방식으로 변경
- 새로운 안정성 지표 추가 (탐지율, 예측 안정성)
- 포괄적인 성능 그래프로 시각화 기능 강화
- 실시간 FPS 및 경과 시간 모니터링 추가
- 에러 처리 및 리소스 정리 개선

이 변경은 단일 캡처 대신 연속된 프레임을 분석하여, 젯슨 나노 배포에 필요한 더 신뢰할 수 있는 성능 지표를 제공합니다.