# C-RNN-GAN
Paper: [C-RNN-GAN: Continuous recurrent neural networks with adversarial training (Mogren, 2016)](http://mogren.one/publications/2016/c-rnn-gan/mogren2016crnngan.pdf)

In [41]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
# https://github.com/cjbayron/c-rnn-gan.pytorch/blob/master/train_simple.py

In [42]:
# Choose the compute device once; later cells refer to this `device` variable.
# `is_cuda` is True when torch reports a usable GPU and False otherwise.
is_cuda = torch.cuda.is_available()
device = torch.device("cuda" if is_cuda else "cpu")
print("GPU is available" if is_cuda else "GPU not available, CPU used")

GPU is available


In [43]:
class Generator(nn.Module):
    ''' C-RNN-GAN generator

    At every time step the noise vector for that step is concatenated with
    the features generated at the previous step, passed through a linear
    layer with ReLU, two stacked LSTM cells (dropout in between) and a final
    linear layer that produces the features of the current step.

    features: number of features per time step
    hidden_size: width of the first linear layer and of both LSTM cells
    '''
    def __init__(self, features, hidden_size):
        super().__init__()

        self.hidden_size = hidden_size
        self.features = features

        # The input is the noise step concatenated with the previous output,
        # hence twice the number of features.
        self.fc1 = nn.Linear(in_features=(features*2), out_features=hidden_size)
        self.lstm1 = nn.LSTMCell(input_size=hidden_size, hidden_size=hidden_size)
        self.dropout = nn.Dropout(p=0.6)
        self.lstm2 = nn.LSTMCell(input_size=hidden_size, hidden_size=hidden_size)
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=features)

    def forward(self, z, states):
        ''' Generate one sequence per batch element

        z: noise tensor of shape (batch_size, seq_len, num_feats)
        states: pair ((h1, c1), (h2, c2)) with the states of both LSTM cells,
                e.g. as returned by init_hidden
        returns: (gen_feats, states) where gen_feats has shape
                 (batch_size, seq_len, num_feats) and states holds the LSTM
                 states after the last step
        '''
        # Run on whatever device the weights live on, instead of relying on
        # a notebook-level `device` variable that may not match the model.
        run_device = next(self.parameters()).device
        z = z.to(run_device)
        batch_size, _, num_feats = z.shape

        # Random "previous output" for the very first step.
        prev_gen = torch.empty([batch_size, num_feats]).uniform_().to(run_device)

        state1, state2 = states
        gen_feats = []
        # unbind yields one (batch_size, num_feats) tensor per time step
        for z_step in z.unbind(dim=1):
            concat_in = torch.cat((z_step, prev_gen), dim=-1)
            out = F.relu(self.fc1(concat_in))
            h1, c1 = self.lstm1(out, state1)
            # NOTE(review): the dropped-out h1 is also what gets carried over
            # as the recurrent state of lstm1 below - confirm this is intended.
            h1 = self.dropout(h1)
            h2, c2 = self.lstm2(h1, state2)
            prev_gen = self.fc2(h2)
            gen_feats.append(prev_gen)
            state1 = (h1, c1)
            state2 = (h2, c2)

        # seq_len * (batch_size * num_feats) -> (batch_size * seq_len * num_feats)
        gen_feats = torch.stack(gen_feats, dim=1)

        return gen_feats, (state1, state2)

    def init_hidden(self, batch_size):
        ''' Zero-initialised states for both LSTM cells

        batch_size: number of sequences processed in parallel
        returns: ((h1, c1), (h2, c2)), each tensor of shape
                 (batch_size, hidden_size), created with the dtype and device
                 of the model parameters
        '''
        weight = next(self.parameters())

        def zeros():
            return torch.zeros(batch_size, self.hidden_size,
                               dtype=weight.dtype, device=weight.device)

        return ((zeros(), zeros()), (zeros(), zeros()))

In [47]:
class Discriminator(nn.Module):
    ''' C-RNN-GAN discriminator

    Applies dropout to the input sequence, runs it through a two-layer
    bidirectional LSTM and maps every time step to a sigmoid score, which is
    then averaged into a single value per sequence.

    features: number of features per time step
    hidden_size: hidden size of the LSTM (per direction)
    '''
    def __init__(self, features, hidden_size):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = 2
        self.dropout = nn.Dropout(p=.5)
        self.lstm = nn.LSTM(input_size=features, hidden_size=hidden_size,
                           num_layers=self.num_layers, batch_first=True, dropout=0.5,
                           bidirectional=True)

        # Both directions are concatenated, hence 2 * hidden_size inputs.
        self.fc = nn.Linear(in_features=(2*hidden_size), out_features=1)

    def forward(self, sequence, state):
        ''' Score a batch of sequences

        sequence: tensor of shape (batch_size, seq_len, features)
        state: (h0, c0) for the LSTM, e.g. as returned by init_hidden
        returns: (out, lstm_out, state) where out holds one averaged sigmoid
                 score per batch element, lstm_out is the raw LSTM output and
                 state is the final LSTM state
        '''
        # Run on whatever device the weights live on, instead of relying on
        # a notebook-level `device` variable that may not match the model.
        sequence = sequence.to(next(self.parameters()).device)
        drop_in = self.dropout(sequence)

        lstm_out, state = self.lstm(drop_in, state)
        out = torch.sigmoid(self.fc(lstm_out))

        # Average over every dimension except the batch dimension.
        reduction_dims = tuple(range(1, out.dim()))
        out = torch.mean(out, dim=reduction_dims)

        return out, lstm_out, state

    def init_hidden(self, batch_size):
        ''' Zero-initialised (h0, c0) for the LSTM

        batch_size: number of sequences processed in parallel
        returns: two tensors of shape
                 (num_layers * num_directions, batch_size, hidden_size),
                 created with the dtype and device of the model parameters
        '''
        weight = next(self.parameters())
        # Derived from the LSTM itself so it cannot drift from the layer setup.
        num_directions = 2 if self.lstm.bidirectional else 1
        shape = (self.num_layers * num_directions, batch_size, self.hidden_size)

        hidden = (torch.zeros(shape, dtype=weight.dtype, device=weight.device),
                  torch.zeros(shape, dtype=weight.dtype, device=weight.device))

        return hidden

class DLoss(nn.Module):
    ''' C-RNN-GAN discriminator loss

    eps: lower bound used when clamping values before taking the log.
         When left as None, the module-level EPSILON constant is used, which
         must then be defined before the first forward call.
    '''
    def __init__(self, eps=None):
        super().__init__()
        self.eps = eps

    def forward(self, logits_real, logits_gen):
        ''' Discriminator loss
        logits_real: logits from D, when input is real
        logits_gen: logits from D, when input is from Generator

        Both inputs are clamped to [eps, 1.0] before the log, so they are
        presumably expected to be probabilities in [0, 1] rather than raw
        logits - confirm against the caller.
        returns: mean over the batch of -log(real) - log(1 - gen)
        '''
        eps = EPSILON if self.eps is None else self.eps

        # Clamping keeps log() away from zero.
        logits_real = torch.clamp(logits_real, eps, 1.0)
        d_loss_real = -torch.log(logits_real)

        logits_gen = torch.clamp((1 - logits_gen), eps, 1.0)
        d_loss_gen = -torch.log(logits_gen)

        batch_loss = d_loss_real + d_loss_gen
        return torch.mean(batch_loss)

In [None]:
# NOTE(review): `dmodel`, `g_feats` and `d_state` are not defined in any cell
# visible above this one - presumably a scratch line taken from a training
# loop; confirm these names exist before running, or remove the cell.
# Only the first of the three values returned by the call is kept.
d_logits_gen, _, _ = dmodel(g_feats, d_state)

# Training Examples

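The training data are toy sequences in which every value is twice the previous one, i.e. `f(n) = 2*f(n-1)`. The goal is for the generator to learn to output sequences that follow this rule.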

In [None]:
from torch.utils.data import TensorDataset, DataLoader
from torch import optim
# Ten toy sequences of ten steps with one feature each: the powers of two
# 2**0 .. 2**9 as a column, each sequence scaled by its own random factor.
npdata = np.stack(
    [np.random.rand() * np.power(2, np.arange(10)).reshape(10, 1) for _ in range(10)]
)

In [None]:
# Convert the NumPy array into a tensor and wrap it in a TensorDataset,
# so that each dataset item is one sequence.
data = TensorDataset(torch.from_numpy(npdata))

In [None]:
# Iterate over the dataset in shuffled order; no batch_size is given, so the
# DataLoader default applies.
dataloader = DataLoader(data, shuffle=True)

In [None]:
# Materialise the whole loader and display the entry at index 3 as an
# (index, batch) pair, just to inspect what one element looks like.
list(enumerate(dataloader))[3]

Each element yielded by the data loader holds one sequence. That's it! We just wrap this in a function so that we can easily build a training set and a validation set.

In [None]:
def dummy_dataloader(seq_len, batch_size, num_sample):
    ''' Dummy data generator (for debugging purposes)

    Builds num_sample toy sequences with a single feature, where each number
    is twice the previous number and every sequence is scaled by its own
    random factor drawn with np.random.rand().

    seq_len: number of time steps per sequence
    batch_size: batch size passed to the DataLoader
    num_sample: number of sequences to generate
    returns: a shuffling DataLoader over a TensorDataset that holds one
             float64 tensor of shape (num_sample, seq_len, 1)
    '''
    # Float powers of two, computed once. The integer form
    # `2 ** np.arange(seq_len)` silently wraps around as soon as an exponent
    # no longer fits the integer dtype (seq_len >= 64 for int64), while the
    # float form gives the same values for short sequences.
    powers_of_two = (2.0 ** np.arange(seq_len))[:, np.newaxis]

    # One independent random scale per sequence.
    np_data = np.stack([powers_of_two * np.random.rand()
                        for _ in range(num_sample)])

    data = TensorDataset(torch.from_numpy(np_data))
    return DataLoader(data, shuffle=True, batch_size=batch_size)

In [None]:
# Training and validation loaders over toy sequences of length 20; they
# differ only in how many sequences are generated (7 vs. 4).
trn_dataloader = dummy_dataloader(seq_len=20, batch_size=13, num_sample=7)
val_dataloader = dummy_dataloader(seq_len=20, batch_size=13, num_sample=4)

In [46]:
# Build both networks and place them on the selected `device`, so that their
# parameters live on the device chosen at the top of the notebook
# (nn.Module.to moves the parameters in place and returns the module itself).
model = {
    'g': Generator(features=1, hidden_size=100).to(device),
    'd': Discriminator(features=1, hidden_size=100).to(device)
}

In [50]:
# Learning rates of the generator and discriminator optimizers.
G_LRN_RATE = 1e-3
D_LRN_RATE = 1e-3

# One Adam optimizer per network, keyed like the `model` dict.
optimizer = {
    'g': optim.Adam(params=model['g'].parameters(), lr=G_LRN_RATE),
    'd': optim.Adam(params=model['d'].parameters(), lr=D_LRN_RATE),
}

# Loss functions: summed squared error for the generator and the custom
# DLoss for the discriminator.
criterion = {
    'g': nn.MSELoss(reduction='sum'),
    'd': DLoss(),
}