<a href="https://colab.research.google.com/github/kumar045/Assignment/blob/main/Readability.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import logging
import os
import re

import backoff
import pandas as pd
import textstat
import torch
from datasets import Dataset
from openai import OpenAI, RateLimitError
from peft import get_peft_model, LoraConfig, TaskType
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling

# Setup logging
# Configures the root logger at import time: INFO level, with timestamp,
# level name and message in every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize OpenAI client
# Single module-level client shared by every GPT-4 helper in this file.
# NOTE(review): no credentials are passed explicitly; presumably they are
# picked up from the environment -- confirm before running.
client = OpenAI()

# Load and combine datasets
def load_and_combine_datasets():
    """Read the English and German CSV files and stack them into one frame.

    The CLEAR file is tagged ``dataset='CLEAR'`` / ``language='en'``. The
    ratings file is read as ISO-8859-1, has its column names lower-cased,
    is tagged ``dataset='ratings'`` / ``language='de'`` and has its
    ``sentence`` column renamed to ``Excerpt`` so both sources share one
    text column.

    Returns:
        A DataFrame with the English rows followed by the German rows and a
        fresh 0..n-1 index.
    """
    english = pd.read_csv("path_to_your_CLEAR_dataset.csv")
    english = english.assign(dataset='CLEAR', language='en')

    german = pd.read_csv("ratings.csv", encoding="iso-8859-1")
    # Normalise header casing before tagging/renaming so 'Sentence' etc. match.
    german.columns = [str(column).lower() for column in german.columns]
    german = german.assign(dataset='ratings', language='de')
    german = german.rename(columns={'sentence': 'Excerpt'})

    return pd.concat([english, german], ignore_index=True)

# The v1 OpenAI SDK exposes RateLimitError at package level; the client class
# has no ``.error`` attribute, so ``OpenAI.error.RateLimitError`` fails at import.
@backoff.on_exception(backoff.expo, RateLimitError)
def estimate_bt_easiness(original_text):
    """Ask GPT-4 for a numeric BT_easiness estimate of ``original_text``.

    The call is retried with exponential backoff whenever the API reports a
    rate limit.

    Args:
        original_text: The text to score.

    Returns:
        The score as a float.

    Raises:
        ValueError: If the model reply contains no number at all.
    """
    prompt = f"""Estimate the BT_easiness score for the following text. BT_easiness is a measure of text readability,
    where higher scores indicate easier-to-read text. The score typically ranges from 0 to 100.

    Text:
    {original_text}

    Based on the complexity, vocabulary, and structure of the text, estimate the BT_easiness score.
    Provide only the numeric score without any explanation."""

    response = client.chat.completions.create(
        model="gpt-4-1106-preview",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )

    reply = (response.choices[0].message.content or "").strip()
    try:
        return float(reply)
    except ValueError:
        # The model sometimes wraps the number in words ("Score: 72.5");
        # recover the first numeric token instead of aborting the whole run.
        found = re.search(r"[-+]?\d+(?:\.\d+)?", reply)
        if found is None:
            raise ValueError(
                f"Could not parse a BT_easiness score from model reply: {reply!r}"
            ) from None
        return float(found.group())

# Bilingual readability metrics
def calculate_readability_scores(text, language, row=None):
    """Return the readability metrics used for ``language``.

    Args:
        text: The text to score (only used for English).
        language: ``'en'`` or ``'de'``.
        row: Optional mapping/row holding the German rating columns.

    Returns:
        For ``'en'``: a dict with the GPT-estimated ``BT_easiness`` plus three
        textstat formulas computed on ``text``.
        For ``'de'``: a dict with the four rating fields copied from ``row``,
        or all zeros when ``row`` is ``None``.
        For any other language: ``None``.
    """
    if language == 'de':
        german_fields = (
            "mos_complexity",
            "votes_complexity",
            "votes_understandability",
            "vote_lexical_difficulty",
        )
        if row is None:
            # No human ratings available: fall back to zeros for every field.
            return dict.fromkeys(german_fields, 0)
        return {field: row[field] for field in german_fields}

    if language != 'en':
        return None

    # The GPT-based estimate is requested first, then the local formulas.
    scores = {"BT_easiness": estimate_bt_easiness(text)}
    scores["Flesch-Reading-Ease"] = textstat.flesch_reading_ease(text)
    scores["SMOG Readability"] = textstat.smog_index(text)
    scores["Automated Readability Index"] = textstat.automated_readability_index(text)
    return scores

