# Подготовка среды и данных

In [None]:
# Install torchmetrics, which the imports cell below pulls the metric functions from.
!pip install torchmetrics

In [None]:
# Download the two tweet dumps into the working directory as positive.csv and negative.csv.
!wget -O positive.csv https://www.dropbox.com/s/fnpq3z4bcnoktiv/positive.csv?dl=0
!wget -O negative.csv https://www.dropbox.com/s/r6u59ljhhjdg6j0/negative.csv?dl=0

In [None]:
import pandas as pd
import numpy as np
from string import punctuation
from collections import Counter

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim
from torchmetrics.functional import f1, precision, recall, accuracy

import matplotlib.pyplot as plt
import gensim

In [None]:
# Both dumps are header-less, ';'-separated files with the same 12 columns;
# only 'text' and 'tone' are given meaningful names.
tweet_columns = [0, 1, 2, 'text', 'tone', 5, 6, 7, 8, 9, 10, 11]
csv_options = dict(encoding='utf-8', sep=';', header=None, names=tweet_columns)

pos_tweets = pd.read_csv('positive.csv', **csv_options)
neg_tweets = pd.read_csv('negative.csv', **csv_options)
# Force the label of every negative tweet to 0.
neg_tweets['tone'] = 0

In [None]:
# Stack positive and negative tweets into one frame.
# pd.concat is used because DataFrame.append was deprecated in pandas 1.4 and
# removed in pandas 2.0; like append, it keeps the original row indices.
all_tweets_data = pd.concat([pos_tweets, neg_tweets])
# Shuffle the rows and keep the first 100000 (text, tone) pairs as the working sample.
tweets_data = shuffle(all_tweets_data[['text','tone']])[:100000]

In [None]:
# Hold out 10% of the sampled tweets for validation (no random_state is passed,
# so the split differs between runs).
train_sentences, val_sentences = train_test_split(tweets_data, test_size=0.1)

In [None]:
def preprocess(text):
    """Lowercase *text*, split it on whitespace and strip ASCII punctuation.

    Tokens that consist only of punctuation become empty after stripping and
    are dropped, which matches what ``TweetsDataset.preprocess`` does, so the
    vocabularies never contain an empty-string entry.

    Args:
        text: raw tweet text.

    Returns:
        List of non-empty, lowercased tokens.
    """
    stripped = (token.strip(punctuation) for token in text.lower().split())
    return [token for token in stripped if token]

In [None]:
# Token frequencies over every tweet of the working sample.
word_vocab = Counter(
    token
    for tweet in tweets_data['text']
    for token in preprocess(tweet)
)

In [None]:
# Keep only the tokens seen more than twice.
filtered_word_vocab = {
    word for word, frequency in word_vocab.items() if frequency > 2
}

In [None]:
# Index 0 is reserved for padding; real words are numbered from 1 in the
# iteration order of the filtered vocabulary (tokens are lowercased, so none
# of them can collide with the 'PAD' key).
word2id = {'PAD': 0}
word2id.update(
    (word, index) for index, word in enumerate(filtered_word_vocab, start=1)
)

# Reverse lookup: id -> word.
id2word = dict(zip(word2id.values(), word2id.keys()))

In [None]:
# Run on the GPU when one is available, otherwise on the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Character frequencies over every token of the working sample.
symbol_vocab = Counter(
    symbol
    for tweet in tweets_data['text']
    for token in preprocess(tweet)
    for symbol in token
)

In [None]:
# Keep only the characters seen more than five times.
filtered_symbol_vocab = {
    symbol for symbol, frequency in symbol_vocab.items() if frequency > 5
}

In [None]:
# Index 0 is reserved for padding; characters are numbered from 1 in the
# iteration order of the filtered set (single characters cannot equal 'PAD').
symbol2id = {'PAD': 0}
symbol2id.update(
    (symbol, index) for index, symbol in enumerate(filtered_symbol_vocab, start=1)
)

# Reverse lookup: id -> character.
id2symbol = dict(zip(symbol2id.values(), symbol2id.keys()))

