In [None]:
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
import pandas as pd
from scipy import special, stats

tokenizer = GPT2Tokenizer.from_pretrained('gpt2-large')

sentiment_tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")

finetuned_model = GPT2LMHeadModel.from_pretrained('./finetuned_model')
finetuned_model.eval()

sentiment_model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
sentiment_model.eval()

df = pd.read_csv('./test_data.csv').filter(['text', 'label'], axis=1)

sent_probs = pd.read_csv('./analysis_results/sentiment_dist.csv')
avg_ce_probs = pd.read_csv('./analysis_results/avg_ce_dist.csv')
max_ce_probs = pd.read_csv('./analysis_results/max_ce_dist.csv')
cond_probs = pd.read_csv('./analysis_results/3d.csv')

In [None]:
# Define methods for extracting cross-entropy and sentiment from the models

def get_tokenized(prompt):
    tokens=tokenizer.encode(prompt)
    tokenized = [tokenizer.decode(t) for t in tokens]
    return tokenized

def get_token_nums(tokens):
    nums = [tokenizer.encode(t) for t in tokens]
    return nums

def get_probs(tokens, model):
    
    #convert to tensor variable
    tokens_tensor = torch.tensor([tokens])
    
    #get predictions
    with torch.no_grad():
        outputs = model(tokens_tensor)
    predictions = outputs[0]
    
    #compile probability distribution outputs
    probs_list = [torch.softmax(predictions[0][i],-1).data.numpy() for i in range(len(predictions[0]))]
        
    return probs_list

def cross_entropy(p, q):
    return -np.sum(special.xlogy(p, q))

def get_cross_entropy(tokens, probs):
    
    #q is the predicted distribution, p is the one-hot vector representing the actual next token
    #q = probs[i][0], p = np.zeros(len(q)), p[tokens[i + 1]] = 1
    
    def hot(a, i):
        a[tokens[i + 1]] = 1
        return a

    ces = [cross_entropy(hot(np.zeros(len(probs[i][0])), i), probs[i][0]) for i in range(len(probs) - 1)]
    return ces

def get_sentiment_scores(text):
    
    tokens = sentiment_tokenizer.encode(text)[:512]
    tokens_tensor = torch.tensor([tokens])
    output = special.softmax(sentiment_model(tokens_tensor)[0][0].detach().numpy())
    
    return output

def label(scores):
    idx = np.argmax(scores)
    if idx == 0:
        return 'Negative'
    elif idx == 1:
        return 'Neutral'
    else:
        return 'Positive'
    
def get_results(df):
    pd.set_option('display.max_rows', 10)
    corr = 0
    tp = 0
    fp = 0
    tn = 0
    fn = 0
    for i in range(len(df.index)):
        actual = int(df.loc[i, 'label']) 
        pred = int(df.loc[i, 'predict'])
            
        if actual == pred:
            corr += 1
            
        if actual == 1:
            if pred == 1:
                tp += 1
            elif pred == 0:
                fn += 1
        
        elif actual == 0:
            if pred == 0:
                tn += 1
            elif pred == 1:
                fp += 1
                
    array = [[tp, fn],
             [fp, tn]]

    conf = pd.DataFrame(data=array)
    conf = conf.rename(columns={0:'Predicted Fake', 1:'Predicted True'})
    conf = conf.rename(index={0:'Actual Fake', 1:'Actual True'})
    
    display(df)
    display(conf)
    print(f'% correct: {corr/len(df.index)}')
    print(f'True Positive Rate: {tp/(tp+fn)}')
    print(f'True Negative Rate: {tn/(tn+fp)}')
    print(f'False Positive Rate: {fp/(fp+tn)}')
    print(f'False Negative Rate: {fn/(fn+tp)}')
    
def greater(lis, bench):
    count = 0
    for l in lis:
        if l > bench:
            count += 1
    return count

def cont_sent(sents):
    return (1*sents[0] + 2*sents[1] + 3*sents[2])

In [None]:
how_many = 50
sample = df.sample(n=how_many).reset_index()

print('Starting processing...')
tokenized = [get_tokenized(text)[:1024] for text in sample['text']]
tokens = [get_token_nums(token) for token in tokenized]
probs = [get_probs(toks, finetuned_model) for toks in tokens]
zipped = zip(tokens, probs)
ces = [get_cross_entropy(z[0], z[1]) for z in zipped]
avg_ces = [np.mean(ce) for ce in ces]

sent_scores = [get_sentiment_scores(text) for text in sample['text']]
sent_label = [label(scores) for scores in sent_scores]
print('Finished processing!')

In [None]:
# Only based on sentiment

for i in sample.index:
    sentiment = cont_sent(sent_scores[i])
    
    fake_dist = np.absolute(sent_probs['fake_sent'] - sentiment)
    fake_closest = fake_dist.argmin()
    fake_prob = sent_probs.loc[fake_closest, 'fake_prob']
    
    true_dist = np.absolute(sent_probs['true_sent'] - sentiment)
    true_closest = true_dist.argmin()
    true_prob = sent_probs.loc[true_closest, 'true_prob']
    
    bayes_true_sent = true_prob * 0.5
    bayes_fake_sent = fake_prob * 0.5
    
    if bayes_true_sent > bayes_fake_sent:
        sample.loc[i, 'predict'] = '0'
    else:
        sample.loc[i, 'predict'] = '1'
        
