In [4]:
import torch
import torch.nn as nn
from PIL import Image
import cv2
import time
import numpy as np
from torchinfo import torchinfo
from thop import profile

  from .autonotebook import tqdm as notebook_tqdm


In [26]:
class MaskClassifier(nn.Module):
    """Baseline (unpruned) CNN that maps a 3-channel image to 2 class logits.

    The feature extractor is three Conv-BatchNorm-ReLU-MaxPool-Dropout stages
    with 32, 64 and 128 output channels; the head is global average pooling
    followed by a single linear layer.
    """

    def __init__(self):
        super().__init__()

        # Channel plan: input channels followed by each stage's output width.
        widths = (3, 32, 64, 128)

        # Feature extraction (deliberately shallow). Layers are appended in a
        # fixed order so the nn.Sequential indices, and with them the
        # state_dict keys, stay exactly as the checkpoints expect.
        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
                nn.Dropout2d(0.2),
            ])
        self.features = nn.Sequential(*stages)

        # Classifier (deliberately simple): pool to 1x1, flatten, project.
        head = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(widths[-1], 2),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Run the feature extractor and the head; return the logits."""
        return self.classifier(self.features(x))

In [5]:
class MaskClassifier30(nn.Module):
    """Mask classifier variant with 22 / 44 / 89 channel stages (30% pruning).

    Same layout as the baseline network: three Conv-BatchNorm-ReLU-MaxPool-
    Dropout stages, then global average pooling and a linear layer with 2
    outputs.
    """

    def __init__(self):
        super().__init__()

        # Channel plan: 3 -> 22 -> 44 (not 45) -> 89 (not 90).
        widths = (3, 22, 44, 89)

        # Layers are appended in a fixed order so the nn.Sequential indices
        # (and the resulting state_dict keys) are unchanged.
        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
                nn.Dropout2d(0.2),
            ])
        self.features = nn.Sequential(*stages)

        # Head consumes the 89 features left after pooling (not 90).
        head = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(widths[-1], 2),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Run the feature extractor and the head; return the logits."""
        return self.classifier(self.features(x))

class MaskClassifier50(nn.Module):
    """Mask classifier variant with every stage at half the baseline width.

    Same layout as the baseline network: three Conv-BatchNorm-ReLU-MaxPool-
    Dropout stages, then global average pooling and a linear layer with 2
    outputs.
    """

    def __init__(self):
        super().__init__()

        # 50%-pruned structure: 32 * 0.5 = 16, 64 * 0.5 = 32, 128 * 0.5 = 64.
        widths = (3, 16, 32, 64)

        # Layers are appended in a fixed order so the nn.Sequential indices
        # (and the resulting state_dict keys) are unchanged.
        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
                nn.Dropout2d(0.2),
            ])
        self.features = nn.Sequential(*stages)

        head = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(widths[-1], 2),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Run the feature extractor and the head; return the logits."""
        return self.classifier(self.features(x))

class MaskClassifier70(nn.Module):
    """Mask classifier variant with 9 / 19 / 38 channel stages (70% pruning).

    Same layout as the baseline network: three Conv-BatchNorm-ReLU-MaxPool-
    Dropout stages, then global average pooling and a linear layer with 2
    outputs.
    """

    def __init__(self):
        super().__init__()

        # Widths chosen to match the saved model's structure exactly:
        # 3 -> 9 -> 19 -> 38.
        widths = (3, 9, 19, 38)

        # Layers are appended in a fixed order so the nn.Sequential indices
        # (and the resulting state_dict keys) are unchanged.
        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
                nn.Dropout2d(0.2),
            ])
        self.features = nn.Sequential(*stages)

        head = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(widths[-1], 2),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Run the feature extractor and the head; return the logits."""
        return self.classifier(self.features(x))

