# **Single cell RNN**

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torch.nn import functional as F
from torch.distributions.categorical import Categorical
import torch.optim as optim

# Hyperparameters and global configuration
batch_size = 64  # sequences per mini-batch (used by both DataLoaders and by init_hidden)
block_size = 16  # characters per input sequence; the loops below unroll this many time steps
max_iters = 5001  # number of iterations of the training loop
eval_interval = 500  # losses are printed every `eval_interval` iterations
learning_rate = 3e-4  # learning rate passed to AdamW
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # prefer the GPU when one is available
eval_iters = 200  # NOTE(review): not referenced anywhere in the visible code
n_embd = 384  # embedding dimension
split_ratio = 0.9  # fraction of the chunks used for training; the rest is validation
rnn_hidden_size = 512  # size of the LSTM hidden and cell states
chunk_size = block_size + 1  # one extra character so input and shifted target both have block_size items

torch.manual_seed(1337)  # fixed seed for PyTorch's random number generator

# Load the full corpus into memory as a single string.
with open('/content/cfg3b.txt', 'r', encoding='utf-8') as fp:  # use ISO-8859-1 for Spanish text
    text = fp.read()

# Character-level vocabulary: the sorted distinct characters, a char -> id
# lookup, and an array for the reverse id -> char lookup.
char_set = set(text)
chars_sorted = sorted(char_set)
char2int = dict(zip(chars_sorted, range(len(chars_sorted))))
char_array = np.array(chars_sorted)

# The corpus encoded as a vector of int32 character ids.
text_encoded = np.array([char2int[symbol] for symbol in text], dtype=np.int32)

# Every window of chunk_size consecutive ids, advancing one position at a time.
text_chunks = [
    text_encoded[start:start + chunk_size]
    for start in range(len(text_encoded) - chunk_size + 1)
]

# The first split_ratio of the chunks is for training, the remainder for validation.
split_index = int(split_ratio * len(text_chunks))
train_data = text_chunks[:split_index]
val_data = text_chunks[split_index:]

class TextDataset(Dataset):
    """Dataset over fixed-length chunks that yields (input, target) pairs.

    The target of each chunk is the chunk itself shifted one position to the
    left, so the model learns to predict the next element at every position.
    """

    def __init__(self, text_chunks):
        # Indexable collection of chunks; each item must support slicing and .long().
        self.text_chunks = text_chunks

    def __len__(self):
        """Return the number of chunks."""
        return len(self.text_chunks)

    def __getitem__(self, idx):
        """Return (chunk without its last item, chunk without its first item) as int64."""
        chunk = self.text_chunks[idx]
        inputs, targets = chunk[:-1], chunk[1:]
        return inputs.long(), targets.long()

# Stack each split's chunks into one 2-D tensor and wrap it in a TextDataset.
train_dataset, val_dataset = (
    TextDataset(torch.tensor(np.array(split)))
    for split in (train_data, val_data)
)

# Training batches are reshuffled; validation batches keep their order.
# drop_last=True discards a final incomplete batch in both loaders.
train_dl = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)
val_dl = DataLoader(
    val_dataset,
    batch_size=batch_size,
    drop_last=True,
)

