In [18]:
import nltk
import pandas as pd
import random
from typing import List, Mapping, Optional, Sequence

# import gensim
import numpy as np
from numpy.typing import NDArray
from download_hf import download_parquet

from afinn import Afinn
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

FloatArray = NDArray[np.float64]

"""
This script was specifically written to prepare the financial_phrasebank dataset
    financial_phrasebank has a single sentence per row
    labels range from 0 (negative), 1(neutral), 2 (positive)
    there are 4 datasets based on % agreement between annotators
"""
import gensim.downloader as api

google_news = api.load("word2vec-google-news-300")
# google_news.save("word2vec-google-news-300.model")
afin = Afinn()


# import sentiment_economy_news dataset
def import_data(split):
    """
    Import financial_phrasebank dataset
    splits available:
        sentences_50agree,
        sentences_66agree,
        sentences_75agree,
        sentences_allagree
    """
    return download_parquet("financial_phrasebank", split)


from nltk.corpus import wordnet

# 1: Define a bunch of functions

In [19]:
def Negation(sentence):
    """
    Input: Tokenized sentence (List of words)
    Output: Tokenized sentence with negation handled (List of words)
    """
    temp = int(0)
    for i in range(len(sentence)):
        if sentence[i - 1] in ["not", "n't"]:
            antonyms = []
            for syn in wordnet.synsets(sentence[i]):
                syns = wordnet.synsets(sentence[i])
                w1 = syns[0].name()
                temp = 0
                for l in syn.lemmas():
                    if l.antonyms():
                        antonyms.append(l.antonyms()[0].name())
                max_dissimilarity = 0
                for ant in antonyms:
                    syns = wordnet.synsets(ant)
                    w2 = syns[0].name()
                    syns = wordnet.synsets(sentence[i])
                    w1 = syns[0].name()
                    word1 = wordnet.synset(w1)
                    word2 = wordnet.synset(w2)
                    if isinstance(word1.wup_similarity(word2), float) or isinstance(
                        word1.wup_similarity(word2), int
                    ):
                        temp = 1 - word1.wup_similarity(word2)
                    if temp > max_dissimilarity:
                        max_dissimilarity = temp
                        antonym_max = ant
                        sentence[i] = antonym_max
                        sentence[i - 1] = ""
    while "" in sentence:
        sentence.remove("")
    return sentence


def clean_text(text):
    """Clean text incorporating Negation handling and stopwords."""
    # turn to lowercase
    text = text.lower()
    # word tokenization
    text = nltk.word_tokenize(text)
    # negation handling
    text = Negation(text)
    # remove punctuation
    text = [word for word in text if word.isalnum()]
    # remove stopwords
    stopwords = nltk.corpus.stopwords.words("english")
    text = [word for word in text if word not in stopwords]
    if text == "":
        pass
    else:
        return " ".join(text)


def tokenize_financial_phrasebank(df):
    """
    Tokenize sentiment economy news.
    """
    # tokenize sentences
    df["tokenized_sentences"] = df["sentence"].apply(clean_text)
    df = df.loc[df.tokenized_sentences != ""]
    return df


def sum_token_embeddings(
    token_embeddings: Sequence[FloatArray],
) -> FloatArray:
    """Sum the token embeddings."""
    total: FloatArray = np.array(token_embeddings).sum(axis=0)
    return total


def map_labels(df):
    """Map labels to integers."""
    label_map = {0: -1, 1: 0, 2: 1}
    return df["label"].map(label_map)


def split_train_test(
    X: FloatArray, y: FloatArray, test_percent: float = 20
) -> tuple[FloatArray, FloatArray, FloatArray, FloatArray]:
    """Split data into training and testing sets."""
    N = len(y)
    data_idx = list(range(N))
    random.shuffle(data_idx)
    break_idx = round(test_percent / 100 * N)
    training_idx = data_idx[break_idx:]
    testing_idx = data_idx[:break_idx]
    X_train = X[training_idx, :]
    y_train = y[training_idx]
    X_test = X[testing_idx, :]
    y_test = y[testing_idx]
    return X_train, y_train, X_test, y_test

