In [None]:
import numpy as np
import pickle


In [None]:
class Rnn:
    """Minimal single-layer recurrent network with a learned embedding table.

    At every step the embedding of the current token is looked up in ``E``,
    the hidden state is updated with a tanh recurrence, and a softmax
    distribution over the vocabulary is produced. Training is plain SGD on the
    summed negative log-likelihood of a sequence, with gradients computed by
    backpropagation through time (BPTT).
    """

    def __init__(self, input_size, hidden_state_size, output_size):
        """Create randomly initialised parameters and empty training caches.

        Args:
            input_size: The number of values (dimensions) in each embedding
                vector. A one-hot encoding like [1, 0, 0, 0, 0] can represent a
                word by its position in the vocabulary, but it doesn't capture
                the meaning or similarity between words. In contrast, an
                embedding vector like [0.1, 0.6, 0.4] represents the same word
                in a way that captures its semantic meaning, so two similar
                words may have similar embeddings, like [0.1, 0.6, 0.4] and
                [0.11, 0.58, 0.43].
            hidden_state_size: Size of the hidden state vector h_t. 128 is a
                good start for medium levels of complexity.
            output_size: Size of the vocabulary (we're predicting the next
                word from the vocab).
        """
        # Weights to map input (embedding) to hidden state
        self.W_xh = np.random.randn(hidden_state_size, input_size) * 0.01

        # Weights to map previous hidden state to next hidden state
        self.W_hh = np.random.randn(hidden_state_size, hidden_state_size) * 0.01

        # Weights to map hidden state to output logits (vocab-sized)
        self.W_hy = np.random.randn(output_size, hidden_state_size) * 0.01

        # Initial hidden state (starts as zeros)
        self.h = np.zeros((hidden_state_size, 1))

        # Per-step caches consumed by backpropagate(). h_phases[0] is the
        # hidden state *before* the first cached step, h_phases[t + 1] the
        # state after step t.
        self.h_phases = [np.zeros_like(self.h)]
        self.y_phases = []
        self.x_phases = []

        # Embedding matrix: each row is a word vector, basically a dictionary
        # of all the words in our vocabulary.
        self.E = np.random.randn(output_size, input_size) * 0.01

        # Biases encode additional rules or tendencies for when a neuron
        # should "fire" that aren't captured by the weights alone.
        self.b_h = np.zeros((hidden_state_size, 1))
        self.b_y = np.zeros((output_size, 1))

    def embed_word(self, word_index):
        """Return the embedding of ``word_index`` as a column vector.

        ``word_index`` is the position of the 1 in the token's one-hot
        encoding, e.g. [0, 0, 1, 0] corresponds to index 2.
        """
        return self.E[word_index].reshape(-1, 1)

    def softmax(self, x):
        """Return the softmax of ``x`` computed over all of its elements."""
        exp_x = np.exp(x - np.max(x))  # Stabilizing to prevent overflow
        return exp_x / np.sum(exp_x)

    def forward(self, x):
        """Advance the hidden state by one step and return output probabilities.

        Side effects: overwrites ``self.h`` and appends a copy of the new
        hidden state to ``self.h_phases``.

        Args:
            x: Embedding column vector for the current token.

        Returns:
            Column vector of softmax probabilities over the vocabulary.
        """
        self.h = np.tanh(np.dot(self.W_xh, x) + np.dot(self.W_hh, self.h) + self.b_h)
        self.h_phases.append(self.h.copy())
        y = np.dot(self.W_hy, self.h) + self.b_y
        output = self.softmax(y)
        return output

    def learn(self, trueSeq, alpha, inputSeq, number_of_iterations, reset_state=False):
        """Run forward + BPTT updates on one sequence.

        Args:
            trueSeq: Sequence of T one-hot targets, something like
                [[0, 0, 1], [1, 0, 0], [0, 1, 0]] (T steps, vocabulary size V).
            alpha: SGD learning rate.
            inputSeq: Sequence of one-hot inputs (words or characters); entry
                ``t`` is the input whose successor is ``trueSeq[t]``.
            number_of_iterations: How many forward/backward passes to run on
                this same sequence.
            reset_state: When True the hidden state is zeroed before every
                pass; when False (default) it carries over from whatever the
                network processed last.

        Returns:
            List with the mean negative log-likelihood of each pass, measured
            before that pass's weight update. Empty for an empty sequence.
        """
        losses = []
        steps = len(trueSeq)
        if steps == 0:
            # Nothing to learn from; also avoids dividing by zero below.
            return losses

        for _ in range(number_of_iterations):
            if reset_state:
                self.h = np.zeros_like(self.h)

            # Start from clean caches whose slot 0 is the hidden state the
            # forward pass really starts from. Stale entries (for example from
            # text generation) would otherwise misalign the gradients.
            self.h_phases = [self.h.copy()]
            self.x_phases = []
            self.y_phases = []

            total = 0.0
            for t in range(steps):
                embed_idx = int(np.argmax(inputSeq[t]))
                x = self.embed_word(embed_idx)
                self.x_phases.append(x.copy())
                y = self.forward(x)
                self.y_phases.append(y.copy())
                # Probability assigned to the correct next token; the floor
                # keeps the reported loss finite if it underflows to zero.
                target_idx = int(np.argmax(trueSeq[t]))
                total -= np.log(max(float(y[target_idx, 0]), 1e-12))

            losses.append(total / steps)
            self.backpropagate(alpha, trueSeq, inputSeq)

        return losses

    def backpropagate(self, alpha, true_seq, input_seq):
        """Apply one SGD step using BPTT over the cached forward pass.

        Expects ``x_phases``, ``y_phases`` and ``h_phases`` to describe exactly
        the forward pass over ``input_seq`` (``learn`` guarantees this).
        Gradients are summed over time steps, applied with learning rate
        ``alpha``, and the caches are cleared afterwards.
        """
        hh_acc = np.zeros_like(self.W_hh)
        xh_acc = np.zeros_like(self.W_xh)
        hy_acc = np.zeros_like(self.W_hy)
        bh_acc = np.zeros_like(self.b_h)
        by_acc = np.zeros_like(self.b_y)
        e_acc = np.zeros_like(self.E)

        # Gradient flowing into h_t from step t + 1 (zero after the last step).
        dh_next = np.zeros_like(self.h)

        for idx in reversed(range(len(true_seq))):
            y_pred = self.y_phases[idx]
            # Force a column so a flat one-hot cannot broadcast to (V, V).
            y_true = np.asarray(true_seq[idx], dtype=float).reshape(-1, 1)
            h_t = self.h_phases[idx + 1]
            h_prev = self.h_phases[idx]
            x_t = self.x_phases[idx]

            # Softmax + cross-entropy: gradient w.r.t. the logits.
            dy = y_pred - y_true
            hy_acc += np.dot(dy, h_t.T)
            by_acc += dy

            # h_t feeds both the output at step t and the hidden state at
            # step t + 1, so add both contributions *before* going through
            # the tanh derivative.
            dh = np.dot(self.W_hy.T, dy) + dh_next
            dz = dh * (1 - h_t ** 2)

            hh_acc += np.dot(dz, h_prev.T)
            xh_acc += np.dot(dz, x_t.T)
            bh_acc += dz

            embed_index = np.argmax(input_seq[idx])
            e_acc[embed_index] += np.dot(self.W_xh.T, dz).ravel()

            # The error reaches h_{t-1} through the recurrent weights.
            dh_next = np.dot(self.W_hh.T, dz)

        self.W_xh -= alpha * xh_acc
        self.W_hh -= alpha * hh_acc
        self.W_hy -= alpha * hy_acc
        self.b_h -= alpha * bh_acc
        self.b_y -= alpha * by_acc
        self.E -= alpha * e_acc

        # Clear phases; slot 0 mirrors the current hidden state so the next
        # cached forward pass lines up with what forward() actually computes.
        self.h_phases = [self.h.copy()]
        self.y_phases = []
        self.x_phases = []

    def save(self, filename="rnn_model.pkl"):
        """Pickle all parameters, the hidden state and the layer sizes to ``filename``."""
        model_data = {
            "W_xh": self.W_xh,
            "W_hh": self.W_hh,
            "W_hy": self.W_hy,
            "b_h": self.b_h,
            "b_y": self.b_y,
            "E": self.E,
            "h": self.h,
            "input_size": self.W_xh.shape[1],
            "hidden_state_size": self.W_xh.shape[0],
            "output_size": self.W_hy.shape[0]
        }
        with open(filename, "wb") as f:
            pickle.dump(model_data, f)

    @staticmethod
    def load(filename="rnn_model.pkl"):
        """Rebuild an ``Rnn`` from a file written by ``save``.

        SECURITY: ``pickle.load`` can execute arbitrary code contained in the
        file. Only load checkpoints that you created yourself.
        """
        with open(filename, "rb") as f:
            model_data = pickle.load(f)

        rnn = Rnn(model_data["input_size"], model_data["hidden_state_size"], model_data["output_size"])
        rnn.W_xh = model_data["W_xh"]
        rnn.W_hh = model_data["W_hh"]
        rnn.W_hy = model_data["W_hy"]
        rnn.b_h = model_data["b_h"]
        rnn.b_y = model_data["b_y"]
        rnn.E = model_data["E"]
        rnn.h = model_data["h"]
        # Keep the cache's starting slot in sync with the restored state.
        rnn.h_phases = [rnn.h.copy()]
        return rnn







In [None]:
# Mount Google Drive at /content/drive; later cells store the model weights
# under this path. force_remount=True requests a fresh mount when the cell is re-run.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
import os

# Folder on the mounted Drive where model checkpoints are written.
drive_path = '/content/drive/MyDrive/stored_weights'
# exist_ok=True makes the cell safe to re-run and removes the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(drive_path, exist_ok=True)

I could not find a suitable ready-made dataset of Python code, so I will build my own from a GitHub repository instead.

In [None]:
# Fetch the psf/requests repository; the next cell collects its .py files into the training corpus.
!git clone https://github.com/psf/requests.git
# NOTE: this message is printed unconditionally, so check the git output above for errors.
print("Repository cloned!")
!ls -F # Verify 'requests/' directory exists

In [None]:
import os


# Root of the cloned repository and the single text file all sources are merged into.
codebase_root_dir = 'requests'
output_dataset_file = 'requests_code_dataset.txt'

print(f"Collecting Python files from '{codebase_root_dir}' and saving to '{output_dataset_file}'...")

with open(output_dataset_file, 'w', encoding='utf-8') as outfile:
    # os.walk generates the file names in a directory tree by walking the tree.
    # Its listing order depends on the filesystem, so sort directories (in
    # place, which also fixes the traversal order) and files to make the
    # corpus, and any prefix slice taken from it later, reproducible.
    for dirpath, dirnames, filenames in os.walk(codebase_root_dir):
        dirnames.sort()
        for f in sorted(filenames):
            if f.endswith('.py'):
                file_path = os.path.join(dirpath, f)
                # Only the read is best-effort; a failure while writing the
                # dataset must not be misreported as an unreadable source file.
                try:
                    with open(file_path, 'r', encoding='utf-8') as infile:
                        source_code = infile.read()
                except UnicodeDecodeError:
                    print(f"Skipping {file_path} due to encoding error (likely non-UTF-8 characters).")
                    continue
                except Exception as e:
                    print(f"Could not read {file_path}: {e}")
                    continue
                outfile.write(source_code)
                outfile.write("\n\n# --- FILE_SEPARATOR ---\n\n")

print(f"\nDataset created: '{output_dataset_file}'")
print(f"Total size of generated dataset file: {os.path.getsize(output_dataset_file) / (1024*1024):.2f} MB")

# Read the generated dataset into your 'text' variable for the RNN
with open(output_dataset_file, 'r', encoding='utf-8') as f:
    text = f.read()

print(f"Total characters in the loaded dataset: {len(text)}")

In [None]:
# Number of characters currently held in `text`.
print(len(text))

In [None]:
# Keep only the first 50,000 characters (presumably to keep training time manageable).
# Re-running this cell is harmless: slicing an already-truncated string changes nothing.
text = text[:50000]

In [None]:
# Character-level vocabulary: every distinct character of the corpus in sorted
# order, plus lookup tables in both directions.
chars = sorted(set(text))
vocab_size = len(chars)
ix_to_char = dict(enumerate(chars))
char_to_ix = {symbol: position for position, symbol in ix_to_char.items()}
print(f"Vocabulary size: {vocab_size}, characters: {''.join(chars)}")

In [None]:
# Show the full character -> index mapping for inspection.
print(char_to_ix)

In [None]:
# Model dimensions: embedding width, hidden-state width, and one output per vocabulary character.
input_size = 128
hidden_state_size = 128
output_size = vocab_size

# Rnn draws its initial weights from NumPy's global random generator; seeding
# it makes a Restart & Run All start from the same weights every time.
SEED = 42
np.random.seed(SEED)
rnn = Rnn(input_size, hidden_state_size, output_size)

In [None]:
# Hyperparameters consumed by the training cell below.
seq_length = 30       # Number of characters in each training sequence chunk
learning_rate = 0.01  # Step size handed to Rnn.learn as `alpha`
num_epochs = 100     # Number of full passes over `text` during training


In [None]:
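# Eager version, kept for reference only: building every one-hot window up front will exhaust your RAM. Use the generator in the next cell instead.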

# training_data_pairs = []
# for i in range(0, len(text) - seq_length):
#     input_chunk_chars = text[i : i + seq_length]
#     target_chunk_chars = text[i + 1 : i + seq_length + 1] # Shifted by one for next char prediction

#     input_one_hots_sequence = []
#     target_one_hots_sequence = []

#     for char_in, char_target in zip(input_chunk_chars, target_chunk_chars):
#         input_one_hot_seq = np.zeros((vocab_size, 1))
#         input_one_hot_seq[char_to_ix[char_in]] = 1
#         input_one_hots_sequence.append(input_one_hot_seq)
#         target_one_hot_seq = np.zeros((vocab_size,1))
#         target_one_hot_seq[char_to_ix[char_target]] = 1
#         target_one_hots_sequence.append(target_one_hot_seq)

#     training_data_pairs.append((input_one_hots_sequence, target_one_hots_sequence))



In [None]:
# --- Generator Function ---
def create_training_data_generator(text_data, sequence_length, char_to_ix, vocab_size):
    """Lazily yield one-hot (input, target) windows for next-character prediction.

    A window of ``sequence_length`` characters slides over ``text_data`` one
    character at a time. For each position the generator yields a tuple
    ``(inputs, targets)`` of two equally long lists of ``(vocab_size, 1)``
    one-hot column vectors, where ``targets`` is the window shifted one
    character to the right. Windows are built on demand, so only one is held
    in memory at a time.
    """
    def one_hot(symbol):
        column = np.zeros((vocab_size, 1))
        column[char_to_ix[symbol]] = 1
        return column

    window_count = len(text_data) - sequence_length
    for start in range(window_count):
        stop = start + sequence_length
        current_chars = text_data[start:stop]
        next_chars = text_data[start + 1:stop + 1]

        inputs, targets = [], []
        for current_char, next_char in zip(current_chars, next_chars):
            inputs.append(one_hot(current_char))
            targets.append(one_hot(next_char))

        yield (inputs, targets)




print("\n--- Starting Training with Generator ---")

# Checkpoints go to the mounted Drive folder /content/drive/MyDrive/stored_weights.
checkpoint_path = "/content/drive/MyDrive/stored_weights/rnn_model.pkl"

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    # A fresh generator per epoch: generators are exhausted after one pass.
    data_generator_obj = create_training_data_generator(text, sequence_length=seq_length, char_to_ix=char_to_ix, vocab_size=vocab_size)

    for input_seq_data, target_seq_data in data_generator_obj:
        rnn.learn(trueSeq=target_seq_data, alpha=learning_rate, inputSeq=input_seq_data, number_of_iterations=1)

    # Checkpoint after every epoch so an interrupted run (for example a
    # runtime disconnect) loses at most one epoch of training.
    rnn.save(filename=checkpoint_path)

# Final save; also covers the num_epochs == 0 case.
rnn.save(filename=checkpoint_path)
print("\n--- Model Saved ---")






In [None]:
# Load the pickled checkpoint from Drive into a new Rnn instance and print its
# hidden-layer bias as a quick sanity check.
# NOTE: Rnn.load unpickles the file, so only load checkpoints you created yourself.

rnn_loaded = Rnn.load(filename="/content/drive/MyDrive/stored_weights/rnn_model.pkl")
print(rnn_loaded.b_h)

In [None]:
# Spot-check the restored model: show the first five entries of every tensor.
# "h" is included so the restored hidden state is checked too.
for tensor_name in ("W_xh", "W_hh", "W_hy", "b_h", "b_y", "h"):
    leading_values = getattr(rnn_loaded, tensor_name).flatten()[:5]
    print(f"rnn_loaded.{tensor_name} (first few values):", leading_values)

In [None]:
def generate_text(model, seed_text, num_chars_to_generate, char_to_ix, ix_to_char, hidden_size):
    """Sample ``num_chars_to_generate`` characters from ``model`` after priming it with ``seed_text``.

    The hidden state is reset to zeros, every seed character is fed through the
    model exactly once, and each following character is drawn at random from
    the model's predicted distribution and fed back in as the next input.

    Args:
        model: Object exposing ``embed_word(index)`` and ``forward(x)`` that
            returns a probability vector; its ``h`` attribute is overwritten.
        seed_text: Priming text. Characters missing from ``char_to_ix`` fall
            back to index 0. When empty, generation starts from a single space
            (or index 0 if the vocabulary has no space).
        num_chars_to_generate: Number of characters to append to the seed.
        char_to_ix: Mapping from character to vocabulary index.
        ix_to_char: Mapping from vocabulary index to character.
        hidden_size: Length of the model's hidden state vector.

    Returns:
        ``seed_text`` followed by the generated characters.
    """
    generated_text_chars = list(seed_text)
    state_dtype = np.float32 if hasattr(model, 'W_hh') and model.W_hh.dtype == np.float32 else np.float64
    model.h = np.zeros((hidden_size, 1), dtype=state_dtype)

    # forward() may append to the model's training cache (h_phases). Remember
    # its length so generation can neither leak memory nor misalign the cache
    # used by a later backward pass.
    history = getattr(model, "h_phases", None)
    history_len = len(history) if isinstance(history, list) else 0

    try:
        # Prime on the seed. The prediction made after the last seed character
        # already describes the first new character, so that character must
        # not be fed to the model a second time.
        output_probs = None
        for char_in_seed in seed_text:
            idx_in = char_to_ix.get(char_in_seed, 0)
            output_probs = model.forward(model.embed_word(idx_in))

        if output_probs is None and num_chars_to_generate > 0:
            # Empty seed: start from a space when the vocabulary has one.
            start_idx = char_to_ix.get(' ', 0)
            output_probs = model.forward(model.embed_word(start_idx))

        for step in range(num_chars_to_generate):
            # Renormalise a float64 copy: np.random.choice requires the
            # probabilities to sum to 1 within a tight tolerance, and the
            # model's own output array is left untouched.
            p = np.asarray(output_probs, dtype=np.float64).ravel()
            p = p / p.sum()

            next_char_idx = int(np.random.choice(len(p), p=p))
            generated_text_chars.append(ix_to_char[next_char_idx])

            # Feed the sampled character back in only if another one is needed.
            if step + 1 < num_chars_to_generate:
                output_probs = model.forward(model.embed_word(next_char_idx))
    finally:
        if isinstance(history, list):
            del history[history_len:]

    return "".join(generated_text_chars)


In [None]:
# Lookup tables and hidden-state size shared by every sampling call below.
sampling_context = dict(char_to_ix=char_to_ix, ix_to_char=ix_to_char, hidden_size=hidden_state_size)

# Example 1: continue a function signature using the model restored from disk.
seed_text_example = "def my_function(self, arg):"
num_chars_to_generate_example = 20

print(f"\n--- Generating text with seed: '{seed_text_example}' ---")
generated_code = generate_text(rnn_loaded, seed_text_example, num_chars_to_generate_example, **sampling_context)

print(generated_code)
print("\n" + "-"*50 + "\n")


# Example 2: continue an import statement using the in-memory model.
seed_text_example_2 = "import numpy as "
generated_code_2 = generate_text(
    model=rnn,
    seed_text=seed_text_example_2,
    num_chars_to_generate=150,
    **sampling_context
)
print(f"Generated text with seed: '{seed_text_example_2}':\n{generated_code_2}")


# Example 3: start from bare indentation, again with the in-memory model.
seed_text_example_3 = "    "
generated_code_3 = generate_text(
    model=rnn,
    seed_text=seed_text_example_3,
    num_chars_to_generate=100,
    **sampling_context
)
print(f"Generated text with seed: '{seed_text_example_3}':\n{generated_code_3}")