In [None]:
# Mount Google Drive (Colab only) so the files referenced under /content/gdrive are reachable.
from google.colab import drive
drive.mount('/content/gdrive')

# Text classification
## Flow
- text -> tokenize -> padding -> model

## Model
-   `nn.Embedding`
-   `x = get_embedding()`
-   `x.shape = (len(seq), embedding_size)`
-   `z = nn.RNN(x)`
-   `nn.Linear(hidden_size, num_class)(z)`
-   `optimizer = Adam`
-   softmax + cross-entropy loss -> `loss.backward()` -> `optimizer.step()` -> `optimizer.zero_grad()`

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
import pickle
import os
from tqdm import tqdm
from sklearn.metrics import classification_report
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
import numpy as np
import collections
from gensim.models import KeyedVectors
import torch.nn.functional as F
import torch.optim as optim
# Seed PyTorch's random number generator so runs are repeatable.
torch.manual_seed(1)

In [None]:
# Load the question-classification dataset from the mounted Drive folder.
data_df = pd.read_csv('/content/gdrive/MyDrive/Machine Learning/NLP/Classification/Question_Classification_Dataset.csv')

In [None]:
# Preview the first rows of the loaded frame.
data_df.head()

In [None]:
# Extract the question strings and integer-encode the 'Category0' column.
data_texts = data_df['Questions'].to_list()

le = preprocessing.LabelEncoder()
labels = le.fit_transform(data_df['Category0'])

print(data_texts[0])

In [None]:
# Count the distinct values of 'Category0'; this is the number of target classes.
distinct_categories = data_df['Category0'].unique()
class_num = len(distinct_categories)
print(class_num)
print(len(labels))  # rows =5452

In [None]:
# def encodingLabels(labels,class_num):
#   onehotLabels=[]
#   for label in labels:
#     l=[0]*class_num
#     l[label]=1
#     onehotLabels.append(l)
#   return onehotLabels

# labels=encodingLabels(labels,class_num)

In [None]:
#!gzip -d '/content/gdrive/MyDrive/Machine Learning/NLP/Classification/GoogleNews-vectors-negative300.bin.gz'

In [None]:
# Load the pretrained word2vec vectors (binary format) from Drive.
# NOTE(review): the name `model` is rebound to the RNN classifier in a later cell,
# so the word vectors are no longer reachable under this name after that point.
model = KeyedVectors.load_word2vec_format('/content/gdrive/MyDrive/Machine Learning/NLP/Classification/GoogleNews-vectors-negative300.bin', binary=True)

In [None]:
import re

def getRidOfXXX(data_texts,model):
    """Strip punctuation and drop out-of-vocabulary words from every text.

    Parameters
    ----------
    data_texts : iterable of str
        Raw input texts.
    model : container
        Anything supporting ``word in model``; a word is kept only when
        this membership test succeeds.

    Returns
    -------
    list of str
        One cleaned string per input text, with the surviving words joined
        by single spaces. A text with no surviving words becomes ``''``.
    """
    # Raw string: the characters ' : ! ` ? , . ) ( are removed before tokenising.
    punctuation = re.compile(r"[':!`?,.)(]")

    cleaned_texts = []
    for text in data_texts:
        words = punctuation.sub('', text).split()
        # Build a new list instead of removing from `words` while iterating over it;
        # removing in place makes the iterator skip the word that follows each
        # out-of-vocabulary word, silently dropping valid tokens.
        kept = [word for word in words if word in model]
        cleaned_texts.append(' '.join(kept))

    return cleaned_texts

# Clean every question against the loaded word-vector vocabulary and show the first result.
data_texts_1=getRidOfXXX(data_texts,model)
print(data_texts_1[0])

In [None]:
def getMaxLen(data_texts):
    """Return the largest whitespace-separated token count among ``data_texts``.

    Returns 0 when ``data_texts`` is empty or contains only blank strings.
    """
    token_counts = (len(sentence.split()) for sentence in data_texts)
    return max(token_counts, default=0)

# Longest cleaned question, in tokens; used as the padded sequence length.
max_len=getMaxLen(data_texts_1)
print(max_len)

In [None]:
def getDic(data_texts_1,model):
  """Build a word -> vector lookup for every word that occurs in the texts.

  Each whitespace-separated word found in ``model`` is stored once with the
  value ``model[word]``. Words missing from ``model`` are printed (on every
  occurrence) and left out of the result.
  """
  embeddings = {}
  for sentence in data_texts_1:
    for token in sentence.split():
      if token not in model:
        # Report unknown words instead of failing.
        print(token)
        continue
      if token not in embeddings:
        embeddings[token] = model[token]
  return embeddings