In [20]:
def generate_data_word2vec(df: pd.DataFrame) -> tuple[FloatArray, FloatArray]:
    """Generate training and testing data with word2vec."""
    # load pre-trained word2vec model
    # google_news = gensim.models.KeyedVectors.load("word2vec-google-news-300.model")
    X: FloatArray = np.array(
        [
            # sum the token embeddings for each sentence. If word is not in the model, return embedding of ['UNK']
            sum_token_embeddings(
                [
                    google_news[word] if word in google_news else google_news["UNK"]
                    for _, word in enumerate(sentence)
                ]
            )
            for _, sentence in enumerate(df.tokenized_sentences)
        ]
    )
    # labels = [-1, 0, 1] seems to be causing an error
    # y: FloatArray = np.array(map_labels(df))
    y: FloatArray = np.array(df.label)
    return split_train_test(X, y)

In [21]:
def generate_observation_word2vec(sentence):
    X: FloatArray = np.array(
        [
            sum_token_embeddings(
                [
                    google_news[word] if word in google_news else google_news["UNK"]
                    for _, word in enumerate(sentence)
                ]
            )
        ]
    )
    return X


def etl(split):
    """
    Extract, transform, and load financial_phrasebank
    """
    df = import_data(split)
    df = tokenize_financial_phrasebank(df)
    X_train, y_train, X_test, y_test = generate_data_word2vec(df)
    return X_train, y_train, X_test, y_test


def aggregate_all_splits():
    """
    Aggregate all splits of financial_phrasebank
    """
    df = pd.DataFrame()
    for split in [
        "sentences_50agree",
        "sentences_66agree",
        "sentences_75agree",
        "sentences_allagree",
    ]:
        df = pd.concat([df, import_data(split)])
    df = tokenize_financial_phrasebank(df)
    X_train, y_train, X_test, y_test = generate_data_word2vec(df)
    return X_train, y_train, X_test, y_test

In [6]:
def aggregate_fin_data():
    """
    Aggregate all splits of financial_phrasebank
    """
    df = pd.DataFrame()
    for split in [
        "sentences_50agree",
        "sentences_66agree",
        "sentences_75agree",
        "sentences_allagree",
    ]:
        df = pd.concat([df, import_data(split)])
    df = tokenize_financial_phrasebank(df)
    return df

# 2: Create Synthetic Data

In [7]:
df = aggregate_fin_data()
negative_df = df[df["label"] == 0]
neutral_df = df[df["label"] == 1]
positive_df = df[df["label"] == 2]

### a. Create lists of all the words in each sentiment category (including redundancies)

In [8]:
positive_string = " ".join(positive_df["tokenized_sentences"])
positive_words = positive_string.split()

neutral_string = " ".join(neutral_df["tokenized_sentences"])
neutral_words = neutral_string.split()

negative_string = " ".join(negative_df["tokenized_sentences"])
negative_words = negative_string.split()

In [9]:
import prep_financial_phrasebank as prep

### b. Make a Dataframe where each row is a random probabilistic collection of words from that sentiment's associated word list

In [10]:
columns = ["text", "label"]
synthetic_df = pd.DataFrame(columns=columns)
for i in range(5000):
    max_length = 15
    length = int(np.round(np.random.uniform(0.15, 1) * max_length))
    types = 2
    sent = int(np.round(np.random.uniform(0, 1) * types))
    if sent == 1:
        rand_text = [random.choice(neutral_words) for _ in range(length)]
    elif sent == 2:
        rand_text = [random.choice(positive_words) for _ in range(length)]
    elif sent == 0:
        rand_text = [random.choice(negative_words) for _ in range(length)]
    random_text = " ".join(rand_text)
    synthetic_df.loc[i] = [random_text, sent]  # Add rand_text to synthetic_df

synthetic_df["text"] = synthetic_df["text"].apply(prep.clean_text)
# remove empty strings
synthetic_df = synthetic_df[synthetic_df["text"] != ""]

In [11]:
from prep_financial_phrasebank import (
    tokenize_financial_phrasebank,
    generate_data_word2vec,
    sum_token_embeddings,
)

