In [None]:
!nvidia-smi

### Installation

In [None]:
    !pip uninstall torchvision unsloth -y

In [None]:
    !pip install -Uq torchvision

In [None]:
    %%capture
    !pip install unsloth
    !pip install --force-reinstall --no-cache-dir --no-deps git+https://github.com/unslothai/unsloth.git

---

### Imports

In [None]:
import torch
print(f"{torch.__version__=}")

In [None]:
from dataclasses import dataclass
from logging import getLogger

logger = getLogger(__name__)

In [None]:
from unsloth import FastLanguageModel
from unsloth.chat_templates import get_chat_template

In [None]:
from datasets import load_dataset

In [None]:
import nltk
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score
nltk.download('punkt_tab')
nltk.download('wordnet')

In [None]:
import random

In [None]:
from jinja2 import Template

In [None]:
from rouge_score import rouge_scorer

In [None]:
from bert_score import score

In [None]:
import numpy as np

In [None]:
from IPython.display import Markdown, display, HTML

text = "The quick ~~brown~~ **red** fox ~~jumps~~ **leaps** over the ~~lazy~~ **sleeping** dog"
display(Markdown(text))

In [None]:
create_html_diff_with_background(
    a="The quick brown fox jumps over the lazy dog",
    b="The quick red fox leaps over the sleeping dog",
)

In [None]:
from trlabs import clean_text, extract_tag_content, create_html_diff_with_background

In [None]:
from trlabs import generate_text, GenerationConfig

---

### Load Data

In [None]:
# Select which configuration of the Reuters-21578 dataset to load; the
# alternatives are kept commented out for quick switching.
#task_config = 'ModHayes'
#task_config = 'ModLewis'
task_config = 'ModApte'
print(f"{task_config=}")
# NOTE(review): trust_remote_code=True presumably allows the dataset's own
# loading script to execute locally — confirm the source is trusted.
ds = load_dataset("ucirvine/reuters21578", task_config, trust_remote_code=True)
ds

In [None]:
train_df = ds['train'].to_pandas()
train_df.shape

In [None]:
# Keep only articles that have both a body and a title, then add cleaned
# copies of the two columns. Rows with missing values are already gone after
# dropna, so no fillna is needed before cleaning. Building a new frame
# (instead of inplace=True) avoids mutating an object other cells may hold.
train_df = (
    train_df
    .dropna(subset=['text', 'title'])
    .assign(
        cleaned_title=lambda df: df['title'].apply(clean_text),
        cleaned_text=lambda df: df['text'].apply(clean_text),
    )
)
train_df.shape

In [None]:
# Collapse each article's topic list into one "_"-joined label so that
# multi-topic articles get their own combined category.
train_df['topics_flattened'] = train_df['topics'].apply(
    lambda topics: '_'.join(str(topic).strip() for topic in topics)
)
train_df.shape

---

##### Topics

In [None]:
topics_vc = train_df['topics_flattened'].value_counts()
topics_vc.shape

In [None]:
topics_vc.head(n=15)

In [None]:
# Take the 20 most frequent flattened topic labels, dropping blank labels
# and anything mentioning "money".
top_n_topics = [
    label.strip()
    for label in topics_vc.head(n=20).index.values.tolist()
    if label.strip() and "money" not in label
]
print(f"{top_n_topics=}")
len(top_n_topics)

In [None]:
top_n_topics = ['earn', 'acq', 'crude', 'trade', 'interest', 'ship', 'grain_wheat', 'sugar', 'coffee', 'gold', 'grain_corn', 'gnp', 'cpi', 'cocoa', 'grain']
print(f"{top_n_topics=}")
len(top_n_topics)

---

### Load Model

In [None]:
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/Meta-Llama-3.1-8B-Instruct",
    max_seq_length = 8192,
    load_in_4bit = True,
    # token = "hf_...", # use one if using gated models like meta-llama/Llama-2-7b-hf
)

