<a href="https://colab.research.google.com/github/josepeon/python_dad_class/blob/main/text_intro_to_rnn_dele.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Text Classification with Neural Networks

In [145]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd

In [146]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

#### Classifying Spam

In [147]:
#read in data
spam = pd.read_csv('https://raw.githubusercontent.com/stedy/Machine-Learning-with-R-datasets/refs/heads/master/sms_spam.csv')

In [148]:
#take a peek
spam.head()

Unnamed: 0,type,text
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,Ok lar... Joking wif u oni...
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...
3,ham,U dun say so early hor... U c already then say...
4,ham,"Nah I don't think he goes to usf, he lives aro..."


In [149]:
#create a tokenizer
tokenizer = Tokenizer(num_words = 500)

In [150]:
#fit the tokenizer -- learns the vocabulary
tokenizer.fit_on_texts(spam['text'].values)

In [151]:
#look at tokenizer
tokenizer.num_words

500

In [152]:
#create document term matrix (binarized)
dtm = tokenizer.texts_to_matrix(spam['text'].values)

In [153]:
#take a peek
dtm

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 1., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 1., 1., ..., 0., 0., 0.],
       [0., 0., 1., ..., 0., 0., 0.]])

In [154]:
tokenizer.index_word[2]

'to'

In [155]:
spam['text'][2]

"Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's"

In [156]:
[tokenizer.index_word[i] for i in range(1, 500)]

['i',
 'to',
 'you',
 'a',
 'the',
 'u',
 'and',
 'in',
 'is',
 'me',
 'my',
 'for',
 'your',
 'it',
 'of',
 'call',
 'have',
 'on',
 '2',
 'that',
 'now',
 'are',
 'so',
 'but',
 'not',
 'or',
 'do',
 'can',
 'at',
 "i'm",
 'ur',
 'get',
 'will',
 'if',
 'be',
 'with',
 'just',
 'no',
 'we',
 'this',
 '4',
 'gt',
 'lt',
 'up',
 'when',
 'ok',
 'free',
 'from',
 'go',
 'how',
 'all',
 'out',
 'what',
 'know',
 'like',
 'good',
 'then',
 'got',
 'come',
 'was',
 'its',
 'am',
 'time',
 'only',
 'day',
 'love',
 'there',
 'send',
 'he',
 'want',
 'text',
 'as',
 'txt',
 'one',
 'going',
 'by',
 'ü',
 "i'll",
 'need',
 'home',
 'about',
 'r',
 'lor',
 'sorry',
 'stop',
 'still',
 'see',
 'n',
 'back',
 'today',
 'da',
 'our',
 'dont',
 'reply',
 'k',
 "don't",
 'she',
 'mobile',
 'take',
 'hi',
 'tell',
 'new',
 'please',
 'later',
 'her',
 'pls',
 'any',
 'think',
 'been',
 'they',
 'phone',
 'here',
 'week',
 'did',
 'dear',
 'some',
 'well',
 'has',
 '1',
 'night',
 'much',
 'd',
 'gre

In [157]:
class TextDataset(Dataset):
  def __init__(self, X, y):
    super().__init__()
    self.x = torch.tensor(X, dtype = torch.float)
    self.y = torch.tensor(y, dtype = torch.float)

  def __len__(self):
    return len(self.y)

  def __getitem__(self, idx):
    return self.x[idx], self.y[idx]

In [158]:
X = dtm
y = np.where(spam['type'] == 'ham', 0, 1)

In [159]:
Xt = torch.tensor(X, dtype = torch.float32)
yt = torch.tensor(y, dtype = torch.float32)

In [160]:
from torch.utils.data import TensorDataset, DataLoader

In [161]:
from sklearn.model_selection import train_test_split

In [162]:
X_train, X_test, y_train, y_test = train_test_split(Xt, yt, test_size=.2)

In [163]:
X_train

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.],
        ...,
        [0., 1., 1.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.]])

In [164]:
#create data class
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TextDataset(X_test, y_test)

  self.x = torch.tensor(X, dtype = torch.float)
  self.y = torch.tensor(y, dtype = torch.float)


In [165]:
#dataset and loader -- making batches of our bigger dataset
trainloader = DataLoader(train_dataset, batch_size = 32)
#dataset and loader
testloader = DataLoader(test_dataset, batch_size = 32)

In [166]:
model = nn.Sequential(nn.Linear(in_features=500, out_features=1000),
                      nn.ReLU(),
                      nn.Linear(1000, 100),
                      nn.ReLU(),
                      nn.Linear(100, 1),
                      nn.Sigmoid()
                      )

In [167]:
model(Xt)