vocab=getDic(data_texts_1,model)
# Read the embedding width from whichever vector is stored first instead of
# relying on the specific word 'what' being present in the corpus.
sample_vector = next(iter(vocab.values()))
EMBEDDING_SIZE = sample_vector.shape[0]
print(EMBEDDING_SIZE)
# All-zero padding vector with the same dtype as the word vectors; an integer
# array here would force NumPy to upcast the whole encoded dataset when the
# vectors are stacked together.
vocab['<PAD>']=np.zeros(EMBEDDING_SIZE, dtype=sample_vector.dtype)



In [None]:
import json
# with open("vocab.json",'w') as f:
#   new_vocab={}
#   for word in list(vocab.keys()):
#     new_vocab[word]=(vocab[word]).tolist()
#   json.dump(vocab,f,indent=4)

In [None]:
def padding(data_texts_1,max_len):
    """Left-pad each text with ``'<PAD> '`` tokens up to ``max_len`` words.

    Texts that already have ``max_len`` words or more are returned unchanged.
    """
    pad_token = '<PAD> '
    return [
        pad_token * (max_len - len(sentence.split())) + sentence
        for sentence in data_texts_1
    ]

# Left-pad every cleaned question to the common length max_len.
data_texts_2=padding(data_texts_1,max_len)

In [None]:
#print(data_texts_2[0])

def encodingData(data_texts_2,vocab):
  """Replace every word of every text by ``vocab[word]`` and stack into one array.

  Raises ``KeyError`` if a word is missing from ``vocab``.
  """
  encoded = [
    [vocab[token] for token in sentence.split()]
    for sentence in data_texts_2
  ]
  return np.array(encoded)




In [None]:
# Convert the encoded texts and the integer labels into tensors.
encoded_texts = encodingData(data_texts_2, vocab)
data = torch.from_numpy(encoded_texts)

label_array = np.array(labels)
targets = torch.from_numpy(label_array)
print(targets.shape)

In [None]:
#X_train, X_test, y_train, y_test = train_test_split(data, targets, test_size=0.2, random_state=0)
# Hold out 20% of the samples for testing, then split the remainder 60/40
# into training and validation sets. Both splits share the same seed.
SPLIT_SEED = 2000
X_train_val, X_test, y_train_val, y_test = train_test_split(
    data, targets, test_size=0.2, random_state=SPLIT_SEED
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.4, random_state=SPLIT_SEED
)


In [None]:
class makeDataset(Dataset):
    """Map-style dataset that pairs ``data[i]`` with ``labels[i]``."""

    def __init__(self,data,labels):
        # The dataset length is the size of the first dimension of ``data``.
        self.data, self.labels = data, labels
        self.n_samples = data.shape[0]

    def __len__(self):
        return self.n_samples

    def __getitem__(self,idx):
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target
    


In [None]:
class RNN(nn.Module):
    """Recurrent sequence classifier: ``nn.RNN`` followed by a linear layer.

    Parameters
    ----------
    input_size : int
        Size of each input vector (the embedding dimension).
    output_size : int
        Number of classes.
    hidden_dim : int
        Size of the recurrent hidden state.
    n_layers : int
        Number of stacked recurrent layers.
    """

    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(RNN, self).__init__()
        # batch_first=True: inputs arrive as (batch, seq_len, input_size). With the
        # default layout the batch axis would be treated as time, so samples of the
        # same batch would leak into each other.
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_dim, num_layers=n_layers,
                          nonlinearity='tanh', batch_first=True)
        self.linear1 = nn.Linear(in_features=hidden_dim, out_features=output_size)
        # Only used by predict_proba(); dim=1 normalises over the class axis.
        self.classifier = nn.Softmax(dim=1)

    def forward(self, X):
        """Return raw class scores (logits) of shape (batch, output_size).

        ``X`` must have shape (batch, seq_len, input_size). No softmax is applied
        here because ``nn.CrossEntropyLoss`` applies log-softmax itself; applying
        softmax twice flattens the gradients. ``argmax`` over the logits gives
        the same predicted class as over the probabilities.
        """
        out, _ = self.rnn(X)
        # Keep the output of the last time step of every sequence.
        last_step = out[:, -1, :]
        return self.linear1(last_step)

    def predict_proba(self, X):
        """Return class probabilities of shape (batch, output_size)."""
        return self.classifier(self.forward(X))



