In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

# Generate data

In [2]:
import random
import itertools

In [3]:
def data_labeler(seq):
    if seq.count("a") % 2 != 0:
      return 0
    return 1


def label_data_in_batch(seqs):
  labeling = lambda seq: [seq, data_labeler(seq)]
  return list(map(labeling, seqs))


def all_words_of_length(length, alphabet="ab"):
    '''Returns all possible strings given the string length and an alphabet.'''
    return [''.join(list(b)) for b 
            in itertools.product(alphabet, repeat=length)]

In [4]:
label_data_in_batch(all_words_of_length(3))

[['aaa', 0],
 ['aab', 1],
 ['aba', 1],
 ['abb', 0],
 ['baa', 1],
 ['bab', 0],
 ['bba', 0],
 ['bbb', 1]]

In [5]:
data = label_data_in_batch(all_words_of_length(10))
random.shuffle(data)

split_1 = 800
split_2 = 900

train = data[:split_1]
dev = data[split_1:split_2]
test = data[split_2:]

In [6]:
len(train), len(dev), len(test)

(800, 100, 124)

In [7]:
print("Train set examples:", train[:5])
print("Dev set examples:", dev[:5])
print("Test set examples:", test[:5])


Train set examples: [['abbabaabaa', 1], ['bbbabaabba', 1], ['baabbbbbba', 0], ['baabbaaaba', 1], ['aaabbbabba', 0]]
Dev set examples: [['baaabbabab', 0], ['ababaaaabb', 1], ['abbabbabab', 1], ['baabbabbaa', 0], ['baaabbbaaa', 1]]
Test set examples: [['abababaaba', 1], ['aabbababab', 0], ['baaaaaabbb', 1], ['abbaababba', 0], ['aabbbbbaba', 1]]


## Transform text

In [8]:
vocab_to_idx = {"a": 0, "b": 1}
idx_to_vocab = {0: "a", 1: "b"}


def tokenizer(seq):
  return list(seq)


def encode(seq, vocab_to_idx=vocab_to_idx):
  seq = tokenizer(seq)
  return [vocab_to_idx[s] for s in seq]


def decode(indices, idx_to_vocab=idx_to_vocab):
  return [idx_to_vocab[i] for i in indices]


def transform_dataset(dataset):
  X, Y = [], []

  for data in dataset:
    X.append(encode(data[0]))
    Y.append(data[1])
  
  X = torch.tensor(X)
  Y = torch.tensor(Y)
  return X, Y

In [9]:
encode("babababba"), decode(encode("babababba"))

([1, 0, 1, 0, 1, 0, 1, 1, 0], ['b', 'a', 'b', 'a', 'b', 'a', 'b', 'b', 'a'])

In [10]:
train_X, train_Y = transform_dataset(train)
dev_X, dev_Y = transform_dataset(dev)
test_X, test_Y = transform_dataset(test)

In [11]:
train_X.shape, train_Y.shape

(torch.Size([800, 10]), torch.Size([800]))

In [12]:
dev_X.shape, dev_Y.shape

(torch.Size([100, 10]), torch.Size([100]))

In [13]:
test_X.shape, test_Y.shape

(torch.Size([124, 10]), torch.Size([124]))

In [14]:
test_Y

tensor([1, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1,
        1, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 1, 1,
        0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0,
        0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1,
        1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0,
        0, 0, 0, 1])

# Customized functions for training 

In [15]:
import math 


def avg_accuracy(Y, logits):
  Ypred = logits.argmax(dim=1)
  return (Y == Ypred).float().mean()


def train_loop(model, X, Y, optimizer, criterion):
  model.train()

  optimizer.zero_grad()  
  logits = model(X)
  loss = criterion(logits, Y)
  loss.backward()    
  optimizer.step()

  accu = avg_accuracy(Y, logits)
  return loss.item(), accu.item()


def evaluate(model, X, Y, criterion):
  model.eval()
  logits = model(X)
  loss = criterion(logits, Y)
  accu = avg_accuracy(Y, logits)
  return loss.item(), accu.item()


def train_and_evaluate(model, train_X, train_Y, eval_X, eval_Y, 
                       optimizer, criterion, num_epoch, batch_size=None, 
                       dev_accu_threshold=1.0, print_freq=None):
  
  train_size = train_X.shape[0]

  if not batch_size: # mini-bacth
    batch_size = train_size # mini-bacth
  num_batches = math.ceil(train_size / batch_size) 

  if not print_freq:
    print_freq = print_freq = num_epoch // 20 

  for epoch in range(1, num_epoch+1): 

    for batch_i in range(num_batches): # mini-bacth
      l = batch_i * batch_size
      r = (batch_i + 1) * batch_size 
        
      train_loss, train_accu = train_loop(model, train_X[l:r], train_Y[l:r], optimizer, criterion)
        
    if epoch in [1, num_epoch+1] or epoch % print_freq == 0:   
      dev_loss, dev_accu = evaluate(model, dev_X, dev_Y, criterion)  

      print("Epoch: {}, Train loss: {}, Train accu: {}, Dev loss: {}, Dev accu: {}".format(
          epoch, train_loss, train_accu, dev_loss, dev_accu))
          
      if dev_accu >= dev_accu_threshold: # dev set used to make a traning decision 
        break