# RateLimitError lives at package level in the v1 OpenAI SDK; the client class
# has no ``.error`` attribute.
@backoff.on_exception(backoff.expo, RateLimitError)
def modify_text_gpt4(text, current_scores, language):
    """Ask GPT-4 to rewrite ``text`` so that its readability metrics improve.

    The call is retried with exponential backoff on API rate limits.

    Args:
        text: The original text.
        current_scores: Metric dict for ``text``; the keys read depend on
            ``language`` (English formula scores or German rating fields).
        language: ``'en'`` or ``'de'``; selects the prompt language.

    Returns:
        The content of the model's reply.

    Raises:
        ValueError: If ``language`` is neither ``'en'`` nor ``'de'``.
    """
    if language == 'en':
        prompt = f"""Improve the readability of the following text. Current metrics:
        - BT_easiness: {current_scores['BT_easiness']:.2f}
        - Flesch-Reading-Ease: {current_scores['Flesch-Reading-Ease']:.2f}
        - SMOG Readability: {current_scores['SMOG Readability']:.2f}
        - Automated Readability Index: {current_scores['Automated Readability Index']:.2f}

    Original text:
    {text}

    Rewrite the text to improve all readability metrics. Aim for:
        - Higher BT_easiness
        - Higher Flesch-Reading-Ease
        - Lower SMOG Readability
        - Lower Automated Readability Index

    Improved text:"""
    elif language == 'de':
        prompt = f"""Verbessere die Lesbarkeit des folgenden Textes. Aktuelle Metriken:
        - MOS Komplexität: {current_scores['mos_complexity']:.2f}
        - Komplexitätsbewertungen: {current_scores['votes_complexity']}
        - Verständlichkeitsbewertungen: {current_scores['votes_understandability']}
        - Lexikalische Schwierigkeit: {current_scores['vote_lexical_difficulty']}

    Originaltext:
    {text}

    Schreibe den Text um, um alle Lesbarkeitsmetriken zu verbessern. Ziele:
        - Niedrigere MOS Komplexität
        - Weniger Komplexitätsbewertungen
        - Mehr Verständlichkeitsbewertungen
        - Niedrigere lexikalische Schwierigkeit

    Verbesserter Text:"""
    else:
        # Previously this fell through and crashed with UnboundLocalError on
        # ``prompt``; fail early with a message that names the real problem.
        raise ValueError(f"Unsupported language: {language!r} (expected 'en' or 'de')")

    response = client.chat.completions.create(
        model="gpt-4-1106-preview",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7,
    )
    return response.choices[0].message.content