tensor([[0.5250],
        [0.5198],
        [0.5215],
        ...,
        [0.5230],
        [0.5183],
        [0.5227]], grad_fn=<SigmoidBackward0>)

In [168]:
loss_fn = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr = 0.01)

In [169]:
from tqdm import tqdm

In [170]:
#keep track of the losses
losses = []
#train it for 20 epochs
for epoch in tqdm(range(20)):
  #iterate over the batches
  for x, y in trainloader:
    #feeds data into the model
    yhat = model(x)
    #evaluate the predictions
    loss =loss_fn(yhat, y.unsqueeze(1))
    #update the weights/params
    optimizer.zero_grad() #pytorch house cleaning
    loss.backward() #pass info backward
    optimizer.step() #step toward less loss
    losses.append(loss.item()) #tracking the loss

100%|██████████| 20/20 [00:38<00:00,  1.94s/it]


In [171]:
train_predictions = model(X_train)
ytrain_preds = torch.where(train_predictions> .5, 1, 0)

In [172]:
torch.sum(ytrain_preds.squeeze(1) == y_train)/len(y_train)

tensor(0.9998)

In [173]:
ytest_preds = torch.where(model(X_test) > .5, 1, 0)
torch.sum(ytest_preds.squeeze(1) == y_test)/len(y_test)

tensor(0.9874)

In [174]:
#loss and optimizer
class TextModel(nn.Module):
  def __init__(self):
    super().__init__()
    self.lin1 = nn.Linear(in_features = 500, out_features = 100)
    self.lin2 = nn.Linear(100, 100)
    self.lin3 = nn.Linear(100, 1)
    self.sigmoid = nn.Sigmoid()
    self.act = nn.ReLU()

  def forward(self, x):
    x = self.act(self.lin1(x))
    x = self.act(self.lin2(x))
    return self.sigmoid(self.lin3(x))




In [175]:
#training function
model = TextModel()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()

In [176]:
#torch.save(model, 'textmodel.pt')

In [177]:
from tqdm import tqdm

In [178]:
#evaluate
for epoch in tqdm(range(100)):
  losses = 0
  for x,y in trainloader:
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 10 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

  1%|          | 1/100 [00:00<01:26,  1.14it/s]

Epoch 0 Loss: 19.897117960848846


 11%|█         | 11/100 [00:08<01:08,  1.30it/s]

Epoch 10 Loss: 0.3340495549133209


 21%|██        | 21/100 [00:17<01:04,  1.23it/s]

Epoch 20 Loss: 0.20322521342030697


 31%|███       | 31/100 [00:27<01:05,  1.05it/s]

Epoch 30 Loss: 0.19342272566609675


 41%|████      | 41/100 [00:32<00:34,  1.70it/s]

Epoch 40 Loss: 0.20008202656111607


 51%|█████     | 51/100 [00:39<00:39,  1.24it/s]

Epoch 50 Loss: 4.507611352721057


 61%|██████    | 61/100 [00:43<00:17,  2.27it/s]

Epoch 60 Loss: 0.153403052560737


 71%|███████   | 71/100 [00:50<00:18,  1.58it/s]

Epoch 70 Loss: 0.1568060389889246


 81%|████████  | 81/100 [00:56<00:12,  1.58it/s]

Epoch 80 Loss: 0.16012698261154504


 91%|█████████ | 91/100 [01:01<00:05,  1.75it/s]

Epoch 90 Loss: 0.15900360136278344


100%|██████████| 100/100 [01:07<00:00,  1.48it/s]


In [179]:
Xt = torch.tensor(X_test, dtype = torch.float)

  Xt = torch.tensor(X_test, dtype = torch.float)


In [180]:

output = model(Xt) #model predictions

In [181]:
output

tensor([[7.9505e-20],
        [1.0000e+00],
        [6.8780e-36],
        ...,
        [0.0000e+00],
        [0.0000e+00],
        [1.3955e-08]], grad_fn=<SigmoidBackward0>)

In [182]:
#Converting probabilities to prediction
preds = np.where(np.array(output.detach()) >= .5, 1, 0)

In [183]:
preds.shape

(1115, 1)

In [184]:
y = np.where(spam['type'] == 'ham', 0, 1)

In [185]:
sum(preds[:, 0] == y_test)/len(y_test)

tensor(0.9848)

In [186]:
1 - sum(y_test)/len(y_test)

tensor(0.8538)

### Basic RNN

