In [None]:
'''
pip install transformers accelerate langchain langchain-core langchain-openai bitsandbytes
'''
import pandas as pd, numpy as np, os, re
from time import sleep
from tqdm import tqdm

os.environ['CUDA_VISIBLE_DEVICES'] = "0,1"

from sklearn.metrics import accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.llms.huggingface_pipeline import HuggingFacePipeline
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain

from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, BitsAndBytesConfig

import matplotlib.pyplot as plt

# Read in the Data

In [None]:
# Load the processed dataset from a pickled DataFrame; later cells read its
# "text" and "title" columns.
# NOTE(review): unpickling can execute arbitrary code -- only load trusted files.

df = pd.read_pickle("processeddataname.pkl")

# Create LLMs
- stand up both an OpenAI model and a local model
- test to make sure they work

### OpenAI LLM

In [None]:
# Path to a text file holding the OpenAI API key. Leave it empty to fall back
# to the OPENAI_API_KEY environment variable instead.

file_path = ""

# Read the api key from the file when a path is given, otherwise from the environment
if file_path:
    with open(file_path, 'r') as file:
        api_key = file.read().strip()
else:
    api_key = os.environ.get("OPENAI_API_KEY", "").strip()

# Fail early with a readable message instead of an obscure open("") error
if not api_key:
    raise RuntimeError(
        "No OpenAI API key found: set `file_path` or the OPENAI_API_KEY environment variable."
    )

openai_llm = ChatOpenAI(model="gpt-3.5-turbo", api_key=api_key)

In [None]:
# Smoke test: send one article (label 1 of the "text" column) to the OpenAI
# model and display its reply as the cell output.
smoke_test_prompt = f'''Please classify the following news article by its political bias. Please only classify the article as "far right", "right", "center right", "center", "center left", "left", or "far left", and return no other text.
article: {df["text"][1]}
bias: '''
openai_llm.invoke(smoke_test_prompt)

### Local LLM

In [None]:
# Hugging Face checkpoint used as the local model
model = "meta-llama/Meta-Llama-3-8B-Instruct"

# The Hugging Face access token is read from a local text file, not stored in the notebook
with open('.root/hf.txt', 'r') as token_file:
    token = token_file.read().strip()

In [None]:
# 8-bit quantization settings; currently not passed to the model loader below
quantization_config = BitsAndBytesConfig(load_in_8bit=True)

# Load the tokenizer while `model` still holds the checkpoint name
tokenizer = AutoTokenizer.from_pretrained(model, token=token)

# NOTE: this rebinds `model` from the checkpoint name to the loaded model object,
# so this cell cannot be re-run without first re-running the cell that sets `model`.
model = AutoModelForCausalLM.from_pretrained(
    model,
    # quantization_config=quantization_config,
    token=token,
    device_map="auto",
)

In [None]:
# The tokenizer and model are already loaded by the previous cell. Loading them
# again here would fail: `model` now holds the loaded model object, not the
# checkpoint name that `from_pretrained` expects.

# Text-generation pipeline that returns only the newly generated tokens
# (at most 10), using the EOS token for padding.
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    return_full_text = False,
    max_new_tokens=10,
    pad_token_id=tokenizer.eos_token_id
)

# Wrap the pipeline so it can be used as an LLM in LangChain chains
local_llm = HuggingFacePipeline(pipeline=pipe)

In [None]:
# Smoke test: send one article (label 1 of the "text" column) to the local model.
# The instruction wording matches the one used for the OpenAI smoke test.
local_llm.invoke(f'''Please classify the following news article by its political bias. Please only classify the article as "far right", "right", "center right", "center", "center left", "left", or "far left", and return no other text.
article: {df["text"][1]}
bias: ''')

# Create Prompts and Chains
- one with full spectrum of bias labels
- one with "left", "right", and "center" only
- one with "biased" and "unbiased" only

In [None]:
# Prompt asking for one of the seven bias labels, given an article's title and body
base_template = (
    'Please classify the following news article by its political bias. '
    'Please only classify the article as "far right", "right", "center right", '
    '"center", "center left", "left", or "far left", and return no other text.\n'
    'title: {title}\n'
    'article: {article}\n'
    'bias: '
)

base_prompt = PromptTemplate(template=base_template, input_variables=['title', 'article'])

In [None]:
# Full-label pipeline for OpenAI: prompt -> chat model -> plain-string output
base_chain_openai = base_prompt | openai_llm | StrOutputParser()
# Local-model counterpart, currently disabled
#base_chain_local =  base_prompt | local_llm | StrOutputParser()

In [None]:
# Prompt asking for one of three labels only: right, center or left
reduced_template = (
    'Please classify the following news article by its political bias. '
    'Please only classify the article as "right", "center", or "left" '
    'and return no other text.\n'
    'title: {title}\n'
    'article: {article}\n'
    'bias: '
)

