<a href="https://colab.research.google.com/github/rodrigoromanguzman/Actividades_Aprendizaje-/blob/main/language_model_rnn_from_zero.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

# Utility libraries
import random
import re


from google.colab import drive


In [None]:
# Mount Google Drive at /content/drive (Colab only) so files stored there can be opened below.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Path of the corpus file inside the mounted Drive; it is opened as text by the loading cell.
news_s_path = "/content/drive/MyDrive/newsSpace"

In [None]:
# Utility function for detecting URLs
def is_url(s):
    """Return True when `s` starts with an ``http://`` or ``https://`` scheme.

    Only the beginning of the string is inspected; a URL appearing later
    in `s` does not count.
    """
    # re.match anchors at position 0, so this is a prefix test.
    scheme_match = re.match(r'https?://', s)
    return scheme_match is not None


In [None]:
# Each article is stored as its own token list and later treated as one sequence.
data_articles = []
# Every distinct lowercase token seen while reading; this becomes the dictionary.
vocabulary = set()
num_articles = 1  # upper bound on the number of lines read from the corpus
counter = 0
with open(news_s_path, encoding='ISO-8859-1') as f:
    while counter < num_articles:
        line = f.readline()
        if not line:
            # readline() yields an empty string only at end of file.
            break
        data_line = line.strip().split()
        # Position of the first field that looks like a URL, or None if there is none.
        url_index = next(
            (pos for pos, field in enumerate(data_line) if is_url(field)),
            None,
        )
        # Lines without a URL are ignored; the text after the URL is the article.
        if url_index is not None:
            body_text = ' '.join(data_line[url_index + 1:])
            article = re.split(r'[ ,.;:!?()]+', body_text)
            vocabulary.update(token.lower() for token in article)
            data_articles.append(article)
        # The counter advances for every line read, kept or not.
        counter += 1
print("Vocabulary size -> ", len(vocabulary))
# print(data_articles[:100])
# print(list(vocabulary)[:200])


Vocabulary size ->  14


In [None]:
def preprocess_data(data_articles, vocabulary):
    """Lowercase each article and keep only its purely alphabetic tokens.

    Args:
        data_articles: iterable of token lists (one list per article).
        vocabulary: set of known words; it is extended in place with every
            token that survives the filter.

    Returns:
        A tuple ``(processed_articles, vocabulary)`` where
        ``processed_articles`` is a new list of filtered, lowercased token
        lists and ``vocabulary`` is the same set object that was passed in.
    """
    processed_articles = []
    for tokens in data_articles:
        kept_tokens = []
        for token in tokens:
            # str.isalpha() rejects empty strings, digits and punctuation.
            if token.isalpha():
                kept_tokens.append(token.lower())
        processed_articles.append(kept_tokens)
        vocabulary.update(kept_tokens)
    return processed_articles, vocabulary

# Replace the raw token lists with their lowercased, alphabetic-only versions.
# Note: this rebinds data_articles, so re-running the cell filters already-filtered data.
data_articles, vocabulary = preprocess_data(data_articles, vocabulary)

In [None]:
# Split the articles into a held-out test part and a training part.
test_percentage = 0.2
# In-place shuffle; no random seed is set in this cell, so the split varies per run.
random.shuffle(data_articles)
split_point = int(test_percentage * len(data_articles))
# The first `split_point` articles are held out; the remainder is for training.
test_set = data_articles[:split_point]
training_set = data_articles[split_point:]

In [None]:
# Model dimensions and random word-embedding table.
mean = 0        # not referenced by the code in this cell
std_dev = 0.1   # not referenced by the code in this cell
vocab_size = len(vocabulary)
embedding_size = 10  # hard-coded length of each embedding vector
hidden_size = 50     # size of the hidden state vectors

# One row per vocabulary word: uniform draws in [-0.01, 0.01), then divided
# by the embedding size.
# NOTE(review): there are exactly len(vocabulary) rows, so an id equal to
# len(vocabulary) (e.g. one reserved for '<UNK>') has no row here -- confirm.
word_embeddings = np.random.uniform(-0.01, 0.01, (vocab_size, embedding_size))
word_embeddings = word_embeddings / embedding_size