# RateLimitError lives at package level in the v1 OpenAI SDK; the client class
# has no ``.error`` attribute.
@backoff.on_exception(backoff.expo, RateLimitError)
def estimate_german_metrics(original_text, modified_text, original_scores):
    """Ask GPT-4 to estimate the German rating metrics of a rewritten text.

    The call is retried with exponential backoff on API rate limits.

    Args:
        original_text: The original German text.
        modified_text: The rewritten version whose metrics are estimated.
        original_scores: Dict with the four rating fields of the original.

    Returns:
        Dict with ``mos_complexity`` (float) and ``votes_complexity``,
        ``votes_understandability``, ``vote_lexical_difficulty`` (ints).

    Raises:
        ValueError: If the reply lacks a numeric value for any expected label.
    """
    prompt = f"""Given the original German text and its improved version, estimate the new readability metrics.
    The original metrics were:
    - MOS Komplexität: {original_scores['mos_complexity']:.2f}
    - Komplexitätsbewertungen: {original_scores['votes_complexity']}
    - Verständlichkeitsbewertungen: {original_scores['votes_understandability']}
    - Lexikalische Schwierigkeit: {original_scores['vote_lexical_difficulty']}

    Original text:
    {original_text}

    Improved text:
    {modified_text}

    Estimate the new metrics. They should show improvement over the original scores.
    Provide only the numeric scores in the following format:
    MOS Komplexität: [value]
    Komplexitätsbewertungen: [value]
    Verständlichkeitsbewertungen: [value]
    Lexikalische Schwierigkeit: [value]"""

    response = client.chat.completions.create(
        model="gpt-4-1106-preview",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )

    # Parse the response leniently: skip blank or label-less lines, tolerate
    # list bullets, brackets, trailing words and German decimal commas.
    reply = response.choices[0].message.content or ""
    new_scores = {}
    for line in reply.splitlines():
        label, separator, raw_value = line.partition(':')
        if not separator:
            continue
        number = re.search(r"[-+]?\d+(?:[.,]\d+)?", raw_value)
        if number is None:
            continue
        new_scores[label.strip(" \t-*")] = float(number.group().replace(',', '.'))

    expected_labels = (
        "MOS Komplexität",
        "Komplexitätsbewertungen",
        "Verständlichkeitsbewertungen",
        "Lexikalische Schwierigkeit",
    )
    missing = [label for label in expected_labels if label not in new_scores]
    if missing:
        raise ValueError(
            f"Model reply is missing metric(s) {missing}; reply was: {reply!r}"
        )

    return {
        "mos_complexity": new_scores["MOS Komplexität"],
        "votes_complexity": int(new_scores["Komplexitätsbewertungen"]),
        "votes_understandability": int(new_scores["Verständlichkeitsbewertungen"]),
        "vote_lexical_difficulty": int(new_scores["Lexikalische Schwierigkeit"])
    }

# Data preparation function
def prepare_data(df, sample_size=1000):
    """Build the per-text records used for training and the feedback loop.

    A reproducible random sample of ``df`` is taken; for each row the original
    scores, a GPT-4 rewrite and the rewrite's scores are collected.

    Args:
        df: Frame with at least ``Excerpt`` and ``language`` columns (plus the
            rating columns for German rows).
        sample_size: Maximum number of rows to process. If the frame has
            fewer rows, all of them are used.

    Returns:
        A list of dicts with keys ``original_text``, ``original_scores``,
        ``gpt4_text``, ``gpt4_scores`` and ``language``. Empty when ``df`` has
        no rows or ``sample_size`` is 0.
    """
    # DataFrame.sample raises when asked for more rows than exist, so clamp
    # the request to the population size.
    rows_to_take = min(sample_size, len(df))
    if rows_to_take == 0:
        return []

    df_subset = df.sample(rows_to_take, random_state=42).reset_index(drop=True)
    prepared_data = []

    for _, row in tqdm(df_subset.iterrows(), total=len(df_subset), desc="Preparing data"):
        text = row['Excerpt']
        language = row['language']
        original_scores = calculate_readability_scores(text, language, row)

        gpt4_modified_text = modify_text_gpt4(text, original_scores, language)
        if language == 'en':
            # English rewrites are re-scored with the same metric functions.
            gpt4_scores = calculate_readability_scores(gpt4_modified_text, language)
        else:
            # There are no human ratings for the rewrite, so GPT-4 estimates them.
            gpt4_scores = estimate_german_metrics(text, gpt4_modified_text, original_scores)

        prepared_data.append({
            "original_text": text,
            "original_scores": original_scores,
            "gpt4_text": gpt4_modified_text,
            "gpt4_scores": gpt4_scores,
            "language": language
        })

    return prepared_data

