In [None]:
#imports 
import os
from collections import Counter
from time import sleep

import numpy as np
import pandas as pd
import requests

# API Functions

In [None]:
# Hugging Face Inference API credentials.
# The token is taken from the HF_TOKEN environment variable when it is set, so the
# secret can live outside the notebook.
# NOTE(review): the literal fallback only keeps existing runs working. It is a
# credential committed to source; revoke it and delete the fallback.
headers = {
    "Authorization": "Bearer " + (os.environ.get("HF_TOKEN") or "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")
}

# Model 1: https://huggingface.co/cardiffnlp/twitter-roberta-base-sentiment-latest
model1 = "https://api-inference.huggingface.co/models/cardiffnlp/twitter-roberta-base-sentiment-latest"

# Model 2: https://huggingface.co/mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis
# No label dictionary is needed: the output already carries score + sentiment.
model2 = "https://api-inference.huggingface.co/models/mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"

# Model 3: https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english
model3 = "https://api-inference.huggingface.co/models/distilbert-base-uncased-finetuned-sst-2-english"


def get_sentiment(string, model, type = None, max_retries = None, retry_delay = 20):
    """Send one text to an inference endpoint and return its top-scoring sentiment.

    Parameters
    ----------
    string : str
        Text to run through the model.
    model : str
        Model URL (see ``model1`` / ``model2`` / ``model3``).
    type : {None, "score", "label"}, optional
        Selects the output: ``"score"`` returns only the score, ``"label"`` only
        the label, anything else returns ``(label, score)``. The name shadows the
        builtin but is kept because callers may pass it by keyword.
    max_retries : int or None, optional
        How many times to retry after a failed attempt. ``None`` (the default)
        retries indefinitely, which is the historical behaviour.
    retry_delay : float, optional
        Seconds to wait between attempts (default 20).

    Returns
    -------
    The lower-cased label of the highest-scoring candidate and/or its score
    rounded to 3 decimals, depending on ``type``.

    Raises
    ------
    RuntimeError
        Only when ``max_retries`` is given and that many retries have also failed;
        the last underlying error is chained as the cause.
    """
    failures = 0
    while True:
        try:
            # Uses the module-level ``headers`` so the token is defined in one place.
            response = requests.post(model, headers = headers, json = {"inputs": string})
            # The code expects a list whose first item is a list of
            # {'label', 'score'} dicts; any other reply (e.g. an error object)
            # fails here and is treated as a retryable failure.
            candidates = response.json()[0]

            best = max(candidates, key = lambda x: x['score'])
            label = best['label'].lower()
            score = np.round(best['score'], decimals = 3)
            break
        except Exception as err:
            # Deliberately broad: any failure while requesting or parsing is retried.
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise RuntimeError(
                    "get_sentiment gave up after {} failed attempts".format(failures)
                ) from err
            sleep(retry_delay)

    #desired output
    if type == "score":
        return score
    if type == "label":
        return label

    return label, score


# cache=True reuses the probing call np.vectorize makes on the first element to
# discover the output types, instead of issuing the same HTTP request twice.
vec_sentiment = np.vectorize(get_sentiment, cache = True)

