<a href="https://colab.research.google.com/github/omergenkin/Projects/blob/main/NLP_torch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install torch



In [3]:
import torch
from torch import nn
import torch.nn.functional as F

In [4]:
from ast import increment_lineno
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
with open('/content/drive/My Drive/pytorch_udemy/Udemy_pytorch/PYTORCH_NOTEBOOKS/Data/shakespeare.txt', 'r') as f:
  text = f.read()

In [7]:
all_characters = set(text)

In [8]:
encoder = {char: ind for ind, char in enumerate(all_characters)}
decoder = {ind: char for ind, char in enumerate(all_characters)}

In [9]:
encoder_text = np.array([encoder[char] for char in text])

In [10]:
def one_hot_encoder(encoder_text, num_uni_chars):
  # encoded_text --> batch of encoded text
  # num_uni_char --> len(set(text))
  one_hot = np.zeros((encoder_text.size, num_uni_chars))
  one_hot = one_hot.astype(np.float32)
  one_hot[np.arange(one_hot.shape[0]), encoder_text.flatten()] = 1.0
  one_hot = one_hot.reshape((*encoder_text.shape, num_uni_chars))
  return one_hot

In [11]:
def generate_batches(encoder_text, samp_per_batch=10, seq_len=50):

  # X : encoded text of length seq_len
  # y : encoded text shifted by one
  # how many chars per batch
  char_per_batch = samp_per_batch*seq_len

  # how many batches can we make, given the len of encoded text
  num_batches_avail = int(len(encoder_text)/char_per_batch)
  #cut off the end of the encoded text. that won't fit evenly into the batch
  encoder_text = encoder_text[:num_batches_avail*char_per_batch]
  # reshape into batch_size rows
  encoder_text = encoder_text.reshape((samp_per_batch, -1))
  #go through each row in array
  for n in range(0, encoder_text.shape[1], seq_len):
    #grab feature chars
    x = encoder_text[:, n:n+seq_len]
    #shift label chars by one
    y = np.zeros_like(x)
    try:
      y[:, :-1] = x[:, 1:]
      y[:, -1] = encoder_text[:, n+seq_len]
    except:
      y[:, :-1] = x[:, 1:]
      y[:, -1] = -1
    yield x, y


In [12]:
sampel_text = np.arange(20)

In [13]:
batch_generator = generate_batches(sampel_text, samp_per_batch=2, seq_len=5)

In [14]:
x,y = next(batch_generator)

In [15]:
x

array([[ 0,  1,  2,  3,  4],
       [10, 11, 12, 13, 14]])

In [16]:
y

array([[ 1,  2,  3,  4,  5],
       [11, 12, 13, 14, 15]])

In [17]:
class CharModel(nn.Module):

  def __init__(self, all_chars,num_hidden=256, num_layers=4, drop_prob=0.5, use_gpu=False):

    super().__init__()
    self.drop_prob = drop_prob
    self.num_layers = num_layers
    self.num_hidden = num_hidden
    self.use_gpu = use_gpu

    self.all_chars = all_chars
    self.decoder = decoder
    self.encoder = encoder

    self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout=drop_prob, batch_first=True)
    self.dropout = nn.Dropout(drop_prob)
    self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))

  def forward(self, x, hidden):

    lstm_output, hidden = self.lstm(x, hidden)
    drop_output = self.dropout(lstm_output)
    drop_output = drop_output.contiguous().view(-1, self.num_hidden)
    final_out = self.fc_linear(drop_output)

    return final_out, hidden

  def hidden_state(self, batch_size):

    if self.use_gpu:
      hidden = (torch.zeros(self.num_layers, batch_size, self.num_hidden).cuda(),
                torch.zeros(self.num_layers, batch_size, self.num_hidden).cuda())
    else:
      hidden = (torch.zeros(self.num_layers, batch_size, self.num_hidden),
                torch.zeros(self.num_layers, batch_size, self.num_hidden))
    return hidden





In [18]:
model = CharModel(all_chars=all_characters,
                  num_hidden=512,
                  num_layers=3,
                  drop_prob=0.5,
                  use_gpu=True)

In [19]:
total_param = []

for p in model.parameters():
  total_param.append(int(p.numel()))
sum(total_param)

5470292

In [20]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [21]:
train_percent = 0.1
train_ind = int(len(encoder_text)*(train_percent))
train_data = encoder_text[:train_ind]
val_data = encoder_text[train_ind:]

In [22]:
train_percent = 0.9
train_ind = int(len(encoder_text)*(train_percent))
train_data = encoder_text[:train_ind]
val_data = encoder_text[train_ind:]

In [23]:
epochs = 12
batch_size = 100
seq_len = 100
tracker = 0
num_char = max(encoder.values())+1

In [24]:
model.train()
if model.use_gpu:
  model.cuda()
# else:
#   model.use_gpu = False

