In [None]:
# %pip install torch pandas transformers scikit-learn matplotlib nltk pymorphy2 pymorphy2-dicts-uk sentencepiece

In [None]:
# import nltk
# nltk.download('punkt')

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from transformers import DistilBertTokenizer, DistilBertModel

from preprocessing import (get_x, 
                           get_y, 
                           balance_data, 
                           get_x1, 
                           bert_tokenize_without_masks, 
                           cat_titles_and_texts)

In [None]:
# Choose the compute device once; later cells move models and tensors onto it.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

if use_cuda:
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    print("CUDA is not available. Using CPU instead.")

In [None]:
# Translated corpus: drop rows without Ukrainian text, then keep a
# reproducible 30% sample (fixed random_state).
df = (
    pd.read_csv("src/data/translated.csv")
    .dropna(subset=['ukr_text'])
    .sample(frac=0.3, random_state=1)
)

# Texts, labels and titles extracted by the project's preprocessing helpers.
x, y, x_titles = get_x(df), get_y(df), get_x1(df)

In [None]:
# Second corpus: balance it with the project helper, drop rows with a missing
# 'Text' value, then keep a reproducible 30% sample.
df_zepopo = (
    balance_data(pd.read_csv("src/data/data_set_4.csv"))
    .dropna(subset=['Text'])
    .sample(frac=0.3, random_state=1)
)
print(len(df_zepopo))

# Texts, labels and titles for the second corpus.
x1, y1, x1_titles = get_x(df_zepopo), get_y(df_zepopo), get_x1(df_zepopo)

In [None]:
# Third corpus (real news): keep a reproducible 30% sample, then drop rows
# without Ukrainian text.
# NOTE(review): the other two corpora drop NaN rows *before* sampling; here the
# order is reversed, so the final row count can be below 30% — confirm intent.
realdata_raw = pd.read_csv("src/data/new_real_news.csv")
realdata_sample = realdata_raw.sample(frac=0.3, random_state=1)
df_realdata = realdata_sample.dropna(subset=['ukr_text'])

# Texts, labels and titles for the third corpus.
x2, y2, x2_titles = get_x(df_realdata), get_y(df_realdata), get_x1(df_realdata)

In [None]:
# Append the second and third corpora onto the first, in that order.
# NOTE: `+=` modifies x / y / x_titles in place, so re-running this cell
# without re-running the loading cells would append the data a second time.
for extra_texts, extra_labels, extra_titles in (
    (x1, y1, x1_titles),
    (x2, y2, x2_titles),
):
    x += extra_texts
    y += extra_labels
    x_titles += extra_titles

# Combine every title with its text via the project helper.
x = cat_titles_and_texts(texts=x, titles=x_titles)

In [None]:
# Labels as a float column tensor of shape (N, 1) on the selected device,
# matching the (N, 1) output of the classifier defined below.
y = torch.tensor(y, dtype=torch.float)
y = y.view(-1, 1)
y = y.to(device)

### BERT

In [None]:
# Tokenizer and encoder are loaded from the same pretrained checkpoint.
checkpoint = 'distilbert/distilbert-base-multilingual-cased'

tokenizer = DistilBertTokenizer.from_pretrained(checkpoint)
# The encoder is moved onto the selected device right after loading.
bert_model = DistilBertModel.from_pretrained(checkpoint).to(device)

In [None]:
# Two-stage split with a fixed seed: 70% train, then the remaining 30% is cut
# in half into validation and test (15% / 15% of the full data).
split_options = {"random_state": 42}

X_train, X_temp, y_train, y_temp = train_test_split(
    x, y, test_size=0.3, **split_options
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, **split_options
)

In [None]:
# Tokenize every split with the same tokenizer (order: train, test, val).
X_train_encoded, X_test_encoded, X_val_encoded = [
    bert_tokenize_without_masks(split_texts, tokenizer=tokenizer)
    for split_texts in (X_train, X_test, X_val)
]

In [None]:
# The encoder is used as a frozen feature extractor: no gradients are tracked.
# Each split goes through the encoder in a single forward pass, and only
# position 0 of the last hidden state is kept per sample
# (presumably the [CLS] token — depends on how the tokenizer helper encodes).
with torch.no_grad():
    X_train_embeddings, X_test_embeddings, X_val_embeddings = [
        bert_model(encoded_split).last_hidden_state[:, 0, :]
        for encoded_split in (X_train_encoded, X_test_encoded, X_val_encoded)
    ]

### model architecture

