In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, balanced_accuracy_score, classification_report, precision_recall_fscore_support
import csv
import huggingface_hub
from tqdm.notebook import tqdm
import random
import os
# Configure the PyTorch CUDA caching allocator to use expandable segments.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# SECURITY: access tokens must never be committed inside a notebook. The token
# that used to be hard-coded on this line should be treated as leaked and revoked.
# The token is now read from the HF_TOKEN environment variable; when it is not
# set, login() receives None and falls back to its interactive prompt.
huggingface_hub.login(token=os.environ.get("HF_TOKEN"))

In [None]:
# Seed every RNG in play (stdlib, NumPy, PyTorch) before any sampling happens
random.seed(31)
np.random.seed(31)
torch.manual_seed(31)

# Raw train / test splits; both files are expected to already carry a 'label' column
df = pd.read_csv('../data/unpreprocessed/train.csv')
df_test = pd.read_csv('../data/unpreprocessed/test.csv')

# Label name <-> integer id lookup tables, built from the training labels only
unique_labels = df['label'].unique()
label2id = dict(zip(unique_labels, range(len(unique_labels))))
id2label = dict(enumerate(unique_labels))

# Class balance of each split (label_counts ends up holding the test counts)
label_counts = df['label'].value_counts()
print("train label counts: ", label_counts)
label_counts = df_test['label'].value_counts()
print("test label counts: ", label_counts)

# Replace label names by their integer ids; a test label that never occurs in
# the training data has no id and becomes NaN here
df['label'] = df['label'].map(label2id)
df_test['label'] = df_test['label'].map(label2id)

# Positional rename so downstream code can rely on 'text' / 'label'
df.columns = ['text', 'label']
df_test.columns = ['public_id','text', 'label']

# Random row order for both splits (train first, then test)
df_train = df.sample(frac=1).reset_index(drop=True)
df_test = df_test.sample(frac=1).reset_index(drop=True)
# Debug aid: uncomment to evaluate on a handful of rows only
#df_test = df_test.iloc[:5]

In [None]:
### Model and Tokenizer Setup

model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"

# Tokenizer; EOS is reused as the padding token so that a pad token is always defined
tokenizer = AutoTokenizer.from_pretrained(model_name, add_prefix_space=True)
tokenizer.pad_token_id = tokenizer.eos_token_id
tokenizer.pad_token = tokenizer.eos_token

use_8bit = False  # True -> 8-bit weights, False -> 4-bit NF4 weights

# bitsandbytes quantization settings for the chosen precision
quantization_config = (
    BitsAndBytesConfig(load_in_8bit=True)
    if use_8bit
    else BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.float16,
    )
)

# Quantized causal LM; device placement is left to device_map='auto'
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map='auto',
    trust_remote_code=True,
)

model.eval()  # inference only: switch the model to evaluation mode

In [None]:
### Helper Functions

def calculate_metrics(y_true, y_pred):
    """Summarise classification quality for a set of predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned element-wise with ``y_true``.

    Returns:
        dict: Keys ``'accuracy'``, ``'balanced_accuracy'``, ``'precision'``,
        ``'recall'`` and ``'f1'`` (in that order). Precision, recall and F1
        are computed with ``average='weighted'``.
    """
    acc = accuracy_score(y_true, y_pred)
    balanced_acc = balanced_accuracy_score(y_true, y_pred)
    # The fourth element returned (support) is not reported.
    prf_scores = precision_recall_fscore_support(y_true, y_pred, average='weighted')
    weighted_precision, weighted_recall, weighted_f1 = prf_scores[:3]

    return dict(
        accuracy=acc,
        balanced_accuracy=balanced_acc,
        precision=weighted_precision,
        recall=weighted_recall,
        f1=weighted_f1,
    )

def write_scores(test_df, csv_file, model_name, num_examples_per_label, quantization):
    """Print evaluation results and append them to a CSV log.

    Args:
        test_df: Frame exposing ``label`` (ground truth) and ``predictions``.
        csv_file: Path of the CSV file to append to; it is created if missing.
        model_name: Model identifier; only the part after the last ``/`` is logged.
        num_examples_per_label: Number of shots per label, used in the run name.
        quantization: Quantization tag (e.g. ``"4bit"``), used in the run name.

    The CSV receives a ``Model, Metric, Value`` header when the file is new or
    empty, then one row naming the run followed by one row per metric.
    """
    gold = test_df.label
    predicted = test_df.predictions

    scores = calculate_metrics(gold, predicted)

    print("Classification Report:")
    print(classification_report(gold, predicted))

    print("Metrics:")
    for name, score in scores.items():
        print(f"{name.capitalize()}: {score:.4f}")

    # Run identifier: <model short name>-<k>shot-<quantization>
    short_name = model_name.split('/')[-1]
    run_label = f"{short_name}-{num_examples_per_label}shot-{quantization}"

    # A header is only needed when there is no non-empty file yet
    needs_header = not (os.path.isfile(csv_file) and os.path.getsize(csv_file) > 0)

    rows = []
    if needs_header:
        rows.append(['Model', 'Metric', 'Value'])
    rows.append([run_label, '', ''])
    rows.extend(['', name.capitalize(), f"{score:.4f}"] for name, score in scores.items())

    with open(csv_file, 'a', newline='') as handle:
        csv.writer(handle).writerows(rows)