reduced_prompt = PromptTemplate(template=reduced_template, input_variables=['title', 'article'])

In [None]:
# Three-label pipeline for OpenAI: prompt -> chat model -> plain-string output
reduced_chain_openai = reduced_prompt | openai_llm | StrOutputParser()
# Local-model counterpart, currently disabled
#reduced_chain_local =  reduced_prompt | local_llm | StrOutputParser()

In [None]:
# Prompt asking only whether the article is "biased" or "unbiased"
bias_only_template = (
    'Please classify the following news article as to whether it is politically '
    'biased or unbiased. Only return the bias classification as "biased" or '
    '"unbiased" and no other text.\n'
    'title: {title}\n'
    'article: {article}\n'
    'bias: '
)

bias_only_prompt = PromptTemplate(template=bias_only_template, input_variables=['title', 'article'])

In [None]:
# Biased/unbiased pipeline for OpenAI: prompt -> chat model -> plain-string output
bias_only_chain_openai = bias_only_prompt | openai_llm | StrOutputParser()
# Local-model counterpart, currently disabled
#bias_only_chain_local =  bias_only_prompt | local_llm | StrOutputParser()

# Run on the whole dataset and classify the bias of the articles

- define some helper functions and dicts to help with formatting and cleaning the output

In [None]:
def _build_recode_mapping():
    """Build the lookup from raw model answers to canonical bias labels.

    The spelling variants (case, spacing, separators) are generated identically
    for "left" and "right", so a variant handled for one side is handled for the
    other as well.

    Returns:
        dict mapping a raw answer string to one of "CENTER", "LEFT", "RIGHT",
        "LEFT-CENTER", "RIGHT-CENTER", "FAR LEFT" or "FAR RIGHT".
    """
    # Answers that are all recoded to CENTER.
    # NOTE(review): both 'unbiased' and 'biased' map to CENTER -- confirm this is intended.
    mapping = {
        'center': "CENTER",
        'Center': "CENTER",
        'center左': "CENTER",
        'center右': "CENTER",
        'N/A': "CENTER",
        'unbiased': "CENTER",
        'biased': "CENTER",
    }

    for side in ("left", "right"):
        title_side = side.capitalize()
        upper_side = side.upper()

        # Plain side labels
        mapping[side] = upper_side
        mapping[title_side] = upper_side

        # "center <side>" spellings seen in model output
        center_variants = (
            f"center {side}",
            f"center  {side}",
            f"center-{side}",
            f"center;{side}",
            f"center\t{side}",
            f"center{side}",
            f"Center {side}",
            f"Center {title_side}",
            f"Center{title_side}",
        )
        for variant in center_variants:
            mapping[variant] = f"{upper_side}-CENTER"

        # "far <side>" spellings, with one or two spaces
        for variant in (f"far {side}", f"far  {side}"):
            mapping[variant] = f"FAR {upper_side}"

    return mapping


recode_mapping = _build_recode_mapping()



In [None]:
def extract_bias_text(text_list):
    """Extract the first recognised bias label from each model response.

    Patterns are tried in a fixed priority order, compound labels first, so that
    "far right" is not reported as "right". The two words of a compound label may
    be separated by whitespace, hyphens or semicolons (or nothing), so answers
    such as "center-left" are recognised as a compound label rather than falling
    through to the bare "center" pattern.

    Args:
        text_list: iterable of response strings.

    Returns:
        list with one string per input: the matched text exactly as it appears
        in the response (case and separators preserved), or 'center' when no
        label is found.
    """
    # Characters tolerated between the two words of a compound label
    separator = r'[\s;-]*'

    # Ordered by priority: the first pattern that matches wins
    patterns = [
        rf'\b(far{separator}right)\b',
        rf'\b(center{separator}right)\b',
        rf'\b(center{separator}left)\b',
        rf'\b(far{separator}left)\b',
        r'\b(right)\b',
        r'\b(center)\b',
        r'\b(left)\b',
        r'\b(unbiased)\b',
        r'\b(biased)\b',
    ]
    # Compile once instead of on every response
    compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in patterns]

    extracted_text_list = []
    for text in text_list:
        extracted_text = None
        for compiled in compiled_patterns:
            match = compiled.search(text)
            if match:
                extracted_text = match.group(0)
                break
        # If no match is found, classify as 'center'
        if extracted_text is None:
            extracted_text = 'center'
        extracted_text_list.append(extracted_text)

    return extracted_text_list

In [None]:
# Default character limit for GPT-3.5 Turbo: 76000

def truncate_text(text, max_length=76000):
    """Return `text` cut down to at most `max_length` characters.

    Text that already fits within the limit is returned unchanged.
    """
    exceeds_limit = len(text) > max_length
    return text[:max_length] if exceeds_limit else text