In [None]:
class SequentialModel(nn.Module):
    """One-hidden-layer feed-forward classifier with a sigmoid output.

    Args:
        input_dim: size of each input feature vector.
        hidden_dim: number of units in the hidden layer.
        output_dim: number of output units.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # hidden layer and its activation
        self.fc_x = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        # output layer
        self.out = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return sigmoid activations of the output layer for input batch ``x``."""
        hidden = self.relu(self.fc_x(x))
        logits = self.out(hidden)
        return torch.sigmoid(logits)

In [None]:
# Classifier hyper-parameters.
# The input width is read off the first training embedding vector.
embedding_dim = X_train_embeddings[0].shape[0]
hidden_dim, output_dim = 16, 1
learning_rate = 5e-4
num_epochs = 1000

In [None]:
# Build the classifier on the selected device.
model = SequentialModel(
    input_dim=embedding_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
).to(device)

# Binary cross-entropy on probabilities (the model's forward applies the
# sigmoid itself), optimised with Adam.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### training

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
import copy

import numpy as np

# ---- early-stopping bookkeeping ----
best_loss = float('inf')
best_epoch = 0  # 0-based index of the epoch with the lowest validation loss
# state_dict() hands back references to the live parameter tensors, so a
# snapshot must be deep-copied or it silently follows later optimizer steps.
# Seeding it here also guarantees the name exists if validation never improves.
best_weights = copy.deepcopy(model.state_dict())

# ---- per-epoch history (read by the plotting cell) ----
accuracies_train = []
accuracies_val = []
losses_train = []
losses_val = []

patience = 100  # epochs without validation improvement tolerated before stopping
patience_counter = 0

for epoch in range(num_epochs):
    # ---- training step on the full training set (single batch) ----
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_embeddings)

    # loss
    loss = criterion(outputs, y_train)
    losses_train.append(loss.item())
    loss.backward()
    optimizer.step()

    # accuracy (tensors are moved to CPU before converting to NumPy)
    with torch.no_grad():
        # reshape(-1) rather than squeeze(): stays 1-D even for a single sample
        predicted = (outputs >= 0.5).reshape(-1).long()
        accuracy = accuracy_score(y_train.cpu().numpy(), predicted.cpu().numpy())
        accuracies_train.append(accuracy)

    # ---- validation: no gradients needed, so no autograd graph is built ----
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_embeddings)

        # val loss
        val_loss = criterion(val_outputs, y_val)
        val_loss_value = val_loss.item()
        losses_val.append(val_loss_value)

        # val accuracy
        predicted_val = (val_outputs >= 0.5).reshape(-1).long()
        accuracy_val = accuracy_score(y_val.cpu().numpy(), predicted_val.cpu().numpy())
        accuracies_val.append(accuracy_val)

    if val_loss_value < best_loss:
        # Store a plain float and an independent copy of the weights.
        best_loss = val_loss_value
        best_epoch = epoch
        best_weights = copy.deepcopy(model.state_dict())
        patience_counter = 0
    else:
        patience_counter += 1

    print(f"Epoch: {epoch+1}/{num_epochs}, Training Loss: {loss.item()}, Validation Loss: {val_loss.item()}")

    if patience_counter >= patience:
        print("Early stopping triggered")
        break

# Restore the weights of the best validation epoch so that the evaluation
# cells score that model rather than the last (possibly over-fitted) one.
model.load_state_dict(best_weights)
model.eval()

print("Best epoch: ", best_epoch)


In [None]:
# lists to tensors
losses_train = torch.tensor(losses_train)
losses_val = torch.tensor(losses_val)
accuracies_train = torch.tensor(accuracies_train)
accuracies_val = torch.tensor(accuracies_val)