In [6]:
# Image preprocessing helper
def preprocess_image(image):
    """Turn a BGR image array into a normalized float tensor of shape (1, 3, 128, 128).

    The image is resized to 128x128, converted from BGR to RGB, divided by 255,
    normalized per channel with the ImageNet mean/std, moved to channel-first
    layout and given a leading batch dimension.
    """
    imagenet_mean = np.array([0.485, 0.456, 0.406])
    imagenet_std = np.array([0.229, 0.224, 0.225])

    # Resize first, then swap the channel order from BGR to RGB.
    rgb = cv2.cvtColor(cv2.resize(image, (128, 128)), cv2.COLOR_BGR2RGB)

    # Scale by 1/255 (presumably 8-bit input - verify against callers), then
    # apply the per-channel ImageNet normalization.
    scaled = rgb.astype(np.float32) / 255.0
    normalized = (scaled - imagenet_mean) / imagenet_std

    # (H, W, C) -> (C, H, W), then numpy -> tensor with a batch axis in front.
    channels_first = normalized.transpose(2, 0, 1)
    return torch.FloatTensor(channels_first).unsqueeze(0)

In [7]:
# Model loading helper
def load_pruned_model(pruning_ratio):
    """Load the pruned model that matches ``pruning_ratio`` onto the CPU.

    Args:
        pruning_ratio: Pruning level, given either as a fraction (0.3, 0.5,
            0.7) or as the equivalent percentage (30, 50, 70), which is the
            form ``infer_webcam`` / ``infer_csi_camera`` use.

    Returns:
        The matching ``MaskClassifier30`` / ``MaskClassifier50`` /
        ``MaskClassifier70`` instance with its checkpoint loaded, in eval mode.

    Raises:
        ValueError: If ``pruning_ratio`` does not correspond to 30%, 50% or 70%.
    """
    checkpoints = {
        30: "pruned_model_30percent.pth",
        50: "pruned_model_50percent.pth",
        70: "pruned_model_70percent.pth",
    }

    try:
        percent = float(pruning_ratio)
        # Values up to 1 are fractions; anything larger is already a percentage.
        if percent <= 1:
            percent *= 100
        level = round(percent)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric, NaN or infinite input.
        raise ValueError("Unsupported pruning ratio") from None

    # Compare with a small tolerance instead of exact float equality so values
    # such as 0.1 * 3 still select the 30% model.
    if level not in checkpoints or abs(percent - level) > 1e-6:
        raise ValueError("Unsupported pruning ratio")

    # Resolve the architecture only after validation has passed.
    architectures = {
        30: MaskClassifier30,
        50: MaskClassifier50,
        70: MaskClassifier70,
    }

    device = torch.device('cpu')
    model = architectures[level]().to(device)
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    model.load_state_dict(torch.load(checkpoints[level], map_location=device))
    model.eval()
    return model

