In [1]:
import torch
from torch import nn
from torch.optim import Adam
import torch.nn.functional as F
from torchvision import transforms, datasets
import time

In [2]:
def squash(inputs, axis=-1):
    """
    The non-linear activation used in Capsule. It drives the length of a large vector to near 1 and small vector to 0
    :param inputs: vectors to be squashed
    :param axis: the axis to squash
    :return: a Tensor with same size as inputs
    """
    norm = torch.norm(inputs, p=2, dim=axis, keepdim=True)
    scale = norm**2 / (1 + norm**2) / (norm + 1e-8)
    return scale * inputs

In [3]:
class PrimaryCapsule(nn.Module):
    """
    Apply Conv2D with `out_channels` and then reshape to get capsules
    :param in_channels: input channels
    :param out_channels: output channels
    :param dim_caps: dimension of capsule
    :param kernel_size: kernel size
    :return: output tensor, size=[batch, num_caps, dim_caps]
    """
    def __init__(self, in_channels, out_channels, dim_caps, kernel_size, stride=1, padding=0):
        super(PrimaryCapsule, self).__init__()
        self.dim_caps = dim_caps
        self.conv2d = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding)

    def forward(self, x):
        outputs = self.conv2d(x)
        outputs = outputs.view(x.size(0), -1, self.dim_caps)
        return squash(outputs)

In [4]:
class DenseCapsule(nn.Module):
    """
    The dense capsule layer. It is similar to Dense (FC) layer. Dense layer has `in_num` inputs, each is a scalar, the
    output of the neuron from the former layer, and it has `out_num` output neurons. DenseCapsule just expands the
    output of the neuron from scalar to vector. So its input size = [None, in_num_caps, in_dim_caps] and output size = \
    [None, out_num_caps, out_dim_caps]. For Dense Layer, in_dim_caps = out_dim_caps = 1.

    :param in_num_caps: number of cpasules inputted to this layer
    :param in_dim_caps: dimension of input capsules
    :param out_num_caps: number of capsules outputted from this layer
    :param out_dim_caps: dimension of output capsules
    :param routings: number of iterations for the routing algorithm
    """
    def __init__(self, in_num_caps, in_dim_caps, out_num_caps, out_dim_caps, routings=3):
        super(DenseCapsule, self).__init__()
        self.in_num_caps = in_num_caps
        self.in_dim_caps = in_dim_caps
        self.out_num_caps = out_num_caps
        self.out_dim_caps = out_dim_caps
        self.routings = routings
        self.weight = nn.Parameter(0.01 * torch.randn(out_num_caps, in_num_caps, out_dim_caps, in_dim_caps))

    def forward(self, x):
        x_hat = torch.squeeze(torch.matmul(self.weight, x[:, None, :, :, None]), dim=-1)
        x_hat_detached = x_hat.detach()

        # The prior for coupling coefficient, initialized as zeros.
        # b.size = [batch, out_num_caps, in_num_caps]
        b = torch.zeros(x.size(0), self.out_num_caps, self.in_num_caps)

        assert self.routings > 0, 'The \'routings\' should be > 0.'
        for i in range(self.routings):
            # c.size = [batch, out_num_caps, in_num_caps]
            c = F.softmax(b, dim=1)

            # At last iteration, use `x_hat` to compute `outputs` in order to backpropagate gradient
            if i == self.routings - 1:
                # c.size expanded to [batch, out_num_caps, in_num_caps, 1           ]
                # x_hat.size     =   [batch, out_num_caps, in_num_caps, out_dim_caps]
                # => outputs.size=   [batch, out_num_caps, 1,           out_dim_caps]
                outputs = squash(torch.sum(c[:, :, :, None] * x_hat, dim=-2, keepdim=True))
                # outputs = squash(torch.matmul(c[:, :, None, :], x_hat))  # alternative way
            else:  # Otherwise, use `x_hat_detached` to update `b`. No gradients flow on this path.
                outputs = squash(torch.sum(c[:, :, :, None] * x_hat_detached, dim=-2, keepdim=True))
                # outputs = squash(torch.matmul(c[:, :, None, :], x_hat_detached))  # alternative way

                # outputs.size       =[batch, out_num_caps, 1,           out_dim_caps]
                # x_hat_detached.size=[batch, out_num_caps, in_num_caps, out_dim_caps]
                # => b.size          =[batch, out_num_caps, in_num_caps]
                b = b + torch.sum(outputs * x_hat_detached, dim=-1)

        return torch.squeeze(outputs, dim=-2)