# Llama 2 model setup and training functions
def setup_llama2_model(model_name):
    """Load a causal LM and its tokenizer and wrap the model with LoRA adapters.

    Args:
        model_name: Identifier or path passed to ``from_pretrained``.

    Returns:
        ``(model, tokenizer)`` where ``model`` is the PEFT-wrapped model loaded
        in float16 and ``tokenizer`` uses its EOS token for padding.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Reuse the end-of-sequence token as the padding token.
    tokenizer.pad_token = tokenizer.eos_token

    base_model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)

    lora_settings = {
        "task_type": TaskType.CAUSAL_LM,
        "r": 16,
        "lora_alpha": 32,
        "lora_dropout": 0.05,
        "bias": "none",
        # Adapters are attached to the query and value projections only.
        "target_modules": ["q_proj", "v_proj"],
    }
    return get_peft_model(base_model, LoraConfig(**lora_settings)), tokenizer

def train_llama2(model, tokenizer, train_data, output_dir, num_epochs=3):
    """Fine-tune ``model`` on a list of ``{"text": ...}`` examples.

    Args:
        model: The model to train.
        tokenizer: Tokenizer used to encode the ``text`` field.
        train_data: Records convertible to a DataFrame with a ``text`` column.
        output_dir: Directory for checkpoints; logs go to ``<output_dir>/logs``.
        num_epochs: Number of training epochs.

    Returns:
        The same ``model`` object after ``Trainer.train()`` has run.
    """
    raw_dataset = Dataset.from_pandas(pd.DataFrame(train_data))

    def encode_batch(batch):
        # Every example is truncated/padded to a fixed 2048-token window.
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=2048)

    encoded_dataset = raw_dataset.map(
        encode_batch,
        batched=True,
        remove_columns=raw_dataset.column_names,
    )

    argument_values = {
        "output_dir": output_dir,
        "num_train_epochs": num_epochs,
        "per_device_train_batch_size": 1,
        "gradient_accumulation_steps": 8,
        "warmup_steps": 500,
        "weight_decay": 0.01,
        "logging_dir": f"{output_dir}/logs",
        "logging_steps": 100,
        "save_steps": 1000,
        "fp16": True,
    }
    # mlm=False selects the causal (next-token) language-modeling collator.
    collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    Trainer(
        model=model,
        args=TrainingArguments(**argument_values),
        train_dataset=encoded_dataset,
        data_collator=collator,
    ).train()
    return model

# Evaluation and feedback functions
def _relative_improvement(baseline, new_value, higher_is_better):
    """Return the signed percentage improvement of ``new_value`` over ``baseline``."""
    gain = (new_value - baseline) if higher_is_better else (baseline - new_value)
    if baseline == 0:
        # A relative change from zero is undefined; report "no change" as 0 and
        # any change as an infinite gain/loss instead of dividing by zero.
        if gain == 0:
            return 0.0
        return float('inf') if gain > 0 else float('-inf')
    # Divide by the magnitude so a negative baseline cannot flip the sign.
    return gain / abs(baseline) * 100


def evaluate_improvement(original_text, gpt4_text, llama2_text, original_scores, gpt4_scores, language):
    """Score the Llama 2 rewrite and compare both rewrites with the original.

    Args:
        original_text: The source text.
        gpt4_text: GPT-4's rewrite (not used in the computation).
        llama2_text: Llama 2's rewrite, which is scored here.
        original_scores: Metric dict of the original text.
        gpt4_scores: Metric dict of the GPT-4 rewrite (same keys).
        language: ``'en'`` re-scores with the metric functions; any other value
            uses the GPT-estimated German metrics.

    Returns:
        ``(llama2_scores, improvements)`` where ``improvements`` maps each
        metric to ``{'gpt4': pct, 'llama2': pct}``. Positive means better.
        A zero baseline yields 0.0 (unchanged) or +/-inf (changed).
    """
    if language == 'en':
        llama2_scores = calculate_readability_scores(llama2_text, language)
    else:
        llama2_scores = estimate_german_metrics(original_text, llama2_text, original_scores)

    # Metrics where a larger value means easier text; every other metric is
    # treated as "lower is better".
    if language == 'en':
        higher_is_better_metrics = {'BT_easiness', 'Flesch-Reading-Ease'}
    elif language == 'de':
        higher_is_better_metrics = {'votes_understandability'}
    else:
        higher_is_better_metrics = set()

    improvements = {}
    for metric, original_val in original_scores.items():
        higher_is_better = metric in higher_is_better_metrics
        improvements[metric] = {
            'gpt4': _relative_improvement(original_val, gpt4_scores[metric], higher_is_better),
            'llama2': _relative_improvement(original_val, llama2_scores[metric], higher_is_better),
        }

    return llama2_scores, improvements

def generate_feedback(improvements, language):
    """Turn per-metric improvement figures into a short textual verdict.

    Args:
        improvements: Mapping ``metric -> {'gpt4': value, 'llama2': value}``.
        language: ``'en'`` for English output; any other value gives German.

    Returns:
        A header line followed by one ``"- <metric>: <verdict>"`` line per
        metric, each terminated by a newline.
    """
    # The messages are plain variables rather than literals inside the f-string
    # expression: backslash escapes there are a SyntaxError before Python 3.12.
    if language == 'en':
        header = "Based on the improvements:\n"
        matched = "Great job! You've matched or exceeded GPT-4's improvement."
        behind = "There's room for improvement. Try to match GPT-4's performance."
    else:
        header = "Basierend auf den Verbesserungen:\n"
        matched = "Großartig! Sie haben die Verbesserung von GPT-4 erreicht oder übertroffen."
        behind = "Es gibt Raum für Verbesserungen. Versuchen Sie, die Leistung von GPT-4 zu erreichen."

    parts = [header]
    for metric, values in improvements.items():
        verdict = matched if values['llama2'] >= values['gpt4'] else behind
        parts.append(f"- {metric}: {verdict}\n")
    return "".join(parts)

# RateLimitError lives at package level in the v1 OpenAI SDK; the client class
# has no ``.error`` attribute.
@backoff.on_exception(backoff.expo, RateLimitError)
def get_gpt4_analysis(original_text, gpt4_text, llama2_text, improvements, language):
    """Ask GPT-4 for a written comparison of the Llama 2 and GPT-4 rewrites.

    The call is retried with exponential backoff on API rate limits.

    Args:
        original_text: The source text.
        gpt4_text: GPT-4's rewrite.
        llama2_text: Llama 2's rewrite.
        improvements: Per-metric improvement figures, embedded verbatim in the
            prompt.
        language: ``'en'`` labels the text as English in the prompt; any other
            value labels it as German.

    Returns:
        The content of the model's reply.
    """
    prompt = f"""Analyze the readability improvements made by Llama 2 compared to GPT-4 for the following {'English' if language == 'en' else 'German'} text.

