In [1]:
#저장:현재 작업 디렉토리
import os
SAVE_DIR = '.'
current_directory = os.getcwd()
print(current_directory)

/content


## 1. 모델 정의 (VGG16_BN)

In [2]:
# 필요한 라이브러리
import torch
import torch.nn as nn
import math
from collections import OrderedDict

In [3]:

#------------------------------신경망 구조 설정값(vgg_16_bn)모델 전용-------------------------------------
defaultcfg = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 512]
relucfg = [2, 6, 9, 13, 16, 19, 23, 26, 29, 33, 36, 39]
convcfg = [0, 3, 7, 10, 14, 17, 20, 24, 27, 30, 34, 37]
#---------------------------------------------------------------------------------------------------------


class VGG(nn.Module):
    def __init__(self, num_classes=100, init_weights=True, cfg=None, compress_rate=None):
        super(VGG, self).__init__()
        self.features = nn.Sequential()

        if cfg is None:
            cfg = defaultcfg

        #(추가) compress_rate가 None이면 기본값 설정. 모델 훈련시에는 None입력
        if compress_rate is None:
          num_conv = len([x for x in cfg[:-1] if x != 'M'])
          compress_rate = [0.0] * num_conv

        self.relucfg = relucfg
        self.covcfg = convcfg
        self.compress_rate = compress_rate
        self.features = self.make_layers(cfg[:-1], True, compress_rate)
        #(추가) CIFAR100데이터셋에서 Regularization을 위해 기존 classifier구조에
        # Dropout layer를 추가함.
        self.classifier = nn.Sequential(OrderedDict([
            ('linear1', nn.Linear(cfg[-2], cfg[-1])),
            ('norm1', nn.BatchNorm1d(cfg[-1])),
            ('relu1', nn.ReLU(inplace=True)),
            ('dropout1', nn.Dropout(0.5)), #(추가)
            ('linear2', nn.Linear(cfg[-1], num_classes)),
        ]))

        if init_weights:
            self._initialize_weights()

    def make_layers(self, cfg, batch_norm=True, compress_rate=None):
        layers = nn.Sequential()
        in_channels = 3
        cnt = 0
        for i, v in enumerate(cfg):
            if v == 'M':
                layers.add_module('pool%d' % i, nn.MaxPool2d(kernel_size=2, stride=2))
            else:
                conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
                conv2d.cp_rate = compress_rate[cnt]
                cnt += 1

                layers.add_module('conv%d' % i, conv2d)
                layers.add_module('norm%d' % i, nn.BatchNorm2d(v))
                layers.add_module('relu%d' % i, nn.ReLU(inplace=True))
                in_channels = v

        return layers

    def forward(self,x):
      x = self.features(x)

      x = nn.AvgPool2d(2)(x)
      x = x.view(x.size(0),-1)
      x = self.classifier(x)
      return x

    def _initialize_weights(self):
      for m in self.modules():
        if isinstance(m,nn.Conv2d):
          n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels #(n = H x W x out_channels).He초기화에 사용
          m.weight.data.normal_(0,math.sqrt(2. /n)) ## He 초기화, (0,0sqrt(2/n))정규분포에서 샘플링
          if m.bias is not None:
            m.bias.data.zero_()
        elif isinstance(m,nn.BatchNorm2d):
          m.weight.data.fill_(0.5)
          m.bias.data.zero_()
        elif isinstance(m,nn.Linear):
          m.weight.data.normal_(0,0.01)
          m.bias.data.zero_()

def vgg_16_bn(compress_rate = None):
  return VGG(compress_rate = compress_rate)


##2. 모델 훈련

In [4]:
# 필요한 라이브러리
import os
import torchvision
from torchvision import datasets
from torchvision import transforms
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR

In [5]:
# 하이퍼파라미터
LEARNING_RATE = 0.01
BATCH_SIZE = 128
WEIGHT_DECAY = 0.0005
MOMENTUM = 0.9
NUM_EPOCHS = 250

In [6]:
# 모델 훈련 실행 스크립트
#-----------------------------------------------------------------------
print("=============== 1. 설정 시작 ===============")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

#-----------------------------------------------------------------------

print("=============== 2. 데이터 로딩 ===============")
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.1, hue=0.1),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761)),
])

trainset = torchvision.datasets.CIFAR100(root='./data', train=True, download=True, transform=transform_train)
train_loader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR100(root='./data', train=False, download=True, transform=transform_test)
test_loader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)
print("데이터 준비 완료!")

#-----------------------------------------------------------------------

print("=============== 3. 모델,loss,옵티마이저,스케쥴러 정의 ===============")
model = VGG(num_classes=100, init_weights=True, compress_rate=None).to(device)

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM,
                          weight_decay=WEIGHT_DECAY, nesterov=True)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                               milestones=[150, 225],
                                               gamma=0.1)
print("모델 정의 완료!")

#-----------------------------------------------------------------------

print("=============== 4. 훈련 시작 ===============")

# Early stopping
best_accuracy = 0.0
patience = 20
patience_counter = 0

for epoch in range(NUM_EPOCHS):
    #훈련
    model.train()
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    scheduler.step()

    #평가
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    # Early stopping 체크
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        patience_counter = 0
        best_model_path = os.path.join(SAVE_DIR, 'best_vgg16_bn_cifar100.pt')
        torch.save(model.state_dict(), best_model_path)
        print(f"새로운 최고 성능 모델 저장: {best_model_path}")
    else:
        patience_counter += 1

    # 로그 출력
    print(f'Epoch [{epoch+1}/{NUM_EPOCHS}] | '
          f'Loss: {running_loss / len(train_loader):.4f} | '
          f'Test Accuracy: {accuracy:.2f} % | '
          f'Best Accuracy: {best_accuracy:.2f} % | '
          f'Patience: {patience_counter}/{patience} | '
          f'Current LR: {optimizer.param_groups[0]["lr"]}')

    # Early stopping 조건
    if patience_counter >= patience:
        print(f'\n=============== Early Stopping ===============')
        print(f'훈련이 {epoch+1} 에폭에서 조기 중단되었습니다.')
        print(f'최고 정확도: {best_accuracy:.2f}%')
        break
print('=============== 훈련 종료 ===============')

Using device: cuda


100%|██████████| 169M/169M [00:12<00:00, 13.2MB/s]


