In [1]:
import requests
import json
import pandas as pd
import unicodedata
import re
from cleanco import basename
import csv
import yfinance as yf
import wikipedia
import nltk
import string
from nltk.util import ngrams
from fuzzywuzzy import fuzz

In [2]:
class model:
    """Fuzzy matcher that maps a free-text company name to a stock ticker.

    The reference table (columns ``ticker``, ``title``, ``summary``) is read
    from a CSV file by :meth:`load_data` when the object is created.
    :meth:`predict` scores a query against each of the three columns and
    ranks the rows by title score, then ticker score, then summary score.
    """

    # Corporate suffixes and share-class words stripped from company names.
    # Titles and queries MUST be cleaned with the same list, otherwise
    # identical names stop comparing equal (e.g. "corporation").
    _STOPWORDS_RE = re.compile(
        r"\b(com|cap|ix|incorporated|class|a|b|common|stock|shares|share|ordinary|"
        r"depositary|manufacturing|platforms|enterprises|corporation|company|companies|"
        r"inc|corp|co|de|ltd|nv|plc|ag|us|se|asa|llc|holdings|holding|mobil|"
        r"pharmaceuticals|limited|gmbh|sa|lp)\b"
    )
    # Parenthesised remarks; greedy on purpose (first "(" to last ")").
    _PAREN_RE = re.compile(r"\(.*\)")
    # Tokens starting with an upper-case letter or a digit.
    _NOUN_RE = re.compile(r'\b[A-Z0-9][^\s]*\b')
    # Translation table deleting every ASCII punctuation character.
    _PUNCT_TABLE = str.maketrans('', '', string.punctuation)

    # Ticker/title scores at or below this value are zeroed out.
    NAME_MATCH_THRESHOLD = 80
    # Summary n-gram scores below this value are ignored.
    SUMMARY_MATCH_THRESHOLD = 85

    # Column order of the report written by output_dictionary().
    # NOTE(review): "tital_hit_overall" is a typo, but it is part of the
    # existing CSV format and of the dictionaries callers pass in, so it is
    # kept unchanged here.
    _REPORT_FIELDS = (
        "accuracy",
        "tital_hit_overall",
        "title_hit_accurate",
        "title_ratio",
        "ticker_hit_overall",
        "ticker_hit_accurate",
        "ticker_ratio",
        "summary_hit_overall",
        "summary_hit_accurate",
        "summary_ratio",
        "average_overall_score",
        "average_accurate_score",
    )

    def __init__(self, test_filename=None):
        """Load the reference data and optionally run the evaluation.

        Args:
            test_filename: when truthy, :meth:`test_cases` is run and its
                results are written to this path and kept in ``self.test``.
        """
        self.data = self.load_data()
        # load_data() returns the cleaned frame, so the data is processed.
        # query_yahoo_data() checks this flag; it was previously never set.
        self.data_processed = True
        self.test = None
        if test_filename:
            self.test = self.test_cases(test_filename)

    def load_raw(self, user_agent=None, timeout=30):
        """Download the raw SEC ticker list (not used by the main flow).

        Args:
            user_agent: optional ``User-Agent`` header value to send.
            timeout: seconds to wait for the server before giving up.

        Returns:
            DataFrame built from the JSON object, one row per top-level key.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        headers = {"User-Agent": user_agent} if user_agent else None
        response = requests.get(
            "https://www.sec.gov/files/company_tickers.json",
            headers=headers,
            timeout=timeout,
        )
        # Fail with the HTTP error instead of a confusing JSON decode error.
        response.raise_for_status()
        return pd.DataFrame.from_dict(response.json(), orient='index')

    def query_yahoo_data(self, filename):
        """Write ``ticker``, ``title`` and a Yahoo summary for each row to CSV.

        The summary is read from ``yf.Ticker(ticker).info``. Rows whose lookup
        fails are reported on stdout and skipped (best effort). Nothing is
        written unless ``self.data_processed`` is truthy.

        Args:
            filename: path of the CSV file to create (UTF-8).
        """
        if not getattr(self, "data_processed", False):
            return

        with open(filename, 'w', newline='', encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["ticker", "title", "summary"])
            writer.writeheader()

            for index, row in self.data.iterrows():
                try:
                    stock_info = {
                        "ticker": row.ticker,
                        "title": row.title,
                        "summary": yf.Ticker(row.ticker).info["regularMarketDescription"],
                    }
                    print('number of rows added [%d]\r'%int(index), end="")
                    writer.writerow(stock_info)
                except Exception as e:
                    # Deliberately best effort: one failed lookup must not
                    # abort the whole export.
                    print(e)
        return

    def extract_nouns(self, text):
        """Return the words of ``text`` that start with a capital or a digit.

        Matches are joined with single spaces in their original order.
        """
        return " ".join(self._NOUN_RE.findall(text))

    def remove_duplicates(self, text):
        """Drop repeated whitespace-separated words, keeping first occurrences.

        Not used by the main flow; kept for experiments on summary text.
        """
        # dict.fromkeys keeps insertion order, unlike set(), so the result is
        # deterministic across runs.
        return " ".join(dict.fromkeys(text.split()))

    def remove_punctuation(self, input_string):
        """Return ``input_string`` without ASCII punctuation characters."""
        return input_string.translate(self._PUNCT_TABLE)

    def _clean_name(self, name):
        """Normalise a company name: lower-case, no remarks/punctuation/suffixes."""
        cleaned = self._PAREN_RE.sub("", name.lower())
        cleaned = self.remove_punctuation(cleaned)
        cleaned = self._STOPWORDS_RE.sub("", cleaned)
        # Removing words leaves gaps; collapse them so equal names stay equal.
        return " ".join(cleaned.split())

    def calculate_occurence(self, input_val, summary_text):
        """Score how often ``input_val`` appears in ``summary_text``.

        The summary is split into n-grams with as many words as the query;
        each n-gram is lower-cased and compared with
        ``fuzz.token_set_ratio(..., full_process=False)``. Scores of at least
        ``SUMMARY_MATCH_THRESHOLD`` are summed.

        Args:
            input_val: cleaned, lower-case query.
            summary_text: summary string of one company (may be empty).

        Returns:
            Sum of the qualifying scores, 0 when there is nothing to compare.
        """
        size = len(input_val.split())
        if not summary_text or size == 0:
            return 0

        sum_val = 0
        for gram in ngrams(summary_text.split(), size):
            # Compare the whole n-gram: scoring its words one by one made any
            # single shared word count as a full match for multi-word queries.
            candidate = " ".join(gram).lower()
            ratio = fuzz.token_set_ratio(candidate, input_val, full_process=False)
            if ratio >= self.SUMMARY_MATCH_THRESHOLD:
                sum_val += ratio
        return sum_val

    def load_data(self, path="data/company_data.csv"):
        """Read and clean the reference table.

        Rows with missing values and duplicate tickers/titles are dropped,
        summaries are reduced to their capitalised words without punctuation,
        and titles are normalised exactly like queries in :meth:`predict`.

        Args:
            path: CSV file with ``ticker``, ``title`` and ``summary`` columns.

        Returns:
            The cleaned DataFrame.
        """
        try:
            # query_yahoo_data() writes UTF-8, so try that first.
            total_df = pd.read_csv(path, encoding="utf-8")
        except UnicodeDecodeError:
            # Older files may not be UTF-8; latin-1 accepts any byte.
            total_df = pd.read_csv(path, encoding="latin-1")

        total_df = (
            total_df.dropna()
            .drop_duplicates(subset="ticker")
            .drop_duplicates(subset="title")
        )
        return total_df.assign(
            summary=total_df["summary"].map(
                lambda text: self.remove_punctuation(self.extract_nouns(text))
            ),
            title=total_df["title"].map(self._clean_name),
        )

    def predict(self, input_data, predict_type=None):
        """Find the ticker that best matches ``input_data``.

        The query is cleaned like the titles, then compared with every
        ticker and title (``fuzz.ratio``, scores <= NAME_MATCH_THRESHOLD
        become 0) and every summary (:meth:`calculate_occurence`). Rows are
        ranked by title, ticker and summary score, in that order.

        Args:
            input_data: company name or ticker typed by the user.
            predict_type: ``"test"`` to get the five best rows with their
                scores; anything else returns only the best ticker.

        Returns:
            A DataFrame (test mode), the best ticker, or ``None`` when the
            reference table is empty. The best row is returned even if all
            of its scores are 0.
        """
        query = self._clean_name(input_data)

        def keep_strong(score):
            return score if score > self.NAME_MATCH_THRESHOLD else 0

        # Work on a copy so repeated predictions never alter self.data.
        scored = self.data.assign(
            ratio_ticker=self.data["ticker"].apply(
                lambda text: keep_strong(fuzz.ratio(query, text.lower()))
            ),
            ratio_title=self.data["title"].apply(
                lambda text: keep_strong(fuzz.ratio(query, text))
            ),
            ratio_summary=self.data["summary"].apply(
                lambda text: self.calculate_occurence(query, text)
            ),
        )
        scored["ratio_total"] = (
            scored["ratio_ticker"] + scored["ratio_title"] + scored["ratio_summary"]
        )

        ranked = scored.sort_values(
            by=["ratio_title", "ratio_ticker", "ratio_summary"],
            ascending=[False, False, False],
        )

        if predict_type == "test":
            return ranked.head()
        if ranked.empty:
            return None
        return ranked.iloc[0]["ticker"]

    def test_cases(self, filename, source_path="data/nasdaq_screener.csv",
                   n_samples=100, random_state=None):
        """Evaluate :meth:`predict` on a random sample of known names.

        Names and symbols come from ``source_path`` (``sp500_data.csv`` can
        be used instead), restricted to tickers present in ``self.data``.

        Args:
            filename: CSV path the per-row results are written to.
            source_path: CSV with ``Name`` and ``Symbol`` columns.
            n_samples: maximum number of rows to sample without replacement.
            random_state: seed passed to ``DataFrame.sample``.

        Returns:
            The sampled DataFrame with the prediction columns filled in.
        """
        screener = pd.read_csv(source_path)
        merged = pd.merge(self.data, screener, left_on="ticker", right_on="Symbol", how="inner")
        test = merged[["Name", "Symbol"]].copy()

        score_columns = ["ratio_title", "ratio_ticker", "ratio_summary", "ratio_total"]
        for column in ["result"] + score_columns:
            test[column] = None

        # Never ask for more rows than exist; sample() would raise.
        test = test.sample(
            n=min(n_samples, len(test)), replace=False, random_state=random_state
        )

        for num_rows, (index, row) in enumerate(test.iterrows()):
            try:
                # Test mode returns the five best rows; only the top one is
                # the prediction.
                top = self.predict(row["Name"], predict_type="test").iloc[0]
                values = {column: top[column] for column in score_columns}
                values["result"] = top["ticker"]
                for column, value in values.items():
                    test.at[index, column] = value
                print('number of rows tested [%d]\r'%int(num_rows), end="")
            except Exception as e:
                # Best effort: report and continue with the next name.
                print(e)

        test.to_csv(filename)
        return test

    def output_dictionary(self, dictionary, filename="output.csv"):
        """Write one report dictionary as a header plus one row of CSV.

        Args:
            dictionary: mapping whose keys are among ``_REPORT_FIELDS``.
            filename: destination path.
        """
        # newline='' is required by the csv module to avoid blank lines on
        # platforms that translate line endings.
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=list(self._REPORT_FIELDS))
            writer.writeheader()
            writer.writerow(dictionary)

    def analyze_test(self, filename, output_filename="output.csv"):
        """Summarise the CSV produced by :meth:`test_cases`.

        A row is a title hit when its title score is non-zero, a ticker hit
        when only the ticker score (not the title) is non-zero, and a summary
        hit when only the summary score is non-zero. ``*_overall`` and
        ``*_accurate`` are percentages of all rows; ``*_ratio`` is accurate
        divided by overall. Missing scores count as 0.

        Args:
            filename: results CSV to read.
            output_filename: path the one-row report is written to.

        Returns:
            The report dictionary that was written.
        """
        test = pd.read_csv(filename)
        total = len(test)

        correct = test["Symbol"] == test["result"]
        # Rows whose prediction failed have empty scores; treat them as 0 so
        # they are not mistaken for hits and do not poison the averages.
        ratios = test[["ratio_title", "ratio_ticker", "ratio_summary", "ratio_total"]].fillna(0)

        title_hit = ratios["ratio_title"] != 0
        ticker_hit = (ratios["ratio_title"] == 0) & (ratios["ratio_ticker"] != 0)
        summary_hit = (
            (ratios["ratio_title"] == 0)
            & (ratios["ratio_ticker"] == 0)
            & (ratios["ratio_summary"] != 0)
        )

        def percent(mask):
            return (int(mask.sum()) / total) * 100 if total else 0

        def share(accurate, overall):
            return accurate / overall if overall else 0

        title_hit_overall = percent(title_hit)
        title_hit_accurate = percent(title_hit & correct)
        ticker_hit_overall = percent(ticker_hit)
        ticker_hit_accurate = percent(ticker_hit & correct)
        summary_hit_overall = percent(summary_hit)
        summary_hit_accurate = percent(summary_hit & correct)

        n_correct = int(correct.sum())
        overall_sum = float(ratios["ratio_total"].sum())
        accurate_sum = float(ratios.loc[correct, "ratio_total"].sum())

        result = {
            "accuracy": percent(correct),
            "tital_hit_overall": title_hit_overall,
            "title_hit_accurate": title_hit_accurate,
            "title_ratio": share(title_hit_accurate, title_hit_overall),
            "ticker_hit_overall": ticker_hit_overall,
            "ticker_hit_accurate": ticker_hit_accurate,
            "ticker_ratio": share(ticker_hit_accurate, ticker_hit_overall),
            "summary_hit_overall": summary_hit_overall,
            "summary_hit_accurate": summary_hit_accurate,
            "summary_ratio": share(summary_hit_accurate, summary_hit_overall),
            "average_overall_score": overall_sum / total if total else 0,
            # Guard: with no correct prediction the mean is defined as 0.
            "average_accurate_score": accurate_sum / n_correct if n_correct else 0,
        }

        self.output_dictionary(result, output_filename)
        return result

In [3]:
# Build the matcher; model.__init__ loads and cleans the company table via load_data().
mymodel = model()

In [5]:
# predict_type="test" returns the five top-ranked rows with their scores instead of a single ticker.
mymodel.predict("Amazon", predict_type="test")

Unnamed: 0,ticker,title,summary,ratio_ticker,ratio_title,ratio_summary,ratio_total
251,AMZN,amazon,Amazoncom Inc North America The North America ...,0,100,200,300
2838,IONQ,ionq,IonQ Inc It 20 The Amazon Web Services AWS Ama...,0,0,200,200
5162,CIDM,cinedigm,Cinedigm Corp United States The Cinema Equipme...,0,0,200,200
203,VMW,vmware,VMware Inc United States It VMware VMware VxRa...,0,0,100,100
642,NTAP,netapp,NetApp Inc It Hybrid Cloud Public Could The Ne...,0,0,100,100
