<a href="https://colab.research.google.com/github/jHellmundt/CogSci-Testat_Mel-Jo/blob/master/IANNWTF_HW10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
import numpy as np
#%tensorflow_version 2.x
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import re
import math
from tensorflow.keras import layers
from collections import Counter
from time import perf_counter

In [12]:
# Notebook-wide configuration. The `#@param` markers expose each value as a Colab form field.

# Vocabulary size: only the most common words are kept.
# The same value is passed to the model as the number of rows of its embedding and score matrices.
NUM_WORDS = 10000 #@param

# Total width of the context window, split evenly to the left and right of the target word.
# Only even values are meaningful: the half-width is computed as int(WORD_WIN / 2),
# so an odd value is rounded down to the next lower even number.
WORD_WIN = 4 #@param
# Number of (target, context) pairs per training batch
BATCH_SIZE = 128 #@param


In [13]:
# Load the Shakespeare corpus as plain text (simpler to tokenize by hand than the tfds version)
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
# Use a context manager so the file handle is closed deterministically
with open(path_to_file, 'rb') as corpus_file:
  data_og = corpus_file.read().decode(encoding='utf-8')

# Make the whole data lowercase
data = data_og.lower()

# Remove new-line marks and punctuation.
# NOTE: inside a character class `|` is a literal, so this pattern also strips pipe
# characters, and `\-\-` matches every single hyphen (not only double dashes).
data = re.sub("\n+", " ", data)
data = re.sub(r"[.|,|;|:|!|?|\-\-]", "", data)

# Tokenize on runs of spaces; the final character is dropped first
# (presumably the space produced by the trailing newline - TODO confirm)
data = re.split(r"\ +", data[:-1])

# Word frequencies over the whole corpus (also used by the subsampler)
word_counts = Counter(data)
total_word_count = len(data)

# ID 0 is reserved for "<UNK>", so only NUM_WORDS - 1 real words fit into the vocabulary
most_common = word_counts.most_common(max(NUM_WORDS - 1, 0))
token = np.array([word for word, _ in most_common], dtype=str)

# Create token to ID and ID to token Dictionaries
token2id = {"<UNK>": 0}
token2id.update({word: idx for idx, (word, _) in enumerate(most_common, start=1)})
id2token = {idx: word for word, idx in token2id.items()}

# Create unigrams which contain the probability of each token based on word frequency
# This is later used for Negative Sampling, where entry i must belong to token ID i:
#   - index 0 ("<UNK>") gets the combined count of every word outside the vocabulary
#   - index i >= 1 gets the count of the word with ID i
# The array always has NUM_WORDS entries (unused IDs keep probability 0) so that it
# matches the `range_max` the sampler is called with.
unigram_counts = np.zeros(NUM_WORDS, dtype="float64")
kept_counts = np.array([count for _, count in most_common], dtype="float64")
unigram_counts[1:len(kept_counts) + 1] = kept_counts
unigram_counts[0] = total_word_count - kept_counts.sum()
unigram = (unigram_counts / unigram_counts.sum()).tolist()

In [14]:
def subsampler(word, s=0.001):
  """Randomly decide whether an occurrence of `word` is kept (word2vec-style subsampling).

  The keep probability is (sqrt(f / s) + 1) * (s / f), where f is the relative
  corpus frequency of `word` taken from the module-level `word_counts` and
  `total_word_count`. Values above 1 simply mean the word is always kept.

  Args:
    word: The token to test.
    s: Subsampling threshold; smaller values discard frequent words more aggressively.

  Returns:
    A truthy value if the occurrence should be kept, a falsy one otherwise.
  """
  freq = word_counts[word] / total_word_count
  if freq <= 0:
    # A word that never occurs has no frequency to divide by. The keep probability
    # grows without bound as freq -> 0, so such a word is always kept.
    return True
  prob = (math.sqrt(freq/s) + 1) * (s/freq)
  return np.random.random() <= prob

# Define the relative word window ids, e.g. WORD_WIN = 4 -> [1, -1, 2, -2]
word_win_ids = np.array([[x,-x] for x in range(1,int(WORD_WIN/2)+1)]).reshape(-1)

# Create the training data using the word window.
# Each entry is a (target ID, context ID) pair. A pair is only emitted when
#   - the context position lies inside the corpus (checked BEFORE indexing, so a
#     negative offset can never wrap around to the end of the list),
#   - both words are part of the vocabulary, and
#   - the context word survives subsampling.
data_train = []
num_tokens = len(data)
for i in range(num_tokens):
  target_id = token2id.get(data[i])
  if target_id is None:
    # Target word is not in the vocabulary: none of its pairs can be used
    continue

  for j in word_win_ids:
    context_pos = i + int(j)
    if not 0 <= context_pos < num_tokens:
      continue

    context_word = data[context_pos]
    context_id = token2id.get(context_word)
    if context_id is not None and subsampler(context_word):
      data_train.append((target_id, context_id))