print('Prediction based on sentiment:')
get_results(sample)

In [None]:
# Only based on avg cross-entropy

for i in sample.index:
    ce = avg_ces[i]
    
    fake_dist = np.absolute(avg_ce_probs['fake_ce'] - ce)
    fake_closest = fake_dist.argmin()
    fake_prob = ce * avg_ce_probs.loc[fake_closest, 'fake_prob']
    
    true_dist = np.absolute(avg_ce_probs['true_ce'] - ce)
    true_closest = true_dist.argmin()
    true_prob = ce * avg_ce_probs.loc[true_closest, 'true_prob']
    
    bayes_true_ce = true_prob * 0.5
    bayes_fake_ce = fake_prob * 0.5
    
    if bayes_true_ce > bayes_fake_ce:
        sample.loc[i, 'predict'] = '0'
    else:
        sample.loc[i, 'predict'] = '1'
        
print('Prediction based on average cross-entropy:')
get_results(sample)

In [None]:
# Only based on max cross-entropy

for i in sample.index:
    ce = max(ces[i])
    
    fake_dist = np.absolute(max_ce_probs['fake_ce'] - ce)
    fake_closest = fake_dist.argmin()
    fake_prob = ce * max_ce_probs.loc[fake_closest, 'fake_prob']
    
    true_dist = np.absolute(max_ce_probs['true_ce'] - ce)
    true_closest = true_dist.argmin()
    true_prob = ce * max_ce_probs.loc[true_closest, 'true_prob']
    
    bayes_true_ce = true_prob * 0.5
    bayes_fake_ce = fake_prob * 0.5
    
    if bayes_true_ce > bayes_fake_ce:
        sample.loc[i, 'predict'] = '0'
    else:
        sample.loc[i, 'predict'] = '1'
        
print('Prediction based on maximum cross-entropy:')
get_results(sample)

In [None]:
# Try with updating now (sentiment, max, average)

for i in sample.index:
    sentiment =  sentiment = cont_sent(sent_scores[i])
    max_ce = max(ces[i])
    avg_ce = avg_ces[i]
    
    t_sent_dist = np.absolute(sent_probs['true_sent'] - sentiment)
    t_sent_closest = t_sent_dist.argmin()
    t_sent_prob = sent_probs.loc[t_sent_closest, 'true_prob']
    f_sent_dist = np.absolute(sent_probs['fake_sent'] - sentiment)
    f_sent_closest = f_sent_dist.argmin()
    f_sent_prob = sent_probs.loc[f_sent_closest, 'fake_prob']
    sent_true = t_sent_prob * 0.5
    sent_fake = f_sent_prob * 0.5
    #sample.at[i, 'sent_true'] = sent_true
    #sample.at[i, 'sent_fake'] = sent_fake
    
    t_max_dist = np.absolute(max_ce_probs['true_ce'] - max_ce)
    t_max_closest = t_max_dist.argmin()
    t_max_prob = max_ce_probs.loc[t_max_closest, 'true_prob']
    f_max_dist = np.absolute(max_ce_probs['fake_ce'] - max_ce)
    f_max_closest = f_max_dist.argmin()
    f_max_prob = max_ce_probs.loc[f_max_closest, 'fake_prob']
    max_true = t_max_prob * sent_true
    max_fake = f_max_prob * sent_fake
    #sample.at[i, 'max_true'] = max_true
    #sample.at[i, 'max_fake'] = max_fake
    
    t_avg_dist = np.absolute(avg_ce_probs['true_ce'] - avg_ce)
    t_avg_closest = t_avg_dist.argmin()
    t_avg_prob = avg_ce_probs.loc[t_avg_closest, 'true_prob']
    f_avg_dist = np.absolute(avg_ce_probs['fake_ce'] - avg_ce)
    f_avg_closest = f_avg_dist.argmin()
    f_avg_prob = avg_ce_probs.loc[f_avg_closest, 'fake_prob']
    avg_true = t_avg_prob * max_true
    avg_fake = f_avg_prob * max_fake
    #sample.at[i, 'avg_true'] = avg_true
    #sample.at[i, 'avg_fake'] = avg_fake
    
    if avg_true > avg_fake:
        sample.loc[i, 'predict'] = '0'
    else:
        sample.loc[i, 'predict'] = '1'
        
print('Prediction based on all criteria:')
get_results(sample)

In [None]:
#using two conditions p(T|S,E)

for i in sample.index:
    sentiment = cont_sent(sent_scores[i])
    avg_ce = avg_ces[i]
    
    closest = np.absolute(cond_probs['s'] - sentiment) + np.absolute(cond_probs['ce'] - avg_ce)
    idx = closest.argmin()
    
    pt = cond_probs.loc[idx, 'pt'] * 0.5
    pf = cond_probs.loc[idx, 'pf'] * 0.5
    
    if pt > pf:
        sample.loc[i, 'predict'] = '0'
    else:
        sample.loc[i, 'predict'] = '1'
        
print('Prediction based on conditional criteria:')
get_results(sample)