In [1]:
from collections import defaultdict
import torch
import torch.nn as nn
import torch.optim as optim
from typing import List, Tuple, TypeVar

def process(trainloader, testloader, model, epochs: int, lr: float, lr_scheduling=None, log_savepath=None):

    log_dict = defaultdict(list)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    if lr_scheduling is not None:
        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_scheduling)

    def train(trainloader) -> Tuple[float, float]:
        sum_loss, sum_correct, sum_dataN = 0.0, 0, 0
        for (inputs, labels) in trainloader:
            optimizer.zero_grad()
            outputs, _ = model(inputs)
            loss = criterion(outputs, labels)
            sum_loss += loss.item()
            _, predicted = outputs.max(1)
            sum_dataN += labels.size(0)
            sum_correct += (predicted == labels).sum().item()
            loss.backward()
            optimizer.step()
        train_loss = sum_loss*trainloader.batch_size/len(trainloader.dataset)
        train_acc = float(sum_correct/sum_dataN)
        return train_loss, train_acc

    def test(testloader) -> Tuple[float, float]:
        sum_loss, sum_correct, sum_dataN = 0.0, 0, 0
        for (inputs, labels) in testloader:
            outputs, _ = model(inputs)
            loss = criterion(outputs, labels)
            sum_loss += loss.item()
            _, predicted = outputs.max(1)
            sum_dataN += labels.size(0)
            sum_correct += (predicted == labels).sum().item()
        test_loss = sum_loss*testloader.batch_size/len(testloader.dataset)
        test_acc = float(sum_correct/sum_dataN)
        return test_loss, test_acc

    print("\n{0:<13}{1:<13}{2:<13}{3:<13}{4:<13}{5:<6}".format("epoch","train/loss","train/acc","test/loss","test/acc","lr"))
    for epoch in range(1, epochs + 1):
        train_loss, train_acc = train(trainloader)
        test_loss, test_acc = test(testloader)
        lr = optimizer.param_groups[-1]["lr"]
        print("{0:<13}{1:<13.5f}{2:<13.5f}{3:<13.5f}{4:<13.5f}{5:<6.6f}".format(epoch, train_loss, train_acc, test_loss, test_acc, lr))
        if lr_scheduling is not None: scheduler.step()

    return model

In [2]:
import torch.nn as nn
import torch.nn.functional as F

class LeNet(nn.Module):
    def __init__(self, out=3):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1, padding=1) # (1) 32*32*3 -> 32*32*16
        self.conv2 = nn.Conv2d(16, 32, 3, 1, padding=1) # (3) 16*16*16 -> 16*16*32
        self.conv3 = nn.Conv2d(32, 64, 3, 1, padding=1) # (5) 8*8*32 -> 8*8*64
        self.fc1 = nn.Linear(4*4*64, 500)
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(500, out)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2) # (2) 32*32*16 -> 16*16*16
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2) # (4) 16*16*32 -> 8*8*32
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2, 2) # (6) 8*8*64 -> 4*4*64
        x = x.view(-1, 4*4*64)
        x = F.relu(self.fc1(x))
        feature = x
        x = self.dropout1(x)
        x = self.fc2(x)
        return x, feature

In [8]:
import torchvision
import torchvision.transforms as transforms

def download_cifar10(savepath: str):
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    train = torchvision.datasets.CIFAR10(root=savepath, train=True, download=True, transform=transform)
    test = torchvision.datasets.CIFAR10(root=savepath, train=False, download=True, transform=transform)
    return train, test

In [10]:
class DatasetPreprocessor:

    def __init__(self, train: List[Tuple], test: List[Tuple]):
        self.train = train
        self.test = test

    def out_datasets(self, dataframe=False) -> Tuple[List[Tuple], List[Tuple]] or Tuple[Dataframe, Dataframe]:
        if dataframe == False:
            return self.train, self.test
        elif dataframe == True:
            train_df = self.__convert_dataframe(self.train)
            test_df = self.__convert_dataframe(self.test)
            return train_df, test_df

    def show_length(self):
        print("\n------------------------------------------------------------")
        print("Number of train data:{0:>9}".format(len(self.train)))
        print("Number of test data:{0:>9}".format(len(self.test)))

    def show_labels(self):
        texts = ["Number of train data", "Number of test data"]
        for i, dataset in enumerate([self.train, self.test]):
            label_count = defaultdict(int)
            for data in dataset:
                label_count[data[1]] += 1
            print("\n{0}  ----------------------------------------------".format(texts[i]))
            label_count = sorted(label_count.items())
            sum = 0
            for label, count in label_count:
                print("label:  {0}    count:  {1}".format(label, count))
                sum += count
            print("total:  {0}".format(sum))

    # labelsに含まれるラベルのデータを選択
    def select_by_label(self, labels: List[int]):
        self.train = [data for data in self.train if data[1] in labels]
        self.test = [data for data in self.test if data[1] in labels]

    # 正解ラベルを0からの連番に更新
    def update_labels(self):
        updated = [[], []]
        label_mapping = defaultdict(lambda: -1)
        for i, dataset in enumerate([self.train, self.test]):
            dataset = sorted(dataset, key=lambda x:x[1])
            new_label = 0
            for data in dataset:
                if label_mapping[data[1]] == -1:
                    label_mapping[data[1]] = new_label
                    new_label += 1
                updated[i].append((data[0], label_mapping[data[1]]))
        self.train, self.test = updated
        print("\nChanged the label.  ----------------------------------------------")
        for old, new in label_mapping.items():
            print("label:  {0} -> {1}".format(old, new))

    # dataset(train, test) をDataFrame形式に変換
    def __convert_dataframe(self, dataset: List[Tuple]) -> Dataframe:
        dic = defaultdict(list)
        for image, label in dataset:
            dic["image"].append(json.dumps(image.cpu().numpy().tolist()))
            dic["label"].append(int(label))
        df = pd.DataFrame(dic)
        return df