class RNN(nn.Module):
    """Character-level language model: embedding -> one-layer LSTM -> linear logits.

    The model is driven one time step at a time: ``forward`` receives the ids
    for a single position of every sequence in the batch together with the
    current LSTM state, and returns the logits for the next id plus the
    updated state.
    """

    def __init__(self, vocab_size, n_embd, rnn_hidden_size):
        """Build the layers.

        Args:
            vocab_size: number of distinct ids (embedding rows and output logits).
            n_embd: embedding dimension, which is also the LSTM input size.
            rnn_hidden_size: size of the LSTM hidden and cell states.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_embd)
        self.rnn_hidden_size = rnn_hidden_size
        # batch_first=True: LSTM inputs/outputs are laid out (batch, seq, feature).
        self.rnn = nn.LSTM(n_embd, rnn_hidden_size,
                           batch_first=True)
        self.fc = nn.Linear(rnn_hidden_size, vocab_size)

    def forward(self, x, hidden, cell):
        """Advance the model by one time step.

        Args:
            x: integer ids of shape (batch,), one per sequence.
            hidden: LSTM hidden state of shape (1, batch, rnn_hidden_size).
            cell: LSTM cell state of shape (1, batch, rnn_hidden_size).

        Returns:
            Tuple ``(logits, hidden, cell)`` where ``logits`` has shape
            (batch, vocab_size) and the states are the updated LSTM states.
        """
        # Add a sequence axis of length 1: (batch,) -> (batch, 1, n_embd).
        out = self.embedding(x).unsqueeze(1)
        out, (hidden, cell) = self.rnn(out, (hidden, cell))  # (batch, 1, hidden)
        # Project to the vocabulary and drop the length-1 sequence axis.
        out = self.fc(out).reshape(out.size(0), -1)
        return out, hidden, cell

    def init_hidden(self, batch_size):
        """Return zero-initialised ``(hidden, cell)`` states for ``batch_size`` sequences.

        The states are created with the dtype and device of the model's own
        parameters, so they always match the model regardless of any
        module-level ``device`` setting.
        """
        reference = self.fc.weight
        hidden = reference.new_zeros(1, batch_size, self.rnn_hidden_size)
        cell = reference.new_zeros(1, batch_size, self.rnn_hidden_size)
        return hidden, cell

# One output class per distinct character of the corpus.
vocab_size = len(char_array)
model = RNN(vocab_size, n_embd, rnn_hidden_size).to(device)

# Report the model size in millions of parameters.
print(sum(param.numel() for param in model.parameters())/1e6, 'M parameters')

# Cross-entropy over next-character logits, optimised with AdamW.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

# Training loop.
# NOTE: despite its name, `epoch` counts optimisation steps (one mini-batch
# each), not full passes over the training set.

# Persistent iterator over the training loader. Building a fresh iterator on
# every step (next(iter(train_dl))) would reshuffle the whole dataset each time
# just to draw a single batch.
train_iter = iter(train_dl)

for epoch in range(max_iters):
    report = epoch % eval_interval == 0

    # The validation loss is only consumed by the periodic report, so it is
    # only computed on reporting iterations (before the weight update).
    if report:
        model.eval()
        val_loss = 0
        with torch.no_grad():
            val_hidden, val_cell = model.init_hidden(batch_size)
            # val_dl is not shuffled, so this is always its first batch.
            val_seq_batch, val_target_batch = next(iter(val_dl))
            val_seq_batch = val_seq_batch.to(device)
            val_target_batch = val_target_batch.to(device)
            # Feed one character position at a time, carrying the LSTM state.
            for c in range(block_size):
                val_pred, val_hidden, val_cell = model(val_seq_batch[:, c], val_hidden, val_cell)
                val_loss += loss_fn(val_pred, val_target_batch[:, c])
            val_loss = val_loss.item()/block_size

    # Training step on one mini-batch.
    model.train()
    train_loss = 0
    optimizer.zero_grad()
    hidden, cell = model.init_hidden(batch_size)
    try:
        seq_batch, target_batch = next(train_iter)
    except StopIteration:
        # The loader is exhausted: start a new (reshuffled) pass over the data.
        train_iter = iter(train_dl)
        seq_batch, target_batch = next(train_iter)
    seq_batch = seq_batch.to(device)
    target_batch = target_batch.to(device)
    # Unroll the sequence position by position, summing the per-step losses.
    for c in range(block_size):
        pred, hidden, cell = model(seq_batch[:, c], hidden, cell)
        train_loss += loss_fn(pred, target_batch[:, c])
    train_loss.backward()
    optimizer.step()
    # Reported value is the mean loss per character position.
    train_loss = train_loss.item()/block_size
    if report:
        print(f'Epoch {epoch} - Train Loss: {train_loss:.4f} | Validation Loss: {val_loss:.4f}')

def sample(model, starting_str,
           len_generated_text=512,
           scale_factor=1.0):
    """Generate text one character at a time, stopping at the first space.

    The model is first primed with every character of ``starting_str`` except
    the last; the last character is then fed in and each following character
    is drawn from the categorical distribution defined by the scaled logits.

    Args:
        model: network exposing ``init_hidden(batch_size)`` and
            ``model(ids, hidden, cell) -> (logits, hidden, cell)``.
        starting_str: non-empty prompt; every character must be a key of
            ``char2int``.
        len_generated_text: maximum number of characters to generate.
        scale_factor: multiplier applied to the logits before sampling
            (values above 1 sharpen the distribution, below 1 flatten it).

    Returns:
        ``starting_str`` followed by the generated characters. Generation ends
        as soon as a space is sampled (the space is not appended) or after
        ``len_generated_text`` characters.

    Raises:
        ValueError: if ``starting_str`` is empty.
    """
    if not starting_str:
        raise ValueError("starting_str must contain at least one character")

    # Create the prompt directly on `device`; a CPU prompt fed to a GPU model
    # would fail as soon as the priming loop runs.
    encoded_input = torch.tensor([char2int[s] for s in starting_str],
                                 device=device)

    # Collect the pieces and join once instead of growing a string in a loop.
    pieces = [starting_str]

    model.eval()
    hidden, cell = model.init_hidden(1)
    hidden = hidden.to(device)
    cell = cell.to(device)

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        # Prime the LSTM state with all prompt characters but the last.
        for c in range(len(starting_str) - 1):
            _, hidden, cell = model(encoded_input[c].view(1), hidden, cell)

        last_char = encoded_input[-1]
        for _ in range(len_generated_text):
            logits, hidden, cell = model(last_char.view(1), hidden, cell)
            scaled_logits = torch.squeeze(logits, 0) * scale_factor
            last_char = Categorical(logits=scaled_logits).sample()
            next_ch = str(char_array[last_char.item()])
            if next_ch == " ":
                return "".join(pieces)
            pieces.append(next_ch)

    return "".join(pieces)

1.842692 M parameters
Epoch 0 - Train Loss: 1.3832 | Validation Loss: 1.3830
Epoch 500 - Train Loss: 1.2393 | Validation Loss: 1.2330
Epoch 1000 - Train Loss: 1.1204 | Validation Loss: 1.1181
Epoch 1500 - Train Loss: 1.0408 | Validation Loss: 1.0289
Epoch 2000 - Train Loss: 0.9774 | Validation Loss: 0.9595
Epoch 2500 - Train Loss: 0.9229 | Validation Loss: 0.9046
Epoch 3000 - Train Loss: 0.8886 | Validation Loss: 0.8610
Epoch 3500 - Train Loss: 0.8569 | Validation Loss: 0.8266
Epoch 4000 - Train Loss: 0.8354 | Validation Loss: 0.7991
Epoch 4500 - Train Loss: 0.8401 | Validation Loss: 0.7756
Epoch 5000 - Train Loss: 0.7684 | Validation Loss: 0.7542


In [None]:
def aVerQuePlan(text):
    """Print the number of whitespace-separated tokens in ``text``, then the number of distinct ones."""
    tokens = text.split()
    for count in (len(tokens), len(set(tokens))):
        print(count)

# Print the total and distinct token counts of the training corpus.
aVerQuePlan(text)
#print(f'Number of sentences: {frases} \n Number of distinct sentences: {frasesDif}')

7000
7000


In [None]:
def sample(model, starting_str,
           len_generated_text=512,
           scale_factor=1.0):
    """Generate exactly ``len_generated_text`` characters after a prompt.

    The model is first primed with every character of ``starting_str`` except
    the last; the last character is then fed in and each following character
    is drawn from the categorical distribution defined by the scaled logits.
    Spaces are kept in the output, so one call may span several sentences.

    Args:
        model: network exposing ``init_hidden(batch_size)`` and
            ``model(ids, hidden, cell) -> (logits, hidden, cell)``.
        starting_str: non-empty prompt; every character must be a key of
            ``char2int``.
        len_generated_text: number of characters to generate.
        scale_factor: multiplier applied to the logits before sampling
            (values above 1 sharpen the distribution, below 1 flatten it).

    Returns:
        ``starting_str`` followed by the generated characters.

    Raises:
        ValueError: if ``starting_str`` is empty.
    """
    if not starting_str:
        raise ValueError("starting_str must contain at least one character")

    # Create the prompt directly on `device`; a CPU prompt fed to a GPU model
    # would fail as soon as the priming loop runs.
    encoded_input = torch.tensor([char2int[s] for s in starting_str],
                                 device=device)

    # Collect the pieces and join once instead of growing a string in a loop.
    pieces = [starting_str]

    model.eval()
    hidden, cell = model.init_hidden(1)
    hidden = hidden.to(device)
    cell = cell.to(device)

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        # Prime the LSTM state with all prompt characters but the last.
        for c in range(len(starting_str) - 1):
            _, hidden, cell = model(encoded_input[c].view(1), hidden, cell)

        last_char = encoded_input[-1]
        for _ in range(len_generated_text):
            logits, hidden, cell = model(last_char.view(1), hidden, cell)
            scaled_logits = torch.squeeze(logits, 0) * scale_factor
            last_char = Categorical(logits=scaled_logits).sample()
            pieces.append(str(char_array[last_char.item()]))

    return "".join(pieces)
model.to(device)
# Concatenate 500 independent samples, each started from a single space, into
# one evaluation text. Joining once avoids rebuilding an ever-growing string
# on every iteration.
gen_text = "".join(sample(model, starting_str=' ') for _ in range(500))
#print(gen_text)

In [None]:
#Context-free grammar (CFG) definitions
import nltk
from nltk import CFG
from nltk.util import ngrams
from nltk.parse.generate import generate
from nltk.parse import RecursiveDescentParser

# Grammar cfg3b. Nonterminals are numbered 7-22 and the terminals are the
# characters '1', '2' and '3'. Every nonterminal has exactly two alternatives.
# The rules are layered: 22 expands to 19-21, those to 16-18, then 13-15,
# 10-12 and 7-9, and 7-9 expand to terminals only.
# NLTK takes the left-hand side of the first rule (22) as the start symbol.
# This is the grammar passed to perplexity() at the end of the file; the
# training corpus 'cfg3b.txt' was presumably generated from it -- TODO confirm.
cfg3b = CFG.fromstring("""
    22 -> 21 20 | 20 19
    21 -> 18 16 | 16 18 17
    20 -> 16 17 | 17 16 18
    19 -> 17 18 16 | 16 17 18
    18 -> 14 13 | 15 14 13
    17 -> 15 13 14 | 14 13 15
    16 -> 13 15 14 | 15 13
    15 -> 11 12 10 | 12 11 10
    14 -> 10 11 12 | 11 10 12
    13 -> 12 11 | 11 12
    12 -> 9 7 8 | 8 9 7
    11 -> 7 8 9 | 8 7 9
    10 -> 9 8 7 | 7 9 8
    9 -> '2' '1' | '3' '2' '1'
    8 -> '3' '1' '2' | '3' '2'
    7 -> '1' '2' '3' | '3' '1'
