In [None]:
import pandas as pd
import sqlite3
import re

from transformers import BertForSequenceClassification, BertTokenizer
from transformers import pipeline
import torch

# Tokenizer and classifier must come from the same checkpoint; naming it once
# prevents the two from drifting apart if the checkpoint is ever swapped.
FINBERT_CHECKPOINT = 'ProsusAI/finbert'

tokenizer = BertTokenizer.from_pretrained(FINBERT_CHECKPOINT)
model = BertForSequenceClassification.from_pretrained(FINBERT_CHECKPOINT)

In [None]:
# Pull the three tables into plain lists of row tuples.
# try/finally guarantees the connection is released even when a query fails
# (e.g. a missing table); note that `with sqlite3.connect(...)` would only
# manage the transaction, not close the connection.
con = sqlite3.connect('data.db')
try:
    cur = con.cursor()

    articles = cur.execute("select * from article;").fetchall()
    companies = cur.execute("select * from company;").fetchall()
    a2c = cur.execute("select * from article_company;").fetchall()
finally:
    con.close()

In [None]:
# Turn the raw row tuples into labelled DataFrames.
# Column names are assigned positionally, so they rely on the column order
# returned by the `select *` queries.
a2c = pd.DataFrame(a2c, columns=['article_id', 'company_id'])

articles = pd.DataFrame(articles, columns=['idx', 'title', 'text', 'href', 'date'])
articles = articles.set_index('idx')

companies = pd.DataFrame(companies, columns=['idx', 'name', 'ticker'])
companies = companies.set_index('idx')

In [None]:
# Whitespace-delimited word count per article. The `.str` accessor propagates
# missing text as NaN instead of raising AttributeError like `len(x.split())`
# would on a NULL row.
articles['text_size'] = articles['text'].str.split().str.len()

# `infer_datetime_format` is deprecated since pandas 2.0 (format inference is
# now the default), so the argument is dropped to avoid the deprecation warning.
articles['date'] = pd.to_datetime(articles['date'])

In [None]:
# Keep only articles dated strictly after 2018-01-01.
mask = articles['date'].gt('2018-01-01')
articles = articles.loc[mask]

In [None]:
# Keep mid-length articles only: strictly more than 50 and strictly fewer
# than 300 words.
mask = articles['text_size'].gt(50)
articles = articles.loc[mask]
mask = articles['text_size'].lt(300)
articles = articles.loc[mask]

# Distribution of the remaining word counts; the trailing ';' hides the repr.
articles['text_size'].hist(bins=50, figsize=(15, 5));

In [None]:
def paragraphs_sentiment(article, model, tokenizer, chunksize=512):
    """Return the article-level class probabilities averaged over model windows.

    The article is cleaned with ``clean_article``, every remaining paragraph is
    tokenized without special tokens, and the token streams are packed into
    fixed-length windows by ``preprocess_input``. The model is run once on the
    stacked windows; the per-window softmax probabilities are averaged.

    Parameters
    ----------
    article : str
        Raw article text.
    model : callable
        Called as ``model(input_ids=..., attention_mask=...)``; element ``[0]``
        of its output is treated as the logits (one row per window).
    tokenizer : object
        Must provide ``encode_plus(text, add_special_tokens=False,
        return_tensors='pt')`` returning ``input_ids`` and ``attention_mask``.
    chunksize : int, default 512
        Window length forwarded to ``preprocess_input``.

    Returns
    -------
    torch.Tensor
        1-D tensor of probabilities, one entry per class, detached from autograd.
    """
    cleaned_paragraphs = clean_article(article)

    input_chunks = []
    mask_chunks = []
    for paragraph in cleaned_paragraphs:
        encoded = tokenizer.encode_plus(paragraph, add_special_tokens=False, return_tensors='pt')
        # return_tensors='pt' adds a leading batch axis of size 1; drop it.
        input_chunks.append(encoded['input_ids'][0])
        mask_chunks.append(encoded['attention_mask'][0])

    model_input = preprocess_input(chunksize, input_chunks, mask_chunks)

    # Pure inference: without no_grad() every call would build an autograd
    # graph and the returned tensor would keep it alive.
    with torch.no_grad():
        output = model(**model_input)
        probs = torch.nn.functional.softmax(output[0], dim=-1).mean(dim=0)

    return probs

In [None]:
def clean_article(article):
    """Split an article into paragraphs and drop the ones not worth scoring.

    The text is split on ``'\\n'``. A paragraph is discarded when it is empty
    or whitespace-only (it would tokenize to nothing) or when it contains an
    ``@`` character (the original regex ``\\S*@\\S*`` matches exactly when an
    ``@`` is present, so a plain membership test is equivalent).

    Parameters
    ----------
    article : str
        Raw article text.

    Returns
    -------
    list of str
        The surviving paragraphs, unmodified and in their original order.
    """
    return [
        chunk
        for chunk in article.split('\n')
        if chunk.strip() and '@' not in chunk
    ]