# OpenAI run

Full set of labels

In [None]:
# Classify every article with the full-label prompt through the OpenAI chain.
# Exactly one entry is appended per row -- NaN when both attempts fail -- so
# `results` always has the same length as `df` and can be assigned as a column.
results = []
for index, row in tqdm(df.iterrows(), total=len(df)):
    article = truncate_text(row["text"])
    title = row["title"]
    try:
        results.append(base_chain_openai.invoke({"article":article, "title":title}))
    except Exception as e:
        print("Caught error:", e)
        print("Waiting for 3 seconds before retrying...")
        sleep(3)  # Wait for 3 seconds before retrying
        try:
            results.append(base_chain_openai.invoke({"article":article, "title":title}))
        except Exception as retry_error:
            print("Caught error again. Skipping this row.", retry_error)
            # Keep the row's slot so later rows stay aligned with `df`
            results.append(np.nan)

In [None]:
# Count how often each distinct value occurs in `results`
np.unique(results, return_counts=True)

In [None]:
# Store the raw GPT answers, then standardise them: answers found in
# `recode_mapping` become their canonical label, all others are kept verbatim.
df['article_bias_gpt_base_prompt'] = results
raw_labels = df['article_bias_gpt_base_prompt']
df['article_bias_gpt_base_prompt'] = raw_labels.map(recode_mapping).fillna(raw_labels)

In [None]:
# Save the labels gathered so far. Every checkpoint in this notebook writes to
# this same file name, so each save replaces the previous one.
df.to_csv("interim_zero_shot_llm_labels.csv")

Reduced set of labels

In [None]:
# Classify every article with the three-label prompt through the OpenAI chain.
# Exactly one entry is appended per row -- NaN when both attempts fail -- so
# `results` always has the same length as `df` and can be assigned as a column.
results = []
for index, row in tqdm(df.iterrows(), total=len(df)):
    article = truncate_text(row["text"])
    title = row["title"]
    try:
        results.append(reduced_chain_openai.invoke({"article":article, "title":title}))
    except Exception as e:
        print("Caught error:", e)
        print("Waiting for 10 seconds before retrying...")
        sleep(10)  # Wait for n seconds before retrying
        try:
            results.append(reduced_chain_openai.invoke({"article":article, "title":title}))
        except Exception as retry_error:
            print("Caught error again. Skipping this row.", retry_error)
            # Keep the row's slot so later rows stay aligned with `df`
            results.append(np.nan)

In [None]:
# Count how often each distinct value occurs in `results`
np.unique(results, return_counts=True)

In [None]:
# Store the raw GPT answers, then standardise them: answers found in
# `recode_mapping` become their canonical label, all others are kept verbatim.
df['article_bias_gpt_reduced_prompt'] = results
raw_labels = df['article_bias_gpt_reduced_prompt']
df['article_bias_gpt_reduced_prompt'] = raw_labels.map(recode_mapping).fillna(raw_labels)

In [None]:
# Save the labels gathered so far. Every checkpoint in this notebook writes to
# this same file name, so each save replaces the previous one.
df.to_csv("interim_zero_shot_llm_labels.csv")

Bias only labels

In [None]:
# Classify every article as biased/unbiased through the OpenAI chain.
# Exactly one entry is appended per row -- NaN when both attempts fail -- so
# `results` always has the same length as `df` and can be assigned as a column.
results = []
for index, row in tqdm(df.iterrows(), total=len(df)):
    article = truncate_text(row["text"])
    title = row["title"]
    try:
        results.append(bias_only_chain_openai.invoke({"article":article, "title":title}))
    except Exception as e:
        print("Caught error:", e)
        print("Waiting for 3 seconds before retrying...")
        sleep(3)  # Wait for 3 seconds before retrying
        try:
            results.append(bias_only_chain_openai.invoke({"article":article, "title":title}))
        except Exception as retry_error:
            print("Caught error again. Skipping this row.", retry_error)
            # Keep the row's slot so later rows stay aligned with `df`
            results.append(np.nan)

In [None]:
# Count how often each distinct value occurs in `results`
np.unique(results, return_counts=True)

In [None]:
# Store the raw GPT answers, then normalise the capitalised forms of both
# classes to lower case; any other answer is kept verbatim.
df['article_bias_gpt_bias_only_prompt'] = results
df['article_bias_gpt_bias_only_prompt'] = df['article_bias_gpt_bias_only_prompt'].map(
    {'Unbiased': 'unbiased', 'Biased': 'biased'}
).fillna(df['article_bias_gpt_bias_only_prompt'])

In [None]:
# Save the labels gathered so far. Every checkpoint in this notebook writes to
# this same file name, so each save replaces the previous one.
df.to_csv("interim_zero_shot_llm_labels.csv")