In [None]:
#run sentiment model on dataframe for one stock ticker
#returns the number of mentions and the sampled dataframe with a Sentiment and Score column added per model
def run_model(ticker_df, max_samples = 150, min_filtered = 30, random_state = None):
    """Score the tweets of one ticker with all three sentiment models.

    Rows with ``favorite_count >= 2`` or ``retweet_count >= 1`` are preferred.
    If more than ``max_samples`` such rows exist, ``max_samples`` of them are
    sampled; if fewer than ``min_filtered`` exist, the unfiltered frame is used
    instead (sampled down to ``max_samples`` when it is larger).

    Parameters
    ----------
    ticker_df : pandas.DataFrame
        Rows for a single ticker; must contain ``text``, ``favorite_count`` and
        ``retweet_count`` columns. It is never modified.
    max_samples : int, optional
        Upper bound on the number of rows sent to the models (default 150).
    min_filtered : int, optional
        Minimum number of filtered rows required to use the filtered set
        (default 30).
    random_state : optional
        Passed to ``DataFrame.sample`` to make the sampling reproducible.

    Returns
    -------
    (int, pandas.DataFrame)
        The total row count of ``ticker_df`` and a new frame holding the scored
        rows with ``model{1,2,3}Sentiment`` / ``model{1,2,3}Score`` columns.
    """
    named_models = [('model1', model1), ('model2', model2), ('model3', model3)]
    mentions = len(ticker_df)

    engaged = ticker_df[(ticker_df['favorite_count'] >= 2) | (ticker_df['retweet_count'] >= 1)]

    if len(engaged) > max_samples:
        data = engaged.sample(max_samples, random_state = random_state).copy()
    elif len(engaged) >= min_filtered:
        data = engaged.copy()
    elif len(ticker_df) > max_samples:
        data = ticker_df.sample(max_samples, random_state = random_state).copy()
    else:
        # Copy so the columns added below never leak into the caller's frame.
        data = ticker_df.copy()

    text = data['text'].values

    for name, url in named_models:
        if len(text) == 0:
            # np.vectorize cannot infer output types from size-0 input.
            sentiment, score = [], []
        else:
            sentiment, score = vec_sentiment(text, url)
        data[name + "Sentiment"] = sentiment
        data[name + "Score"] = score

    return mentions, data
    

In [None]:
#returns dictionary of dataframes for each stock ticker
def split_by_ticker(data):
    """Split a frame into one DataFrame per stock ticker.

    The ``full_text_preprocessed`` column is renamed to ``text`` in every
    returned frame. Tickers keep the order in which they first appear in
    ``data``; rows whose ticker is missing (NaN/None) are skipped so that no
    empty frame is produced for them.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a ``ticker`` column.

    Returns
    -------
    dict
        Maps each ticker value to a new DataFrame holding that ticker's rows
        (original index preserved). ``data`` itself is left untouched.
    """
    columns = {'full_text_preprocessed': 'text'}

    # One grouping pass instead of one boolean-mask scan per ticker.
    return {
        ticker: group.rename(columns = columns)
        for ticker, group in data.groupby('ticker', sort = False)
    }

In [None]:
#picks the most common label within each row's list of three model labels
def determine_majority(df):
    """Return a copy of ``df`` with a ``Majority_Sentiment`` column.

    For every row the labels in ``model1Sentiment``, ``model2Sentiment`` and
    ``model3Sentiment`` are tallied and the most frequent one is kept. When
    all three disagree, the first one counted (model 1's label) wins.
    """
    ballots = zip(df['model1Sentiment'], df['model2Sentiment'], df['model3Sentiment'])

    winners = []
    for ballot in ballots:
        top_label, _votes = Counter(ballot).most_common(1)[0]
        winners.append(top_label)

    result = df.copy()
    result['Majority_Sentiment'] = winners
    return result

