In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AdaRoundQuantizer(nn.Module):
    def __init__(self, weight, scale, delta=1.1, gamma=-0.1):
        super().__init__()
        # delta, gamma : 시그모이드 극값 조절
        # 극값을 늘려서 미분이 0이 되지 않게 계속 학습 되도록
        self.delta = delta
        self.gamma = gamma
        self.scale = scale

        # V 초기화 (학습 파라미터)
        # FP32 가중치의 소수점 값으로 초기화
        # h(v) = rest의 역함수를 사용
        w_floor = torch.floor(weight / scale)
        rest = (weight / scale) - w_floor

        rest = torch.clamp(rest, 0.01, 0.99)

        sig_inv = (rest - gamma) / (delta - gamma)
        sig_inv = torch.clamp(sig_inv, 0.01, 0.99)

        init_v = torch.log(sig_inv / (1 - sig_inv))

        self.V = nn.Parameter(init_v)

    def rectified_sigmoid(self):
        # Equation 23
        x = torch.clamp(torch.sigmoid(self.V) * (self.delta - self.gamma) + self.gamma, 0, 1)

        return x

    def forward(self, weight, scale, zero_points=0):

        w_floor = torch.floor(weight / scale + zero_points)

        # 반올림 결정 부분
        w_soft = w_floor + self.rectified_sigmoid()
        # 역양자화
        w_dequant = scale * (w_soft - zero_points)

        return w_dequant

In [None]:
class AdaRoundLoss(nn.Module):
    def __init__(self, weight_decay=1e-4):
        super().__init__()
        self.weight_decay = weight_decay # lambda

    def compute_reg_loss(self, h_V, beta):
        # 2 * h_V - 1 : 0.5 보다 커지면 1에 가까워진다.
        # beta가 크면 1보다 작은 경우 0에 확 가까워짐
        reg_loss = torch.sum(1 - torch.pow(torch.abs(2 * h_V - 1), beta))
        return reg_loss

    def forward(self, current_out, orig_out, h_V, beta):

        mse_loss = F.mse_loss(current_out, orig_out)

        reg_loss = self.compute_reg_loss(h_V, beta)

        total_loss = mse_loss + self.weight_decay * reg_loss
        # mse니깐 L2 정규화를 생각해보면 보통 람다를 0.9를 주는데 어떤 비율로 조정해야할까?
        # 논문에서는 frobenius norm을 사용한다. 이거는 제곱의 합이므로 값이 크다. 따라서 조정이 필요한데
        # mse에 reduction = 'sum'을 해주던가 정규화 쪽을 파라미터 개수로 나누던가 람다를 매우 작게 잡는다
        # 초반에는 mse가 중요하니깐 재구성 손실과 비슷하게 맞춘다 보통 1e-4 ~ 1e-2

        return total_loss

In [None]:
def get_quantization_params(weight, bits=4):

    # 4-bit의 경우 범위: -8 ~ 7
    q_max = 2**(bits - 1) - 1 # 7
    q_min = -2**(bits - 1)    # -8

    max_val = torch.max(torch.abs(weight))
    scale = max_val / q_max

    # zero-point: symmetric
    zero_point = 0

    return scale, zero_point

In [None]:
from torch.optim import Adam
import numpy as np

def train_adaround(model_layer, input_data, orig_output, quantizer, n_iter=2000):

    # Scale과 ZeroPoint 미리 계산
    scale, zero_point = get_quantization_params(model_layer.weight)
    scale = scale.to(model_layer.weight.device)

    # Optimizer 설정 (Parameter V만 학습)
    optimizer = Adam([quantizer.V], lr=1e-3)

    loss_func = AdaRoundLoss(weight_decay=1e-4)

    # Beta Scheduling (20 -> 2 로 감소)
    beta_scheduler = np.linspace(20, 2, n_iter)

    print("Start AdaRound Training...")

    for i in range(n_iter):
        beta = beta_scheduler[i]

        optimizer.zero_grad()

        # Soft Quantized Weight 생성
        # 여기서 V가 학습되면서 반올림 여부가 결정됨
        w_soft_quant = quantizer(model_layer.weight, scale, zero_point)

        # Forward Pass
        # 원래 weight를 백업하고, quantized weight를 덮어씌워서 연산
        saved_weight = model_layer.weight.data.clone()
        model_layer.weight.data = w_soft_quant

        # 입력 데이터를 넣어 출력 계산
        current_out = model_layer(input_data)

        # weight 복구
        model_layer.weight.data = saved_weight

        # Loss 계산
        h_V = quantizer.rectified_sigmoid()
        loss = loss_func(current_out, orig_output, h_V, beta)

        loss.backward()
        optimizer.step()

        if i % 500 == 0:
            print(f"Iter {i}: Loss {loss.item():.4f}, Beta {beta:.2f}")

    # Final Hard Rounding
    with torch.no_grad():
        # h(V)가 0.5 이상이면 1(올림), 아니면 0(내림)이 되도록 처리 필요하지만,
        # AdaRound 논문에서는 학습된 V를 이용해 soft quantization 식을 그대로 쓰되
        # h(V)가 거의 0 또는 1로 수렴했으므로 결과적으로 Hard Rounding이 됨.
        # 명확하게 하기 위해 마지막엔 round()를 한 번 더 씌우기도 함.
        final_w_quant = quantizer(model_layer.weight, scale, zero_point)

    return final_w_quant

In [None]:
class DataSaverHook:
    def __init__(self, store_input=False, store_output=False):
        self.store_input = store_input
        self.store_output = store_output
        self.inputs = []
        self.outputs = []

    def hook_fn(self, module, input_t, output_t):
        if self.store_input:
            # detach()를 해서 그래디언트 연결을 끊고, cpu()로 옮겨 메모리를 아끼는 것이 좋다고함.
            self.inputs.append(input_t[0].detach().cpu())

        if self.store_output:
            self.outputs.append(output_t.detach().cpu())

