In [None]:
# install necessary packages using pip
!pip install keras numpy



In [None]:
!wget -cq http://www.hlt.utdallas.edu/~moldovan/CS6320.20F/train.zip
!unzip -qq train.zip

In [2]:
import os
import gzip
def load_corpus(path):
  if not os.path.isdir(path):
    sys.exit("Input path is not a directory")
  tuplesList = []
  tagSet = ['NOUN', 'PRONOUN', 'VERB', 'ADVERB', 'ADJECTIVE', 'CONJUNCTION','PREPOSITION', 'DETERMINER', 'NUMBER', 'PUNCT', 'X']
  for filename in os.listdir(path):
    filename = os.path.join(path, filename)
    try:
      with gzip.open(filename, 'rt') as reader:
        lines = reader.read().splitlines()
        for line in lines:
          lineTuples = []
          items = line.split()
          if(len(items) > 0):
            for item in items:
              [token, tag] = item.split('/')
              tag = tag if tag in tagSet else 'X'
              lineTuples.append((token.lower(), tag))
            tuplesList.append(lineTuples)
    except IOError:
      sys.exit("Cannot read file")
  return tuplesList


# test the function here:
path = "/content/train" # fill in the path
data = load_corpus(path)
print (data[0])

[('miraculously', 'ADVERB'), (',', 'PUNCT'), ('she', 'PRONOUN'), ('found', 'VERB'), ('exactly', 'ADVERB'), ('the', 'DETERMINER'), ('right', 'ADJECTIVE'), ('statement', 'NOUN'), ('.', 'PUNCT')]


In [3]:
import numpy as np # convert lists to np arrays before returning them

def create_dataset(sentences):
  train_X, train_y = list(), list()
  word2idx, tag2idx = dict(), dict() # dictionaries that will provide word/tag to integer mapping
  """
  Construct two lists: train_X and train_y that will be used to train your RNN. Input to the function will be the output of previous function
  """
  wordSet = set()
  tagSet = set()
  for line in sentences:
    for item in line:
      wordSet.add(item[0])
      tagSet.add(item[1])

  for idx, word in enumerate(wordSet,1):
    word2idx[word] = idx
  for idx, tag in enumerate(tagSet,1):
    tag2idx[tag] = idx

  word2idx['[PAD]'] = 0
  tag2idx['[PAD]'] = 0
  
  for line in sentences:
    x = []
    y = []
    for item in line:
      x.append(word2idx[item[0]])
      y.append(tag2idx[item[1]])
    train_X.append(x)
    train_y.append(y)

  train_X= np.array(train_X)
  train_y = np.array(train_y)
  idx2tag = {}
  for t in tag2idx:
    idx2tag[tag2idx[t]] = t


  return train_X, train_y, word2idx, tag2idx, idx2tag # you may also want to output the word and tag dictionaries created for evaluation

# test the function
train_X, train_y, word2idx, tag2idx, idx2tag = create_dataset(data)
print (train_X[0], train_y[0])
print(tag2idx)


[901, 22371, 48670, 44751, 33947, 11542, 18276, 6605, 37936] [1, 7, 9, 3, 1, 6, 2, 11, 7]
{'ADVERB': 1, 'ADJECTIVE': 2, 'VERB': 3, 'X': 4, 'NUMBER': 5, 'DETERMINER': 6, 'PUNCT': 7, 'CONJUNCTION': 8, 'PRONOUN': 9, 'PREPOSITION': 10, 'NOUN': 11, '[PAD]': 0}


In [4]:
from keras.preprocessing.sequence import pad_sequences as ps
def pad_sequences(train_X, train_y):
  """
  Use keras's pad_sequences method to pad zeros to each list within both lists of lists. You can define any large value as the max length 
  or use the length of the largest sequence in the entire corpus to be the max length.
  """
  train_X = ps(train_X, padding='post', value = 0)
  train_y = ps(train_y, padding='post', value= 0)
  MAX_LENGTH = len(train_X[0])
  return train_X, train_y, MAX_LENGTH