In [None]:
#returns sentiment proportions + determines the overall sentiment from the negative/positive score ratio
def process_Sentiment(sentimment_Data):
    """Summarise the three models' predictions for one ticker.

    Every (row, model) pair counts as one vote, so ``len(sentimment_Data) * 3``
    votes are split into negative / positive / neutral percentages. The overall
    sentiment compares negative against positive scores:

    * no positive votes            -> 'Negative' (historical fallback),
    * positive but no negative     -> 'Positive',
    * vote counts within 0.75-1.25 -> ratio of the average scores,
    * otherwise                    -> ratio of the count-weighted average scores.

    A ratio below 1 yields 'Positive', above 1 'Negative', and exactly 1 is
    settled by a random pick between the two.

    Parameters
    ----------
    sentimment_Data : pandas.DataFrame
        Must contain ``model{1,2,3}Sentiment`` and ``model{1,2,3}Score`` columns.

    Returns
    -------
    dict
        Keys 'Negative Percent', 'Positive Percent', 'Neutral Percent'
        (rounded to 2 decimals) and 'Overall Sentiment'.

    Raises
    ------
    ZeroDivisionError
        If ``sentimment_Data`` has no rows.
    """
    def _scores_for(label):
        # Scores of every model on the rows where that model predicted `label`.
        per_model = []
        for prefix in ('model1', 'model2', 'model3'):
            matches = sentimment_Data[prefix + 'Sentiment'] == label
            per_model.append(sentimment_Data[matches][prefix + 'Score'].values)
        return np.concatenate(per_model, axis = None)

    neg_arr = _scores_for('negative')
    pos_arr = _scores_for('positive')
    neu_arr = _scores_for('neutral')

    total_vals = len(sentimment_Data) * 3

    positive_per = np.round((len(pos_arr)/total_vals) * 100, 2)
    negative_per = np.round((len(neg_arr)/total_vals) * 100, 2)
    neutral_per =  np.round((len(neu_arr)/total_vals) * 100, 2)

    num_neg, num_pos = len(neg_arr), len(pos_arr)

    if num_pos == 0:
        # Nothing positive to compare against: keep the historical 'Negative' fallback.
        sentimment_ratio = 2
    elif num_neg == 0:
        # Averaging an empty negative set would give NaN and wrongly fall through
        # to 'Negative'; with no negative votes the ratio is simply 0.
        sentimment_ratio = 0
    else:
        neg_avg, pos_avg = np.average(neg_arr), np.average(pos_arr)
        if 0.75 < (num_neg / num_pos) < 1.25:
            sentimment_ratio = neg_avg / pos_avg
        else:
            neg_weight, pos_weight = num_neg / total_vals, num_pos / total_vals
            sentimment_ratio = (neg_avg * neg_weight) / (pos_avg * pos_weight)

    if sentimment_ratio == 1:
        # np.random.choice takes the candidates as a single sequence argument.
        final_sentimment = str(np.random.choice(['Negative', 'Positive']))
    elif sentimment_ratio < 1:
        final_sentimment = 'Positive'
    else:
        final_sentimment = 'Negative'

    output_dict = {'Negative Percent' : negative_per,
                   'Positive Percent' : positive_per,
                   'Neutral Percent': neutral_per,
                   'Overall Sentiment' : final_sentimment}

    return output_dict

In [None]:
def process_Twitter(data):
    """Run the sentiment pipeline for every ticker and tabulate the results.

    ``data`` is split per ticker with ``split_by_ticker``; each per-ticker frame
    goes through ``run_model`` and then ``process_Sentiment``. The returned
    DataFrame has one row per ticker with the columns 'Ticker', 'Mentions',
    'Negative Percent', 'Positive Percent', 'Neutral Percent' and
    'Overall Sentiment'.
    """
    frames_by_ticker = split_by_ticker(data)
    tickers = list(frames_by_ticker.keys())

    result_fields = ("Negative Percent", "Positive Percent", "Neutral Percent", "Overall Sentiment")

    # Column-oriented accumulator; key order fixes the column order of the output.
    summary = {"Ticker": tickers, "Mentions": []}
    for field in result_fields:
        summary[field] = []

    for symbol in tickers:
        mention_count, scored = run_model(frames_by_ticker[symbol])
        results = process_Sentiment(scored)

        summary["Mentions"].append(mention_count)
        for field in result_fields:
            summary[field].append(results[field])

    return pd.DataFrame(summary)

In [None]:
# Load the Twitter CSV for this run. The path is absolute and specific to the
# environment the notebook was written in. Downstream code reads the columns
# 'ticker', 'full_text_preprocessed', 'favorite_count' and 'retweet_count'.
raw_data = pd.read_csv("/work/Files/Twitter Files/twitter 04-05-2022.csv")

In [None]:
# Run the whole per-ticker pipeline. Every sampled row is sent to each of the
# three models over HTTP (with a 20 s wait after any failed request), so this
# cell can take a long time.
output = process_Twitter(raw_data)

In [None]:
# Write the per-ticker summary to twitter.csv in the current working directory.
# The DataFrame index is written as the first column (index=False is not passed).
output.to_csv("twitter.csv")

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=643f0a0a-e649-4860-b73b-f3561d8b41c9' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>