In [1]:
from typing import List, Optional

import numpy as np
from torch.utils.data import Subset

from dataset import DataManager, make_dataset
# Build the project-local DataManager with MNIST as the train dataset and USPS
# as the test dataset, passing 0.5 for both `src_prior` and `tgt_prior`.
# NOTE(review): presumably these are the source/target positive-class priors —
# confirm against `dataset.DataManager`.
data_manger = DataManager(train_dataset='mnist',
                              test_dataset='usps',
                              src_prior=0.5,
                              tgt_prior=0.5)
    

# `get_data(merge=False)` is unpacked into four datasets; going by the variable
# names these are the positive, unlabeled, validation and test splits
# (TODO confirm the order against DataManager.get_data).
pos_dataset, unl_dataset, val_dataset, test_dataset = data_manger.get_data(merge=False)
# Wrap the positive split with `make_dataset` using batch_size=128 and the
# 'weak_train' role; the result exposes `.dataset.data` (used in the next cell).
# NOTE(review): the name `pd` collides with the conventional pandas alias —
# consider renaming (together with its later uses) if pandas is ever imported.
pd = make_dataset(pos_dataset, 'mnist', batch_size=128, role='weak_train')



In [2]:
# Sanity check: display the shape of the raw data array held by the dataset
# wrapped in `pd` (the recorded output below shows a 3-tuple).
pd.dataset.data.shape

(6000, 28, 28)

In [None]:
def binarize_dataset(
    features: np.ndarray,
    targets: np.ndarray,
    pos_class: List,
    neg_class: Optional[List] = None,
    num_labeled: int = 0,
    num_unlabeled: Optional[int] = None,
    prior: Optional[float] = None
):
    """Turn a multi-class dataset into a positive-unlabeled (PU) dataset.

    NOTE(review): a later cell of this notebook defines ``binarize_dataset``
    again; when both cells are run, the later definition is the one in effect.

    Args:
        features: Sample array, indexed along axis 0.
        targets: Class label of every sample (same length as ``features``).
        pos_class: Class labels treated as positive. Must be non-empty.
        neg_class: Class labels treated as negative. ``None`` or an empty
            sequence means "every class that is not in ``pos_class``".
        num_labeled: Total number of labeled positives to draw; it is split
            evenly (rounded down) across the classes in ``pos_class`` and
            drawn without replacement within each class.
        num_unlabeled: Size of the unlabeled set. Only honoured together with
            ``prior``; ``None`` or ``0`` selects the "use everything" mode.
        prior: Fraction of positives in the unlabeled set, in ``[0, 1]``.
            ``0.0`` is a valid value (unlabeled set made of negatives only).

    Returns:
        Tuple ``(features, y_true, y)`` shuffled consistently:
        ``y_true`` is ``1`` for positives and ``-1`` for negatives,
        ``y`` is ``1`` for labeled positives and ``0`` for unlabeled samples.

    Raises:
        ValueError: If ``pos_class`` is empty, if ``prior`` is outside
            ``[0, 1]``, or (from ``np.random.choice``) if more labeled
            positives are requested than a class contains.
    """
    if len(pos_class) == 0:
        raise ValueError("pos_class must contain at least one class label")
    if prior is not None and not 0.0 <= prior <= 1.0:
        raise ValueError(f"prior must lie in [0, 1], got {prior!r}")

    # Indices of every positive / negative candidate sample.
    p_data_idx = np.where(np.isin(targets, pos_class))[0]
    # Explicit emptiness test instead of truthiness so array-like class lists work.
    if neg_class is None or len(neg_class) == 0:
        n_mask = np.isin(targets, pos_class, invert=True)
    else:
        n_mask = np.isin(targets, neg_class)
    n_data_idx = np.where(n_mask)[0]

    # Draw the labeled positives, the same amount from every positive class.
    num_pos_labeled_per_cls = int(num_labeled / len(pos_class))
    p_ix = []
    for cls in pos_class:
        p_cls = np.where(np.isin(targets, cls))[0]
        p_ix.extend(np.random.choice(a=p_cls, size=num_pos_labeled_per_cls, replace=False))
    # Integer dtype keeps indexing well-defined even when nothing was drawn.
    p_ix = np.asarray(p_ix, dtype=np.intp)
    p_data = features[p_ix]

    # y_true: pos -> 1, neg -> -1
    p_labels_true = np.ones(len(p_data), dtype=int)   # labeled pos ground-truth = 1
    p_labels = np.ones(len(p_data), dtype=int)        # observed label = 1

    # Build the unlabeled set.
    # `prior is not None` (not truthiness) so that prior=0.0 is respected.
    if num_unlabeled and prior is not None:
        n_pu = int(prior * num_unlabeled)
        n_nu = num_unlabeled - n_pu
        # Sample with replacement only when the pool is too small.
        # NOTE(review): positives are drawn from the full positive pool, so a
        # labeled positive can also appear in the unlabeled set — confirm
        # this is the intended sampling scheme.
        pu_ix = np.random.choice(a=p_data_idx, size=n_pu, replace=n_pu > len(p_data_idx))
        nu_ix = np.random.choice(a=n_data_idx, size=n_nu, replace=n_nu > len(n_data_idx))
        pos_u_ix, neg_u_ix = pu_ix, nu_ix
    else:
        # Use every positive that was not labeled, plus all negatives.
        pos_u_ix, neg_u_ix = np.setdiff1d(p_data_idx, p_ix), n_data_idx

    u_ix = np.concatenate((pos_u_ix, neg_u_ix), axis=0)
    u_data = features[u_ix]
    # Ground truth of the unlabeled part, and its observed label (all 0).
    u_true = np.concatenate((np.ones(len(pos_u_ix), dtype=int), -np.ones(len(neg_u_ix), dtype=int)))
    u_obs = np.zeros(len(u_data), dtype=int)

    # Create final PU dataset: labeled positives first, then unlabeled samples.
    features = np.concatenate((p_data, u_data), axis=0)
    y_true = np.concatenate((p_labels_true, u_true), axis=0)  # 1 for pos, -1 for neg
    y = np.concatenate((p_labels, u_obs), axis=0)             # 1 for labeled pos, 0 for unl

    # Shuffle all three arrays with the same fixed-seed permutation.
    # `shuffle` must already be in scope (presumably sklearn.utils.shuffle — confirm).
    features, y_true, y = shuffle(features, y_true, y, random_state=0)
    return features, y_true, y