In [None]:
# Use the Llama-3.1 chat template with the default "role"/"content" message
# keys. generate_text builds its messages as {"role": ..., "content": ...},
# so a ShareGPT-style key mapping ("from"/"value") must not be applied here:
# it would make the template look up keys those messages do not contain.
tokenizer = get_chat_template(
    tokenizer,
    chat_template = "llama-3.1",
)

In [None]:
FastLanguageModel.for_inference(model) # Enable native 2x faster inference

---

### 1. Title Generation

In [None]:
system_message_title_generation = "[TODO]"
print(f"{system_message_title_generation=}")

In [None]:
prompt_template_title_generation_str = """
[TODO]
""".strip()
print(prompt_template_title_generation_str)

In [None]:
prompt_template_title_generation = Template(prompt_template_title_generation_str)

---

##### Sample

In [None]:
target_topic = 'coffee'
#target_topic = 'sugar'
#target_topic = 'ship'
#target_topic = 'grain_wheat'
#target_topic = 'gold'
print(f"{target_topic=}")
s = train_df.loc[train_df['topics_flattened']==target_topic].sample()
s.T

In [None]:
#title = s['title'].values.tolist()[0]
#print(title)
print(s['title'].values.tolist()[0])
cleaned_title = s['cleaned_title'].values.tolist()[0]
print(cleaned_title)

In [None]:
print(s['text'].values.tolist()[0])

In [None]:
cleaned_text = s['cleaned_text'].values.tolist()[0]
print(cleaned_text)

In [None]:
prompt_title_generation = prompt_template_title_generation.render(article=cleaned_text)
print(prompt_title_generation)

In [None]:
generated_titles = generate_text(
    prompt_title_generation,
    tokenizer,
    model,
    system_message=system_message_title_generation,
    enable_streaming=True
)

In [None]:
print(f"Reference: {cleaned_title}")
print(f"Generated: {generated_titles[0]}")

---

#### Title Generation (Loop)

In [None]:
# Generate one title per topic and collect (reference, generated, article)
# triples for the evaluation cells further down.
reference_titles = []
generated_titles = []
article_texts = []
for target_topic in top_n_topics:
    print(f"{target_topic=}")

    # Restrict to this topic's articles that have both a non-blank title and a
    # non-blank body, then draw one. Filtering first guarantees termination:
    # re-sampling until a usable row shows up would loop forever when a topic
    # has no usable article, and sampling from an empty frame raises.
    topic_df = train_df.loc[train_df['topics_flattened'] == target_topic]
    has_title = topic_df['cleaned_title'].str.strip() != ""
    has_text = topic_df['cleaned_text'].str.strip() != ""
    usable_df = topic_df.loc[has_title & has_text]
    if usable_df.empty:
        print(f"No usable article found for {target_topic=}; skipping")
        print()
        continue

    s = usable_df.sample()
    cleaned_title = s['cleaned_title'].values.tolist()[0]
    cleaned_text = s['cleaned_text'].values.tolist()[0]
    assert cleaned_title.strip() != ""
    assert cleaned_text.strip() != ""
    print(f"{cleaned_title=}")
    reference_titles.append(cleaned_title)
    article_texts.append(cleaned_text)

    # Fill prompt template
    prompt_title_generation = prompt_template_title_generation.render(article=cleaned_text)

    # Generate title
    response = generate_text(
        prompt_title_generation,
        tokenizer,
        model,
        system_message=system_message_title_generation,
    )
    assert len(response) == 1
    generated_title = response[0]
    print(f"{generated_title=}")
    generated_titles.append(generated_title)
    print()

print(len(reference_titles), len(generated_titles), len(article_texts))
# The three lists are indexed in parallel by the evaluation cells.
assert len(reference_titles) == len(generated_titles) == len(article_texts)

---

### Evaluation

#### Automatic Evaluation

##### ROUGE, BLEU, METEOR Scores

In [None]:
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