In [None]:
# Train word2vec on every tweet (not just the 100000-row sample).
# NOTE(review): `size=` is the gensim 3.x keyword; gensim 4 renamed it to
# `vector_size` - confirm which gensim version this notebook targets.
texts = all_tweets_data.text.apply(preprocess).tolist()
w2v = gensim.models.Word2Vec(texts, size=100, window=5, min_count=1)

# Embedding matrix aligned with word2id; the PAD row (index 0) stays all zeros.
weights = np.zeros((len(word2id), 100))
count = 0  # vocabulary words that have no word2vec vector
for word, i in word2id.items():
    if word != 'PAD':
        try:
            weights[i] = w2v.wv[word]
        except KeyError:
            # Unknown to word2vec: count it and fall back to small random noise.
            count += 1
            weights[i] = np.random.normal(0, 0.1, 100)

# Определение датасета

In [None]:
# Dataset for the two-input architecture (word ids + character ids).
class TweetsDataset(Dataset):
    """Map-style dataset yielding ``(word_ids, symbol_ids, label)`` per tweet.

    Args:
        dataset: frame with a ``'text'`` column (raw tweets) and a ``'tone'``
            column (numeric labels).
        word2id: token -> index mapping; index 0 is used as padding.
        symbol2id: character -> index mapping; index 0 is used as padding.
        DEVICE: device the collated batches are moved to.
    """

    # Index used both by pad_sequence (its default padding value) and as the
    # placeholder for tweets without any known token/character.
    PAD_ID = 0

    def __init__(self, dataset, word2id, symbol2id, DEVICE):
        self.dataset = dataset['text'].values
        self.word2id = word2id
        self.symbol2id = symbol2id
        self.length = dataset.shape[0]
        self.target = torch.Tensor(dataset['tone'].values)
        self.device = DEVICE

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        """Return the encoded tweet at position *index*.

        Only in-vocabulary tokens are kept; their in-vocabulary characters are
        concatenated into one flat character sequence. A tweet with nothing
        left is encoded as a single PAD id so that a batch never collapses to
        zero length (which would make the mean over the word axis NaN).
        """
        known_tokens = [
            token for token in self.preprocess(self.dataset[index])
            if token in self.word2id
        ]
        word_ids = [self.word2id[token] for token in known_tokens]
        symbol_ids = [
            self.symbol2id[symbol]
            for token in known_tokens
            for symbol in token
            if symbol in self.symbol2id
        ]
        word_ids = torch.LongTensor(word_ids or [self.PAD_ID])
        symbol_ids = torch.LongTensor(symbol_ids or [self.PAD_ID])
        y = self.target[index]
        return word_ids, symbol_ids, y

    def preprocess(self, text):
        """Lowercase, split on whitespace, strip punctuation, drop empty tokens."""
        tokens = text.lower().split()
        tokens = [token.strip(punctuation) for token in tokens]
        tokens = [token for token in tokens if token]
        return tokens

    def collate_fn(self, batch):
        """Pad a list of samples to common lengths and move them to the device.

        Returns:
            ``(word_ids, symbol_ids, y)`` where the id tensors are padded with
            zeros along dim 1 (batch first) and ``y`` is a 1-D float tensor.
        """
        word_ids, symbol_ids, y = list(zip(*batch))
        word_ids = pad_sequence(word_ids, batch_first=True).to(self.device)
        symbol_ids = pad_sequence(symbol_ids, batch_first=True).to(self.device)
        # Labels are 0-dim tensors taken from self.target; stack them into one vector.
        y = torch.stack(y).to(self.device)
        return word_ids, symbol_ids, y

In [None]:
# Training data: batches of 4 tweets drawn in random order.
train_dataset = TweetsDataset(train_sentences, word2id, symbol2id, DEVICE)
train_sampler = RandomSampler(train_dataset)
train_iterator = DataLoader(
    train_dataset,
    batch_size=4,
    sampler=train_sampler,
    collate_fn=train_dataset.collate_fn,
)

In [None]:
# Validation data: batches of 4 tweets read in their stored order.
val_dataset = TweetsDataset(val_sentences, word2id, symbol2id, DEVICE)
val_sampler = SequentialSampler(val_dataset)
val_iterator = DataLoader(
    val_dataset,
    batch_size=4,
    sampler=val_sampler,
    collate_fn=val_dataset.collate_fn,
)