""")

# Grammar cfg3i. Nonterminals are numbered 7-22 and the terminals are the
# characters '1', '2' and '3'. Every nonterminal has exactly two alternatives.
# The rules are layered: 22 expands to 19-21, those to 16-18, then 13-15,
# 10-12 and 7-9, and 7-9 expand to terminals only.
# Not referenced elsewhere in the visible code.
cfg3i = CFG.fromstring("""
    22 -> 21 20 19 | 19 19 20
    21 -> 18 17 | 16 16 18
    20 -> 18 18 | 17 16 17
    19 -> 16 16 | 18 16 18
    18 -> 14 15 | 14 15 13
    17 -> 15 14 | 15 15
    16 -> 14 14 | 13 13
    15 -> 11 10 12 | 11 11 10
    14 -> 10 10 | 10 10 10
    13 -> 10 12 11 | 12 11
    12 -> 8 7 | 7 9 9
    11 -> 7 7 8 | 7 7 7
    10 -> 9 9 | 8 7 7
    9 -> '1' '2' | '1' '1' '3'
    8 -> '2' '2' | '1' '1'
    7 -> '2' '3' '1' | '3' '1' '2'
""")

# Grammar cfg3h. Nonterminals are numbered 7-22 and the terminals are the
# characters '1', '2' and '3'. Each nonterminal has two or three alternatives.
# The rules are layered: 22 expands to 19-21, those to 16-18, then 13-15,
# 10-12 and 7-9, and 7-9 expand to terminals only.
# Not referenced elsewhere in the visible code.
cfg3h = CFG.fromstring("""
    22 -> 19 21 | 20 20 21
    21 -> 17 18 17 | 17 17 18
    20 -> 17 16 | 18 16
    19 -> 18 17 | 16 17
    18 -> 14 15 15 | 15 14 14 | 15 13 13
    17 -> 15 13 15 | 13 14
    16 -> 15 13 | 14 13
    15 -> 11 11 10 | 10 12
    14 -> 12 12 10 | 10 10 | 10 12 12
    13 -> 11 10 | 12 11
    12 -> 9 8 | 8 7 | 7 9
    11 -> 7 9 9 | 7 7 | 8 7 7
    10 -> 8 8 | 9 7 | 8 7 9
    9 -> '1' '3' '3' | '2' '1' '3'
    8 -> '1' '3' | '3' '3' '1' | '1' '2'
    7 -> '1' '3' '1' | '1' '2' '3' | '2' '3' '2'