In [None]:
# Per-pair ROUGE F-measures, stored in the same order as generated_titles.
results = {
    'rouge': [],
}

for gen, ref in zip(generated_titles, reference_titles):
    # RougeScorer.score expects (target, prediction): the reference title is
    # the target and the generated title is the prediction. The F-measure is
    # symmetric, but precision/recall are not, so keep the order correct.
    rouge_scores = scorer.score(ref, gen)
    results['rouge'].append({
        'rouge1': rouge_scores['rouge1'].fmeasure,
        'rouge2': rouge_scores['rouge2'].fmeasure,
        'rougeL': rouge_scores['rougeL'].fmeasure
    })

##### BERT Score

In [None]:
# BERTScore is computed for all pairs in a single batched call; only the F1
# component is kept, converted to a NumPy array.
P, R, F1 = score(generated_titles, reference_titles, lang='en', verbose=False)
results_bert_score = {'bert_score': F1.numpy()}
results_bert_score

---

In [None]:
print("\nEvaluation Results:")
print("-" * 50)

for i in range(len(generated_titles)):
    print(f"\nPair {i+1}:")
    print(f"Reference: {reference_titles[i]}")
    print(f"Generated: {generated_titles[i]}")
    print("\nMetrics:")
    print(f"ROUGE-1:   {results['rouge'][i]['rouge1']:.3f}")
    print(f"ROUGE-2:   {results['rouge'][i]['rouge2']:.3f}")
    print(f"ROUGE-L:   {results['rouge'][i]['rougeL']:.3f}")
    print(f"BERTScore: {results_bert_score['bert_score'][i]:.3f}")

In [None]:
# Calculate average scores
print("\nAverage Scores:")
print("-" * 50)
print(f"Avg ROUGE-1:   {np.mean([r['rouge1'] for r in results['rouge']]):.3f}")
print(f"Avg ROUGE-2:   {np.mean([r['rouge2'] for r in results['rouge']]):.3f}")
print(f"Avg ROUGE-L:   {np.mean([r['rougeL'] for r in results['rouge']]):.3f}")
print(f"Avg BERTScore: {np.mean(results_bert_score['bert_score']):.3f}")

---

#### LLM-as-a-Judge

In [None]:
system_message_title_evaluation = """
[TODO]
""".strip()
print(system_message_title_evaluation)

In [None]:
prompt_template_title_evaluation_str = """
[TODO]
""".strip()
print(prompt_template_title_evaluation_str)

In [None]:
#i = 1
# randrange excludes its upper bound, so i is always a valid index.
# (randint includes both ends and could yield len(generated_titles).)
i = random.randrange(len(generated_titles))
print(f"{i=}")
reference_title = reference_titles[i]
print(f"Reference: {reference_title}")
generated_title = generated_titles[i]
print(f"Generated: {generated_title}")
print()
article_text = article_texts[i]
print(article_text)

In [None]:
# Flip a coin to decide which slot (A or B) holds the reference title, so the
# judge cannot rely on position.
r_i = random.randint(0,1)
print(f"{r_i=}")

# A truthy draw puts the reference in slot A; otherwise it goes in slot B.
gold_title = "A" if r_i else "B"
title_A, title_B = (
    (reference_title, generated_title) if r_i else (generated_title, reference_title)
)

print(f"{gold_title=}")
print(f"{title_A=}")
print(f"{title_B=}")

In [None]:
prompt_template_title_evaluation = Template(prompt_template_title_evaluation_str)
prompt_title_evaluation = prompt_template_title_evaluation.render(
    article_text=article_text,
    title_A=title_A,
    title_B=title_B,
)
print(prompt_title_evaluation)

In [None]:
response = generate_text(prompt_title_evaluation, tokenizer, model, system_message=system_message_title_evaluation)
response

In [None]:
print(response[0])

In [None]:
# Pull the judge's verdict out of the <classification> tag and compare it with
# the slot that actually holds the reference title.
judge = extract_tag_content(text=response[0], tag_name="classification").strip()
print(judge, gold_title, judge == gold_title)
assert judge in ["A", "B"]

