# BART Model

### Note: this notebook does not contain fine-tuning code

# Import Statements

In [None]:
# Fix tensorflow conflict and install required packages
# (lines starting with "!" are notebook shell commands, not Python statements)
# Swap the installed tensorflow package for the CPU-only build.
!pip uninstall -y tensorflow
!pip install tensorflow-cpu
# Quietly install the training and evaluation dependencies used by the cells below.
!pip install -q pytorch-lightning transformers torch bert-score rouge-score scikit-learn

In [None]:
# Imports
import torch
import pandas as pd
import numpy as np
import pytorch_lightning as pl
from transformers import BartTokenizer, BartForConditionalGeneration
from torch.utils.data import DataLoader, TensorDataset, RandomSampler
from sklearn.model_selection import train_test_split
from rouge_score import rouge_scorer
from bert_score import score
from tqdm import tqdm
import math
import random
import re
import os
from typing import List, Dict, Tuple

In [None]:
# Mount Google Drive
# The dataset CSVs and the saved model directory referenced by later cells
# are all read from / written to paths under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Select the compute device: GPU when PyTorch can see one, otherwise CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Data Processing

In [None]:
# Load lyrics data: read every CSV in the lyrics folder and stack them into one frame
print("Loading lyrics data from Google Drive...")
df_list = []
lyrics_folder_path = "/content/drive/My Drive/266 Final Project/Song Files"
# os.listdir() returns entries in arbitrary order. Sorting fixes the row order
# of the concatenated frame, so the seeded train/test split further down is
# reproducible from run to run.
for filename in sorted(os.listdir(lyrics_folder_path)):
    if filename.endswith('.csv'):
        file_path = os.path.join(lyrics_folder_path, filename)
        df = pd.read_csv(file_path)
        df_list.append(df)

# ignore_index=True gives the combined frame a fresh 0..n-1 index
lyrics_df = pd.concat(df_list, ignore_index=True)

In [None]:
# Load poetry data: read the three PoemSum CSV splits and merge them into one frame
print("\nLoading poetry data...")
poetry_files = {
    'test': "/content/drive/My Drive/266 Final Project/PoemSum Model/poemsum_test.csv",
    'train': "/content/drive/My Drive/266 Final Project/PoemSum Model/poemsum_train.csv",
    'valid': "/content/drive/My Drive/266 Final Project/PoemSum Model/poemsum_valid.csv"
}

poem_list = []
for dataset_type in poetry_files:
    filepath = poetry_files[dataset_type]
    print(f"Loading poetry {dataset_type} dataset")
    poem_data = pd.read_csv(filepath)
    poem_list.append(poem_data)

# The original split boundaries are discarded here; the data is re-split later
poem_df = pd.concat(poem_list, ignore_index=True)


In [None]:
# Split datasets: hold out 20% of each corpus as a test set, using a fixed seed
print("\nSplitting datasets...")
split_options = dict(test_size=0.2, random_state=42)
train_val_df, test_df = train_test_split(lyrics_df, **split_options)
train_val_poem, test_poem = train_test_split(poem_df, **split_options)

print(f"Lyrics data split - Training+Validation: {len(train_val_df)}, Test: {len(test_df)}")
print(f"Poetry data split - Training+Validation: {len(train_val_poem)}, Test: {len(test_poem)}")

# Model Definition

