# 模型预设

In [1]:
import time
import copy
import torch
from torch.nn import functional as F
from torchvision import models, transforms
from sklearn.semi_supervised import LabelSpreading
from sklearn import preprocessing, metrics
from sklearn.model_selection import train_test_split
from PIL import Image
import numpy as np


class Bunch(dict):
    """Dictionary whose entries are also reachable as attributes.

    ``b.key`` and ``b['key']`` read and write the same entry.
    """

    def __init__(self, *args, **kwargs):
        super(Bunch, self).__init__(*args, **kwargs)
        # Use the mapping itself as the instance namespace, so attribute
        # access and item access always stay in sync.
        self.__dict__ = self


def use_gpu(use_gpu=True):
    """Select the device on which computations should run.

    Args:
        use_gpu: If true, request the CUDA device; otherwise use the CPU.

    Returns:
        ``torch.device('cuda')`` when a GPU is requested and CUDA is
        available, ``torch.device('cpu')`` in every other case.
    """
    # Fall back to the CPU instead of handing back a CUDA device that would
    # only fail later, on the first tensor/model transfer.
    if use_gpu and torch.cuda.is_available():
        return torch.device('cuda')
    return torch.device('cpu')


# Data augmentation / preprocessing pipelines, keyed by split name.
# Both splits normalize each channel with the same mean and std values.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

data_transforms = dict(
    # Training: random 224-pixel crop and random horizontal flip.
    train=transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(_NORM_MEAN, _NORM_STD),
    ]),
    # Testing: deterministic resize to 224x224, no random augmentation.
    test=transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(_NORM_MEAN, _NORM_STD),
    ]),
)


def aug_imgs(X, dataType):
    """Run every image in ``X`` through the ``dataType`` transform pipeline.

    Args:
        X: Iterable of image arrays accepted by ``PIL.Image.fromarray``.
        dataType: Key into ``data_transforms`` ('train' or 'test').

    Returns:
        A numpy array stacking the transformed images along a new first axis.
    """
    def _transform(img_array):
        # Convert to a PIL image first, then apply the selected pipeline.
        pil_img = Image.fromarray(img_array)
        return data_transforms[dataType](pil_img)

    return np.stack(list(map(_transform, X)))