# Функции для тренировки и оценки нейронной сети

In [None]:
def train(model, iterator, optimizer, criterion, accuracy_f, precision_f, recall_f):
    """Run one training epoch and return the per-batch averages.

    Args:
        model: callable as ``model(texts, symbols)`` returning one probability
            per sample.
        iterator: sized iterable of ``(texts, symbols, ys)`` batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as ``criterion(probabilities, ys)``.
        accuracy_f, precision_f, recall_f: metric functions called as
            ``f(pred_labels, gold_labels, ignore_index=0)``.

    Returns:
        ``(loss, accuracy, precision, recall)`` as Python floats, each averaged
        over the number of batches.

    Raises:
        ZeroDivisionError: if *iterator* has no batches.
    """
    print('Training...')
    epoch_loss = 0.0
    epoch_accuracy = 0.0
    epoch_precision = 0.0
    epoch_recall = 0.0

    model.train()

    for i, (texts, symbols, ys) in enumerate(iterator):
        optimizer.zero_grad()
        # reshape(-1) rather than squeeze(): a batch of one sample must stay
        # 1-D so that it still matches the shape of ys.
        preds_proba = model(texts, symbols).reshape(-1)
        loss = criterion(preds_proba, ys)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

        pred_labels = preds_proba.round().long()
        gold_labels = ys.long()
        # NOTE(review): ignore_index=0 is forwarded to all three metrics,
        # accuracy included - confirm that excluding class 0 there is intended.
        # float() keeps plain numbers in the running sums instead of tensors
        # that may live on the GPU and cannot be plotted directly.
        epoch_accuracy += float(accuracy_f(pred_labels, gold_labels, ignore_index=0))
        epoch_precision += float(precision_f(pred_labels, gold_labels, ignore_index=0))
        epoch_recall += float(recall_f(pred_labels, gold_labels, ignore_index=0))

        batches_done = i + 1  # i is zero-based
        if not batches_done % 1000:
            print(f'Train loss: {epoch_loss/batches_done}, train accuracy: {epoch_accuracy/batches_done}, train precision: {epoch_precision/batches_done}, train recall: {epoch_recall/batches_done},')

    n_batches = len(iterator)
    return epoch_loss / n_batches, epoch_accuracy / n_batches, epoch_precision / n_batches, epoch_recall / n_batches

In [None]:
def evaluate(model, iterator, criterion, accuracy_f, precision_f, recall_f):
    """Evaluate *model* on *iterator* without updating it.

    Args:
        model: callable as ``model(texts, symbols)`` returning one probability
            per sample.
        iterator: sized iterable of ``(texts, symbols, ys)`` batches.
        criterion: loss called as ``criterion(probabilities, ys)``.
        accuracy_f, precision_f, recall_f: metric functions called as
            ``f(pred_labels, gold_labels, ignore_index=0)``.

    Returns:
        ``(loss, accuracy, precision, recall)`` as Python floats, each averaged
        over the number of batches.

    Raises:
        ZeroDivisionError: if *iterator* has no batches.
    """
    print("\nValidating...")
    epoch_loss = 0.0
    epoch_accuracy = 0.0
    epoch_precision = 0.0
    epoch_recall = 0.0

    model.eval()

    with torch.no_grad():
        for i, (texts, symbols, ys) in enumerate(iterator):
            # reshape(-1) rather than squeeze(): a batch of one sample must
            # stay 1-D so that it still matches the shape of ys.
            preds_proba = model(texts, symbols).reshape(-1)
            loss = criterion(preds_proba, ys)
            epoch_loss += loss.item()

            pred_labels = preds_proba.round().long()
            gold_labels = ys.long()
            # NOTE(review): ignore_index=0 is forwarded to all three metrics,
            # accuracy included - confirm that excluding class 0 there is intended.
            # float() keeps plain numbers in the running sums instead of
            # tensors that may live on the GPU and cannot be plotted directly.
            epoch_accuracy += float(accuracy_f(pred_labels, gold_labels, ignore_index=0))
            epoch_precision += float(precision_f(pred_labels, gold_labels, ignore_index=0))
            epoch_recall += float(recall_f(pred_labels, gold_labels, ignore_index=0))

            batches_done = i + 1  # i is zero-based
            if not batches_done % 1000:
                print(f'Val loss: {epoch_loss/batches_done}, val accuracy: {epoch_accuracy/batches_done}, val precision: {epoch_precision/batches_done}, val recall: {epoch_recall/batches_done},')

    n_batches = len(iterator)
    return epoch_loss / n_batches, epoch_accuracy / n_batches, epoch_precision / n_batches, epoch_recall / n_batches