# int64 is the label dtype the candidate sampler expects; the reshape keeps the
# array two-dimensional even when no pair was produced
data_train = np.array(data_train, dtype="int64").reshape(-1, 2)

In [15]:
# Turn the (target, context) ID columns into a shuffled, batched tf.data pipeline
# that feeds the skip-gram training loop.
# NOTE(review): the shuffle buffer holds 1000 pairs, so pairs are only mixed locally.
data_train = (
  tf.data.Dataset
  .from_tensor_slices((data_train[:, 0], data_train[:, 1]))
  .shuffle(1000)
  .batch(BATCH_SIZE)
)

In [16]:
class SkipGram(layers.Layer):
  """Skip-gram embedding layer trained with noise-contrastive estimation (NCE).

  Calling the layer with a batch of target IDs and context IDs returns the mean
  NCE loss of predicting each context word from its target word. Negative
  examples are drawn from the module-level `unigram` distribution.

  Args:
    e_size: Dimensionality of the word embeddings.
    v_size: Vocabulary size (number of distinct token IDs).
    num_sampled: Number of negative classes sampled per batch (defaults to 1).
  """

  def __init__(self, e_size, v_size, num_sampled=1):
    super(SkipGram, self).__init__()

    self.e_size = e_size
    self.v_size = v_size
    self.num_sampled = num_sampled

  def build(self, _):
    """Create the trainable weights; the input shape is not needed."""
    # One embedding vector per token: this is the matrix that is inspected after training
    self.embedding_matrix = self.add_weight(
                              shape=(self.v_size, self.e_size),
                              initializer='RandomNormal'
                            )
    # Output-side weights and biases used by the NCE classifier
    self.score_matrix = self.add_weight(
                          shape=(self.v_size, self.e_size),
                          initializer='RandomNormal'
                        )
    # `(self.v_size,)` with the trailing comma is a 1-D shape tuple;
    # `(self.v_size)` would just be a parenthesised integer
    self.score_bias = self.add_weight(
                        shape=(self.v_size,),
                        initializer='zeros'
                      )

  #@tf.function
  def call(self, target, context):
    """Return the mean NCE loss for a batch of (target, context) ID pairs.

    Args:
      target: 1-D integer tensor of target token IDs.
      context: 1-D integer tensor of context token IDs, same length as `target`.

    Returns:
      Scalar tensor holding the batch-averaged NCE loss.
    """
    batch_size = tf.shape(context)[0]
    # Labels are passed as an int64 [batch_size, num_true] matrix to both
    # the candidate sampler and the NCE loss
    context = tf.cast(tf.reshape(context, (batch_size, 1)), tf.int64)

    target_embedding = tf.nn.embedding_lookup(self.embedding_matrix, target)
    target_embedding = tf.reshape(target_embedding, shape=(batch_size, self.e_size))

    # Draw negatives proportionally to word frequency; num_sampled must be the
    # same value for the sampler and for nce_loss
    sampled_values = tf.random.fixed_unigram_candidate_sampler(
        true_classes=context, # Maybe use matrix of all words that ever appeared next to the target word
        num_true=1,
        num_sampled=self.num_sampled,
        unique=False,
        range_max=self.v_size,
        unigrams=unigram
    )

    loss = tf.reduce_mean(
        tf.nn.nce_loss(weights=self.score_matrix,
                       biases=self.score_bias,
                       labels=context,
                       inputs=target_embedding,
                       num_sampled=self.num_sampled,
                       num_classes=self.v_size,
                       num_true=1,
                       sampled_values=sampled_values)
    )

    return loss

In [17]:
def train_step(model, target, context, optimizer):
  """Run one optimisation step on a single batch and return its loss.

  Args:
    model: Callable as `model(target, context)` returning a scalar loss, and
      exposing `trainable_variables`.
    target: Batch of target token IDs.
    context: Batch of context token IDs.
    optimizer: Optimizer whose `apply_gradients` updates the model's variables.

  Returns:
    The loss of this batch (before the update), for visualisation.
  """
  # Only the forward pass needs to be recorded
  with tf.GradientTape() as tape:
    loss = model(target, context)

  # Computing the gradients outside the context keeps the backward pass itself
  # from being recorded on the tape
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  return loss

