In [None]:
import csv
import torch
import torch.nn as nn
import pandas as pd
import json
from torch.nn import functional as F
import matplotlib.pyplot as plt
import numpy as np


In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; the next cells change into a Drive
# folder and read the jokes CSV from it.
drive.mount('/content/drive')

In [None]:
cd /content/drive/MyDrive/dadpranks-main/dadpranks-main

In [None]:
# Load the jokes and force the column to str so every entry can be walked
# character by character.
data = pd.read_csv('shortjokes.csv')
data['Joke'] = data['Joke'].astype(str)

# Character-level vocabulary: every distinct character of the corpus, sorted.
# NOTE: this is built from the raw frame, i.e. before the cleaning cell below
# drops any rows.
big_string = ' '.join(data['Joke'])
characters = sorted(set(big_string))
stoi = {ch: i for i, ch in enumerate(characters)}
itos = {i: ch for i, ch in enumerate(characters)}

# The end-of-joke marker gets the first id after the real characters. Deriving
# the id from the vocabulary (rather than hard-coding 97) keeps it from
# colliding with a character id or falling outside vocab_size when the corpus
# does not contain exactly 97 distinct characters.
end_token_id = len(characters)
stoi['<E>'] = end_token_id
itos[end_token_id] = '<E>'
print(len(stoi))


def encode(s):
  """Return the list of integer ids for the characters of string `s`."""
  return [stoi[c] for c in s]


def decode(l):
  """Return the list of symbols (single characters or '<E>') for the ids in `l`."""
  return [itos[i] for i in l]


# All characters plus the end-of-joke marker.
vocab_size = len(stoi)
print(vocab_size)


In [None]:
# Tally how often each character occurs in the corpus - useful when deciding
# which characters (and therefore which rows) to clean out.
characters_count = {}
for char in big_string:
  characters_count[char] = characters_count.get(char, 0) + 1

# Re-order the tally from the rarest character to the most frequent one
sorted_characters_count = dict(
    sorted(characters_count.items(), key=lambda pair: pair[1])
)

# Report one line per character, rarest first
for char in sorted_characters_count:
    count = sorted_characters_count[char]
    print(f"Character: '{char}', Count: {count}")

In [None]:
# Data cleaning: remove every joke that contains a \x08 or \x10 control character.
# One vectorised pass finds the rows and a single drop() removes them, which
# avoids walking the frame row by row and copying it once per dropped row.
has_control_char = (
    data['Joke'].str.contains('\x08', regex=False)
    | data['Joke'].str.contains('\x10', regex=False)
)
drop_indexes = data.index[has_control_char].tolist()
for index in drop_indexes:
  print(index)

data = data.drop(index=drop_indexes)

In [None]:
# Encode every joke as character ids and close each one with the '<E>' marker,
# producing one long token stream.
end_token_id = stoi['<E>']
list_of_jokes_encoded = []
for joke in data['Joke']:
  list_of_jokes_encoded.extend(encode(joke))
  list_of_jokes_encoded.append(end_token_id)

data_tensor = torch.tensor(list_of_jokes_encoded, dtype=torch.long)

# 90/10 train/validation split, measured in tokens. The split point has to come
# from len(data_tensor): len(data) is the number of jokes (rows), which is far
# smaller than the number of tokens and would leave almost the whole stream in
# the validation set.
n = int(0.9*len(data_tensor))
train_data = data_tensor[:n]
val_data = data_tensor[n:]

In [None]:
# hyperparameters
batch_size = 32  # how many independent sequences will we process in parallel?
block_size = 256  # what is the maximum context length for predictions?
max_iters = 3500  # number of optimisation steps in the training loop


eval_interval = 500  # the loss is evaluated every `eval_interval` steps
learning_rate = 3e-4
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook does not crash on a runtime without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200  # batches averaged per split for one loss estimate
n_embd = 320
# NOTE: n_embd is not a multiple of n_head, so every head is 320 // 6 = 53 wide
# and the concatenated heads are 318 wide; the attention output projection maps
# them back to n_embd.
n_head = 6
n_layer = 6
dropout = 0.3
hyperparameters = {'batch_size': batch_size,
                   'block_size': block_size,
                   'max_iters': max_iters,
                   'eval_interval': eval_interval,
                   'learning_rate': learning_rate,
                   'device': device,
                   'eval_iters': eval_iters,
                   'n_embd': n_embd,
                   'n_head': n_head,
                   'n_layer': n_layer,
                   'dropout': dropout,}
print(hyperparameters)
# ------------