In [None]:
class BARTDataModule(pl.LightningDataModule):
    """Tokenise source/target text pairs and serve them as tensor batches.

    Args:
        train_df: DataFrame with ``source`` and ``target`` text columns used
            for training.
        val_df: DataFrame with the same two columns used for validation.
        tokenizer: Callable tokenizer applied to the lists of source and
            target strings; must return ``input_ids`` and ``attention_mask``.
        batch_size: Number of examples per batch for both dataloaders.
        max_length: Token limit (with truncation) applied to sources and targets.
    """

    def __init__(self, train_df, val_df, tokenizer, batch_size=16, max_length=512):
        super().__init__()
        self.train_df = train_df
        self.val_df = val_df
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.max_length = max_length

    def setup(self, stage=None):
        """Tokenise both splits once so the dataloaders only have to wrap tensors."""
        self.train_encodings = self._encode_data(self.train_df)
        self.val_encodings = self._encode_data(self.val_df)

    def _encode_data(self, df):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` tensors for ``df``."""
        source_encodings = self.tokenizer(
            df['source'].tolist(),
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

        target_encodings = self.tokenizer(
            df['target'].tolist(),
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

        # Padding positions must not contribute to the loss. The model's
        # cross-entropy loss skips label value -100, so replace every padded
        # target position (attention_mask == 0) with -100 instead of leaving
        # the pad token id in place.
        labels = target_encodings['input_ids'].masked_fill(
            target_encodings['attention_mask'] == 0, -100
        )

        return {
            'input_ids': source_encodings['input_ids'],
            'attention_mask': source_encodings['attention_mask'],
            'labels': labels
        }

    @staticmethod
    def _as_dataset(encodings):
        """Wrap an encodings dict as a TensorDataset of (input_ids, attention_mask, labels)."""
        return TensorDataset(
            encodings['input_ids'],
            encodings['attention_mask'],
            encodings['labels']
        )

    def train_dataloader(self):
        """Return the shuffled training dataloader."""
        return DataLoader(
            self._as_dataset(self.train_encodings),
            batch_size=self.batch_size,
            shuffle=True
        )

    def val_dataloader(self):
        """Return the validation dataloader (original order, no shuffling)."""
        return DataLoader(
            self._as_dataset(self.val_encodings),
            batch_size=self.batch_size
        )

class BARTLitModel(pl.LightningModule):
    """Lightning wrapper that trains a seq2seq model on its own returned loss.

    Args:
        model: The wrapped model; it is called with ``input_ids``,
            ``attention_mask`` and ``labels`` and must return an object
            exposing ``.loss``.
        learning_rate: Learning rate handed to the AdamW optimizer.
    """

    def __init__(self, model, learning_rate=2e-5):
        super().__init__()
        self.model = model
        self.learning_rate = learning_rate

    def forward(self, input_ids, attention_mask, labels=None):
        """Delegate to the wrapped model and return its output object unchanged."""
        return self.model(input_ids, attention_mask=attention_mask, labels=labels)

    def _loss_for_batch(self, batch, log_name):
        """Run one batch through the model, log its loss under ``log_name`` and return it."""
        token_ids, mask, targets = batch
        loss = self(token_ids, mask, targets).loss
        self.log(log_name, loss)
        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        return self._loss_for_batch(batch, 'train_loss')

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss for one batch."""
        return self._loss_for_batch(batch, 'val_loss')

    def configure_optimizers(self):
        """Optimise all parameters with AdamW at the configured learning rate."""
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate)
        return optimizer

# Model Evaluation Code

In [None]:
def calculate_content_coverage(text: str, summary: str) -> float:
    """Return the fraction of distinct source tokens that also occur in the summary.

    Tokens are lower-cased, whitespace-separated words. An empty source text
    yields 0.0.
    """
    source_vocab = set(text.lower().split())
    summary_vocab = set(summary.lower().split())
    if not source_vocab:
        return 0.0
    shared = source_vocab & summary_vocab
    return len(shared) / len(source_vocab)

def calculate_consistency_score(summaries: List[str]) -> float:
    """Score how similar several generated summaries are to one another.

    Every unordered pair of summaries is compared with ROUGE-1, ROUGE-2 and
    ROUGE-L (stemmed). The pair score is the mean of the three F-measures and
    the result is the mean over all pairs. Fewer than two summaries counts as
    perfectly consistent (1.0).
    """
    total = len(summaries)
    if total < 2:
        return 1.0

    rouge_types = ['rouge1', 'rouge2', 'rougeL']
    scorer = rouge_scorer.RougeScorer(rouge_types, use_stemmer=True)

    pair_scores = []
    for left in range(total):
        for right in range(left + 1, total):
            result = scorer.score(summaries[left], summaries[right])
            f_measure_sum = sum(result[name].fmeasure for name in rouge_types)
            pair_scores.append(f_measure_sum / 3)

    return np.mean(pair_scores)

def calculate_semantic_similarity(text: str, summary: str) -> float:
    """Return the Jaccard overlap of the lower-cased word sets of text and summary.

    Returns 0.0 when both inputs contain no tokens.
    """
    source_vocab = set(text.lower().split())
    summary_vocab = set(summary.lower().split())
    shared_count = len(source_vocab & summary_vocab)
    combined_count = len(source_vocab | summary_vocab)
    if combined_count > 0:
        return shared_count / combined_count
    return 0.0

def print_evaluation_results(metrics: Dict[str, float], examples: List[Dict]):
    """Print the aggregate metrics followed by each stored example.

    Each example dict must provide ``original_text``, ``actual_summary``,
    ``generated_summaries`` (a list) and ``metrics`` (name -> number).
    Numbers are printed with three decimals.
    """
    print("\nEvaluation Results:")
    for name in metrics:
        print(f"{name}: {metrics[name]:.3f}")

    print("\nExample Generations:")
    for number, sample in enumerate(examples, start=1):
        print(f"\nExample {number}:")
        # Only the first 200 characters of the source text are shown
        print(f"Original Text (truncated): {sample['original_text'][:200]}...")
        print(f"Actual Summary: {sample['actual_summary']}")

        print("\nGenerated Summaries:")
        for position, generated in enumerate(sample['generated_summaries'], start=1):
            print(f"{position}. {generated}")

        print("\nMetrics:")
        for name, number_value in sample['metrics'].items():
            print(f"{name}: {number_value:.3f}")

In [None]:
def evaluate_bart_model(
    model: BartForConditionalGeneration,
    tokenizer: BartTokenizer,
    test_data: pd.DataFrame,
    batch_size: int = 8
) -> Tuple[Dict[str, float], List[Dict]]:
    """
    Evaluate BART model with multiple metrics.

    For every row of ``test_data`` three sampled summaries are generated from
    the ``source`` column. The first summary is scored against the source text
    (content coverage, token-overlap similarity) and against the ``target``
    column (BERTScore F1); the three summaries are compared with each other
    for consistency.

    Args:
        model: Model used for generation; switched to eval mode and moved to
            the GPU when one is available.
        tokenizer: Tokenizer matching ``model``.
        test_data: DataFrame with ``source`` and ``target`` text columns.
        batch_size: Number of rows generated and scored together.

    Returns:
        A tuple ``(metrics, examples)``: ``metrics`` maps each ``avg_*`` name
        to the mean over all rows, ``examples`` holds the first five rows with
        their summaries and per-row metrics.
    """
    model.eval()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    evaluation_results = {
        'content_coverage': [],
        'consistency_score': [],
        'semantic_similarity': [],
        'bert_score': []
    }

    examples = []
    num_generations = 3  # Generate 3 summaries per text

    for idx in tqdm(range(0, len(test_data), batch_size)):
        batch_rows = test_data.iloc[idx:idx + batch_size]
        batch_texts = batch_rows['source'].tolist()
        batch_references = batch_rows['target'].tolist()

        # The encoded inputs are identical for every generation pass, so
        # tokenise once per batch instead of once per pass.
        inputs = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        ).to(device)

        summaries_per_text = []
        for _ in range(num_generations):
            with torch.no_grad():
                outputs = model.generate(
                    input_ids=inputs['input_ids'],
                    attention_mask=inputs['attention_mask'],
                    max_length=150,
                    min_length=40,
                    num_beams=4,
                    do_sample=True,
                    temperature=0.3,
                    top_k=50,
                    no_repeat_ngram_size=3,
                    length_penalty=0.8,
                    repetition_penalty=1.5
                )

            summaries_per_text.append(
                tokenizer.batch_decode(outputs, skip_special_tokens=True)
            )

        # Score the whole batch in one BERTScore call. Calling score() once
        # per example repeats its model setup for every row; a single call
        # returns one F1 value per (candidate, reference) pair.
        _, _, f1_scores = score(
            summaries_per_text[0],
            batch_references,
            model_type="microsoft/deberta-xlarge-mnli",
            device=device
        )

        for text_idx, original_text in enumerate(batch_texts):
            actual_summary = batch_references[text_idx]
            text_summaries = [summaries[text_idx] for summaries in summaries_per_text]

            coverage_score = calculate_content_coverage(original_text, text_summaries[0])
            consistency_score = calculate_consistency_score(text_summaries)
            semantic_score = calculate_semantic_similarity(original_text, text_summaries[0])
            bert_f1 = f1_scores[text_idx].item()

            evaluation_results['content_coverage'].append(coverage_score)
            evaluation_results['consistency_score'].append(consistency_score)
            evaluation_results['semantic_similarity'].append(semantic_score)
            evaluation_results['bert_score'].append(bert_f1)

            # Keep only the first five rows as printable examples
            if len(examples) < 5:
                examples.append({
                    'original_text': original_text,
                    'actual_summary': actual_summary,
                    'generated_summaries': text_summaries,
                    'metrics': {
                        'content_coverage': coverage_score,
                        'consistency': consistency_score,
                        'semantic_similarity': semantic_score,
                        'bert_score': bert_f1
                    }
                })

        # Release cached GPU memory between batches
        torch.cuda.empty_cache()

    metrics = {
        'avg_content_coverage': np.mean(evaluation_results['content_coverage']),
        'avg_consistency': np.mean(evaluation_results['consistency_score']),
        'avg_semantic_similarity': np.mean(evaluation_results['semantic_similarity']),
        'avg_bert_score': np.mean(evaluation_results['bert_score'])
    }

    return metrics, examples

# Creating baseline model

In [None]:
def create_baseline_model(train_val_df, train_val_poem):
    """Fine-tune ``facebook/bart-base`` on combined lyrics and poem data, then save it.

    Args:
        train_val_df: Lyrics DataFrame with ``Lyrics`` and
            ``Combined Annotations`` columns. It is not modified.
        train_val_poem: Poem DataFrame with ``ctext`` (source) and ``text``
            (target summary) columns.

    Returns:
        A tuple ``(lit_model, tokenizer, trainer)``. ``lit_model`` is the
        ``BARTLitModel`` wrapper; the fine-tuned Hugging Face model is
        ``lit_model.model``.
    """
    # 1. Initialize model and tokenizer
    print("Initializing BART model and tokenizer...")
    MODEL_NAME = 'facebook/bart-base'
    tokenizer = BartTokenizer.from_pretrained(MODEL_NAME)
    model = BartForConditionalGeneration.from_pretrained(MODEL_NAME)

    # 2. Prepare lyrics data
    print("Preparing lyrics data...")
    # Fill missing annotations BEFORE casting to str: astype(str) turns NaN
    # into the literal text "nan", which fillna('') can no longer catch.
    # Building a new Series also leaves the caller's DataFrame untouched.
    annotations = train_val_df['Combined Annotations'].fillna('').astype(str)

    lyrics_data = pd.DataFrame({
        'source': train_val_df['Lyrics'].map(
            lambda lyrics: f"summarize lyrics and capture meaning: {lyrics}"
        ),
        # Targets are capped at the first 100 whitespace-separated words
        'target': annotations.map(
            lambda text: f"Meaning and themes: {' '.join(text.split()[:100])}"
        )
    })

    # 3. Prepare poem data
    print("Preparing poetry data...")
    poem_data = pd.DataFrame({
        'source': train_val_poem['ctext'].map(
            lambda poem: f"summarize poem and capture meaning: {poem}"
        ),
        'target': train_val_poem['text'].map(
            lambda summary: f"Meaning and themes: {summary}"
        )
    })

    # 4. Combine and shuffle data (fixed seed keeps the split reproducible)
    print("Combining and cleaning data...")
    combined_data = pd.concat([lyrics_data, poem_data], ignore_index=True)
    combined_data = combined_data.sample(frac=1, random_state=42).reset_index(drop=True)

    # 5. Split data: first 80% of the shuffled rows train, the rest validate
    train_size = int(0.8 * len(combined_data))
    train_data = combined_data[:train_size]
    val_data = combined_data[train_size:]

    print(f"\nDataset Statistics:")
    print(f"Total samples: {len(combined_data)}")
    print(f"Training samples: {len(train_data)}")
    print(f"Validation samples: {len(val_data)}")

    # 6. Initialize LightningModule and DataModule
    lit_model = BARTLitModel(model)
    data_module = BARTDataModule(
        train_df=train_data,
        val_df=val_data,
        tokenizer=tokenizer,
        batch_size=16
    )

    # 7. Setup trainer: 16-bit precision on GPU, 32-bit on CPU
    trainer = pl.Trainer(
        max_epochs=3,
        accelerator='gpu' if torch.cuda.is_available() else 'cpu',
        devices=1,
        gradient_clip_val=1.0,
        precision=16 if torch.cuda.is_available() else 32,
        strategy='auto'
    )

    # 8. Train model
    print("Starting training...")
    trainer.fit(lit_model, data_module)

    # 9. Save model
    print("Saving model and tokenizer...")
    drive_path = '/content/drive/MyDrive/266 Final Project/Our Models/BART_All_Data'
    os.makedirs(drive_path, exist_ok=True)

    # Saving is best-effort: a failed write is reported but the trained model
    # is still returned to the caller.
    try:
        lit_model.model.save_pretrained(drive_path)
        tokenizer.save_pretrained(drive_path)
        print(f"Model and tokenizer successfully saved to {drive_path}")
    except Exception as e:
        print(f"Failed to save model and tokenizer: {e}")

    return lit_model, tokenizer, trainer

In [None]:
# Train the model
# NOTE: the first returned value is the BARTLitModel wrapper, not the bare
# Hugging Face model; the underlying model is available as `model.model`.
model, tokenizer, trainer = create_baseline_model(train_val_df, train_val_poem)

# Code to load the model

In [None]:
# Import necessary libraries
from transformers import BartTokenizer, BartForConditionalGeneration
import torch

# Directory holding the saved model and tokenizer files
model_path = '/content/drive/MyDrive/266 Final Project/Our Models/BART_All_Data'

# Restore the model weights and the matching tokenizer from that directory
model = BartForConditionalGeneration.from_pretrained(model_path)
tokenizer = BartTokenizer.from_pretrained(model_path)

# Run on the GPU when one is available, otherwise stay on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
model = model.to(device)


In [None]:
# Sanity check: confirm what kind of object was loaded. The loading cell
# binds the model to `model`; `bart_all_model` is never defined in this
# notebook and raised a NameError.
print(type(model))


# Example Summary Generation

In [None]:
def generate_song_summary(model, tokenizer, data, song_index, max_length=150):
    """Generate a summary for a single song using BART.

    Args:
        model: Generation model; moved to the GPU when one is available.
        tokenizer: Tokenizer matching ``model``.
        data: DataFrame with a ``Lyrics`` column.
        song_index: Positional row index of the song to summarise.
        max_length: Upper bound on the generated summary length in tokens.

    Returns:
        The decoded summary string with special tokens removed.
    """
    # Put the model on the device the inputs will be sent to
    target_device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(target_device)

    # Use the same prompt prefix as the training data
    lyrics = data.iloc[song_index]['Lyrics']
    prompt = f"summarize lyrics and capture meaning: {lyrics}"

    # Tokenise, truncating to at most 1024 tokens, and move tensors to the device
    encoded = tokenizer(
        prompt,
        max_length=1024,
        truncation=True,
        padding=True,
        return_tensors="pt"
    ).to(target_device)

    # Beam search combined with top-k sampling
    generation_options = dict(
        max_length=max_length,
        min_length=50,
        num_beams=5,
        length_penalty=0.5,
        early_stopping=True,
        no_repeat_ngram_size=2,
        do_sample=True,
        top_k=50,
        temperature=0.7
    )
    generated_ids = model.generate(
        input_ids=encoded["input_ids"],
        attention_mask=encoded["attention_mask"],
        **generation_options
    )

    # Only the first returned sequence is decoded
    return tokenizer.decode(generated_ids[0], skip_special_tokens=True)

In [None]:
# Example usage: summarise the first song of the held-out test split.
# (Previously this passed `df`, the loop variable left over from loading the
# CSV files, i.e. only whichever file happened to be read last.)
summary = generate_song_summary(model, tokenizer, test_df, 0)
print(summary)

In [None]:
# Quick look at the held-out lyrics split: dimensions, column names, first rows
for summary_view in (test_df.shape, test_df.columns, test_df.head()):
    print(summary_view)

In [None]:
# Check for NaN values
print("NaN values in test_df:")
print(test_df.isna().sum())

# Check data types
print("\nData types:")
print(test_df.dtypes)

# test_df was produced by train_test_split from lyrics_df. Work on an
# explicit copy so the column assignments below are unambiguous writes to
# this frame (avoids pandas' SettingWithCopyWarning).
test_df = test_df.copy()

# Clean the data: replace missing values with '' first, then cast to str.
# The order matters: casting first would turn NaN into the text "nan".
for column in ('Lyrics', 'Combined Annotations'):
    test_df[column] = test_df[column].fillna('').astype(str)

# Verify no empty strings that might cause issues
print("\nNumber of empty lyrics:", len(test_df[test_df['Lyrics'] == '']))
print("Number of empty annotations:", len(test_df[test_df['Combined Annotations'] == '']))

# Model Evaluation Section

In [None]:
# Install required packages
!pip install -q bert-score rouge-score

import torch
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
from transformers import BartTokenizer, BartForConditionalGeneration
from rouge_score import rouge_scorer
from bert_score import score
from tqdm import tqdm


In [None]:
def evaluate_supervised_model(
    model: BartForConditionalGeneration,
    tokenizer: BartTokenizer,
    test_data: pd.DataFrame,
    batch_size: int = 16
) -> Tuple[Dict[str, float], List[Dict]]:
    """
    Evaluate supervised lyrics model comparing against Genius annotations using BART.

    One summary is generated per row of ``test_data``. It is compared with the
    lyrics (content coverage, token-overlap similarity) and with the reference
    annotation (ROUGE-1/2/L F-measure, BERTScore F1).

    Args:
        model: Model used for generation; switched to eval mode and moved to
            the GPU when one is available.
        tokenizer: Tokenizer matching ``model``.
        test_data: DataFrame with ``Lyrics`` and ``Combined Annotations``
            string columns.
        batch_size: Number of rows generated and scored together.

    Returns:
        A tuple ``(metrics, examples)``: ``metrics`` maps each ``avg_*`` name
        to the mean over all rows, ``examples`` holds the first five rows with
        their generated summary and per-row metrics.
    """
    model.eval()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    evaluation_results = {
        'content_coverage': [],
        'semantic_similarity': [],
        'rouge1_scores': [],
        'rouge2_scores': [],
        'rougeL_scores': [],
        'bert_scores': []
    }

    examples = []

    for idx in tqdm(range(0, len(test_data), batch_size)):
        batch_lyrics = test_data['Lyrics'].iloc[idx:idx + batch_size].tolist()
        batch_annotations = test_data['Combined Annotations'].iloc[idx:idx + batch_size].tolist()

        # Generate summaries (BART-specific encoding)
        inputs = tokenizer(
            [f"summarize lyrics and capture meaning: {lyric}" for lyric in batch_lyrics],
            padding=True,
            truncation=True,
            max_length=1024,
            return_tensors="pt"
        ).to(device)

        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask'],
                max_length=150,
                min_length=50,
                num_beams=4,
                do_sample=True,
                temperature=0.7,
                top_k=50,
                no_repeat_ngram_size=3,
                length_penalty=1.0,
                repetition_penalty=1.2
            )

        generated_summaries = tokenizer.batch_decode(outputs, skip_special_tokens=True)

        # BERTScore for the whole batch in one call: one F1 per
        # (summary, annotation) pair. Scoring only every 8th item and reusing
        # the stale value for the others attributed scores to the wrong
        # examples and skewed the reported average.
        _, _, bert_f1 = score(
            generated_summaries,
            batch_annotations,
            lang='en',
            verbose=False
        )

        # Evaluate each summary against its lyrics and reference annotation
        for i, (original_lyric, generated_summary, reference_annotation) in enumerate(
            zip(batch_lyrics, generated_summaries, batch_annotations)
        ):
            # Content Coverage (between summary and lyrics)
            coverage_score = calculate_content_coverage(original_lyric, generated_summary)
            evaluation_results['content_coverage'].append(coverage_score)

            # Semantic Similarity (between summary and lyrics)
            semantic_score = calculate_semantic_similarity(original_lyric, generated_summary)
            evaluation_results['semantic_similarity'].append(semantic_score)

            # ROUGE Scores (between generated summary and reference annotation)
            rouge_scores = calculate_rouge_scores([generated_summary, reference_annotation])
            evaluation_results['rouge1_scores'].append(rouge_scores['rouge1'])
            evaluation_results['rouge2_scores'].append(rouge_scores['rouge2'])
            evaluation_results['rougeL_scores'].append(rouge_scores['rougeL'])

            # BERTScore (between generated summary and reference annotation)
            bert_score_value = bert_f1[i].item()
            evaluation_results['bert_scores'].append(bert_score_value)

            # Store the first five rows as printable examples
            if len(examples) < 5:
                examples.append({
                    'lyrics': original_lyric,
                    'reference_annotation': reference_annotation,
                    'generated_summary': generated_summary,
                    'metrics': {
                        'content_coverage': coverage_score,
                        'semantic_similarity': semantic_score,
                        'rouge1': rouge_scores['rouge1'],
                        'rouge2': rouge_scores['rouge2'],
                        'rougeL': rouge_scores['rougeL'],
                        'bert_score': bert_score_value
                    }
                })

        # Memory cleanup (only on batches whose start offset is a multiple of 5)
        if idx % 5 == 0:
            torch.cuda.empty_cache()

    # Aggregate results
    metrics = {
        'avg_content_coverage': np.mean(evaluation_results['content_coverage']),
        'avg_semantic_similarity': np.mean(evaluation_results['semantic_similarity']),
        'avg_rouge1': np.mean(evaluation_results['rouge1_scores']),
        'avg_rouge2': np.mean(evaluation_results['rouge2_scores']),
        'avg_rougeL': np.mean(evaluation_results['rougeL_scores']),
        'avg_bert_score': np.mean(evaluation_results['bert_scores'])
    }

    return metrics, examples

def calculate_rouge_scores(texts: List[str]) -> Dict[str, float]:
    """Return ROUGE-1, ROUGE-2 and ROUGE-L F-measures for a pair of texts.

    ``texts[0]`` and ``texts[1]`` are the two strings to compare; stemming is
    enabled. The result is keyed by ``rouge1``, ``rouge2`` and ``rougeL``.
    """
    rouge_types = ['rouge1', 'rouge2', 'rougeL']
    scorer = rouge_scorer.RougeScorer(rouge_types, use_stemmer=True)
    pair_result = scorer.score(texts[0], texts[1])
    return {rouge_type: pair_result[rouge_type].fmeasure for rouge_type in rouge_types}

def calculate_content_coverage(lyrics: str, summary: str) -> float:
    """Return the share of distinct lyric tokens that also appear in the summary.

    Tokens are lower-cased, whitespace-separated words. A float argument
    (such as a NaN cell) or lyrics without tokens gives 0.0; any processing
    error is printed and also gives 0.0.
    """
    if any(isinstance(value, float) for value in (lyrics, summary)):
        return 0.0

    try:
        lyric_vocab = set(str(lyrics).lower().split())
        summary_vocab = set(str(summary).lower().split())
        shared_count = len(lyric_vocab & summary_vocab)
        if not lyric_vocab:
            return 0.0
        return shared_count / len(lyric_vocab)
    except Exception as e:
        print(f"Error processing lyrics/summary: {e}")
        return 0.0

def calculate_semantic_similarity(lyrics: str, summary: str) -> float:
    """Return the Jaccard overlap of the lower-cased word sets of lyrics and summary.

    A float argument (such as a NaN cell) or two token-free inputs give 0.0;
    any processing error is printed and also gives 0.0.
    """
    if any(isinstance(value, float) for value in (lyrics, summary)):
        return 0.0

    try:
        lyric_vocab = set(str(lyrics).lower().split())
        summary_vocab = set(str(summary).lower().split())
        shared_count = len(lyric_vocab & summary_vocab)
        combined_count = len(lyric_vocab | summary_vocab)
        if combined_count > 0:
            return shared_count / combined_count
        return 0.0
    except Exception as e:
        print(f"Error processing lyrics/summary: {e}")
        return 0.0

def print_evaluation_results(metrics: Dict[str, float], examples: List[Dict]):
    """Print the six aggregate metrics followed by each stored example.

    ``metrics`` must contain the ``avg_*`` keys listed below. Each example
    dict must provide ``lyrics``, ``reference_annotation``,
    ``generated_summary`` and ``metrics`` (name -> number). Numbers are
    printed with three decimals.
    """
    summary_rows = (
        ("Average Content Coverage", 'avg_content_coverage'),
        ("Average Semantic Similarity", 'avg_semantic_similarity'),
        ("Average ROUGE-1", 'avg_rouge1'),
        ("Average ROUGE-2", 'avg_rouge2'),
        ("Average ROUGE-L", 'avg_rougeL'),
        ("Average BERTScore", 'avg_bert_score'),
    )

    print("\nEvaluation Results:")
    for label, key in summary_rows:
        print(f"{label}: {metrics[key]:.3f}")

    print("\nExample Generations:")
    for number, sample in enumerate(examples, start=1):
        print(f"\nExample {number}:")
        # Only the first 200 characters of the lyrics are shown
        print(f"Original Lyrics (truncated): {sample['lyrics'][:200]}...")
        print(f"\nReference Annotation: {sample['reference_annotation']}")
        print(f"\nGenerated Summary: {sample['generated_summary']}")
        print("\nMetrics:")
        for name in sample['metrics']:
            print(f"{name}: {sample['metrics'][name]:.3f}")

def run_evaluation(model_path: str, test_df: pd.DataFrame):
    """Load a saved BART model and tokenizer, evaluate them on ``test_df`` and print the results.

    Args:
        model_path: Directory containing the saved model and tokenizer.
        test_df: Test DataFrame passed through to ``evaluate_supervised_model``.

    Returns:
        The ``(metrics, examples)`` pair produced by the evaluation.
    """
    tokenizer = BartTokenizer.from_pretrained(model_path)
    model = BartForConditionalGeneration.from_pretrained(model_path)
    print("Model and tokenizer loaded successfully!")

    evaluation_output = evaluate_supervised_model(
        model, tokenizer, test_data=test_df, batch_size=16
    )
    metrics, examples = evaluation_output

    print_evaluation_results(metrics, examples)
    return metrics, examples


In [None]:
# Evaluate the saved model stored at model_path on the held-out lyrics split
metrics, examples = run_evaluation(model_path, test_df)