# Word -> embedding row; rows are assigned in the set's iteration order.
word_to_embedding = dict(zip(vocabulary, word_embeddings))

In [None]:
# Weight initialization: standard-normal draws scaled by
# sqrt(2 / (rows + cols)) of each matrix (Xavier/Glorot-style scaling).
W_e = np.sqrt(2.0 / (hidden_size + embedding_size)) * np.random.randn(hidden_size, embedding_size)  # embedding -> hidden
W_h = np.sqrt(2.0 / (hidden_size + hidden_size)) * np.random.randn(hidden_size, hidden_size)        # hidden -> hidden
W_y = np.sqrt(2.0 / (vocab_size + hidden_size)) * np.random.randn(hidden_size, vocab_size)          # hidden -> output

# Biases start at zero and are stored as column vectors.
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))

In [None]:
# Vocabulary as a sorted list: a word's position in this list is its integer id.
vocabulary_list = sorted(vocabulary)

# Map each word to its position in the *sorted* list, so that
# vocabulary_list[word_to_index[w]] == w for every known word. Enumerating the
# unsorted set here would give ids that disagree with vocabulary_list, and any
# code that decodes an id through vocabulary_list would print the wrong word.
word_to_index = {word: i for i, word in enumerate(vocabulary_list)}
# '<UNK>' receives the next free id after all known words.
# NOTE(review): this id equals len(vocabulary); arrays sized with
# len(vocabulary) rows/columns have no slot for it -- confirm before encoding
# unseen text.
word_to_index['<UNK>'] = len(vocabulary_list)

In [None]:
def sentence_to_indices(sentence, word_to_index):
    """Convert a whitespace-separated sentence into a list of word ids.

    Args:
        sentence: string that is split on whitespace.
        word_to_index: mapping from word to id; must contain ``'<UNK>'`` when
            the sentence has at least one word.

    Returns:
        List with one id per word; words missing from the mapping get the
        id stored under ``'<UNK>'``.
    """
    indices = []
    for word in sentence.split():
        indices.append(word_to_index.get(word, word_to_index['<UNK>']))
    return indices