In [None]:
def get_batch(split):
  """Sample a random mini-batch of input/target windows from one data split.

  Args:
    split: 'train' selects train_data; any other value selects val_data.

  Returns:
    Tuple (x, y) moved to `device`. Each holds batch_size windows of
    block_size tokens; y is x advanced by one position, i.e. the next token
    for every position of x.
  """
  source = val_data if split != 'train' else train_data
  # Window starts are drawn from [0, len(source) - block_size) so that the
  # target window, which ends one token later, never runs past the data.
  starts = torch.randint(len(source) - block_size, (batch_size,))
  inputs, targets = [], []
  for start in starts:
    inputs.append(source[start:start + block_size])
    targets.append(source[start + 1:start + block_size + 1])
  x = torch.stack(inputs).to(device)
  y = torch.stack(targets).to(device)
  return x, y

In [None]:
# Transformer model:
@torch.no_grad()
def estimate_loss():
  """Estimate the mean loss of `model` on the train and validation splits.

  The model is put in eval mode for the measurement and switched back to
  train mode before returning. Gradients are disabled by the decorator.

  Returns:
    Dict with keys 'train' and 'val', each a scalar tensor holding the loss
    averaged over eval_iters freshly sampled batches.
  """
  model.eval()
  averages = {}
  for split_name in ('train', 'val'):
    batch_losses = torch.zeros(eval_iters)
    for step in range(eval_iters):
      inputs, targets = get_batch(split_name)
      _, batch_loss = model(inputs, targets)
      batch_losses[step] = batch_loss.item()
    averages[split_name] = batch_losses.mean()
  model.train()
  return averages


class Head(nn.Module):
  """A single head of causal (masked) self-attention."""

  def __init__(self, head_size):
    super().__init__()
    # Bias-free projections from the embedding width to this head's width.
    self.key = nn.Linear(n_embd, head_size, bias=False)
    self.query = nn.Linear(n_embd, head_size, bias=False)
    self.value = nn.Linear(n_embd, head_size, bias=False)
    # Lower-triangular matrix of ones, registered as a buffer (not a parameter);
    # forward() uses it to hide later positions from earlier ones.
    lower_triangle = torch.tril(torch.ones(block_size, block_size))
    self.register_buffer('tril', lower_triangle)

    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Attend over `x`.

    Input is (batch, time-step, channels); output is (batch, time-step, head size).
    """
    _, seq_len, _ = x.shape
    queries = self.query(x)
    keys = self.key(x)
    values = self.value(x)

    # Scaled dot-product scores: (B,T,hs) @ (B,hs,T) -> (B,T,T)
    scale = keys.shape[-1] ** -0.5
    scores = queries @ keys.transpose(-2, -1) * scale

    # Positions after the current one get -inf, so softmax assigns them weight 0.
    hidden = self.tril[:seq_len, :seq_len] == 0
    scores = scores.masked_fill(hidden, float('-inf'))
    weights = self.dropout(F.softmax(scores, dim=-1))

    # Weighted aggregation of the values.
    return weights @ values

class MultiHeadAttention(nn.Module):
  """Several attention heads run on the same input, followed by an output projection."""

  def __init__(self, num_heads, head_size):
    super().__init__()
    head_list = []
    for _ in range(num_heads):
      head_list.append(Head(head_size))
    self.heads = nn.ModuleList(head_list)
    # Maps the concatenated head outputs back to the embedding width.
    self.proj = nn.Linear(head_size * num_heads, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Run every head on `x`, concatenate along the last dim, project, apply dropout."""
    per_head = [head(x) for head in self.heads]
    merged = torch.cat(per_head, dim=-1)
    projected = self.proj(merged)
    return self.dropout(projected)