# BOW

In [16]:
class BOW(nn.Module):

  def __init__(self, in_dim, embd_dim, hid_dim, out_dim, activation_func):
    super(BOW, self).__init__()
    self.embedding = nn.Embedding(in_dim, embd_dim)
    self.linear = nn.Linear(embd_dim, hid_dim)
    self.activation = activation_func
    self.linear_out = nn.Linear(hid_dim, out_dim)

  def forward(self, X):
    # input X: (batch size, in_dim)
    # embd X: (batch size, embd_dim)
    X = self.embedding(X).sum(dim=1)

    # hid: (batch size, hid_dim)
    hid = self.activation(self.linear(X))

    # logits: (batch size, out_dim)
    logits = self.linear_out(hid)
    return logits

In [17]:
bow = BOW(9, 10, 10, 2, torch.sigmoid)

num_epoch = 5000
print_freq = num_epoch // 20 
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(bow.parameters(), lr=0.005)

train_and_evaluate(model=bow, 
                   train_X=train_X, 
                   train_Y=train_Y,
                   eval_X=dev_X, 
                   eval_Y=dev_Y, 
                   optimizer=optimizer, 
                   criterion=criterion, 
                   num_epoch=num_epoch, 
                   batch_size=None, 
                   dev_accu_threshold=1.0, 
                   print_freq=None)

Epoch: 1, Train loss: 0.7595751881599426, Train accu: 0.5024999976158142, Dev loss: 0.7650527358055115, Dev accu: 0.47999998927116394
Epoch: 250, Train loss: 0.38291680812835693, Train accu: 0.8500000238418579, Dev loss: 0.41210702061653137, Dev accu: 0.8100000023841858
Epoch: 500, Train loss: 0.19409246742725372, Train accu: 0.956250011920929, Dev loss: 0.23440556228160858, Dev accu: 0.9200000166893005
Epoch: 750, Train loss: 0.14444191753864288, Train accu: 0.956250011920929, Dev loss: 0.2026374489068985, Dev accu: 0.9200000166893005
Epoch: 1000, Train loss: 0.12524624168872833, Train accu: 0.956250011920929, Dev loss: 0.19125743210315704, Dev accu: 0.9200000166893005
Epoch: 1250, Train loss: 0.11544281989336014, Train accu: 0.956250011920929, Dev loss: 0.1866675317287445, Dev accu: 0.9200000166893005
Epoch: 1500, Train loss: 0.10968522727489471, Train accu: 0.956250011920929, Dev loss: 0.184885174036026, Dev accu: 0.9200000166893005
Epoch: 1750, Train loss: 0.10598722100257874, Trai

In [18]:
loss, accu = evaluate(bow, test_X, test_Y, criterion)
print("Test loss:", loss, "Test accu:", accu)

Test loss: 0.09316170960664749 Test accu: 0.9677419066429138


# LSTM

In [19]:
class LSTM(nn.Module):

  def __init__(self, in_dim, embd_dim, hid_dim, out_dim):
    super(LSTM, self).__init__()
    self.embedding = nn.Embedding(in_dim, embd_dim)
    self.lstm = nn.LSTM(embd_dim, hid_dim, batch_first=True)
    self.linear_out = nn.Linear(hid_dim, out_dim)

  def forward(self, X):
    # input X: (batch size, in_dim)
    # embd X: (batch size, in_dim, embd_dim)
    X = self.embedding(X) # note: no ".sum(dim=1)" here!

    # hiddens_all: (batch size, in_dim, hid_dim)
    # hiddens_final: (1, batch size, hid_dim)
    # cells_final: (1, batch size, hid_dim)
    hiddens_all, (hiddens_final, cells_final) = self.lstm(X)

    # hiddens_all: (batch size, hid_dim)
    hiddens_final = hiddens_final.squeeze(0)

    # logits: (batch size, out_dim)
    logits = self.linear_out(hiddens_final)
    return logits

In [20]:
lstm = LSTM(9, 10, 10, 2)

num_epoch = 5000
print_freq = num_epoch // 20 
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(lstm.parameters(), lr=0.005)