class Loader(dict):
    """Mini-batch iterator over in-memory arrays.

    Usage (``L`` is an instance):
        len(L): number of batches produced by one pass over the data,
            counting a final, smaller batch.
        iter(L): yields the batches as numpy arrays; ``X`` batches alone
            when no ``Y`` was given, otherwise ``(X_batch, Y_batch)`` pairs.

    Attributes (stored in the dict itself, also reachable as attributes):
        batch_size: Maximum number of rows per batch.
        type: 'train' (rows are reshuffled on every pass) or 'test'
            (rows are served in their stored order).
        X, Y: The data and, optionally, the labels.
        nrows: Number of rows in ``X``.
    """

    def __init__(self, batch_size, X, Y=None, shuffle=True, *args, **kwargs):
        """Store the data and the batching configuration.

        Args:
            batch_size: Positive number of rows per batch.
            X: Array-like data (numpy-like; may be an HDF5 array). It is
                sliced with ``[:]`` so the rows are held as a numpy array.
            Y: Optional array-like labels aligned with ``X``.
            shuffle: True selects 'train' mode, False selects 'test' mode.
            *args, **kwargs: Forwarded to ``dict``.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        super().__init__(*args, **kwargs)
        self.__dict__ = self
        if batch_size <= 0:
            raise ValueError(
                'batch_size must be positive, got {!r}'.format(batch_size))
        self.batch_size = batch_size
        self.type = 'train' if shuffle else 'test'

        self.X = np.asarray(X[:])
        self.nrows = self.X.shape[0]
        self.Y = np.asarray(Y[:]) if Y is not None else None

    def __iter__(self):
        """Yield one pass over the data, batch by batch."""
        idx = np.arange(self.nrows)
        if self.type == 'train':
            np.random.shuffle(idx)

        for start in range(0, self.nrows, self.batch_size):
            # Slicing clamps at the end, so the last batch may be smaller.
            K = idx[start:start + self.batch_size]
            if self.Y is None:
                yield np.take(self.X, K, 0)
            else:
                yield np.take(self.X, K, 0), np.take(self.Y, K, 0)

    def __len__(self):
        """Return the number of batches in one pass (ceiling division)."""
        return -(-self.nrows // self.batch_size)


def cnn_feature(model, batch_size, Xs):
    """Yield the outputs of ``model`` for ``Xs``, one batch at a time.

    The images are preprocessed with the 'test' pipeline and served in
    their stored order.

    Args:
        model: A ``torch.nn.Module`` used as feature extractor; it is put
            in evaluation mode.
        batch_size: Number of images per forward pass.
        Xs: Array-like of images.

    Yields:
        A numpy array per batch. Size-1 axes are dropped, except the
        leading batch axis, so a batch holding a single image still yields
        a ``(1, ...)`` array that concatenates with the other batches.
    """
    model.eval()
    # Run on the device the model already lives on rather than assuming CUDA.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    for xs in Loader(batch_size, Xs, shuffle=False):
        imgs = torch.from_numpy(aug_imgs(xs, 'test')).to(device)
        # Only the forward pass is wrapped: keeping the generator suspended
        # inside ``no_grad`` would leave gradients disabled for the caller.
        with torch.no_grad():
            out = model(imgs).detach().cpu().numpy()
        kept_shape = out.shape[:1] + tuple(d for d in out.shape[1:] if d != 1)
        yield out.reshape(kept_shape)

# CNN 训练
def train_model(model, loader, num_epochs):
    """Run ``num_epochs`` passes of ``loader`` through ``model``.

    When ``loader.type`` is 'train' the model is optimized with SGD
    (lr=0.001, momentum=0.9) and a StepLR schedule (x0.1 every 7 epochs);
    otherwise the passes only evaluate the model. The weights of the epoch
    with the best accuracy on ``loader`` are restored before returning.

    Args:
        model: The ``torch.nn.Module`` to train or evaluate. Batches are
            moved to the device its parameters live on.
        loader: Iterable of ``(images, labels)`` numpy batches exposing a
            ``type`` attribute ('train' or 'test').
        num_epochs: Number of passes over ``loader``.

    Returns:
        ``model``, loaded with the best weights seen.

    Raises:
        ValueError: If a pass over ``loader`` yields no samples.
    """
    criterion = torch.nn.CrossEntropyLoss()
    # All parameters are being optimized.
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    # Decay LR by a factor of 0.1 every 7 epochs.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
    is_train = loader.type == 'train'
    # Follow the model's own device instead of assuming a GPU.
    device = next(model.parameters()).device
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        if is_train:
            model.train()  # Set model to training mode
        else:
            model.eval()  # Set model to evaluate mode

        running_loss = 0.0
        running_corrects = 0
        m = 0
        # Iterate over data.
        for inputs, labels_ in loader:
            inputs = torch.from_numpy(aug_imgs(inputs, loader.type)).to(device)
            # as_tensor accepts any integer label dtype and casts to int64.
            labels = torch.as_tensor(
                np.asarray(labels_), dtype=torch.long).to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward; track history only in train
            with torch.set_grad_enabled(is_train):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                if is_train:
                    # backward + optimize only if in training phase
                    loss.backward()
                    optimizer.step()

            # statistics: the criterion returns a per-batch mean, so weight
            # it by the batch size to get a true per-sample epoch average.
            n = inputs.size(0)
            running_loss += loss.item() * n
            running_corrects += torch.sum(preds == labels).item()
            m += n

        if m == 0:
            raise ValueError('loader yielded no samples')

        if is_train:
            # Step the schedule after the epoch's optimizer updates, so the
            # initial learning rate is used for the full first period.
            scheduler.step()

        epoch_loss = running_loss / m
        epoch_acc = running_corrects / m
        print('{} Loss: {:.4f} Acc: {:.4f}'.format(loader.type, epoch_loss,
                                                   epoch_acc))

        # deep copy the model
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())
            print('Best val Acc: {:4f}'.format(best_acc))

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

# 数据集划分

In [2]:
import tables as tb


# Load the dataset
data_path = 'E:/xdata/X.h5'
dataset = tb.open_file(data_path).root.cifar10
xTr, yTr = dataset.trainX[:], dataset.trainY[:]

# Split: `numLabels` samples stay labeled for training; every other
# training sample goes to the held-out part.
batch_size = 32
numLabels = 800
test_size = len(xTr) - numLabels

trainX, testX, trainY, testY = train_test_split(
    xTr, yTr, test_size=test_size, shuffle=True, random_state=42)

# Wrap the splits in loaders
trainset = Loader(batch_size, trainX, Y=trainY, shuffle=True)  # training set
valset = Loader(
    batch_size, dataset.testX, Y=dataset.testY, shuffle=False)  # validation set
# Held-out set, served in chunks of 10000 as the unlabeled data for SSL
testset = Loader(10000, testX, Y=testY, shuffle=True)

# 模型训练

In [3]:
# Model setup: pretrained ResNet-50 whose final fully connected layer is
# replaced by a fresh 10-output linear layer.
device = use_gpu(True)
model = models.resnet50(pretrained=True)
num_inFeats = model.fc.in_features
model.fc = torch.nn.Linear(in_features=num_inFeats, out_features=10)
model = model.to(device)
epochs = 1000

In [None]:
for epoch in range(epochs):
    # Feature extractor: every child module of `model` except the last one
    # (the fc classifier). It wraps the model's own modules rather than copies.
    features = torch.nn.Sequential(*list(model.children())[:-1])

    print('Iter: %i'%epoch)
    print('^=^'*20)
    for xTe, yTe in testset:
        # Re-extract the labeled features on every pass: the model is
        # fine-tuned at the end of each pass, and features taken before that
        # update would not be comparable with the fresh unlabeled features.
        xTr_features = np.concatenate(
            list(cnn_feature(features, batch_size, trainX)))
        xTe_features = np.concatenate(
            list(cnn_feature(features, batch_size, xTe)))
        # SSL, obtain pseudo-labels of unlabeled samples
        dataX = np.vstack((xTr_features, xTe_features)).astype('float')
        # Cast to a signed integer type so the -1 "unlabeled" marker is
        # representable whatever dtype the labels were stored with.
        dataY = np.concatenate((trainY, yTe)).astype(np.int64)
        numSamples = len(dataY)
        # The labeled samples come first; everything after them is unlabeled.
        ind_unlabeled = np.arange(len(trainY), numSamples)
        dataY[ind_unlabeled] = -1
        cls = LabelSpreading(max_iter=150, kernel='rbf', gamma=0.003)
        cls.fit(preprocessing.scale(dataX), dataY)
        predicted_labels = cls.transduction_[ind_unlabeled]
        print()
        print("SSL: accuracy:%f" %metrics.accuracy_score(yTe, predicted_labels))

        print('*'*50)
        print('Train:')
        model = train_model(model, trainset, 10)
        print('*'*50)
        print('Validate:')
        model = train_model(model, valset, 1)
        print('*'*50)
        print('伪标签训练: ')
        # Fine-tune on the unlabeled chunk using its pseudo-labels.
        unlabelset = Loader(batch_size, xTe, predicted_labels, shuffle=True)
        model = train_model(model, unlabelset, 10)

Iter: 0
^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^^=^