In [33]:
from collections import defaultdict
import random

def select(dataset, dataN, labels=[0,1,2]):
    random.shuffle(dataset)
    selected = []
    dic = defaultdict(list)
    for label in labels:
        for data in dataset:
            if len(dic[label]) >= dataN:
                break
            if data[1] == label:
                dic[label].append(data)
        selected += dic[label]
        
    return selected

In [28]:
savepath = "../../../prototype/proposal/data/"
train, test = download_cifar10(savepath=savepath)

Files already downloaded and verified
Files already downloaded and verified


In [30]:
TRAIN_CLASSES = [1,2,8]
preprocessor = DatasetPreprocessor(train, test)
preprocessor.select_by_label(TRAIN_CLASSES)
preprocessor.update_labels()
preprocessor.show_labels()
train, test = preprocessor.out_datasets(dataframe=False)


Changed the label.  ----------------------------------------------
label:  1 -> 0
label:  2 -> 1
label:  8 -> 2

Number of train data  ----------------------------------------------
label:  0    count:  5000
label:  1    count:  5000
label:  2    count:  5000
total:  15000

Number of test data  ----------------------------------------------
label:  0    count:  1000
label:  1    count:  1000
label:  2    count:  1000
total:  3000


### 1回目の学習

In [37]:
selected_train = select(dataset=train, dataN=100)
selected_test = select(dataset=test, dataN=10)
print("train num: {0}".format(len(selected_train)))
print("test num: {0}".format(len(selected_test)))

train num: 300
test num: 30


In [39]:
import torch
batch_size = 128
trainloader = torch.utils.data.DataLoader(selected_train, batch_size=batch_size, shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(selected_test, batch_size=batch_size, shuffle=False, num_workers=2)

In [40]:
model_savepath = "./assets/test_v1.pth"
model = LeNet(3)
model = process(trainloader, testloader, model, epochs=10, lr=0.01)
torch.save(model.state_dict(), model_savepath)


epoch        train/loss   train/acc    test/loss    test/acc     lr    
1            1.40579      0.33667      4.69788      0.26667      0.010000
2            1.40737      0.29667      4.68450      0.26667      0.010000
3            1.40292      0.38667      4.67562      0.36667      0.010000
4            1.40432      0.35000      4.68020      0.40000      0.010000
5            1.40171      0.40333      4.67242      0.36667      0.010000
6            1.40154      0.39000      4.67798      0.33333      0.010000
7            1.39745      0.36667      4.67250      0.40000      0.010000
8            1.39708      0.34667      4.65202      0.33333      0.010000
9            1.39590      0.35000      4.65504      0.33333      0.010000
10           1.38876      0.35667      4.65117      0.33333      0.010000


In [46]:
model.state_dict()["conv1.weight"][0]

tensor([[[-0.0933, -0.0398,  0.1804],
         [ 0.0397, -0.1548,  0.1030],
         [-0.1602, -0.0680,  0.1743]],

        [[ 0.1842, -0.0073,  0.0016],
         [ 0.1044, -0.0353,  0.1169],
         [-0.0129, -0.0259,  0.1643]],

        [[ 0.0697,  0.1926, -0.0276],
         [-0.0641,  0.0099,  0.0959],
         [ 0.1730,  0.0037,  0.1823]]])

### 2回目の学習

In [47]:
model_path = "./assets/test_v1.pth"
model2 = LeNet(3)
model2.load_state_dict(torch.load(model_path))

<All keys matched successfully>

In [48]:
model2.state_dict()["conv1.weight"][0] # 前回学習したモデルのパラメータと一致

tensor([[[-0.0933, -0.0398,  0.1804],
         [ 0.0397, -0.1548,  0.1030],
         [-0.1602, -0.0680,  0.1743]],

        [[ 0.1842, -0.0073,  0.0016],
         [ 0.1044, -0.0353,  0.1169],
         [-0.0129, -0.0259,  0.1643]],

        [[ 0.0697,  0.1926, -0.0276],
         [-0.0641,  0.0099,  0.0959],
         [ 0.1730,  0.0037,  0.1823]]])

In [49]:
selected_train = select(dataset=train, dataN=100)
selected_test = select(dataset=test, dataN=10)
print("train num: {0}".format(len(selected_train)))
print("test num: {0}".format(len(selected_test)))

train num: 300
test num: 30


In [50]:
import torch
batch_size = 128
trainloader = torch.utils.data.DataLoader(selected_train, batch_size=batch_size, shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(selected_test, batch_size=batch_size, shuffle=False, num_workers=2)

In [51]:
model_savepath = "./assets/test_v2.pth"
model2 = process(trainloader, testloader, model2, epochs=10, lr=0.01)
torch.save(model2.state_dict(), model_savepath)


epoch        train/loss   train/acc    test/loss    test/acc     lr    
1            1.39182      0.34333      4.61419      0.33333      0.010000
2            1.39002      0.35000      4.60380      0.36667      0.010000
3            1.38920      0.35000      4.57252      0.43333      0.010000
4            1.39004      0.39000      4.57040      0.43333      0.010000
5            1.38041      0.37333      4.57810      0.43333      0.010000
6            1.38499      0.38667      4.56418      0.50000      0.010000
7            1.37328      0.43333      4.54290      0.50000      0.010000
8            1.37653      0.41667      4.47347      0.60000      0.010000
9            1.36442      0.48000      4.49608      0.60000      0.010000
10           1.34970      0.54000      4.43874      0.56667      0.010000