def generate_prompt(few_shot_examples, new_input):
    """Build the few-shot classification prompt.

    Args:
        few_shot_examples: Iterable of mappings with ``'text'`` and ``'label'`` keys.
        new_input: The text to classify.

    Returns:
        str: One ``Text: ... / Label: ...`` block per example, each followed by a
        blank line, then the query text with an open ``Label:`` slot for the
        model to complete.
    """
    demonstrations = [
        f"Text: {shot['text']}\nLabel: {shot['label']}\n\n"
        for shot in few_shot_examples
    ]
    query = f"Text: {new_input}\nLabel:"
    return "".join(demonstrations) + query

def map_output_to_label(output_text, id2label):
    """Map raw generated text to one of the known label names.

    Only the first line of ``output_text`` (after stripping surrounding
    whitespace) is inspected. A label matches when it occurs in that line as a
    case-insensitive substring.

    Candidates are tried longest-first. Otherwise a label that contains another
    label (e.g. "partially false" contains "false") could never be returned
    whenever the shorter one happens to come first in ``id2label``.

    Args:
        output_text: Text produced by the model after the "Label:" marker.
        id2label: Mapping from label id to label name.

    Returns:
        The matching label (a value of ``id2label``), or ``None`` when no label
        occurs in the first line.
    """
    first_line = output_text.strip().split('\n')[0].lower()

    # sorted() is stable (also with reverse=True), so labels of equal length
    # keep their id2label order.
    candidates = sorted(id2label.values(), key=lambda name: len(str(name)), reverse=True)

    for candidate in candidates:
        if str(candidate).lower() in first_line:
            return candidate
    return None  # No known label found

### Prepare Few-Shot Examples

def get_few_shot_examples(df_train, num_examples_per_label=2):
    """Draw a shuffled set of demonstration examples with equal counts per label.

    For every label id in the module-level ``id2label``, rows are sampled from
    ``df_train``. Rows whose text length lies within +/-20% of the mean text
    length are preferred; when fewer than ``num_examples_per_label`` such rows
    exist for a label, all rows of that label form the sampling pool instead.

    Args:
        df_train: Training frame with ``'text'`` and integer-coded ``'label'`` columns.
        num_examples_per_label: Number of rows to draw for each label.

    Returns:
        list[dict]: Row records in random order, with ``'label'`` converted back
        from id to label name.
    """
    # Acceptable length window around the mean character count
    avg_chars = df_train['text'].str.len().mean()
    min_chars, max_chars = avg_chars * 0.8, avg_chars * 1.2

    examples = []
    for label_id in id2label:
        candidates = df_train[df_train['label'] == label_id]
        typical = candidates[candidates['text'].str.len().between(min_chars, max_chars)]

        # Prefer typical-length rows, but only if there are enough of them
        pool = typical if len(typical) >= num_examples_per_label else candidates
        drawn = pool.sample(n=num_examples_per_label)
        examples.extend(drawn.to_dict(orient='records'))

    # Interleave the labels so the prompt is not grouped by class
    np.random.shuffle(examples)

    # Prompts show label names, not ids
    for record in examples:
        record['label'] = id2label[record['label']]

    return examples

### In-Context Learning Inference

def in_context_predict(model, tokenizer, few_shot_examples, text, max_new_tokens=8):
    """Classify ``text`` by few-shot prompting a causal language model.

    Args:
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.
        few_shot_examples: Demonstrations passed on to ``generate_prompt``.
        text: The text to classify.
        max_new_tokens: Upper bound on generated tokens. A budget of a single
            token cannot spell a label made of several tokens (such as
            "partially false"), so the default leaves room for multi-token
            label names; anything after the first generated line is ignored by
            ``map_output_to_label``.

    Returns:
        tuple: ``(label, prompt, text)`` where ``label`` is the matched label
        name from the module-level ``id2label`` (or ``None`` if nothing
        matched), ``prompt`` is the full prompt sent to the model and ``text``
        is the input, unchanged.
    """
    prompt = generate_prompt(few_shot_examples, text)

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Greedy decoding: the task needs the single most likely label, not variety
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.eos_token_id,
            use_cache=True
        )

    # The returned sequence starts with the prompt tokens. Decode only the
    # continuation instead of splitting the full text on "Label:", which picks
    # the wrong segment as soon as the continuation itself contains "Label:".
    prompt_length = inputs["input_ids"].shape[1]
    completion = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)

    label = map_output_to_label(completion, id2label)
    return label, prompt, text


### Save Model to Hugging Face Hub