In [28]:
def infer_webcam(pruning_ratio):
    """Run live mask classification on camera index 0 and print timing statistics.

    Each frame is mirrored, searched for faces with a Haar cascade, and every
    detected face is classified on the CPU by the selected model. The annotated
    stream is shown in a window until 'q' is pressed or a frame cannot be read;
    a performance summary is printed afterwards.

    Args:
        pruning_ratio: 30, 50 or 70 selects the corresponding pruned model; any
            other value (e.g. 0) selects the unpruned ``MaskClassifier``.

    Returns:
        None.
    """
    device = torch.device('cpu')
    if pruning_ratio == 30:
        model = MaskClassifier30().to(device)
        model_path = "pruned_model_30percent.pth"
    elif pruning_ratio == 50:
        model = MaskClassifier50().to(device)
        model_path = "pruned_model_50percent.pth"
    elif pruning_ratio == 70:
        model = MaskClassifier70().to(device)
        model_path = "pruned_model_70percent.pth"
    else:
        # Every other value falls back to the unpruned baseline model.
        model = MaskClassifier().to(device)
        model_path = 'mask_classifier.pth'

    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # Haar cascade used for face detection. If cv2.data is not available in the
    # installed OpenCV build, pass an explicit file path instead, e.g.
    # /usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    dataset_classes = ["With Mask", "Without Mask"]

    # Amount subtracted from every pixel to darken the frame (0~255);
    # 0 leaves the brightness unchanged. Loop-invariant, so defined once.
    brightness_offset = 0

    # Performance counters
    frame_count = 0
    total_time = 0
    total_inference_time = 0
    fps = 0

    cap = cv2.VideoCapture(0)
    print("Press 'q' to quit.")
    prev_time = time.perf_counter()

    try:
        while True:
            frame_start_time = time.perf_counter()
            ret, frame = cap.read()
            if not ret:
                break

            # Mirror horizontally
            frame = cv2.flip(frame, 1)

            # Brightness adjustment
            frame = cv2.convertScaleAbs(frame, alpha=1, beta=-brightness_offset)

            # Face detection
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(60, 60)
            )

            # Instantaneous FPS from the time between consecutive iterations;
            # keep the previous value if the clock did not advance.
            current_time = time.perf_counter()
            elapsed = current_time - prev_time
            if elapsed > 0:
                fps = 1 / elapsed
            prev_time = current_time

            # Classify every detected face
            for (x, y, w, h) in faces:
                # Crop the face with a 30 px margin, clamped to the frame bounds
                face_roi = frame[max(0, y-30):min(frame.shape[0], y+h+30),
                                 max(0, x-30):min(frame.shape[1], x+w+30)]

                if face_roi.size != 0:
                    # Inference timing covers preprocessing plus the forward pass
                    inference_start = time.perf_counter()

                    input_tensor = preprocess_image(face_roi).to(device)
                    with torch.no_grad():
                        output = model(input_tensor)
                        _, pred = torch.max(output, 1)

                    inference_time = time.perf_counter() - inference_start
                    total_inference_time += inference_time

                    # Green for "With Mask", red otherwise (BGR colors)
                    label = dataset_classes[pred.item()]
                    color = (0, 255, 0) if "With" in label else (0, 0, 255)

                    # Face bounding box
                    cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)

                    # Label and inference time
                    cv2.putText(frame, f"{label} ({inference_time*1000:.1f}ms)",
                                (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

            # Per-frame processing time (excludes drawing the overlay below)
            frame_time = time.perf_counter() - frame_start_time
            total_time += frame_time
            frame_count += 1

            # On-screen performance overlay
            cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(frame, f"Frame Time: {frame_time*1000:.1f}ms", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            cv2.imshow("Mask Detection", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close the window, even if the loop raised.
        cap.release()
        cv2.destroyAllWindows()

    # Final statistics. All averages fall back to 0 when no frame was processed
    # (e.g. the camera could not be read) instead of dividing by zero.
    # Note: inference time is averaged per frame, not per classified face.
    avg_fps = frame_count / total_time if total_time > 0 else 0
    avg_frame_time = total_time / frame_count if frame_count > 0 else 0
    avg_inference_time = total_inference_time / frame_count if frame_count > 0 else 0

    print(f"\n=== Performance Statistics-Pruning {pruning_ratio}%===")
    print(f"Total Frames: {frame_count}")
    print(f"Average FPS: {avg_fps:.1f}")
    print(f"Average Frame Time: {avg_frame_time*1000:.1f}ms")
    print(f"Average Inference Time: {avg_inference_time*1000:.1f}ms\n")

In [None]:

def infer_csi_camera(pruning_ratio):
    """Run live mask classification on a Jetson Nano CSI camera and print timing statistics.

    Frames are captured through a GStreamer pipeline (640x480, 30 fps),
    mirrored, darkened, searched for faces with a Haar cascade, and every
    detected face is classified on the CPU by the selected model. The annotated
    stream is shown in a window until 'q' is pressed or a frame cannot be read;
    a performance summary is printed afterwards.

    Args:
        pruning_ratio: 30, 50 or 70 selects the corresponding pruned model; any
            other value (e.g. 0) selects the unpruned ``MaskClassifier``.

    Returns:
        None. Returns early, after printing a message, if the camera cannot be
        opened.
    """
    # Model loading
    device = torch.device('cpu')
    if pruning_ratio == 30:
        model = MaskClassifier30().to(device)
        model_path = "pruned_model_30percent.pth"
    elif pruning_ratio == 50:
        model = MaskClassifier50().to(device)
        model_path = "pruned_model_50percent.pth"
    elif pruning_ratio == 70:
        model = MaskClassifier70().to(device)
        model_path = "pruned_model_70percent.pth"
    else:
        # Every other value falls back to the unpruned baseline model.
        model = MaskClassifier().to(device)
        model_path = 'mask_classifier.pth'

    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # Haar cascade used for face detection, loaded from the usual Linux install
    # location. Alternative when the OpenCV build exposes cv2.data:
    # cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    cascade_path = "/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml"
    face_cascade = cv2.CascadeClassifier(cascade_path)

    # GStreamer pipeline definition
    gst_pipeline = (
        "nvarguscamerasrc ! "
        "video/x-raw(memory:NVMM), width=640, height=480, format=(string)NV12, framerate=30/1 ! "
        "nvvidconv flip-method=0 ! "
        "video/x-raw, width=640, height=480, format=(string)BGRx ! "
        "videoconvert ! "
        "video/x-raw, format=(string)BGR ! appsink"
    )

    cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER)
    if not cap.isOpened():
        print("CSI 카메라를 열 수 없습니다.")
        # Drop the capture handle on the failure path as well.
        cap.release()
        return

    dataset_classes = ["With Mask", "Without Mask"]
    print("Press 'q' to quit.")

    # Amount subtracted from every pixel to darken the frame (0~255).
    # Loop-invariant, so defined once.
    brightness_offset = 50

    # Performance counters
    frame_count = 0
    total_time = 0
    total_inference_time = 0
    prev_time = time.perf_counter()
    fps = 0

    try:
        while True:
            frame_start_time = time.perf_counter()
            ret, frame = cap.read()
            if not ret:
                print("카메라 프레임을 읽을 수 없습니다.")
                break

            # Mirror horizontally
            frame = cv2.flip(frame, 1)

            # Brightness adjustment (darken the frame)
            frame = cv2.convertScaleAbs(frame, alpha=1, beta=-brightness_offset)

            # Face detection
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(60, 60)
            )

            # Instantaneous FPS from the time between consecutive iterations;
            # keep the previous value if the clock did not advance.
            current_time = time.perf_counter()
            elapsed = current_time - prev_time
            if elapsed > 0:
                fps = 1 / elapsed
            prev_time = current_time

            # Classify every detected face
            for (x, y, w, h) in faces:
                # Crop the face with a 30 px margin, clamped to the frame bounds
                face_roi = frame[max(0, y-30):min(frame.shape[0], y+h+30),
                                 max(0, x-30):min(frame.shape[1], x+w+30)]

                if face_roi.size != 0:
                    # Inference timing covers preprocessing plus the forward pass
                    inference_start = time.perf_counter()

                    input_tensor = preprocess_image(face_roi).to(device)
                    with torch.no_grad():
                        output = model(input_tensor)
                        _, pred = torch.max(output, 1)

                    inference_time = time.perf_counter() - inference_start
                    total_inference_time += inference_time

                    # Green for "With Mask", red otherwise (BGR colors)
                    label = dataset_classes[pred.item()]
                    color = (0, 255, 0) if "With" in label else (0, 0, 255)

                    # Face bounding box
                    cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)

                    # Label and inference time
                    cv2.putText(frame, f"{label} ({inference_time*1000:.1f}ms)",
                                (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

            # Per-frame processing time (excludes drawing the overlay below)
            frame_time = time.perf_counter() - frame_start_time
            total_time += frame_time
            frame_count += 1

            # On-screen performance overlay
            cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(frame, f"Frame Time: {frame_time*1000:.1f}ms", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            cv2.imshow("CSI Camera Mask Detection", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close the window, even if the loop raised.
        cap.release()
        cv2.destroyAllWindows()

    # Final statistics. All averages fall back to 0 when no frame was processed
    # instead of dividing by zero.
    # Note: inference time is averaged per frame, not per classified face.
    avg_fps = frame_count / total_time if total_time > 0 else 0
    avg_frame_time = total_time / frame_count if frame_count > 0 else 0
    avg_inference_time = total_inference_time / frame_count if frame_count > 0 else 0

    print(f"\n=== Performance Statistics-Pruning {pruning_ratio}%===")
    print(f"Total Frames: {frame_count}")
    print(f"Average FPS: {avg_fps:.1f}")
    print(f"Average Frame Time: {avg_frame_time*1000:.1f}ms")
    print(f"Average Inference Time: {avg_inference_time*1000:.1f}ms")

In [None]:
# Webcam benchmark: unpruned model (0) first, then the 30/50/70% pruned models.
for _pruning_percent in (0, 30, 50, 70):
    infer_webcam(_pruning_percent)

In [33]:
# CSI-camera benchmark: unpruned model (0) first, then the 30/50/70% pruned models.
for _pruning_percent in (0, 30, 50, 70):
    infer_csi_camera(_pruning_percent)

CSI 카메라를 열 수 없습니다.
CSI 카메라를 열 수 없습니다.
CSI 카메라를 열 수 없습니다.
CSI 카메라를 열 수 없습니다.


In [12]:
def count_parameters(model):
    """Tally the parameter elements of ``model``.

    Returns:
        dict with the element counts under 'total', 'trainable' (parameters
        with ``requires_grad`` set), 'zero' (elements equal to 0) and
        'nonzero' (elements different from 0).
    """
    tally = {'total': 0, 'trainable': 0, 'zero': 0, 'nonzero': 0}

    for tensor in model.parameters():
        size = tensor.numel()
        tally['total'] += size
        if tensor.requires_grad:
            tally['trainable'] += size

        # Element-wise comparison against 0, summed to a Python int.
        tally['zero'] += (tensor == 0).sum().item()
        tally['nonzero'] += (tensor != 0).sum().item()

    return tally
def calculate_flops(model, input_size=(1, 3, 128, 128)):
    """Estimate the FLOPs of ``model`` with thop and scale them by weight sparsity.

    Args:
        model: Network to profile.
        input_size: Shape of the random dummy input fed to the profiler.
            Defaults to a single 3x128x128 image.

    Returns:
        dict with:
            'raw_flops': 2 x the MAC count reported by ``thop.profile``.
            'actual_flops': raw FLOPs multiplied by the fraction of non-zero
                weight elements (a rough estimate only).
            'sparsity': percentage of zero-valued elements among parameters
                whose name contains 'weight'.
    """
    import copy

    input_tensor = torch.randn(*input_size)

    # Profile a throwaway copy so whatever bookkeeping the profiler attaches to
    # the modules cannot end up on the caller's model.
    # NOTE(review): whether thop cleans up after itself depends on the thop
    # version - confirm before removing the copy.
    macs, _ = profile(copy.deepcopy(model), inputs=(input_tensor,))

    flops = 2 * macs  # Convert MACs to FLOPs

    # Fraction of non-zero elements, looking at weight parameters only.
    total_params = 0
    nonzero_params = 0
    for name, param in model.named_parameters():
        if 'weight' in name:
            total_params += param.numel()
            nonzero_params += torch.sum(param != 0).item()

    # A model without weight parameters has nothing to be sparse about; report
    # 0 sparsity instead of dividing by zero.
    if total_params > 0:
        sparsity = 1 - (nonzero_params / total_params)
    else:
        sparsity = 0.0

    # Estimated amount of computation actually performed.
    actual_flops = flops * (1 - sparsity)

    return {
        'raw_flops': flops,
        'actual_flops': actual_flops,
        'sparsity': sparsity * 100
    }

def get_model_size(model):
    """Compute the in-memory size of ``model`` in several units.

    Parameters and buffers are measured separately. The earlier version summed
    ``state_dict()`` (which already contains the persistent buffers) and then
    added ``buffers()`` on top, so buffers were counted twice.

    Returns:
        dict with:
            'bytes', 'kb', 'mb': total size (parameters + buffers).
            'params': number of parameter elements.
            'param_size': bytes used by parameters only.
            'buffer_size': bytes used by buffers only (e.g. BatchNorm running
                mean/var).
    """
    # Parameter bytes: learnable tensors only.
    param_size = sum(p.numel() * p.element_size() for p in model.parameters())

    # Buffer bytes: non-learnable state such as BN running statistics.
    buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())

    total_size = param_size + buffer_size

    # Unit conversions
    size_kb = total_size / 1024
    size_mb = size_kb / 1024

    return {
        'bytes': total_size,
        'kb': size_kb,
        'mb': size_mb,
        'params': sum(p.numel() for p in model.parameters()),
        'param_size': param_size,
        'buffer_size': buffer_size
    }

In [24]:
def print_model_analysis():
    """Print a side-by-side comparison of the original and the pruned models.

    Loads the four checkpoints onto the CPU and prints, per model: parameter
    counts and sparsity, memory footprint, FLOPs, and the mean latency of a
    single-image forward pass.
    """
    device = torch.device('cpu')

    # (display name, architecture, checkpoint file)
    model_specs = [
        ('Original Model', MaskClassifier, "mask_classifier.pth"),
        ('Pruned 30%', MaskClassifier30, "pruned_model_30percent.pth"),
        ('Pruned 50%', MaskClassifier50, "pruned_model_50percent.pth"),
        ('Pruned 70%', MaskClassifier70, "pruned_model_70percent.pth"),
    ]

    # Build each model and load its weights
    models = {}
    for name, model_cls, checkpoint in model_specs:
        model = model_cls().to(device)
        model.load_state_dict(torch.load(checkpoint, map_location=device))
        models[name] = model

    print("\n=== Model Analysis ===")

    # Parameter analysis
    print("\nParameter Analysis:")
    print("-" * 100)
    print(f"{'Model':15} {'Total Params':>15} {'Non-zero':>15} {'Zero':>15} {'Sparsity %':>15}")
    print("-" * 100)

    for name, model in models.items():
        params = count_parameters(model)
        # Guard against a parameter-free model instead of dividing by zero.
        sparsity = params['zero'] / params['total'] * 100 if params['total'] else 0.0
        print(f"{name:15} {params['total']:15,d} {params['nonzero']:15,d} "
              f"{params['zero']:15,d} {sparsity:15.1f}")

    # Memory size analysis
    print("\nMemory Analysis:")
    print("-" * 100)
    print(f"{'Model':15} {'Total Size(MB)':>15} {'Param Size(MB)':>20} {'Buffer Size(MB)':>20}")
    print("-" * 100)

    for name, model in models.items():
        size_info = get_model_size(model)
        print(f"{name:15} {size_info['mb']:15.2f} "
              f"{size_info['param_size']/1024/1024:20.2f} "
              f"{size_info['buffer_size']/1024/1024:20.2f}")

    # FLOPs analysis
    print("\nComputational Cost Analysis:")
    print("-" * 100)
    print(f"{'Model':15} {'FLOPs':>20}")
    print("-" * 100)

    for name, model in models.items():
        flops_info = calculate_flops(model)
        print(f"{name:15} {flops_info['raw_flops']:20,.0f}")

    # Latency analysis (mean over 100 timed forward passes)
    print("\nInference Time Analysis (100 runs):")
    print("-" * 60)
    print(f"{'Model':15} {'Average Time (ms)':>20}")
    print("-" * 60)

    warmup_runs = 10   # untimed passes so one-off start-up cost does not skew the mean
    timed_runs = 100
    input_tensor = torch.randn(1, 3, 128, 128).to(device)

    for name, model in models.items():
        model.eval()
        times = []
        with torch.no_grad():
            for _ in range(warmup_runs):
                model(input_tensor)
            for _ in range(timed_runs):
                # perf_counter is the high-resolution clock meant for short intervals.
                start_time = time.perf_counter()
                model(input_tensor)
                times.append((time.perf_counter() - start_time) * 1000)  # Convert to ms

        avg_time = np.mean(times)
        print(f"{name:15} {avg_time:20.2f}")

In [27]:
# Print the comparison tables (parameters, memory, FLOPs, latency) for all four models.
print_model_analysis()


=== Model Analysis ===

Parameter Analysis:
----------------------------------------------------------------------------------------------------
Model              Total Params        Non-zero            Zero      Sparsity %
----------------------------------------------------------------------------------------------------
Original Model           93,954          93,954               0             0.0
Pruned 30%               45,195          45,195               0             0.0
Pruned 50%               23,938          23,938               0             0.0
Pruned 70%                8,556           8,556               0             0.0

Memory Analysis:
----------------------------------------------------------------------------------------------------
Model            Total Size(MB)       Param Size(MB)      Buffer Size(MB)
----------------------------------------------------------------------------------------------------
Original Model             0.36                 0.36       