# Gated Convolutional Networks

Paper link: https://arxiv.org/pdf/1612.08083.pdf

I like the idea of stacked convolutions, which is best explained by the GIF below.

<img src="gcnn.gif">

In [None]:
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

from utils import read_words, create_batches, to_var

The idea here is to create a stack of convolution layers depending upon the sequence length.
Here I have zero-padded the sentence with $k - 1$ tokens, where $k$ is the kernel size.
That way it does not see the initial words like RNNs.

There will be two sets of convolution operations.

One is defined without gates ($A$) and the other produces the gates ($B$).

Once the stacked convolutions are computed, we combine the two outputs through gating.

$h_i = A_i \times \sigma(B_i)$ for every layer $i$.

Here, $\times$ is element-wise multiplication and $\sigma$ is sigmoid.

To avoid loss of gradients, I have also included the idea of resnets here as well.

In [None]:
class GatedConvNet(nn.Module):
    """Gated convolutional language model (https://arxiv.org/pdf/1612.08083.pdf).

    Every layer runs two parallel convolutions over the sequence axis: a
    plain one (A) and a gate (B). The layer output is ``A * sigmoid(B)``.
    Residual connections are added between layers, and the flattened
    output of the last layer is projected to vocabulary log-probabilities.

    Args:
        seq_len: Number of tokens in every input sequence. It fixes the
            input width of the final linear layer, so the model only
            accepts sequences of exactly this length.
        vocab_size: Number of distinct token ids (embedding rows and
            output classes).
        embed_size: Dimensionality of the token embeddings.
        n_layers: Number of gated convolution layers stacked after the
            first (embedding-consuming) layer.
        kernel_size: ``(height, width)`` of the first convolution. The
            height is the number of tokens a window covers and must be odd
            so that symmetric padding preserves the sequence length; the
            width must equal ``embed_size`` so the embedding axis collapses
            to size 1.
        num_layers: Number of convolution output channels (filters). The
            parameter name is kept for backward compatibility with
            existing callers.
        res_block_count: A residual connection is added after every stacked
            layer whose index is a multiple of this value.

    Raises:
        ValueError: If ``res_block_count`` is smaller than 1.
    """

    def __init__(self, seq_len, vocab_size, embed_size, n_layers, kernel_size, num_layers, res_block_count):
        super(GatedConvNet, self).__init__()

        if res_block_count < 1:
            raise ValueError('res_block_count must be >= 1, got {}'.format(res_block_count))

        num_filters = num_layers
        kernel_height = kernel_size[0]
        # symmetric padding keeps the sequence length unchanged for odd heights
        padding = (kernel_height // 2, 0)

        # forward() needs this to decide where residual connections go
        self.res_block_count = res_block_count

        # create embedding matrix
        self.embedding = nn.Embedding(vocab_size, embed_size)

        # first layer: consumes the embeddings, collapsing the embedding axis
        self.first_conv = nn.Conv2d(1, num_filters, kernel_size, padding=padding)
        self.first_conv_gated = nn.Conv2d(1, num_filters, kernel_size, padding=padding)

        # Explicit per-channel biases, broadcast over batch and sequence axes.
        # nn.Conv2d already has its own bias; these extra terms are kept so the
        # module keeps the attribute layout the original code intended.
        self.bias_first_conv = nn.Parameter(torch.randn(1, num_filters, 1, 1))
        self.bias_first_conv_gated = nn.Parameter(torch.randn(1, num_filters, 1, 1))

        # stacked layers: convolve along the sequence axis only
        self.convolve = nn.ModuleList(
            [nn.Conv2d(num_filters, num_filters, (kernel_height, 1), padding=padding) for _ in range(n_layers)]
        )
        self.convolve_gate = nn.ModuleList(
            [nn.Conv2d(num_filters, num_filters, (kernel_height, 1), padding=padding) for _ in range(n_layers)]
        )

        # biases for the stacked layers (parameters, not modules)
        self.bias_convolve = nn.ParameterList(
            [nn.Parameter(torch.randn(1, num_filters, 1, 1)) for _ in range(n_layers)]
        )
        self.bias_convolve_gate = nn.ParameterList(
            [nn.Parameter(torch.randn(1, num_filters, 1, 1)) for _ in range(n_layers)]
        )

        # final decoder to vocab size
        self.fc = nn.Linear(num_filters * seq_len, vocab_size)

    def forward(self, x):
        """Return log-probabilities over the vocabulary.

        Args:
            x: LongTensor of token ids with shape ``(batch, seq_len)``.

        Returns:
            Tensor of shape ``(batch, vocab_size)`` holding log-softmax scores.
        """
        batch_size = x.size(0)

        # (batch, seq_len) -> (batch, 1, seq_len, embed_size): one input channel
        x = self.embedding(x).unsqueeze(1)

        # first gated layer; the (1, C, 1, 1) biases broadcast over the output
        without_gate = self.first_conv(x) + self.bias_first_conv
        with_gate = self.first_conv_gated(x) + self.bias_first_conv_gated
        h = without_gate * torch.sigmoid(with_gate)
        res_input = h

        for i, (conv, conv_gate) in enumerate(zip(self.convolve, self.convolve_gate)):
            A = conv(h) + self.bias_convolve[i]
            B = conv_gate(h) + self.bias_convolve_gate[i]
            h = A * torch.sigmoid(B)
            if i % self.res_block_count == 0:  # size of each residual block
                # out-of-place add so autograd never sees a mutated tensor
                h = h + res_input
                res_input = h

        h = h.view(batch_size, -1)
        out = self.fc(h)
        return F.log_softmax(out, dim=1)

In [None]:
# Hyper-parameters shared by the cells below.
vocab_size = 2000  # number of distinct word ids, including <unk>
embed_size = 200  # embedding dimensionality
seq_len = 17  # tokens per training sequence
n_layers = 10  # stacked gated convolution layers
kernel_size = (5, embed_size)  # (tokens per window, embedding width)
num_filters = 64  # convolution output channels
res_block_count = 5  # layers per residual block
batch_size = 64  # sequences per batch

Read the file and encode the words. Finally split the dataset.

In [None]:
# Load the corpus. kernel_size[0] is the convolution window height in tokens;
# presumably read_words uses it to pad each sentence -- verify against utils.
words = read_words('./data', seq_len, kernel_size[0])

# keep the (vocab_size - 1) most frequent words; id 0 is reserved for <unk>
word_counter = collections.Counter(words).most_common(vocab_size - 1)
vocab = [w for w, _ in word_counter]

# assign word IDs starting at 1
word_ids = {w: i for i, w in enumerate(vocab, 1)}
word_ids['<unk>'] = 0

# encode the corpus (out-of-vocabulary words map to 0), batch it, split 80/20
data = [word_ids.get(w, 0) for w in words]
data = create_batches(data, batch_size, seq_len)
split_idx = int(len(data) * 0.8)
training_data = data[:split_idx]
test_data = data[split_idx:]

## Train Loop

In [None]:
def train(model, data, test_data, optimizer, loss_fn, epochs = 10):
    """Train ``model`` and evaluate it on ``test_data`` after every epoch.

    Args:
        model: Module mapping a batch of token ids to per-class scores.
        data: Mutable list of ``(X, Y)`` batches. It is shuffled in place at
            the start of every epoch.
        test_data: Batches passed to ``test`` after each epoch.
        optimizer: Optimizer already bound to the model's parameters.
        loss_fn: Callable ``loss_fn(pred, Y)`` returning a scalar loss tensor.
        epochs: Number of passes over ``data``.
    """
    for epoch in range(epochs):
        # test() switches the model to eval mode, so re-enter training mode
        # at the start of every epoch rather than once up front.
        model.train()
        print('Epoch', epoch)
        random.shuffle(data)
        for batch_ct, (X, Y) in enumerate(data):
            # to_var comes from utils; presumably it prepares the tensor for
            # the model (e.g. device placement) -- verify against utils.
            X = to_var(torch.LongTensor(X))
            Y = to_var(torch.LongTensor(Y))
            pred = model(X)
            loss = loss_fn(pred, Y)

            if batch_ct % 100 == 0:
                # perplexity = exp(loss); torch.exp saturates to inf on overflow
                perplexity = torch.exp(loss.detach()).item()
                print('Training Loss: {:.4f} Perplexity: {:.4f}'.format(loss.item(), perplexity))

            model.zero_grad()
            loss.backward()
            optimizer.step()
        print('Test set performance', epoch)
        test(model, test_data)

## Test Loop

In [None]:
def test(model, data):
    """Evaluate ``model`` on ``data`` and print loss, perplexity and accuracy.

    Uses the module-level ``loss_fn`` to score each batch. The model is left
    in eval mode when the function returns.

    Args:
        model: Module mapping a batch of token ids to per-class scores.
        data: Iterable of ``(X, Y)`` batches.
    """
    model.eval()
    counter = 0
    correct = 0
    total = 0
    losses = 0.0
    # no gradients are needed for evaluation; skip building autograd graphs
    with torch.no_grad():
        for X, Y in data:
            X = to_var(torch.LongTensor(X))
            Y = to_var(torch.LongTensor(Y))
            pred = model(X)
            loss = loss_fn(pred, Y)
            batch_loss = loss.item()
            losses += batch_loss  # running sum of per-batch mean losses
            _, pred_ids = torch.max(pred, 1)
            print('Loss: {:.4f}'.format(batch_loss))
            correct += torch.sum(pred_ids == Y).item()
            total += Y.numel()
            counter += 1

    if counter == 0:
        # avoid dividing by zero when there is nothing to evaluate
        print('Test set is empty; nothing to evaluate.')
        return

    mean_loss = losses / counter
    ppl = torch.exp(torch.tensor(mean_loss)).item()
    print('Test Loss: {:.4f} Perplexity: {:.4f}'.format(mean_loss, ppl))
    if total > 0:
        print('Test Accuracy: {:.4f}'.format(correct / total))

In [None]:
# Build the network defined above; num_filters fills the constructor's
# sixth positional slot (the number of convolution channels).
model = GatedConvNet(seq_len, vocab_size, embed_size, n_layers, kernel_size, num_filters, res_block_count)
optimizer = torch.optim.Adadelta(model.parameters())
# The model already emits log-probabilities; CrossEntropyLoss applies
# log-softmax again, which leaves log-probabilities unchanged.
loss_fn = nn.CrossEntropyLoss()
train(model, training_data, test_data, optimizer, loss_fn)

After 10 epochs, we get around 105.03 perplexity.

It is still unclear to me from the paper how to handle variable-length sequences.

If I submitted a paper like this to ACL, it would be rejected outright for "lack of motivation".