In [None]:
def binarize_dataset(
    features: np.ndarray,
    targets: np.ndarray,
    pos_class: List,
    neg_class: Optional[List] = None,
    num_labeled: int = 0,
    num_unlabeled: Optional[int] = None,
    prior: Optional[float] = None
):
    """Build a positive-unlabeled (PU) dataset from multi-class data.

    NOTE(review): an earlier cell of this notebook also defines
    ``binarize_dataset``; this later definition replaces it once both cells
    have been executed. Keep only one of them.

    Args:
        features: Samples, indexed along axis 0.
        targets: Per-sample class labels, same length as ``features``.
        pos_class: Non-empty collection of class labels considered positive.
        neg_class: Class labels considered negative; ``None`` or empty means
            all classes outside ``pos_class``.
        num_labeled: Number of labeled positives overall. Each positive class
            contributes ``int(num_labeled / len(pos_class))`` samples, drawn
            without replacement.
        num_unlabeled: Number of unlabeled samples to draw. Used only when
            ``prior`` is also given; ``None``/``0`` falls back to using all
            remaining data.
        prior: Share of positives among the unlabeled samples, within
            ``[0, 1]`` (``0.0`` yields a negatives-only unlabeled set).

    Returns:
        ``(features, y_true, y)`` after a joint shuffle, where ``y_true`` is
        ``+1``/``-1`` (positive/negative) and ``y`` is ``1`` for labeled
        positives and ``0`` for unlabeled samples. Both label arrays are
        built from integer arrays regardless of ``targets.dtype``.

    Raises:
        ValueError: For an empty ``pos_class``, a ``prior`` outside
            ``[0, 1]``, or (via ``np.random.choice``) a per-class labeled
            quota larger than the class itself.
    """
    if len(pos_class) == 0:
        raise ValueError("pos_class must contain at least one class label")
    if prior is not None and not 0.0 <= prior <= 1.0:
        raise ValueError(f"prior must lie in [0, 1], got {prior!r}")

    # Positive and negative data indices.
    positive_pool = np.where(np.isin(targets, pos_class))[0]
    has_explicit_negatives = neg_class is not None and len(neg_class) > 0
    negative_mask = (np.isin(targets, neg_class) if has_explicit_negatives
                     else np.isin(targets, pos_class, invert=True))
    negative_pool = np.where(negative_mask)[0]

    # Labeled positives: an equal quota from each positive class.
    per_class_quota = int(num_labeled / len(pos_class))
    labeled_ix = []
    for cls in pos_class:
        class_ix = np.where(np.isin(targets, cls))[0]
        labeled_ix.extend(np.random.choice(a=class_ix, size=per_class_quota, replace=False))
    # Force an integer index array so an empty selection still indexes cleanly.
    labeled_ix = np.asarray(labeled_ix, dtype=np.intp)
    p_data = features[labeled_ix]
    # Observed labels are always integers; they must not inherit targets.dtype
    # (float targets would otherwise turn the whole `y` array into floats).
    p_labels = np.ones(len(p_data), dtype=int)

    # Unlabeled samples.
    # Test `prior` against None so that a prior of exactly 0.0 is not ignored.
    if num_unlabeled and prior is not None:
        n_pu = int(prior * num_unlabeled)
        n_nu = num_unlabeled - n_pu
        # Fall back to sampling with replacement only if a pool is too small.
        # NOTE(review): the positive pool still contains the labeled
        # positives, so they may be re-drawn as unlabeled — confirm intent.
        unl_pos_ix = np.random.choice(a=positive_pool, size=n_pu,
                                      replace=n_pu > len(positive_pool))
        unl_neg_ix = np.random.choice(a=negative_pool, size=n_nu,
                                      replace=n_nu > len(negative_pool))
    else:
        # Everything that was not picked as a labeled positive becomes unlabeled.
        unl_pos_ix = np.setdiff1d(positive_pool, labeled_ix)
        unl_neg_ix = negative_pool

    u_data = features[np.concatenate((unl_pos_ix, unl_neg_ix), axis=0)]
    u_true = np.concatenate((np.ones(len(unl_pos_ix), dtype=int),
                             -np.ones(len(unl_neg_ix), dtype=int)))
    u_observed = np.zeros(len(u_data), dtype=int)

    # Create final PU dataset (labeled block followed by unlabeled block).
    features = np.concatenate((p_data, u_data), axis=0)
    y_true = np.concatenate((np.ones(len(p_data), dtype=int), u_true), axis=0)
    y = np.concatenate((p_labels, u_observed), axis=0)

    # Shuffle and return. `shuffle` must already be in scope
    # (presumably sklearn.utils.shuffle — confirm).
    features, y_true, y = shuffle(features, y_true, y, random_state=0)
    return features, y_true, y

