In [7]:
import datetime
import os
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import NearMiss
from sklearn.exceptions import ConvergenceWarning
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.utils._testing import ignore_warnings

In [5]:
def setup(sample=1000, ratio=0.8, is_token=True, sample_method=0):
    # set directory
    directory = Path(os.getcwd())   # 'c:\\Users\\samue\\OneDrive\\Desktop\\JPM Fintech\\HKUST-x-J.P.-Morgan-Fintech-Mentorship-Program\\model_testing\\supervised'
    directory = directory.parent.parent.parent.parent.joinpath("data")  # 'c:\\Users\\samue\\OneDrive\\Desktop\\JPM Fintech\\data'
    df = pd.read_pickle(directory.joinpath("stocktwits_processed_sample.pkl"))

    # remove symbols with too little sample size
    df = df.groupby('symbols').filter(lambda x : len(x)>sample)

    if sample_method == 0:
        # turn all symbols into the same sample size
        i = sample
        for j in set(df['symbols']):
            df = df[df['symbols'] != j].append(df[df['symbols'] == j].sample(i), ignore_index=True)
        #print(df['symbols'].value_counts())

    df_test = pd.DataFrame(df[['text', 'symbols']])
    df['time'] = [datetime.date.fromtimestamp(date) for date in df['time']]
    df_test['text'] = [j[0]+j[1] for j in list(zip(list([str(i) for i in df['time']]), list(df['text'])))]

    df_train = df_test.sample(frac=0.8,random_state=200)
    df_test = df_test.drop(df_train.index)

    if is_token:
        X_train = df_train.tokens
        X_test = df_test.tokens
    else:
        X_train = df_train.text
        X_test = df_test.text
        
    y_train = df_train.symbols
    y_test = df_test.symbols

    print_str = "{} unique classes\t{} training samples\t{} test samples".format(len(set(y_train)), len(X_train), len(X_test))

    return X_train, X_test, y_train, y_test, print_str

In [3]:
def model(X_train, X_test, y_train, y_test, model_method, sample_method):
    """Fit a bag-of-words text classifier and evaluate it on the test split.

    Parameters
    ----------
    X_train, X_test : iterable of str
        Raw documents fed to ``CountVectorizer``.
    y_train, y_test : iterable
        Class labels for the corresponding documents.
    model_method : int
        ``0`` SGDClassifier with hinge loss, ``1`` MultinomialNB,
        ``2`` LogisticRegression.
    sample_method : int
        ``1`` inserts SMOTE over-sampling, ``2`` inserts NearMiss
        under-sampling; any other value adds no resampling step.

    Returns
    -------
    tuple
        ``(accuracy, report, mat)``: the accuracy score, the text
        classification report, and the confusion matrix. Rows and columns of
        the matrix follow the sorted label order.

    Raises
    ------
    ValueError
        If ``model_method`` is not 0, 1 or 2.
    """
    # Build the model
    steps = [('vect', CountVectorizer()),
             ('tfidf', TfidfTransformer())]

    if sample_method == 1:
        steps.append(('smote', SMOTE(random_state=12)))
    elif sample_method == 2:
        steps.append(('nm', NearMiss()))

    if model_method == 0:
        classifier = SGDClassifier(loss='hinge', penalty='l2', alpha=1e-3, random_state=42, max_iter=5, tol=None)
    elif model_method == 1:
        classifier = MultinomialNB()
    elif model_method == 2:
        classifier = LogisticRegression(n_jobs=1, C=1e5)
    else:
        # Fail before fitting instead of building a pipeline with no classifier.
        raise ValueError("model_method must be 0, 1 or 2, got {!r}".format(model_method))
    steps.append(('clf', classifier))

    pipeline = Pipeline(steps)

    # Train the model using the training data. The small max_iter / large C
    # settings above are expected to trigger ConvergenceWarning, so silence
    # only that category and only while fitting.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=ConvergenceWarning)
        pipeline.fit(X_train, y_train)

    # Predict the symbols of the test data
    y_pred = pipeline.predict(X_test)

    # Use one explicit, sorted label list for both metrics so that report rows
    # and matrix rows/columns are guaranteed to refer to the same classes.
    labels = sorted(set(y_train) | set(y_test))
    mat = confusion_matrix(y_test, y_pred, labels=labels)
    report = classification_report(y_test, y_pred, labels=labels)

    return accuracy_score(y_test, y_pred), report, mat

In [9]:
# Number of repeated setup/fit/evaluate rounds averaged per classifier.
iteration = 10

# (label printed in the summary line, model_method code passed to model()).
classifiers = [('SVM', 0), ('NB', 1), ('logreg', 2)]

for s in [500, 1000]:
    for r in [0.7]:
        for t in [False]:
            print('sample:{}\tratio:{}\ttoken:{}'.format(s, r, t))

            for label, model_method in classifiers:
                scores = []
                for i in range(iteration):
                    # setup() is called every round so each round draws a
                    # fresh balanced subsample of the data.
                    X_train, X_test, y_train, y_test, print_str = setup(s, r, t, 0)
                    if i == 0:
                        print(print_str)
                    score = model(X_train, X_test, y_train, y_test, model_method, 0)[0]
                    scores.append(score)
                # Mean accuracy over all rounds for this classifier.
                print('{}: {}'.format(label, sum(scores)/len(scores)))
                print()

sample:500	ratio:0.7	token:False
18 unique classes	7200 training samples	1800 test samples
SVM: 0.33061111111111113

18 unique classes	7200 training samples	1800 test samples
NB: 0.2940555555555555

18 unique classes	7200 training samples	1800 test samples
logreg: 0.32788888888888895

sample:1000	ratio:0.7	token:False
12 unique classes	9600 training samples	2400 test samples
SVM: 0.38575000000000004

12 unique classes	9600 training samples	2400 test samples
NB: 0.36225

12 unique classes	9600 training samples	2400 test samples
logreg: 0.37070833333333336

