In [1]:
import torch
import torch.nn as nn
import torch.utils.data as data
import torchvision
import torchvision.transforms as transforms
from torch.nn import functional as F

import numpy as np
import os
import csv

In [2]:
BATCH_SIZE = 16
NUM_EPOCHS = 2

path = 'sjtu-m3dv-medical-3d-voxel-classification'
train_path = path+'/train_val'
test_path = path+'/test'

In [3]:
# 图像采样压缩
def compress(org):
    res = np.zeros((32,32,32))
    for i in range(32):
        for j in range(32):
            for k in range(32):
                res[i,j,k] = (org[3*i:3*i+2, 3*j:3*j+2, 3*k:3*k+2].sum()) / 27
    return res

In [4]:
# 打开训练集结果文件
train_res = []
with open(path+'/train_val.csv') as f:
    next(f)
    f_csv = csv.reader(f)
    for row in f_csv:
        train_res.append(float(row[1]))
f.close()
# 打开训练集数据文件
train_npz = os.listdir(train_path)
train_num = len(train_npz)
train_voxel = []
train_seg = []
for i in range(train_num):
    file = np.load(train_path+'/'+train_npz[i])
    voxel = file['voxel'].astype(np.float)
    voxel = compress(voxel[2:98,2:98,2:98])
    seg = file['seg'].astype(np.float)*270
    seg = compress(seg[2:98,2:98,2:98])
    train_voxel.append((voxel,train_res[i]))
    train_seg.append((seg,train_res[i]))
# 加载训练集
train_voxel_loader = data.DataLoader(train_voxel,
                                     batch_size=BATCH_SIZE,
                                     shuffle=True,
                                     drop_last=True) 
train_seg_loader = data.DataLoader(train_seg,
                                   batch_size=BATCH_SIZE,
                                   shuffle=True,
                                   drop_last=True) 

In [6]:
class DenseLayer(nn.Sequential):
    def __init__(self,in_channel,growth_rate,bn_size,drop_rate): 
        super(DenseLayer,self).__init__()
        self.add_module('norm1',nn.BatchNorm3d(in_channel)),
        self.add_module('relu1',nn.ReLU(inplace=True)),
        self.add_module('conv1',nn.Conv3d(in_channel,bn_size*growth_rate,
                                          kernel_size=1,stride=1,bias=False)),
        self.add_module('norm2',nn.BatchNorm3d(bn_size*growth_rate)),
        self.add_module('relu2',nn.ReLU(inplace=True))
        self.add_module('conv2',nn.Conv3d(bn_size*growth_rate,growth_rate,
                                          kernel_size=3,stride=1,padding=1,bias=False))
        self.drop_rate = drop_rate
    def forward(self,x):
        new_feature = super(DenseLayer,self).forward(x)
        if(self.drop_rate>0):
            new_feature = F.dropout3d(new_feature,p=self.drop_rate,training=self.training)
        return torch.cat([x,new_feature],1) 

class DenseBlock(nn.Sequential):
    def __init__(self,layer_num,in_channel,bn_size,growth_rate,drop_rate):
        super(DenseBlock,self).__init__()
        for i in range(layer_num):
            layer = DenseLayer(in_channel+i*growth_rate,growth_rate,bn_size,drop_rate)
            self.add_module('denselayer%d'%(i+1),layer)

class Transition(nn.Sequential):
    def __init__(self,in_channel,zip_ratio=0.5):
        super(Transition,self).__init__()
        self.add_module('norm',nn.BatchNorm3d(in_channel))
        self.add_module('relu',nn.ReLU(inplace=True))
        self.add_module('conv',nn.Conv3d(in_channel,int(in_channel*zip_ratio),
                                         kernel_size=1,stride=1,bias=False))
        self.add_module('pool',nn.AvgPool3d(kernel_size=2,stride=2))

#卷积池化层
def FeatureLayer(in_channel,out_channel):
    layer = nn.Sequential(
            nn.Conv3d(in_channel,out_channel,
                      kernel_size=3,stride=1,padding=3,bias=False),
            nn.BatchNorm3d(num_features=out_channel), 
            nn.ReLU(inplace=True),
            nn.AvgPool3d(kernel_size=3,stride=2,padding=1))
    return layer

class DenseNet(nn.Module):
    
    def __init__(self,growth_rate=32,block_config=(4,4,4),init_channel_num=64,
                 bn_size=4,drop_rate=0):
        super(DenseNet,self).__init__()
        self.feature_layer = FeatureLayer(1,init_channel_num)
        channel_num = init_channel_num
        for i,layer_num in enumerate(block_config):
            block = DenseBlock(layer_num=layer_num,in_channel=channel_num,
                               bn_size=bn_size,growth_rate=growth_rate,
                               drop_rate=drop_rate)
            self.feature_layer.add_module('denseblock%d'%(i+1),block)
            channel_num = channel_num + layer_num*growth_rate
            if(i!=len(block_config)-1):
                trans = Transition(channel_num,0.5)
                self.feature_layer.add_module('transition%d'%(i+1),trans)
                channel_num = int(0.5*channel_num)
        self.feature_layer.add_module('norm5',nn.BatchNorm3d(channel_num))
        self.feature_layer.add_module('relu5',nn.ReLU(inplace=True))
        self.feature_layer.add_module('avgpool5',
                                      nn.AvgPool3d(kernel_size=3,stride=2))
        self.classifier = nn.Linear(channel_num,1) 
        self.sigmoid = torch.sigmoid
        
    def forward(self,x):
        features = self.feature_layer(x)
        out = features.view(len(x),-1)
        out = self.classifier(out)
        out = self.sigmoid(out)
        out = out.view(len(out))
        return out

In [7]:
model_voxel = DenseNet(drop_rate=0.3)
model_seg = DenseNet(drop_rate=0.3)

optimizer_voxel = torch.optim.SGD(model_voxel.parameters(),lr=0.01)
criterion_voxel = nn.MultiLabelSoftMarginLoss()
optimizer_seg = torch.optim.SGD(model_voxel.parameters(),lr=0.01)
criterion_seg = nn.MultiLabelSoftMarginLoss()

In [8]:
# 训练
model_voxel.train()
for epoch in range(NUM_EPOCHS):
    optimizer_voxel.zero_grad()
    for images,labels in train_voxel_loader:
        images = images.view(BATCH_SIZE,1,32,32,32)
        outputs = model_voxel(images.float())
        labels = labels.view(BATCH_SIZE,1)
        outputs = outputs.view(BATCH_SIZE,1)
        loss = criterion_voxel(outputs,labels)
        loss.backward()
        optimizer_voxel.step()

        
model_seg.train()
for epoch in range(NUM_EPOCHS):
    optimizer_seg.zero_grad()
    for images,labels in train_seg_loader:
        images = images.view(BATCH_SIZE,1,32,32,32)
        outputs = model_seg(images.float())
        labels = labels.view(BATCH_SIZE,1)
        outputs = outputs.view(BATCH_SIZE,1)
        loss = criterion_seg(outputs,labels)
        loss.backward()
        optimizer_seg.step()
    

In [9]:
#torch.save(model_voxel,'try_2_voxel_model.pkl')
torch.save(model_seg,'try_2_seg_model.pkl')

  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
