# Priprava okolja

In [None]:
# Install the third-party packages this notebook imports below.
# NOTE(review): sentencepiece is presumably needed by the SloBERTa tokenizer
# loaded later in the notebook — confirm.
!pip install transformers
!pip install sentencepiece

In [None]:
import csv
import torch
from torch.utils.data import WeightedRandomSampler
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel, get_linear_schedule_with_warmup, AdamW
import pandas as pd
from google.colab import drive
from sklearn.model_selection import train_test_split

import transformers
import json
import numpy as np
import seaborn as sns
from tqdm import tqdm
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.metrics import confusion_matrix, classification_report, plot_confusion_matrix
from collections import defaultdict
from textwrap import wrap
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
import logging
# Configure the root logger to report only ERROR-level records and above.
logging.basicConfig(level=logging.ERROR)

# Seed handed to train_test_split (as random_state) in split_to_train_test.
# NOTE(review): no torch / numpy seeding is visible in this file, so DataLoader
# shuffling and dropout are presumably not reproducible across runs — confirm.
RANDOM_SEED = 42

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Mount Google Drive under /content/drive; the dataset path and the model
# checkpoint path used further down both live below this mount point.
from google.colab import drive
drive.mount('/content/drive')

# Pomožne funkcije in razredi

Branje podatkov

In [None]:
def read_data(file):
    """Read ``(text, sentiment)`` pairs from a tab-separated file.

    The first row is treated as a header and skipped. For every other
    non-empty row the text is taken from column index 5 and the sentiment
    label from the last column.

    Args:
        file: Path of the UTF-8 encoded, tab-delimited file.

    Returns:
        A list of ``(text, sentiment)`` tuples in file order. An empty or
        header-only file yields an empty list.

    Raises:
        IndexError: If a non-empty data row has fewer than six columns.
    """
    text_column = 5
    senti_text = []
    # newline="" is what the csv module asks for, so that line breaks inside
    # quoted fields reach the reader unmodified.
    with open(file, "r", encoding="utf8", newline="") as f:
        reader = csv.reader(f, delimiter="\t")
        next(reader, None)  # skip the header row; no-op on an empty file
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing
                # newline); indexing such a row would raise IndexError.
                continue
            senti_text.append((row[text_column], row[-1]))

    return senti_text


def sentiment_to_int(label):
    """Map a sentiment name to its class index.

    ``"negative"`` -> 0, ``"neutral"`` -> 1, ``"positive"`` -> 2. Any other
    value is returned unchanged.
    """
    # The position in this tuple is the class index.
    for class_index, class_name in enumerate(("negative", "neutral", "positive")):
        if label == class_name:
            return class_index
    return label

def int_to_sentiment(label):
    """Map a class index back to its sentiment name.

    0 -> ``"negative"``, 1 -> ``"neutral"``, 2 -> ``"positive"``. Any other
    value is returned unchanged.
    """
    # The position in this tuple is the class index being compared against.
    for class_index, class_name in enumerate(("negative", "neutral", "positive")):
        if label == class_index:
            return class_name
    return label


def prepare_data(filepath):
    """Load the annotated articles into a DataFrame with encoded labels.

    Args:
        filepath: Path handed to ``read_data``.

    Returns:
        A DataFrame with the columns ``text`` and ``sentiment``; every
        sentiment value has been passed through ``sentiment_to_int``.
    """
    records = read_data(filepath)

    frame = pd.DataFrame.from_records(records)
    frame.columns = ['text', 'sentiment']

    # Replace the textual labels with their class indices.
    encoded_labels = frame['sentiment'].apply(sentiment_to_int)
    frame['sentiment'] = encoded_labels

    return frame


def word_count_info(df):
    """Print summary statistics of ``df`` and plot its word-count density.

    Args:
        df: DataFrame that contains a ``'Word count'`` column.
    """
    print(df.describe())
    plt.figure(figsize=(12, 6))
    # `fill` is the current name of the deprecated `shade` argument of
    # seaborn.kdeplot; it draws the filled area under the density curve.
    sns.kdeplot(df['Word count'], fill=True, color='r')

Razdelitev podatkov v učno, validacijsko in testno množico

