In [None]:
import functools
import sys

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext
from sklearn.metrics import f1_score
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

!pip install hazm
import hazm

In [None]:
# Fix the random seeds once, up front, so weight initialisation and batch
# shuffling are repeatable after a kernel restart.
seed = 0

np.random.seed(seed)      # NumPy's global generator
torch.manual_seed(seed);  # PyTorch generators; the trailing ';' hides the Generator repr in the cell output

In [None]:
# Mount Google Drive at /content/drive (Colab-only); the dataset CSV is read from there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the comments CSV from the mounted Drive folder into a DataFrame.
data = pd.read_csv(
    '/content/drive/MyDrive/sentipers/final_sentipers_binary.csv',
    encoding='utf-8',
)

# tokenize data
def tokenize(comment):
    """Split one raw comment string into word tokens with hazm's word tokenizer."""
    tokens = hazm.word_tokenize(comment)
    return tokens


# creating vocab
# Tokens seen fewer than `min_freq` times are left out of the vocabulary and
# resolve to the default (<unk>) index set at the end of this block.
min_freq = 5
special_tokens = ['<unk>', '<pad>']

data['tokens'] = data['comment'].apply(tokenize)

vocab = torchtext.vocab.build_vocab_from_iterator(
    data['tokens'],
    specials=special_tokens,
    min_freq=min_freq,
)

unk_index = vocab['<unk>']
pad_index = vocab['<pad>']
vocab.set_default_index(unk_index)

# making input ids
# comments are between 3 and 256 tokens long, so shorter ones are right-padded up to 256
def numeralize(tokens, max_len=256):
    """Map tokens to vocabulary ids and right-pad the result to a fixed length.

    Args:
        tokens: sequence of token strings for one comment.
        max_len: length of the returned array (defaults to 256, the
            longest comment length stated above).

    Returns:
        A 1-D int64 NumPy array of exactly ``max_len`` ids.

    Padding uses ``pad_index`` (the '<pad>' id) rather than 0: index 0 is
    '<unk>' because it is the first special token, whereas the embedding
    layer's ``padding_idx`` and ``collate`` both use ``pad_index``.
    Inputs longer than ``max_len`` are truncated instead of raising.
    """
    ids = [vocab[token] for token in tokens][:max_len]
    padded = np.full(max_len, pad_index, dtype=np.int64)
    padded[:len(ids)] = ids
    return padded

# making label ids
def toId(label):
    """Return 1 when `label` equals 'positive', and 0 for any other value."""
    if label == 'positive':
        return 1
    return 0


# Turn tokens and raw labels into model-ready columns.
data['ids'] = data['tokens'].apply(numeralize)
data['label'] = data['label_id'].apply(toId)
# Real (unpadded) token count, capped at the 256-token window. Measuring the
# padded `ids` instead would give 256 for every row.
data['length'] = data['tokens'].apply(lambda t: min(len(t), 256))

data = data[['ids', 'label', 'length']]

# One dict of tensors per example; this list is what the DataLoaders index into.
new_data = [
    {'ids': torch.tensor(ids), 'label': torch.tensor(label), 'length': torch.tensor(length)}
    for ids, label, length in zip(data['ids'], data['label'], data['length'])
]

In [None]:
class CNN(nn.Module):
    """Text classifier built from parallel 1-D convolutions with max-over-time pooling.

    Token ids are embedded, passed through one Conv1d per entry of
    `filter_sizes`, ReLU-activated, reduced with a max over the sequence
    axis, concatenated, and projected to `output_dim` scores. Dropout is
    applied to the embeddings and to the concatenated features.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout_rate,
                 pad_index):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size,
                                      embedding_dim=embedding_dim,
                                      padding_idx=pad_index)
        # One convolution per kernel width; each reads the embedding channels.
        self.convs = nn.ModuleList()
        for kernel_size in filter_sizes:
            self.convs.append(nn.Conv1d(in_channels=embedding_dim,
                                        out_channels=n_filters,
                                        kernel_size=kernel_size))
        self.fc = nn.Linear(in_features=len(filter_sizes) * n_filters,
                            out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, ids):
        """Return class scores of shape [batch size, output dim] for ids of shape [batch size, seq len]."""
        token_vectors = self.dropout(self.embedding(ids))
        # [batch size, seq len, embedding dim] -> [batch size, embedding dim, seq len]
        channels_first = token_vectors.transpose(1, 2)

        features = []
        for conv in self.convs:
            # [batch size, n filters, seq len - kernel size + 1]
            activated = torch.relu(conv(channels_first))
            # max over the sequence axis -> [batch size, n filters]
            strongest, _ = activated.max(dim=-1)
            features.append(strongest)

        # [batch size, n filters * len(filter_sizes)]
        combined = self.dropout(torch.cat(features, dim=-1))
        return self.fc(combined)

In [None]:
# --- size taken from the data ---
vocab_size = len(vocab)

# --- CNN architecture ---
embedding_dim, n_filters = 300, 100
filter_sizes = [3, 5, 7]   # one convolution per kernel width
output_dim = 2             # two classes (labels are mapped to 0/1)
dropout_rate = 0.25

# --- batching ---
batch_size = 16

In [None]:
# counting params
# Build one CNN from the hyper-parameters above so its size can be inspected.
model = CNN(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    n_filters=n_filters,
    filter_sizes=filter_sizes,
    output_dim=output_dim,
    dropout_rate=dropout_rate,
    pad_index=pad_index,
)
def count_parameters(model):
    """Return the total number of elements in `model`'s parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable-parameter count, formatted with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