# Определение архитектуры нейронной сети

In [None]:
# If embedding weights are supplied at construction time, the word-embedding
# layer is loaded from them and frozen; otherwise it is trained from scratch.
class Net2(nn.Module):
    """Two-branch binary classifier over word ids and character ids.

    Branch 1 averages word embeddings over the sequence and projects them with
    a linear layer. Branch 2 embeds characters, applies bigram and trigram
    1-D convolutions, and max-pools over time. The concatenated features go
    through a linear layer and a sigmoid.

    Args:
        word_vocab_size: number of rows of the word-embedding table (used only
            when ``embedding_weights`` is None).
        symbol_vocab_size: number of rows of the character-embedding table.
        word_embedding_dim: word-embedding width when no pretrained weights
            are given; with pretrained weights the width is taken from them.
        symbol_embedding_dim: character-embedding width.
        embedding_weights: optional 2-D array/tensor ``(vocab, dim)`` of
            pretrained word vectors; when given, the layer is frozen.
    """

    def __init__(self, word_vocab_size, symbol_vocab_size, word_embedding_dim=180, symbol_embedding_dim=12, embedding_weights=None):
        super().__init__()
        if embedding_weights is not None:
            # from_pretrained is a classmethod that RETURNS a new layer, so its
            # result must be assigned. Cast to float32 to match the other layers.
            pretrained = torch.as_tensor(embedding_weights, dtype=torch.float)
            word_embedding_dim = pretrained.shape[1]
            self.embedding1 = nn.Embedding.from_pretrained(pretrained, freeze=True)
        else:
            self.embedding1 = nn.Embedding(word_vocab_size, word_embedding_dim)

        bi_output = 64
        tri_output = 32
        x_len = 16

        self.linear1 = nn.Linear(in_features=word_embedding_dim, out_features=x_len)

        self.embedding2 = nn.Embedding(symbol_vocab_size, symbol_embedding_dim)
        self.bigrams2 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=bi_output, kernel_size=2, padding='same')
        self.trigrams2 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=tri_output, kernel_size=3, padding='same')

        self.pooling = nn.MaxPool1d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.hidden = nn.Linear(in_features=bi_output+tri_output+x_len, out_features=1)
        self.out = nn.Sigmoid()

    def forward(self, word, symbol):
        """Return one probability per sample, shape ``(batch, 1)``.

        Args:
            word: LongTensor ``(batch, n_words)`` of word ids.
            symbol: LongTensor ``(batch, n_symbols)`` of character ids; must be
                at least 2 long so the stride-2 pooling has something to pool.
        """
        input1 = self.embedding1(word)  # first input: word embeddings
        input1 = torch.mean(input1, dim=1)  # average over the word axis
        X1 = self.linear1(input1)  # linear projection

        input2 = self.embedding2(symbol)  # second input: character embeddings
        input2 = input2.transpose(1, 2)  # Conv1d expects (batch, channels, length)
        feature_map_bigrams = self.dropout(self.pooling(self.relu(self.bigrams2(input2))))  # conv with window 2
        feature_map_trigrams = self.dropout(self.pooling(self.relu(self.trigrams2(input2))))  # conv with window 3
        bi_pooling2 = feature_map_bigrams.max(2)[0]  # max pooling over time
        tri_pooling2 = feature_map_trigrams.max(2)[0]  # max pooling over time

        concat = torch.cat((X1, bi_pooling2, tri_pooling2), 1)  # join both branches
        logits = self.hidden(concat)  # linear layer
        logits = self.out(logits)  # sigmoid
        return logits

# Создание и тренировка нейронной сети

