In [9]:
# !jupyter nbconvert --to python gan-dcnn-minist-ipynb
import time

import  torch
import torch.nn as nn
import torchvision
import matplotlib.pyplot as plt
import torch.utils.data as td

This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr



定义变量

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_works = 4
batch_size = 64
input_dim= 100
epochs = 25
print("run on ",device.type)

run on  cuda


In [4]:
from typing import Iterable


# loading data
def loading_minist_data(batch_siz:int, works:int)->(Iterable,Iterable):

    train_set = torchvision.datasets.MNIST(root="./data",train=True,transform=torchvision.transforms.ToTensor(),download=True)
    test_set = torchvision.datasets.MNIST(root="./data",train=False,transform=torchvision.transforms.ToTensor(),download=True)
    return td.DataLoader(dataset=train_set, batch_size=batch_siz, shuffle=True, num_workers=works,drop_last=True), td.DataLoader(dataset=test_set, batch_size=batch_siz, shuffle=True, num_workers=works,drop_last=True)

train_iter ,test_iter = loading_minist_data(batch_size,4)


In [5]:
print("train_iter len ",len(train_iter))
print("all size ",len(train_iter)*batch_size)

train_iter len  937
all size  59968


In [6]:
class View(nn.Module):
    def __init__(self, shape):
        super().__init__()
        self.shape = shape,

    def forward(self, x):
        return x.view(*self.shape)

class Discriminator(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(1,32,kernel_size=3,stride=2,padding=1),
            nn.LeakyReLU(0.01),
            nn.Conv2d(32,64,kernel_size=3,padding=1,stride=2),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.01),
            nn.Conv2d(64,128,kernel_size=3,stride=2),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.01),
            nn.Flatten(),
            nn.Linear(128*3*3,1),
            nn.Sigmoid(),
        ).to(device)
        self.cnt=0
        self.running_loss=0
        self.loss_func = nn.BCELoss()
        self.loss_metric=[]
        self.optimizer = torch.optim.Adam(self.parameters(),lr=0.001)
        pass
    def forward(self,inputs):
        for op in  self.model:
            # print(inputs.shape)
            inputs = op(inputs)
        return inputs
    def train(self,inputs,targets):
        out = self.forward(inputs)
        loss = self.loss_func(out,targets)
        self.running_loss += loss.detach().item()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        pass
    def evaluate_loss(self):
        ts = self.running_loss/(len(train_iter)*batch_size)
        self.loss_metric.append(ts)

        self.running_loss=0

        pass
class Generator(nn.Module):
    def __init__(self,in_dim:int):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            # nn.Linear(in_dim, 200),
            # nn.LeakyReLU(0.02),
            # # nn.LayerNorm(200),
            # nn.LayerNorm(200),
            # nn.Linear(200, 784),
            # nn.ReLU(True),
            # nn.Linear(256,28*28),
            # nn.Tanh()
            nn.Linear(in_dim,256*7*7),
            View( shape=(batch_size,256,7,7)),
            # 256*7*7 -> 128*14*14
            # 7 + (2-1)*6 +
            nn.ConvTranspose2d(256,128,kernel_size=4,stride=2,padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(128,64,kernel_size=3,stride=1,padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(64,1,kernel_size=4,padding=1,stride=2),
            nn.Tanh()
        ).to(device)
        self.optimiser = torch.optim.Adam(self.parameters(), lr=0.0001)
        # counter and accumulator for progress
        self.loss_metric = []
        self.running_loss =0
    def forward(self,inputs):
        for op in  self.model:
            # print(inputs.shape)
            inputs = op(inputs)
        return  inputs
    def train(self,D:Discriminator,inputs,targets):
        gen_out = self.forward(inputs)
        # print(gen_out.shape)
        d_out = D.forward(gen_out)
        loss  = D.loss_func(d_out,targets)
        self.running_loss += loss.detach().item()
        # print(self.running_loss)
        self.optimiser.zero_grad()
        loss.backward()
        self.optimiser.step()
    def evaluate_loss(self):
        print("G running loss ",self.running_loss)
        ts = self.running_loss/(len(train_iter)*batch_size)
        self.loss_metric.append(ts)
        self.running_loss=0
        pass
def generate_random_seed(size):
    random_data = torch.randn(size*batch_size).view(batch_size,size)
    return random_data.to(device)


In [7]:
G = Generator(in_dim=input_dim)
G.forward(generate_random_seed(input_dim))

KeyboardInterrupt: 

In [None]:
D = Discriminator()

for k in  range(3):
    for it,_ in train_iter:
        it = it.to(device)
        y = torch.ones(size=(64,1),dtype=torch.float32)
        y = y.to(device)
        D.train(it,y)
    # print(len(train_iter)*batch_size)
    print(D.running_loss/(len(train_iter)*batch_size))
    D.running_loss =0

In [None]:
G = Generator(input_dim)

print("generate nums is run on ",generate_random_seed(input_dim).device)

output = G.forward(generate_random_seed(input_dim))
# img = output.detach().numpy().reshape(28,28)
plt.imshow(output.cpu().detach().numpy()[0][0], interpolation='none', cmap='Blues')

In [None]:
%%time
D = Discriminator()
G = Generator(input_dim)
for i in range(epochs):
    print("epoch  {} of {}  ".format(i+1,epochs))
    time_start = time.time()
    for tr_item,_ in train_iter:
        tr_item = tr_item.to(device)
        true_label = torch.ones(size=(batch_size,1),dtype=torch.float32).to(device)
        fake_label = torch.zeros(size=(batch_size,1),dtype=torch.float32).to(device)
        D.train(tr_item,true_label)

        D.train(G.forward(generate_random_seed(input_dim)).detach() , fake_label)
        #
        G.train(D,generate_random_seed(input_dim),true_label)
    D.evaluate_loss()
    G.evaluate_loss()
    print("spend time {}   D loss {} ,G loss {} ".format(time.time()-time_start,D.loss_metric[i],G.loss_metric[i]))

        # break

In [None]:
output = G.forward(generate_random_seed(input_dim))
img = output.detach().cpu().numpy().reshape(64,28,28)
plt.imshow(img[9], interpolation='none', cmap='Blues')

In [None]:
f,axarr = plt.subplots(2,3, figsize=(16,8))
for i in range(2):
    for j in range(3):
        output = G.forward(generate_random_seed(input_dim))
        img = output.detach().cpu().numpy()[i*3+j].reshape(28,28)
        axarr[i,j].imshow(img, interpolation='none', cmap='Blues')

# for it,_ in train_iter:
    # plt.imshow(it[0].reshape(28,28),cmap="Blues")
    # break