# 3: Train 2 1D Afin models on synthetic data

In [25]:
def sum_vader_scores(sentence):
    sent_score = np.sum(np.array([afin.score(word) for word in sentence]))
    return sent_score


def generate_X_vader(df):
    X = np.array([sum_vader_scores(sentence) for sentence in df["text"]])
    return X.reshape(-1, 1)


def generate_y_vader(df):
    y = np.array(df.label).reshape(-1, 1)
    return y

In [23]:
synthetic_df

Unnamed: 0,text,label
0,investments absolut nomination 51 expected 200...,1
1,drying investrend new part panfish software re...,1
2,systems printing ponzi financially eur0 2009 n...,0
3,due prospects eur first slide construction pro...,0
4,increased result corresponding solutions serie...,2
...,...,...
4995,salaried also customers rental region,1
4996,entitle scheduled continued company customized...,1
4997,earlier said systems alone lassila decision ea...,0
4998,profit finnish eur eur320 chief mn unit per ra...,0


### a. Train afinn Logistic Regression

In [27]:
X_single = generate_X_vader(synthetic_df)
y_single = generate_y_vader(synthetic_df)
X_train, X_test, y_train, y_test = train_test_split(
    X_single, y_single, test_size=0.2, random_state=42
)
clf = LogisticRegression(random_state=0, max_iter=1000).fit(X_train, y_train)

afinn (train): 0.501
afinn (test): 0.511


  y = column_or_1d(y, warn=True)


In [None]:
print("afinn Logistic Regression (train):", clf.score(X_train, y_train))
print("afinn Logistic Regression (test):", clf.score(X_test, y_test))

### b. Train afinn RNN 

In [28]:
import torch
from torch import nn
from torch.optim import Adam

X_train, X_test, y_train, y_test = train_test_split(
    X_single, y_single, test_size=0.2, random_state=42
)


class SimpleRNN(nn.Module):
    def __init__(self, hidden_size):
        super(SimpleRNN, self).__init__()
        self.rnn = nn.RNN(input_size=1, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out[:, -1, :])
        return out


# Convert numpy arrays to PyTorch tensors
X_train_torch = torch.from_numpy(X_train).float()
y_train_torch = torch.from_numpy(y_train).float()

# Reshape X_train to be (batch_size, sequence_length, input_size)
X_train_torch = X_train_torch.view(-1, 1, 1)

# Initialize the model, loss function, and optimizer
model = SimpleRNN(hidden_size=10)
criterion = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=0.01)

# Train the model
for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_torch)
    loss = criterion(outputs, y_train_torch)
    loss.backward()
    optimizer.step()
    # print(f'Epoch: {epoch+1}, Loss: {loss.item()}')

In [31]:
# Convert numpy arrays to PyTorch tensors
X_test_torch = torch.from_numpy(X_test).float()
y_test_torch = torch.from_numpy(y_test).float()
X_train_torch = torch.from_numpy(X_train).float()
y_train_torch = torch.from_numpy(y_train).float()

# Reshape X_test to be (batch_size, sequence_length, input_size)
X_test_torch = X_test_torch.view(-1, 1, 1)
X_train_torch = X_train_torch.view(-1, 1, 1)

# Switch model to evaluation mode
model.eval()

# Make predictions
with torch.no_grad():
    predictions_test = model(X_test_torch)
    predictions_train = model(X_train_torch)

# Calculate the loss
loss = criterion(predictions_test, y_test_torch)
print(f"Test Loss: {loss.item()}")

# Convert the model's output to binary labels
predicted_test_labels = torch.round(predictions_test)

# Calculate the number of correct predictions
correct_test_predictions = (predicted_test_labels == y_test_torch).float().sum()

# Calculate the accuracy
accuracy_test = correct_test_predictions / y_test_torch.shape[0]

# Convert the model's output to binary labels
predicted_train_labels = torch.round(predictions_train)

# Calculate the number of correct predictions
correct_train_predictions = (predicted_train_labels == y_train_torch).float().sum()

# Calculate the accuracy
accuracy_train = correct_train_predictions / y_train_torch.shape[0]