def initialize_weights(m):
    """Initialise one sub-module in place; meant to be passed to `model.apply`.

    Conv1d weights get Kaiming-normal (ReLU) initialisation, Linear weights
    get Xavier-normal initialisation, and in both cases the bias is zeroed.
    Any other module type is left untouched.
    """
    if isinstance(m, nn.Conv1d):
        nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
    elif isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
    else:
        return
    nn.init.zeros_(m.bias)

In [None]:
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Cross-entropy over the raw class scores, placed on the selected device.
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
def collate(batch, pad_index):
    """Merge a list of example dicts into one batch dict.

    Args:
        batch: list of dicts, each with an 'ids' tensor and a 'label' tensor.
        pad_index: value used to right-pad shorter 'ids' tensors.

    Returns:
        dict with 'ids' (padded, batch-first) and 'label' (stacked) tensors.
    """
    id_tensors, label_tensors = [], []
    for example in batch:
        id_tensors.append(example['ids'])
        label_tensors.append(example['label'])

    padded_ids = nn.utils.rnn.pad_sequence(id_tensors,
                                           batch_first=True,
                                           padding_value=pad_index)
    return {'ids': padded_ids, 'label': torch.stack(label_tensors)}

In [None]:
# Bind pad_index so the DataLoader can call collate(batch) with a single argument.
# Note: this rebinds the name `collate` to the resulting partial object.
collate = functools.partial(collate, pad_index=pad_index)

In [None]:
def train_op(dataloader, model, criterion, optimizer, device):
    """Train `model` for one pass over `dataloader`.

    Args:
        dataloader: iterable of batch dicts with 'ids' and 'label' tensors.
        model: module called as `model(ids)` that returns class scores.
        criterion: loss called as `criterion(scores, label)`.
        optimizer: optimizer stepping `model`'s parameters.
        device: device the batch tensors are moved to.

    Returns:
        (mean loss, mean accuracy), each averaged over the batches.
    """
    model.train()
    epoch_losses = 0
    epoch_accs = 0

    for batch in tqdm(dataloader, desc='training...', file=sys.stdout):
        ids = batch['ids'].to(device)
        label = batch['label'].to(device)
        # The collated batch holds only 'ids' and 'label', and CNN.forward
        # takes the ids alone, so no sequence length is read or passed here.
        preds = model(ids)
        loss = criterion(preds, label)
        accuracy = get_accuracy(preds, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_losses += loss.item()
        epoch_accs += accuracy.item()

    return epoch_losses/len(dataloader), epoch_accs/len(dataloader)

In [None]:
def evaluate(dataloader, model, criterion, device):
    """Evaluate `model` on `dataloader` without updating its weights.

    Args:
        dataloader: iterable of batch dicts with 'ids' and 'label' tensors.
        model: module called as `model(ids)` that returns class scores.
        criterion: loss called as `criterion(scores, label)`.
        device: device the batch tensors are moved to.

    Returns:
        (mean loss, mean accuracy, weighted F1). Loss and accuracy are
        averaged over the batches; F1 is computed once over all examples.
    """
    model.eval()
    epoch_losses = 0
    epoch_accs = 0
    predictions = []
    labels = []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc='evaluating...', file=sys.stdout):
            ids = batch['ids'].to(device)
            label = batch['label'].to(device)
            # The collated batch holds only 'ids' and 'label', and CNN.forward
            # takes the ids alone, so no sequence length is read or passed here.
            preds = model(ids)
            loss = criterion(preds, label)
            accuracy = get_accuracy(preds, label)

            # Keep one tensor per batch; they are concatenated once after the loop.
            predictions.append(preds.argmax(dim=-1))
            labels.append(label)
            epoch_losses += loss.item()
            epoch_accs += accuracy.item()

    predictions = torch.cat(predictions).cpu().numpy()
    labels = torch.cat(labels).cpu().numpy()
    f_score = f1_score(labels, predictions, average="weighted")
    return epoch_losses/len(dataloader), epoch_accs/len(dataloader) , f_score

