In [4]:
from tqdm import tqdm

In [2]:
import torch
import torchvision as tv
from torch.utils.data import DataLoader
import torch.nn as nn

In [6]:
# Central place for every tunable value used by train() and generate().
class Config:
    """Namespace of class-level defaults; train()/generate() override them via keyword arguments."""

    # ---- Locations on disk ----
    data_path = "D:/Project/demo1/NN-Torch/aml-data"  # image root handed to ImageFolder
    save_path = 'D:/Project/demo1/NN-Torch/imgs2/'  # output folder for training artifacts
    netd_path = None  # discriminator weights to load before training (None = start fresh)
    netg_path = None  # generator weights to load before training (None = start fresh)
    gen_img = "result.png"  # file written by generate()
    virs = "result"  # not referenced by the code visible in this notebook

    # ---- Data loading ----
    img_size = 96  # images are resized / centre-cropped to img_size x img_size
    batch_size = 256  # samples per mini-batch
    num_workers = 0  # DataLoader worker processes; the author hit errors with >0 on Windows

    # ---- Network sizes ----
    nz = 100  # channels of the latent noise vector fed to the generator
    ngf = 64  # base number of feature maps in the generator
    ndf = 64  # base number of feature maps in the discriminator

    # ---- Optimisation ----
    max_epoch = 400  # number of passes over the dataset
    lr1 = 2e-4  # generator learning rate
    lr2 = 2e-4  # discriminator learning rate
    beta1 = 0.5  # first-moment decay (beta1) of the Adam optimizers
    gpu = True  # request CUDA

    # ---- Schedule ----
    # The discriminator is updated more often than the generator.
    d_every = 1  # update the discriminator every d_every batches
    g_every = 5  # update the generator every g_every batches
    save_every = 5  # write preview image and checkpoints every save_every epochs

    # ---- Sampling in generate() ----
    gen_num = 64  # number of images kept in the output grid
    gen_search_num = 512  # number of candidates generated before picking the best gen_num
    gen_mean = 0  # mean of the sampling noise
    gen_std = 1  # standard deviation of the sampling noise


# Global configuration object read by train() and generate().
opt = Config()

In [7]:
# Re-creates the global config object; this repeats the assignment that ends the previous cell.
opt = Config()

In [8]:
# Generator: turns a latent noise tensor into a 3-channel 96x96 image.
class NetG(nn.Module):
    """DCGAN-style generator made of stacked transposed convolutions.

    Args:
        opt: configuration object exposing ``nz`` (latent channels) and
            ``ngf`` (base number of feature maps).
    """

    def __init__(self, opt):
        super().__init__()
        self.ngf = opt.ngf
        width = self.ngf

        def upsample_stage(c_in, c_out, stride, padding):
            # ConvTranspose -> BatchNorm -> ReLU, the repeating unit of the generator.
            return [
                nn.ConvTranspose2d(in_channels=c_in, out_channels=c_out, kernel_size=4,
                                   stride=stride, padding=padding, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]

        # Transposed-conv output size (output_padding = 0):
        #   out = (in - 1) * stride - 2 * padding + kernel_size
        stages = []
        stages += upsample_stage(opt.nz, width * 8, stride=1, padding=0)     # 1x1   -> 4x4
        stages += upsample_stage(width * 8, width * 4, stride=2, padding=1)  # 4x4   -> 8x8
        stages += upsample_stage(width * 4, width * 2, stride=2, padding=1)  # 8x8   -> 16x16
        stages += upsample_stage(width * 2, width, stride=2, padding=1)      # 16x16 -> 32x32

        # Final projection to RGB: (32 - 1) * 3 - 2 + 5 = 96.
        stages.append(
            nn.ConvTranspose2d(in_channels=width, out_channels=3, kernel_size=5,
                               stride=3, padding=1, bias=False)
        )
        # Tanh squashes pixel values into [-1, 1].
        stages.append(nn.Tanh())

        self.Gene = nn.Sequential(*stages)

    def forward(self, x):
        """Map noise of shape (N, nz, 1, 1) to images of shape (N, 3, 96, 96)."""
        return self.Gene(x)

In [9]:
# Discriminator: scores each 3x96x96 image with a value in [0, 1].
class NetD(nn.Module):
    """DCGAN-style discriminator built from strided convolutions (no pooling layers).

    Args:
        opt: configuration object exposing ``ndf`` (base number of feature maps).
    """

    def __init__(self, opt):
        super(NetD, self).__init__()
        self.ndf = opt.ndf
        base = self.ndf

        # Stem: RGB input (N, 3, 96, 96) -> (N, ndf, 32, 32); (96 - 5 + 2 * 1) / 3 + 1 = 32.
        # LeakyReLU(x) = x for x > 0, otherwise negative_slope * x.
        # inplace=True overwrites the input tensor instead of allocating a new one.
        layers = [
            nn.Conv2d(in_channels=3, out_channels=base, kernel_size=5, stride=3, padding=1, bias=False),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
        ]

        # Three halving stages: 32 -> 16 -> 8 -> 4 spatially, doubling the channels each time.
        channel_plan = ((base, base * 2), (base * 2, base * 4), (base * 4, base * 8))
        for c_in, c_out in channel_plan:
            layers.extend([
                nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=4, stride=2, padding=1, bias=False),
                nn.BatchNorm2d(c_out),
                nn.LeakyReLU(0.2, True),
            ])

        # Head: (N, ndf * 8, 4, 4) -> (N, 1, 1, 1), then Sigmoid for a binary real/fake score.
        layers.append(
            nn.Conv2d(in_channels=base * 8, out_channels=1, kernel_size=4, stride=1, padding=0, bias=True)
        )
        layers.append(nn.Sigmoid())

        self.Discrim = nn.Sequential(*layers)

    def forward(self, x):
        """Return one score per input image as a flat tensor of shape (N,)."""
        scores = self.Discrim(x)
        return scores.view(-1)