In [None]:
def preprocess_input(chunksize, input_id_chunks, mask_chunks,
                     cls_id=101, sep_id=102, pad_id=0):
    """Pack per-paragraph token tensors into fixed-length model windows.

    Paragraphs are greedily concatenated, in order, into windows of exactly
    ``chunksize`` positions laid out as ``[CLS] tokens... [SEP] padding...``.
    A new window is started whenever the next paragraph would not fit. A
    paragraph longer than a whole window is split across several windows
    instead of producing an over-long row.

    Fixes relative to the previous implementation:

    * ``chunksize`` is honoured everywhere (it used to be compared against a
      hard-coded 512);
    * the final, partially filled window is always emitted (it used to be
      dropped as soon as one full window had been flushed);
    * no empty ``[CLS][SEP]`` window is emitted ahead of an over-long first
      paragraph.

    Parameters
    ----------
    chunksize : int
        Length of every window, including the two special tokens.
    input_id_chunks : sequence of 1-D torch.Tensor
        Token ids per paragraph, without special tokens.
    mask_chunks : sequence of 1-D torch.Tensor
        Attention mask per paragraph, aligned with ``input_id_chunks``.
    cls_id, sep_id, pad_id : int
        Ids of the start, end and padding tokens; the defaults are the values
        the previous implementation hard-coded.

    Returns
    -------
    dict
        ``'input_ids'`` (int64) and ``'attention_mask'`` (int32), both of shape
        ``(n_windows, chunksize)``. Empty input yields one window holding only
        ``[CLS] [SEP]`` and padding.

    Raises
    ------
    ValueError
        If ``chunksize`` leaves no room for at least one real token.
    """
    budget = chunksize - 2  # positions left once [CLS] and [SEP] are placed
    if budget < 1:
        raise ValueError("chunksize must be at least 3")

    input_tensors = []
    mask_tensors = []
    pending_ids = []
    pending_mask = []
    pending_len = 0

    def flush():
        """Close the pending window: add special tokens, pad, and store it."""
        nonlocal pending_ids, pending_mask, pending_len
        pad = budget - pending_len
        input_tensors.append(torch.cat([
            torch.tensor([cls_id], dtype=torch.long),
            *pending_ids,
            torch.tensor([sep_id], dtype=torch.long),
            torch.full((pad,), pad_id, dtype=torch.long),
        ]))
        mask_tensors.append(torch.cat([
            torch.ones(1, dtype=torch.long),
            *pending_mask,
            torch.ones(1, dtype=torch.long),
            torch.zeros(pad, dtype=torch.long),
        ]))
        pending_ids, pending_mask, pending_len = [], [], 0

    for ids, mask in zip(input_id_chunks, mask_chunks):
        # Empty paragraphs can arrive as float tensors; normalise the dtype so
        # concatenation never promotes the ids to float.
        ids = ids.long()
        mask = mask.long()
        # Slicing in steps of `budget` is a no-op for ordinary paragraphs and
        # splits the rare over-long one; an empty paragraph yields no pieces.
        for start in range(0, len(ids), budget):
            piece_ids = ids[start:start + budget]
            piece_mask = mask[start:start + budget]
            if pending_len and pending_len + len(piece_ids) > budget:
                flush()
            pending_ids.append(piece_ids)
            pending_mask.append(piece_mask)
            pending_len += len(piece_ids)

    # Emit whatever is still pending; also guarantee at least one window so
    # the stack below never receives an empty list.
    if pending_len or not input_tensors:
        flush()

    input_dict = {
        'input_ids': torch.stack(input_tensors).long(),
        'attention_mask': torch.stack(mask_tensors).int()
    }

    return input_dict

In [None]:
# Stock Hugging Face sentiment pipeline built on the already-loaded model and
# tokenizer objects.
easy_sentiment = pipeline(task="sentiment-analysis", tokenizer=tokenizer, model=model)

In [None]:
# Draw one article to inspect. The fixed seed makes Restart & Run All show the
# same article (and therefore the same downstream scores) every time.
some_text = articles['text'].sample(n=1, random_state=42).item()
some_text

In [None]:
# Class probabilities for the sampled article, averaged over its model windows.
probs = paragraphs_sentiment(article=some_text, model=model, tokenizer=tokenizer)

In [None]:
# Name the most probable class using the checkpoint's own index-to-label
# mapping. The previous hard-coded ['negative', 'neutral', 'positive'] order
# disagreed with the score cell, which weights index 0 as +1 and index 1 as -1.
model.config.id2label[torch.argmax(probs).item()]

In [None]:
# Signed sentiment score: +1 * probs[0], -1 * probs[1], 0 * probs[2].
torch.matmul(probs, torch.tensor([1, -1, 0], dtype=torch.float))

In [None]:
# Score a small, reproducible sample of articles.
# The loop body had been commented out, leaving a `for` with no statements,
# which is an IndentationError; it is restored here.
score_weights = torch.tensor([1, -1, 0]).float()  # +1 * probs[0], -1 * probs[1]

sentiments = []
# min(...) keeps sample() from raising when fewer than 10 articles survive
# the earlier filters.
for row in articles.sample(min(10, len(articles)), random_state=42).itertuples():
    row_probs = paragraphs_sentiment(row.text, model, tokenizer)
    sentiments.append((row_probs, row_probs @ score_weights))

sentiments