In [None]:
# Build Net2 with the word2vec matrix passed as embedding_weights, then move
# the model and the loss to the selected device.
# (Alternative without pretrained vectors: Net2(len(word2id), len(symbol2id)).)
model = Net2(len(word2id), len(symbol2id), embedding_weights=weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)
model = model.to(DEVICE)

criterion = nn.BCELoss()  # binary cross-entropy on probabilities
criterion = criterion.to(DEVICE)

In [None]:
# Train for 5 epochs, recording train and validation metrics after each one.
losses, losses_eval = [], []
accurs, accurs_eval = [], []
precs, precs_eval = [], []
recs, recs_eval = [], []

for i in range(5):
    print(f'\nstarting Epoch {i}')

    # One pass over the training data.
    epoch_loss, epoch_accur, epoch_prec, epoch_rec = train(
        model, train_iterator, optimizer, criterion, accuracy, precision, recall
    )
    losses.append(epoch_loss)
    accurs.append(epoch_accur)
    precs.append(epoch_prec)
    recs.append(epoch_rec)

    # One pass over the validation data.
    epoch_loss_on_test, epoch_accur_on_test, epoch_prec_on_test, epoch_rec_on_test = evaluate(
        model, val_iterator, criterion, accuracy, precision, recall
    )
    losses_eval.append(epoch_loss_on_test)
    accurs_eval.append(epoch_accur_on_test)
    precs_eval.append(epoch_prec_on_test)
    recs_eval.append(epoch_rec_on_test)

# Оценка качества обучения нейронной сети