def nearest_neighbours(target, k, embedding_matrix):
  """Return the `k` tokens whose embeddings are most similar to that of `target`.

  Similarity is the cosine similarity between rows of `embedding_matrix`. Token
  IDs are translated with the module-level `token2id` / `id2token` dictionaries.

  Args:
    target: The query token; must be a key of `token2id`.
    k: Number of neighbours to return.
    embedding_matrix: 2-D matrix with one embedding per row, either an object
      with a `.numpy()` method (e.g. a TensorFlow variable) or array-like.

  Returns:
    A list of up to `k` tokens, most similar first, never containing `target`.

  Raises:
    KeyError: If `target` is not part of the vocabulary.
  """
  if hasattr(embedding_matrix, "numpy"):
    embeddings = embedding_matrix.numpy()
  else:
    embeddings = np.asarray(embedding_matrix)

  target_id = token2id[target]
  target_embedding = embeddings[target_id]

  # Cosine similarity of every row with the target row, computed in one shot
  norms = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(target_embedding)
  cos_sims = (embeddings @ target_embedding) / norms

  # Sort by descending similarity (the negation needs an ndarray, not a list) and
  # drop the target explicitly instead of assuming it is ranked first
  ranked_ids = np.argsort(-cos_sims)
  k_best_neighbours = [int(n) for n in ranked_ids if int(n) != target_id][:k]

  return [id2token[n] for n in k_best_neighbours]

def train_model(num_epochs, learning_rate, model, test_tokens, num_neighbours):
  """Train `model` on the module-level `data_train` dataset and report progress.

  After every epoch the mean loss, the elapsed time and the nearest neighbours
  of each test token are printed.

  Args:
    num_epochs: Number of passes over `data_train`.
    learning_rate: Learning rate handed to the Adam optimizer.
    model: The model to train; must expose `embedding_matrix`.
    test_tokens: Tokens whose nearest neighbours are printed after each epoch.
    num_neighbours: How many neighbours to print per test token.

  Returns:
    A tuple `(train_losses, e)`: the mean loss of each epoch, and a list with
    one independent copy of the embedding matrix per epoch.
  """
  tf.keras.backend.clear_session()

  optimizer = tf.keras.optimizers.Adam(learning_rate)

  train_losses = []
  embedding_snapshots = []

  # TODO: What are these warnings??
  tf.get_logger().setLevel("ERROR")

  # Train the model (record the time as well for performance judgements)
  for epoch in range(1, num_epochs + 1):
    start = perf_counter()

    batch_losses = [
      train_step(model, target, context, optimizer).numpy()
      for (target, context) in data_train
    ]
    train_losses.append(np.mean(batch_losses))

    # Pad after the colon so the "Loss:" column lines up across epochs
    padding = " " * (len(str(num_epochs)) - len(str(epoch)))
    loss_text = '{0:.3f}'.format(round(float(train_losses[-1]), 3))
    time_text = '{0:.2f}'.format(round(perf_counter() - start, 2))
    print(f"Epoch #{epoch}:{padding}Loss: {loss_text}  Time: {time_text}s")

    for token in test_tokens:
      print(f"  {token}: {', '.join(nearest_neighbours(token, num_neighbours, model.embedding_matrix))}")

    # Store a copy of the current values: appending the variable itself would
    # make every entry point at the same, final weights
    embedding_snapshots.append(model.embedding_matrix.numpy().copy())

  return train_losses, embedding_snapshots

def plot_learning(train_losses, num_epochs):
  """Plot the training loss per epoch.

  Args:
    train_losses: Sequence with one loss value per finished epoch.
    num_epochs: Kept for interface compatibility; the x axis is derived from
      the number of recorded losses.
  """
  # Epochs are counted from 1, matching the training log
  epochs = range(1, len(train_losses) + 1)

  plt.plot(epochs, train_losses)
  plt.title("Training loss")
  plt.xlabel("Epochs")
  plt.ylabel("Loss")
  plt.show()

In [18]:
# Training hyperparameters (editable as Colab form fields)
num_epochs = 3 #@param
# Adam step size. Keep this small (0.001 is Adam's usual default): a step size of 1
# lets every update swamp the learned weights, so the loss cannot settle.
learning_rate = 0.001 #@param
embedding_size = 64 #@param
num_best_neighbours = 5 #@param

# Words whose nearest neighbours are printed after every epoch as a sanity check
test_tokens = ["queen", "throne", "wine", "poison", "love", "strong", "day"]

In [None]:
# Reset the global Keras state, then build, train and evaluate a fresh model
tf.keras.backend.clear_session()

model = SkipGram(e_size=embedding_size, v_size=NUM_WORDS)

train_losses, e = train_model(
  num_epochs=num_epochs,
  learning_rate=learning_rate,
  model=model,
  test_tokens=test_tokens,
  num_neighbours=num_best_neighbours,
)

plot_learning(train_losses=train_losses, num_epochs=num_epochs)

In [None]:
# To show you whats going wrong here are the embedding matrices of each epoch
# (print one entry per iteration, not the whole list every time)
for epoch_index, epoch_embeddings in enumerate(e, start=1):
  print(f"Embedding matrix after epoch #{epoch_index}:")
  print(epoch_embeddings)