# Setup

## Imports

In [1]:
import os.path

from tqdm import tqdm

vai Modules

In [2]:
from vaiutils import path_consts, randpick, smooth_plot
from vainlp import extract_glove_embeddings
from vaidata import pickle_dump, pickle_load

Using TensorFlow backend.


Keras Modules

In [3]:
from keras.preprocessing.text import Tokenizer

PyTorch Modules

In [4]:
import torch

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.autograd import Variable

## Define Useful Variables and Functions

In [5]:
# Publish every (name, value) pair yielded by path_consts as a notebook-level
# global (presumably this is where DIR_DATA and DIR_CHECKPOINTS, used further
# down, come from -- confirm against vaiutils).
# Assigning through globals() has the same effect as exec(k + '=v') at cell
# level, but never compiles a string built from the key.
for const_name, const_value in path_consts(['GloVe', 'SampleText']):
    globals()[const_name] = const_value

In [6]:
def get_text(vec):
    """Decode an iterable of token ids into a space-separated string.

    Every id is looked up in the notebook-level `idx_word` mapping; an id
    that is missing from the mapping raises KeyError.
    """
    words = [idx_word[token_id] for token_id in vec]
    return ' '.join(words)

## Load Data

In [7]:
# Read the raw 50-dimensional GloVe table, one text line per entry.
# The encoding is pinned to UTF-8 (the encoding the GloVe files are
# distributed in) so the read does not depend on the platform default.
with open(os.path.join(DIR_DATA['GloVe'], 'glove.6B.50d.txt'), encoding='utf-8') as f:
    glove = f.readlines()

In [8]:
# Read the training corpus and cut it into fragments at every '.'.
# The encoding is pinned so the result does not depend on the platform
# default. NOTE(review): UTF-8 is assumed for styles.txt -- confirm.
with open(os.path.join(DIR_DATA['SampleText'], 'styles.txt'), encoding='utf-8') as f:
    data = f.read().split('.')

In [9]:
# Vocabulary size: passed to the Keras Tokenizer and used as the number of
# rows of the nn.Embedding table further down.
vocab_size = 2000
# Embedding width, read off the first GloVe line: the number of
# whitespace-separated fields minus one (presumably the first field is the
# word itself and the remaining fields are the vector components).
embedding_dim = len(glove[0].split()) - 1

In [10]:
# Build the tokenizer and obtain the embedding matrix, using a pickle in
# DIR_CHECKPOINTS as an on-disk cache for the matrix.
tokenizer = Tokenizer(vocab_size)
embeddings_cache_path = os.path.join(DIR_CHECKPOINTS, 'glove_embeddings.p')
if not os.path.exists(embeddings_cache_path):
    # No cache yet: compute the matrix and store it for the next run.
    glove_embeddings = extract_glove_embeddings(data, glove, tokenizer)
    pickle_dump(embeddings_cache_path, glove_embeddings)
else:
    glove_embeddings = pickle_load(embeddings_cache_path)
    # Still called with return_embeddings=False, presumably for its side
    # effect of fitting `tokenizer` on `data` -- confirm against vainlp.
    # NOTE(review): the cached matrix is reused even if `vocab_size` or the
    # corpus changed since it was written.
    extract_glove_embeddings(data, glove, tokenizer, return_embeddings=False)

In [11]:
# Forward (word -> id) mapping comes straight from the tokenizer; the reverse
# (id -> word) mapping is what get_text uses for decoding.
word_idx = tokenizer.word_index
idx_word = dict(zip(word_idx.values(), word_idx.keys()))

In [12]:
# Replace every text fragment by the tokenizer's sequence of integer word ids.
# NOTE: this rebinds `data`, so the cell is not idempotent -- re-running it
# would pass id lists (not strings) back into the tokenizer.
data = tokenizer.texts_to_sequences(data)

In [13]:
# Discard fragments that ended up with no tokens after tokenisation.
data = [sequence for sequence in data if len(sequence) > 0]

In [14]:
# Turn every id list into a 1-D int64 tensor (the index type nn.Embedding
# expects). torch.LongTensor is built directly from the list, so the cell no
# longer depends on `np` and `long`, neither of which is imported or defined
# anywhere in this notebook.
data = [torch.LongTensor(datum) for datum in data]

# Create Model

In [15]:
# Size of the latent noise vector: sampled as torch.randn(1, z_dim) and used
# as the input channel count of the generator's first layer.
z_dim = 32

In [16]:
# Embedding table (vocab_size rows of embedding_dim values) placed on the GPU.
# Its initial weights are overwritten with the GloVe matrix in the next cell.
embedding_layer = nn.Embedding(vocab_size, embedding_dim).cuda()