# Local Model Run

full set of labels

In [None]:
# Build the local full-label chain here: its definition in the prompt section
# is commented out, so `base_chain_local` would otherwise be undefined.
base_chain_local = base_prompt | local_llm | StrOutputParser()

# Classify every article with the local model; articles are cut to 30000 characters
results = []
for index, row in tqdm(df.iterrows(), total=len(df)):
    article = truncate_text(row["text"], max_length=30000)
    title = row["title"]
    results.append(base_chain_local.invoke({"article":article, "title":title}))

In [None]:
# Reduce each raw completion to the bias label it contains ('center' when none is found)
results = extract_bias_text(results)

In [None]:
# Count how often each distinct value occurs in `results`
np.unique(results, return_counts=True)

In [None]:
# Store the extracted Llama labels, then standardise them: labels found in
# `recode_mapping` become their canonical label, all others are kept verbatim.
df['article_bias_llama_base_prompt'] = results
raw_labels = df['article_bias_llama_base_prompt']
df['article_bias_llama_base_prompt'] = raw_labels.map(recode_mapping).fillna(raw_labels)

In [None]:
# Save the labels gathered so far. Every checkpoint in this notebook writes to
# this same file name, so each save replaces the previous one.
df.to_csv("interim_zero_shot_llm_labels.csv")

reduced set of labels

In [None]:
# Build the local three-label chain here: its definition in the prompt section
# is commented out, so `reduced_chain_local` would otherwise be undefined.
reduced_chain_local = reduced_prompt | local_llm | StrOutputParser()

# Classify every article with the local model; articles are cut to 30000 characters
results = []
for index, row in tqdm(df.iterrows(), total=len(df)):
    article = truncate_text(row["text"], max_length=30000)
    title = row["title"]
    results.append(reduced_chain_local.invoke({"article":article, "title":title}))

In [None]:
# Reduce each raw completion to the bias label it contains ('center' when none is found)
results = extract_bias_text(results)

In [None]:
# Count how often each distinct value occurs in `results`
np.unique(results, return_counts=True)

In [None]:
# Store the extracted Llama labels, then standardise them: labels found in
# `recode_mapping` become their canonical label, all others are kept verbatim.
df['article_bias_llama_reduced_prompt'] = results
raw_labels = df['article_bias_llama_reduced_prompt']
df['article_bias_llama_reduced_prompt'] = raw_labels.map(recode_mapping).fillna(raw_labels)

In [None]:
# Save the labels gathered so far. Every checkpoint in this notebook writes to
# this same file name, so each save replaces the previous one.
df.to_csv("interim_zero_shot_llm_labels.csv")

Bias only labels

In [None]:
# Build the local biased/unbiased chain here: its definition in the prompt section
# is commented out, so `bias_only_chain_local` would otherwise be undefined.
bias_only_chain_local = bias_only_prompt | local_llm | StrOutputParser()

# Classify every article with the local model; articles are cut to 30000 characters
results = []
for index, row in tqdm(df.iterrows(), total=len(df)):
    article = truncate_text(row["text"], max_length=30000)
    title = row["title"]
    results.append(bias_only_chain_local.invoke({"article":article, "title":title}))

In [None]:
# Reduce each raw completion to the label it contains, then fold the two
# off-scale answers into the bias-only classes: 'right' counts as 'biased'
# and 'center' (also the no-match default) counts as 'unbiased'.
# NOTE(review): confirm that treating the no-match default as 'unbiased' is intended.
relabel = {'right': 'biased', 'center': 'unbiased'}
results = [relabel.get(label, label) for label in extract_bias_text(results)]

In [None]:
# Count how often each distinct value occurs in `results`
np.unique(results, return_counts=True)

In [None]:
# Store the extracted Llama labels, then recode the remaining spellings into
# the two bias-only classes; labels not listed below are kept verbatim.
# NOTE(review): "Left" maps to "biased" while "left" maps to "unbiased" --
# confirm this asymmetry is intended.
bias_only_recode = {"Left":"biased", "Biased":"biased", "Unbiased":"unbiased", "left":"unbiased"}
df['article_bias_llama_bias_only_prompt'] = results
raw_labels = df['article_bias_llama_bias_only_prompt']
df['article_bias_llama_bias_only_prompt'] = raw_labels.map(bias_only_recode).fillna(raw_labels)

In [None]:
# Count how often each distinct value occurs in the recoded bias-only column
np.unique(df['article_bias_llama_bias_only_prompt'], return_counts=True)

In [None]:
# Save the labels gathered so far. Every checkpoint in this notebook writes to
# this same file name, so each save replaces the previous one.
df.to_csv("interim_zero_shot_llm_labels.csv")