In [1]:
import random
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import tqdm
import os

In [2]:
# Set random seed for reproducibility
manualSeed = 999
#manualSeed = random.randint(1, 10000) # use if you want new results
print("Random Seed: ", manualSeed)
random.seed(manualSeed)
torch.manual_seed(manualSeed)

Random Seed:  999


<torch._C.Generator at 0x7f59a83f4070>

In [3]:
# dataset
class Dataset(torch.utils.data.Dataset):
  
    def __init__(self):
        'Initialization'
        
    def __len__(self):
        'Denotes the total number of samples'
        return 1200
        #return 6 #for testing code
    
    def __getitem__(self, index):
        'Generates one sample of data'
        
        # Load data
        X = torch.load('data/true_data/{}.pt'.format(index))

        return X

In [4]:
# custom weights initialization called on netG and netD
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [5]:
# Generator Code

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        
        self.layer0 = nn.Sequential(
            
            # input is Z, going into a convolution
            nn.ConvTranspose2d( nz, ngf * 64, kernel_size=3, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(ngf * 64),
            nn.ReLU(True))
        
        self.layer1 = nn.Sequential(
            
            nn.ConvTranspose2d(ngf * 64, ngf * 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ngf * 32),
            nn.ReLU(True))
            
        self.layer2 = nn.Sequential(
            
            nn.ConvTranspose2d(ngf * 32, ngf * 16, kernel_size=3, stride=2, padding=(0,1), bias=False),
            nn.BatchNorm2d(ngf * 16),
            nn.ReLU(True))
            
        self.layer3 = nn.Sequential(
            
            nn.ConvTranspose2d( ngf * 16, ngf * 8, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True))
        
        self.layer4 = nn.Sequential(
            
            nn.ConvTranspose2d( ngf * 8, ngf * 4, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True))
        
        self.layer5 = nn.Sequential(
            
            nn.ConvTranspose2d( ngf * 4, ngf * 2, kernel_size=3, stride=2, padding=(0,1), bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True))
            
        self.layer6 = nn.Sequential(
            
            nn.ConvTranspose2d( ngf * 2, ngf, kernel_size=3, stride=2, padding=(1,0), bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True))
            
        self.layer7 = nn.Sequential(
            
            nn.ConvTranspose2d( ngf, 1, kernel_size=(4,3), stride=2, padding=1, bias=False),
            nn.Tanh()
            
            
        )

    def forward(self, x):
        
        out = self.layer0(x)
        
        out = self.layer1(out)
        
        out = self.layer2(out)
        
        out = self.layer3(out)
        
        out = self.layer4(out)
        
        out = self.layer5(out)
        
        out = self.layer6(out)
        
        out = self.layer7(out)
        
        return out

In [6]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        
        self.main = nn.Sequential(
            # input is 1 x 330 x 261
            nn.Conv2d(1, ndf, kernel_size=3, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        
            
            nn.Conv2d(ndf, ndf * 2, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            
            
            
            nn.Conv2d(ndf * 2, ndf * 4, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            
            
            
            nn.Conv2d(ndf * 4, ndf * 8, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            
     
            nn.Conv2d(ndf * 8, ndf * 16, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(ndf * 16),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf * 16, ndf * 32, kernel_size=3, stride=2, padding=(0,1), bias=False),
            nn.BatchNorm2d(ndf * 32),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(ndf * 32, ndf * 64, kernel_size=3, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(ndf * 64),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(ndf * 64, 1, kernel_size=3, stride=1, padding=0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        
        out = self.main(x)
        
        return out

In [7]:
# Batch size during training
batch_size = 100

# Size of z latent vector (i.e. size of generator input)
nz = 100

# Size of feature maps in generator
ngf = 128

# Size of feature maps in discriminator
ndf = 16

# Number of training epochs
num_epochs = 1

# Learning rate for optimizers
lr = 0.0002

# Beta1 hyperparam for Adam optimizers
beta1 = 0.5

In [8]:
dataset = Dataset()

dataloader = torch.utils.data.DataLoader(dataset = dataset, batch_size=batch_size,
                                         shuffle=True)

In [9]:
# Create the generator
netG = Generator()

# Apply the weights_init function to randomly initialize all weights
#  to mean=0, stdev=0.2.
netG.apply(weights_init)

# Print the model
print(netG)

# Create the Discriminator
netD = Discriminator()

# Apply the weights_init function to randomly initialize all weights
#  to mean=0, stdev=0.2.
netD.apply(weights_init)

# Print the model
print(netD)

# Initialize BCELoss function
criterion = nn.BCELoss()

# Create batch of latent vectors that we will use to visualize
#  the progression of the generator
fixed_noise = torch.randn(64, nz, 1, 1)

fixed_noise = fixed_noise.type(torch.DoubleTensor)



# Establish convention for real and fake labels during training
real_label = 1
fake_label = 0


# Setup Adam optimizers for both G and D
optimizerD = torch.optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = torch.optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))

Generator(
  (layer0): Sequential(
    (0): ConvTranspose2d(100, 8192, kernel_size=(3, 3), stride=(1, 1), bias=False)
    (1): BatchNorm2d(8192, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (layer1): Sequential(
    (0): ConvTranspose2d(8192, 4096, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(4096, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (layer2): Sequential(
    (0): ConvTranspose2d(4096, 2048, kernel_size=(3, 3), stride=(2, 2), padding=(0, 1), bias=False)
    (1): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (layer3): Sequential(
    (0): ConvTranspose2d(2048, 1024, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (layer4): Sequen

In [10]:
# Training Loop

# Lists to keep track of progress
fake_data_list = []
G_losses = []
D_losses = []


netD = netD.double()
netG = netG.double()


# For each epoch
for epoch in tqdm.tqdm(range(num_epochs)):
    
    # For each batch in the dataloader
    for i, data in enumerate(dataloader, 0):

        
        # Train D
        
        ## Train with all-real batch
        netD.zero_grad()
       
        
        b_size = data.size(0)
        print(b_size)
        label = torch.full((b_size,), real_label)
        
        data = data.type(torch.DoubleTensor)
        label = label.type(torch.DoubleTensor)
        
        # Forward pass real batch through D
        output = netD(data).view(-1)
        
        # Calculate loss on all-real batch
        errD_real = criterion(output, label)
        
        # Calculate gradients for D in backward pass
        errD_real.backward()
        
        D_x = output.mean().item()

        
        
        
        ## Train with all-fake batch
        # Generate batch of latent vectors
        noise = torch.randn(b_size, nz, 1, 1)
        
        noise = noise.type(torch.DoubleTensor)
        
        # Generate fake image batch with G
        fake = netG(noise)
        
        
        label.fill_(fake_label)
        
        # Classify all fake batch with D
        output = netD(fake.detach()).view(-1)
        
        # Calculate D's loss on the all-fake batch
        errD_fake = criterion(output, label)
        
        # Calculate the gradients for this batch
        errD_fake.backward()
        
        D_G_z1 = output.mean().item()
        
        # Add the gradients from the all-real and all-fake batches
        errD = errD_real + errD_fake
        
        # Update D
        optimizerD.step()

        
        
        # Train G
        netG.zero_grad()
        
        label.fill_(real_label)  # fake labels are real for generator cost
        
        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake).view(-1)
        
        # Calculate G's loss based on this output
        errG = criterion(output, label)
        
        # Calculate gradients for G
        errG.backward()
        
        D_G_z2 = output.mean().item()
        
        # Update G
        optimizerG.step()
        
        print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                % (epoch, num_epochs, i, len(dataloader),
                    errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # Save Losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())

        
        
    # Check how the generator is doing by saving G's output on fixed_noise
        
    with torch.no_grad():
        fake = netG(fixed_noise).detach()
        fake_data_list.append(fake)

  0%|          | 0/1 [00:00<?, ?it/s]

100


  0%|          | 0/1 [02:39<?, ?it/s]


KeyboardInterrupt: 

In [None]:
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Grab a batch of real images from the dataloader
real_batch = next(iter(dataloader_valid))

# Plot the real images
plt.axis("off")
plt.title("Real Images")
plt.imshow(real_batch[0][0])
plt.colorbar()
plt.show()

In [None]:
# Plot the fake images from the last epoch
fake_image = fake_data_list[0][0][0]
plt.axis("off")
plt.title("Fake Images")
plt.imshow(fake_image)
plt.colorbar()
plt.show()

In [12]:
import pickle
with open('fake_data_dcgan.pickle', 'wb') as f:
    pickle.dump(fake_data_list,f)

In [None]:
fake_data_list