""")

# Grammar cfg3g. Nonterminals are numbered 7-22 and the terminals are the
# characters '1', '2' and '3'. Every nonterminal has exactly three alternatives.
# The rules are layered: 22 expands to 19-21, those to 16-18, then 13-15,
# 10-12 and 7-9, and 7-9 expand to terminals only.
# Not referenced elsewhere in the visible code.
cfg3g = CFG.fromstring("""
    22 -> 20 19 21 | 20 20 19 | 19 20
    21 -> 18 16 | 16 16 18 | 16 16
    20 -> 16 17 17 | 18 18 | 16 17
    19 -> 18 16 17 | 18 17 16 | 17 17 16
    18 -> 14 13 15 | 15 15 | 15 13
    17 -> 15 14 | 14 15 13 | 14 13 14
    16 -> 13 13 | 13 14 | 14 13 13
    15 -> 12 11 | 12 10 10 | 10 11
    14 -> 10 10 | 10 11 10 | 11 12
    13 -> 11 11 | 11 11 11 | 10 12
    12 -> 9 9 9 | 7 8 | 7 9
    11 -> 8 9 7 | 9 7 | 8 8 9
    10 -> 7 7 | 7 7 7 | 8 8 8
    9 -> '2' '1' | '2' '3' | '2' '3' '3'
    8 -> '3' '3' '1' | '1' '3' | '1' '3' '2'
    7 -> '2' '2' | '1' '1' | '2' '3' '1'