array([-1, -1, -1, ..., -1, -1, -1])

In [4]:
# Scratch arithmetic: 1914 divided by 60, multiplied by 27, divided by 60 again.
# NOTE(review): the meaning and units of 1914 and 27 are not stated anywhere in
# the notebook — presumably a run-time estimate; confirm and name the constants.
((1914/60) * 27)/60

14.354999999999999

In [None]:
class LeNet(nn.Module):
    """LeNet-style convolutional encoder with optional projection head and prototypes.

    The backbone is two conv/batch-norm/ReLU/max-pool stages followed by global
    average pooling, which yields a 16-dimensional feature vector per sample.
    An optional projection head (linear or 2-layer MLP), optional L2
    normalisation and an optional prototype layer are applied afterwards.

    Args:
        in_channels: Number of input channels (1 for grayscale, 3 for RGB).
        normalize: If True, L2-normalise the embedding along dim 1.
        output_dim: Output size of the projection head; 0 disables the head.
        hidden_mlp: 0 builds a single linear layer as head, >0 builds a
            2-layer MLP with this hidden width.
        nmb_prototypes: Positive int for a single bias-free linear prototype
            layer, or a list to build ``MultiPrototypes`` (defined outside
            this class); 0 disables prototypes.
        eval_mode: If True, the backbone returns the pooled feature *map*
            (no global average pooling / flattening).
    """

    def __init__(
        self,
        in_channels: int = 1,      # set to 3 for RGB input
        normalize: bool = False,   # whether to L2-normalise the embedding
        output_dim: int = 0,       # projection head output size (0 = no head)
        hidden_mlp: int = 0,       # 0 -> 1-layer head, >0 -> 2-layer MLP head
        nmb_prototypes=0,          # int or list for multi-head
        eval_mode: bool = False,   # return only the backbone feature map
    ):
        super().__init__()
        self.eval_mode = eval_mode
        self.l2norm = normalize

        # ────── Backbone ──────
        self.conv1 = nn.Conv2d(in_channels, 6, kernel_size=5, stride=1, padding=2, bias=False)
        self.bn1   = nn.BatchNorm2d(6)
        self.pool1 = nn.MaxPool2d(2)          # halves the spatial size
        self.conv2 = nn.Conv2d(6, 16, 5, bias=False)
        self.bn2   = nn.BatchNorm2d(16)
        self.pool2 = nn.MaxPool2d(2)

        # Global average pooling makes the feature size independent of the
        # input resolution: (B,16,H,W) -> (B,16,1,1) -> (B,16).
        self.gap   = nn.AdaptiveAvgPool2d((1, 1))
        feat_dim   = 16

        # ────── Projection head ──────
        if output_dim == 0:
            self.projection_head = None
        elif hidden_mlp == 0:
            self.projection_head = nn.Linear(feat_dim, output_dim)
        else:
            self.projection_head = nn.Sequential(
                nn.Linear(feat_dim, hidden_mlp),
                nn.BatchNorm1d(hidden_mlp),
                nn.ReLU(inplace=True),
                nn.Linear(hidden_mlp, output_dim),
            )

        # ────── Prototype layer ──────
        # Without a projection head the prototypes consume the raw backbone
        # features, so their input width must be feat_dim rather than 0.
        proto_in_dim = output_dim if output_dim != 0 else feat_dim
        self.prototypes = None
        if isinstance(nmb_prototypes, list):
            self.prototypes = MultiPrototypes(proto_in_dim, nmb_prototypes)
        elif isinstance(nmb_prototypes, int) and nmb_prototypes > 0:
            self.prototypes = nn.Linear(proto_in_dim, nmb_prototypes, bias=False)

        # Kaiming initialisation for convolutions; identity-like batch norm.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    # ───────────────────────────
    # ―― forward pass, split into stages ――
    # ───────────────────────────
    def forward_backbone(self, x: torch.Tensor) -> torch.Tensor:
        """Run the convolutional trunk.

        Returns the feature map after the second pooling stage when
        ``eval_mode`` is set, otherwise the globally averaged ``(B, 16)``
        feature vectors.
        """
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        if self.eval_mode:
            return x                          # feature map as-is
        x = self.gap(x).flatten(1)           # (B,16)
        return x

    def forward_head(self, x: torch.Tensor):
        """Apply projection head, optional L2 normalisation and prototypes.

        Returns the embedding alone, or ``(embedding, prototype_output)`` when
        a prototype layer is configured.
        """
        if self.projection_head is not None:
            x = self.projection_head(x)
        if self.l2norm:
            x = F.normalize(x, dim=1, p=2)
        if self.prototypes is not None:
            return x, self.prototypes(x)
        return x

    # ───────────────────────────
    # ―― multi-crop input handling ――
    # ───────────────────────────
    def forward(self, inputs):
        """Encode a tensor or a SwAV-style list of crops ``[crop1, crop2, ...]``.

        Consecutive crops sharing the same spatial resolution (last dimension)
        are concatenated and pushed through the backbone in one call; all
        backbone outputs are then concatenated along the batch dimension and
        passed to :meth:`forward_head`.

        Inputs are moved to the device the model's parameters live on, so the
        module works on CPU as well as on GPU.

        Raises:
            ValueError: If ``inputs`` is an empty list/tuple.
        """
        if isinstance(inputs, tuple):
            inputs = list(inputs)
        elif not isinstance(inputs, list):
            inputs = [inputs]
        if len(inputs) == 0:
            raise ValueError("LeNet.forward() received an empty list of inputs")

        # Follow the model's own device instead of assuming CUDA is available.
        device = next(self.parameters()).device

        # Group crops of equal resolution so each group needs one backbone pass.
        sizes = torch.tensor([inp.shape[-1] for inp in inputs])
        group_counts = torch.unique_consecutive(sizes, return_counts=True)[1]
        group_ends = torch.cumsum(group_counts, 0).tolist()

        chunks = []
        start = 0
        for end in group_ends:
            batch = torch.cat(inputs[start:end]).to(device, non_blocking=True)
            chunks.append(self.forward_backbone(batch))
            start = end

        # Single concatenation instead of growing the output inside the loop.
        return self.forward_head(torch.cat(chunks))