In [1]:
# Install required packages
!pip install -U transformers
!pip install bert-score sentence-transformers rouge-score nltk
!pip install -q -U evaluate

import os
os.environ["WANDB_DISABLED"] = "true"

#Evaluation
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.corpus import cmudict
from rouge_score import rouge_scorer
from bert_score import score as bert_score
from sentence_transformers import SentenceTransformer, util
import evaluate
import random
import transformers
#Transformers
from transformers import (
    T5TokenizerFast,
    T5ForConditionalGeneration,
    Trainer,
    TrainingArguments,
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForSeq2SeqLM,
    BertTokenizer,
    BertForSequenceClassification
)

import torch
import pandas as pd
import numpy as np
from datasets import Dataset
from google.colab import drive

Collecting transformers
  Downloading transformers-4.54.1-py3-none-any.whl.metadata (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.7/41.7 kB[0m [31m822.7 kB/s[0m eta [36m0:00:00[0m
Downloading transformers-4.54.1-py3-none-any.whl (11.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.2/11.2 MB[0m [31m67.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: transformers
  Attempting uninstall: transformers
    Found existing installation: transformers 4.54.0
    Uninstalling transformers-4.54.0:
      Successfully uninstalled transformers-4.54.0
Successfully installed transformers-4.54.1
Collecting bert-score
  Downloading bert_score-0.3.13-py3-none-any.whl.metadata (15 kB)
Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.0.0->bert-score)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-

In [14]:
#Mount drive folder and read data set files
drive.mount('/content/drive')
train_file = 'drive/MyDrive/Colab Notebooks/w266/Project/data/3_line_train_dataset.xlsx'
test_file = 'drive/MyDrive/Colab Notebooks/w266/Project/data/3_line_test_dataset.xlsx'
val_file = 'drive/MyDrive/Colab Notebooks/w266/Project/data/3_line_val_dataset.xlsx'
df_train = pd.read_excel(train_file)
df_test = pd.read_excel(test_file)
df_val = pd.read_excel(val_file)

#Trim dataset and create input_text and target_text
df_train = df_train
df_val = df_val
df_test = df_test
# Create input_text as prompt + lines 1-3, and target_text as line4
df_train['input_text'] = df_train.apply(lambda row: f"Given these song lyric lines, generate the next song lyric line: {row['line1']}, {row['line2']}, {row['line3']}", axis=1)
df_train['target_text'] = df_train['actual_line']

df_val['input_text'] = df_val.apply(lambda row: f"Given these song lyric lines, generate the next song lyric line: {row['line1']}, {row['line2']}, {row['line3']}", axis=1)
df_val['target_text'] = df_val['actual_line']

df_test['input_text'] = df_test.apply(lambda row: f"Given these song lyric lines, generate the next song lyric line: {row['line1']}, {row['line2']}, {row['line3']}", axis=1)
df_test['target_text'] = df_test['actual_line']

#Create datasets from dfs
dataset = Dataset.from_pandas(df_train[['input_text', 'target_text']])
val_dataset = Dataset.from_pandas(df_val[['input_text', 'target_text']])
test_dataset = Dataset.from_pandas(df_test[['input_text', 'target_text']])

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
test_dataset[0]

{'input_text': "Given these song lyric lines, generate the next song lyric line: On my waist you know I got keep that oven For ya ginger bread pie ass niggas the heat's running on high Joe Crack I - bake the cake and serve you niggas humble pie",
 'target_text': 'I got the streets on smash, niggas on the corner watching me roll past'}

In [19]:
model_name = 'google/flan-t5-base'
tokenizer = T5TokenizerFast.from_pretrained(model_name)

def preprocess(example):
    model_inputs = tokenizer(
        example['input_text'],
        padding="max_length",
        truncation=True,
        max_length=64
    )

    labels = tokenizer(
        example['target_text'],
        padding="max_length",
        truncation=True,
        max_length=64
    )

    model_inputs['labels'] = labels['input_ids']
    return model_inputs

tokenized_dataset = dataset.map(preprocess, batched=True)
tokenized_val_dataset = val_dataset.map(preprocess, batched=True)
tokenized_test_dataset = test_dataset.map(preprocess, batched=True)

model = T5ForConditionalGeneration.from_pretrained(model_name)


training_args = TrainingArguments(
    output_dir="drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines",
    per_device_train_batch_size=50,
    per_device_eval_batch_size=50,
    num_train_epochs=3,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_strategy="steps",
    logging_steps=1000,
)


trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    eval_dataset = tokenized_val_dataset,
    tokenizer=tokenizer
)


Map:   0%|          | 0/60000 [00:00<?, ? examples/s]

Map:   0%|          | 0/15000 [00:00<?, ? examples/s]

Map:   0%|          | 0/15000 [00:00<?, ? examples/s]

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).
  trainer = Trainer(


In [20]:
trainer.train()

Step,Training Loss
1000,1.5237
2000,0.7232
3000,0.7116


TrainOutput(global_step=3600, training_loss=0.9395083618164063, metrics={'train_runtime': 5835.6824, 'train_samples_per_second': 30.845, 'train_steps_per_second': 0.617, 'total_flos': 1.540704043008e+16, 'train_loss': 0.9395083618164063, 'epoch': 3.0})

In [21]:
#Save Model & Tokenizer
trainer.save_model('drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines')
tokenizer.save_pretrained('drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines')

('drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines/tokenizer_config.json',
 'drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines/special_tokens_map.json',
 'drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines/spiece.model',
 'drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines/added_tokens.json',
 'drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines/tokenizer.json')

In [22]:
model_path = 'drive/MyDrive/Colab Notebooks/w266/Project/models/flan-t5_3_lines'
tokenizer = T5TokenizerFast.from_pretrained(model_path)
model = T5ForConditionalGeneration.from_pretrained(model_path)
model.eval()

T5ForConditionalGeneration(
  (shared): Embedding(32128, 768)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 768)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=768, out_features=768, bias=False)
              (k): Linear(in_features=768, out_features=768, bias=False)
              (v): Linear(in_features=768, out_features=768, bias=False)
              (o): Linear(in_features=768, out_features=768, bias=False)
              (relative_attention_bias): Embedding(32, 12)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseGatedActDense(
              (wi_0): Linear(in_features=768, out_features=2048, bias=False)
              (wi_1): Linear(in_features=768, out_features=2048, bias=False)
              (wo):

In [23]:
results = []

#Add Prompts
prompts = [f"Given these song lyric lines, generate the next song lyric line: {row['line1']} {row['line2']} {row['line3']}" for _, row in df_test.iterrows()]
true_lines = [row['actual_line'] for _, row in df_test.iterrows()]

batch_size = 16

#Process in batches
for i in range(0, len(prompts), batch_size):
    prompt_batch = prompts[i:i+batch_size]
    true_line_batch = true_lines[i:i+batch_size]

    #Tokenize as batch
    inputs = tokenizer(prompt_batch, return_tensors="pt", padding=True, truncation=True)

    #Generate all predictions
    output_ids = model.generate(
        inputs['input_ids'],
        max_new_tokens=30,
        temperature=0.8,
        do_sample=True,
        num_beams=1
    )

    #Decode outputs
    generated_lines = tokenizer.batch_decode(output_ids, skip_special_tokens=True)

    #Store results
    for prompt, true_line, gen_line in zip(prompt_batch, true_line_batch, generated_lines):
        results.append({
            "prompt": prompt,
            "actual_line2": true_line,
            "generated_line2": gen_line
        })


In [24]:
df = pd.DataFrame(results)

# Then save to CSV
df.to_csv('drive/MyDrive/Colab Notebooks/w266/Project/results_ft_3_lines.csv', index=False)

In [25]:
# Choose how many examples you want to print
num_samples = 10

# Randomly sample from results
sampled = random.sample(results, k=min(num_samples, len(results)))

# Print each example
for i, r in enumerate(sampled, 1):
    print(f"--- Example {i} ---")
    print(f"Prompt:         {r['prompt']}")
    print(f"Actual Line 2:  {r['actual_line2']}")
    print(f"Generated Line: {r['generated_line2']}\n")

--- Example 1 ---
Prompt:         Given these song lyric lines, generate the next song lyric line: It's ya big homie Snoopy, just tell 'em you with me I got a partner in Nigeria, man he got the heaters Trade you Russian bullets for a new pair of sneakers
Actual Line 2:  We up in Amsterdam, you like to spark a lot?
Generated Line: They'll be squeezing ya fuckin' fuckin', squeezing y

--- Example 2 ---
Prompt:         Given these song lyric lines, generate the next song lyric line: Fans is on me eatin' shrimp and lobsters Roll with nothing but them monsters All we doing is ballin' (Ballin')
Actual Line 2:  Going hard trying to win that game
Generated Line: I am 'bout to go back (Ballin')

--- Example 3 ---
Prompt:         Given these song lyric lines, generate the next song lyric line: Seen his son at my little boy birthday party, it was harder The streets of the ATL don't respect nothin' but shottas Had to look the other way, could be the other way
Actual Line 2:  Hit Yung Mazi up, that

In [26]:
# Download required NLTK data
nltk.download('punkt')
nltk.download('cmudict')

class ComprehensiveEvaluator:
    def __init__(self):
        # Initialize Sentence-BERT model
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Initialize ROUGE scorer
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

        # Initialize CMU dictionary
        self.cmu_dict = cmudict.dict()

        # Initialize BLEU smoothing function
        self.bleu_smoothing = SmoothingFunction().method1

    def calculate_bleu_scores(self, test_results):
        """Calculate BLEU scores for all test results"""
        bleu_scores = []

        for result in test_results:
            reference = [result['actual'].split()]
            candidate = result['generated'].split()

            if candidate:  # Only calculate if generation is not empty
                score = sentence_bleu(reference, candidate, smoothing_function=self.bleu_smoothing)
                bleu_scores.append(score)
            else:
                bleu_scores.append(0.0)

        return {
            'individual_scores': bleu_scores,
            'average': np.mean(bleu_scores),
            'std': np.std(bleu_scores),
            'min': np.min(bleu_scores),
            'max': np.max(bleu_scores)
        }

    def calculate_rouge_scores(self, test_results):
        """Calculate ROUGE scores for all test results"""
        rouge1_scores = []
        rouge2_scores = []
        rougeL_scores = []

        for result in test_results:
            if result['generated'].strip():  # Only calculate if generation is not empty
                scores = self.rouge_scorer.score(result['actual'], result['generated'])
                rouge1_scores.append(scores['rouge1'].fmeasure)
                rouge2_scores.append(scores['rouge2'].fmeasure)
                rougeL_scores.append(scores['rougeL'].fmeasure)
            else:
                rouge1_scores.append(0.0)
                rouge2_scores.append(0.0)
                rougeL_scores.append(0.0)

        return {
            'rouge1': {
                'individual_scores': rouge1_scores,
                'average': np.mean(rouge1_scores),
                'std': np.std(rouge1_scores)
            },
            'rouge2': {
                'individual_scores': rouge2_scores,
                'average': np.mean(rouge2_scores),
                'std': np.std(rouge2_scores)
            },
            'rougeL': {
                'individual_scores': rougeL_scores,
                'average': np.mean(rougeL_scores),
                'std': np.std(rougeL_scores)
            }
        }

    def calculate_bert_scores(self, test_results):
        """Calculate BERTScore for all test results"""
        candidates = [result['generated'] for result in test_results]
        references = [result['actual'] for result in test_results]

        # Calculate BERTScore
        P, R, F1 = bert_score(candidates, references, lang="en", verbose=False)

        return {
            'precision': {
                'average': P.mean().item(),
                'std': P.std().item(),
                'individual_scores': P.tolist()
            },
            'recall': {
                'average': R.mean().item(),
                'std': R.std().item(),
                'individual_scores': R.tolist()
            },
            'f1': {
                'average': F1.mean().item(),
                'std': F1.std().item(),
                'individual_scores': F1.tolist()
            }
        }

    def calculate_sentence_similarity(self, test_results):
        """Calculate sentence-level cosine similarity using Sentence-BERT"""
        actual_lines = [result['actual'] for result in test_results]
        generated_lines = [result['generated'] for result in test_results]

        # Encode all sentences
        actual_embeddings = self.sentence_model.encode(actual_lines, convert_to_tensor=True)
        generated_embeddings = self.sentence_model.encode(generated_lines, convert_to_tensor=True)

        # Calculate cosine similarity
        cosine_scores = util.pytorch_cos_sim(actual_embeddings, generated_embeddings)

        # Extract diagonal (pairwise similarities)
        similarities = [cosine_scores[i][i].item() for i in range(len(actual_lines))]

        return {
            'individual_scores': similarities,
            'average': np.mean(similarities),
            'std': np.std(similarities),
            'min': np.min(similarities),
            'max': np.max(similarities)
        }

    def get_last_word(self, line):
        """Extract the last word from a line for rhyme analysis"""
        words = line.lower().strip().split()
        if words:
            # Remove punctuation from last word
            last_word = ''.join(c for c in words[-1] if c.isalpha())
            return last_word
        return ""

    def get_rhyme_part_cmu(self, word):
        """Extract the rhyming part using CMU dictionary"""
        if word in self.cmu_dict:
            pronunciations = self.cmu_dict[word]
            if pronunciations:
                # Get the part after the last stressed vowel
                pron = pronunciations[0]
                for i in range(len(pron) - 1, -1, -1):
                    if pron[i][-1].isdigit():  # Stressed vowel
                        return pron[i:]
        return None

    def analyze_rhymes_cmu(self, test_results):
        """Analyze rhymes using CMU dictionary"""
        phonetic_rhymes = 0
        total_valid = 0

        rhyme_details = []

        for i, result in enumerate(test_results):
            input_last = self.get_last_word(result['input'])
            generated_last = self.get_last_word(result['generated'])

            if input_last and generated_last:
                input_rhyme = self.get_rhyme_part_cmu(input_last)
                generated_rhyme = self.get_rhyme_part_cmu(generated_last)

                if input_rhyme and generated_rhyme:
                    total_valid += 1

                    is_rhyme = input_rhyme == generated_rhyme
                    if is_rhyme:
                        phonetic_rhymes += 1

                    rhyme_details.append({
                        'example_index': i,
                        'input_word': input_last,
                        'generated_word': generated_last,
                        'input_phonemes': input_rhyme,
                        'generated_phonemes': generated_rhyme,
                        'is_rhyme': is_rhyme
                    })

        return {
            'phonetic_rhyme_rate': phonetic_rhymes / total_valid if total_valid > 0 else 0,
            'phonetic_rhymes': phonetic_rhymes,
            'total_valid': total_valid,
            'details': rhyme_details
        }

    def calculate_length_similarity(self, test_results):
        """Calculate length similarity between actual and generated lines"""
        length_diffs = []
        length_ratios = []

        for result in test_results:
            actual_len = len(result['actual'].split())
            generated_len = len(result['generated'].split())

            length_diffs.append(abs(actual_len - generated_len))

            if actual_len > 0:
                length_ratios.append(generated_len / actual_len)
            else:
                length_ratios.append(0.0)

        return {
            'average_length_diff': np.mean(length_diffs),
            'std_length_diff': np.std(length_diffs),
            'average_length_ratio': np.mean(length_ratios),
            'std_length_ratio': np.std(length_ratios)
        }

    def evaluate_comprehensive(self, test_results):
        """Run comprehensive evaluation on test results"""
        print("=" * 80)
        print("COMPREHENSIVE EVALUATION RESULTS")
        print("=" * 80)

        # Basic statistics
        total_examples = len(test_results)
        empty_generations = sum(1 for r in test_results if not r['generated'].strip())

        print(f"Dataset Statistics:")
        print(f"  Total Examples: {total_examples}")
        print(f"  Empty Generations: {empty_generations} ({empty_generations/total_examples:.1%})")
        print()

        # Calculate all metrics
        print("Computing metrics...")

        # Traditional NLP metrics
        bleu_results = self.calculate_bleu_scores(test_results)
        rouge_results = self.calculate_rouge_scores(test_results)
        bert_results = self.calculate_bert_scores(test_results)

        # Sentence-level similarity
        sentence_sim_results = self.calculate_sentence_similarity(test_results)

        # Rhyme analysis
        # dandelion_rhyme_results = self.analyze_rhymes_dandelion(test_results)
        cmu_rhyme_results = self.analyze_rhymes_cmu(test_results)

        # Length analysis
        length_results = self.calculate_length_similarity(test_results)

        # Display results
        print("\n" + "="*60)
        print("TRADITIONAL NLP METRICS")
        print("="*60)

        print(f"BLEU Score:")
        print(f"  Average: {bleu_results['average']:.4f} (±{bleu_results['std']:.4f})")
        print(f"  Range: {bleu_results['min']:.4f} - {bleu_results['max']:.4f}")

        print(f"\nROUGE Scores:")
        print(f"  ROUGE-1: {rouge_results['rouge1']['average']:.4f} (±{rouge_results['rouge1']['std']:.4f})")
        print(f"  ROUGE-2: {rouge_results['rouge2']['average']:.4f} (±{rouge_results['rouge2']['std']:.4f})")
        print(f"  ROUGE-L: {rouge_results['rougeL']['average']:.4f} (±{rouge_results['rougeL']['std']:.4f})")

        print(f"\nBERTScore:")
        print(f"  F1: {bert_results['f1']['average']:.4f} (±{bert_results['f1']['std']:.4f})")
        print(f"  Precision: {bert_results['precision']['average']:.4f} (±{bert_results['precision']['std']:.4f})")
        print(f"  Recall: {bert_results['recall']['average']:.4f} (±{bert_results['recall']['std']:.4f})")

        print("\n" + "="*60)
        print("SENTENCE-LEVEL SEMANTIC SIMILARITY")
        print("="*60)

        print(f"Sentence-BERT Cosine Similarity:")
        print(f"  Average: {sentence_sim_results['average']:.4f} (±{sentence_sim_results['std']:.4f})")
        print(f"  Range: {sentence_sim_results['min']:.4f} - {sentence_sim_results['max']:.4f}")

        print("\n" + "="*60)
        print("RHYME ANALYSIS")
        print("="*60)

        print(f"\nCMU Dictionary Phonetic Analysis:")
        print(f"  Phonetic Rhyme Rate: {cmu_rhyme_results['phonetic_rhyme_rate']:.2%}")
        print(f"  Valid Examples: {cmu_rhyme_results['total_valid']}/{total_examples}")

        print("\n" + "="*60)
        print("LENGTH ANALYSIS")
        print("="*60)

        print(f"Length Similarity:")
        print(f"  Average Length Difference: {length_results['average_length_diff']:.2f} words")
        print(f"  Average Length Ratio: {length_results['average_length_ratio']:.2f}")

        print("=" * 80)

        # Return all results for further analysis
        return {
            'basic_stats': {
                'total_examples': total_examples,
                'empty_generations': empty_generations
            },
            'bleu': bleu_results,
            'rouge': rouge_results,
            'bert_score': bert_results,
            'sentence_similarity': sentence_sim_results,
            # 'dandelion_rhyme': dandelion_rhyme_results,
            'cmu_rhyme': cmu_rhyme_results,
            'length_analysis': length_results
        }


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package cmudict to /root/nltk_data...
[nltk_data]   Unzipping corpora/cmudict.zip.


In [27]:
test_results = []

for r in results:
    test_results.append({
        "input": r["prompt"],
        "actual": r["actual_line2"],
        "generated": r["generated_line2"]
    })

In [28]:
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.corpus import cmudict
from rouge_score import rouge_scorer
from bert_score import score as bert_score
from sentence_transformers import SentenceTransformer, util
import torch
import numpy as np
import time
from typing import Dict, List, Any
import warnings

# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('cmudict', quiet=True)

class ComprehensiveEvaluator:
    def __init__(self, sentence_model_name: str = 'all-MiniLM-L6-v2', device: str = None):
        """
        Initialize evaluator with configurable models and device

        Args:
            sentence_model_name: Name of sentence transformer model to use
            device: Device to run models on ('cuda', 'cpu', or None for auto)
        """
        # Set device
        if device is None:
            self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        else:
            self.device = device

        print(f"Initializing evaluator on device: {self.device}")

        # Initialize models with error handling
        try:
            self.sentence_model = SentenceTransformer(sentence_model_name, device=self.device)
            print(f"✓ Loaded Sentence-BERT model: {sentence_model_name}")
        except Exception as e:
            warnings.warn(f"Failed to load Sentence-BERT model: {e}")
            self.sentence_model = None

        # Initialize ROUGE scorer
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

        # Initialize CMU dictionary with error handling
        try:
            self.cmu_dict = cmudict.dict()
            print(f"✓ Loaded CMU dictionary with {len(self.cmu_dict)} entries")
        except Exception as e:
            warnings.warn(f"Failed to load CMU dictionary: {e}")
            self.cmu_dict = {}

        # Initialize BLEU smoothing function
        self.bleu_smoothing = SmoothingFunction().method1

        # Cache for performance
        self._rhyme_cache = {}

    def calculate_bleu_scores(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Calculate BLEU scores with improved handling"""
        bleu_scores = []
        valid_count = 0
        empty_count = 0

        for result in test_results:
            actual = result['actual'].strip()
            generated = result['generated'].strip()

            if not generated:
                empty_count += 1
                bleu_scores.append(0.0)
            elif not actual:
                empty_count += 1
                bleu_scores.append(0.0)
            else:
                valid_count += 1
                reference = [actual.split()]
                candidate = generated.split()

                score = sentence_bleu(reference, candidate, smoothing_function=self.bleu_smoothing)
                bleu_scores.append(score)

        return {
            'individual_scores': bleu_scores,
            'average': np.mean(bleu_scores),
            'std': np.std(bleu_scores),
            'min': np.min(bleu_scores),
            'max': np.max(bleu_scores),
            'valid_count': valid_count,
            'empty_count': empty_count
        }

    def calculate_rouge_scores(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Calculate ROUGE scores with enhanced tracking"""
        rouge1_scores = []
        rouge2_scores = []
        rougeL_scores = []

        valid_count = 0
        empty_count = 0

        for result in test_results:
            actual = result['actual'].strip()
            generated = result['generated'].strip()

            if not generated or not actual:
                empty_count += 1
                rouge1_scores.append(0.0)
                rouge2_scores.append(0.0)
                rougeL_scores.append(0.0)
            else:
                valid_count += 1
                scores = self.rouge_scorer.score(actual, generated)
                rouge1_scores.append(scores['rouge1'].fmeasure)
                rouge2_scores.append(scores['rouge2'].fmeasure)
                rougeL_scores.append(scores['rougeL'].fmeasure)

        return {
            'rouge1': {
                'individual_scores': rouge1_scores,
                'average': np.mean(rouge1_scores),
                'std': np.std(rouge1_scores),
                'valid_count': valid_count,
                'empty_count': empty_count
            },
            'rouge2': {
                'individual_scores': rouge2_scores,
                'average': np.mean(rouge2_scores),
                'std': np.std(rouge2_scores),
                'valid_count': valid_count,
                'empty_count': empty_count
            },
            'rougeL': {
                'individual_scores': rougeL_scores,
                'average': np.mean(rougeL_scores),
                'std': np.std(rougeL_scores),
                'valid_count': valid_count,
                'empty_count': empty_count
            }
        }

    def calculate_bert_scores(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Calculate BERTScore with batch processing"""
        candidates = [result['generated'] for result in test_results]
        references = [result['actual'] for result in test_results]

        try:
            # Calculate BERTScore with device specification
            P, R, F1 = bert_score(candidates, references, lang="en", verbose=False, device=self.device)

            return {
                'precision': {
                    'average': P.mean().item(),
                    'std': P.std().item(),
                    'individual_scores': P.tolist()
                },
                'recall': {
                    'average': R.mean().item(),
                    'std': R.std().item(),
                    'individual_scores': R.tolist()
                },
                'f1': {
                    'average': F1.mean().item(),
                    'std': F1.std().item(),
                    'individual_scores': F1.tolist()
                }
            }
        except Exception as e:
            warnings.warn(f"BERTScore calculation failed: {e}")
            return None

    def calculate_sentence_similarity(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Calculate sentence similarity with error handling"""
        if self.sentence_model is None:
            warnings.warn("Sentence-BERT model not available")
            return None

        actual_lines = [result['actual'] for result in test_results]
        generated_lines = [result['generated'] for result in test_results]

        try:
            # Encode all sentences with batch processing
            actual_embeddings = self.sentence_model.encode(actual_lines, convert_to_tensor=True, show_progress_bar=False)
            generated_embeddings = self.sentence_model.encode(generated_lines, convert_to_tensor=True, show_progress_bar=False)

            # Calculate cosine similarity
            cosine_scores = util.pytorch_cos_sim(actual_embeddings, generated_embeddings)

            # Extract diagonal (pairwise similarities)
            similarities = [cosine_scores[i][i].item() for i in range(len(actual_lines))]

            return {
                'individual_scores': similarities,
                'average': np.mean(similarities),
                'std': np.std(similarities),
                'min': np.min(similarities),
                'max': np.max(similarities)
            }
        except Exception as e:
            warnings.warn(f"Sentence similarity calculation failed: {e}")
            return None

    def get_last_word(self, line: str) -> str:
        """Extract last word with improved cleaning"""
        import re
        # Use regex to better handle punctuation and contractions
        words = re.findall(r"\b[a-zA-Z]+(?:'[a-zA-Z]+)?\b", line.lower())
        return words[-1] if words else ""

    def get_rhyme_part_cmu(self, word: str) -> List[str]:
        """Extract rhyming part with caching"""
        if word in self._rhyme_cache:
            return self._rhyme_cache[word]

        if word in self.cmu_dict:
            pronunciations = self.cmu_dict[word]
            if pronunciations:
                # Get the part after the last stressed vowel
                pron = pronunciations[0]
                for i in range(len(pron) - 1, -1, -1):
                    if pron[i][-1].isdigit():  # Stressed vowel
                        result = pron[i:]
                        self._rhyme_cache[word] = result
                        return result

        self._rhyme_cache[word] = None
        return None

    def analyze_rhymes_cmu(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Enhanced rhyme analysis with better statistics"""
        phonetic_rhymes = 0
        near_rhymes = 0  # Rhymes with similar endings
        total_valid = 0
        total_processed = 0

        rhyme_details = []

        for i, result in enumerate(test_results):
            input_last = self.get_last_word(result['input'])
            generated_last = self.get_last_word(result['generated'])

            total_processed += 1

            if input_last and generated_last:
                input_rhyme = self.get_rhyme_part_cmu(input_last)
                generated_rhyme = self.get_rhyme_part_cmu(generated_last)

                if input_rhyme and generated_rhyme:
                    total_valid += 1

                    is_perfect_rhyme = input_rhyme == generated_rhyme
                    is_near_rhyme = False

                    # Check for near rhymes (last 2 phonemes match)
                    if not is_perfect_rhyme and len(input_rhyme) >= 2 and len(generated_rhyme) >= 2:
                        is_near_rhyme = input_rhyme[-2:] == generated_rhyme[-2:]

                    if is_perfect_rhyme:
                        phonetic_rhymes += 1
                    elif is_near_rhyme:
                        near_rhymes += 1

                    rhyme_details.append({
                        'example_index': i,
                        'input_word': input_last,
                        'generated_word': generated_last,
                        'input_phonemes': input_rhyme,
                        'generated_phonemes': generated_rhyme,
                        'is_perfect_rhyme': is_perfect_rhyme,
                        'is_near_rhyme': is_near_rhyme
                    })

        return {
            'perfect_rhyme_rate': phonetic_rhymes / total_valid if total_valid > 0 else 0,
            'near_rhyme_rate': near_rhymes / total_valid if total_valid > 0 else 0,
            'total_rhyme_rate': (phonetic_rhymes + near_rhymes) / total_valid if total_valid > 0 else 0,
            'perfect_rhymes': phonetic_rhymes,
            'near_rhymes': near_rhymes,
            'total_valid': total_valid,
            'total_processed': total_processed,
            'coverage': total_valid / total_processed if total_processed > 0 else 0,
            'details': rhyme_details
        }

    def calculate_additional_metrics(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Calculate additional rap-specific metrics"""

        # Syllable analysis (approximate)
        def count_syllables(word):
            # Simple syllable counting heuristic
            word = word.lower()
            count = 0
            vowels = "aeiouy"
            if word[0] in vowels:
                count += 1
            for i in range(1, len(word)):
                if word[i] in vowels and word[i-1] not in vowels:
                    count += 1
            if word.endswith("e"):
                count -= 1
            if count == 0:
                count += 1
            return count

        syllable_diffs = []
        word_diversity_scores = []

        for result in test_results:
            actual_words = result['actual'].split()
            generated_words = result['generated'].split()

            # Syllable analysis
            if actual_words and generated_words:
                actual_syllables = sum(count_syllables(word) for word in actual_words)
                generated_syllables = sum(count_syllables(word) for word in generated_words)
                syllable_diffs.append(abs(actual_syllables - generated_syllables))
            else:
                syllable_diffs.append(0)

            # Word diversity (unique words / total words)
            if generated_words:
                diversity = len(set(generated_words)) / len(generated_words)
                word_diversity_scores.append(diversity)
            else:
                word_diversity_scores.append(0.0)

        return {
            'syllable_similarity': {
                'average_diff': np.mean(syllable_diffs),
                'std_diff': np.std(syllable_diffs)
            },
            'word_diversity': {
                'average': np.mean(word_diversity_scores),
                'std': np.std(word_diversity_scores)
            }
        }

    def evaluate_comprehensive(self, test_results: List[Dict], show_progress: bool = True) -> Dict[str, Any]:
        """Enhanced comprehensive evaluation with timing and progress"""
        start_time = time.time()

        print("=" * 80)
        print("COMPREHENSIVE EVALUATION RESULTS")
        print("=" * 80)

        # Basic statistics
        total_examples = len(test_results)
        empty_generations = sum(1 for r in test_results if not r['generated'].strip())

        print(f"Dataset Statistics:")
        print(f"  Total Examples: {total_examples}")
        print(f"  Empty Generations: {empty_generations} ({empty_generations/total_examples:.1%})")
        print()

        # Calculate all metrics with timing
        results = {}

        if show_progress:
            print("Computing metrics...")

        # Traditional NLP metrics
        if show_progress: print("  • BLEU scores...")
        results['bleu'] = self.calculate_bleu_scores(test_results)

        if show_progress: print("  • ROUGE scores...")
        results['rouge'] = self.calculate_rouge_scores(test_results)

        if show_progress: print("  • BERTScore...")
        results['bert_score'] = self.calculate_bert_scores(test_results)

        # Sentence-level similarity
        if show_progress: print("  • Sentence similarity...")
        results['sentence_similarity'] = self.calculate_sentence_similarity(test_results)

        # Rhyme analysis
        if show_progress: print("  • Rhyme analysis...")
        results['cmu_rhyme'] = self.analyze_rhymes_cmu(test_results)

        # Additional metrics
        if show_progress: print("  • Additional metrics...")
        results['additional_metrics'] = self.calculate_additional_metrics(test_results)

        # Length analysis
        results['length_analysis'] = self.calculate_length_similarity(test_results)

        # Display results with enhanced formatting
        self._display_results(results, total_examples, empty_generations)

        execution_time = time.time() - start_time
        print(f"\nEvaluation completed in {execution_time:.2f} seconds")
        print("=" * 80)

        # Add metadata
        results['metadata'] = {
            'total_examples': total_examples,
            'empty_generations': empty_generations,
            'execution_time': execution_time,
            'device_used': self.device
        }

        return results

    def _display_results(self, results: Dict, total_examples: int, empty_generations: int):
        """Enhanced result display with better formatting"""

        print("\n" + "="*60)
        print("TRADITIONAL NLP METRICS")
        print("="*60)

        # BLEU
        bleu = results['bleu']
        print(f"BLEU Score:")
        print(f"  Average: {bleu['average']:.4f} (±{bleu['std']:.4f})")
        print(f"  Range: {bleu['min']:.4f} - {bleu['max']:.4f}")
        print(f"  Valid/Empty: {bleu['valid_count']}/{bleu['empty_count']}")

        # ROUGE
        rouge = results['rouge']
        print(f"\nROUGE Scores:")
        print(f"  ROUGE-1: {rouge['rouge1']['average']:.4f} (±{rouge['rouge1']['std']:.4f})")
        print(f"  ROUGE-2: {rouge['rouge2']['average']:.4f} (±{rouge['rouge2']['std']:.4f})")
        print(f"  ROUGE-L: {rouge['rougeL']['average']:.4f} (±{rouge['rougeL']['std']:.4f})")
        print(f"  Valid/Empty: {rouge['rouge1']['valid_count']}/{rouge['rouge1']['empty_count']}")

        # BERTScore
        if results['bert_score']:
            bert = results['bert_score']
            print(f"\nBERTScore:")
            print(f"  F1: {bert['f1']['average']:.4f} (±{bert['f1']['std']:.4f})")
            print(f"  Precision: {bert['precision']['average']:.4f} (±{bert['precision']['std']:.4f})")
            print(f"  Recall: {bert['recall']['average']:.4f} (±{bert['recall']['std']:.4f})")

        # Sentence similarity
        if results['sentence_similarity']:
            sent_sim = results['sentence_similarity']
            print("\n" + "="*60)
            print("SENTENCE-LEVEL SEMANTIC SIMILARITY")
            print("="*60)
            print(f"Sentence-BERT Cosine Similarity:")
            print(f"  Average: {sent_sim['average']:.4f} (±{sent_sim['std']:.4f})")
            print(f"  Range: {sent_sim['min']:.4f} - {sent_sim['max']:.4f}")

        # Rhyme analysis
        rhyme = results['cmu_rhyme']
        print("\n" + "="*60)
        print("RHYME ANALYSIS")
        print("="*60)
        print(f"CMU Dictionary Phonetic Analysis:")
        print(f"  Perfect Rhyme Rate: {rhyme['perfect_rhyme_rate']:.2%}")
        print(f"  Near Rhyme Rate: {rhyme['near_rhyme_rate']:.2%}")
        print(f"  Total Rhyme Rate: {rhyme['total_rhyme_rate']:.2%}")
        print(f"  Dictionary Coverage: {rhyme['coverage']:.1%} ({rhyme['total_valid']}/{rhyme['total_processed']})")

        # Additional metrics
        additional = results['additional_metrics']
        print("\n" + "="*60)
        print("RAP-SPECIFIC METRICS")
        print("="*60)
        print(f"Syllable Similarity:")
        print(f"  Average Difference: {additional['syllable_similarity']['average_diff']:.2f} syllables")
        print(f"Word Diversity:")
        print(f"  Average: {additional['word_diversity']['average']:.3f}")

        # Length analysis
        length = results['length_analysis']
        print(f"\nLength Analysis:")
        print(f"  Average Length Difference: {length['average_length_diff']:.2f} words")
        print(f"  Average Length Ratio: {length['average_length_ratio']:.2f}")

    def calculate_length_similarity(self, test_results: List[Dict]) -> Dict[str, Any]:
        """Enhanced length analysis"""
        length_diffs = []
        length_ratios = []

        for result in test_results:
            actual_len = len(result['actual'].split())
            generated_len = len(result['generated'].split())

            length_diffs.append(abs(actual_len - generated_len))

            if actual_len > 0:
                length_ratios.append(generated_len / actual_len)
            else:
                length_ratios.append(0.0)

        return {
            'average_length_diff': np.mean(length_diffs),
            'std_length_diff': np.std(length_diffs),
            'average_length_ratio': np.mean(length_ratios),
            'std_length_ratio': np.std(length_ratios),
            'individual_diffs': length_diffs,
            'individual_ratios': length_ratios
        }

In [29]:
# Initialize with custom settings
evaluator = ComprehensiveEvaluator(
    sentence_model_name='all-MiniLM-L6-v2',  # or 'all-mpnet-base-v2' for better quality
    device='cuda'  # or 'cpu'
)

# Run evaluation
comprehensive_results = evaluator.evaluate_comprehensive(test_results, show_progress=True)

Initializing evaluator on device: cuda


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

✓ Loaded Sentence-BERT model: all-MiniLM-L6-v2
✓ Loaded CMU dictionary with 123455 entries
COMPREHENSIVE EVALUATION RESULTS
Dataset Statistics:
  Total Examples: 15000
  Empty Generations: 6 (0.0%)

Computing metrics...
  • BLEU scores...
  • ROUGE scores...
  • BERTScore...


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/482 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.42G [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  return forward_call(*args, **kwargs)
  return forward_call(*args, **kwargs)


  • Sentence similarity...
  • Rhyme analysis...
  • Additional metrics...

TRADITIONAL NLP METRICS
BLEU Score:
  Average: 0.0323 (±0.1221)
  Range: 0.0000 - 1.0000
  Valid/Empty: 14994/6

ROUGE Scores:
  ROUGE-1: 0.1277 (±0.1801)
  ROUGE-2: 0.0452 (±0.1565)
  ROUGE-L: 0.1213 (±0.1764)
  Valid/Empty: 14994/6

BERTScore:
  F1: 0.8375 (±0.0362)
  Precision: 0.8386 (±0.0382)
  Recall: 0.8368 (±0.0387)

SENTENCE-LEVEL SEMANTIC SIMILARITY
Sentence-BERT Cosine Similarity:
  Average: 0.2265 (±0.1777)
  Range: -0.1335 - 1.0000

RHYME ANALYSIS
CMU Dictionary Phonetic Analysis:
  Perfect Rhyme Rate: 23.03%
  Near Rhyme Rate: 0.03%
  Total Rhyme Rate: 23.06%
  Dictionary Coverage: 81.4% (12216/15000)

RAP-SPECIFIC METRICS
Syllable Similarity:
  Average Difference: 3.79 syllables
Word Diversity:
  Average: 0.910

Length Analysis:
  Average Length Difference: 3.26 words
  Average Length Ratio: 1.32

Evaluation completed in 131.38 seconds