In [5]:
class CapsuleNet(nn.Module):
    """
    A Capsule Network on MNIST.
    :param input_size: data size = [channels, width, height]
    :param classes: number of classes
    :param routings: number of routing iterations
    Shape:
        - Input: (batch, channels, width, height), optional (batch, classes) .
        - Output:((batch, classes), (batch, channels, width, height))
    """

    def __init__(self, input_size, classes, routings):
        super(CapsuleNet, self).__init__()
        self.input_size = input_size
        self.classes = classes
        self.routings = routings

        # Layer 1: Just a conventional Conv2D layer
        self.conv1 = nn.Conv2d(
            input_size[0], 256, kernel_size=9, stride=1, padding=0)

        # Layer 2: Conv2D layer with `squash` activation, then reshape to [None, num_caps, dim_caps]
        self.primarycaps = PrimaryCapsule(
            256, 256, 8, kernel_size=9, stride=2, padding=0)

        # Layer 3: Capsule layer. Routing algorithm works here.
        #self.digitcaps = DenseCapsule(in_num_caps=32*6*6, in_dim_caps=8,
        #                              out_num_caps=classes, out_dim_caps=16, routings=routings)
        self.digitcaps = DenseCapsule(in_num_caps=1486848, in_dim_caps=8,
                                      out_num_caps=classes, out_dim_caps=25, routings=routings)

        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(50,2)
        #self.fc2 = nn.Linear(10,2)
        self.softmax = nn.Softmax(dim = 1)

    def forward(self, x, y=None):
        x = self.relu(self.conv1(x))
        x = self.primarycaps(x)
        x = self.digitcaps(x)
        #print(x.shape)
        x = torch.flatten(x, start_dim=1, end_dim=2)
        #print(x.shape)
        x = self.relu(self.fc1(x))
        #x = self.relu(self.fc2(x))
        x = self.softmax(x)
        #print(x.shape)
        #print(x)
        #length = x.norm(dim=-1)
        #print(length)
        return x

In [6]:
def evaluate_accuracy(data_iter, net):
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(net, torch.nn.Module):
                net.eval() # 评估模式, 这会关闭dropout
                acc_sum += (net(X).argmax(dim=1) == y).float().sum().item()
                net.train() # 改回训练模式
            else: 
                if('is_training' in net.__code__.co_varnames): # 如果有is_training这个参数
                    # 将is_training设置成False
                    acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item() 
                else:
                    acc_sum += (net(X).argmax(dim=1) == y).float().sum().item() 
            n += y.shape[0]
    return acc_sum / n

In [7]:
def train(train_iter, test_iter, net, loss, optimizer, num_epochs):
    batch_count = 0
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, start = 0.0, 0.0, 0, time.time()
        for X, y in train_iter:
            y_hat = net(X)
            #print(y_hat)
            #print(y)
            y = y.long()
            l = loss(y_hat,y)
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            train_l_sum += l.item()
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().item()
            n += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        print(
            'epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
            % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n,
               test_acc, time.time() - start))

In [8]:
import pandas as pd
import numpy as np
import cv2
import pandas as pd
data = pd.read_csv("数据标注.csv",sep=",")