In [None]:
def split_to_train_test(df, max_len=512, train_size=0.8):
    """Split ``df`` into train / validation / test ``ArticleDataset`` objects.

    ``train_size`` of the rows go to the training split; the remaining rows
    are divided in half into validation and test. Both splits use
    ``RANDOM_SEED`` as ``random_state``. The per-class counts and the shape
    of every split are printed.

    The datasets are built with the module-level ``tokenizer``, which must
    be defined before this function is called.

    Args:
        df: DataFrame with ``text`` and ``sentiment`` columns.
        max_len: Maximum token length passed on to ``ArticleDataset``.
        train_size: Fraction of ``df`` used for training.

    Returns:
        A ``(training_set, validation_set, testing_set)`` tuple.
    """
    df_train, df_holdout = train_test_split(
        df,
        test_size=1 - train_size,
        random_state=RANDOM_SEED
    )
    df_val, df_test = train_test_split(
        df_holdout,
        test_size=0.5,
        random_state=RANDOM_SEED
    )

    # ArticleDataset is indexed with positions 0..n-1, so drop the shuffled
    # original index of every split.
    df_train = df_train.reset_index(drop=True)
    df_test = df_test.reset_index(drop=True)
    df_val = df_val.reset_index(drop=True)

    print(df_train.sentiment.value_counts())
    print(df_val.sentiment.value_counts())
    print(df_test.sentiment.value_counts())

    print("FULL Dataset: {}".format(df.shape))
    print("TRAIN Dataset: {}".format(df_train.shape))
    print("VALIDATION Dataset: {}".format(df_val.shape))
    print("TEST Dataset: {}".format(df_test.shape))

    training_set = ArticleDataset(df_train, tokenizer, max_len)
    # Fix: the validation set used to be built from df_train, so validation
    # metrics were measured on the training data.
    validation_set = ArticleDataset(df_val, tokenizer, max_len)
    testing_set = ArticleDataset(df_test, tokenizer, max_len)

    return training_set, validation_set, testing_set

Razred ArticleDataset

In [None]:
class ArticleDataset(torch.utils.data.Dataset):
    """Dataset that tokenizes article text and pairs it with its label.

    The ``text`` and ``sentiment`` attributes hold the corresponding
    DataFrame columns and are public; the evaluation cells of this notebook
    read ``sentiment`` directly.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        """Store the data and tokenization settings.

        Args:
            dataframe: DataFrame with ``text`` and ``sentiment`` columns.
            tokenizer: Object exposing ``encode_plus`` (a Hugging Face
                tokenizer in this notebook).
            max_len: Length every encoded sequence is padded/truncated to.
        """
        self.tokenizer = tokenizer
        self.df = dataframe
        self.text = dataframe.text
        self.sentiment = dataframe.sentiment
        self.max_len = max_len

    def __len__(self):
        """Return the number of articles."""
        return len(self.text)

    def __getitem__(self, idx):
        """Return the encoded article at position ``idx``.

        Returns:
            A dict with ``input_ids`` and ``attention_mask`` (long tensors)
            and ``targets`` (float tensor holding the sentiment label).
        """
        # Positional access, so the dataset also works when the DataFrame
        # index is not 0..n-1.
        text = str(self.text.iloc[idx])

        # Use the tokenizer given to the constructor rather than a global.
        # Token type ids are not requested because they are never used.
        inputs = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_attention_mask=True,
        )

        return {
            'input_ids': torch.tensor(inputs['input_ids'], dtype=torch.long),
            'attention_mask': torch.tensor(inputs['attention_mask'], dtype=torch.long),
            # Kept as float for backward compatibility; the training and
            # evaluation loops cast the targets to long themselves.
            'targets': torch.tensor(self.sentiment.iloc[idx], dtype=torch.float),
        }

Razred SentimentClassifier

In [None]:
class SentimentClassifier(nn.Module):
    """Pretrained ``EMBEDDIA/sloberta`` encoder with a classification head.

    The head is: dropout -> linear (hidden -> hidden) -> ReLU -> dropout ->
    linear (hidden -> ``n_classes``), applied to the encoder output at
    sequence position 0.
    """

    def __init__(self, n_classes):
        """Load the encoder and create the head.

        Args:
            n_classes: Number of output logits.
        """
        super().__init__()
        self.model = AutoModel.from_pretrained('EMBEDDIA/sloberta')
        # Size both head layers from the encoder config instead of
        # hard-coding 768 for one of them.
        hidden_size = self.model.config.hidden_size
        self.pre_classifier = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.2)
        self.classifier = nn.Linear(hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the class logits for a batch.

        Args:
            input_ids: Token id tensor passed to the encoder.
            attention_mask: Attention mask tensor passed to the encoder.

        Returns:
            The output of the final linear layer (one row per example).
        """
        encoder_output = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # First element of the encoder output, taken at sequence position 0.
        pooled = encoder_output[0][:, 0, :]
        pooled = self.dropout(pooled)
        pooled = self.pre_classifier(pooled)
        # Functional ReLU: no new module object is created on every call.
        pooled = torch.relu(pooled)
        pooled = self.dropout(pooled)
        return self.classifier(pooled)

