In [61]:
import torch
import torchtext
import torch.nn as nn
from torchtext.datasets import IMDB
torchtext.disable_torchtext_deprecation_warning()
# from torchtext.datasets import IMDB
train_dataset = IMDB(split = 'train')
test_dataset = IMDB(split = 'test')
from itertools import islice
import pandas as pd

In [62]:
# split the training dataset into seperate training and validation partitions
from torch.utils.data.dataset import random_split
torch.manual_seed(1)
train_dataset, valid_dataset = random_split(list(train_dataset), [20000, 5000])

In [63]:
test_dataset = list(test_dataset)

In [64]:
# identify the unique words in the training dataset
#  map these unique words to a unique integer and encode the review text into encoded integers
import re
from collections import Counter, OrderedDict


# split the text into words
def tokenizer(text):
  text = re.sub('<[^>]*>', '', text)
  emoticons = re.findall('(?::|;|=)(?:\)|\(|D|P)', text.lower())
  text = re.sub('[\W]+', ' ', text.lower()) + ' '.join(emoticons).replace('-', '')
  tokenized = text.split()
  return tokenized

In [65]:
token_counts = Counter()
for label, line in train_dataset:
  tokens = tokenizer(line)
  token_counts.update(tokens)
print('Vocab-size: ', len(token_counts))

Vocab-size:  69019


In [66]:
# 2 is positive feedback, and 1 is negative feedback

token = Counter()
for label, line in train_dataset:
  t = str(label)
  token.update(t)
token

Counter({'2': 10064, '1': 9936})

In [67]:
# map these unique words in integer
from torchtext.vocab import vocab
sort_tuple = sorted(token_counts.items(), key = lambda x: x[1], reverse = True)
order_dict = OrderedDict(sort_tuple)
vocab= vocab(order_dict)
vocab.insert_token('<pad>', 0)
vocab.insert_token('<unk>', 1)
vocab.set_default_index(1)

In [68]:
# define the function for transformation 
text_pipeline = lambda x: [vocab[token] for token in tokenizer(x)]
# need to change label -1, so that the label between 0 and 1, original 1 mean bad, 2 mean good
label_pipeline = lambda x: x-1.0

In [69]:
# wrap the encode and transformation function
def collate_batch(batch):
  label_list, text_list, lengths = [], [], []
  for _label, _text in batch:
    label_list.append(label_pipeline(_label))
    processed_text = torch.tensor(text_pipeline(_text), dtype = torch.int64)
    text_list.append(processed_text)
    lengths.append(processed_text.size(0))
  label_list = torch.tensor(label_list)
  lengths = torch.tensor(lengths)
  padded_text_list = nn.utils.rnn.pad_sequence(text_list, batch_first = True)
  return padded_text_list, label_list, lengths

In [70]:
# take a small batch
from torch.utils.data import DataLoader

# collate_fn is self funtion to define batch
dataloader = DataLoader(train_dataset, batch_size = 4, shuffle = False, collate_fn = collate_batch)
# let check the first batch
text_batch1, label_list1, lengths1 = next(iter(dataloader))
text_batch1.shape

torch.Size([4, 218])

In [71]:
# let divide all three datasets into dataloader with a batch size of 32:
batch_size = 128
train_dl = DataLoader(train_dataset, batch_size = batch_size, shuffle= True, collate_fn = collate_batch)
valid_dl = DataLoader(valid_dataset, batch_size = batch_size, shuffle= True, collate_fn= collate_batch)
test_dl = DataLoader(test_dataset, batch_size = batch_size, shuffle= True, collate_fn= collate_batch)

In [72]:
# embedding matric change token to vectors which has less dimension than then dimension of one hot code (number of tokens)
# embedding layer 
embedding = nn.Embedding(num_embeddings = 69019+2, embedding_dim = 300, padding_idx = 0)
text_batch0, label_list0, length0 = next(iter(train_dl))
# embedding(text_batch0).shape
# shape: batch_size, row is number of tokens in each sub_batch, and size of embedding

In [73]:
# Building an rnn model
# class RNN(nn.Module):
#   def __init__(self, input_size, hidden_size):
#     super().__init__()
#     self.rnn = nn.RNN(input_size, hidden_size, num_layers = 2, batch_first= True)
#     self.fc = nn.Linear(hidden_size, 1)
    
#   def forward(self, x):
#     _ , hidden = self.rnn(x)
#     hidden = hidden[-1,:,:]
#     # I use final hidden state from the last hidden layer as the input to the fully connected layer
#     hidden = self.fc(hidden)
#     return hidden
  
# model = RNN(64, 32)
# print(model)
# model(torch.randn(5, 3, 64))    

In [74]:
#  Building an RNN 
#  1.starting with embedding to reduce dimension of features(feature size = 20)
#  2.recurrent layer of type LSTM 
#  3.connected layer as a hiddien layer and another connected layer as the output layer(return a single class membership possibility value)