데이터 준비 완료!
모델 정의 완료!
새로운 최고 성능 모델 저장: ./best_vgg16_bn_cifar100.pt
Epoch [1/250] | Loss: 4.0471 | Test Accuracy: 16.37 % | Best Accuracy: 16.37 % | Patience: 0/20 | Current LR: 0.01
새로운 최고 성능 모델 저장: ./best_vgg16_bn_cifar100.pt
Epoch [2/250] | Loss: 3.6403 | Test Accuracy: 24.12 % | Best Accuracy: 24.12 % | Patience: 0/20 | Current LR: 0.01
새로운 최고 성능 모델 저장: ./best_vgg16_bn_cifar100.pt
Epoch [3/250] | Loss: 3.3793 | Test Accuracy: 30.89 % | Best Accuracy: 30.89 % | Patience: 0/20 | Current LR: 0.01
새로운 최고 성능 모델 저장: ./best_vgg16_bn_cifar100.pt
Epoch [4/250] | Loss: 3.1710 | Test Accuracy: 35.36 % | Best Accuracy: 35.36 % | Patience: 0/20 | Current LR: 0.01
새로운 최고 성능 모델 저장: ./best_vgg16_bn_cifar100.pt
Epoch [5/250] | Loss: 2.9915 | Test Accuracy: 39.13 % | Best Accuracy: 39.13 % | Patience: 0/20 | Current LR: 0.01
Epoch [6/250] | Loss: 2.8516 | Test Accuracy: 36.97 % | Best Accuracy: 39.13 % | Patience: 1/20 | Current LR: 0.01
새로운 최고 성능 모델 저장: ./best_vgg16_bn_cifar100.pt
Epoch [7/250] | Los

## 3.랭크 계산 함수

In [None]:
# 필요한 라이브러리
from collections import defaultdict
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt

In [None]:
class HRankFeatureMapCalculator:
    def __init__(self, model, device):
        self.model = model
        self.device = device
        self.feature_result = torch.tensor(0.)#각 레이어의 채널별 랭크 합계 누적할 텐서
        self.total = torch.tensor(0.)#랭크 계산에 사용된 총 데이터 샘플 수를 누적할 텐서
        self.current_ranks = {} # 현재 계산된 랭크를 저장하기 위한 딕셔너리

    def get_feature_hook(self, layer_name):
        def hook_fn(module, input, output):
            #output 텐서의 모양은 [batch,ch,H,W]
            a = output.shape[0]  # batch_size
            b = output.shape[1]  # num_channels
            # 각 샘플, 각 채널에 대해 matrix rank 계산
            c = torch.tensor([
                torch.linalg.matrix_rank(output[i, j, :, :]).item()
                for i in range(a)
                for j in range(b)
            ])

            # reshape하여 [batch_size, num_channels] 형태로 만들고 합계 계산
            c = c.view(a, -1).float()
            c = c.sum(0)  # 배치 차원에서 합계 (각 채널별 총 rank)

            # 누적 평균 계산
            self.feature_result = self.feature_result * self.total + c
            self.total = self.total + a
            self.feature_result = self.feature_result / self.total

        return hook_fn

    #all_ranks 딕셔너리를 만들기 위한 함수.
    #키값은 'conv%d'이고 value는  array([rank_ch0, rank_ch1, rank_ch2, ..., rank_ch63])로 된 딕셔너리를 반환함.
    def calculate_ranks_hrank_style(self, dataloader, limit=5):
        self.model.eval()
        all_ranks = {}

        # 각 ReLU 레이어에 대해 순차적으로 랭크 계산
        for layer_idx, relu_idx in enumerate(relucfg):
            layer_name = f"conv{layer_idx + 1}"
            relu_module = self.model.features[relu_idx]  # features의 relu_idx 위치

            print(f"\n{layer_name} 랭크 계산 중...")

            # 초기화
            self.feature_result = torch.tensor(0.)
            self.total = torch.tensor(0.)

            # Hook 등록
            hook = relu_module.register_forward_hook(self.get_feature_hook(layer_name))

            try:
                with torch.no_grad():
                    for batch_idx, (inputs, targets) in enumerate(dataloader):
                        if batch_idx >= limit:
                            break

                        inputs = inputs.to(self.device)
                        _ = self.model(inputs)

                # 결과 저장
                all_ranks[layer_name] = self.feature_result.clone().numpy()
                print(f"  완료 - 채널 수: {len(all_ranks[layer_name])}")
                print(f"  평균 랭크: {all_ranks[layer_name].mean():.2f}, 범위: {all_ranks[layer_name].min():.0f}~{all_ranks[layer_name].max():.0f}")

            finally:
                hook.remove()

        return all_ranks

    def get_layer_info(self, dataloader):
        """각 레이어의 정보 수집 (채널 수, feature map 크기 등)"""
        layer_info = {}
        hooks = []
        layer_outputs = {}

        def make_info_hook(layer_name):
            def hook_fn(module, input, output):
                layer_outputs[layer_name] = {
                    'shape': output.shape,
                    'num_channels': output.shape[1],
                    'height': output.shape[2],
                    'width': output.shape[3],
                    'max_possible_rank': min(output.shape[2], output.shape[3])
                }
            return hook_fn

        # 모든 ReLU 레이어에 hook 등록
        for layer_idx, relu_idx in enumerate(relucfg):
            layer_name = f"conv{layer_idx + 1}"
            relu_module = self.model.features[relu_idx]  # features의 relu_idx 위치
            hook = relu_module.register_forward_hook(make_info_hook(layer_name))
            hooks.append(hook)

        # 한 번 forward pass
        self.model.eval()
        with torch.no_grad():
            inputs, _ = next(iter(dataloader))
            inputs = inputs.to(self.device)
            _ = self.model(inputs)

        # Hook 제거
        for hook in hooks:
            hook.remove()

        return layer_outputs

  #---------------------여기부터는 보조 함수------------------------------------
    def visualize_hrank_results(self, all_ranks, layer_info, save_path=None):
        """HRank 결과 시각화"""
        num_layers = len(all_ranks)

        # 서브플롯 배치 계산
        if num_layers <= 4:
            rows, cols = 2, 2
        elif num_layers <= 6:
            rows, cols = 2, 3
        elif num_layers <= 9:
            rows, cols = 3, 3
        else:
            rows, cols = 4, (num_layers + 3) // 4

        fig, axes = plt.subplots(rows, cols, figsize=(15, 10))
        axes = axes.flatten() if num_layers > 1 else [axes]

        for idx, (layer_name, ranks) in enumerate(all_ranks.items()):
            if idx >= len(axes):
                break

            ax = axes[idx]

            # 히스토그램
            ax.hist(ranks, bins=min(20, len(np.unique(ranks))), alpha=0.7,
                   edgecolor='black', color='skyblue')

            # 제목과 레이블
            info = layer_info.get(layer_name, {})
            num_channels = info.get('num_channels', len(ranks))
            fm_size = f"{info.get('height', '?')}x{info.get('width', '?')}"

            ax.set_title(f'{layer_name}\n({num_channels} channels, {fm_size})')
            ax.set_xlabel('Rank')
            ax.set_ylabel('Number of Channels')
            ax.grid(True, alpha=0.3)

            # 통계 정보
            ax.axvline(ranks.mean(), color='red', linestyle='--',
                      label=f'Mean: {ranks.mean():.1f}')
            ax.legend()

            # 텍스트 박스에 상세 정보
            textstr = f'Min: {ranks.min():.0f}\nMax: {ranks.max():.0f}\nStd: {ranks.std():.1f}'
            props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
            ax.text(0.95, 0.95, textstr, transform=ax.transAxes, fontsize=8,
                   verticalalignment='top', bbox=props, ha='right')

        # 빈 subplot 숨기기
        for idx in range(len(all_ranks), len(axes)):
            axes[idx].set_visible(False)

        plt.tight_layout()

        plt.show()

    def print_hrank_summary(self, all_ranks, layer_info):
        """HRank 결과 요약 출력"""
        print("\n" + "="*70)
        print("HRank Feature Map 랭크 계산 결과 요약")
        print("="*70)

        total_channels = 0
        total_low_rank = 0

        for layer_name, ranks in all_ranks.items():
            info = layer_info.get(layer_name, {})

            print(f"\n{layer_name}:")
            print(f"  채널 수: {len(ranks)}")
            if 'height' in info and 'width' in info:
                print(f"  Feature Map 크기: {info['height']}x{info['width']}")
                print(f"  최대 가능 랭크: {info['max_possible_rank']}")

            print(f"  평균 랭크: {ranks.mean():.2f}")
            print(f"  랭크 범위: {ranks.min():.0f} ~ {ranks.max():.0f}")
            print(f"  랭크 표준편차: {ranks.std():.2f}")

            # 낮은 랭크 비율 계산
            if 'max_possible_rank' in info:
                low_rank_threshold = info['max_possible_rank'] * 0.5
                low_rank_count = (ranks < low_rank_threshold).sum()
                low_rank_ratio = (low_rank_count / len(ranks)) * 100
                print(f"  낮은 랭크 채널 (< {low_rank_threshold:.1f}): {low_rank_count}/{len(ranks)} ({low_rank_ratio:.1f}%)")

                total_channels += len(ranks)
                total_low_rank += low_rank_count

        if total_channels > 0:
            overall_low_rank_ratio = (total_low_rank / total_channels) * 100
            print(f"\n전체 요약:")
            print(f"  총 채널 수: {total_channels}")
            print(f"  낮은 랭크 채널: {total_low_rank} ({overall_low_rank_ratio:.1f}%)")

