In [None]:
import numpy as np
import re
import string
import os
import tqdm
import tensorflow as tf
from tensorflow import keras
from keras import layers

In [None]:
SEED = 123
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
# Tokenize the sentance
sentance = "My favourite programming language is  the python"
tokens = list(sentance.lower().split())
len(tokens)

7

In [None]:
vocab,index ={},1
vocab['<pad>'] = 0
for token in tokens:
  if token not in vocab:
    vocab[token] =index
    index += 1
vocab_size = len(vocab)
vocab

{'<pad>': 0,
 'my': 1,
 'favourite': 2,
 'programming': 3,
 'language': 4,
 'is': 5,
 'the': 6,
 'python': 7}

In [None]:
inverse_vocab = {index: token for token,index in vocab.items()}
inverse_vocab

{0: '<pad>',
 1: 'my',
 2: 'favourite',
 3: 'programming',
 4: 'language',
 5: 'is',
 6: 'the',
 7: 'python'}

In [None]:
# Vectorize the sentance
example_sequence = [vocab[word] for word in tokens]
example_sequence

[1, 2, 3, 4, 5, 6, 7]

In [None]:
# Generate skipgrams from one sentance
window_size = 2
positive_skip_grams , _ = keras.preprocessing.sequence.skipgrams(
    example_sequence,
    vocabulary_size=vocab_size,
    window_size=window_size,
    negative_samples=0)
len(positive_skip_grams)

22

In [None]:
for target, context in positive_skip_grams[:5]:
  print(f"({target}, {context}): ({inverse_vocab[target]}, {inverse_vocab[context]})")

(3, 2): (programming, favourite)
(4, 6): (language, the)
(1, 3): (my, programming)
(5, 6): (is, the)
(5, 7): (is, python)


In [None]:
# Now negative sampling for one skipgrams
target_word,context_word = positive_skip_grams[0]
num_ns = 5

context_class = tf.reshape(tf.constant(context_word,dtype='int64'),(1,1))
negative_sampling_candidates,_,_ = tf.random.log_uniform_candidate_sampler(
    true_classes=context_class,
    num_true = 1,
    num_sampled = num_ns,
    unique = True,
    range_max = vocab_size,
    seed = SEED,
    name = 'negative_sampling'
)

print(negative_sampling_candidates)
print([inverse_vocab[index.numpy()] for index in negative_sampling_candidates])

tf.Tensor([1 4 3 2 0], shape=(5,), dtype=int64)
['my', 'language', 'programming', 'favourite', '<pad>']


In [None]:
squeezed_context_class = tf.squeeze(context_class,1)
context = tf.concat([squeezed_context_class,negative_sampling_candidates],0)
label = tf.constant([1]+[0]*num_ns,dtype = 'int64')
target = target_word

In [None]:
print(f"target_index    : {target}")
print(f"target_word     : {inverse_vocab[target_word]}")
print(f"context_indices : {context}")
print(f"context_words   : {[inverse_vocab[c.numpy()] for c in context]}")
print(f"label           : {label}")

target_index    : 3
target_word     : programming
context_indices : [2 1 4 3 2 0]
context_words   : ['favourite', 'my', 'language', 'programming', 'favourite', '<pad>']
label           : [1 0 0 0 0 0]


In [None]:
print("target  :", target)
print("context :", context)
print("label   :", label)

target  : 3
context : tf.Tensor([2 1 4 3 2 0], shape=(6,), dtype=int64)
label   : tf.Tensor([1 0 0 0 0 0], shape=(6,), dtype=int64)


In [None]:
# skip-gram sampling table
sample_table = keras.preprocessing.sequence.make_sampling_table(size=10)
sample_table

array([0.00315225, 0.00315225, 0.00547597, 0.00741556, 0.00912817,
       0.01068435, 0.01212381, 0.01347162, 0.01474487, 0.0159558 ])

In [None]:
# Now Generate training data
def generate_training_data(sequences, window_size, num_ns, vocab_size, seed):
  targets, contexts, labels = [], [], []
  sampling_table = tf.keras.preprocessing.sequence.make_sampling_table(vocab_size)
  for sequence in tqdm.tqdm(sequences):
    positive_skip_grams, _ = tf.keras.preprocessing.sequence.skipgrams(
          sequence,
          vocabulary_size=vocab_size,
          sampling_table=sampling_table,
          window_size=window_size,
          negative_samples=0)

    for target_word, context_word in positive_skip_grams:
      context_class = tf.expand_dims(
          tf.constant([context_word], dtype="int64"), 1)
      negative_sampling_candidates, _, _ = tf.random.log_uniform_candidate_sampler(
          true_classes=context_class,
          num_true=1,
          num_sampled=num_ns,
          unique=True,
          range_max=vocab_size,
          seed=seed,
          name="negative_sampling")

      context = tf.concat([tf.squeeze(context_class,1), negative_sampling_candidates], 0)
      label = tf.constant([1] + [0]*num_ns, dtype="int64")

      targets.append(target_word)
      contexts.append(context)
      labels.append(label)

  return targets, contexts, labels