def save_model_to_hub(model, tokenizer, model_name, num_examples_per_label, quantization):
    """Push the model and its tokenizer to the Hugging Face Hub.

    The repository name is ``<model short name>-<k>shot-<quantization>``, where
    the short name is the part of ``model_name`` after the last ``/``.

    Args:
        model: Object exposing ``push_to_hub``.
        tokenizer: Object exposing ``push_to_hub``.
        model_name: Model identifier the repository name is derived from.
        num_examples_per_label: Number of shots per label, used in the name.
        quantization: Quantization tag (e.g. ``"4bit"``), used in the name.
    """
    hub_model_name = f"{model_name.split('/')[-1]}-{num_examples_per_label}shot-{quantization}"

    # `token=True` uses the credentials stored by the Hub login; it replaces
    # the deprecated `use_auth_token=True` spelling.
    model.push_to_hub(hub_model_name, token=True)
    tokenizer.push_to_hub(hub_model_name, token=True)

    print(f"Model and tokenizer saved to Hugging Face Hub as: {hub_model_name}")

### Evaluation

def evaluate_icl(model, tokenizer, df_test, few_shot_examples, id2label, label2id, model_name, num_examples_per_label, quantization):
    """Run few-shot prediction over a test frame and log the resulting scores.

    Args:
        model: Causal LM handed to ``in_context_predict``.
        tokenizer: Tokenizer handed to ``in_context_predict``.
        df_test: Frame with ``'text'`` and integer-coded ``'label'`` columns.
        few_shot_examples: Demonstrations included in every prompt.
        id2label: Accepted for symmetry with ``label2id``; not used here.
        label2id: Mapping from label name to label id.
        model_name: Forwarded to ``write_scores``.
        num_examples_per_label: Forwarded to ``write_scores``.
        quantization: Forwarded to ``write_scores``.

    Returns:
        dict: Keyed by the index of ``df_test``; each value holds ``'prompt'``,
        ``'text'``, ``'true_label'`` and ``'predicted_label_id'``. A predicted
        id of ``-1`` means the model output matched no known label.

    Side effects:
        Appends the metrics to ``few_shot_results.csv`` via ``write_scores``.
        The caller's ``df_test`` is left unmodified.
    """
    predicted_ids = []
    records = {}
    bar = tqdm(total=len(df_test), desc="Processing", unit="sample")

    for row_index, sample in df_test.iterrows():
        gold = sample['label']
        predicted_name, prompt, text = in_context_predict(
            model, tokenizer, few_shot_examples, sample['text']
        )

        # -1 is the sentinel for "no label recognised" / "label without an id"
        if predicted_name is None:
            predicted_id = -1
        else:
            predicted_id = label2id.get(predicted_name, -1)

        predicted_ids.append(predicted_id)
        records[row_index] = {
            'prompt': prompt,
            'text': text,
            'true_label': gold,
            'predicted_label_id': predicted_id,
        }

        bar.update(1)
        # Release cached GPU memory after every sample
        torch.cuda.empty_cache()

    bar.close()

    # Score a copy of the frame that carries the predictions
    scored = df_test.assign(predictions=predicted_ids)
    write_scores(scored, 'few_shot_results.csv', model_name, num_examples_per_label, quantization)

    # Optional upload, disabled:
    #save_model_to_hub(model, tokenizer, model_name, num_examples_per_label, quantization)

    return records

In [None]:

# Number of demonstrations drawn per class for the prompt
num_examples_per_label = 3
few_shot_examples = get_few_shot_examples(df_train, num_examples_per_label=num_examples_per_label)

# Tag used in the results log, taken from the active quantization config
if quantization_config.load_in_8bit:
    quantization = "8bit"
else:
    quantization = "4bit"

# Full pass over the test set; returns the per-sample prompts and predictions
prompt_text_dict = evaluate_icl(
    model=model,
    tokenizer=tokenizer,
    df_test=df_test,
    few_shot_examples=few_shot_examples,
    id2label=id2label,
    label2id=label2id,
    model_name=model_name,
    num_examples_per_label=num_examples_per_label,
    quantization=quantization,
)


In [None]:

# Distribution of predicted labels over the evaluated samples.
#
# The id -> name table is derived from id2label (the mapping actually used to
# encode the data) instead of a hand-written copy, whose order may disagree with
# the ids assigned from the training file. evaluate_icl stores -1 when the model
# output matched no known label, so that sentinel gets its own bucket instead of
# raising a KeyError.
label_mapping = {**id2label, -1: "unparsed"}

label_distribution = {}
for item in prompt_text_dict.values():
    predicted_label_id = item['predicted_label_id']
    predicted_label_word = label_mapping.get(predicted_label_id, f"unknown id {predicted_label_id}")
    label_distribution[predicted_label_word] = label_distribution.get(predicted_label_word, 0) + 1

# Calculate percentages (with no predictions the dict is empty and nothing is divided)
total_predictions = sum(label_distribution.values())
label_distribution_percentage = {label: (count / total_predictions) * 100 for label, count in label_distribution.items()}

# Print the distribution
print("Predicted Label Distribution:")
for label, percentage in label_distribution_percentage.items():
    print(f"{label}: {percentage:.2f}%")