for i in range(epochs):
  hidden = model.hidden_state(batch_size)
  for x,y in generate_batches(train_data, batch_size, seq_len):
    tracker += 1
    x = one_hot_encoder(x, num_char)
    inputs = torch.from_numpy(x)
    targets = torch.from_numpy(y) # Ensure targets is a tensor here
    if model.use_gpu:
      inputs = inputs.cuda()
      targets = targets.cuda()

    hidden = tuple([state.data for state in hidden])
    model.zero_grad()
    lstm_output, hidden = model.forward(inputs, hidden)

    # Ensure targets are within the valid range [0, num_char-1]
    targets = targets.view(batch_size*seq_len).long() # targets should be a tensor here
    targets = torch.clamp(targets, 0, num_char-1)

    loss = criterion(lstm_output, targets)

    loss.backward()
    nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)

    optimizer.step()

    if tracker % 25 == 0:
      val_hidden = model.hidden_state(batch_size)
      val_losses = []
      model.eval()

      for x,y in generate_batches(val_data, batch_size, seq_len):
        x = one_hot_encoder(x, num_char)
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y) # Ensure targets is a tensor here
        if model.use_gpu:
          inputs = inputs.cuda()
          targets = targets.cuda()
        val_hidden = tuple([state.data for state in val_hidden])
        lstm_output, val_hidden = model.forward(inputs, val_hidden)

        # Ensure targets are within the valid range [0, num_char-1]
        targets = targets.view(batch_size*seq_len).long() # targets should be a tensor here
        targets = torch.clamp(targets, 0, num_char-1)

        val_loss = criterion(lstm_output, targets)
        val_losses.append(val_loss.item())
      model.train()
      print(f"Epoch: {i} Step: {tracker} Val Loss: {val_loss.item()}")

Epoch: 0 Step: 25 Val Loss: 3.2495577335357666
Epoch: 0 Step: 50 Val Loss: 3.2430179119110107
Epoch: 0 Step: 75 Val Loss: 3.2458484172821045
Epoch: 0 Step: 100 Val Loss: 3.2428364753723145
Epoch: 0 Step: 125 Val Loss: 3.171109914779663
Epoch: 0 Step: 150 Val Loss: 3.0699353218078613
Epoch: 0 Step: 175 Val Loss: 3.0071351528167725
Epoch: 0 Step: 200 Val Loss: 2.9083383083343506
Epoch: 0 Step: 225 Val Loss: 2.7903800010681152
Epoch: 0 Step: 250 Val Loss: 2.7165820598602295
Epoch: 0 Step: 275 Val Loss: 2.6364428997039795
Epoch: 0 Step: 300 Val Loss: 2.517833709716797
Epoch: 0 Step: 325 Val Loss: 2.435997486114502
Epoch: 0 Step: 350 Val Loss: 2.353422164916992
Epoch: 0 Step: 375 Val Loss: 2.2889034748077393
Epoch: 0 Step: 400 Val Loss: 2.2449541091918945
Epoch: 0 Step: 425 Val Loss: 2.1950573921203613
Epoch: 0 Step: 450 Val Loss: 2.1715080738067627
Epoch: 0 Step: 475 Val Loss: 2.126555919647217
Epoch: 1 Step: 500 Val Loss: 2.0862832069396973
Epoch: 1 Step: 525 Val Loss: 2.056349992752075
E

In [25]:
model_name= 'hidden512_layers3_shakes.net'

In [27]:
torch.save(model_dict, model_name)

NameError: name 'model_dict' is not defined

In [28]:
def predict_next_char(model, char, hidden=None, k=1):
  encoded_text = encoder[char]
  encoded_text = np.array([[encoded_text]])
  encoded_text = one_hot_encoder(encoded_text, len(model, all_characters))
  inputs = torch.from_numpy(encoded_text)
  if model.use_gpu:
    inputs = inputs.cuda()

  hidden = tuple([state.data for state in hidden])
  lstm_out, hidden = model(inputs, hidden)
  probs = F.softmax(lstm_out, dim=1).data
  if model.use_gpu:
    probs = probs.cpu

  probs, index_positions = probs.topk(k)
  index_positions = index_positions.numpy().squeeze()
  probs = probs.numpy().flatten()
  probs = probs/probs.sum()
  char = np.random.choice(index_positions, p=probs)
  return model.decoder[char], hidden


In [29]:
def generate_text(model, size, seed='The', k=1):
  if model.use_gpu:
    model.cuda()
  else:
    model.cpu()

  model.eval()
  output_chars = [c for c in seed]
  hidden = model.hidden_state(1)
  for char in seed:
    char, hidden = predict_next_char(model, char, hidden, k=k)

  output_chars.append(char)

  for i in range(size):
    char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
    output_chars.append(char)

  return ''.join(output_chars)

In [31]:
print(generate_text(model, 1000, seed='The ', k=1))

TypeError: len() takes exactly one argument (2 given)