In [17]:
# Load the GloVe matrix into the embedding layer. The dtype is given as the
# string 'float32' because the bare name `float32` is not imported or defined
# anywhere in this notebook.
embedding_layer.weight = nn.Parameter(torch.from_numpy(glove_embeddings.astype('float32')).cuda())
# Gradient-free copy of the table with a singleton middle axis; sample_text
# subtracts it from the generated vectors to find the nearest word.
embedding_weights = Variable(embedding_layer.weight.data.unsqueeze(1), volatile=True)

In [18]:
class Generator(nn.Module):
    """Transposed-convolution generator mapping noise to a sequence of word vectors.

    The input is a (batch, z_dim) noise tensor. It is treated as a length-1
    sequence with z_dim channels and passed through three transposed
    convolutions (kernel 3, stride 2, padding 1, output padding 1), each of
    which doubles the sequence length, so the output has shape
    (batch, embedding_dim, 8). No activation is applied after the last layer.
    """

    def __init__(self):
        super().__init__()
        # Geometry shared by all three up-sampling layers.
        upsample = dict(kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv1 = nn.ConvTranspose1d(z_dim, 4 * embedding_dim, **upsample)
        self.conv2 = nn.ConvTranspose1d(4 * embedding_dim, 2 * embedding_dim, **upsample)
        self.conv3 = nn.ConvTranspose1d(2 * embedding_dim, embedding_dim, **upsample)

    def forward(self, x):
        # Add a trailing length-1 sequence axis: (batch, z_dim) -> (batch, z_dim, 1).
        hidden = x.unsqueeze(-1)
        for layer in (self.conv1, self.conv2):
            hidden = F.relu(layer(hidden))
        # The final layer is left linear so outputs can take any sign.
        return self.conv3(hidden)

In [19]:
class Discriminator(nn.Module):
    """Strided 1-D convolutional critic that scores a sequence of word vectors.

    The input has shape (batch, embedding_dim, length). Three stride-2
    convolutions shrink the sequence, and the single-channel result is
    averaged over the remaining positions to give one raw logit per sequence.

    The output is an unbounded logit intended for nn.BCEWithLogitsLoss, so
    no activation is applied after the last convolution. A ReLU there would
    clamp every score to >= 0, i.e. a predicted "real" probability of at
    least 0.5, making it impossible to classify a sample as fake.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(embedding_dim, embedding_dim*2, 3, 2, 1)
        self.conv2 = nn.Conv1d(embedding_dim*2, embedding_dim*4, 3, 2, 1)
        self.conv3 = nn.Conv1d(embedding_dim*4, 1, 3, 2, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        # Raw logits: deliberately no ReLU here (see class docstring).
        x = self.conv3(x)
        # Average over the sequence axis -> (batch, 1, 1); squeeze(0) then
        # drops the batch axis only when the batch size is 1.
        x = F.adaptive_avg_pool1d(x, 1).squeeze(0)
        return x

# Train Model

In [20]:
# Instantiate the generator and move its parameters to the GPU (requires CUDA).
generator = Generator().cuda()

In [21]:
# Instantiate the discriminator and move its parameters to the GPU (requires CUDA).
discriminator = Discriminator().cuda()

In [38]:
# One Adam optimiser per network, so that each optimiser only ever steps its
# own network's parameters.
# NOTE: re-running this cell creates fresh optimisers, discarding any
# accumulated Adam state.
optimizer_g = optim.Adam(generator.parameters(), lr=1e-3)
optimizer_d = optim.Adam(discriminator.parameters(), lr=1e-3)

In [23]:
# Binary cross-entropy that takes raw logits (the sigmoid is applied inside
# the loss), used for both the generator and the discriminator objectives.
criterion = nn.BCEWithLogitsLoss()

In [24]:
# Loss values recorded by optimize(); re-running this cell clears them.
history = {'loss_g': [], 'loss_d': []}
# optimize() performs one step per entry of `data`. This value is captured
# when the cell runs and is used to space out the history writes.
batches_per_epoch = len(data)

def optimize(epochs=0, writes_per_epoch=10):
    """Run adversarial training, one randomly drawn sentence per step.

    Each step draws one sentence from `data`, embeds it, generates one fake
    sequence from Gaussian noise, and then updates the generator followed by
    the discriminator. Both losses are evaluated on forward passes made
    before either network is updated in that step.

    Args:
        epochs: Number of passes of `len(data)` steps each. The default of 0
            performs no training.
        writes_per_epoch: Controls how often the losses are appended to
            `history`: every `batches_per_epoch / (writes_per_epoch - 1)`
            steps (truncated), clamped to at least 1 so that small values
            cannot cause a division or modulo by zero.

    Side effects:
        Updates `generator` and `discriminator` in place through
        `optimizer_g` / `optimizer_d`, and appends to `history['loss_g']`
        and `history['loss_d']`.
    """
    # Targets for the BCE loss: index 0 is "fake", index 1 is "real".
    # Built with torch.FloatTensor so that no numpy names are required.
    y_targets = [Variable(torch.FloatTensor([label]).cuda(), requires_grad=False) for label in range(2)]
    write_every = max(1, int(batches_per_epoch / max(1, writes_per_epoch - 1)))

    for epoch in tqdm(range(epochs)):
        for batch in range(len(data)):
            # Real sample: (length, dim) embeddings -> (1, dim, length).
            x = torch.unsqueeze(torch.transpose(embedding_layer(Variable(randpick(data).cuda(), requires_grad=False)), 0, 1), 0).cuda()
            z = Variable(torch.randn(1, z_dim).cuda(), requires_grad=False)

            x_gen = generator(z)

            # view(-1) keeps each score as a 1-element vector matching the
            # targets, whether or not the framework has 0-dim tensors.
            d_x = discriminator(x).view(-1)
            # The fake sample is scored twice: once attached to the generator
            # graph (for the generator loss) and once detached (for the
            # discriminator loss). The discriminator's backward pass then
            # never revisits generator weights that optimizer_g.step() has
            # already modified in place.
            d_x_gen = discriminator(x_gen).view(-1)
            d_x_gen_detached = discriminator(x_gen.detach()).view(-1)

            # Generator step: make the discriminator call the fake "real".
            optimizer_g.zero_grad()
            loss_g = criterion(d_x_gen, y_targets[1])
            loss_g.backward()
            optimizer_g.step()

            # Discriminator step: real -> 1, fake -> 0. zero_grad also clears
            # the gradients the generator loss left on the discriminator.
            optimizer_d.zero_grad()
            loss_d = criterion(d_x, y_targets[1]) + criterion(d_x_gen_detached, y_targets[0])
            loss_d.backward()
            optimizer_d.step()

            if batch % write_every == 0:
                # reshape(-1)[0] extracts the scalar for both 0-dim and
                # 1-element loss tensors.
                history['loss_g'].append(loss_g.data.cpu().numpy().reshape(-1)[0])
                history['loss_d'].append(loss_d.data.cpu().numpy().reshape(-1)[0])

In [39]:
# Train for a single epoch, i.e. len(data) generator/discriminator steps.
optimize(1)

100%|██████████| 1/1 [00:44<00:00, 44.84s/it]


In [None]:
# Visualise the recorded generator/discriminator losses (presumably as
# smoothed curves -- see vaiutils.smooth_plot), keeping outliers.
smooth_plot(history, remove_outlier=False)

# Test Model

In [27]:
def sample_text():
    """Generate one sequence and decode it to text by nearest-embedding lookup.

    A noise vector is passed through `generator`; every generated vector is
    then matched to the row of `embedding_weights` with the smallest
    Euclidean distance, and the resulting word ids are decoded by `get_text`.

    Returns:
        The decoded words joined by single spaces.
    """
    z = Variable(torch.randn(1, z_dim).cuda(), volatile=True)
    x_gen = generator(z)

    # (1, dim, length) -> (1, length, dim). Subtracting `embedding_weights`
    # (one row per word with a singleton middle axis) broadcasts to every
    # word/position pair, so no explicit repeat over the vocabulary is needed.
    vectors = torch.transpose(x_gen, 1, 2)

    # Distance of every position to every word: (words, length).
    distances = torch.norm(vectors - embedding_weights, dim=-1)

    # Softmax over the word axis, with the axis named explicitly because the
    # implicit choice of `dim` is deprecated. Closer words get higher
    # probability.
    probabilities = F.softmax(-torch.transpose(distances, 0, 1), dim=1)

    # NOTE(review): if row 0 of the embedding table is a padding row it may
    # win the argmax, and id 0 may be absent from `idx_word` -- confirm.
    return get_text(probabilities.max(1)[1].data.cpu().numpy())

In [37]:
# Draw ten independent samples and join them with '.' for display.
'.'.join(sample_text() for _ in range(10))

'his hurriedly tangible withdrew his agitated tangible tangible.his agitated tangible withdrew case hurriedly tangible tangible.his hurriedly tangible withdrew case hurriedly tangible tangible.case tangible weak withdrew his agitated tangible tangible.his agitated tangible withdrew case hurriedly tangible tangible.his hurriedly tangible withdrew in hurriedly tangible tangible.his hurriedly tangible withdrew his hurriedly tangible tangible.his agitated tangible withdrew case hurriedly tangible tangible.case hurriedly weak withdrew his agitated tangible tangible.case hurriedly tangible withdrew his hurriedly tangible tangible'