train_and_evaluate(model=lstm, 
                   train_X=train_X, 
                   train_Y=train_Y,
                   eval_X=dev_X, 
                   eval_Y=dev_Y, 
                   optimizer=optimizer, 
                   criterion=criterion, 
                   num_epoch=num_epoch, 
                   batch_size=None, 
                   dev_accu_threshold=1.0, 
                   print_freq=None)

Epoch: 1, Train loss: 0.6985114216804504, Train accu: 0.4975000023841858, Dev loss: 0.6940468549728394, Dev accu: 0.5199999809265137
Epoch: 250, Train loss: 0.07609043270349503, Train accu: 0.9850000143051147, Dev loss: 0.0775027871131897, Dev accu: 1.0


In [21]:
loss, accu = evaluate(lstm, test_X, test_Y, criterion)
print("Test loss:", loss, "Test accu:", accu)

Test loss: 0.0865149050951004 Test accu: 1.0


# Generalizing to unseen lengths

In [22]:
for n in list(range(1, 10)) + [11, 15]:
  gen = label_data_in_batch(all_words_of_length(n))
  gen_X, gen_Y = transform_dataset(gen)
  loss, accu = evaluate(bow, gen_X, gen_Y, criterion)
  print("N: {}; Loss: {}; Accu: {}".format(n, loss, accu))

N: 1; Loss: 2.9075703620910645; Accu: 0.5
N: 2; Loss: 2.2168965339660645; Accu: 0.5
N: 3; Loss: 1.1107935905456543; Accu: 0.875
N: 4; Loss: 3.0219905376434326; Accu: 0.5
N: 5; Loss: 3.2528226375579834; Accu: 0.34375
N: 6; Loss: 2.058683395385742; Accu: 0.40625
N: 7; Loss: 3.085594654083252; Accu: 0.5
N: 8; Loss: 5.014426231384277; Accu: 0.0625
N: 9; Loss: 1.0939418077468872; Accu: 0.5703125
N: 11; Loss: 1.5376696586608887; Accu: 0.52685546875
N: 15; Loss: 2.7590348720550537; Accu: 0.486114501953125


In [23]:
for n in list(range(1, 10)) + [11, 15]:
  gen = label_data_in_batch(all_words_of_length(n))
  gen_X, gen_Y = transform_dataset(gen)
  loss, accu = evaluate(lstm, gen_X, gen_Y, criterion)
  print("N: {}; Loss: {}; Accu: {}".format(n, loss, accu))

N: 1; Loss: 0.3029220700263977; Accu: 1.0
N: 2; Loss: 1.0024473667144775; Accu: 0.75
N: 3; Loss: 1.9268141984939575; Accu: 0.375
N: 4; Loss: 0.8764588832855225; Accu: 0.5625
N: 5; Loss: 0.22073781490325928; Accu: 0.90625
N: 6; Loss: 0.4925258159637451; Accu: 0.765625
N: 7; Loss: 2.150557279586792; Accu: 0.03125
N: 8; Loss: 3.0738096237182617; Accu: 0.00390625
N: 9; Loss: 0.4274325370788574; Accu: 0.79296875
N: 11; Loss: 0.9923348426818848; Accu: 0.52880859375
N: 15; Loss: 0.9353342056274414; Accu: 0.64361572265625


# Deployment



In [24]:
def predict(model, text):
  model.eval()

  if isinstance(text, str):
    text = [text]
  elif isinstance(text, (list, tuple)):
    assert all(isinstance(t, str) for t in text)
  else:
    raise TypeError("input text must be str or a list of strings")

  X = torch.tensor([encode(t) for t in text])
  logits = model(X)
  Ypred = logits.argmax(dim=1)
  return Ypred.numpy().tolist()

- predict a single text

In [25]:
text = "aabaabbaab"

predict(bow, text), data_labeler(text)

([1], 1)

In [26]:
text = "aabaabbaab"

predict(lstm, text), data_labeler(text)

([1], 1)

- predict a batch of texts

In [27]:
texts = all_words_of_length(3)
preds = predict(bow, texts)
fmt = "{}\t{}\t{}"
print(fmt.format("Text", "Pred", "Correct"))

for text, pred in zip(texts, preds):
  print(fmt.format(text, pred, data_labeler(text) == pred))

Text	Pred	Correct
aaa	0	True
aab	1	True
aba	1	True
abb	0	True
baa	1	True
bab	0	True
bba	0	True
bbb	0	False


In [28]:
texts = all_words_of_length(3)
preds = predict(lstm, texts)
fmt = "{}\t{}\t{}"
print(fmt.format("Text", "Pred", "Correct"))

for text, pred in zip(texts, preds):
  print(fmt.format(text, pred, data_labeler(text) == pred))

Text	Pred	Correct
aaa	1	False
aab	0	False
aba	0	False
abb	0	True
baa	0	False
bab	1	False
bba	0	True
bbb	1	True