In [None]:
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


In [None]:
with open(path_to_file) as f:
  lines = f.read().splitlines()
for line in lines[:20]:
  print(line)

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.


In [None]:
text_ds = tf.data.TextLineDataset(path_to_file).filter(lambda x: tf.cast(tf.strings.length(x), bool))

In [None]:
# Now, create a custom standardization function to lowercase the text and remove punctuation.
def custom_standardization(input_data):
  lowercase = tf.strings.lower(input_data)
  return tf.strings.regex_replace(lowercase,
                                  '[%s]' % re.escape(string.punctuation), '')

vocab_size = 4096
sequence_length = 10

vectorize_layer = layers.TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length)

In [None]:
vectorize_layer.adapt(text_ds.batch(1024))

In [None]:
inverse_vocab = vectorize_layer.get_vocabulary()
print(inverse_vocab[:20])

['', '[UNK]', 'the', 'and', 'to', 'i', 'of', 'you', 'my', 'a', 'that', 'in', 'is', 'not', 'for', 'with', 'me', 'it', 'be', 'your']


In [None]:
text_vector_ds = text_ds.batch(1024).prefetch(AUTOTUNE).map(vectorize_layer).unbatch()

In [None]:
sequences = list(text_vector_ds.as_numpy_iterator())
print(len(sequences))

32777


In [None]:
for seq in sequences[:5]:
  print(f"{seq} => {[inverse_vocab[i] for i in seq]}")

[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']
[138  36 982 144 673 125  16 106   0   0] => ['before', 'we', 'proceed', 'any', 'further', 'hear', 'me', 'speak', '', '']
[34  0  0  0  0  0  0  0  0  0] => ['all', '', '', '', '', '', '', '', '', '']
[106 106   0   0   0   0   0   0   0   0] => ['speak', 'speak', '', '', '', '', '', '', '', '']
[ 89 270   0   0   0   0   0   0   0   0] => ['first', 'citizen', '', '', '', '', '', '', '', '']


In [None]:
targets, contexts, labels = generate_training_data(
    sequences=sequences,
    window_size=2,
    num_ns=4,
    vocab_size=vocab_size,
    seed=SEED)

targets = np.array(targets)
contexts = np.array(contexts)
labels = np.array(labels)

print('\n')
print(f"targets.shape: {targets.shape}")
print(f"contexts.shape: {contexts.shape}")
print(f"labels.shape: {labels.shape}")


100%|██████████| 32777/32777 [01:11<00:00, 456.20it/s]




targets.shape: (64610,)
contexts.shape: (64610, 5)
labels.shape: (64610, 5)


In [None]:
BATCH_SIZE = 1024
BUFFER_SIZE = 10000
dataset = tf.data.Dataset.from_tensor_slices(((targets, contexts), labels))
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
print(dataset)

<_BatchDataset element_spec=((TensorSpec(shape=(1024,), dtype=tf.int64, name=None), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None)), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None))>


In [None]:
dataset = dataset.cache().prefetch(buffer_size=AUTOTUNE)
print(dataset)

<_PrefetchDataset element_spec=((TensorSpec(shape=(1024,), dtype=tf.int64, name=None), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None)), TensorSpec(shape=(1024, 5), dtype=tf.int64, name=None))>


In [None]:
# subclass word2vec model
class Word2vec(keras.Model):
  def __init__(self,vocab_size,embedding_dim):
    super(Word2vec,self).__init__()
    self.target_embedding = layers.Embedding(vocab_size,embedding_dim,input_length=1,name='w2v_embedding')
    self.context_embedding = layers.Embedding(vocab_size,embedding_dim,input_length=num_ns+1)

  def call(self,pair):
    target,context = pair
    if len(target.shape) ==2:
      target = tf.squeeze(target,axis=1)
    word_emb = self.target_embedding(target)
    context_emb = self.context_embedding(context)
    dots = tf.einsum('be,bce->bc', word_emb, context_emb)
    return dots

In [None]:
embedding_dim = 128
word2vec = Word2vec(vocab_size, embedding_dim)
word2vec.compile(optimizer='adam',
                 loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                 metrics=['accuracy'])

In [None]:
word2vec.fit(dataset,epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x7e69d7399870>