class FeedForward(nn.Module):
  """Position-wise MLP: expand to 4x the width, ReLU, project back, dropout."""

  def __init__(self, n_embd):
    super().__init__()
    hidden_width = 4 * n_embd
    layers = [
        nn.Linear(n_embd, hidden_width),
        nn.ReLU(),
        nn.Linear(hidden_width, n_embd),
        nn.Dropout(dropout),
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Apply the MLP to `x`; the last dimension keeps its size."""
    return self.net(x)

class Block(nn.Module):
  """Transformer block: self-attention then feed-forward, each with pre-LayerNorm and a residual connection."""

  def __init__(self, n_embd, n_head):
    super().__init__()
    # Integer division: each head gets n_embd // n_head channels.
    per_head_width = n_embd // n_head
    self.sa = MultiHeadAttention(n_head, per_head_width)
    self.ffwd = FeedForward(n_embd)
    self.ln1 = nn.LayerNorm(n_embd)
    self.ln2 = nn.LayerNorm(n_embd)

  def forward(self, x):
    """Return `x` after the attention and feed-forward residual updates."""
    after_attention = x + self.sa(self.ln1(x))
    return after_attention + self.ffwd(self.ln2(after_attention))

class LanguageModel(nn.Module):
  """Character-level transformer language model.

  Token and learned position embeddings are summed, passed through n_layer
  Blocks and a final LayerNorm, and projected to vocab_size logits.
  """

  def __init__(self):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
    self.position_embedding_table = nn.Embedding(block_size, n_embd)
    self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
    self.ln_f = nn.LayerNorm(n_embd) # final layer norm
    self.lm_head = nn.Linear(n_embd, vocab_size)

    self.apply(self._init_weights)

  def _init_weights(self, module):
    """Draw Linear/Embedding weights from N(0, 0.02) and zero the Linear biases."""
    if isinstance(module, nn.Linear):
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
      if module.bias is not None:
        torch.nn.init.zeros_(module.bias)
    elif isinstance(module, nn.Embedding):
      torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

  def forward(self, idx, targets=None):
    """Compute next-token logits and, when targets are given, the loss.

    Args:
      idx: (B, T) tensor of token ids, with T at most block_size.
      targets: optional (B, T) tensor of next-token ids.

    Returns:
      (logits, loss). Without targets, logits is (B, T, vocab_size) and loss
      is None. With targets, logits is flattened to (B*T, vocab_size) and loss
      is the cross-entropy over all positions.
    """
    B, T = idx.shape

    tok_emb = self.token_embedding_table(idx)
    # Positions are created on the device of the input, not on the global
    # `device`, so the model keeps working wherever it and its inputs live.
    pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))
    x = tok_emb + pos_emb

    x = self.blocks(x)
    x = self.ln_f(x)
    logits = self.lm_head(x)

    if targets is None:
      loss = None
    else:
      B, T, C = logits.shape
      logits = logits.view(B*T, C)
      targets = targets.view(B*T)
      loss = F.cross_entropy(logits, targets)

    return logits, loss

  @torch.no_grad()
  def generate(self, idx, max_new_tokens):
    """Sample max_new_tokens tokens autoregressively after the context `idx`.

    Sampling runs without gradient tracking and in eval mode, so dropout does
    not perturb the predicted distribution; the previous train/eval mode is
    restored afterwards.

    Args:
      idx: (B, T) tensor of token ids used as the starting context.
      max_new_tokens: number of tokens to append.

    Returns:
      (B, T + max_new_tokens) tensor: the context followed by the samples.
    """
    was_training = self.training
    self.eval()
    try:
      for _ in range(max_new_tokens):
        # crop idx to the last block_size tokens
        idx_cond = idx[:, -block_size:]
        # get the predictions
        logits, loss = self(idx_cond)
        # focus only on the last time step
        logits = logits[:, -1, :] # becomes (B, C)
        # apply softmax to get probabilities
        probs = F.softmax(logits, dim=-1) # (B, C)
        # sample from the distribution
        idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
        # append sampled index to the running sequence
        idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
    finally:
      self.train(was_training)
    return idx

In [None]:
# Build the network and move it to the configured device; `m` is a second
# name for the same moved model.
model = LanguageModel()
m = model.to(device)

# report the model size in millions of parameters
print(f"{sum(p.numel() for p in m.parameters()) / 1e6} M parameters")

# AdamW optimizer over all model parameters
optimizer = torch.optim.AdamW(params=model.parameters(), lr=learning_rate)

In [None]:
# TRAINING LOOP
# The counter is named `step` so the built-in iter() is not shadowed for the
# rest of the notebook.
for step in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # forward pass, then clear the old gradients before back-propagating
    # the new loss and taking one optimizer step
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()


In [None]:
# Seed generation with the end-of-joke marker: in the training stream every
# joke is followed by '<E>', so it is the natural "start a new joke" context.
# Id 0 would merely be the lowest-sorted character of the vocabulary.
end_token = '<E>'
context = torch.tensor([[stoi[end_token]]], dtype=torch.long, device=device)

# Sample in eval mode and without gradient tracking so dropout does not distort
# the predicted distribution and no autograd graph is kept for 1500 steps.
m.eval()
with torch.no_grad():
  generated = decode(m.generate(context, max_new_tokens=1500)[0].tolist())

# Print one joke per line. The marker only separates jokes; it is not carried
# over into the text of the following joke, and empty fragments are skipped.
joke_chars = []
for symbol in generated:
  if symbol == end_token:
    if joke_chars:
      print(''.join(joke_chars))
    joke_chars = []
  else:
    joke_chars.append(symbol)

# Trailing, possibly unfinished joke.
if joke_chars:
  print(''.join(joke_chars))