class RNN(nn.Module):
  def __init__(self, vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx = 0)
    self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first= True)
    self.fc1 = nn.Linear(rnn_hidden_size, fc_hidden_size)
    self.relu = nn.ReLU()
    self.fc2 = nn.Linear(fc_hidden_size, 1)
    self.sigmoid = nn.Sigmoid()
    
  def forward(self, text, lengths):
    out = self.embedding(text)
    out = nn.utils.rnn.pack_padded_sequence(out, lengths.cpu().numpy(), enforce_sorted= False, batch_first = True)
    alloutput,(hidden, cell) = self.rnn(out)
    hidden1 = hidden[-1, :, :]
    hidden1 = self.fc1(hidden1)
    hidden1 = self.relu(hidden1)
    hidden1 = self.fc2(hidden1)
    output = self.sigmoid(hidden1)
    return output

In [75]:
vocab_size = len(vocab)
embed_dim = 20
rnn_hidden_size = 64
fc_hidden_size = 64
torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size)
model

RNN(
  (embedding): Embedding(69021, 20, padding_idx=0)
  (rnn): LSTM(20, 64, batch_first=True)
  (fc1): Linear(in_features=64, out_features=64, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=64, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [76]:
# develop the train function to train the model on the given dataset for one epoch and return the classification accuracy and loss
def train(model, dataloader):
  model.train()
  total_acc, total_loss = 0, 0
  loss_fn = nn.BCELoss(reduction = 'mean')
    #create optimizer to update weight and bias 
  optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
  for text_batch, label_batch, lengths in dataloader:
    optimizer.zero_grad()
    pred = model(text_batch, lengths)[:, 0]
    loss = loss_fn(pred, label_batch)
    loss.backward()
    optimizer.step()
    total_acc += ((pred >= 0.5) == label_batch).float().sum().item()
    total_loss += loss.item()*label_batch.size(0)

  
  return total_acc/ len(dataloader.dataset),  total_loss/ len(dataloader.dataset)
  

In [17]:
def evaluate(model, dataloader):
  model.eval()
  total_acc, total_loss = 0,0 
  loss_fn = nn.BCELoss(reduction = 'mean')
  with torch.no_grad():
    for text_batch, label_batch, lengths in dataloader:
      pred = model(text_batch, lengths)[:, 0]
      loss = loss_fn(pred, label_batch)
      total_acc += ((pred >= 0.5) == label_batch).float().sum().item()
      total_loss += loss.item() *label_batch.size(0)
  return total_acc/ len(dataloader.dataset),  total_loss/ len(dataloader.dataset)

In [18]:
num_epoch = 10
torch.manual_seed(1)
for epoch in range(num_epoch):
  train_acc, train_loss = train(model, train_dl)
  valid_acc, valid_loss = evaluate(model, valid_dl)
  print(f'epoch{epoch}: accuracy for train: {train_acc}, loss for train: {train_loss},  accuracy for valid: {valid_acc}, and loss for valid: {valid_loss}')
 

KeyboardInterrupt: 

In [19]:
# test the model for test dataset
acc_test, loss_test = evaluate(model, test_dl)
print(f'the accuracy for test dataset is : {acc_test}') 

KeyboardInterrupt: 

# Generate text

In [77]:
# the goal is : according to input text, and develop a model that can generate new text which is similar in style to the input text
# theere are three steps : 1 preprocessing the dataset
# 2 Building the RNN model
# 3 Performing next- character prediction and sampling to generate new text

In [78]:
import numpy as np
# reading and processing text
with open ("C:/Users/julia/Downloads/pg74657.txt", 'r', encoding = 'utf8') as fp:
  text = fp.read()

start_index = text.find('The Project Gutenberg eBook')
end_index = text.find('hear about new eBooks')

text = text[start_index: end_index]
char_set = set(text)

print(f'the total length of text is : {len(text)}')
print(f'the total unique character of text is : {len(char_set)}')

the total length of text is : 154188
the total unique character of text is : 90


In [79]:
# create a dictionary to map characters to integers, and need a reverse mapping to convert integers back to characters
# reverse mapping: using numpy.array = ['o', 'e', ....] and index is more efficient than dictionary

# bulid  the dictionary to map characters to integers
chars_sorted = sorted(char_set)
char_int = {ch: i for i, ch in enumerate(chars_sorted)}

# bulid a numpy array

text_encoded = np.array([char_int[ch] for ch in text], dtype = np.int32)
char_array = np.array(chars_sorted)

print(text[:20], '------>',text_encoded[:20])
print(text_encoded[100: 130], '------->', ''.join(char_array[text_encoded[100: 130]]))
print(text_encoded[100: 130], '----->', text[100: 130])


The Project Gutenber ------> [46 63 60  1 42 73 70 65 60 58 75  1 33 76 75 60 69 57 60 73]
[64 75 60 59  1 45 75 56 75 60 74  1 56 69 59  0 68 70 74 75  1 70 75 63
 60 73  1 71 56 73] -------> ited States and
most other par
[64 75 60 59  1 45 75 56 75 60 74  1 56 69 59  0 68 70 74 75  1 70 75 63
 60 73  1 71 56 73] -----> ited States and
most other par


In [80]:
# build input and output, seperate each 41 letters, and the first 40 letters is input, and the last 40 letters is output

from torch.utils.data import Dataset
seq_length = 40
chunk_size = seq_length + 1
text_chunks = np.array([text_encoded[i : i + chunk_size] for i in range(len(text) - chunk_size)])

In [81]:
# build a tensor class
class TextDataset(Dataset):
  def __init__(self, text_chunks):
    self.text_chunks = text_chunks
    
  def __len__(self):
    return len(self.text_chunks)
  
  def __getitem__(self, index):
    return self.text_chunks[index][ : -1].long(), self.text_chunks[index][1: ].long()
  
seq_dataset = TextDataset(torch.tensor(text_chunks))

In [82]:
# change this dataset to min-batch
from torch.utils.data import DataLoader
batch_size = 64
torch.manual_seed(1)
seq_dl = DataLoader(seq_dataset, batch_size = batch_size, shuffle= True, drop_last= True)

In [83]:
# bulid a character-level RNN model
import torch.nn as nn
class RNN(nn.Module):
  def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, embed_dim)
    self.rnn_hidden_size = rnn_hidden_size
    self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first= True)
    self.fc = nn.Linear(rnn_hidden_size, vocab_size) 
    # self.softmax = nn.Softmax(dim = 1)
    
  def forward(self, x, hidden, cell):
    out = self.embedding(x).unsqueeze(1)
    out, (hidden, cell) = self.rnn(out, (hidden, cell))
    out = self.fc(out).reshape(out.size(0), -1)
    # out = self.softmax(out)
    argmax_out = torch.argmax(out, dim = 1)
    return out, hidden, cell
  
  
  def init_hidden(self, batch_size):
    hidden = torch.zeros(1, batch_size, self.rnn_hidden_size)
    cell = torch.zeros(1, batch_size, self.rnn_hidden_size)
    return hidden, cell