# Order the pair as (preferred, other) according to the verdict.
ranked_titles = (title_A, title_B) if judge == "A" else (title_B, title_A)
print("Better title:")
print(ranked_titles[0])
print()
print("Worse title:")
print(ranked_titles[1])

---

### 2. Summary Generation

In [None]:
system_message_summary_generation = "[TODO]"
print(f"{system_message_summary_generation=}")

In [None]:
summary_prompt_template_str = """
[TODO]
""".strip()
print(summary_prompt_template_str)

In [None]:
print(top_n_topics)

In [None]:
#target_topic = 'cocoa'
target_topic = 'sugar'
print(f"{target_topic=}")
s = train_df.loc[train_df['topics_flattened']==target_topic].sample()

In [None]:
cleaned_title = s['cleaned_title'].values.tolist()[0]
print(f"{cleaned_title=}")

In [None]:
cleaned_text = s['cleaned_text'].values.tolist()[0]
print(cleaned_text)
assert cleaned_text.strip() != ""

In [None]:
summary_prompt_template = Template(summary_prompt_template_str)
summary_prompt = summary_prompt_template.render(article=cleaned_text)
print(summary_prompt)

In [None]:
summary = generate_text(summary_prompt, tokenizer, model, system_message=system_message_summary_generation)

In [None]:
# generate_text returns a list of decoded strings; unwrap the single response.
# The isinstance guard also makes re-running this cell a no-op once summary is
# already a plain string.
if isinstance(summary, list):
    assert len(summary) == 1
    summary = summary[0]
assert isinstance(summary, str)
print(summary)

#### "Corrupt" Summary Generation

In [None]:
system_message_summary_corruption = "[TODO]"
print(f"{system_message_summary_corruption=}")

In [None]:
corrupt_prompt_template_str = """
[TODO]
""".strip()
print(corrupt_prompt_template_str)

In [None]:
corrupt_prompt_template = Template(corrupt_prompt_template_str)
corrupt_prompt = corrupt_prompt_template.render(summary=summary)
print(corrupt_prompt)

In [None]:
corrupt_summary = generate_text(corrupt_prompt, tokenizer, model, system_message=system_message_summary_corruption)

In [None]:
# generate_text returns a list of decoded strings; unwrap the single response.
# The isinstance guard also makes re-running this cell a no-op once
# corrupt_summary is already a plain string.
if isinstance(corrupt_summary, list):
    assert len(corrupt_summary) == 1
    corrupt_summary = corrupt_summary[0]
assert isinstance(corrupt_summary, str)
print(corrupt_summary)

In [None]:
print(summary)

In [None]:
create_html_diff_with_background(summary, corrupt_summary)

### 3. Hallucination Detection

In [None]:
print(cleaned_title)

In [None]:
print(cleaned_text)

In [None]:
print(summary)

In [None]:
print(corrupt_summary)

In [None]:
system_message_hallucination_detection = "[TODO]"
print(f"{system_message_hallucination_detection=}")

In [None]:
prompt_template_hallucination_detection_str = """
[TODO]
""".strip()
print(prompt_template_hallucination_detection_str)

In [None]:
prompt_template_hallucination_detection = Template(prompt_template_hallucination_detection_str)
hallucination_detection_prompt = prompt_template_hallucination_detection.render(
    article_title=cleaned_title,
    article_text=cleaned_text,
    summary=summary,
)
print(hallucination_detection_prompt)

In [None]:
response = generate_text(hallucination_detection_prompt, tokenizer, model, system_message=system_message_hallucination_detection)
print(response)

In [None]:
Markdown(response[0])

In [None]:
# Strip surrounding whitespace/newlines inside the tag before validating, so a
# response such as "<classification> TRUE </classification>" is accepted.
result = extract_tag_content(text=response[0], tag_name="classification").strip()
print(result)
assert result in ["TRUE", "FALSE"]