Original text:
{original_text}

GPT-4 improved version:
{gpt4_text}

Llama 2 improved version:
{llama2_text}

Improvement percentages:
{improvements}

Provide a detailed analysis of Llama 2's performance:
1. What did Llama 2 do well in improving readability?
2. Where did Llama 2 fall short compared to GPT-4?
3. Suggest specific strategies for Llama 2 to improve its performance.
"""

    response = client.chat.completions.create(
        model="gpt-4-1106-preview",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7,
    )
    return response.choices[0].message.content


# Iterative feedback training: generate, evaluate, build examples, retrain
def feedback_loop_training(model, tokenizer, dataset, num_iterations=5):
    """Repeatedly let the model rewrite each text, critique it, and retrain.

    In every iteration the current model rewrites each item, the rewrite is
    scored against GPT-4's, a rule-based feedback and a GPT-4 analysis are
    generated, and one training example per item is built from all of that.
    The model is then trained for one epoch on those examples.

    Args:
        model: The model to improve.
        tokenizer: Its tokenizer.
        dataset: Iterable of records with ``original_text``,
            ``original_scores``, ``gpt4_text``, ``gpt4_scores`` and
            ``language``.
        num_iterations: Number of generate/critique/train rounds.

    Returns:
        The model after the last training round.
    """
    for iteration in range(num_iterations):
        logging.info(f"Starting iteration {iteration + 1}")

        new_training_data = []
        for item in tqdm(dataset, desc=f"Processing iteration {iteration + 1}"):
            llama2_text = improve_readability(item['original_text'], item['original_scores'],
                                              item['gpt4_text'], item['gpt4_scores'], model, tokenizer, item['language'])

            llama2_scores, improvements = evaluate_improvement(item['original_text'], item['gpt4_text'],
                                                               llama2_text, item['original_scores'], item['gpt4_scores'], item['language'])

            feedback = generate_feedback(improvements, item['language'])
            gpt4_analysis = get_gpt4_analysis(item['original_text'], item['gpt4_text'], llama2_text, improvements, item['language'])

            new_example = f"""{'Original text' if item['language'] == 'en' else 'Originaltext'}: {item['original_text']}