In [None]:
def get_accuracy(prediction, label):
    """Return the fraction of rows whose highest-scoring class equals `label`.

    `prediction` must be 2-D (rows x class scores); the result is a 0-d tensor.
    """
    n_examples, _n_classes = prediction.shape
    hits = (prediction.argmax(dim=-1) == label).sum()
    return hits / n_examples

In [None]:
# initializing fastText
# Load the fastText vectors for language 'fa' and look up one vector per
# vocabulary token (tokens taken from vocab.get_itos()); the result is later
# used as the initial weights of the model's embedding layer.
vectors = torchtext.vocab.FastText(language='fa')
pretrained_embedding = vectors.get_vecs_by_tokens(vocab.get_itos())

In [None]:
# k-fold cross-validation: train a fresh CNN on every fold and record the
# per-epoch metrics.
k=5
splits=KFold(n_splits=k,shuffle=True,random_state=42)
foldperf={}

n_epochs = 10
best_test_loss = float('inf')

# Metrics of every epoch of every fold, in run order; the summary cell at the
# end of the notebook averages over these lists.
history = {'train_losses': [], 'test_losses': [],'train_accs':[],'test_accs':[],'test_f1s':[]}

for fold, (train_idx,val_idx) in enumerate(splits.split(np.arange(len(data)))):
    train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
    test_sampler = torch.utils.data.SubsetRandomSampler(val_idx)
    train_dataloader = torch.utils.data.DataLoader(new_data, batch_size=batch_size,
                                                   collate_fn=collate, sampler=train_sampler)
    test_dataloader = torch.utils.data.DataLoader(new_data, batch_size=batch_size,
                                                  collate_fn=collate, sampler=test_sampler)

    # Build the model that this notebook defines (CNN) with its own
    # hyper-parameters, starting from scratch on every fold.
    model = CNN(vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout_rate,
                pad_index)
    model.apply(initialize_weights)

    # Copy the pretrained vectors into the embedding instead of aliasing them.
    # Assigning the tensor itself would share storage with the model, so on CPU
    # one fold's training would modify the vectors the next fold starts from.
    model.embedding.weight.data.copy_(pretrained_embedding)

    lr = 5e-4

    optimizer = optim.Adam(model.parameters(), lr=lr)
    model = model.to(device)

    # A separate record per fold, so foldperf entries do not all point at one shared dict.
    fold_history = {key: [] for key in history}

    for epoch in range(n_epochs):
        train_loss, train_acc = train_op(train_dataloader, model, criterion, optimizer, device)
        test_loss, test_acc , test_f1 = evaluate(test_dataloader, model, criterion, device)

        epoch_metrics = {'train_losses': train_loss, 'test_losses': test_loss,
                         'train_accs': train_acc, 'test_accs': test_acc,
                         'test_f1s': test_f1}
        for key, value in epoch_metrics.items():
            history[key].append(value)
            fold_history[key].append(value)

        print(f'epoch: {epoch+1}')
        print(f'train_loss: {train_loss:.3f}, train_acc: {train_acc:.3f}')
        print(f'valid_loss: {test_loss:.3f}, valid_acc: {test_acc:.3f}')

    foldperf['fold{}'.format(fold+1)] = fold_history


# Only the model of the last fold is saved.
# NOTE(review): the file name says 'bilstm' although the saved weights belong to
# the CNN; the path is left unchanged in case other code loads it - confirm and rename.
torch.save(model.state_dict(), 'bilstm.pt')

In [None]:
def average(numList):
    """Return the arithmetic mean of `numList` (sum divided by length)."""
    total = sum(numList)
    count = len(numList)
    return total / count

# Print the mean of each validation metric over everything recorded in `history`.
for caption, key in (('accuracy average: ', 'test_accs'),
                     ('loss average: ', 'test_losses'),
                     ('f1 average: ', 'test_f1s')):
    print(caption, average(history[key]))