In [1]:
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

import random
import os
import shutil

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)

seed_everything(42)

In [3]:
MAX_LEN = 128
VALID_SPLIT = 0.3
BATCH_SIZE = 8
EPOCHS = 3
LEARNING_RATE = 1e-5
DR_RATE = 0.3
WARMUP_STEPS = 500
WEIGHT_DECAY = 0.01
METRIC = 'f1'

# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device = torch.device('mps')

## Load Data

In [4]:
df = pd.read_csv('news.csv')
LABELS = ['neutral', 'positive', 'negative']
id2label = {idx:label for idx, label in enumerate(LABELS)}
label2id = {label:idx for idx, label in enumerate(LABELS)}
print(df.shape)
df.head()

(4840, 2)


Unnamed: 0,text,label
0,"According to Gran , the company has no plans t...",0
1,Technopolis plans to develop in stages an area...,0
2,The international electronic industry company ...,2
3,With the new production plant the company woul...,1
4,According to the company 's updated strategy f...,1


In [5]:
df.label.value_counts()

0    2873
1    1363
2     604
Name: label, dtype: int64

## FinBERT Demo

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import pipeline

MODEL_NAME = 'yiyanghkust/finbert-tone'
finbert = BertForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=len(LABELS))
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
nlp = pipeline('sentiment-analysis', model=finbert, tokenizer=tokenizer)

In [None]:
from sklearn.metrics import accuracy_score

Y = df.label.replace(id2label).tolist()
preds = [str(result['label']).lower() for result in nlp(df.text.tolist())]
print('Accuracy:', round(accuracy_score(Y, preds), 5))

Accuracy: 0.79236


## Data Preprocess

In [6]:
from transformers import BertForSequenceClassification, BertTokenizerFast, Trainer, TrainingArguments
from transformers.tokenization_utils_base import BatchEncoding
from nlp import load_dataset
from nlp.dataset_dict import DatasetDict
from IPython.display import clear_output
from typing import Callable, Dict

dataset = load_dataset('csv', data_files='./news.csv', split='train')
dataset = dataset.train_test_split(test_size=VALID_SPLIT)
clear_output()
print(dataset['train'].shape, dataset['test'].shape)

(3388, 2) (1452, 2)


In [7]:
def tokenize(name: str) -> Callable[[DatasetDict],BatchEncoding]:
    tokenizer = BertTokenizerFast.from_pretrained(name, problem_type='multi_label_classification')
    clear_output()
    return lambda examples: tokenizer(examples['text'], max_length=MAX_LEN, padding='max_length', truncation=True)

def one_hot(examples: DatasetDict) -> Dict[str,np.ndarray]:
    return {'labels':np.eye(len(LABELS))[examples['label']]}

def preprocess(data: DatasetDict, name: str) -> DatasetDict:
    encoded = data.map(tokenize(name), batched=True, remove_columns=['text'])
    encoded = encoded.map(one_hot, remove_columns=['label'])
    encoded.set_format('torch')
    return encoded

## Load Model

In [None]:
def model(name: str, trainable=True) -> BertForSequenceClassification:
    model = BertForSequenceClassification.from_pretrained(
        pretrained_model_name_or_path=name,
        problem_type='multi_label_classification',
        num_labels=len(LABELS),
        id2label=id2label,
        label2id=label2id,).to(device)
    clear_output()

    if not trainable:
        for param in model.bert.parameters():
            param.requires_grad = False

    return model

In [None]:
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='binary')
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [None]:
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = predictions[:, 0]
    binary_predictions = [1.0 if prediction >= 3.0 else 0.0 for prediction in predictions]
    binary_labels = [1.0 if label >= 3.0 else 0.0 for label in labels]
    pr = metric_pearsonr.compute(predictions=predictions, references=labels)
    f1 = metric_f1.compute(predictions=binary_predictions, references=binary_labels)

    return {"pearsonr": pr, "f1": f1} 

## Load Trainer

