In [1]:
import torch
import numpy as np
import onnx
import os
from collections import OrderedDict
from copy import deepcopy

class YOLOWeightOptimizer:
    """가중치 최적화"""
    def __init__(self):
        # APACHE5 NPU 파라미터
        self.TILE_SIZE = 64
        self.CELL_SIZE = 8
        self.WFRAM_SIZE = 64 * 1024  # 64KB
        self.MAX_BANDWIDTH = 64 * (1024**3) / 8  # 64Gbit/s to Byte/s
        self.INT8_MACS = 1024
        self.GMAC_CAPACITY = 819.2

        # 타일링 관련 상수
        self.TILE_SQR = self.TILE_SIZE * self.TILE_SIZE
        self.CELL_SQR = self.CELL_SIZE * self.CELL_SIZE
        self.CELL_TWI = 2 * self.CELL_SIZE
        self.CELL_QUA = 4 * self.CELL_SIZE

        # DFL 및 특수 레이어 이름
        self.DFL_LAYERS = ['model.24.dfl.conv']
        self.CV3_LAYERS = ['model.24.cv3.0.2', 'model.24.cv3.1.2', 'model.24.cv3.2.2']

    def _align_channels(self, channels):
        """채널 수를 CELL_SIZE(8)의 배수로 조정"""
        return ((channels + self.CELL_SIZE - 1) // self.CELL_SIZE) * self.CELL_SIZE

    def _scale_for_bandwidth(self, weight):
        """메모리 대역폭 제약 고려한 스케일링"""
        memory_access = np.prod(weight.shape) * 4  # float32 기준
        
        if memory_access * self.GMAC_CAPACITY > self.MAX_BANDWIDTH:
            scale_factor = self.MAX_BANDWIDTH / (memory_access * self.GMAC_CAPACITY)
            weight = weight * scale_factor
        
        return weight

    def optimize_weights(self, model):
        """가중치 최적화"""
        optimized_state_dict = OrderedDict()
        
        for name, param in model.state_dict().items():
            if 'conv' in name and 'weight' in name:
                print(f"Optimizing conv weights: {name}")
                try:
                    optimized_weight = self._optimize_conv_weight(param, name)
                    optimized_state_dict[name] = optimized_weight
                except Exception as e:
                    print(f"Warning: {name} 레이어 최적화 중 오류 발생: {str(e)}")
                    optimized_state_dict[name] = param
            else:
                optimized_state_dict[name] = param

        return optimized_state_dict

    def _optimize_conv_weight(self, weight, name):
        """Convolution 레이어 가중치 최적화"""
        # DFL 레이어는 건너뛰기
        if any(dfl_name in name for dfl_name in self.DFL_LAYERS):
            return weight
            
        out_channels, in_channels, kernel_h, kernel_w = weight.shape
        
        # 첫 번째 레이어 또는 cv3 레이어 특별 처리
        if in_channels == 3 or any(cv3_name in name for cv3_name in self.CV3_LAYERS):
            return self._optimize_special_layers(weight, name)
            
        # 기본 채널 정렬
        aligned_out = self._align_channels(out_channels)
        aligned_in = self._align_channels(in_channels)
        
        # 가중치 재구성
        new_weight = torch.zeros(aligned_out, aligned_in, kernel_h, kernel_w)
        
        # 실제 복사할 크기 계산
        copy_out = min(out_channels, aligned_out)
        copy_in = min(in_channels, aligned_in)
        
        # 안전하게 가중치 복사
        new_weight[:copy_out, :copy_in, :, :] = weight[:copy_out, :copy_in, :, :]
        
        # 새로운 영역 초기화
        if aligned_out > copy_out or aligned_in > copy_in:
            scaling = np.sqrt(2.0 / (aligned_in * kernel_h * kernel_w))
            mask = torch.zeros_like(new_weight)
            mask[copy_out:, :, :, :] = 1
            mask[:, copy_in:, :, :] = 1
            
            # Xavier/Glorot 초기화 사용
            limit = np.sqrt(6 / ((copy_out + aligned_out) * kernel_h * kernel_w))
            rand_weight = torch.empty_like(new_weight).uniform_(-limit, limit)
            new_weight = new_weight * (1 - mask) + rand_weight * mask
        
        return self._scale_for_bandwidth(new_weight)

    def _optimize_special_layers(self, weight, name):
        """특수 레이어 (첫 번째 레이어, cv3 레이어) 최적화"""
        if any(cv3_name in name for cv3_name in self.CV3_LAYERS):
            return self._optimize_cv3_layer(weight)
        elif weight.shape[1] == 3:  # 첫 번째 레이어
            return self._optimize_first_layer(weight)
        return weight

    def _optimize_first_layer(self, weight):
        """첫 번째 레이어 최적화"""
        out_channels, in_channels, kernel_h, kernel_w = weight.shape
        
        # 출력 채널만 8의 배수로 조정
        aligned_out = self._align_channels(out_channels)
        new_weight = torch.zeros(aligned_out, in_channels, kernel_h, kernel_w)
        
        # 가중치 복사
        copy_out = min(out_channels, aligned_out)
        new_weight[:copy_out, :, :, :] = weight[:copy_out, :, :, :]
        
        # 새로운 채널 초기화
        if aligned_out > copy_out:
            scaling = np.sqrt(2.0 / (in_channels * kernel_h * kernel_w))
            new_weight[copy_out:, :, :, :] = torch.randn_like(new_weight[copy_out:, :, :, :]) * scaling
            
        return new_weight

    def _optimize_cv3_layer(self, weight):
        """cv3 레이어 최적화"""
        out_channels, in_channels, kernel_h, kernel_w = weight.shape
        
        # 입력 채널만 8의 배수로 조정 (출력은 원래 크기 유지)
        aligned_in = self._align_channels(in_channels)
        new_weight = torch.zeros(out_channels, aligned_in, kernel_h, kernel_w)
        
        # 실제 복사할 크기 계산
        copy_in = min(in_channels, aligned_in)
        
        # 안전하게 가중치 복사
        new_weight[:, :copy_in, :, :] = weight[:, :copy_in, :, :]
        
        # 새로운 채널 초기화
        if aligned_in > copy_in:
            scaling = np.sqrt(2.0 / (aligned_in * kernel_h * kernel_w))
            new_weight[:, copy_in:, :, :] = torch.randn_like(new_weight[:, copy_in:, :, :]) * scaling

        return new_weight

class YOLOStructureOptimizer:
    """F-LAM 레이어 구조 최적화"""
    def __init__(self):
        self.TILE_SIZE = 64
        self.WFRAM_SIZE = 64 * 1024
        self.TILE_SQR = self.TILE_SIZE * self.TILE_SIZE

    def optimize_structure(self, model):
        """F-LAM 레이어 구조 최적화"""
        optimized_model = deepcopy(model)
        
        # 원본 forward 메소드 저장
        original_forward = optimized_model.forward
        
        def unified_forward(self, x):
            # 원본 forward 실행
            outputs = original_forward(x)
            # 단일 출력만 반환
            if isinstance(outputs, (list, tuple)):
                return outputs[0]  # 첫 번째 출력만 반환
            return outputs
        
        # 새로운 forward 메소드 설정
        optimized_model.forward = unified_forward.__get__(optimized_model)
        
        # F-LAM 레이어 최적화
        for name, module in optimized_model.named_modules():
            # Concat 최적화
            if isinstance(module, torch.nn.modules.container.Sequential):
                for layer in module:
                    if hasattr(layer, 'f') and isinstance(layer.f, (list, int)):
                        print(f"Optimizing Concat layer: {name}")
                        self._optimize_concat_layer(layer)
            
            # Add 최적화
            if hasattr(module, 'add'):
                print(f"Optimizing Add operation: {name}")
                self._optimize_add_operation(module)
            
            # Upsample 최적화
            if isinstance(module, torch.nn.Upsample):
                print(f"Optimizing Upsample layer: {name}")
                self._optimize_upsample_layer(module)
            
            # MaxPool2d 최적화
            if isinstance(module, torch.nn.MaxPool2d):
                print(f"Optimizing MaxPool2d layer: {name}")
                self._optimize_maxpool_layer(module)

        return optimized_model

    def _optimize_flam_output(self, x):
        """F-LAM 출력 크기 최적화"""
        if x is None:
            return None
            
        total_size = np.prod(x.shape[2:])
        if total_size > self.TILE_SQR:
            scale = np.sqrt(self.TILE_SQR / total_size)
            return torch.nn.functional.interpolate(
                x,
                scale_factor=scale,
                mode='bilinear',
                align_corners=False
            )
        return x

    def _optimize_concat_layer(self, layer):
        original_forward = layer.forward
        optimizer = self
        
        def optimized_forward(self, x):
            if isinstance(x, list):
                x = [optimizer._optimize_flam_output(f) for f in x]
            return original_forward(x)
        
        layer.forward = optimized_forward.__get__(layer)
        
    def _optimize_add_operation(self, module):
        original_forward = module.forward
        optimizer = self
        
        def optimized_forward(self, x):
            if isinstance(x, tuple):
                x = tuple(optimizer._optimize_flam_output(t) for t in x)
            return original_forward(x)
        
        module.forward = optimized_forward.__get__(module)

    def _optimize_upsample_layer(self, module):
        original_forward = module.forward
        optimizer = self
        
        def optimized_forward(self, x):
            x = optimizer._optimize_flam_output(x)
            result = original_forward(x)
            return optimizer._optimize_flam_output(result)
        
        module.forward = optimized_forward.__get__(module)

    def _optimize_maxpool_layer(self, module):
        original_forward = module.forward
        optimizer = self
        
        def optimized_forward(self, x):
            x = optimizer._optimize_flam_output(x)
            result = original_forward(x)
            return optimizer._optimize_flam_output(result)
        
        module.forward = optimized_forward.__get__(module)

def optimize_yolo_model(input_pt_path, output_onnx_path, intermediate_onnx_path=None):
    """YOLO 모델 최적화"""
    try:
        # 현재 작업 디렉토리 확인 및 출력
        current_dir = os.getcwd()
        print(f"현재 작업 디렉토리: {current_dir}")
        
        # 입력 파일 확인
        if not os.path.exists(input_pt_path):
            print(f"입력 파일을 찾을 수 없음: {input_pt_path}")
            raise FileNotFoundError(f"입력 모델을 찾을 수 없습니다: {input_pt_path}")
        else:
            print(f"입력 파일 찾음: {input_pt_path}")
        
        # 출력 디렉토리 생성
        os.makedirs(os.path.dirname(output_onnx_path) or '.', exist_ok=True)
        if intermediate_onnx_path:
            os.makedirs(os.path.dirname(intermediate_onnx_path) or '.', exist_ok=True)

        print("1단계: 가중치 최적화 시작...")
        model = torch.load(input_pt_path, map_location='cpu')['model'].float()
        model.eval()
        
        # 1단계: 가중치 최적화
        weight_optimizer = YOLOWeightOptimizer()
        optimized_weights = weight_optimizer.optimize_weights(model)
        model.load_state_dict(optimized_weights)
        
        # 중간 ONNX 저장 (가중치만 최적화된 상태)
        if intermediate_onnx_path:
            print(f"중간 ONNX 저장 중: {intermediate_onnx_path}")
            dummy_input = torch.randn(1, 3, 384, 640)
            torch.onnx.export(
                model,
                dummy_input,
                intermediate_onnx_path,
                verbose=False,
                opset_version=12,
                input_names=['images'],
                output_names=['output']
            )

        print("2단계: F-LAM 레이어 구조 최적화 시작...")
        # 2단계: 구조 최적화
        structure_optimizer = YOLOStructureOptimizer()
        final_model = structure_optimizer.optimize_structure(model)
        
        # 최종 ONNX 저장
        print(f"최종 ONNX 저장 중: {output_onnx_path}")
        dummy_input = torch.randn(1, 3, 384, 640)
        torch.onnx.export(
            final_model,
            dummy_input,
            output_onnx_path,
            verbose=False,
            opset_version=12,
            input_names=['images'],
            output_names=['output'],
            dynamic_axes=None,  # dynamic axes 비활성화
        )
        
        print("최적화가 성공적으로 완료되었습니다!")
        return True
        
    except Exception as e:
        print(f"최적화 중 오류 발생: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

# 실행 예시
if __name__ == "__main__":
    # 경로 설정
    input_pt_path = "yolo_50_50.pt"
    output_onnx_path = "yolo_50_50_optimized_model_test.onnx"
    intermediate_onnx_path = "yolo_50_50_weight_optimized_test.onnx"
    
    # 현재 경로 출력
    current_dir = os.getcwd()
    print(f"현재 작업 디렉토리: {current_dir}")
    print(f"입력 파일 전체 경로: {os.path.abspath(input_pt_path)}")
    
    # 최적화 실행
    try:
        success = optimize_yolo_model(
            input_pt_path=input_pt_path,
            output_onnx_path=output_onnx_path,
            intermediate_onnx_path=intermediate_onnx_path
        )
        
        if success:
            print("모든 최적화 단계가 완료되었습니다.")
        else:
            print("최적화 중 오류가 발생했습니다.")
            
    except Exception as e:
        print(f"실행 중 오류 발생: {str(e)}")

현재 작업 디렉토리: /home/jovyan/yolov5_quant_sample/npu_optimizer
입력 파일 전체 경로: /home/jovyan/yolov5_quant_sample/npu_optimizer/yolo_50_50.pt
현재 작업 디렉토리: /home/jovyan/yolov5_quant_sample/npu_optimizer
입력 파일 찾음: yolo_50_50.pt
1단계: 가중치 최적화 시작...


  model = torch.load(input_pt_path, map_location='cpu')['model'].float()


Optimizing conv weights: model.0.conv.weight
Optimizing conv weights: model.1.conv.weight
Optimizing conv weights: model.2.cv1.conv.weight
Optimizing conv weights: model.2.cv2.conv.weight
Optimizing conv weights: model.2.cv3.conv.weight
Optimizing conv weights: model.2.m.0.cv1.conv.weight
Optimizing conv weights: model.2.m.0.cv2.conv.weight
Optimizing conv weights: model.2.m.1.cv1.conv.weight
Optimizing conv weights: model.2.m.1.cv2.conv.weight
Optimizing conv weights: model.3.conv.weight
Optimizing conv weights: model.4.cv1.conv.weight
Optimizing conv weights: model.4.cv2.conv.weight
Optimizing conv weights: model.4.cv3.conv.weight
Optimizing conv weights: model.4.m.0.cv1.conv.weight
Optimizing conv weights: model.4.m.0.cv2.conv.weight
Optimizing conv weights: model.4.m.1.cv1.conv.weight
Optimizing conv weights: model.4.m.1.cv2.conv.weight
Optimizing conv weights: model.4.m.2.cv1.conv.weight
Optimizing conv weights: model.4.m.2.cv2.conv.weight
Optimizing conv weights: model.5.conv.wei

  if self.dynamic or self.shape != shape:
  for i, stride in enumerate(strides):


2단계: F-LAM 레이어 구조 최적화 시작...
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Concat layer: model
Optimizing Add operation: model.2.m.0
Optimizing Add operation: model.2.m.1
Optimizing Add operation: model.4.m.0
Optimizing Add operation: model.4.m.1
Optimizing Add operation: model.4.m.2
Optimiz

  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)


최적화가 성공적으로 완료되었습니다!
모든 최적화 단계가 완료되었습니다.