In [None]:
prompt_template_hallucination_detection = Template(prompt_template_hallucination_detection_str)
hallucination_detection_prompt = prompt_template_hallucination_detection.render(
    article_title=cleaned_title,
    article_text=cleaned_text,
    summary=corrupt_summary, # <- Changed to the corrupted summary
)
Markdown(hallucination_detection_prompt)

In [None]:
response = generate_text(hallucination_detection_prompt, tokenizer, model, system_message=system_message_hallucination_detection)
print(response)

In [None]:
Markdown(response[0])

In [None]:
# Strip surrounding whitespace/newlines inside the tag before validating, so a
# response such as "<classification> TRUE </classification>" is accepted.
result = extract_tag_content(text=response[0], tag_name="classification").strip()
print(result)
assert result in ["TRUE", "FALSE"]

---

### Utils

    import re
    import html
    import difflib
    from IPython.display import HTML
    
    def clean_text(text: str) -> str:
        """
        Normalize a raw article field (title or body) for prompting.

        Steps: decode HTML entities, collapse every run of whitespace into a
        single space, and drop a trailing "Reuter"/"Reuters" sign-off together
        with any non-word characters that follow it.

        Args:
            text: Raw text. Non-string values (e.g. None/NaN) are returned
                unchanged so the function can be applied to columns with
                missing values.
        Returns:
            The cleaned, stripped string (or the input itself if not a str).
        """
        if not isinstance(text, str):
            return text

        # Decode HTML entities (like &lt; &gt; &amp; etc)
        text = html.unescape(text)

        # Collapse multiple spaces and newlines into single spaces
        text = ' '.join(text.split())

        # Remove the sign-off only at the end of the text. A single anchored
        # pattern handles both spellings; chaining replace(' Reuter') before
        # replace(' Reuters') left a stray "s" behind and also mangled
        # mentions of the agency in the middle of a sentence.
        text = re.sub(r'\s+Reuters?\W*$', '', text)

        # Remove any remaining multiple whitespace
        return re.sub(r'\s+', ' ', text).strip()
    
    def extract_tag_content(text: str, tag_name: str) -> str:
        """
        Return the text between the first <tag_name>...</tag_name> pair.

        Args:
            text: String to search (may span multiple lines).
            tag_name: Tag name without angle brackets. It is matched
                literally, so regex metacharacters in it have no special
                meaning.
        Returns:
            The (unstripped) content of the first matching pair, or "" when
            no complete pair is present.
        """
        tag = re.escape(tag_name)
        # Non-greedy so the first closing tag ends the match; DOTALL lets the
        # content contain newlines.
        match = re.search(f"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
        return match.group(1) if match else ""
    
    def create_html_diff_with_background(a: str, b: str) -> HTML:
        """
        Create a word-level HTML diff between two strings with background
        highlighting.

        Both strings are split on whitespace and compared with
        difflib.Differ. Words only in `a` get a #ffb3ba background, words only
        in `b` get a #88c8f7 background, and common words are left unstyled.

        Args:
            a: Original text.
            b: Modified text.
        Returns:
            An IPython HTML object containing the space-joined, highlighted
            words.
        """
        tokens = difflib.Differ().compare(a.split(), b.split())

        fragments = []
        for token in tokens:
            # Differ prefixes every entry with a two-character marker.
            # Escape the word itself so text containing <, > or & is shown
            # literally instead of being interpreted as markup.
            marker, word = token[:2], html.escape(token[2:], quote=False)
            if marker == '  ':  # unchanged
                fragments.append(word)
            elif marker == '- ':  # deletion
                fragments.append(f'<span style="background-color: #ffb3ba">{word}</span>')
            elif marker == '+ ':  # addition
                fragments.append(f'<span style="background-color: #88c8f7">{word}</span>')
            # Any other marker (Differ's '? ' hint lines) is ignored.

        return HTML(' '.join(fragments))

    from typing import List, Optional
    import torch
    from transformers import PreTrainedTokenizer, PreTrainedModel, TextStreamer
    from logging import getLogger
    
    logger = getLogger(__name__)
    
    def generate_text(
        prompt: str,
        tokenizer: PreTrainedTokenizer,
        model: PreTrainedModel,
        config: Optional['GenerationConfig'] = None,
        device: str = "cuda",
        enable_streaming: bool = False,
        system_message: Optional[str] = None,
    ) -> List[str]:
        """
        Run a single-turn chat completion and return the decoded continuation(s).

        The prompt (optionally preceded by a system message) is rendered through
        the tokenizer's chat template, passed to ``model.generate``, and only the
        newly generated tokens are decoded.

        Args:
            prompt: User message text; must contain non-whitespace characters.
            tokenizer: Tokenizer providing ``apply_chat_template`` and ``decode``.
            model: Model providing ``generate``.
            config: Generation settings; a default ``GenerationConfig()`` is used
                when omitted.
            device: Device the input tensors are moved to ("cuda" or "cpu").
            enable_streaming: When True, a ``TextStreamer`` is attached to the
                generate call.
            system_message: Optional system message placed before the prompt.
        Returns:
            One decoded string per sequence returned by ``model.generate``.
        Raises:
            ValueError: If the prompt is empty or whitespace only.
            RuntimeError: If ``device`` is "cuda" but CUDA is unavailable.
            Exception: Any error raised while tokenizing or generating is
                logged and re-raised unchanged.
        """
        if not prompt.strip():
            raise ValueError("Prompt cannot be empty")
        if device == "cuda" and not torch.cuda.is_available():
            raise RuntimeError("CUDA device requested but not available")

        config = config or GenerationConfig()

        try:
            # Conversation: optional system turn followed by the user turn
            user_turn = {"role": "user", "content": prompt}
            if system_message:
                messages = [{"role": "system", "content": system_message}, user_turn]
            else:
                messages = [user_turn]

            # Render the chat template straight to token ids
            inputs = tokenizer.apply_chat_template(
                messages,
                tokenize=True,
                add_generation_prompt=True,
                return_tensors="pt"
            )

            # Every prompt token is attended to; build the mask, then move both
            # tensors to the target device
            attention_mask = torch.ones_like(inputs)
            inputs = inputs.to(device)
            attention_mask = attention_mask.to(device)

            streamer = None
            if enable_streaming:
                streamer = TextStreamer(tokenizer)

            response = model.generate(
                input_ids=inputs,
                attention_mask=attention_mask,
                streamer=streamer,
                max_new_tokens=config.max_new_tokens,
                use_cache=config.use_cache,
                temperature=config.temperature,
                do_sample=config.do_sample,
            )

            # Drop the prompt tokens so only the continuation is decoded
            prompt_length = inputs.shape[1]
            generated_texts = []
            for sequence in response:
                generated_texts.append(
                    tokenizer.decode(sequence[prompt_length:], skip_special_tokens=True)
                )
            return generated_texts

        except Exception as e:
            logger.error(f"Error during text generation: {str(e)}")
            raise
    
    class GenerationConfig:
        """
        Plain container for the decoding options that generate_text forwards
        to ``model.generate``.

        Attributes:
            max_new_tokens: Value passed as ``max_new_tokens`` (default 1024).
            temperature: Value passed as ``temperature`` (default 0.0).
            use_cache: Value passed as ``use_cache`` (default True).
            do_sample: Value passed as ``do_sample`` (default False).
        """

        def __init__(
            self,
            max_new_tokens: int = 1024,
            temperature: float = 0.0,
            use_cache: bool = True,
            do_sample: bool = False,
        ) -> None:
            # Options are stored verbatim; no validation is performed here.
            self.do_sample = do_sample
            self.use_cache = use_cache
            self.temperature = temperature
            self.max_new_tokens = max_new_tokens