In [8]:
def make_dirs(name: str) -> Dict[str,str]:
    required_dirs = {
        'root':'./saved','model_root':'./saved/models','model_dir':f'./saved/models/{name}',
        'logging_root':'./saved/logger','logging_dir':f'./saved/logger/{name}'}
    for dir in required_dirs.values():
        if not os.path.isdir(dir):
            os.mkdir(dir)
    return required_dirs

def training_args(name: str) -> TrainingArguments:
    required_dirs = make_dirs(name)
    return TrainingArguments(
        output_dir=required_dirs['model_dir'],
        evaluation_strategy='epoch',
        save_strategy='epoch',
        num_train_epochs=EPOCHS,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        learning_rate=LEARNING_RATE,
        warmup_steps=WARMUP_STEPS,
        weight_decay=WEIGHT_DECAY,
        logging_dir=required_dirs['logging_dir'],
        load_best_model_at_end=True,
        label_names=LABELS,
        metric_for_best_model=METRIC,
    )

def trainer(dataset: DatasetDict, model_name: str, model_path: str, trainable=True) -> Trainer:
    data_loader = preprocess(dataset, model_path)

    return Trainer(
        model=model(model_path, trainable),
        args=training_args(model_name),
        train_dataset=data_loader['train'],
        eval_dataset=data_loader['test']
    )

## Compare Models

In [9]:
MODEL_PATH = {
    'bert_base': 'bert-base-uncased',
    'bert_large': 'bert-large-uncased',
    'finbert': 'ProsusAI/finbert',
    'finbert_tone': 'yiyanghkust/finbert-pretrain',
}

In [10]:
bert_base_trainer = trainer(dataset, 'bert_base', MODEL_PATH['bert_base'], trainable=False)

In [12]:
bert_base_trainer.train()

***** Running training *****
  Num examples = 3388
  Num Epochs = 3
  Instantaneous batch size per device = 8
  Total train batch size (w. parallel, distributed & accumulation) = 8
  Gradient Accumulation steps = 1
  Total optimization steps = 1272


  0%|          | 0/1272 [00:00<?, ?it/s]

***** Running Evaluation *****
  Num examples = 1452
  Batch size = 8


  0%|          | 0/182 [00:00<?, ?it/s]

Saving model checkpoint to ./results/bert_base/checkpoint-424
Configuration saved in ./results/bert_base/checkpoint-424/config.json


{'eval_runtime': 126.9688, 'eval_samples_per_second': 11.436, 'eval_steps_per_second': 1.433, 'epoch': 1.0}


Model weights saved in ./results/bert_base/checkpoint-424/pytorch_model.bin


KeyError: 'eval_f1'

In [None]:
bert_base_trainer.evaluate()

## Make Datasets

In [7]:
from transformers import BertTokenizer, BertModel

In [8]:
MODEL_NAMES = {
    'bert_base': 'bert-base-uncased',
    'bert_large': 'bert-large-uncased',
    'finbert': 'ProsusAI/finbert',
    'finbert_tone': 'yiyanghkust/finbert-pretrain',
}