""")

# Grammar cfg3f. Nonterminals are numbered 7-22 and the terminals are the
# characters '1', '2' and '3'. Each nonterminal has three or four alternatives.
# The rules are layered: 22 expands to 19-21, those to 16-18, then 13-15,
# 10-12 and 7-9, and 7-9 expand to terminals only.
# Not referenced elsewhere in the visible code.
cfg3f = CFG.fromstring("""
    22 -> 20 20 | 21 19 19 | 20 19 21 | 20 21
    21 -> 16 18 | 16 17 18 | 17 16 | 18 17
    20 -> 17 16 18 | 16 17 | 16 16
    19 -> 18 18 | 17 18 | 18 16 18
    18 -> 13 15 | 15 13 13 | 14 15 13
    17 -> 15 14 | 14 15 | 15 14 13
    16 -> 14 14 | 14 13 | 13 15 13 | 15 15
    15 -> 12 12 11 | 10 10 | 11 11 10 | 10 11 11
    14 -> 10 12 12 | 12 11 | 12 10 12 | 10 12
    13 -> 10 12 11 | 12 11 12 | 11 12
    12 -> 8 8 9 | 9 8 | 7 9 7
    11 -> 9 7 7 | 9 7 | 8 8
    10 -> 7 9 9 | 9 7 9 | 8 9 9
    9 -> '1' '1' | '3' '3' | '1' '2' '1'
    8 -> '3' '3' '1' | '1' '2' | '3' '1' '1'
    7 -> '3' '2' | '3' '1' '2' | '3' '2' '2' | '2' '2' '1'
