In [1]:
import pandas as pd
import spacy as sp
from spacy.util import minibatch, compounding
import os
import random
pd.set_option('display.max_colwidth', -1)

In [2]:
#read from stock kaggle dataset
stock_df = pd.read_csv('stock_data.csv')
stock_df.head()

Unnamed: 0,Text,Sentiment
0,"Kickers on my watchlist XIDE TIT SOQ PNK CPW BPZ AJ trade method 1 or method 2, see prev posts",1
1,user: AAP MOVIE. 55% return for the FEA/GEED indicator just 15 trades for the year. AWESOME.,1
2,user I'd be afraid to short AMZN - they are looking like a near-monopoly in eBooks and infrastructure-as-a-service,1
3,MNTA Over 12.00,1
4,OI Over 21.37,1


In [3]:
def load_training_data(
    split: float = 0.8,
    limit: int = 0
) -> tuple:
    tweets = []
    for ind, row in stock_df.iterrows():
        tweet = row['Text']
        label = row['Sentiment']
        spacy_label = {
            "cats": {
                "pos": 1 == label,
                "neg": -1 == label}
        }
        tweets.append((tweet, spacy_label))
        
    random.shuffle(tweets)

    if limit:
        tweets = tweets[:limit]
    split = int(len(tweets) * split)
    return tweets[:split], tweets[split:]

In [4]:
def evaluate_model(
    tokenizer, textcat, test_data: list
) -> dict:
    tweets, labels = zip(*test_data)
    tweets = (tokenizer(tweet) for tweet in tweets)
    true_positives = 0
    false_positives = 1e-8  # Can't be 0 because of presence in denominator
    true_negatives = 0
    false_negatives = 1e-8
    for i, tweet in enumerate(textcat.pipe(tweets)):
        true_label = labels[i]
        print(true_label)
        for predicted_label, score in tweet.cats.items():
            # Every cats dictionary includes both labels. You can get all
            # the info you need with just the pos label.
            if (
                predicted_label == "neg"
            ):
                continue
            if score >= 0.5 and true_label["pos"]:
                true_positives += 1
            elif score >= 0.5 and true_label["neg"]:
                false_positives += 1
            elif score < 0.5 and true_label["neg"]:
                true_negatives += 1
            elif score < 0.5 and true_label["pos"]:
                false_negatives += 1
    precision = true_positives / (true_positives + false_positives)
    recall = true_positives / (true_positives + false_negatives)

    if precision + recall == 0:
        f_score = 0
    else:
        f_score = 2 * (precision * recall) / (precision + recall)
    return {"precision": precision, "recall": recall, "f-score": f_score}

In [5]:
def train_model(
    training_data: list,
    test_data: list,
    iterations: int = 20
) -> None:
    # Build pipeline
    nlp = sp.load("en_core_web_sm")
    if "textcat" not in nlp.pipe_names:
        textcat = nlp.create_pipe(
            "textcat", config={"architecture": "simple_cnn"}
        )
        nlp.add_pipe(textcat, last=True)
    else:
         textcat = nlp.get_pipe("textcat")

    textcat.add_label("pos")
    textcat.add_label("neg")
    
    #only trains textcat pipe
    training_excluded_pipes = [
        pipe for pipe in nlp.pipe_names if pipe != "textcat"
    ]
    
    with nlp.disable_pipes(training_excluded_pipes):
        optimizer = nlp.begin_training()
        # Training loop
        print("Beginning training")
        print("Loss\tPrecision\tRecall\tF-score")
        batch_sizes = compounding(
            4.0, 32.0, 1.001
        )  # A generator that yields infinite series of input numbers
        
        for i in range(iterations):
            loss = {}
            random.shuffle(training_data)
            batches = minibatch(training_data, size=batch_sizes)
            for batch in batches:
                text, labels = zip(*batch)
                nlp.update(
                    text,
                    labels,
                    drop=0.2,
                    sgd=optimizer,
                    losses=loss
                )
                
            '''with textcat.model.use_params(optimizer.averages):
                evaluation_results = evaluate_model(
                    tokenizer=nlp.tokenizer,
                    textcat=textcat,
                    test_data=test_data
                )
                print(
                    f"{loss['textcat']}\t{evaluation_results['precision']}"
                    f"\t{evaluation_results['recall']}"
                    f"\t{evaluation_results['f-score']}"
                )'''
    # Save model
    with nlp.use_params(optimizer.averages):
        nlp.to_disk("model_artifacts")

In [6]:
TEST = "Food Sector as always doing extremely well: PZZA DPZ JACK MCD CAKE"

def test_model(input_data: str = TEST):
    #  Load saved trained model
    loaded_model = sp.load("model_artifacts")
    # Generate prediction
    parsed_text = loaded_model(input_data)
    # Determine prediction to return
    prediction = 0
    if parsed_text.cats["pos"] > parsed_text.cats["neg"]:
        prediction = 1
        score = parsed_text.cats["pos"]
    else:
        prediction = -1
        score = parsed_text.cats["neg"]
    #print(
        #f"Predicted sentiment: {prediction}"
        
        #f"\tScore: {score}"
    #)
    return prediction
    


In [7]:
train, test = load_training_data()
train_model(train, test)

Beginning training
Loss	Precision	Recall	F-score


In [8]:
y_true = []
predicted = []
for i in range(len(test)):
    y_true.append(test[i][1]['cats']['pos'] == True)
    predicted.append(test_model(test[i][0]))

In [9]:
from sklearn.metrics import precision_recall_fscore_support

#(precision, recall, F1, support = None)
precision_recall_fscore_support(y_true, predicted, average='micro')

(0.5418464193270061, 0.5418464193270061, 0.5418464193270061, None)