def test_hrank_implementation(model, test_loader, device, limit=5):
    """HRank 구현 테스트"""
    print("="*60)
    print(" 랭크 계산 테스트")
    print("="*60)

    # 계산기 초기화
    calculator = HRankFeatureMapCalculator(model, device)

    # 레이어 정보 수집
    print("\n1. 레이어 정보 수집 중...")
    layer_info = calculator.get_layer_info(test_loader)

    # 랭크 계산
    print("\n2. 랭크 계산 중...")
    all_ranks = calculator.calculate_ranks_hrank_style(test_loader, limit=limit)

    # 결과 요약
    calculator.print_hrank_summary(all_ranks, layer_info)

    # 시각화
    print("\n3. 결과 시각화...")
    calculator.visualize_hrank_results(all_ranks, layer_info, save_path='hrank_results.png')

    return all_ranks, layer_info


### 실행

In [None]:
# 랭크 계산 실행 스크립트

# 디바이스 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 디바이스: {device}")

# 모델 로드
model = VGG(num_classes=100, init_weights=True, compress_rate=None).to(device)

# 체크포인트 로드
checkpoint_path = './best_vgg16_bn_cifar100.pt'
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
print("기존 모델 가중치 로드 완료")

# CIFAR-100 데이터로더
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))])

testset = torchvision.datasets.CIFAR100(
    root='./data', train=False, download=True, transform=transform_test)

test_loader = torch.utils.data.DataLoader(
    testset, batch_size=100, shuffle=False, num_workers=2)

print("CIFAR-100 테스트 데이터 로드 완료")

# 모델이 제대로 로드되었는지 간단한 테스트
model.eval()
with torch.no_grad():
    sample_batch = next(iter(test_loader))
    sample_output = model(sample_batch[0][:1].to(device))
    print(f"모델 출력 shape: {sample_output.shape}")
    print(f"예측 클래스: {torch.argmax(sample_output, dim=1).item()}")

# HRank 테스트 실행
all_ranks, layer_info = test_hrank_implementation(model, test_loader, device, limit=5)

##4. 필터선택, 프루닝 계획 함수

In [None]:
# 필요한 라이브러리
from typing import Dict, List, Tuple

