In [3]:
import pandas as pd
from transformers import BertTokenizer
import numpy as np
import torch
from functions import get_tickers, get_stock_dict, get_companies_by_50


In [5]:

QUARTERS = ["Q1", "Q2", "Q3", "Q4"]
YEARS = [str(2005 + i) for i in range(18)]
bad_starts = ["Operator:", "TRANSCRIPT", "Operator", "Related:", "Executives"]

tickers = get_tickers()
dict_of_df = get_stock_dict()
t_50, t_100, t_150, t_200, t_250 = get_companies_by_50()

model = torch.load("model 9")
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

guesses = []

def predict(text):
    # Tokenize the input text
    tokenized_text = tokenizer.encode(text, add_special_tokens=True, max_length=512, truncation=True)
    
    # Convert tokenized input to tensor and move it to the device
    input_ids = torch.tensor(tokenized_text).unsqueeze(0).to(device)
    
    # Set the model to eval mode
    model.eval()
    
    # Apparently turning off grad saves memory and computation
    with torch.no_grad():
        # Give model the inputs
        outputs = model(input_ids)
        
        # Get the logits from the model's output
        logits = outputs.logits
        
        # Calculate the probabilities using softmax
        probabilities = torch.softmax(logits, dim=-1).squeeze(0)
        
        # Get the predicted label
        predicted_label = torch.argmax(probabilities).item()
        
        # Return the predicted label and its probability
        return predicted_label, probabilities
    


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [None]:


for tick in tickers[50:55]:
    company = t_100[tick]
    company_price_df = dict_of_df[tick]
    #Transform Date column so that the time of day is not included for price data (makes look up easier)
    company_price_df["Date"] = company_price_df["Date"].map(lambda x: x.split()[0])
    #Iterate through years
    for year in YEARS:
        #Check to see if a earnings call has been reported for the given year
        if len(company[year].keys()) != 0:
            #Iterate through all the quarters that have a released earnings call
            for quarter in company[year].keys():
                dev_texts =[]
                #Grabs date and transcript of the earnings call
                date = company[year][quarter]["date"].split()
                transcript = company[year][quarter]["transcript"]

                #Checks to see when the call is released
                #If the call is in the middle of the day we want the close price of the previous day
                #If call is after hours we use the close price of that day
                time = date[1].split(":")
                date_0 = 0
                if int(time[0] + time[1]) < 1600:
                    date_0 = (company_price_df["Date"] == date[0]).shift(-1, fill_value = False)
                    #print("first", company_price_df["Date"], date[0])
                else:
                    date_0 = company_price_df["Date"] == date[0]
                    #print("second", sum(date_0))
                if sum(date_0) != 0:
                    #Grabs the Close price prior to earnings call, 20 days after, and 60 days after
                    date_20 = date_0.shift(20, fill_value = False)
                    date_60 = date_0.shift(60, fill_value = False)
                    data_on_dates = company_price_df[date_0 + date_20 + date_60]
                    close_price_0, close_price_20, close_price_60 = data_on_dates["Close"]

                    #Calculates the one month and three month price change
                    one_month_change = ((close_price_20 - close_price_0) / close_price_0) * 100
                    three_month_change = ((close_price_60 - close_price_0) / close_price_0) * 100

                    #Creates a label for whether the stock has gone up, down, or stayed the same after the call release
                    label = 0
                    if one_month_change > 2 and three_month_change > 4:
                        label = 1
                    elif one_month_change < -2 and three_month_change < -4:
                        label = -1

                    sentences = transcript.split("\n")
                    for sentence in sentences:
                        check = sentence.split()
                        check1 = sentence.split(":")
                        if len(check) !=0 and len(check) > 5 and len(check) < 513:
                            if check[0] not in bad_starts and len(check1) == 2:
                                #print(sentence.split(":"))
                                dev_texts.append(sentence.split(":")[1])
                    
                    first_method = []
                    second_method = []
                    for text in dev_texts:
                        predicted_label, probability = predict(text)
                        first_method.append(predicted_label)
                        second_method.append(probability)
                    
                    first_method_pred = torch.argmax(torch.Tensor([first_method.count(0), first_method.count(1), first_method.count(2)])).item()
                    second_method_pred = torch.argmax(torch.Tensor(sum(second_method))).item()

                    guesses.append((second_method_pred, label))
                    print("Method 1 Predition: ", first_method_pred)
                    print("Method 2 Predition: ", second_method_pred)
                    print("Actual Label: ", (label + 3)%3)

print("All Guesses: ", guesses)
t = 0
for x in guesses:
    if x[0] == x[1]:
        t += 1

print("Guess Accuracy: ", t / len(guesses))