{'GPT-4 improved version' if item['language'] == 'en' else 'Von GPT-4 verbesserte Version'}: {item['gpt4_text']}

{'Your previous improvement' if item['language'] == 'en' else 'Ihre vorherige Verbesserung'}: {llama2_text}

{'Feedback' if item['language'] == 'en' else 'Rückmeldung'}: {feedback}

{'Expert analysis' if item['language'] == 'en' else 'Expertenanalyse'}: {gpt4_analysis}

{'Now, provide an improved version addressing the feedback and analysis' if item['language'] == 'en' else 'Geben Sie nun eine verbesserte Version an, die auf das Feedback und die Analyse eingeht'}:

{'Improved text' if item['language'] == 'en' else 'Verbesserter Text'}:"""

            # The example used to stop at the "Improved text:" cue, so the
            # model was never shown an answer to learn from. The GPT-4 rewrite
            # is the only reference text available, so it is used as the
            # completion. NOTE(review): confirm this is the intended target.
            new_training_data.append({"text": f"{new_example} {item['gpt4_text']}"})

        if not new_training_data:
            # Training on zero examples fails inside the tokenization step;
            # an empty dataset will stay empty, so stop instead of crashing.
            logging.warning("No training examples produced; stopping feedback loop.")
            break

        # Train on new data
        output_dir = f"./llama2_readability_improvement_iteration_{iteration + 1}"
        model = train_llama2(model, tokenizer, new_training_data, output_dir, num_epochs=1)

        logging.info(f"Completed iteration {iteration + 1}")

    return model

# Utility function for using the trained model
def improve_readability(text, original_scores, gpt4_text, gpt4_scores, model, tokenizer, language):
    """Generate the model's own readability rewrite of ``text``.

    The prompt shows the original text with its metrics and GPT-4's rewrite
    with its metrics, then asks for a new rewrite.

    Args:
        text: The original text.
        original_scores: Metric dict for ``text``.
        gpt4_text: GPT-4's rewrite, shown as a reference.
        gpt4_scores: Metric dict for ``gpt4_text``.
        model: Causal LM used for generation.
        tokenizer: Tokenizer matching ``model``.
        language: ``'en'`` selects the English prompt; any other value selects
            the German prompt.

    Returns:
        Only the newly generated text (the prompt is not included), with
        surrounding whitespace stripped.
    """
    if language == 'en':
        prompt = f"""Improve the readability of the following text.

Original text:
{text}

Original text metrics:
    - BT_easiness: {original_scores['BT_easiness']:.2f}
    - Flesch-Reading-Ease: {original_scores['Flesch-Reading-Ease']:.2f}
    - SMOG Readability: {original_scores['SMOG Readability']:.2f}
    - Automated Readability Index: {original_scores['Automated Readability Index']:.2f}

GPT-4 improved version:
{gpt4_text}

GPT-4 improved version metrics:
    - BT_easiness: {gpt4_scores['BT_easiness']:.2f}
    - Flesch-Reading-Ease: {gpt4_scores['Flesch-Reading-Ease']:.2f}
    - SMOG Readability: {gpt4_scores['SMOG Readability']:.2f}
    - Automated Readability Index: {gpt4_scores['Automated Readability Index']:.2f}

Now, provide your own improved version of the original text, aiming to match or exceed the GPT-4 version's readability scores:

Improved text:"""
    else:
        prompt = f"""Verbessere die Lesbarkeit des folgenden Textes.

Originaltext:
{text}

Metriken des Originaltextes:
    - MOS Komplexität: {original_scores['mos_complexity']:.2f}
    - Komplexitätsbewertungen: {original_scores['votes_complexity']}
    - Verständlichkeitsbewertungen: {original_scores['votes_understandability']}
    - Lexikalische Schwierigkeit: {original_scores['vote_lexical_difficulty']}

Von GPT-4 verbesserte Version:
{gpt4_text}