In [84]:
# we can specify the model parmeters and create an rnn model
vocab_size = len(char_array)
embed_dim = 256
rnn_hidden_size = 512
torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size)
model

RNN(
  (embedding): Embedding(90, 256)
  (rnn): LSTM(256, 512, batch_first=True)
  (fc): Linear(in_features=512, out_features=90, bias=True)
)

In [85]:
# create a loss function and optimizer 
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)

In [86]:
# train the model with epoch = 1000 and every 100, print the result 
# in each epoch , only use one batch
num_epoch = 3001
torch.manual_seed(1)
for epoch in range(num_epoch):
  hidden, cell = model.init_hidden(batch_size)
  input_batch, target_batch = next(iter(seq_dl))
  optimizer.zero_grad()
  loss = 0
  for c in range(seq_length):
    pred, hidden, cell = model(input_batch[:, c], hidden, cell)
    loss += loss_fn(pred, target_batch[:, c])
  loss.backward()
  optimizer.step()
  loss = loss.item()/ seq_length
  if epoch % 500 == 0:
    print(f'Epoch {epoch} loss : {loss: .4f}') 

Epoch 0 loss :  4.4737
Epoch 500 loss :  1.5696
Epoch 1000 loss :  1.2565
Epoch 1500 loss :  1.1097
Epoch 2000 loss :  1.0154
Epoch 2500 loss :  0.8309
Epoch 3000 loss :  0.7262


# Evaluation phase - Gererating new text passages

In [87]:
from torch.distributions.categorical import Categorical
torch.manual_seed(1)
logits = torch.tensor([[1.0,1.0,1.0, 1.0, 1.0]])
print('Probabilities: ', nn.functional.softmax(logits, dim = 1).numpy()[0])

m = Categorical(logits = logits)
samples = m.sample((10, ))
print(samples.numpy())

Probabilities:  [0.2 0.2 0.2 0.2 0.2]
[[0]
 [1]
 [1]
 [0]
 [2]
 [0]
 [2]
 [3]
 [2]
 [2]]


In [94]:
starting_str = 'The Project Gutenberg'
encoded_input = torch.tensor([char_int[ch] for ch in starting_str])
encoded_input = torch.reshape(encoded_input, (1, -1))
encoded_input

tensor([[46, 63, 60,  1, 42, 73, 70, 65, 60, 58, 75,  1, 33, 76, 75, 60, 69, 57,
         60, 73, 62]])

In [88]:
def sample(model, starting_str, len_generated_text = 500, scale_factor = 1.0):
  encoded_input = torch.tensor([char_int[ch] for ch in starting_str])
  encoded_input = torch.reshape(encoded_input, (1, -1))
  generated_str = starting_str
  model.eval()
  hidden, cell = model.init_hidden(1)
  for c in range(len(starting_str)-1):
    _, hidden, cell = model(encoded_input[:, c].view(1), hidden, cell)
  last_char = encoded_input[:, -1]
  for i in range(len_generated_text):
    logits, hidden, cell = model(last_char, hidden, cell)
    generated_int = 
    last_char = generated_str[-1]

SyntaxError: invalid syntax (3607262839.py, line 12)