# training and validation losses
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(losses_train.detach().numpy(), label='Training Loss')
plt.plot(losses_val.detach().numpy(), label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()

# training and validation accuracies
plt.subplot(1, 2, 2)
plt.plot(accuracies_train.detach().numpy(), label='Training Accuracy')
plt.plot(accuracies_val.detach().numpy(), label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
from sklearn.metrics import (precision_score, recall_score, f1_score)

In [None]:
# Inference on the hold-out split: evaluation mode and no autograd graph.
model.eval()
with torch.no_grad():
    test_outputs = model(X_test_embeddings)

# Threshold the sigmoid outputs at 0.5 to get 0/1 predictions.
# reshape(-1) rather than squeeze(): stays 1-D even for a single sample.
test_outputs = (test_outputs >= 0.5).reshape(-1).long()

In [None]:
# Move predictions and labels to host memory as NumPy arrays for scikit-learn.
# NOTE: both names are rebound to arrays, so this cell cannot be re-run
# without re-running the cells that produce the tensors.
test_outputs, y_test = [
    tensor.cpu().numpy() for tensor in (test_outputs, y_test)
]

In [None]:
# Hold-out metrics, computed in the order accuracy, precision, recall, F1.
test_accuracy, test_precision, test_recall, test_f1 = [
    metric(y_test, test_outputs)
    for metric in (accuracy_score, precision_score, recall_score, f1_score)
]

In [None]:
# Report the hold-out metrics, one per line.
for metric_label, metric_value in (
    ("Accuracy:", test_accuracy),
    ("Precision:", test_precision),
    ("Recall:", test_recall),
    ("f1:", test_f1),
):
    print(metric_label, metric_value)

### Evaluation on the external test set (`src/data/test_set.json`)

In [None]:
import json

# Load the external test set: a JSON list of records with
# "text", "title" and "label" keys.
with open("src/data/test_set.json", "r", encoding="utf-8") as f:
    test_texts = json.load(f)

# Split the records by label once, then project out texts and titles.
fake_items = [t for t in test_texts if t["label"] == "Fake"]
real_items = [t for t in test_texts if t["label"] == "Real"]

fake_texts = [t["text"] for t in fake_items]
fake_titles = [t["title"] for t in fake_items]
real_texts = [t["text"] for t in real_items]
# Fixed: this previously read t["text"], so every "Real" sample got its own
# body text as its title.
real_titles = [t["title"] for t in real_items]

In [None]:
# Assemble the external test set into a frame with the same columns as the
# translated training data ("ukr_text", "label", "title_ukr").
# Rows are collected as records and the frame is built once. The previous
# version wrote the "Real" rows at a hard-coded index offset of 42, which
# overwrote "Fake" rows whenever there were more than 42 of them.
test_records = [
    {"ukr_text": text, "label": "Fake", "title_ukr": title}
    for text, title in zip(fake_texts, fake_titles)
]
test_records += [
    {"ukr_text": text, "label": "Real", "title_ukr": title}
    for text, title in zip(real_texts, real_titles)
]

# Explicit columns keep the schema stable even when there are no records.
df = pd.DataFrame(test_records, columns=["ukr_text", "label", "title_ukr"])

In [None]:
# Inputs: title and text are combined, then tokenized, using the same project
# helpers as the training data.
external_texts = get_x(df)
external_titles = get_x1(df)
external_inputs = cat_titles_and_texts(texts=external_texts, titles=external_titles)
test_data_x = bert_tokenize_without_masks(external_inputs, tokenizer=tokenizer)

# Labels as a float column tensor of shape (N, 1) on the selected device.
external_labels = get_y(df)
test_data_y = torch.tensor(external_labels, dtype=torch.float).view(-1, 1).to(device)

In [None]:
# Encode the external set without tracking gradients and keep only position 0
# of the last hidden state per sample.
with torch.no_grad():
    external_hidden = bert_model(test_data_x).last_hidden_state
    test_data_x = external_hidden[:, 0, :]

In [None]:
# Make sure the embeddings live on the selected device before they are fed to
# the classifier.
test_data_x = test_data_x.to(device)

In [None]:
# Inference on the external test set: evaluation mode and no autograd graph.
model.eval()
with torch.no_grad():
    test_outputs = model(test_data_x)

# Threshold the sigmoid outputs at 0.5 to get 0/1 predictions.
# reshape(-1) rather than squeeze(): stays 1-D even for a single sample.
test_outputs = (test_outputs >= 0.5).reshape(-1).long()

In [None]:
# Display the predictions for the external test set as the cell output.
test_outputs

In [None]:
# Move predictions and labels to host memory as NumPy arrays for scikit-learn.
# NOTE: both names are rebound to arrays, so this cell cannot be re-run
# without re-running the cells that produce the tensors.
test_outputs, test_data_y = [
    tensor.cpu().numpy() for tensor in (test_outputs, test_data_y)
]

In [None]:
# External-test-set metrics, in the order accuracy, precision, recall, F1.
# These rebind the names that held the hold-out metrics.
test_accuracy, test_precision, test_recall, test_f1 = [
    metric(test_data_y, test_outputs)
    for metric in (accuracy_score, precision_score, recall_score, f1_score)
]

In [None]:
# Print accuracy, precision, recall and F1 (in that order) on one line.
print(test_accuracy, test_precision, test_recall, test_f1)