Metriken der GPT-4 verbesserten Version:
    - MOS Komplexität: {gpt4_scores['mos_complexity']:.2f}
    - Komplexitätsbewertungen: {gpt4_scores['votes_complexity']}
    - Verständlichkeitsbewertungen: {gpt4_scores['votes_understandability']}
    - Lexikalische Schwierigkeit: {gpt4_scores['vote_lexical_difficulty']}

Erstelle nun deine eigene verbesserte Version des Originaltextes und versuche, die Lesbarkeitsmetriken der GPT-4-Version zu erreichen oder zu übertreffen:

Verbesserter Text:"""

    # Inputs must live on the same device as the model weights.
    inputs = tokenizer(prompt, return_tensors="pt", max_length=2048, truncation=True).to(model.device)
    # temperature/top_p only take effect when sampling is switched on.
    outputs = model.generate(**inputs, max_new_tokens=500, do_sample=True, temperature=0.7, top_p=0.9)
    # A decoder-only model returns prompt + continuation; drop the echoed
    # prompt so it is not scored or fed back as part of the rewrite.
    prompt_length = inputs["input_ids"].shape[1]
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

# Main execution
def main():
    """Run the full pipeline: data prep, initial training, feedback loop, demo.

    Loads and splits the combined dataset, builds GPT-4 reference rewrites for
    a sample of the training split, fine-tunes Llama 2 on them, runs the
    feedback loop, saves the final model and tokenizer, and prints rewrites
    for two example sentences.
    """
    # Load and combine datasets
    combined_df = load_and_combine_datasets()

    # Split the combined dataset into train and test sets
    # (test_df is kept for later evaluation; it is not used below.)
    train_df, test_df = train_test_split(combined_df, test_size=0.2, stratify=combined_df['language'], random_state=42)

    # Prepare initial data; never request more rows than the split contains.
    initial_data = prepare_data(train_df, sample_size=min(1000, len(train_df)))

    # Setup Llama 2 model
    model_name = "meta-llama/Llama-2-7b-hf"  # Ensure you have access to this model
    model, tokenizer = setup_llama2_model(model_name)

    # Initial training
    initial_training_data = []
    for item in initial_data:
        is_english = item['language'] == 'en'
        prompt = f"""{'Improve the readability of the following text' if is_english else 'Verbessere die Lesbarkeit des folgenden Textes'}:

{'Original text' if is_english else 'Originaltext'}:
{item['original_text']}

{'Improved text' if is_english else 'Verbesserter Text'}:"""
        # The examples used to end at the "Improved text:" cue, so the model
        # never saw a rewrite to imitate. The GPT-4 rewrite is the target.
        initial_training_data.append({"text": f"{prompt} {item['gpt4_text']}"})

    model = train_llama2(model, tokenizer, initial_training_data, "./llama2_initial_training_bilingual")

    # Feedback loop training
    final_model = feedback_loop_training(model, tokenizer, initial_data, num_iterations=5)

    # Save the final model
    final_model.save_pretrained("./llama2_readability_improvement_final_with_feedback_bilingual")
    tokenizer.save_pretrained("./llama2_readability_improvement_final_with_feedback_bilingual")

    logging.info("Training completed. Final model saved.")

    # Example usage of the trained model
    test_texts = [
        {"text": "The mitochondria is the powerhouse of the cell, responsible for producing energy through a process called cellular respiration.", "language": "en"},
        {"text": "Die Mitochondrien sind die Kraftwerke der Zelle und produzieren Energie durch einen Prozess, der als zelluläre Atmung bezeichnet wird.", "language": "de"}
    ]

    for test_item in test_texts:
        text = test_item['text']
        language = test_item['language']

        # No rating row exists for the ad-hoc German sentence, so its
        # original scores come back as zeros.
        original_scores = calculate_readability_scores(text, language)
        gpt4_text = modify_text_gpt4(text, original_scores, language)
        if language == 'en':
            gpt4_scores = calculate_readability_scores(gpt4_text, language)
        else:
            gpt4_scores = estimate_german_metrics(text, gpt4_text, original_scores)

        simplified_text = improve_readability(text, original_scores, gpt4_text, gpt4_scores, final_model, tokenizer, language)

        print(f"Original ({language}): {text}")
        print(f"Simplified: {simplified_text}")
        print("---")

if __name__ == "__main__":
    main()