# Main

Branje in predstavitev podatkov

In [None]:
# Change this path so that it points to the file with the annotated articles.
filepath = '/content/drive/MyDrive/Colab Notebooks/SentiNews_document-level.txt'
df = prepare_data(filepath)
# Number of whitespace-separated tokens in every article.
df['Word count'] = df.text.apply(lambda x: len(str(x).split()))

# Bar chart of the number of articles per sentiment class, ordered by class index.
# NOTE(review): the tick labels assume all three classes occur in the data — confirm.
sentiment = ['Negative', 'Neutral', 'Positive']
counts = df['sentiment'].value_counts().sort_index()
color = ['tomato', 'lightgrey', 'limegreen']

fig, ax = plt.subplots()
ax.bar(np.arange(len(sentiment)), counts, color=color)
ax.set_xticks(np.arange(len(sentiment)))
ax.set_xticklabels(sentiment)

# Write every count just above its bar.
for i, v in enumerate(counts):
  ax.text(i - .1, v + 5, str(v))

# Print summary statistics and plot the word-count density.
word_count_info(df)

Nalaganje tokenizerja

In [None]:
# Load the tokenizer of the EMBEDDIA/sloberta checkpoint; use_fast=False is passed
# to request the non-fast implementation.
# split_to_train_test reads this module-level name, so run this cell first.
tokenizer = AutoTokenizer.from_pretrained('EMBEDDIA/sloberta', use_fast=False)

Priprava učnih, validacijskih in testnih podatkov

In [None]:
MAX_LEN = 512
TRAIN_BATCH_SIZE = 8
VALID_BATCH_SIZE = 8

# Training batches are reshuffled on every pass over the data.
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

# Validation batches keep the dataset order. val_params used to request
# shuffling but was never used (the validation loader was built from
# test_params); it is now unshuffled and actually used, so the effective
# behavior is unchanged.
val_params = {'batch_size': VALID_BATCH_SIZE,
              'shuffle': False,
              'num_workers': 0
              }

# Test batches keep the dataset order so that predictions line up with
# testing_set.sentiment in the evaluation cells.
test_params = {'batch_size': VALID_BATCH_SIZE,
               'shuffle': False,
               'num_workers': 0
               }

training_set, validation_set, testing_set = split_to_train_test(df, max_len=MAX_LEN, train_size=.8)

train_dataloader = DataLoader(training_set, **train_params)
val_dataloader = DataLoader(validation_set, **val_params)
test_dataloader = DataLoader(testing_set, **test_params)

Učenje modela

In [None]:
def train_epoch(model, train_dataloader, optimizer, scheduler, loss_fn_, n_examples, device=None):
    """Train ``model`` for one pass over ``train_dataloader``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns one row of class logits per example.
        train_dataloader: Iterable of batches; each batch is a mapping with
            ``"input_ids"``, ``"attention_mask"`` and ``"targets"`` tensors.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        loss_fn_: Callable ``(logits, targets) -> loss``.
        n_examples: Denominator used for the returned accuracy.
        device: Device the batches are moved to. Defaults to the device of
            the model's first parameter.

    Returns:
        ``(accuracy, mean_loss)`` where ``accuracy`` is a double tensor
        (correct predictions / ``n_examples``) and ``mean_loss`` is the mean
        of the per-batch losses (NaN when there were no batches).
    """
    log_every = 100

    if device is None:
        device = next(model.parameters()).device

    model = model.train()

    losses = []
    # Tensor accumulator so that .double() also works for an empty loader.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)
    seen_examples = 0

    for step, d in enumerate(train_dataloader):
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device, dtype=torch.long)

        # Clear gradients first so stale ones never leak into this step.
        optimizer.zero_grad()

        outputs = model(input_ids, attention_mask)

        _, preds = torch.max(outputs, dim=1)
        correct_predictions += torch.sum(preds == targets)
        seen_examples += targets.size(0)

        # Use the loss function that was passed in, not a global.
        loss = loss_fn_(outputs, targets)
        losses.append(loss.item())

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        if step % log_every == 0 and step != 0:
            loss_step = np.mean(losses)
            # Divide by the number of examples actually processed so far.
            accu_step = (correct_predictions * 100) / seen_examples
            print(f"Training Loss per {log_every} steps: {loss_step}")
            print(f"Training Accuracy per {log_every} steps: {accu_step}")

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss





def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` on ``data_loader`` without computing gradients.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns one row of class logits per example.
        data_loader: Iterable of batches; each batch is a mapping with
            ``"input_ids"``, ``"attention_mask"`` and ``"targets"`` tensors.
        loss_fn: Callable ``(logits, targets) -> loss``.
        device: Device the batches are moved to.
        n_examples: Denominator used for the returned accuracy.

    Returns:
        ``(accuracy, mean_loss, predictions)``: ``accuracy`` is a double
        tensor (correct predictions / ``n_examples``), ``mean_loss`` is the
        mean of the per-batch losses (NaN when there were no batches) and
        ``predictions`` is a 1-D CPU tensor of predicted class indices in
        iteration order (empty when there were no batches).
    """
    model = model.eval()

    losses = []
    batch_predictions = []
    # Tensor accumulator so that .double() also works for an empty loader.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device, dtype=torch.long)

            outputs = model(input_ids, attention_mask)

            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets)
            # Keep one tensor per batch; they are joined once after the loop.
            batch_predictions.append(preds)
            losses.append(loss.item())

    if batch_predictions:
        predictions = torch.cat(batch_predictions).cpu()
    else:
        predictions = torch.empty(0, dtype=torch.long)

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss, predictions

Inicializacija modela

In [None]:
# Three output classes: negative (0), neutral (1) and positive (2).
num_of_labels = 3
model = SentimentClassifier(num_of_labels)
# Move the model's parameters to the selected device (GPU when available).
model.to(device)

Učenje

In [None]:
EPOCHS = 4

# Optionally change where the model is saved.
# NOTE: despite its name, save_dir is the path of the checkpoint file, not a directory.
save_dir = '/content/drive/MyDrive/Diploma/best_model_state_latest.bin'

optimizer = AdamW(model.parameters(), lr=1e-5, correct_bias=True)

# train_epoch steps the scheduler once per batch, so the schedule spans
# (batches per epoch) * EPOCHS steps; no warm-up steps are used.
total_steps = len(train_dataloader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
  optimizer,
  num_warmup_steps=0,
  num_training_steps=total_steps
)

loss_fn = nn.CrossEntropyLoss().to(device)

# Best validation accuracy seen so far; used to decide when to save a checkpoint.
best_accuracy = 0

for epoch in range(0, EPOCHS):
  print(f'Epoch {epoch + 1}/{EPOCHS}')
  print('-' * 10)

  train_acc, train_loss = train_epoch(model, train_dataloader, optimizer, scheduler, loss_fn, len(training_set))
  print(f'Train loss {train_loss}\n Train accuracy {train_acc}')

  val_acc, val_loss, preds = eval_model(model, val_dataloader, loss_fn, device, len(validation_set))
  print(f'Validation   loss {val_loss}\n Validation accuracy {val_acc}')
  print()

  # Save the weights only when validation accuracy improves on the best so far.
  if val_acc > best_accuracy:
    torch.save(model.state_dict(), save_dir)
    best_accuracy = val_acc


Testiranje modela

In [None]:
# Evaluate on the held-out test set; preds holds the predicted class index per test example.
# NOTE(review): this evaluates the in-memory model as it is after the last epoch;
# the best checkpoint written to save_dir is not reloaded here — confirm that is intended.
test_acc, test_loss, preds = eval_model(model, test_dataloader, loss_fn, device, len(testing_set))
print(f'Test   loss {test_loss}\n Test accuracy {test_acc}')
print()

In [None]:
# Display names for class indices 0, 1 and 2.
class_names = ['negative', 'neutral', 'positive']
# Compare the true test labels (testing_set.sentiment) with the predictions from eval_model.
print(classification_report(testing_set.sentiment, preds, target_names=class_names))

In [None]:
# Share of all test articles in each (true label, predicted label) cell.
# normalize='all' divides every count by the matrix total, so the confusion
# matrix is computed once instead of twice.
normalized_cm = confusion_matrix(testing_set.sentiment, preds, normalize='all')
sns.heatmap(normalized_cm, annot=True, fmt='.2%', cmap='Blues')