Test Loss: 0.4890991747379303
afinn Logistic Regression (train): 0.501
afinn Logistic Regression (test): 0.511
afinn RNN (train)): 0.5009999871253967
afinn RNN (test)): 0.5109999775886536


### c. Get Results for both

In [32]:
print("afinn Logistic Regression (train):", clf.score(X_train, y_train))
print("afinn Logistic Regression (test):", clf.score(X_test, y_test))
print(f"afinn RNN (train)): {accuracy_train.item()}")
print(f"afinn RNN (test)): {accuracy_test.item()}")

afinn Logistic Regression (train): 0.501
afinn Logistic Regression (test): 0.511
afinn RNN (train)): 0.5009999871253967
afinn RNN (test)): 0.5109999775886536


# 4: Train 2 Word2Vec models on both synthetic and real data

### a. make functions to run Logistic Regression Model on synthetic data

In [12]:
def aggregate_fake_splits(fakedata):
    """
    Aggregate all splits of financial_phrasebank
    """
    df = fakedata
    df = tokenize_financial_phrasebank(df)
    X_train, y_train, X_test, y_test = generate_data_word2vec(df)
    return X_train, y_train, X_test, y_test


def run_experiment1(synthetic_df) -> None:
    from sklearn.linear_model import LogisticRegression

    # prepare training and testing data
    X_train, y_train, X_test, y_test = aggregate_fake_splits(synthetic_df)

    clf = LogisticRegression(random_state=0, max_iter=1000).fit(X_train, y_train)
    print("word2vec (train):", clf.score(X_train, y_train))
    print("word2vec (test):", clf.score(X_test, y_test))

    return clf

In [13]:
columns = ["sentence", "label"]
New_Synthetic_Data = pd.DataFrame(columns=columns)
New_Synthetic_Data["sentence"] = synthetic_df["text"]
New_Synthetic_Data["label"] = synthetic_df["label"]
New_Synthetic_Data

Unnamed: 0,sentence,label
0,investments absolut nomination 51 expected 200...,1
1,drying investrend new part panfish software re...,1
2,systems printing ponzi financially eur0 2009 n...,0
3,due prospects eur first slide construction pro...,0
4,increased result corresponding solutions serie...,2
...,...,...
4995,salaried also customers rental region,1
4996,entitle scheduled continued company customized...,1
4997,earlier said systems alone lassila decision ea...,0
4998,profit finnish eur eur320 chief mn unit per ra...,0


### b. Define RNN model for synthetic data

In [14]:
def RNN_experiment_torch_synth(synthetic_df):
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torch.utils.data import Dataset, DataLoader
    from torch.nn.utils.rnn import pad_sequence
    from sklearn import metrics

    # prepare training and testing data
    # X_train, y_train, X_test, y_test = aggregate_all_splits()
    X_train, y_train, X_test, y_test = aggregate_fake_splits(synthetic_df)

    # convert to torch tensors
    X_train = torch.from_numpy(X_train)
    y_train = torch.from_numpy(y_train)
    X_test = torch.from_numpy(X_test)
    y_test = torch.from_numpy(y_test)

    # create dataset
    class FinancialPhraseBankDataset(Dataset):
        def __init__(self, X, y):
            self.X = X
            self.y = y

        def __len__(self):
            return len(self.y)

        def __getitem__(self, idx):
            return self.X[idx], self.y[idx]

    # create dataloader
    train_dataset = FinancialPhraseBankDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # define model
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.fc1 = nn.Linear(300, 300)
            self.fc2 = nn.Linear(300, 3)

        def forward(self, x):
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return x

    net = Net()

    # define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    # train
    for epoch in range(20):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs.float())
            # print(outputs.shape)
            # print(labels.shape)
            loss = criterion(outputs, labels.long())
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            # if i % 100 == 99:  # print every 100 mini-batches
            # print("[%d, %5d] loss: %.3f" % (epoch + 1, i + 1, running_loss / 100))

    # print("Finished Training")
    # After training, generate predictions on test data
    X_test = X_test.float()
    outputs_test = net(X_test)
    _, predicted_test = torch.max(outputs_test, 1)
    outputs_train = net(X_train)
    _, predicted_train = torch.max(outputs_train, 1)
    # Convert tensors to numpy arrays for comparison with sklearn metrics
    y_test_np = y_test.numpy()
    predicted_test_np = predicted_test.numpy()

    # Convert tensors to numpy arrays for comparison with sklearn metrics
    y_train_np = y_train.numpy()
    predicted_train_np = predicted_train.numpy()

    # Now you can use sklearn's metrics to compare y_test_np and predicted_np
    # For example, to calculate accuracy:
    accuracy1 = metrics.accuracy_score(y_train_np, predicted_train_np)
    print("RNN (train): ", accuracy1)
    accuracy2 = metrics.accuracy_score(y_test_np, predicted_test_np)
    print("RNN (test): ", accuracy2)