In [3]:
# Quick sanity check: prints "j" only when PyTorch reports a usable CUDA device.
if torch.cuda.is_available():
    print("j")

j


In [8]:
def train(**kwargs):
    """Train the generator/discriminator pair on the images under ``opt.data_path``.

    Every ``opt.save_every`` epochs a preview grid (generated from a fixed noise
    batch) and both networks' state dicts are written into ``opt.save_path`` as
    ``<epoch>.png``, ``netd_<epoch>.pth`` and ``netg_<epoch>.pth`` (0-based epoch).

    Args:
        **kwargs: attribute overrides applied to the module-level ``opt`` before
            anything else happens, e.g. ``train(batch_size=64, gpu=False)``.
            The overrides persist on ``opt`` after the call.

    Raises:
        ValueError: if the dataset holds fewer than ``opt.batch_size`` images, in
            which case ``drop_last=True`` would leave no batch to train on.
    """
    import os  # local import: the notebook has no top-level ``import os``

    # Apply per-call overrides to the shared configuration.
    for k_, v_ in kwargs.items():
        setattr(opt, k_, v_)

    # Honour opt.gpu only when CUDA is really available; otherwise run on the CPU
    # instead of failing when the first tensor is moved.
    device = torch.device("cuda" if opt.gpu and torch.cuda.is_available() else "cpu")

    # Preprocessing: resize the shorter side to img_size, centre-crop to a square,
    # convert to a float tensor in [0, 1], then normalise each channel with
    # (x - 0.5) / 0.5 so pixels lie in [-1, 1] (the range of the generator's Tanh).
    transforms = tv.transforms.Compose([
        tv.transforms.Resize(opt.img_size),
        tv.transforms.CenterCrop(opt.img_size),
        tv.transforms.ToTensor(),
        tv.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    dataset = tv.datasets.ImageFolder(root=opt.data_path, transform=transforms)
    dataloader = DataLoader(
        dataset,
        batch_size=opt.batch_size,
        shuffle=True,
        num_workers=opt.num_workers,  # defaults to 0, the same as DataLoader's own default
        drop_last=True,  # keeps every batch at exactly batch_size so the label tensors fit
    )
    if len(dataloader) == 0:
        raise ValueError(
            "dataset at %r has %d images, fewer than batch_size=%d; no batch to train on"
            % (opt.data_path, len(dataset), opt.batch_size)
        )

    netg, netd = NetG(opt), NetD(opt)

    # Optionally resume from saved weights; load onto the CPU first, move afterwards.
    if opt.netg_path:
        netg.load_state_dict(torch.load(opt.netg_path, map_location="cpu"))
    if opt.netd_path:
        netd.load_state_dict(torch.load(opt.netd_path, map_location="cpu"))

    netd.to(device)
    netg.to(device)

    optimize_g = torch.optim.Adam(netg.parameters(), lr=opt.lr1, betas=(opt.beta1, 0.999))
    optimize_d = torch.optim.Adam(netd.parameters(), lr=opt.lr2, betas=(opt.beta1, 0.999))

    criterions = nn.BCELoss().to(device)

    # Targets: 1 for "real", 0 for "fake".
    true_labels = torch.ones(opt.batch_size, device=device)
    fake_labels = torch.zeros(opt.batch_size, device=device)

    # Fixed noise so the preview images of different epochs are comparable.
    fix_noises = torch.randn(opt.batch_size, opt.nz, 1, 1, device=device)

    # Make sure the output folder exists before spending time on training.
    os.makedirs(opt.save_path, exist_ok=True)

    for epoch in range(opt.max_epoch):
        # Wrapping the loader (not the enumerate object) lets tqdm know the total.
        for ii_, (img, _) in enumerate(tqdm(dataloader)):
            real_img = img.to(device)

            # ---- Discriminator update ----
            if ii_ % opt.d_every == 0:
                optimize_d.zero_grad()

                # Real images should be scored as 1.
                error_d_real = criterions(netd(real_img), true_labels)
                error_d_real.backward()

                # Fresh noise for every discriminator step; the generator is not
                # trained here, so no graph is needed for its forward pass.
                noises = torch.randn(opt.batch_size, opt.nz, 1, 1, device=device)
                with torch.no_grad():
                    fake_image = netg(noises)

                # Generated images should be scored as 0.
                error_d_fake = criterions(netd(fake_image), fake_labels)
                error_d_fake.backward()
                optimize_d.step()

            # ---- Generator update ----
            if ii_ % opt.g_every == 0:
                optimize_g.zero_grad()
                noises = torch.randn(opt.batch_size, opt.nz, 1, 1, device=device)
                fake_image = netg(noises)
                # The generator wants the discriminator to answer "real".
                error_g = criterions(netd(fake_image), true_labels)
                error_g.backward()
                optimize_g.step()

        if (epoch + 1) % opt.save_every == 0:
            # Preview only: no gradients needed. The generator is left in training mode.
            with torch.no_grad():
                fix_fake_image = netg(fix_noises)
            tv.utils.save_image(
                fix_fake_image[:64],
                os.path.join(opt.save_path, "%s.png" % epoch),
                normalize=True,
            )

            torch.save(netd.state_dict(), os.path.join(opt.save_path, 'netd_{0}.pth'.format(epoch)))
            torch.save(netg.state_dict(), os.path.join(opt.save_path, 'netg_{0}.pth'.format(epoch)))

In [9]:
@torch.no_grad()
def generate(**kwargs):
    """Sample images from a trained generator and keep the ones the discriminator rates highest.

    ``opt.gen_search_num`` candidates are generated from N(``opt.gen_mean``,
    ``opt.gen_std``) noise, scored by the discriminator, and the top
    ``opt.gen_num`` are written as one image grid to ``opt.gen_img``.

    Checkpoints are taken from ``opt.netd_path`` / ``opt.netg_path`` when those
    are set. Otherwise the files ``netd_<e>.pth`` / ``netg_<e>.pth`` in
    ``opt.save_path`` are used, where ``e`` is the last epoch at which train()
    saves for the current ``opt.max_epoch`` and ``opt.save_every`` (399 with the
    default configuration).

    Args:
        **kwargs: attribute overrides applied to the module-level ``opt``; they
            persist on ``opt`` after the call.
    """
    import os  # local import: the notebook has no top-level ``import os``

    for k_, v_ in kwargs.items():
        setattr(opt, k_, v_)

    # Fall back to the CPU when CUDA is requested but unavailable.
    device = torch.device("cuda" if opt.gpu and torch.cuda.is_available() else "cpu")

    # train() saves after epochs where (epoch + 1) % save_every == 0, 0-based.
    last_saved_epoch = (opt.max_epoch // opt.save_every) * opt.save_every - 1
    netd_file = opt.netd_path or os.path.join(opt.save_path, 'netd_{0}.pth'.format(last_saved_epoch))
    netg_file = opt.netg_path or os.path.join(opt.save_path, 'netg_{0}.pth'.format(last_saved_epoch))

    netg, netd = NetG(opt).eval(), NetD(opt).eval()
    # NOTE(review): strict=False is kept from the original; it silently ignores
    # missing/unexpected keys, so a mismatched checkpoint will not raise here.
    netd.load_state_dict(torch.load(netd_file, map_location="cpu"), False)
    netg.load_state_dict(torch.load(netg_file, map_location="cpu"), False)
    netd.to(device)
    netg.to(device)

    # Draw the candidate noise directly from N(gen_mean, gen_std).
    noise = torch.empty(opt.gen_search_num, opt.nz, 1, 1).normal_(opt.gen_mean, opt.gen_std).to(device)

    fake_image = netg(noise)
    score = netd(fake_image)

    # Never ask topk for more items than there are candidates.
    keep = min(opt.gen_num, opt.gen_search_num)
    indexs = score.topk(keep)[1]

    # Index with the tensor of winners instead of stacking them one by one.
    tv.utils.save_image(fake_image[indexs], opt.gen_img, normalize=True, value_range=(-1, 1))

In [10]:
def main():
    """Run the whole pipeline with the current ``opt`` values: train(), then generate()."""
    train()
    generate()

In [11]:
# Entry point: starts training followed by generation when this cell runs as the main module.
if __name__ == '__main__':
    main()

64it [03:26,  3.22s/it]
64it [00:22,  2.83it/s]
64it [00:22,  2.80it/s]
64it [00:22,  2.80it/s]
64it [00:23,  2.77it/s]
64it [00:23,  2.77it/s]
64it [00:24,  2.66it/s]
64it [00:23,  2.68it/s]
64it [00:24,  2.63it/s]
64it [00:24,  2.59it/s]
64it [00:23,  2.67it/s]
64it [00:24,  2.66it/s]
64it [00:24,  2.65it/s]
64it [00:24,  2.65it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.69it/s]
64it [00:23,  2.69it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.69it/s]
64it [00:23,  2.69it/s]
64it [00:23,  2.70it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.69it/s]
64it [00:23,  2.67it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.71it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.70it/s]
64it [00:23,  2.67it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.67it/s]
64it [00:23,  2.68it/s]
64it [00:23,  2.69it/s]
64it [00:23,  2.68it/s]
64it [00:24,  2.66it/s]
64it [00:24,  2.66it/s]
64it [00:24,  2.66it/s]
64it [00:23,  2.68it/s]
64it [00:24,  2.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
import torch
import torch.nn as nn
import torch.utils.data as Data
from torchvision import transforms
from torchvision.datasets import ImageFolder

# Root folder of the training images (read through ImageFolder below).
train_data_dir = r'D:/Project/demo1/NN-Torch/aml-data'
# Samples per mini-batch; also read by the training loop further down.
batchsize = 32
# Preprocessing applied to every image: centre-crop to 128x128, then convert to a tensor.
# NOTE(review): there is no Resize before the crop -- confirm the source images are at least 128x128.
train_data_transforms = transforms.Compose([
    transforms.CenterCrop(128),  # 128
    transforms.ToTensor(),
])

train_data = ImageFolder(train_data_dir, transform=train_data_transforms)

# Shuffled loader; drop_last=True discards a final incomplete batch so every batch has batchsize samples.
# NOTE(review): the first experiment's Config comments that worker processes failed on Windows --
# confirm num_workers=2 works in this environment.
train_data_loader = Data.DataLoader(
    train_data,
    batch_size=batchsize,
    shuffle=True,
    num_workers=2,
    drop_last=True
)


# Build the discriminator
class Discriminator(nn.Module):
    """Convolutional discriminator that maps a (N, 3, 128, 128) batch to (N, 1) scores in [0, 1]."""

    def __init__(self):
        # Initialise the parent class
        super().__init__()

        # All three convolutions share kernel size and stride (no padding):
        # spatial size 128 -> 61 -> 27 -> 10.
        conv_kwargs = dict(kernel_size=8, stride=2)
        hidden = 256
        self.feature = nn.Sequential(
            nn.Conv2d(3, hidden, **conv_kwargs),
            nn.BatchNorm2d(hidden),
            nn.GELU(),

            nn.Conv2d(hidden, hidden, **conv_kwargs),
            nn.BatchNorm2d(hidden),
            nn.GELU(),

            nn.Conv2d(hidden, 3, **conv_kwargs),
            nn.GELU(),
        )

        # 3 channels x 10 x 10 remaining pixels are flattened into one logit, then squashed.
        flat_features = 3 * 10 * 10
        self.classifier = nn.Sequential(
            nn.Linear(flat_features, 1),
            nn.Sigmoid()
        )

    def forward(self, inputs):
        """Return a (N, 1) tensor of scores for the input batch."""
        feature_maps = self.feature(inputs)
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flattened)


# Build the generator
class Generator(nn.Module):
    """Generator that maps a (N, 100) noise batch to (N, 3, 128, 128) images in [0, 1]."""

    def __init__(self):
        super(Generator, self).__init__()
        # Project the 100-d noise vector to a small 3 x 11 x 11 feature map.
        self.linear = nn.Sequential(
            nn.Linear(100, 3 * 11 * 11),
            nn.GELU(),
        )

        # Transposed convolutions grow the map: 11 -> 28 -> 62 -> 128
        # (out = (in - 1) * stride - 2 * padding + kernel_size).
        self.feature = nn.Sequential(
            nn.ConvTranspose2d(3, 256, kernel_size=8, stride=2),
            nn.BatchNorm2d(256),
            nn.GELU(),

            nn.ConvTranspose2d(256, 256, kernel_size=8, stride=2),
            nn.BatchNorm2d(256),
            nn.GELU(),

            nn.ConvTranspose2d(256, 3, kernel_size=8, stride=2, padding=1),
            nn.BatchNorm2d(3),

            # Sigmoid keeps pixel values in [0, 1].
            nn.Sigmoid()
        )

    def forward(self, x):
        """Generate one image per row of ``x``.

        Args:
            x: noise tensor of shape (N, 100).

        Returns:
            Tensor of shape (N, 3, 128, 128).
        """
        x = self.linear(x)
        # Take the batch dimension from the input itself rather than from the
        # module-level ``batchsize``, so any batch size works (e.g. one image
        # at inference time) and the class no longer depends on that global.
        x = x.view(x.size(0), 3, 11, 11)
        x = self.feature(x)
        return x


# Pick the compute device: CUDA when available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Build both networks (discriminator first, then generator) and place them on the device.
D = Discriminator().to(device)
G = Generator().to(device)

# One plain-SGD optimizer per network, both with learning rate 0.01.
d_optimizer = torch.optim.SGD(D.parameters(), lr=0.01)
g_optimizer = torch.optim.SGD(G.parameters(), lr=0.01)

# Binary cross-entropy on the discriminator's sigmoid output.
loss_func = nn.BCELoss()

if __name__ == '__main__':
    d_z_loss = []  # mean discriminator loss of each finished epoch
    epoches = 25
    generator_steps = 2  # generator updates performed after each discriminator update

    # The targets never change, so build them once instead of on every step.
    real_label = torch.ones(batchsize).to(device)   # target 1: real sample
    fake_label = torch.zeros(batchsize).to(device)  # target 0: generated sample

    for epoch in range(epoches):
        print('开始第', epoch + 1, '轮', '*******************' * 3)
        sum_d_loss = 0
        for step, (b_x, _) in enumerate(train_data_loader):  # b_x shape (32, 3, 128, 128)
            # ---- Train the discriminator ----
            # Real data: (batch_size, 1) -> (batch_size) before computing the loss.
            real_out = D(b_x.to(device)).squeeze()
            d_loss_real = loss_func(real_out, real_label)

            # Fake data: detach so no gradient flows back into the generator here.
            fake_img = G(torch.rand(batchsize, 100).to(device)).detach()
            fake_out = D(fake_img).squeeze()
            d_loss_fake = loss_func(fake_out, fake_label)

            # Update the discriminator
            d_loss = (d_loss_real + d_loss_fake)
            sum_d_loss += d_loss.item()
            d_optimizer.zero_grad()  # reset gradients before back-propagation
            d_loss.backward()  # back-propagate the error
            d_optimizer.step()  # update the parameters

            # ---- Train the generator ----
            # Same update repeated generator_steps times, each with fresh noise.
            for g_step in range(generator_steps):
                fake_img = G(torch.rand(batchsize, 100).to(device))
                output = D(fake_img).squeeze()
                # The generator is rewarded when the discriminator answers "real".
                g_loss = loss_func(output, real_label)

                g_optimizer.zero_grad()  # reset gradients
                g_loss.backward()  # back-propagate
                g_optimizer.step()  # update the generator's parameters

            print('{:.5%}'.format(step / len(train_data_loader)))

        d_z_loss.append(sum_d_loss / len(train_data_loader))
        print(d_z_loss)
        # Snapshot the generator on every even epoch.
        if epoch % 2 == 0:
            name = 'face' + str(epoch) + '.pth'
            torch.save(G, name, _use_new_zipfile_serialization=False)

    # Final models. 'Generator.pth' now really contains the generator (the
    # original code wrote the discriminator under this name); the discriminator
    # is kept as well, under its own file name.
    torch.save(G, 'Generator.pth', _use_new_zipfile_serialization=False)
    torch.save(D, 'Discriminator.pth', _use_new_zipfile_serialization=False)