In [None]:
class HRankPruningPlanner:
    def __init__(self):
        pass

    def determine_pruning_plan(self, all_ranks: Dict, pruning_ratios: List[float]):
        """
        HRank 기반 프루닝 계획 수립

        Args:
            all_ranks: 각 레이어별 랭크 정보 {'conv1': array, 'conv2': array, ...}
            pruning_ratios: 각 레이어별 프루닝 비율 [0.3, 0.5, 0.2, ...]

        Returns:
            pruning_plan: 각 레이어별 프루닝 계획
            new_channels: 프루닝 후 각 레이어의 채널 수
        """
        pruning_plan = {}
        new_channels = {}

        layer_names = list(all_ranks.keys())

        # 프루닝 비율 검증 및 조정
        if len(pruning_ratios) == 1:
            # 모든 레이어에 동일한 비율 적용
            pruning_ratios = pruning_ratios * len(layer_names)
            print(f"모든 레이어에 {pruning_ratios[0]*100:.1f}% 프루닝 비율 적용")
        elif len(pruning_ratios) != len(layer_names):
            raise ValueError(f"프루닝 비율 개수({len(pruning_ratios)})와 레이어 수({len(layer_names)})가 맞지 않습니다.")

        print("\n" + "="*70)
        print("HRank 기반 프루닝 계획 수립")
        print("="*70)

        total_original_channels = 0
        total_pruned_channels = 0

        for i, (layer_name, ranks) in enumerate(all_ranks.items()):
            ratio = pruning_ratios[i]
            original_channels = len(ranks)
            total_original_channels += original_channels

            if ratio > 0 and ratio < 1.0:
                # 낮은 랭크 순으로 정렬하여 제거할 인덱스 선택
                num_to_prune = int(original_channels * ratio)
                # 최소 1개 채널은 유지
                num_to_prune = min(num_to_prune, original_channels - 1)

                # 랭크 기준으로 정렬 (낮은 랭크부터)
                sorted_indices = np.argsort(ranks)
                prune_indices = sorted_indices[:num_to_prune]
                keep_indices = sorted_indices[num_to_prune:]

                # 프루닝 계획 저장
                pruning_plan[layer_name] = {
                    'prune_indices': prune_indices.tolist(),
                    'keep_indices': keep_indices.tolist(),
                    'original_channels': original_channels,
                    'pruned_channels': len(keep_indices),
                    'pruning_ratio_actual': len(prune_indices) / original_channels,
                    'pruning_ratio_target': ratio,
                    'pruned_ranks': ranks[prune_indices],  # 제거될 채널들의 랭크
                    'kept_ranks': ranks[keep_indices]      # 유지될 채널들의 랭크
                }

                new_channels[layer_name] = len(keep_indices)
                total_pruned_channels += len(keep_indices)

            elif ratio >= 1.0:
                print(f"경고: {layer_name}의 프루닝 비율이 1.0 이상입니다. 프루닝하지 않습니다.")
                keep_indices = list(range(original_channels))
                pruning_plan[layer_name] = {
                    'prune_indices': [],
                    'keep_indices': keep_indices,
                    'original_channels': original_channels,
                    'pruned_channels': original_channels,
                    'pruning_ratio_actual': 0.0,
                    'pruning_ratio_target': ratio,
                    'pruned_ranks': np.array([]),
                    'kept_ranks': ranks
                }
                new_channels[layer_name] = original_channels
                total_pruned_channels += original_channels

            else:
                # 프루닝하지 않음 (ratio == 0)
                keep_indices = list(range(original_channels))
                pruning_plan[layer_name] = {
                    'prune_indices': [],
                    'keep_indices': keep_indices,
                    'original_channels': original_channels,
                    'pruned_channels': original_channels,
                    'pruning_ratio_actual': 0.0,
                    'pruning_ratio_target': ratio,
                    'pruned_ranks': np.array([]),
                    'kept_ranks': ranks
                }
                new_channels[layer_name] = original_channels
                total_pruned_channels += original_channels

            # 결과 출력
            plan = pruning_plan[layer_name]
            print(f"{layer_name:>6}: {plan['original_channels']:>3} -> {plan['pruned_channels']:>3} 채널 "
                  f"({plan['pruning_ratio_actual']*100:>5.1f}% 프루닝)")

            if len(plan['prune_indices']) > 0:
                print(f"        제거될 채널 랭크 범위: {plan['pruned_ranks'].min():.1f} ~ {plan['pruned_ranks'].max():.1f}")
                print(f"        유지될 채널 랭크 범위: {plan['kept_ranks'].min():.1f} ~ {plan['kept_ranks'].max():.1f}")

        # 전체 요약
        overall_pruning_ratio = 1 - (total_pruned_channels / total_original_channels)
        print(f"\n전체 요약:")
        print(f"  총 원본 채널 수: {total_original_channels}")
        print(f"  총 프루닝 후 채널 수: {total_pruned_channels}")
        print(f"  전체 프루닝 비율: {overall_pruning_ratio*100:.1f}%")

        return pruning_plan, new_channels

    def determine_classifier_pruning(self, classifier_state_dict: Dict, pruning_ratio: float):
        """
        2-layer Classifier 프루닝 계획 수립

        Args:
            classifier_state_dict: classifier의 state_dict (torch.load로 로드한 .pt 파일 또는 model.classifier.state_dict())
            pruning_ratio: 중간 hidden layer 프루닝 비율 (0.0 ~ 1.0)

        Returns:
            classifier_plan: Classifier 프루닝 계획

        사용 예시:
            # 방법 1: .pt 파일에서 직접 로드
            classifier_weights = torch.load('classifier.pt')
            plan = planner.determine_classifier_pruning(classifier_weights, 0.3)
        """
        print("\n" + "="*70)
        print("2-Layer Classifier 프루닝 계획 수립")
        print("="*70)

        # state_dict에서 가중치 추출
        if 'linear1.weight' in classifier_state_dict:
            linear1_weight = classifier_state_dict['linear1.weight'].cpu().numpy()
            linear2_weight = classifier_state_dict['linear2.weight'].cpu().numpy()
        else:
            raise KeyError("classifier_state_dict에 'linear1.weight' 또는 'linear2.weight' 키가 없습니다. "
                          "키 목록: " + str(list(classifier_state_dict.keys())))

        # 차원 정보
        hidden_dim, in_features = linear1_weight.shape
        num_classes, hidden_dim_check = linear2_weight.shape

        if hidden_dim != hidden_dim_check:
            raise ValueError(f"linear1 출력 차원({hidden_dim})과 linear2 입력 차원({hidden_dim_check})이 일치하지 않습니다.")

        print(f"구조: {in_features} -> {hidden_dim} -> {num_classes}")

        if pruning_ratio <= 0 or pruning_ratio >= 1.0:
            print(f"경고: 프루닝 비율이 {pruning_ratio}입니다. 프루닝하지 않습니다.")
            classifier_plan = {
                'prune_indices': [],
                'keep_indices': list(range(hidden_dim)),
                'original_hidden_dim': hidden_dim,
                'pruned_hidden_dim': hidden_dim,
                'pruning_ratio_actual': 0.0,
                'pruning_ratio_target': pruning_ratio
            }
        else:
            # 각 hidden 뉴런의 중요도 계산
            # linear1의 출력 가중치 (각 행의 L2 norm)
            linear1_importance = np.linalg.norm(linear1_weight, axis=1)  # shape: [hidden_dim]

            # linear2의 입력 가중치 (각 열의 L2 norm)
            linear2_importance = np.linalg.norm(linear2_weight, axis=0)  # shape: [hidden_dim]

            # 두 중요도를 결합 (곱셈 또는 합)
            # 곱셈: 양쪽 모두에서 중요한 뉴런 선호
            combined_importance = linear1_importance * linear2_importance

            # 프루닝할 뉴런 수 계산
            num_to_prune = int(hidden_dim * pruning_ratio)
            num_to_prune = min(num_to_prune, hidden_dim - 1)  # 최소 1개는 유지

            # 중요도가 낮은 순으로 정렬
            sorted_indices = np.argsort(combined_importance)
            prune_indices = sorted_indices[:num_to_prune]
            keep_indices = sorted_indices[num_to_prune:]

            classifier_plan = {
                'prune_indices': prune_indices.tolist(),
                'keep_indices': keep_indices.tolist(),
                'original_hidden_dim': hidden_dim,
                'pruned_hidden_dim': len(keep_indices),
                'pruning_ratio_actual': len(prune_indices) / hidden_dim,
                'pruning_ratio_target': pruning_ratio,
                'pruned_importance': combined_importance[prune_indices],
                'kept_importance': combined_importance[keep_indices],
                'linear1_importance': linear1_importance,
                'linear2_importance': linear2_importance
            }

        # 결과 출력
        print(f"Hidden Layer: {classifier_plan['original_hidden_dim']:>5} -> {classifier_plan['pruned_hidden_dim']:>5} 뉴런 "
              f"({classifier_plan['pruning_ratio_actual']*100:>5.1f}% 프루닝)")
        print(f"  최종 구조: {in_features} -> {classifier_plan['pruned_hidden_dim']} -> {num_classes}")

        if len(classifier_plan['prune_indices']) > 0:
            print(f"  제거될 뉴런 중요도 범위: {classifier_plan['pruned_importance'].min():.4f} ~ "
                  f"{classifier_plan['pruned_importance'].max():.4f}")
            print(f"  유지될 뉴런 중요도 범위: {classifier_plan['kept_importance'].min():.4f} ~ "
                  f"{classifier_plan['kept_importance'].max():.4f}")

        return classifier_plan
    #----------------------------보조 함수--------------------------------------------

    def visualize_pruning_plan(self, all_ranks: Dict, pruning_plan: Dict, save_path=None):
        """프루닝 계획 시각화"""
        num_layers = len(all_ranks)

        if num_layers <= 4:
            rows, cols = 2, 2
        elif num_layers <= 6:
            rows, cols = 2, 3
        else:
            rows, cols = 3, (num_layers + 2) // 3

        fig, axes = plt.subplots(rows, cols, figsize=(15, 10))
        if num_layers == 1:
            axes = [axes]
        else:
            axes = axes.flatten()

        for idx, (layer_name, ranks) in enumerate(all_ranks.items()):
            if idx >= len(axes):
                break

            ax = axes[idx]
            plan = pruning_plan[layer_name]

            # 전체 랭크 히스토그램
            ax.hist(ranks, bins=20, alpha=0.3, color='gray', label='All channels', density=True)

            # 제거될 채널들의 랭크
            if len(plan['pruned_ranks']) > 0:
                ax.hist(plan['pruned_ranks'], bins=15, alpha=0.7, color='red',
                       label=f'Pruned ({len(plan["pruned_ranks"])})', density=True)

            # 유지될 채널들의 랭크
            if len(plan['kept_ranks']) > 0:
                ax.hist(plan['kept_ranks'], bins=15, alpha=0.7, color='blue',
                       label=f'Kept ({len(plan["kept_ranks"])})', density=True)

            ax.set_title(f'{layer_name}\n{plan["original_channels"]} -> {plan["pruned_channels"]} '
                        f'({plan["pruning_ratio_actual"]*100:.1f}%)')
            ax.set_xlabel('Rank')
            ax.set_ylabel('Density')
            ax.legend()
            ax.grid(True, alpha=0.3)

        # 빈 subplot 숨기기
        for idx in range(len(all_ranks), len(axes)):
            axes[idx].set_visible(False)

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"프루닝 계획 시각화가 {save_path}에 저장되었습니다.")

        plt.show()

    def validate_pruning_plan(self, pruning_plan: Dict):
        """프루닝 계획 검증"""
        print("\n프루닝 계획 검증 중...")

        validation_passed = True

        for layer_name, plan in pruning_plan.items():
            # 인덱스 중복 검사
            prune_set = set(plan['prune_indices'])
            keep_set = set(plan['keep_indices'])

            if prune_set & keep_set:  # 교집합이 있으면
                print(f"오류: {layer_name}에서 제거/유지 인덱스가 중복됩니다.")
                validation_passed = False

            # 전체 인덱스 검사
            all_indices = set(range(plan['original_channels']))
            plan_indices = prune_set | keep_set

            if all_indices != plan_indices:
                print(f"오류: {layer_name}에서 인덱스가 누락되거나 초과됩니다.")
                validation_passed = False

            # 최소 채널 수 검사
            if plan['pruned_channels'] < 1:
                print(f"오류: {layer_name}에서 유지되는 채널이 없습니다.")
                validation_passed = False

        if validation_passed:
            print("프루닝 계획 검증 통과!")
        else:
            print("프루닝 계획에 오류가 있습니다.")

        return validation_passed