![](https://upload.wikimedia.org/wikipedia/commons/thumb/b/b5/Recurrent_neural_network_unfold.svg/440px-Recurrent_neural_network_unfold.svg.png)

In [187]:
#create sequences
sequences = tokenizer.texts_to_sequences(spam['text'].values)

In [188]:
#look at first sequence
sequences[0]

[49, 471, 64, 8, 88, 123, 351, 148, 67, 58, 145]

In [189]:
X_train[0]

tensor([0., 0., 0., 0., 1., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.,
        0., 1., 0., 0., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 

In [190]:
#compare to text
spam['text'].values[1]

'Ok lar... Joking wif u oni...'

In [191]:
#pad and make all same length
sequences = pad_sequences(sequences, maxlen=100)

In [192]:
#examine results
sequences[1].shape

(100,)

In [193]:
sequences[1]

array([  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,  46, 336, 472,   6], dtype=int32)

In [194]:
#example rnn
rnn = nn.RNN(input_size = 100,
             hidden_size = 30,
             num_layers = 1,
             batch_first = True)

In [195]:
#pass data through
sample_sequence = torch.tensor(sequences[1],
                               dtype = torch.float,
                               ).reshape(1, -1)
sample_sequence.shape

torch.Size([1, 100])

In [196]:
#output
output, hidden = rnn(sample_sequence)

In [197]:
#hidden
hidden.shape

torch.Size([1, 30])

In [198]:
#linear layer
output.shape

torch.Size([1, 30])

In [199]:
#pass through linear
lin1 = nn.Linear(in_features = 30, out_features = 1)

In [200]:
lin1(output)

tensor([[0.7749]], grad_fn=<AddmmBackward0>)

In [201]:
for x, y in trainloader:
  print(x.shape)
  break

torch.Size([32, 500])


In [203]:
X_train, X_test, y_train, y_test = train_test_split(sequences, yt)

In [205]:
train_dataset = TensorDataset(torch.tensor(X_train, dtype = torch.float32), y_train)
test_dataset = TensorDataset(torch.tensor(X_test, dtype = torch.float32), y_test)

In [206]:
trailoader = DataLoader(train_dataset, batch_size = 32)
testloader = DataLoader(test_dataset, batch_size = 32)

In [207]:
model = nn.Sequential(nn.RNN(input_size = 100, hidden_size  =50, num_layers=2),
                      nn.Linear(in_features = 50, out_features=1),
                      nn.Sigmoid())

In [211]:
ex_rnn = nn.RNN(input_size = 100, hidden_size  =50, num_layers=2)
ex_rnn(train_dataset[0][0].unsqueeze(0))

(tensor([[-0.4118, -0.0025, -0.4569,  0.5110,  0.0858,  0.2192, -0.2276, -0.0446,
          -0.8004, -0.0141,  0.1393, -0.1261, -0.4060, -0.0451, -0.6247,  0.2486,
           0.3606,  0.3140,  0.0907, -0.3226,  0.0997, -0.1068,  0.1623, -0.5177,
           0.4297,  0.3292,  0.0213, -0.1083, -0.3810,  0.2312, -0.4693,  0.3344,
           0.4966,  0.7659,  0.2372, -0.0045, -0.2174,  0.7303,  0.5353,  0.2707,
          -0.6379,  0.0521, -0.1645,  0.3039, -0.0313,  0.3005, -0.0924, -0.4060,
          -0.2611,  0.1055]], grad_fn=<SqueezeBackward1>),
 tensor([[-1.0000, -0.9699, -0.8960,  1.0000,  1.0000, -1.0000, -1.0000, -1.0000,
           1.0000, -1.0000, -1.0000,  1.0000,  1.0000, -1.0000, -0.9999,  1.0000,
           1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  1.0000, -1.0000,  0.9991,
           1.0000,  1.0000, -1.0000,  1.0000, -1.0000,  1.0000, -0.7305, -1.0000,
           1.0000, -0.5038, -0.9836,  1.0000, -1.0000, -1.0000, -1.0000, -0.8787,
           1.0000,  1.0000, -1.0000, -1

In [209]:
model(train_dataset[0][0].unsqueeze(0))

TypeError: linear(): argument 'input' (position 1) must be Tensor, not tuple

In [212]:
class TextDataset(Dataset):
  def __init__(self, X, y):
    super().__init__()
    self.x = torch.tensor(X, dtype = torch.float)
    self.y = torch.tensor(y, dtype = torch.float)

  def __len__(self):
    return len(self.y)

  def __getitem__(self, idx):
    return self.x[idx], self.y[idx]

In [214]:
#class
class BasicRNN(nn.Module):
  def __init__(self):
    super().__init__()
    self.rnn = nn.RNN(input_size = 100,
                    hidden_size = 50,
                    num_layers = 3,
                    batch_first = True)
    self.lin1 = nn.Linear(in_features = 50, out_features=1000)
    self.lin2 = nn.Linear(1000, 100)
    self.lin3 = nn.Linear(100, 1)
    self.act = nn.ReLU()
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x, _ = self.rnn(x) #extracting important information
    x = self.act(self.lin1(x)) #multilayer perceptron -- to predict
    x = self.act(self.lin2(x))
    x = self.sigmoid(self.lin3(x))
    return x


In [215]:
#data
X = sequences
y = np.where(spam['type'] == 'spam', 1, 0)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = .2)
traindata = TextDataset(X_train, y_train)
trainloader = DataLoader(traindata, batch_size = 32)

In [216]:
#optimizer and loss
model = BasicRNN()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()

In [217]:
#train
for epoch in tqdm(range(100)):
  losses = 0
  for x,y in trainloader:
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 10 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

  1%|          | 1/100 [00:02<03:18,  2.01s/it]

Epoch 0 Loss: 52.82915246486664


 11%|█         | 11/100 [00:24<03:31,  2.37s/it]

Epoch 10 Loss: 54.446724608540535


 21%|██        | 21/100 [00:49<03:16,  2.49s/it]

Epoch 20 Loss: 54.449391439557076


 31%|███       | 31/100 [01:13<02:52,  2.50s/it]

Epoch 30 Loss: 54.45001582801342


 41%|████      | 41/100 [01:38<02:27,  2.50s/it]

Epoch 40 Loss: 54.450166910886765


 51%|█████     | 51/100 [02:02<02:04,  2.54s/it]

Epoch 50 Loss: 54.45020292699337


 61%|██████    | 61/100 [02:27<01:37,  2.51s/it]

Epoch 60 Loss: 54.45021215081215


 71%|███████   | 71/100 [02:51<01:13,  2.54s/it]

Epoch 70 Loss: 54.450213357806206


 81%|████████  | 81/100 [03:16<00:48,  2.55s/it]

Epoch 80 Loss: 54.4502155482769


 91%|█████████ | 91/100 [03:41<00:22,  2.55s/it]

Epoch 90 Loss: 54.45021505653858


100%|██████████| 100/100 [04:03<00:00,  2.44s/it]


In [None]:
Xt = torch.tensor(X_test, dtype = torch.float)

In [None]:
output = model(Xt)

In [None]:
preds = np.where(np.array(output.detach()) >= .5, 1, 0)

In [None]:
#preds = output.argmax(axis = 1)

In [None]:
y_test

In [None]:
# y = np.where(spam['type'] == 'ham', 0, 1)

In [None]:
# y.shape

In [None]:
sum(preds.reshape(1115,) == y_test)/len(y_test)

#### LSTM

In [None]:
# nn.LSTM()
class BasicLSTM(nn.Module):
  def __init__(self):
    super().__init__()
    self.rnn = nn.LSTM(input_size = 100,
                    hidden_size = 100,
                    num_layers = 1,
                    batch_first = True)

    self.lin1 = nn.Linear(in_features = 100, out_features=100)
    self.lin2 = nn.Linear(in_features = 100, out_features = 1)
    self.act = nn.ReLU()
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x, _ = self.rnn(x)
    x = self.act(self.lin1(x))
    x = self.lin2(x)
    return self.sigmoid(x)

In [None]:
model = BasicLSTM()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()

In [None]:
#train
for epoch in range(10):
  losses = 0
  for x,y in trainloader:
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 10 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

In [None]:
Xt = torch.tensor(X_test, dtype = torch.float)
output = model(Xt)
preds = np.where(np.array(output.detach()) >= .5, 1, 0)
sum(preds[:, 0] == y_test)/len(y_test)

In [None]:
#pad and make all same length
sequences = pad_sequences(sequences, maxlen=30)

In [None]:
sequences[0]

In [None]:
X = sequences
y = np.where(spam['type'] == 'spam', 1, 0)
data = TextDataset(X, y)
loader = DataLoader(data, batch_size = 32)

In [None]:
class RNN2(nn.Module):
  def __init__(self):
    super().__init__()
    self.rnn = nn.GRU(input_size = 30,
                    hidden_size = 30,
                    num_layers = 2,
                    batch_first = True)

    self.lin1 = nn.Linear(in_features = 30, out_features=100)
    self.lin2 = nn.Linear(in_features = 100, out_features = 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x, _ = self.rnn(x)
    x = self.lin1(x)
    x = self.lin2(x)
    return self.sigmoid(x)

In [None]:
model = RNN2()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()

In [None]:
#train
for epoch in range(100):
  losses = 0
  for x,y in loader:
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 10 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

In [None]:
Xt = torch.tensor(sequences, dtype = torch.float)
output = model(Xt)
preds = np.where(np.array(output.detach()) >= .5, 1, 0)
y = np.where(spam['type'] == 'ham', 0, 1)
sum(preds[:, 0] == y)/len(y)