In [9]:
def findname(name):
    for i in range(0,len(name)):
        #print(name[i][0])
        if(name[i][0] == '2' and name[i][1] == '-'):
            file_dir1 = file_dir+"/"+name[i]
            break
        else:
            if(name[i][0] == '5'and name[i][1] == '-'):
                file_dir1 = file_dir+"/"+name[i]
                #print(file_dir1)
                break
            else:
                if(name[i][0] == '6'and name[i][1] == '-'):
                    file_dir1 = file_dir+"/"+name[i]
                    break
                else:
                    if(name[i][0] == '3'and name[i][1] == '-'):
                        file_dir1 = file_dir+"/"+name[i]
                        break
                    else:
                        if(name[i][0] == '9'and name[i][1] == '-'):
                            file_dir1 = file_dir+"/"+name[i]
                            break
    return file_dir1

In [10]:
size_n = 100
def readimage(file_dir,name,size_n):
    character_address = file_dir+"/"+name
    #print(character_address)
    d = cv2.imread(character_address,0)
    d = cv2.resize(d, (size_n, size_n))
    return d
#d = readimage(file_dir1,name_1[0],size_n)

In [18]:
import os
X_data = []
path_label=[]
for h in range(0,len(data["ID"])):
    file_dir='./'+data["ID"][h]
    print(file_dir)
    # ⽬录下⾯的所有⽂件名
    name = os.listdir(file_dir)
    file_dir2 = findname(name)
    name_1 = os.listdir(file_dir2)
    #print(h)
    #data.append
    #print(name_1)
    #print(file_dir2)
    for i in range(0,20):
        if i == 0:
            #print(file_dir2)
            d = readimage(file_dir2,name_1[i],size_n)
        else:
            c = readimage(file_dir2,name_1[i],size_n)
            d = np.hstack((d,c))
    #print(d.shape)
    d = np.reshape(d, (400, 500))
    X_data.append(d)
    path_label.append(data["Lable"][h])

./MR201802210163-Wu HuaHao
./MR201706120215-LiangYuQing
./MR201602290243-Liang QiPan
./MR201803090487-HuangTianDi
./MR201710230147-HuangJiangQuan
./MR2017062901390-LinGen You
./MR201707060266-ChenGuiYing
./MR201806250211-HuangHuiZhong
./MR201806040279-Li JinRong
./MR202103160080-Chen LianDing
./MR201807080094-He Zhong
./MR201708080222-Li GuiFeng
./MR202109160378-Liang YouDi
./MR201802200016-ZhaoDongSheng
./MR201803140073-Fang LiChang
./MR201711160167-Huang ZhiRong
./MR201709170104-LiangSuZhen
./MR202110280279-Yang XiTian
./MR201803170300-Huang GenFa
./MR201805060354-XieShengYu
./MR201812090066-Lin Nong
./MR201810030079-Huang QiDing
./MR201806070184-ZhouLiBo
./MR201811040118-Pan YiYou
./MR202204270343-Huang BingQiang
./MR201807130152-Zhang SheXie
./MR201901140071-Xiao BaoXi
./MR202007040281-Yin CaiDi
./MR201907170393-Ou FuTai
./MR201808280243-Lin ShiMei
./MR201808150286-Huang ZhengLian
./MR201902140144-Zhou RuiQiu
./MR201905270177-Lu SongMao
./MR201908300389-Huang YanChun
./MR2019092402

In [19]:
X_data_1 = np.array(X_data)
X_data_1.shape

(192, 400, 500)

In [20]:
X_data_1 = X_data_1.reshape(X_data_1.shape[0],1 ,X_data_1.shape[1], X_data_1.shape[2])

In [21]:
X_data_1.shape

(192, 1, 400, 500)