class QuestionClassifier(nn.Module):
    """Classifier built on a pretrained encoder loaded with ``BertModel.from_pretrained``.

    NOTE(review): this class is not instantiated anywhere in the visible cells and
    cannot run as written:
      * ``BertModel`` and ``get_activation`` are not imported or defined in the
        visible part of the notebook;
      * ``forward`` uses ``self.dropout`` and ``self.out_proj``, which ``__init__``
        never creates.
    Either complete it or remove it.
    """
    def __init__(self,n_classes,pretrained_model_name='bert-base-uncased'):
        super(QuestionClassifier,self).__init__()
        # Pretrained encoder; its config supplies the hidden size used below.
        self.bert=BertModel.from_pretrained(pretrained_model_name)
        # Projects encoder features of size hidden_size to n_classes.
        self.dense=nn.Linear(self.bert.config.hidden_size,n_classes)
        
    def forward(self,input_ids):
        # NOTE(review): assumes the encoder call returns a 2-tuple — confirm against
        # the installed transformers version.
        hidden_states,pooled_output=self.bert(input_ids=input_ids)
        # NOTE(review): if hidden_states is (batch, seq, hidden), this selects batch
        # element 0 and feature 0 at every position; the first-token vector of each
        # sample would be hidden_states[:, 0, :] — confirm the intent.
        sequence_output_cls=hidden_states[0,:,0]
        x=self.dropout(sequence_output_cls)
        x=self.dense(x)
        x=get_activation('tanh')(x)
        x=self.dropout(x)
        x=self.out_proj(x)
        return x



In [None]:
from torch.optim import Adam
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, f1_score, recall_score


In [None]:
BATCH_SIZE=20
HIDDEN_DIM = 64
N_LAYERS = 1

train_dataset = makeDataset(X_train, y_train)
# Reshuffle the training samples every epoch so the batches are not always
# presented in the same order.
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataset = makeDataset(X_val, y_val)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_dataset = makeDataset(X_test, y_test)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Size the network from the data instead of hard-coding 300 inputs / 6 classes.
# NOTE: this rebinds `model`, which previously referred to the word2vec vectors.
model = RNN(input_size=EMBEDDING_SIZE, output_size=class_num, hidden_dim=HIDDEN_DIM, n_layers=N_LAYERS)


In [None]:

lr = 0.0001
N_EPOCHS = 2000
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=lr)
MODEL_SAVE_PATH = './rnn_model.pt'

# Per-epoch mean losses, plotted after training.
train_losses = []
val_losses = []
# Start from +inf so the first epoch always produces a checkpoint.
best_val_loss = float('inf')

for epoch in tqdm(range(N_EPOCHS)):
    print('\nEpoch {}: '.format(epoch + 1))

    # ---- training pass ----
    model.train()
    train_loss = []
    for X_train_batch, y_train_batch in train_dataloader:
        optimizer.zero_grad()
        out = model(X_train_batch.float())
        # CrossEntropyLoss requires int64 class indices.
        loss = loss_fn(out, y_train_batch.long())
        loss.backward()
        optimizer.step()
        train_loss.append(loss.item())
    train_l = sum(train_loss) / len(train_loss)
    train_losses.append(train_l)
    print(train_l)

    # ---- validation pass ----
    # Validation losses go into their own list; mixing them into `train_loss`
    # would make the reported validation loss an average over both splits.
    model.eval()
    val_loss = []
    with torch.no_grad():  # no gradients are needed for evaluation
        for X_val_batch, y_val_batch in val_dataloader:
            out = model(X_val_batch.float())
            loss = loss_fn(out, y_val_batch.long())
            val_loss.append(loss.item())
    val_l = sum(val_loss) / len(val_loss)
    val_losses.append(val_l)

    # Keep the weights of the epoch with the lowest validation loss.
    if val_l < best_val_loss:
        best_val_loss = val_l
        torch.save(model.state_dict(), MODEL_SAVE_PATH)

print("Train loss: ", train_losses)
print("Validation loss: ", val_losses)

x = np.arange(len(train_losses))
fig, ax = plt.subplots()
ax.plot(x, train_losses, label='Train loss')
ax.plot(x, val_losses, label='Validation loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Mean cross-entropy loss')
ax.set_title('Training vs. validation loss')
ax.legend()
plt.show()



In [None]:

# Restore the best checkpoint. `state_dict()` only *reads* the current weights;
# `load_state_dict()` is the call that copies saved weights into the model.
model.load_state_dict(torch.load(MODEL_SAVE_PATH))
model.eval()

In [None]:
print("Test results: ")
# Collect predictions for the whole test set first; the weighted metrics are
# only meaningful over all samples, not per mini-batch. Separate loop variable
# names keep the X_test / y_test tensors intact.
model.eval()
test_targets = []
test_preds = []
with torch.no_grad():
    for X_test_batch, y_test_batch in test_dataloader:
        batch_pred = torch.argmax(model(X_test_batch.float()), dim=1)
        test_targets.extend(y_test_batch.tolist())
        test_preds.extend(batch_pred.tolist())

print("Test precision: {}".format(precision_score(test_targets, test_preds, average='weighted')))
print("Test recall: {}".format(recall_score(test_targets, test_preds, average='weighted')))
print("Test F1-score: {}".format(f1_score(test_targets, test_preds, average='weighted')))