In [None]:
from torch.utils.data import DataLoader

def get_layer_inputs_and_outputs(model, target_layer, calib_dataloader, device='cuda'):
    # Hook 등록
    data_saver = DataSaverHook(store_input=True, store_output=True)
    handle = target_layer.register_forward_hook(data_saver.hook_fn)

    model.eval()
    model.to(device)

    print(f"Collecting calibration data for layer: {target_layer}...")

    # 모델 전체 Forward Pass
    with torch.no_grad():
        for inputs, _ in calib_dataloader:
            inputs = inputs.to(device)
            model(inputs) #데이터를 수집함

    # Hook 제거
    handle.remove()

    # 수집된 데이터를 하나의 텐서로 합치기
    all_inputs = torch.cat(data_saver.inputs, dim=0).to(device)
    all_outputs = torch.cat(data_saver.outputs, dim=0).to(device)

    print(f"Data Collected! Input shape: {all_inputs.shape}, Output shape: {all_outputs.shape}")

    return all_inputs, all_outputs

In [None]:
import torch
import torch.nn as nn

def fold_conv_bn(conv, bn):
    """
    Conv2d와 BatchNorm2d를 입력받아, BN 파라미터를 Conv에 흡수(Folding)시킵니다.
    """
    # BN 파라미터 가져오기
    # running_mean, running_var, weight(gamma), bias(beta)
    mu = bn.running_mean
    var = bn.running_var
    gamma = bn.weight
    beta = bn.bias
    eps = bn.eps

    # Conv 파라미터 가져오기
    W = conv.weight
    b = conv.bias if conv.bias is not None else torch.zeros_like(mu)

    # 분모 계산: sigma = sqrt(var + eps)
    denom = torch.sqrt(var + eps)

    # Scale Factor 계산: gamma / sigma
    scale_factor = gamma / denom


    # 새로운 Weight 계산: W * scale
    # view(-1, 1, 1, 1)은 (Channel, 1, 1, 1)로 만들어 Broadcasting을 가능하게 함
    W_new = W * scale_factor.view(-1, 1, 1, 1)

    # 새로운 Bias 계산: beta + (b - mu) * scale
    b_new = beta + (b - mu) * scale_factor

    # Conv 레이어 업데이트
    conv.weight.data.copy_(W_new)


    # Conv에 바이어스가 없었다면 새로 만들어줘야 함
    if conv.bias is None:
        conv.bias = nn.Parameter(b_new)
    else:
        conv.bias.data.copy_(b_new)


    # 5. BN 레이어는 이제 필요 없으므로 Identity로 대체 (아무 일도 안 하는 레이어)
    return nn.Identity()

In [None]:
def fuse_resnet_model(model):

    model.eval() #eval 모드여야 running_mean/var가 고정됨

    # 첫 번째 레이어 (Conv1 + BN1)
    if hasattr(model, 'bn1') and not isinstance(model.bn1, nn.Identity):
        print("Fusing model.conv1 and model.bn1...")
        model.bn1 = fold_conv_bn(model.conv1, model.bn1)


    # ResNet Layer 1~4 내부의 BasicBlock 순회
    # layer -> block -> (conv1+bn1), (conv2+bn2)
    for layer_name in ['layer1', 'layer2', 'layer3', 'layer4']:
        layer = getattr(model, layer_name)
        for i, block in enumerate(layer):
            print(f"Fusing {layer_name} Block {i}...")

            # Block 내 첫 번째 쌍
            block.bn1 = fold_conv_bn(block.conv1, block.bn1)

            # Block 내 두 번째 쌍
            block.bn2 = fold_conv_bn(block.conv2, block.bn2)

            # Downsample 레이어가 있는 경우 (conv+bn)
            if block.downsample is not None:
                # downsample[0]은 Conv, downsample[1]은 BN
                block.downsample[1] = fold_conv_bn(block.downsample[0], block.downsample[1])


    print("BN Folding Complete!")
    return model

In [None]:
import torchvision.models as models

model = models.resnet18(pretrained=True)
model.eval()

q_model = copy.deepcopy(model)
q_model.eval()
q_model.to(deivce)
q_model = fuse_resnet_model(q_model)

# 더미 데이터
dummy_data = torch.randn(1024, 3, 224, 224)
dataset = torch.utils.data.TensorDataset(dummy_data, torch.zeros(1024))
calib_loader = DataLoader(dataset, batch_size=32, shuffle=False)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

for name, module in q_model.named_modules():
    if isinstance(module, (nn.Conv2d, nn.Linear)):
        print(f"Quantizer Layer : {name}")

        input_data, orig_output = get_layer_inputs_and_outputs(model, module, calib_loader, device)

        scale, zero_point = get_quantization_params(module.weight)
        quantizer = AdaRoundQuantizer(module.weight, scale)
        quantizer.to(device)

        final_quantized_weight = train_adaround(
            model_layer=module,
            input_data=input_data,
            orig_output=orig_output,
            quantizer=quantizer,
            n_iter=2000
        )

        module.weight.data = final_quantized_weight
        print("Quantization Complete for this layer!")

        break

Quantizer Layer : conv1
Collecting calibration data for layer: Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)...
Data Collected! Input shape: torch.Size([1024, 3, 224, 224]), Output shape: torch.Size([1024, 64, 112, 112])
Start AdaRound Training...
Iter 0: Loss 0.8143, Beta 20.00
Iter 500: Loss 0.4458, Beta 15.50


KeyboardInterrupt: 