In [9]:
class NewsDataset(Dataset):
    """ Financial News Sentiment Corpus Dataset """
    def __init__(self, df: pd.DataFrame, tokenizer: BertTokenizer, max_len: int, num_labels: int):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.num_labels = num_labels
        self.title = self.df.title
        self.target = self.df.target

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        title = self.title[index]
        target = self.target[index]
        inputs = self.bert_tokenize(title)
        inputs.update({'targets': self.one_hot_encoding(target)})
        return inputs

    def bert_tokenize(self, text):
        encoded_dict = self.tokenizer.encode_plus(
            text=text,
            text_pair=None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return encoded_dict

    def one_hot_encoding(self, label):
        one_hot = F.one_hot(torch.arange(self.num_labels))
        return one_hot[label].to(torch.float)

In [10]:
from sklearn.model_selection import train_test_split

train_df, valid_df = train_test_split(df, test_size=VALID_SPLIT, stratify=df.target)
train_df = train_df.reset_index(drop=True)
valid_df = valid_df.reset_index(drop=True)

In [14]:
train_datasets = dict()
valid_datasets = dict()

for name, path in MODEL_NAMES.items():
    tokenizer = BertTokenizer.from_pretrained(path)
    train_datasets[name] = NewsDataset(train_df, tokenizer, MAX_LEN, NUM_LABELS)
    valid_datasets[name] = NewsDataset(valid_df, tokenizer, MAX_LEN, NUM_LABELS)

## BERT Embedding

In [None]:
for name, path in MODEL_NAMES.items():
    tokens = train_datasets[name].__getitem__(0)
    model = BertModel.from_pretrained(MODEL_NAMES['bert_base'], output_hidden_states=True)
    result = model(tokens['input_ids'], attention_mask=tokens['attention_mask'])

In [26]:
test_data = train_datasets['bert_base'].__getitem__(0)
del test_data['targets']
result = model(**test_data)
last_hidden_state, pooler_output, hidden_states = result[0], result[1], result[2]

In [None]:
test_data = train_datasets['bert_base'].__getitem__(0)
model = BertModel.from_pretrained(MODEL_NAMES['bert_base'], output_hidden_states=True)
result = model(test_data['input_ids'],attention_mask=test_data['attention_mask'])
last_hidden_state, pooler_output, hidden_states = result[0], result[1], result[2]

In [None]:
class BertBaseModel(nn.Module):
    def __init__(self, dropout: float, num_labels: int):
        super(BertBaseModel, self).__init__()
        self.bert_model = BertModel.from_pretrained('bert-base-uncased', return_dict=True)
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(768, num_labels)

    def forward(self, input_ids, attention_mask, token_type_ids):
        output = self.bert_model(input_ids, attention_mask, token_type_ids)
        output = self.dropout(output.pooler_output)
        output = self.linear(output)
        return output

# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device = torch.device('mps')
model = BertBaseModel(DR_RATE, NUM_LABELS)
model.to(device)

In [None]:
class BertEmbedding(nn.Module):
    def __init__(self, model_name)

## BERT

In [7]:
from transformers import BertTokenizer, BertForSequenceClassification

MODEL_NAME = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

In [8]:
class NewsDataset(Dataset):
    """ Financial News Sentiment Corpus Dataset """
    def __init__(self, df: pd.DataFrame, tokenizer: BertTokenizer, max_len: int, num_labels: int):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.num_labels = num_labels
        self.title = self.df.title
        self.target = self.df.target

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        title = self.title[index]
        target = self.target[index]
        inputs = self.bert_tokenize(title)
        return {
            'input_ids': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            'token_type_ids': inputs['token_type_ids'].flatten(),
            'targets': self.one_hot_encoding(target),
        }

    def bert_tokenize(self, text):
        encoded_dict = self.tokenizer.encode_plus(
            text=text,
            text_pair=None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return encoded_dict

    def one_hot_encoding(self, label):
        one_hot = F.one_hot(torch.arange(self.num_labels))
        return one_hot[label].to(torch.float)

In [9]:
from sklearn.model_selection import train_test_split

train_df, val_df = train_test_split(df, test_size=VALID_SPLIT, stratify=df.target)
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

In [10]:
train_dataset = NewsDataset(train_df, tokenizer, MAX_LEN, NUM_LABELS)
valid_dataset = NewsDataset(val_df, tokenizer, MAX_LEN, NUM_LABELS)

In [None]:
MODEL_NAME = 'bert-large-uncased'
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

In [None]:
from transformers import BertModel
model = BertModel.from_pretrained(MODEL_NAME, output_hidden_states=True)

In [78]:
sentence = 'I love Paris'
tokens = tokenizer.encode_plus(sentence, add_special_tokens=True, max_length=7, padding='max_length', truncation=True)
print(tokens)

{'input_ids': [101, 1045, 2293, 3000, 102, 0, 0], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 0, 0]}


In [79]:
token_ids = torch.tensor(tokens['input_ids']).unsqueeze(0)
attention_mask = torch.tensor(tokens['attention_mask']).unsqueeze(0)

In [91]:
token_ids

tensor([[ 101, 1045, 2293, 3000,  102,    0,    0]])

In [80]:
result = model(token_ids,attention_mask=attention_mask)
hidden_states, pooler_output, hidden_states = result[0], result[1], result[2]

In [82]:
hidden_states[0].shape

torch.Size([1, 7, 1024])

In [81]:
len(hidden_states)

25

In [74]:
hidden_states[-1].shape

torch.Size([1, 7, 768])

In [None]:
from transformers import BertTokenizer, BertModel

FINBERT = 'ProsusAI/finbert'
finbert = BertModel.from_pretrained(FINBERT, output_hidden_states=True)
fintokenizer = BertTokenizer.from_pretrained(FINBERT)

In [88]:
result = finbert(token_ids,attention_mask=attention_mask)
hidden_states, pooler_output, hidden_states = result[0], result[1], result[2]

In [90]:
hidden_states[-1]

tensor([[[ 0.3517,  0.8137, -0.9715,  ..., -1.2517, -0.5206,  0.5874],
         [ 0.7348,  1.1075,  0.0664,  ..., -0.7516, -0.4270,  0.2203],
         [ 0.9962,  1.3483,  0.3890,  ..., -0.9823,  0.1299, -0.2494],
         ...,
         [ 0.3348,  0.3485,  0.0019,  ..., -0.2372, -0.6656, -0.1062],
         [ 0.2788,  0.7250,  0.1129,  ..., -0.3339, -0.3293,  0.1269],
         [ 0.3271,  0.4002,  0.0916,  ..., -0.2062, -0.3955, -0.0705]]],
       grad_fn=<NativeLayerNormBackward0>)

In [87]:
hidden_states[-1]

tensor([[[ 0.4045, -0.6752, -0.3544,  ...,  0.3486,  0.8672,  0.8592],
         [-1.2629,  0.2834,  0.7925,  ...,  0.3055,  0.4203,  1.9711],
         [-0.3013, -0.6540, -0.3854,  ..., -0.3721,  1.0705,  1.1328],
         ...,
         [ 1.0212, -0.9028,  0.7644,  ..., -0.5486,  0.2132,  1.7447],
         [ 0.5257, -0.3719,  0.1031,  ..., -0.4782,  0.5302,  1.3596],
         [ 0.2073, -0.4750,  0.1476,  ...,  0.2096,  0.8126,  1.1591]]],
       grad_fn=<NativeLayerNormBackward0>)

In [89]:
len(hidden_states)

13

In [11]:
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
)