In [22]:
import numpy as np
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from keras.utils.np_utils import to_categorical
#categorical_labels = to_categorical(y_test, num_classes=2)
#print(categorical_labels)
X_train, X_test, y_train, y_test = train_test_split(X_data_1, path_label, test_size=0.2)
Xtrain = np.array(X_train)
Xtext = np.array(X_test)
y_train = np.array(y_train)
y_test = np.array(y_test)
#y_train = to_categorical(y_train, num_classes=2)
#y_test = to_categorical(y_test, num_classes=2)
print(Xtrain.shape)
print(Xtext.shape)

Using TensorFlow backend.


(153, 1, 400, 500)
(39, 1, 400, 500)


In [23]:
trainset = torch.utils.data.TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
trainloader = torch.utils.data.DataLoader(trainset, batch_size=5, shuffle=True, num_workers=1)
testset = torch.utils.data.TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test))
testloader = torch.utils.data.DataLoader(testset, batch_size=5, shuffle=True, num_workers=1)

In [17]:
batch_size, lr, num_epochs = 5, 0.001, 10
# load dat
#train_iter, test_iter = load_mnist('./data', download=False, batch_size=batch_size)
# define model
net = CapsuleNet(input_size=[1, 400, 500], classes=2, routings=2)

optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss()
#loss = nn.BCELoss()
train(trainloader, testloader, net, loss, optimizer, num_epochs)

epoch 1, loss 0.6526, train acc 0.673, test acc 0.641, time 1101.7 sec
epoch 2, loss 0.3129, train acc 0.693, test acc 0.641, time 1098.4 sec
epoch 3, loss 0.1969, train acc 0.739, test acc 1.000, time 1099.3 sec
epoch 4, loss 0.1200, train acc 0.993, test acc 1.000, time 1099.8 sec
epoch 5, loss 0.0867, train acc 1.000, test acc 1.000, time 1108.0 sec
epoch 6, loss 0.0680, train acc 1.000, test acc 1.000, time 1111.1 sec
epoch 7, loss 0.0560, train acc 1.000, test acc 1.000, time 1095.6 sec
epoch 8, loss 0.0474, train acc 1.000, test acc 1.000, time 1072.0 sec
epoch 9, loss 0.0411, train acc 1.000, test acc 1.000, time 1069.7 sec
epoch 10, loss 0.0363, train acc 1.000, test acc 1.000, time 1067.8 sec


In [18]:
print(net.state_dict().keys()) 

odict_keys(['conv1.weight', 'conv1.bias', 'primarycaps.conv2d.weight', 'primarycaps.conv2d.bias', 'digitcaps.weight', 'fc1.weight', 'fc1.bias'])


In [20]:
torch.save(net, './model_image_1/model.pkl')        # 保存整个模型
#new_model = torch.load('./model_image_1/model.pkl')   # 加载模型

In [21]:
def save_model(save_path, iteration, optimizer, model):
    torch.save({'iteration': iteration,
                'optimizer_dict': optimizer.state_dict(),
                'model_dict': model.state_dict()},
                save_path)
    print("model save success")


In [24]:
def load_model(save_name, optimizer, model):
    model_data = torch.load(save_name)
    model.load_state_dict(model_data['model_dict'])
    optimizer.load_state_dict(model_data['optimizer_dict'])
    print("model load success")

In [22]:
path = "./model_image_1/model_1.pkl"
save_model(path, num_epochs, optimizer, net)

#new_model = CapsuleNet(input_size=[1, 400, 500], classes=2, routings=2)
#new_optimizer = torch.optim.Adam(net.parameters(), lr=lr)
#loss = nn.CrossEntropyLoss()
#load_model(path, new_optimizer, new_model)
#print(new_model.state_dict()['linear.weight'])


model save success


In [25]:
path = "./model_image_1/model_1.pkl"
batch_size, lr, num_epochs = 5, 0.001, 10
net = CapsuleNet(input_size=[1, 400, 500], classes=2, routings=2)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss()
load_model(path, optimizer, net)

model load success


In [26]:
test_acc = evaluate_accuracy(testloader, net)

In [27]:
test_acc

1.0