In [None]:
import torch
from datasets import Dataset
from sklearn.utils import resample
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification, Trainer,
    TrainingArguments, pipeline
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import pandas as pd

# ---------------- Step 1: Read Data ----------------
# Location of the labelled MBIC spreadsheet. The assignment used to be left
# without a value (`file_path=`), which is a SyntaxError that prevents the
# whole file from being parsed; the path from the previously commented-out
# line is restored here.
# NOTE(review): the path looks like a Colab Google Drive mount - adjust it
# when running in a different environment.
file_path = "/content/drive/MyDrive/internship_vaali_infotech/final_labels_MBIC.xlsx"
df = pd.read_excel(file_path)

# ---------------- Step 2: Bias----------------
def rebalance_three_categories(df):
    """Downsample the 'left', 'center' and 'right' rows of *df* to equal counts.

    Rows whose ``type`` is none of those three values are not carried over.
    Each class is reduced (without replacement, fixed seed) to the size of
    the smallest class, the three samples are concatenated in the order
    left / center / right, shuffled with a fixed seed and re-indexed from 0.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    per_class = [df[df["type"] == name] for name in ("left", "center", "right")]

    # The smallest class dictates how many rows every class may keep.
    min_count = min(len(group) for group in per_class)

    downsampled = [
        resample(group, n_samples=min_count, replace=False, random_state=42)
        for group in per_class
    ]

    shuffled = pd.concat(downsampled).sample(frac=1, random_state=42)
    return shuffled.reset_index(drop=True)

# Equalise the 'left' / 'center' / 'right' class sizes before any further
# processing; `df` is rebound to the balanced, reshuffled frame.
df = rebalance_three_categories(df)

def bias_type_column(df, bias_factor=0.4, target_type='center'):
    """Resample *df* so ``target_type`` rows make up about ``bias_factor`` of it.

    Args:
        df: DataFrame with a ``type`` column.
        bias_factor: Desired share of ``target_type`` rows, strictly between
            0 and 1.
        target_type: Value of ``type`` whose share is being steered.

    Returns:
        A new, shuffled DataFrame with a fresh 0..n-1 index. When there are
        more non-target rows than the ratio allows they are downsampled
        without replacement; otherwise target rows are oversampled with
        replacement if the ratio calls for more of them.

    Raises:
        ValueError: if ``type`` is missing, ``bias_factor`` is not in (0, 1),
            or no row has ``type == target_type``.
    """
    if 'type' not in df.columns:
        raise ValueError("DataFrame must contain a 'type' column")

    # 0 and 1 would divide by zero below; values outside the interval give
    # negative ratios that were previously clamped silently.
    if not 0 < bias_factor < 1:
        raise ValueError("bias_factor must be strictly between 0 and 1")

    df_target = df[df['type'] == target_type]
    df_other = df[df['type'] != target_type]

    n_target = len(df_target)
    if n_target == 0:
        raise ValueError("Target type has no entries in the DataFrame.")

    # Round away binary floating-point noise before flooring: e.g.
    # 2 * 0.6 / 0.4 evaluates to 2.9999999999999996, which a plain int()
    # would truncate to 2 instead of the intended 3.
    desired_other = int(round(n_target * (1 - bias_factor) / bias_factor, 9))

    # Always keep at least one non-target row in the requested ratio.
    desired_other = max(1, desired_other)

    if len(df_other) > desired_other:
        df_other = resample(df_other, n_samples=desired_other, replace=False, random_state=42)
    else:
        wanted_target = int(round(len(df_other) * bias_factor / (1 - bias_factor), 9))
        additional_target = wanted_target - n_target
        if additional_target > 0:
            df_target = pd.concat([
                df_target,
                resample(df_target, n_samples=additional_target, replace=True, random_state=42)
            ])

    return pd.concat([df_target, df_other]).sample(frac=1, random_state=42).reset_index(drop=True)

# Skew the balanced frame so that 'center' rows make up roughly 40% of it;
# `df` is rebound to the resampled, reshuffled result.
df = bias_type_column(df, bias_factor=0.4, target_type='center')

# ---------------- Step 3: Preprocess text fields ----------------
def preprocess_text(text):
    """Normalise one cell value to a lower-case string.

    Args:
        text: A string, a list of items, a missing value (``None`` / float
            NaN) or any other object.

    Returns:
        The lower-cased string; list items are stringified and joined with
        single spaces. Missing values yield ``""`` rather than the literal
        text ``"nan"`` / ``"none"``, so empty cells do not leak a spurious
        token into text built from this value.
    """
    import math  # local import: not provided by the file's import block

    if isinstance(text, str):
        return text.lower()
    if isinstance(text, list):
        return ' '.join(str(item).lower() for item in text)
    if text is None or (isinstance(text, float) and math.isnan(text)):
        return ""
    return str(text).lower()

text_cols = ['text', 'topic', 'article', 'biased_words']
for col in text_cols:
    # Normalise columns that exist; create absent ones as empty strings so
    # the concatenation below can rely on every column being present.
    df[col] = df[col].apply(preprocess_text) if col in df.columns else ""

# Build the model input as the four text fields joined by single spaces, in
# the order listed in `text_cols`.
df["combined_input"] = df[text_cols[0]].fillna("")
for col in text_cols[1:]:
    df["combined_input"] = df["combined_input"] + " " + df[col].fillna("")

# ---------------- Step 4: Label Mapping ----------------
# Class name -> integer id; the id is the position of the name in this list.
label_map = {
    name: class_id
    for class_id, name in enumerate([
        "Extreme_Left", "slightly left", "left",
        "center", "right", "slightly right", "extreme right",
    ])
}

# `type` values without an entry in `label_map` map to NaN and are removed
# together with rows lacking a combined input.
df["labels"] = df["type"].map(label_map)
df = df.dropna(subset=["combined_input", "labels"])

# ---------------- Step 5: Load Model & Tokenizer ----------------
# Try the primary checkpoint first and fall back to a second one if it cannot
# be loaded. `except Exception` (instead of a bare `except:`) lets
# KeyboardInterrupt / SystemExit propagate, and the failure reason is printed
# rather than silently discarded.
try:
    model_name = "launch/POLITICS"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=7)
    politics_pipe = pipeline("text-classification", model=model, tokenizer=tokenizer)
except Exception as load_error:
    print(f"Warning: could not load {model_name!r} ({load_error}); using fallback model.")
    model_name = "facebook/roberta-hate-speech-dynabench-r4-target"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # The fallback must expose the same 7-way head as the primary model,
    # because the training labels range over 7 classes. The checkpoint ships
    # its own classification head, so a head of a different size is
    # re-initialised instead of raising a shape-mismatch error.
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, num_labels=7, ignore_mismatched_sizes=True
    )
    politics_pipe = pipeline("text-classification", model=model, tokenizer=tokenizer)

# ---------------- Step 6: Rule + Model Bias Classification ----------------
# Keyword tables for predict_ideology_7class. They are built once at import
# time instead of on every call, since the function is applied row by row.
_BIAS_TERMS = {
    "Extreme_Left": ["revolution", "abolish capitalism", "anti-fascist", "socialist revolution", "wealth redistribution", "marxism", "anti-police", "defund the police"],
    "slightly left": ["affordable college", "student loan relief", "raise minimum wage", "climate action", "universal childcare", "gun reform"],
    "left": ["progressive", "social justice", "climate change", "lgbtq rights", "income inequality", "reproductive rights", "green new deal"],
    "center": ["bipartisan", "moderate", "balanced budget", "common ground", "centrist", "neutral", "compromise"],
    "right": ["free market", "border security", "second amendment", "traditional values", "small government", "lower taxes", "school choice"],
    "slightly right": ["family values", "tough on crime", "economic freedom", "energy independence"],
    "extreme right": ["nationalism", "patriot movement", "anti-immigration", "build the wall", "deep state", "cultural marxism", "gun rights absolutism"],
}

_LIBERAL_TERMS = (
    "progressive", "universal healthcare", "climate change", "social justice",
    "lgbtq rights", "income inequality", "reproductive rights", "green new deal",
    "student loan relief", "gun reform", "affordable college",
)

_CONSERVATIVE_TERMS = (
    "free market", "border security", "second amendment", "traditional values",
    "small government", "lower taxes", "pro life", "religious freedom",
    "family values", "tough on crime", "economic freedom", "energy independence",
)


def _count_terms(text_lower, terms):
    """Return how many of *terms* occur as substrings of *text_lower*."""
    return sum(term in text_lower for term in terms)


def predict_ideology_7class(text, model_pipeline, threshold=0.8):
    """Assign one of seven ideology labels to *text* using keyword rules.

    Args:
        text: Input string.
        model_pipeline: Callable invoked as
            ``model_pipeline(text, truncation=True, max_length=512)`` that
            returns a sequence whose first item has a ``'score'`` entry.
        threshold: Minimum score for the per-category keyword vote to be
            trusted.

    Returns:
        One of "Extreme_Left", "slightly left", "left", "center", "right",
        "slightly right", "extreme right".

    Only the pipeline's top score is used; its predicted label does not
    influence the result. If at least one category keyword matches and the
    score is not below ``threshold``, the category with the most matches is
    returned (ties go to the category listed first). Otherwise the label is
    derived from the difference between liberal and conservative keyword
    counts: a margin of 4 or more is the extreme label, exactly 3 the plain
    label, 1-2 the "slightly" label, and a tie is "center".
    """
    result = model_pipeline(text, truncation=True, max_length=512)
    confidence = result[0]['score']
    text_lower = text.lower()

    category_counts = {
        label: _count_terms(text_lower, terms) for label, terms in _BIAS_TERMS.items()
    }
    top_label = max(category_counts, key=category_counts.get)
    top_count = category_counts[top_label]

    if not (top_count == 0 or confidence < threshold):
        return top_label

    # Fallback: lean by the liberal-vs-conservative keyword margin.
    liberal_count = _count_terms(text_lower, _LIBERAL_TERMS)
    conservative_count = _count_terms(text_lower, _CONSERVATIVE_TERMS)
    if liberal_count == conservative_count:
        return "center"

    if liberal_count > conservative_count:
        extreme, plain, slight = "Extreme_Left", "left", "slightly left"
    else:
        extreme, plain, slight = "extreme right", "right", "slightly right"

    margin = abs(liberal_count - conservative_count)
    if margin >= 4:
        return extreme
    if margin == 3:
        return plain
    return slight

# Label every row with the keyword/score heuristic, translate the label name
# to its integer id, and drop rows whose name has no id in `label_map`.
df["adjusted_type"] = [
    predict_ideology_7class(text, politics_pipe) for text in df["combined_input"]
]
df["adjusted_label"] = df["adjusted_type"].map(label_map)
df = df.dropna(subset=["adjusted_label"])

# ---------------- Step 7: Prepare Dataset ----------------
# Classes with fewer than two rows are collected first; if any exist the
# split below is done without stratification.
category_counts = df['adjusted_label'].value_counts()
categories_with_few_samples = category_counts[category_counts < 2].index.tolist()

if categories_with_few_samples:
    print(f"Warning: Categories {categories_with_few_samples} have less than 2 samples. Removing stratification.")

# 80/20 split with a fixed seed; stratified on the adjusted label unless a
# rare class was found above.
train_df, eval_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=None if categories_with_few_samples else df['adjusted_label'],
)

# Convert to Huggingface Dataset (index reset so it is not carried along)
train_dataset, eval_dataset = (
    Dataset.from_pandas(split.reset_index(drop=True)) for split in (train_df, eval_df)
)

def tokenize(example):
    """Tokenize the ``combined_input`` field of *example*.

    Used with ``Dataset.map(..., batched=True)``, so *example* is a batch of
    rows. Every sequence is truncated and padded to exactly 512 tokens.
    """
    texts = example["combined_input"]
    return tokenizer(texts, truncation=True, max_length=512, padding="max_length")

def _prepare_split(dataset):
    """Tokenize *dataset* and expose ``adjusted_label`` as its ``labels`` column."""
    dataset = dataset.map(tokenize, batched=True)

    ## Rename columns: any pre-existing "labels" column is dropped first so
    ## that the adjusted label can take over that name.
    if "labels" in dataset.column_names:
        dataset = dataset.remove_columns("labels")
    dataset = dataset.rename_column("adjusted_label", "labels")

    # Only these three columns are returned, as torch tensors.
    dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
    return dataset


train_dataset = _prepare_split(train_dataset)
eval_dataset = _prepare_split(eval_dataset)

# ---------------- Step 8: Training Setup ----------------
# Hyper-parameters for fine-tuning: 3 epochs, batch size 8 per device for
# both training and evaluation, learning rate 2e-5, weight decay 0.01, and a
# log entry every 50 steps.
# NOTE(review): output_dir and logging_dir are absolute paths under the
# filesystem root - confirm that is intended and writable.
# NOTE(review): do_eval=True is set, but no evaluation schedule argument is
# passed here - confirm evaluation actually runs during training.
training_args = TrainingArguments(
    output_dir="/internship_vaali_infotech/results",
    do_train=True,
    do_eval=True,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    learning_rate=2e-5,
    weight_decay=0.01,
    logging_dir="/internship_vaali_infotech/logs",
    remove_unused_columns=False,
    logging_steps=50
)

def compute_metrics(eval_pred):
    """Compute accuracy and weighted precision / recall / F1 for an evaluation.

    Args:
        eval_pred: A ``(logits, labels)`` pair; the predicted class of each
            row is the arg-max of its logits along dimension 1.

    Returns:
        Dict with the keys "accuracy", "f1", "precision" and "recall".
    """
    logits, labels = eval_pred
    predictions = torch.tensor(logits).argmax(dim=1)

    # Index 0/1/2 of the result are precision, recall and F1; the fourth
    # element (support) is not reported.
    scores = precision_recall_fscore_support(labels, predictions, average='weighted')
    return {
        "accuracy": accuracy_score(labels, predictions),
        "f1": scores[2],
        "precision": scores[0],
        "recall": scores[1],
    }

# Wire the model, the training configuration, both tokenized splits and the
# metric callback into a single Trainer instance. The tokenizer is handed
# over as well.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# ---------------- Step 9: Train and Save ----------------
trainer.train()

# Model and tokenizer are written to one and the same directory so that a
# single path reloads both. The tokenizer path previously carried a stray
# leading slash ("//finetuned_politics_7class"), which is not guaranteed to
# name the same directory as the model path on every platform.
save_dir = "/finetuned_politics_7class"
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)

# ---------------- Step 10: Predict and Save ----------------
# Run the fine-tuned model over every row of `df`. This frame is the source
# of both the training and the evaluation split, so the predictions below
# are not a held-out evaluation.
full_dataset = Dataset.from_pandas(df)
full_dataset = full_dataset.map(tokenize, batched=True)
# Only the model inputs are exposed; no label column is passed to predict().
full_dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])
predictions = trainer.predict(full_dataset)
# Arg-max over the class dimension gives one integer class id per row.
predicted_classes = torch.argmax(torch.tensor(predictions.predictions), dim=1).numpy()
# Invert label_map (name -> id) to translate the ids back into label names.
reverse_label_map = {v: k for k, v in label_map.items()}
df["predicted_bias_category"] = [reverse_label_map[label] for label in predicted_classes]

# Persist the frame, including the new prediction column, without the index.
df.to_csv("/predicted_bias_results.csv", index=False)