In [None]:
# Train vs. validation BCE loss per epoch.
for history in (losses, losses_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('BCE loss')
plt.title('BCE loss value')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation accuracy per epoch.
for history in (accurs, accurs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation precision per epoch.
for history in (precs, precs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Precision')
plt.title('Precision')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation recall per epoch.
for history in (recs, recs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Recall')
plt.title('Recall')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

# Усложненная архитектура нейронной сети

In [None]:
# If embedding weights are supplied at construction time, the word-embedding
# layer is loaded from them and frozen; otherwise it is trained from scratch.
class Net2_1(nn.Module):
    """Wider variant of the two-branch classifier with an extra 4-gram convolution.

    Branch 1 averages word embeddings and projects them with a linear layer.
    Branch 2 embeds characters, applies bigram, trigram and tetragram 1-D
    convolutions, and max-pools each over time. The concatenated features go
    through a linear layer and a sigmoid.

    Args:
        word_vocab_size: number of rows of the word-embedding table (used only
            when ``embedding_weights`` is None).
        symbol_vocab_size: number of rows of the character-embedding table.
        word_embedding_dim: word-embedding width when no pretrained weights
            are given; with pretrained weights the width is taken from them.
        symbol_embedding_dim: character-embedding width.
        embedding_weights: optional 2-D array/tensor ``(vocab, dim)`` of
            pretrained word vectors; when given, the layer is frozen.
    """

    def __init__(self, word_vocab_size, symbol_vocab_size, word_embedding_dim=180, symbol_embedding_dim=12, embedding_weights=None):
        super().__init__()
        if embedding_weights is not None:
            # from_pretrained is a classmethod that RETURNS a new layer, so its
            # result must be assigned. Cast to float32 to match the other layers.
            pretrained = torch.as_tensor(embedding_weights, dtype=torch.float)
            word_embedding_dim = pretrained.shape[1]
            self.embedding1 = nn.Embedding.from_pretrained(pretrained, freeze=True)
        else:
            self.embedding1 = nn.Embedding(word_vocab_size, word_embedding_dim)

        bi_output = 64
        tri_output = 128
        tetra_output = 256
        x_len = 512

        self.linear1 = nn.Linear(in_features=word_embedding_dim, out_features=x_len)

        self.embedding2 = nn.Embedding(symbol_vocab_size, symbol_embedding_dim)
        self.bigrams2 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=bi_output, kernel_size=2, padding='same')
        self.trigrams2 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=tri_output, kernel_size=3, padding='same')
        self.tetragrams2 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=tetra_output, kernel_size=4, padding='same')

        self.pooling = nn.MaxPool1d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.hidden = nn.Linear(in_features=bi_output+tri_output+tetra_output+x_len, out_features=1)
        self.out = nn.Sigmoid()

    def forward(self, word, symbol):
        """Return one probability per sample, shape ``(batch, 1)``.

        Args:
            word: LongTensor ``(batch, n_words)`` of word ids.
            symbol: LongTensor ``(batch, n_symbols)`` of character ids; must be
                at least 2 long so the stride-2 pooling has something to pool.
        """
        # Word branch: embed, average over the word axis, project.
        input1 = self.embedding1(word)
        input1 = torch.mean(input1, dim=1)
        X1 = self.linear1(input1)

        # Character branch: Conv1d expects (batch, channels, length).
        input2 = self.embedding2(symbol)
        input2 = input2.transpose(1, 2)
        feature_map_bigrams = self.dropout(self.pooling(self.relu(self.bigrams2(input2))))
        feature_map_trigrams = self.dropout(self.pooling(self.relu(self.trigrams2(input2))))
        feature_map_tetragrams = self.dropout(self.pooling(self.relu(self.tetragrams2(input2))))
        # Max pooling over time for every n-gram width.
        bi_pooling2 = feature_map_bigrams.max(2)[0]
        tri_pooling2 = feature_map_trigrams.max(2)[0]
        tetra_pooling2 = feature_map_tetragrams.max(2)[0]

        concat = torch.cat((X1, bi_pooling2, tri_pooling2, tetra_pooling2), 1)
        logits = self.hidden(concat)
        logits = self.out(logits)
        return logits

In [None]:
# Build Net2_1 with the word2vec matrix passed as embedding_weights, then move
# the model and the loss to the selected device.
# (Alternative without pretrained vectors: Net2_1(len(word2id), len(symbol2id)).)
model = Net2_1(len(word2id), len(symbol2id), embedding_weights=weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)
model = model.to(DEVICE)

criterion = nn.BCELoss()  # binary cross-entropy on probabilities
criterion = criterion.to(DEVICE)

In [None]:
# Train for 5 epochs, recording train and validation metrics after each one.
losses, losses_eval = [], []
accurs, accurs_eval = [], []
precs, precs_eval = [], []
recs, recs_eval = [], []

for i in range(5):
    print(f'\nstarting Epoch {i}')

    # One pass over the training data.
    epoch_loss, epoch_accur, epoch_prec, epoch_rec = train(
        model, train_iterator, optimizer, criterion, accuracy, precision, recall
    )
    losses.append(epoch_loss)
    accurs.append(epoch_accur)
    precs.append(epoch_prec)
    recs.append(epoch_rec)

    # One pass over the validation data.
    epoch_loss_on_test, epoch_accur_on_test, epoch_prec_on_test, epoch_rec_on_test = evaluate(
        model, val_iterator, criterion, accuracy, precision, recall
    )
    losses_eval.append(epoch_loss_on_test)
    accurs_eval.append(epoch_accur_on_test)
    precs_eval.append(epoch_prec_on_test)
    recs_eval.append(epoch_rec_on_test)

In [None]:
# Train vs. validation BCE loss per epoch.
for history in (losses, losses_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('BCE loss')
plt.title('BCE loss value')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation accuracy per epoch.
for history in (accurs, accurs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation precision per epoch.
for history in (precs, precs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Precision')
plt.title('Precision')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation recall per epoch.
for history in (recs, recs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Recall')
plt.title('Recall')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

# Ещё одна усложненная архитектура нейронной сети

In [None]:
# If embedding weights are supplied at construction time, the word-embedding
# layer is loaded from them and frozen; otherwise it is trained from scratch.
class Net2_2(nn.Module):
    """Deeper variant of the two-branch classifier with stacked character convolutions.

    Branch 1 averages word embeddings and projects them with a linear layer.
    Branch 2 embeds characters, runs a first bank of 2/3/4-gram convolutions,
    merges them with a width-5 convolution (followed by pooling), runs a
    second 2/3/4-gram bank (each followed by pooling) and max-pools over time.
    The concatenated features go through a linear layer and a sigmoid.

    Args:
        word_vocab_size: number of rows of the word-embedding table (used only
            when ``embedding_weights`` is None).
        symbol_vocab_size: number of rows of the character-embedding table.
        word_embedding_dim: word-embedding width when no pretrained weights
            are given; with pretrained weights the width is taken from them.
        symbol_embedding_dim: character-embedding width.
        embedding_weights: optional 2-D array/tensor ``(vocab, dim)`` of
            pretrained word vectors; when given, the layer is frozen.
    """

    def __init__(self, word_vocab_size, symbol_vocab_size, word_embedding_dim=180, symbol_embedding_dim=12, embedding_weights=None):
        super().__init__()
        if embedding_weights is not None:
            # from_pretrained is a classmethod that RETURNS a new layer, so its
            # result must be assigned. Cast to float32 to match the other layers.
            pretrained = torch.as_tensor(embedding_weights, dtype=torch.float)
            word_embedding_dim = pretrained.shape[1]
            self.embedding1 = nn.Embedding.from_pretrained(pretrained, freeze=True)
        else:
            self.embedding1 = nn.Embedding(word_vocab_size, word_embedding_dim)

        bi_output = 64
        tri_output = 128
        tetra_output = 256
        x_len = 512
        conv_output = 512

        self.linear1 = nn.Linear(in_features=word_embedding_dim, out_features=x_len)

        self.embedding2 = nn.Embedding(symbol_vocab_size, symbol_embedding_dim)
        # First n-gram bank: reads the character embeddings.
        self.bigrams1 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=bi_output, kernel_size=2, padding='same')
        self.trigrams1 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=tri_output, kernel_size=3, padding='same')
        self.tetragrams1 = nn.Conv1d(in_channels=symbol_embedding_dim, out_channels=tetra_output, kernel_size=4, padding='same')
        # Second n-gram bank: reads the output of self.conv.
        self.bigrams2 = nn.Conv1d(in_channels=conv_output, out_channels=bi_output, kernel_size=2, padding='same')
        self.trigrams2 = nn.Conv1d(in_channels=conv_output, out_channels=tri_output, kernel_size=3, padding='same')
        self.tetragrams2 = nn.Conv1d(in_channels=conv_output, out_channels=tetra_output, kernel_size=4, padding='same')

        # Merges the concatenated first-bank feature maps.
        self.conv = nn.Conv1d(in_channels=bi_output+tri_output+tetra_output, out_channels=conv_output, kernel_size=5, padding='same')
        self.pooling = nn.MaxPool1d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.hidden = nn.Linear(in_features=bi_output+tri_output+tetra_output+x_len, out_features=1)
        self.out = nn.Sigmoid()

    def forward(self, word, symbol):
        """Return one probability per sample, shape ``(batch, 1)``.

        Args:
            word: LongTensor ``(batch, n_words)`` of word ids.
            symbol: LongTensor ``(batch, n_symbols)`` of character ids; must be
                at least 4 long because the sequence is halved by two
                successive stride-2 poolings.
        """
        # Word branch: embed, average over the word axis, project.
        input1 = self.embedding1(word)
        input1 = torch.mean(input1, dim=1)
        X1 = self.linear1(input1)

        # Character branch: Conv1d expects (batch, channels, length).
        input2 = self.embedding2(symbol)
        input2 = input2.transpose(1, 2)
        feature_map_bigrams = self.relu(self.bigrams1(input2))
        feature_map_trigrams = self.relu(self.trigrams1(input2))
        feature_map_tetragrams = self.relu(self.tetragrams1(input2))
        # 'same' padding keeps the lengths equal, so the maps stack along channels.
        concat = torch.cat((feature_map_bigrams, feature_map_trigrams, feature_map_tetragrams), 1)

        feature_map = self.dropout(self.pooling(self.relu(self.conv(concat))))

        feature_map_bigrams = self.dropout(self.pooling(self.relu(self.bigrams2(feature_map))))
        feature_map_trigrams = self.dropout(self.pooling(self.relu(self.trigrams2(feature_map))))
        feature_map_tetragrams = self.dropout(self.pooling(self.relu(self.tetragrams2(feature_map))))
        # Max pooling over time for every n-gram width.
        bi_pooling2 = feature_map_bigrams.max(2)[0]
        tri_pooling2 = feature_map_trigrams.max(2)[0]
        tetra_pooling2 = feature_map_tetragrams.max(2)[0]

        concat = torch.cat((X1, bi_pooling2, tri_pooling2, tetra_pooling2), 1)
        logits = self.hidden(concat)
        logits = self.out(logits)
        return logits

In [None]:
# Build Net2_2 with the word2vec matrix passed as embedding_weights, then move
# the model and the loss to the selected device.
# (Alternative without pretrained vectors: Net2_2(len(word2id), len(symbol2id)).)
model = Net2_2(len(word2id), len(symbol2id), embedding_weights=weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)
model = model.to(DEVICE)

criterion = nn.BCELoss()  # binary cross-entropy on probabilities
criterion = criterion.to(DEVICE)

In [None]:
# Train for 5 epochs, recording train and validation metrics after each one.
losses, losses_eval = [], []
accurs, accurs_eval = [], []
precs, precs_eval = [], []
recs, recs_eval = [], []

for i in range(5):
    print(f'\nstarting Epoch {i}')

    # One pass over the training data.
    epoch_loss, epoch_accur, epoch_prec, epoch_rec = train(
        model, train_iterator, optimizer, criterion, accuracy, precision, recall
    )
    losses.append(epoch_loss)
    accurs.append(epoch_accur)
    precs.append(epoch_prec)
    recs.append(epoch_rec)

    # One pass over the validation data.
    epoch_loss_on_test, epoch_accur_on_test, epoch_prec_on_test, epoch_rec_on_test = evaluate(
        model, val_iterator, criterion, accuracy, precision, recall
    )
    losses_eval.append(epoch_loss_on_test)
    accurs_eval.append(epoch_accur_on_test)
    precs_eval.append(epoch_prec_on_test)
    recs_eval.append(epoch_rec_on_test)

In [None]:
# Train vs. validation BCE loss per epoch.
for history in (losses, losses_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('BCE loss')
plt.title('BCE loss value')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation accuracy per epoch.
for history in (accurs, accurs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation precision per epoch.
for history in (precs, precs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Precision')
plt.title('Precision')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

In [None]:
# Train vs. validation recall per epoch.
for history in (recs, recs_eval):
    plt.plot(history)
plt.xlabel('epoch')
plt.ylabel('Recall')
plt.title('Recall')
plt.legend(['train', 'val'], loc='upper right')
plt.show()

# Анализ предсказаний

In [None]:
def predict(model, iterator, id_to_word=None):
    """Sort the texts of *iterator* into confusion-matrix buckets.

    Each text is rebuilt from its word ids (padding id 0 is skipped) with the
    words separated by single spaces.

    Args:
        model: callable as ``model(texts, symbols)`` returning one probability
            per sample; it is switched to eval mode.
        iterator: iterable of ``(texts, symbols, ys)`` batches.
        id_to_word: optional id -> word mapping; defaults to the module-level
            ``id2word``.

    Returns:
        ``(fp, fn, tp, tn)`` - four lists of reconstructed texts: false
        positives, false negatives, true positives and true negatives.
    """
    if id_to_word is None:
        id_to_word = id2word

    model.eval()
    fp = []
    fn = []
    tp = []
    tn = []
    with torch.no_grad():
        for texts, symbols, ys in iterator:
            preds = model(texts, symbols)

            for pred, gold, text in zip(preds, ys, texts):
                # Join with spaces so the words do not run together.
                text = ' '.join(id_to_word[word_id] for word_id in text.tolist() if word_id != 0)
                pred_label = round(pred.item())
                gold_label = gold.item()
                if pred_label > gold_label:
                    fp.append(text)
                elif pred_label < gold_label:
                    fn.append(text)
                elif gold_label == 1:
                    tp.append(text)
                elif gold_label == 0:
                    tn.append(text)

    return fp, fn, tp, tn

In [None]:
# Bucket the training-set tweets into (fp, fn, tp, tn) lists using the current `model`;
# as the cell's last expression, the resulting tuple is displayed by the notebook.
predict(model, train_iterator)