### c. Define RNN model for real data

In [15]:
def RNN_experiment_torch():
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torch.utils.data import Dataset, DataLoader
    from torch.nn.utils.rnn import pad_sequence
    from sklearn import metrics

    # prepare training and testing data
    X_train, y_train, X_test, y_test = aggregate_all_splits()
    # X_train, y_train, X_test, y_test = etl("sentences_allagree")

    # convert to torch tensors
    X_train = torch.from_numpy(X_train)
    y_train = torch.from_numpy(y_train)
    X_test = torch.from_numpy(X_test)
    y_test = torch.from_numpy(y_test)

    # create dataset
    class FinancialPhraseBankDataset(Dataset):
        def __init__(self, X, y):
            self.X = X
            self.y = y

        def __len__(self):
            return len(self.y)

        def __getitem__(self, idx):
            return self.X[idx], self.y[idx]

    # create dataloader
    train_dataset = FinancialPhraseBankDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # define model
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.fc1 = nn.Linear(300, 350)
            self.fc2 = nn.Linear(350, 3)

        def forward(self, x):
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return x

    net = Net()

    # define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    # train
    for epoch in range(20):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs.float())
            # print(outputs.shape)
            # print(labels.shape)
            loss = criterion(outputs, labels.long())
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            # if i % 100 == 99:  # print every 100 mini-batches
            # print("[%d, %5d] loss: %.3f" % (epoch + 1, i + 1, running_loss / 100))

    # print("Finished Training")
    # After training, generate predictions on test data
    X_test = X_test.float()
    outputs_test = net(X_test)
    _, predicted_test = torch.max(outputs_test, 1)
    outputs_train = net(X_train)
    _, predicted_train = torch.max(outputs_train, 1)
    # Convert tensors to numpy arrays for comparison with sklearn metrics
    y_test_np = y_test.numpy()
    predicted_test_np = predicted_test.numpy()

    # Convert tensors to numpy arrays for comparison with sklearn metrics
    y_train_np = y_train.numpy()
    predicted_train_np = predicted_train.numpy()

    # Now you can use sklearn's metrics to compare y_test_np and predicted_np
    # For example, to calculate accuracy:
    accuracy1 = metrics.accuracy_score(y_train_np, predicted_train_np)
    print("RNN (train): ", accuracy1)
    accuracy2 = metrics.accuracy_score(y_test_np, predicted_test_np)
    print("RNN (test): ", accuracy2)

### d. Get logistic model for real datafrom prep_financial_phrasebank

In [16]:
from prep_financial_phrasebank import run_experiment

### Get results from both

In [17]:
print("Synthetic Data Results:")
clf = run_experiment1(New_Synthetic_Data)
rnn = RNN_experiment_torch_synth(New_Synthetic_Data)
print("")
print("")
print("")
print("Real Data Results:")
logreg = run_experiment()
rnn2 = RNN_experiment_torch()

Synthetic Data Results:


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


word2vec (train): 0.5805
word2vec (test): 0.585
RNN (train):  0.56
RNN (test):  0.573



Real Data Results:


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


word2vec (train): 0.6507909652313679
word2vec (test): 0.6385786802030456
RNN (train):  0.6434311817951104
RNN (test):  0.6490693739424704