In [None]:
def load_checkpoint(checkpoint_fpath, model, optimizer):
    checkpoint = torch.load(checkpoint_fpath)
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    valid_loss_min = checkpoint['valid_loss_min']
    return model, optimizer, checkpoint['epoch'], valid_loss_min.item()

def save_checkpoint(state, is_best, checkpoint_path, best_model_path):
    f_path = checkpoint_path
    torch.save(state, f_path)
    if is_best:
        best_fpath = best_model_path
        shutil.copyfile(f_path, best_fpath)

In [None]:
class BertBaseModel(nn.Module):
    def __init__(self, dropout: float, num_labels: int):
        super(BertBaseModel, self).__init__()
        self.bert_model = BertModel.from_pretrained('bert-base-uncased', return_dict=True)
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(768, num_labels)

    def forward(self, input_ids, attention_mask, token_type_ids):
        output = self.bert_model(input_ids, attention_mask, token_type_ids)
        output = self.dropout(output.pooler_output)
        output = self.linear(output)
        return output

# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device = torch.device('mps')
model = BertBaseModel(DR_RATE, NUM_LABELS)
model.to(device)

In [None]:
# def loss_fn(outputs, targets):
#     return nn.BCEWithLogitsLoss()(outputs, targets)

def loss_fn(outputs, targets, num_labels):
    return nn.CrossEntropyLoss()