In [None]:
def softmax(x):
    """Numerically stable softmax along the first axis.

    Args:
        x: array-like of scores. For a column vector of shape ``(n, 1)`` the
            result is a probability column vector of the same shape; for a
            2-D input each column is normalized independently.

    Returns:
        ndarray with the same shape as ``x`` whose entries along axis 0 sum to 1.
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to normalize; return an empty float array of the same shape.
        return np.exp(x)
    # Subtracting the per-column maximum leaves the result unchanged
    # mathematically but keeps np.exp from overflowing on large scores.
    shifted = x - np.max(x, axis=0, keepdims=True)
    exps = np.exp(shifted)
    return exps / np.sum(exps, axis=0, keepdims=True)

In [None]:
def tahnh(x):
    """Hyperbolic-tangent activation; a thin wrapper around ``np.tanh``."""
    activation = np.tanh(x)
    return activation

In [None]:
# Input the probabilities and the true values
def cross_entropy(y_probs, y_true):
    """Average negative log-likelihood of the true classes.

    Args:
        y_probs: dict mapping timestep -> 2-D probability array; the
            probability of class ``k`` is read as ``y_probs[t][k, 0]``.
        y_true: sequence of true class indices, one per timestep.

    Returns:
        Sum of ``-log(p_true)`` over the timesteps present in ``y_probs``,
        divided by ``len(y_true)``. Returns ``0.0`` when ``y_true`` is empty.
    """
    if len(y_true) == 0:
        # No targets: avoid dividing by zero.
        return 0.0

    # Lower bound on the probability so that log(0) cannot produce inf.
    min_prob = 1e-12
    total_loss = 0.0
    for t, true_index in enumerate(y_true):
        # Timesteps without a prediction contribute nothing to the sum.
        if t in y_probs:
            prob_true_class = y_probs[t][true_index, 0]
            total_loss += -np.log(max(prob_true_class, min_prob))
    # The average is taken over all targets, including skipped timesteps.
    return total_loss / len(y_true)


In [None]:
def apply_dropout(X, dropout_rate):
    """Zero out entries of `X` at random.

    Each entry is kept independently with probability ``1 - dropout_rate``
    and set to zero otherwise. Kept entries are returned unchanged (they are
    not rescaled).

    Args:
        X: array exposing a ``.shape`` attribute.
        dropout_rate: probability of dropping an entry.

    Returns:
        Array of the same shape as `X` with the dropped entries set to zero.
    """
    keep_probability = 1 - dropout_rate
    # One Bernoulli draw per entry: 1 keeps the value, 0 drops it.
    keep_mask = np.random.binomial(1, keep_probability, size=X.shape)
    return np.multiply(X, keep_mask)

In [None]:
# Input: the sentence as a list of word indices.
# parameters: dictionary with keys -> W_e, W_h, W_y, bh, by
def forward_pass(indices, word_embeddings, parameters):
    """Run the RNN over one sequence and record every intermediate value.

    The hidden-state size is taken from ``parameters['W_h']`` rather than from
    a module-level variable, so the function works with whatever parameter
    set it is given.

    Args:
        indices: list of word ids; ``word_embeddings[indices[t]]`` is the
            input at timestep ``t``.
        word_embeddings: array indexed by word id; each row is reshaped to a
            column vector before use.
        parameters: dict with keys ``'W_e'``, ``'W_h'``, ``'W_y'``, ``'bh'``
            and ``'by'``.

    Returns:
        Dict of dicts keyed by timestep:
        ``'es'`` (input embeddings), ``'hs'`` (hidden states, with the zero
        initial state stored under key ``-1``), ``'ys'`` (unnormalized output
        scores) and ``'ps'`` (softmax of the scores).
    """
    W_e = parameters['W_e']
    W_h = parameters['W_h']
    W_y = parameters['W_y']
    bh = parameters['bh']
    by = parameters['by']

    # Initial hidden state: a zero column vector matching W_h's row count.
    h_prev = np.zeros((W_h.shape[0], 1))

    states = {'es': {}, 'hs': {}, 'ys': {}, 'ps': {}}
    states['hs'][-1] = np.copy(h_prev)

    for t, word_index in enumerate(indices):
        # Embedding of the current input word as a column vector.
        e_t = word_embeddings[word_index].reshape(-1, 1)
        states['es'][t] = e_t

        # Recurrent update: previous hidden state plus current input.
        h_rec = np.dot(W_h, h_prev) + np.dot(W_e, e_t) + bh
        h_t = tahnh(h_rec)
        states['hs'][t] = h_t

        # Output scores and their softmax; W_y is stored as (hidden, vocab),
        # hence the transpose.
        y_t = np.dot(W_y.T, h_t) + by
        states['ys'][t] = y_t
        states['ps'][t] = softmax(y_t)

        # Carry the hidden state to the next timestep.
        h_prev = h_t
    return states

In [None]:
# Backpropagation
# outputs -> dictionary of dictionaries: es, hs, ys, ps
# parameters -> dictionary with: W_e, W_h, W_y, bh, by

def backward_pass(indices, outputs, parameters):
    """Backpropagation through time for the next-word prediction loss.

    The prediction made at timestep ``t`` is scored against the *next* word,
    ``indices[t + 1]``; the final timestep has no next word and therefore
    contributes no output-layer gradient. Gradients are summed over
    timesteps (not averaged) and then clipped element-wise to [-5, 5].

    Args:
        indices: list of word ids for the sequence.
        outputs: dict with per-timestep dicts ``'es'``, ``'hs'`` and ``'ps'``
            holding column vectors for the input embeddings, hidden states
            and softmax probabilities.
        parameters: dict with keys ``'W_e'``, ``'W_h'``, ``'W_y'``, ``'bh'``
            and ``'by'``; used for shapes and for backpropagating through
            ``W_y`` and ``W_h``.

    Returns:
        Dict with keys ``'dW_e'``, ``'dW_h'``, ``'dW_y'``, ``'dbh'`` and
        ``'dby'``, each shaped like the corresponding parameter. All entries
        are zero for sequences with fewer than two tokens.
    """
    # Every buffer is sized from `parameters`, so no module-level weights are needed.
    gradients = {
        'dW_e': np.zeros_like(parameters['W_e']),
        'dW_h': np.zeros_like(parameters['W_h']),
        'dW_y': np.zeros_like(parameters['W_y']),
        'dbh': np.zeros_like(parameters['bh']),
        'dby': np.zeros_like(parameters['by'])
    }

    # Gradient flowing into the hidden state from the following timestep.
    dh_next = np.zeros_like(parameters['bh'])
    last_step = len(indices) - 1

    for t in reversed(range(len(indices))):
        h_t = outputs['hs'][t]

        if t < last_step:
            # Softmax + cross-entropy gradient: probabilities minus one-hot target.
            dy = np.copy(outputs['ps'][t])
            dy[indices[t + 1]] -= 1
            # W_y is stored as (hidden, vocab), matching h_t . dy^T.
            gradients['dW_y'] += np.dot(h_t, dy.T)
            gradients['dby'] += dy
            dh = np.dot(parameters['W_y'], dy) + dh_next
        else:
            # No target for the final prediction; only the recurrent path remains.
            dh = dh_next

        # Derivative of tanh expressed through its output: 1 - h^2.
        dh_rec = (1 - h_t * h_t) * dh

        # The initial hidden state is taken to be zero, so t == 0 adds
        # nothing to dW_h.
        if t > 0:
            gradients['dW_h'] += np.dot(dh_rec, outputs['hs'][t - 1].T)

        gradients['dbh'] += dh_rec
        gradients['dW_e'] += np.dot(dh_rec, outputs['es'][t].T)

        # Propagate to the previous timestep.
        dh_next = np.dot(parameters['W_h'].T, dh_rec)

    # Element-wise gradient clipping, done in place.
    for gradient_key in gradients:
        np.clip(gradients[gradient_key], -5, 5, out=gradients[gradient_key])
    return gradients


In [None]:
# Gradient-descent update of the weights
def update_parameters(parameters, gradients, learning_rate):
    """Apply one gradient-descent step to every parameter, in place.

    Args:
        parameters: dict of parameters, e.g. keys 'W_e', 'W_h', 'W_y', 'bh', 'by'.
        gradients: dict holding, for each parameter key ``k``, its gradient
            under the key ``'d' + k``.
        learning_rate: step size multiplied into each gradient.

    Returns:
        None. The entries of ``parameters`` are modified with ``-=``.
    """
    for name in parameters:
        step = learning_rate * gradients['d' + name]
        parameters[name] -= step

In [None]:

# Bundle the trainable arrays under the keys 'W_e', 'W_h', 'W_y', 'bh', 'by'.
# The dict holds references to the arrays, not copies.
parameters = dict(W_e=W_e, W_h=W_h, W_y=W_y, bh=bh, by=by)
def initialize_gradients(parameters):
    """Create a zero-filled gradient buffer for every parameter.

    Args:
        parameters: dict mapping parameter name to array.

    Returns:
        Dict mapping ``'d' + name`` to a zero array with the shape and dtype
        of the corresponding parameter.
    """
    return {'d' + name: np.zeros_like(value) for name, value in parameters.items()}

# Step size passed to the training call (the author also noted 0.005 as an alternative).
learning_rate = 0.001
# Zero-filled gradient buffers, one per entry of `parameters`.
gradients = initialize_gradients(parameters)

In [None]:
# Training
def training(mini_batches, gradients, parameters, vocabulary, learning_rate, epochs):
    """Train the RNN one sequence at a time and print per-epoch statistics.

    For every sequence the function runs a forward pass, a backward pass and
    a parameter update, then scores the (pre-update) predictions against the
    next word at each position.

    Args:
        mini_batches: re-iterable collection (e.g. a list) of token lists;
            each token list is one training sequence. Sequences with fewer
            than two tokens have no next-word target and are skipped.
        gradients: unused; kept so existing calls keep working. Gradients are
            recomputed by ``backward_pass`` for every sequence.
        parameters: dict with keys 'W_e', 'W_h', 'W_y', 'bh', 'by'; updated
            in place by ``update_parameters``.
        vocabulary: unused; kept so existing calls keep working.
        learning_rate: step size for ``update_parameters``.
        epochs: number of passes over ``mini_batches``.

    Returns:
        None. Progress is reported with ``print``.

    Relies on the module-level ``word_to_index`` and ``word_embeddings``.
    """
    unk_index = word_to_index['<UNK>']
    # Decode ids with the inverse of the mapping used to encode them, so the
    # printed words always correspond to the ids the model was trained on.
    index_to_word = {index: word for word, index in word_to_index.items()}

    for epoch in range(epochs):
        total_loss = 0
        correct_predictions = 0
        total_predictions = 0
        scored_sequences = 0

        for sentence in mini_batches:
            indices = [word_to_index.get(word, unk_index) for word in sentence]
            if len(indices) < 2:
                # Nothing to predict: a next-word target needs at least two tokens.
                continue

            outputs = forward_pass(indices, word_embeddings, parameters)

            # The target at position t is the word at position t + 1.
            true_labels = indices[1:]

            step_gradients = backward_pass(indices, outputs, parameters)
            update_parameters(parameters, step_gradients, learning_rate)

            # Mean negative log-likelihood of the next word over the sequence.
            batch_loss = 0
            for t, true_index in enumerate(true_labels):
                batch_loss += -np.log(outputs['ps'][t][true_index, 0])
            total_loss += batch_loss / len(true_labels)
            scored_sequences += 1

            # Most likely next word at every position.
            predicted_indices = [int(np.argmax(outputs['ps'][t])) for t in range(len(indices))]
            if epoch % 10 == 0:
                predicted_words = [index_to_word.get(i, '<UNK>') for i in predicted_indices]
                print("PREDICTED WORDS")
                print(predicted_words[:-1])
                print("Target words")
                print(sentence[1:])

            # The last prediction has no target, hence the [:-1].
            correct_predictions += sum(
                1 for predicted, true_id in zip(predicted_indices[:-1], true_labels)
                if predicted == true_id
            )
            total_predictions += len(true_labels)

        # Guard the averages so an empty or all-too-short corpus does not crash.
        avg_loss = total_loss / scored_sequences if scored_sequences else float('nan')
        final_accuracy = correct_predictions / total_predictions if total_predictions else float('nan')
        print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}, Final Training Accuracy: {final_accuracy:.4f}")


In [None]:
# Train for 50000 epochs on the articles; the saved output of this cell ends in a KeyboardInterrupt.
training(data_articles, gradients, parameters, vocabulary, learning_rate,50000)

PREDICTED WORDS
['wall', 'wall', 'reflects', 'reuters', '-', '-', 'reflects', 'business', 'drama', '\\', "street's"]
Target words
['st', 'pullback', 'reflects', 'tech', 'blowout', 'reuters', 'none', 'business', 'reuters', 'wall', 'drama']
Epoch 1, Average Loss: 2.6388, Final Training Accuracy: 0.1818
Epoch 2, Average Loss: 2.6249, Final Training Accuracy: 0.1818
Epoch 3, Average Loss: 2.6128, Final Training Accuracy: 0.1818
Epoch 4, Average Loss: 2.6018, Final Training Accuracy: 0.1818
Epoch 5, Average Loss: 2.5918, Final Training Accuracy: 0.1818
Epoch 6, Average Loss: 2.5825, Final Training Accuracy: 0.1818
Epoch 7, Average Loss: 2.5737, Final Training Accuracy: 0.1818
Epoch 8, Average Loss: 2.5652, Final Training Accuracy: 0.1818
Epoch 9, Average Loss: 2.5570, Final Training Accuracy: 0.1818
Epoch 10, Average Loss: 2.5489, Final Training Accuracy: 0.1818
PREDICTED WORDS
['pullback', 'pullback', 'tech', 'tech', "street's", "street's", "street's", "street's", "street's", "street's", "

KeyboardInterrupt: ignored