def test_pruning_planner(all_ranks, pruning_ratios, classifier_weights=None, classifier_ratio=0.0):
    """
    프루닝 계획 수립 테스트 (Conv + Classifier)

    Args:
        all_ranks: Conv 레이어별 랭크 정보
        pruning_ratios: Conv 레이어별 프루닝 비율
        classifier_weights: Classifier state_dict (optional, .pt 파일 또는 model.classifier.state_dict())
        classifier_ratio: Classifier 프루닝 비율 (optional, 0.0이면 프루닝 안함)

    Returns:
        pruning_plan: Conv 레이어 프루닝 계획
        new_channels: 프루닝 후 채널 수
        classifier_plan: Classifier 프루닝 계획 (없으면 None)
        is_valid: 계획 검증 결과
    """
    print("="*60)
    print("HRank 프루닝 계획 수립 테스트")
    print("="*60)

    planner = HRankPruningPlanner()

    # 1. Conv 레이어 프루닝 계획 수립
    pruning_plan, new_channels = planner.determine_pruning_plan(all_ranks, pruning_ratios)

    # 2. Classifier 프루닝 계획 수립 (옵션)
    classifier_plan = None
    if classifier_weights is not None and classifier_ratio > 0:
        classifier_plan = planner.determine_classifier_pruning(classifier_weights, classifier_ratio)

    # 3. 계획 검증
    is_valid = planner.validate_pruning_plan(pruning_plan)

    # 4. 시각화
    if is_valid:
        planner.visualize_pruning_plan(all_ranks, pruning_plan, save_path='pruning_plan.png')

    return pruning_plan, new_channels, classifier_plan, is_valid


In [None]:
#실행 예시

pruning_ratios = [0.3]

classifier_weights = torch.load('classifier.pt')  # 또는 model.classifier.state_dict()

pruning_plan, new_channels, classifier_plan, is_valid = test_pruning_planner(
    all_ranks=all_ranks,
    pruning_ratios=pruning_ratios,
    classifier_weights=classifier_weights,
    classifier_ratio=0.3  # 30% 프루닝
)


##5. 프루닝 블록

In [None]:
#필요한 라이브러리
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR

In [None]:
# 파인튜닝 용도 데이터셋
train_data = torchvision.datasets.CIFAR100(root='./data', train=True, download=True, transform=torchvision.transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(train_data, batch_size=128, shuffle=True, num_workers=2)

test_data = torchvision.datasets.CIFAR100(root='./data', train=False, download=True, transform=torchvision.transforms.ToTensor())
test_loader = torch.utils.data.DataLoader(test_data, batch_size=128, shuffle=False, num_workers=2)
print("데이터 준비 완료!")

### 프루닝,파인튜닝 함수

In [None]:
def prune_and_finetune_vgg16(orig_pt: str,
                            pruning_plan: dict,
                            train_dataloader,
                            test_dataloader,
                            num_classes=100,
                            fine_tuning_epochs=30,
                            initial_learning_rate=0.01,
                            weight_decay=0.0005,
                            milestones=[5, 10],
                            gamma=0.1,
                            device=None,
                            classifier_plan=None):  # 새로 추가된 파라미터
    """
    VGG16_BN 모델을 프루닝하고 fine-tune하는 함수

    Args:
        orig_pt: 원본 모델 체크포인트 경로
        pruning_plan: 각 conv 레이어별 프루닝 정보 딕셔너리
        train_dataloader: 훈련 데이터로더
        test_dataloader: 테스트 데이터로더
        num_classes: 클래스 수
        fine_tuning_epochs: fine-tuning 에포크 수
        initial_learning_rate: 초기 학습률
        weight_decay: 가중치 감소
        milestones: 학습률 스케줄링 마일스톤
        gamma: 학습률 감소 비율
        device: 디바이스
        classifier_plan: classifier 프루닝 계획 (optional)
    """
    device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # VGG16_BN_PruneModel 인스턴스 생성
    model = VGG16_BN_PruneModel(num_classes=num_classes).to(device)

    # 체크포인트 로드 (features만)
    ckpt = torch.load(orig_pt, map_location=device)
    state_dict = ckpt['model_state_dict'] if 'model_state_dict' in ckpt else ckpt

    # features 부분만 로드 (classifier는 nn.Identity()이므로 제외)
    new_state_dict = {}
    for k, v in state_dict.items():
        if k.startswith('features.'):
            new_state_dict[k] = v

    model.load_state_dict(new_state_dict, strict=False)
    print("원본 모델의 features 부분 로드 완료")

    # VGG16의 conv 레이어들 매핑 (MaxPool 제외하고 Conv2d만)
    conv_indices = []
    bn_indices = []

    # features에서 Conv2d와 BatchNorm2d 레이어 인덱스 찾기
    for i, layer in enumerate(model.features):
        if isinstance(layer, nn.Conv2d):
            conv_indices.append(i)
            bn_indices.append(i + 1)  # Conv 다음이 BatchNorm

    print(f"Conv 레이어 인덱스: {conv_indices}")
    print(f"BatchNorm 레이어 인덱스: {bn_indices}")

    print("프루닝 시작...")

    # 각 conv 레이어별로 프루닝 수행
    for i, conv_name in enumerate([f'conv{j+1}' for j in range(len(conv_indices))]):
        if conv_name not in pruning_plan:
            print(f"{conv_name}: 프루닝하지 않음")
            continue

        conv_idx = conv_indices[i]
        bn_idx = bn_indices[i]

        # 현재 conv 레이어와 bn 레이어 가져오기
        conv_layer = model.features[conv_idx]
        bn_layer = model.features[bn_idx]

        # pruning_plan에서 유지할 채널 인덱스 가져오기
        keep_indices = pruning_plan[conv_name]['keep_indices']

        print(f"{conv_name}: {conv_layer.out_channels} -> {len(keep_indices)} 채널")

        # 1. 출력 채널 프루닝
        old_conv = conv_layer
        old_bn = bn_layer

        # 새로운 conv 레이어 생성 (출력 채널 수 조정)
        new_out_channels = len(keep_indices)
        new_conv = nn.Conv2d(
            old_conv.in_channels,
            new_out_channels,
            kernel_size=3,
            padding=1,
            bias=(old_conv.bias is not None)
        )

        # 가중치 복사 (유지할 채널들만)
        new_conv.weight.data = old_conv.weight.data[keep_indices].clone()
        if old_conv.bias is not None:
            new_conv.bias.data = old_conv.bias.data[keep_indices].clone()

        # 새로운 BatchNorm 레이어 생성
        new_bn = nn.BatchNorm2d(new_out_channels)
        new_bn.weight.data = old_bn.weight.data[keep_indices].clone()
        new_bn.bias.data = old_bn.bias.data[keep_indices].clone()
        new_bn.running_mean.data = old_bn.running_mean[keep_indices].clone()
        new_bn.running_var.data = old_bn.running_var[keep_indices].clone()

        # 레이어 교체
        model.features[conv_idx] = new_conv.to(device)
        model.features[bn_idx] = new_bn.to(device)

        # 2. 다음 레이어의 입력 채널 조정 (마지막 레이어가 아닌 경우)
        if i < len(conv_indices) - 1:
            next_conv_idx = conv_indices[i + 1]
            next_conv = model.features[next_conv_idx]

            # 다음 conv 레이어의 입력 채널 조정
            new_next_conv = nn.Conv2d(
                new_out_channels,  # 현재 레이어의 출력이 다음 레이어의 입력
                next_conv.out_channels,
                kernel_size=3,
                padding=1,
                bias=(next_conv.bias is not None)
            )

            # 가중치 복사 (입력 채널 차원에서 유지할 채널들만)
            new_next_conv.weight.data = next_conv.weight.data[:, keep_indices].clone()
            if next_conv.bias is not None:
                new_next_conv.bias.data = next_conv.bias.data.clone()

            # 다음 레이어 교체는 다음 반복에서 처리됨 (출력 채널 프루닝 시)
            # 여기서는 임시로 가중치만 업데이트
            model.features[next_conv_idx] = new_next_conv.to(device)

    print("프루닝 완료!")

    # 3. Classifier 재정의 (features의 출력 크기가 변했으므로)
    # 더미 입력으로 features 출력 크기 계산
    dummy_input = torch.randn(1, 3, 32, 32, device=device)  # CIFAR-10 크기로 가정
    with torch.no_grad():
        features_output = model.features(dummy_input)
        flat_dim = features_output.view(1, -1).size(1)

    # Classifier 생성 (프루닝 여부에 따라 다르게)
    if classifier_plan is not None and len(classifier_plan.get('keep_indices', [])) > 0:
        # Classifier 프루닝이 있는 경우
        print("\n" + "="*60)
        print("Classifier 프루닝 적용")
        print("="*60)

        model.classifier = apply_classifier_pruning(
            flat_dim=flat_dim,
            hidden_dim=classifier_plan['original_hidden_dim'],
            pruned_hidden_dim=classifier_plan['pruned_hidden_dim'],
            keep_indices=classifier_plan['keep_indices'],
            num_classes=num_classes,
            orig_pt=orig_pt,
            device=device
        )
    else:
        # 기존 방식: 프루닝 없이 새로운 classifier 생성
        model.classifier = nn.Sequential(
            nn.Linear(flat_dim, 512),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(512, 128),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(128, num_classes)
        ).to(device)

    print(f"새로운 classifier 입력 차원: {flat_dim}")

    # 4. Validation set 생성 (train 데이터의 10% 사용)
    # sklearn 대신 torch의 random_split 사용

    # train_dataloader의 데이터셋에서 인덱스 분할
    dataset = train_dataloader.dataset
    train_size = int(0.9 * len(dataset))
    val_size = len(dataset) - train_size

    train_subset, val_subset = torch.utils.data.random_split(
        dataset, [train_size, val_size],
        generator=torch.Generator().manual_seed(42)
    )

    train_loader_new = torch.utils.data.DataLoader(
        train_subset, batch_size=train_dataloader.batch_size,
        shuffle=True, num_workers=2
    )
    val_loader = torch.utils.data.DataLoader(
        val_subset, batch_size=train_dataloader.batch_size,
        shuffle=False, num_workers=2
    )

    print(f"Train size: {len(train_subset)}, Validation size: {len(val_subset)}")

    # 4. Fine-tuning 설정
    print("Fine-tuning 시작...")

    # SGD 옵티마이저 설정
    optimizer = optim.SGD(
        model.parameters(),
        lr=initial_learning_rate,
        weight_decay=weight_decay,
        momentum=0.9  # SGD에 모멘텀 추가
    )

    # 학습률 스케줄러 설정
    scheduler = MultiStepLR(optimizer, milestones=milestones, gamma=gamma)

    criterion = nn.CrossEntropyLoss()

    # 기록용 리스트
    train_losses, val_losses = [], []
    train_accs, val_accs = [], []


    # Fine-tuning 수행
    for epoch in range(fine_tuning_epochs):
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for batch_idx, (data, target) in enumerate(train_loader_new):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            train_total += target.size(0)
            train_correct += (predicted == target).sum().item()

            if batch_idx % 100 == 0:
                print(f'Epoch {epoch+1}/{fine_tuning_epochs}, '
                      f'Batch {batch_idx}, '
                      f'Loss: {loss.item():.6f}, '
                      f'LR: {scheduler.get_last_lr()[0]:.6f}')

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)

                val_loss += loss.item()
                _, predicted = torch.max(output.data, 1)
                val_total += target.size(0)
                val_correct += (predicted == target).sum().item()

        # 에포크별 결과 계산 및 저장
        train_loss_avg = train_loss / len(train_loader_new)
        val_loss_avg = val_loss / len(val_loader)
        train_acc = 100 * train_correct / train_total
        val_acc = 100 * val_correct / val_total

        train_losses.append(train_loss_avg)
        val_losses.append(val_loss_avg)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        print(f'Epoch {epoch+1}: Train Loss: {train_loss_avg:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'           Val Loss: {val_loss_avg:.4f}, Val Acc: {val_acc:.2f}%')
        print(f'           Gap: {train_acc - val_acc:.2f}%')

        # 학습률 스케줄링
        scheduler.step()

    # 5. 평가
    print("최종 평가 중...")
    model.eval()
    test_loss = 0.0
    test_correct = 0
    test_total = 0

    with torch.no_grad():
        for data, target in test_dataloader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            # --- 추가 시작 ---
            loss = criterion(outputs, target)
            test_loss += loss.item()
            # --- 추가 끝 ---
            _, predicted = torch.max(outputs.data, 1)
            test_total += target.size(0)
            test_correct += (predicted == target).sum().item()

    test_loss_avg = test_loss / len(test_dataloader)
    test_acc = 100 * test_correct / test_total
    print(f'최종 테스트 결과: Loss: {test_loss_avg:.4f}, Accuracy: {test_acc:.2f}%')

    # 6. 학습 곡선 플롯
    import matplotlib.pyplot as plt

    plt.figure(figsize=(12, 5))

    # Loss plot
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss', color='blue')
    plt.plot(val_losses, label='Validation Loss', color='orange')
    plt.title('Loss Trend')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    # Accuracy plot
    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Train Accuracy', color='blue')
    plt.plot(val_accs, label='Validation Accuracy', color='orange')
    plt.title('Accuracy Trend')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    # Overfitting 분석
    final_gap = train_accs[-1] - val_accs[-1]
    if final_gap > 5:
        print(f"⚠️  Overfitting 감지: Train-Val gap = {final_gap:.2f}%")
    else:
        print(f"✅ 정상 학습: Train-Val gap = {final_gap:.2f}%")

    accuracy = test_acc


    # 6. 모델 저장
    torch.save({
        'model_state_dict': model.state_dict(),
        'pruning_plan': pruning_plan,
        'accuracy': accuracy,
        'training_config': {
            'fine_tuning_epochs': fine_tuning_epochs,
            'initial_learning_rate': initial_learning_rate,
            'weight_decay': weight_decay,
            'milestones': milestones,
            'gamma': gamma
        }
    }, 'vgg16_pruned_finetuned.pth')

    print("모델 저장 완료: vgg16_pruned_finetuned.pth")

    return model

def apply_classifier_pruning(flat_dim: int,
                            hidden_dim: int,
                            pruned_hidden_dim: int,
                            keep_indices: list,
                            num_classes: int,
                            orig_pt: str,
                            device):
    """
    Classifier 프루닝을 적용하여 새로운 classifier 생성

    Args:
        flat_dim: features 출력을 flatten한 차원
        hidden_dim: 원본 hidden layer 차원
        pruned_hidden_dim: 프루닝 후 hidden layer 차원
        keep_indices: 유지할 뉴런 인덱스
        num_classes: 클래스 수
        orig_pt: 원본 모델 체크포인트 경로
        device: 디바이스

    Returns:
        pruned_classifier: 프루닝된 classifier
    """
    # 원본 classifier 가중치 로드
    ckpt = torch.load(orig_pt, map_location=device)
    state_dict = ckpt['model_state_dict'] if 'model_state_dict' in ckpt else ckpt

    # classifier의 state_dict 추출
    classifier_state = {}
    for k, v in state_dict.items():
        if k.startswith('classifier.'):
            # 'classifier.' 접두사 제거
            new_key = k.replace('classifier.', '')
            classifier_state[new_key] = v

    # 원본 가중치 가져오기
    linear1_weight = classifier_state['linear1.weight']  # [hidden_dim, in_features]
    linear1_bias = classifier_state['linear1.bias']      # [hidden_dim]
    bn1_weight = classifier_state['norm1.weight']        # [hidden_dim]
    bn1_bias = classifier_state['norm1.bias']            # [hidden_dim]
    bn1_mean = classifier_state['norm1.running_mean']    # [hidden_dim]
    bn1_var = classifier_state['norm1.running_var']      # [hidden_dim]
    linear2_weight = classifier_state['linear2.weight']  # [num_classes, hidden_dim]
    linear2_bias = classifier_state['linear2.bias']      # [num_classes]

    keep_indices_tensor = torch.tensor(keep_indices, device=device)

    # 프루닝된 classifier 생성
    pruned_classifier = nn.Sequential(OrderedDict([
        ('linear1', nn.Linear(flat_dim, pruned_hidden_dim)),
        ('norm1', nn.BatchNorm1d(pruned_hidden_dim)),
        ('relu1', nn.ReLU(inplace=True)),
        ('dropout1', nn.Dropout(0.5)),
        ('linear2', nn.Linear(pruned_hidden_dim, num_classes)),
    ])).to(device)

    # linear1 가중치 적용 (출력 뉴런만 프루닝)
    # 입력 차원이 변했으므로 (features 프루닝 때문에) 초기화 후 부분 복사
    with torch.no_grad():
        # 원본 입력 차원과 새 입력 차원 비교
        orig_in_features = linear1_weight.size(1)

        if flat_dim == orig_in_features:
            # 입력 차원이 같으면 그대로 복사
            pruned_classifier.linear1.weight.data = linear1_weight[keep_indices_tensor].clone()
        else:
            # 입력 차원이 다르면 Xavier 초기화 (features가 프루닝되었을 경우)
            print(f"  경고: linear1 입력 차원 불일치 ({orig_in_features} -> {flat_dim}), 새로 초기화")
            nn.init.xavier_uniform_(pruned_classifier.linear1.weight)

        pruned_classifier.linear1.bias.data = linear1_bias[keep_indices_tensor].clone()

        # BatchNorm1d 가중치 적용
        pruned_classifier.norm1.weight.data = bn1_weight[keep_indices_tensor].clone()
        pruned_classifier.norm1.bias.data = bn1_bias[keep_indices_tensor].clone()
        pruned_classifier.norm1.running_mean.data = bn1_mean[keep_indices_tensor].clone()
        pruned_classifier.norm1.running_var.data = bn1_var[keep_indices_tensor].clone()

        # linear2 가중치 적용 (입력 뉴런만 프루닝)
        pruned_classifier.linear2.weight.data = linear2_weight[:, keep_indices_tensor].clone()
        pruned_classifier.linear2.bias.data = linear2_bias.clone()

    print(f"  Classifier 프루닝 완료: {hidden_dim} -> {pruned_hidden_dim} hidden neurons")
    print(f"  유지된 뉴런 인덱스 수: {len(keep_indices)}")

    return pruned_classifier

### 실행

In [None]:
#Conv + Classifier 함께 프루닝
pruned_model = prune_and_finetune_vgg16(
    orig_pt='model.pt',
    pruning_plan=pruning_plan,
    train_dataloader=train_loader,
    test_dataloader=test_loader,
    fine_tuning_epochs=30,
    initial_learning_rate=0.007,
    weight_decay=0.001,
    milestones=[5, 10],
    gamma=0.1,
    classifier_plan=classifier_plan  # 추가!
)

### 모델정보(프루닝 비율, 파라미터) 함수

In [None]:
## 파라미터 계산, 모델정보 출력함수

def get_model_info(model, show_classifier=True):
    """
    프루닝된 모델의 정보를 출력하는 헬퍼 함수

    Args:
        model: PyTorch 모델
        show_classifier: Classifier 정보 표시 여부
    """
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

    print("="*70)
    print("모델 정보")
    print("="*70)
    print(f"총 파라미터 수: {total_params:,}")
    print(f"학습 가능한 파라미터 수: {trainable_params:,}")

    # Features 부분 파라미터 계산
    features_params = sum(p.numel() for p in model.features.parameters())

    # Classifier 부분 파라미터 계산
    if hasattr(model, 'classifier') and not isinstance(model.classifier, nn.Identity):
        classifier_params = sum(p.numel() for p in model.classifier.parameters())
    else:
        classifier_params = 0

    print(f"\n파라미터 분포:")
    print(f"  Features (Conv layers): {features_params:,} ({features_params/total_params*100:.1f}%)")
    print(f"  Classifier: {classifier_params:,} ({classifier_params/total_params*100:.1f}%)")

    # 각 conv 레이어의 채널 수 출력
    conv_count = 0
    print("\n각 Conv 레이어별 채널 수:")
    for i, layer in enumerate(model.features):
        if isinstance(layer, nn.Conv2d):
            conv_count += 1
            params = layer.weight.numel() + (layer.bias.numel() if layer.bias is not None else 0)
            print(f"  Conv{conv_count}: {layer.in_channels:>4} -> {layer.out_channels:>4} "
                  f"(파라미터: {params:,})")

    # Classifier 정보 출력
    if show_classifier and hasattr(model, 'classifier') and not isinstance(model.classifier, nn.Identity):
        print("\nClassifier 구조:")
        for name, layer in model.classifier.named_children():
            if isinstance(layer, nn.Linear):
                params = layer.weight.numel() + (layer.bias.numel() if layer.bias is not None else 0)
                print(f"  {name}: {layer.in_features:>5} -> {layer.out_features:>5} "
                      f"(파라미터: {params:,})")
            elif isinstance(layer, nn.BatchNorm1d):
                params = layer.weight.numel() + layer.bias.numel()
                print(f"  {name}: {layer.num_features} features (파라미터: {params:,})")


def calculate_pruning_ratios(pruning_plan, classifier_plan=None, original_model=None, pruned_model=None):
    """
    프루닝 비율 계산 및 출력

    Args:
        pruning_plan: Conv 레이어 프루닝 계획
        classifier_plan: Classifier 프루닝 계획 (optional)
        original_model: 원본 모델 (optional, 파라미터 감소량 계산용)
        pruned_model: 프루닝된 모델 (optional, 파라미터 감소량 계산용)
    """
    print("\n" + "="*70)
    print("프루닝 비율 정보")
    print("="*70)

    # Conv 레이어 프루닝 정보
    print("\n[Conv 레이어 프루닝]")
    total_original_channels = 0
    total_pruned_channels = 0

    for conv_name, info in pruning_plan.items():
        original_channels = info['original_channels']
        pruned_channels = info['pruned_channels']
        ratio = info.get('pruning_ratio_actual', (original_channels - pruned_channels) / original_channels)

        total_original_channels += original_channels
        total_pruned_channels += pruned_channels

        print(f"  {conv_name}: {original_channels:>3} -> {pruned_channels:>3} "
              f"(프루닝 비율: {ratio:>5.1%})")

    overall_conv_ratio = (total_original_channels - total_pruned_channels) / total_original_channels
    print(f"\n  전체 Conv 채널 프루닝: {total_original_channels} -> {total_pruned_channels} "
          f"({overall_conv_ratio:.1%})")

    # Classifier 프루닝 정보
    if classifier_plan is not None and len(classifier_plan.get('keep_indices', [])) > 0:
        print("\n[Classifier 프루닝]")
        original_hidden = classifier_plan['original_hidden_dim']
        pruned_hidden = classifier_plan['pruned_hidden_dim']
        ratio = classifier_plan['pruning_ratio_actual']

        print(f"  Hidden Layer: {original_hidden:>5} -> {pruned_hidden:>5} neurons "
              f"(프루닝 비율: {ratio:>5.1%})")

    # 전체 파라미터 감소량 계산
    if original_model is not None and pruned_model is not None:
        print("\n" + "-"*70)
        print("[전체 파라미터 감소량]")

        # Features 파라미터
        orig_features_params = sum(p.numel() for p in original_model.features.parameters())
        pruned_features_params = sum(p.numel() for p in pruned_model.features.parameters())
        features_reduction = (orig_features_params - pruned_features_params) / orig_features_params

        print(f"  Features: {orig_features_params:,} -> {pruned_features_params:,} "
              f"({features_reduction:.1%} 감소)")

        # Classifier 파라미터
        if hasattr(original_model, 'classifier') and hasattr(pruned_model, 'classifier'):
            if not isinstance(original_model.classifier, nn.Identity):
                orig_classifier_params = sum(p.numel() for p in original_model.classifier.parameters())
                pruned_classifier_params = sum(p.numel() for p in pruned_model.classifier.parameters())
                classifier_reduction = (orig_classifier_params - pruned_classifier_params) / orig_classifier_params

                print(f"  Classifier: {orig_classifier_params:,} -> {pruned_classifier_params:,} "
                      f"({classifier_reduction:.1%} 감소)")

        # 전체
        orig_total_params = sum(p.numel() for p in original_model.parameters())
        pruned_total_params = sum(p.numel() for p in pruned_model.parameters())
        total_reduction = (orig_total_params - pruned_total_params) / orig_total_params

        print(f"\n  전체: {orig_total_params:,} -> {pruned_total_params:,} "
              f"({total_reduction:.1%} 감소)")
        print(f"  압축률: {pruned_total_params/orig_total_params:.2%} (원본의 {pruned_total_params/orig_total_params:.1%})")


# 사용 예시
"""
# 1. 프루닝 전 모델 정보
print("원본 모델:")
get_model_info(original_model)

# 2. 프루닝 후 모델 정보
print("\n프루닝된 모델:")
get_model_info(pruned_model)

# 3. 프루닝 비율 상세 정보 (Conv만)
calculate_pruning_ratios(pruning_plan)

# 4. 프루닝 비율 상세 정보 (Conv + Classifier)
calculate_pruning_ratios(
    pruning_plan=pruning_plan,
    classifier_plan=classifier_plan,
    original_model=original_model,
    pruned_model=pruned_model
)
"""

In [None]:

# 1. 프루닝 전 모델 정보
print("원본 모델:")
get_model_info(original_model)

# 2. 프루닝 후 모델 정보
print("\n프루닝된 모델:")
get_model_info(pruned_model)

# 4. 프루닝 비율 상세 정보 (Conv + Classifier)
calculate_pruning_ratios(
    pruning_plan=pruning_plan,
    classifier_plan=classifier_plan,
    original_model=original_model,
    pruned_model=pruned_model
)


##6. 시각자료 모음