train_X, train_y, MAX_LENGTH = pad_sequences(train_X, train_y)
print(len(train_X[0]))
print(len(train_y[5]))
print(MAX_LENGTH)


180
180
180


In [5]:
from keras.models import Sequential
from keras.layers import InputLayer, Activation, Embedding, Bidirectional, LSTM, Dense, TimeDistributed
from keras.optimizers import Adam

def define_model(MAX_LENGTH):  
  model = Sequential()
  model.add(InputLayer(input_shape=(MAX_LENGTH, ))) # MAX_LENGTH is the max length of each sequence, as output by previous method
  model.add(Embedding(len(word2idx), 128, input_length=MAX_LENGTH))
  model.add(Bidirectional(LSTM(256, return_sequences=True)))
  model.add(TimeDistributed(Dense(len(tag2idx))))

  """ 
  Add your layers here:

  """
  model.add(Activation('softmax'))
  
  model.compile(loss='categorical_crossentropy', optimizer=Adam(0.001), metrics=['accuracy'])
  
  print (model.summary())
  return model

# call the function here

model = define_model(MAX_LENGTH)

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, 180, 128)          6366976   
_________________________________________________________________
bidirectional (Bidirectional (None, 180, 512)          788480    
_________________________________________________________________
time_distributed (TimeDistri (None, 180, 12)           6156      
_________________________________________________________________
activation (Activation)      (None, 180, 12)           0         
Total params: 7,161,612
Trainable params: 7,161,612
Non-trainable params: 0
_________________________________________________________________
None


In [6]:
import numpy as np
from keras.utils import to_categorical as tc 
def to_categorical(sequences, categories = 11):
    """
    one hot encode your tags
    POS tag list = [1, 2, 1, 3]
    One-hot encoded list = [[1, 0, 0], [0, 1, 0], [1, 0, 0], [0, 0, 1]]
    """
    return tc(np.array(sequences), categories)


# call the function here
train_y = to_categorical(train_y, categories = len(tag2idx))
print(train_y[0])
print(len(train_y[0][0]))

[[0. 1. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 1. 0. 0.]
 ...
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]]
12


In [7]:
def train(model, train_X, train_y):
  """
  train the model here calling fit(). If you don't want to see the training logs, you can set verbose to False.
  """
  model.fit(train_X, train_y, batch_size=128, epochs=40, validation_split=0.2 )
  return model

model = train(model, train_X, train_y)

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


In [10]:
from keras.preprocessing.sequence import pad_sequences as ps
import numpy as np
import sys
def test(model, sentence):
  """
  Take in a sentence as input and outputs its POST tags. 
  """
  input = []
  for token in sentence:
    if token not in word2idx:
      sys.exit('Token "'+ token + '" not in training corpus')
    input.append(word2idx[token])
  test_X = []
  test_X.append(input)
  test_X = ps(np.array(test_X), padding='post', value = 0, maxlen=MAX_LENGTH)
  output = model.predict(test_X)

  result = []
  for idx in range(0, len(sentence)):
    result.append(idx2tag[np.argmax(output[0][idx])])
  return result     

sentence1 = ["the", "secretariat", "is", "expected" ,"to" ,"race" ,"tomorrow", "." ]
tags = test(model, sentence1)
print (tags)
sentence2 = ["people","continue", "to", "inquire", "the", "reason", "for", "the" ,"race", "for", "outer", "space", "."]
tags = test(model, sentence2)
print (tags)
s3 = ["people", "race","tomorrow", "."]
tags = test(model,s3)
print(tags)

['DETERMINER', 'NOUN', 'VERB', 'VERB', 'X', 'VERB', 'NOUN', 'PUNCT']
['NOUN', 'VERB', 'X', 'VERB', 'DETERMINER', 'NOUN', 'PREPOSITION', 'DETERMINER', 'NOUN', 'PREPOSITION', 'ADJECTIVE', 'NOUN', 'PUNCT']
['NOUN', 'VERB', 'NOUN', 'PUNCT']