""")

In [None]:
#If the model has a block_size of x, use n = x + 1.
#Computes, for each word of the text, the fraction of its n-grams that are distinct.
def diversity(text, n=17):
    """Return the average per-token fraction of distinct character n-grams.

    ``text`` is split on whitespace. For every token strictly longer than
    ``n`` characters, the ratio (distinct character n-grams) / (total
    character n-grams) is computed. The ratios are summed and divided by the
    total number of tokens, so tokens of length ``n`` or less contribute 0 to
    the average but still count in the denominator.

    Args:
        text: whitespace-separated tokens.
        n: n-gram length in characters.

    Returns:
        The average ratio as a float; 0.0 when ``text`` contains no tokens.
    """
    tokens = text.split()
    if not tokens:
        # Nothing to average over; avoid dividing by zero.
        return 0.0

    total = 0.0
    for tok in tokens:
        if len(tok) > n:
            # A token of length L has L - n + 1 sliding windows of size n.
            num_windows = len(tok) - n + 1
            unique_n_grams = {tok[i:i + n] for i in range(num_windows)}
            total += len(unique_n_grams) / num_windows
    return total / len(tokens)

def diversityNotCFG(text, n=17):
    """Return the fraction of distinct length-``n`` windows in ``text``.

    Windows are taken over the raw string (spaces included), advancing one
    character at a time.

    Args:
        text: the string to analyse.
        n: window length in characters.

    Returns:
        (distinct windows) / (total windows) as a float; 0.0 when ``text`` is
        shorter than ``n`` and therefore has no window.
    """
    num_windows = len(text) - n + 1
    if num_windows <= 0:
        # No complete window fits; avoid dividing by zero.
        return 0.0
    unique_n_grams = {text[i:i + n] for i in range(num_windows)}
    return len(unique_n_grams) / num_windows

#Checks, for each sentence of a text, whether it follows the rules of the CFG.
#Reports the fraction of sentences in the text that follow them.
#Evaluates the quality of the model's predictions.
def perplexity(text, grammar):
    """Report the fraction of sentences in ``text`` that ``grammar`` can parse.

    NOTE: despite its name, this is not a perplexity. It is the share of
    whitespace-separated sentences for which the recursive-descent parser
    finds at least one parse tree.

    Each sentence string is handed to the parser as-is, so it is consumed
    character by character (iterating a ``str`` yields its characters).

    Args:
        text: whitespace-separated sentences.
        grammar: an ``nltk.CFG`` whose terminals are single characters.

    Returns:
        valid / total as a float, or 0.0 when ``text`` has no sentences. The
        same figures are also printed.
    """
    frases = text.split()
    valid = 0
    if frases:
        parser = RecursiveDescentParser(grammar)
        for frase in frases:
            try:
                # One parse tree is enough to accept the sentence.
                has_parse = next(iter(parser.parse(frase)), None) is not None
            except ValueError:
                # NLTK raises ValueError when a token is not covered by the
                # grammar's terminals; such a sentence is simply not valid.
                has_parse = False
            if has_parse:
                valid += 1

    # Guard against an empty text instead of dividing by zero.
    ratio = valid / len(frases) if frases else 0.0
    print(f'Número de frases: {len(frases)}')
    print(f'Número de frases que cumplen las reglas: {valid}')
    print(f'Perplejidad: {ratio}')
    return ratio

# Total number of characters in the generated text.
print(len(gen_text))
# diversity() is called with its default n-gram length.
print(f'Diversity: {diversity(gen_text)}')
#print(f'DiversityNotCFG: {diversityNotCFG(gen_text)}')
#print(gen_text)
# Check the generated sentences against grammar cfg3b; perplexity() prints its own report.
perplexity(gen_text, cfg3b)

256500
Diversity: 0.09829546204267123
Número de frases: 712
Número de frases que cumplen las reglas: 0
Perplejidad: 0.0
