In [None]:
import os
import sys
import zipfile
from dataclasses import dataclass
from pathlib import Path
from urllib import request

In [None]:
# !ls

### Configuration dataclasses

In [None]:
@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable settings for the data-ingestion stage (download + extraction).

    Instances are built by ``ConfigurationManager.get_data_ingestion_config``,
    which passes the YAML values through without conversion, so fields
    annotated as ``Path`` may hold plain strings at runtime.
    """
    # Working directory of the stage; created by ConfigurationManager.
    root_dir: Path
    # URL the dataset archive is downloaded from.
    source_URL: str
    # Local file the downloaded archive is written to (and later unzipped from).
    local_data_path: Path
    # Default directory the archive is extracted into.
    unzip_dir: Path

@dataclass(frozen=True)
class DataValidationConfig:
    """Immutable settings for the data-validation stage.

    Instances are built by ``ConfigurationManager.get_data_validation_config``
    from the YAML config without type conversion.
    """
    # Working directory of the stage; created by ConfigurationManager.
    root_dir: Path
    # Text file that validation results are appended to.
    STATUS_FILE: Path
    # Entry names that DataValidation compares against the dataset directory listing.
    ALL_REQUIRED_FILES: list

@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings for the data-transformation (tokenization) stage.

    Instances are built by ``ConfigurationManager.get_data_transformation_config``
    from the YAML config without type conversion.
    """
    # Working directory of the stage; the tokenized dataset is saved beneath it.
    root_dir: Path
    # Location of the dataset that is read with ``load_from_disk``.
    data_path: Path
    # Name/identifier handed to ``AutoTokenizer.from_pretrained``.
    tokenizer_name: str

@dataclass(frozen=True)
class ModelTrainerConfig:
    """Immutable paths and hyper-parameters for the model-training stage.

    The path-like fields come from the ``model_trainer`` section of the config
    YAML; the remaining fields come from the ``TrainingArguments`` section of
    the params YAML (see ``ConfigurationManager.get_model_trainer_config``).
    Values are passed through without type conversion.
    """
    # Trainer output directory; the model and tokenizer are also saved beneath it.
    root_dir: Path
    # Location of the tokenized dataset read with ``load_from_disk``.
    data_path: Path
    # Checkpoint identifier handed to ``from_pretrained`` for model and tokenizer.
    model_ckpt: Path
    # The fields below are forwarded to ``transformers.TrainingArguments``.
    num_train_epochs: int
    warmup_steps: int
    # Also reused as the per-device evaluation batch size by ModelTrainer.train.
    per_device_train_batch_size: int
    weight_decay: float
    logging_steps: int
    evaluation_strategy: str
    eval_steps: int
    # NOTE: ModelTrainer.train currently passes a literal 1e6 as save_steps,
    # so this value is not consulted there.
    save_steps: float
    gradient_accumulation_steps: int

@dataclass(frozen=True)
class ModelEvaluationConfig:
    """Immutable settings for the model-evaluation stage.

    Instances are built by ``ConfigurationManager.get_model_evaluation_config``
    from the YAML config without type conversion.
    """
    # Working directory of the stage; created by ConfigurationManager.
    root_dir: Path
    # Location of the tokenized dataset read with ``load_from_disk``.
    data_path: Path
    # Directory of the trained model handed to ``from_pretrained``.
    model_path: Path
    # Directory of the saved tokenizer handed to ``from_pretrained``.
    tokenizer_path: Path
    # CSV file the ROUGE scores are written to.
    metric_file_name: Path

In [None]:
from src.constants import CONFIG_FILE_PATH, PARAMS_FILE_PATH
from src.utils import read_yaml, get_size
from src.logger import logging
from src.exception import CustomException

In [None]:
# Sanity check: load the pipeline config (path is resolved relative to the
# parent of the current working directory) and display the data-ingestion root.
h=read_yaml(Path(f'../{CONFIG_FILE_PATH}'))
h.data_ingestion.root_dir

## Configuration Manager

In [None]:
class ConfigurationManager:
    """Read the pipeline YAML files and build one config object per stage.

    Every ``get_*_config`` method creates the stage's ``root_dir`` on disk
    (if it does not exist yet) and returns the matching frozen dataclass.
    Values are copied from the YAML as-is; no type conversion is performed.
    """

    def __init__(self, config_file_path=Path(f'../{CONFIG_FILE_PATH}'),
                 param_file_path=Path(f'../{PARAMS_FILE_PATH}')) -> None:
        """Load both YAML files and create the top-level artifacts directory.

        Args:
            config_file_path: Path of the pipeline configuration YAML.
            param_file_path: Path of the training-parameters YAML.
        """
        self.config = read_yaml(config_file_path)
        self.params = read_yaml(param_file_path)

        os.makedirs(self.config.artifacts_root, exist_ok=True)

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Return the data-ingestion settings, creating the stage directory."""
        section = self.config.data_ingestion
        os.makedirs(section.root_dir, exist_ok=True)

        return DataIngestionConfig(
            root_dir=section.root_dir,
            source_URL=section.source_URL,
            local_data_path=section.local_data_path,
            unzip_dir=section.unzip_dir,
        )

    def get_data_validation_config(self) -> DataValidationConfig:
        """Return the data-validation settings, creating the stage directory."""
        section = self.config.data_validation
        os.makedirs(section.root_dir, exist_ok=True)

        return DataValidationConfig(
            root_dir=section.root_dir,
            STATUS_FILE=section.STATUS_FILE,
            ALL_REQUIRED_FILES=section.ALL_REQUIRED_FILES,
        )

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Return the data-transformation settings, creating the stage directory."""
        section = self.config.data_transformation
        os.makedirs(section.root_dir, exist_ok=True)

        return DataTransformationConfig(
            root_dir=section.root_dir,
            data_path=section.data_path,
            tokenizer_name=section.tokenizer_name,
        )

    def get_model_trainer_config(self) -> ModelTrainerConfig:
        """Return the training settings, creating the stage directory.

        Paths come from the ``model_trainer`` section of the config file;
        hyper-parameters come from ``TrainingArguments`` in the params file.
        """
        section = self.config.model_trainer
        params = self.params.TrainingArguments
        os.makedirs(section.root_dir, exist_ok=True)

        return ModelTrainerConfig(
            root_dir=section.root_dir,
            data_path=section.data_path,
            model_ckpt=section.model_ckpt,
            num_train_epochs=params.num_train_epochs,
            warmup_steps=params.warmup_steps,
            per_device_train_batch_size=params.per_device_train_batch_size,
            weight_decay=params.weight_decay,
            logging_steps=params.logging_steps,
            evaluation_strategy=params.evaluation_strategy,
            # Fixed: this used to read params.evaluation_strategy, which put the
            # strategy name into the step-count field.
            eval_steps=params.eval_steps,
            save_steps=params.save_steps,
            gradient_accumulation_steps=params.gradient_accumulation_steps,
        )

    def get_model_evaluation_config(self) -> ModelEvaluationConfig:
        """Return the model-evaluation settings, creating the stage directory."""
        section = self.config.model_evaluation
        os.makedirs(section.root_dir, exist_ok=True)

        return ModelEvaluationConfig(
            root_dir=section.root_dir,
            data_path=section.data_path,
            model_path=section.model_path,
            tokenizer_path=section.tokenizer_path,
            metric_file_name=section.metric_file_name,
        )

In [None]:
class DataIngestion:
    """Download the dataset archive and extract it on local disk."""

    def __init__(self, config: DataIngestionConfig):
        """Store the ingestion settings.

        Args:
            config: Source URL, local archive path and default unzip directory.
        """
        self.config = config

    def download_dataset(self):
        """Download the dataset archive unless it already exists on local disk.

        The archive is first written to a ``.part`` file next to the target
        and moved into place only after the download has finished. An
        interrupted download therefore never leaves a truncated archive that a
        later run would mistake for a complete one.

        Raises:
            Whatever ``urllib.request.urlretrieve`` or the file-system calls
            raise; errors are propagated unchanged.
        """
        target_path = self.config.local_data_path
        if os.path.exists(target_path):
            logging.info(f"File already exists of size: {get_size(Path(target_path))} KB")
            return

        # Make sure the destination directory exists before writing into it.
        parent_dir = os.path.dirname(str(target_path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        partial_path = f"{target_path}.part"
        try:
            _, header = request.urlretrieve(url=self.config.source_URL,
                                            filename=partial_path)
            os.replace(partial_path, target_path)
        finally:
            # After a successful move this is a no-op; after a failure it
            # removes the incomplete download.
            if os.path.exists(partial_path):
                os.remove(partial_path)
        logging.info(f"{target_path} successfully downloaded! Info: \n{header}")

    def extract_zip_file(self, unzip_path=None):
        """Extract the downloaded archive.

        Args:
            unzip_path: str, optional
                Directory to extract into. Defaults to ``self.config.unzip_dir``.

        Raises:
            CustomException: If the directory cannot be created or the archive
                cannot be opened/extracted.
        """
        if unzip_path is None:
            unzip_path = self.config.unzip_dir
        try:
            os.makedirs(unzip_path, exist_ok=True)
            with zipfile.ZipFile(self.config.local_data_path, 'r') as zip_fp:
                zip_fp.extractall(unzip_path)
                logging.info("Dataset successfully extracted.")
        except Exception as err:
            logging.warning(f"Error while extracting file path: {unzip_path}")
            logging.error(f"Error explaination: {err}")
            raise CustomException(err, sys) from err


### Data Validation

In [None]:
class DataValidation:
    """Check that the ingested dataset directory contains every required entry."""

    def __init__(self, config: DataValidationConfig) -> None:
        """Store the validation settings.

        Args:
            config: Status-file path and the list of required entry names.
        """
        self.config = config

    def validate_files(self, data_dir=None):
        """Verify that every required entry exists in the dataset directory.

        One line per required entry plus an overall line is appended to
        ``self.config.STATUS_FILE``. Entries in the directory that are not
        listed as required are ignored.

        Args:
            data_dir: str, optional
                Directory to inspect. Defaults to
                ``artifacts/data_ingestion/samsum_dataset`` (the location that
                was previously hard-coded).

        Returns:
            bool: True only if at least one entry is required and all required
            entries are present.

        Raises:
            CustomException: If the directory cannot be listed or the status
                file cannot be written.
        """
        if data_dir is None:
            data_dir = os.path.join("artifacts", "data_ingestion", "samsum_dataset")
        try:
            present = set(os.listdir(data_dir))
            required = list(self.config.ALL_REQUIRED_FILES)
            missing = [name for name in required if name not in present]
            # The result is computed once over the whole requirement list, so it
            # no longer depends on which entry os.listdir happens to return last.
            validation_status = bool(required) and not missing

            with open(self.config.STATUS_FILE, 'a', encoding='utf-8') as file_fp:
                for name in required:
                    file_fp.write(f"Validation status of {name}: {name in present}\n")
                file_fp.write(f"Overall validation status: {validation_status}\n")

            if missing:
                logging.warning(f"Missing required files: {missing}. "
                                f"VALIDATION_STATUS: {validation_status}")
            else:
                logging.info(f"Files validation finished. "
                             f"VALIDATION_STATUS: {validation_status}")
            return validation_status

        except Exception as err:
            logging.warning(f"Cannot validate due to error: \n{err}")
            raise CustomException(err, sys) from err


In [None]:
from transformers import AutoTokenizer
from datasets import load_dataset, load_from_disk

In [None]:
class DataTransformation:
    """Tokenize the dialogue/summary dataset and persist the result."""

    def __init__(self, config: DataTransformationConfig):
        """Store the settings and load the configured (slow) tokenizer.

        Args:
            config: Dataset location, output root and tokenizer name.
        """
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.tokenizer_name, use_fast=False)

    def convert_to_features(self, batch_data, tokenizer=None):
        """Tokenize one batch of examples.

        Args:
            batch_data: Mapping with ``'dialogue'`` and ``'summary'`` entries.
            tokenizer: Tokenizer to use; defaults to ``self.tokenizer``.

        Returns:
            dict: ``input_ids`` and ``attention_mask`` of the dialogues, and
            ``labels`` holding the token ids of the summaries.

        Raises:
            CustomException: If tokenization fails.
        """
        try:
            if tokenizer is None:
                tokenizer = self.tokenizer
            # Dialogues are the model input (truncated to 1024 tokens).
            input_encodings = tokenizer(batch_data['dialogue'], max_length=1024, truncation=True)

            # Summaries are the training target (truncated to 128 tokens).
            target_encodings = tokenizer(batch_data['summary'],
                                         max_length=128, truncation=True,
                                         text_target=batch_data['summary'])
        except Exception as err:
            raise CustomException(err, sys) from err

        return {
            'input_ids': input_encodings['input_ids'],
            'attention_mask': input_encodings['attention_mask'],
            'labels': target_encodings['input_ids'],
        }

    def get_tokenizer(self):
        """Return the tokenizer loaded in ``__init__``."""
        return self.tokenizer

    def convert(self):
        """Tokenize the whole dataset and save it under ``root_dir``.

        If the output directory already exists the work is skipped. The check
        happens before the source dataset is loaded, so a cached result costs
        no dataset I/O.

        Returns:
            str: Path of the tokenized dataset directory.

        Raises:
            CustomException: If loading, mapping or saving the dataset fails.
        """
        try:
            dataset_batch_path = os.path.join(self.config.root_dir, "samsum_dataset")
            if os.path.exists(dataset_batch_path):
                logging.warning(f"Batched dataset alreay exist at path: {dataset_batch_path}")
                return dataset_batch_path

            dataset = load_from_disk(self.config.data_path)
            dataset_in_batch = dataset.map(self.convert_to_features, batched=True)
            dataset_in_batch.save_to_disk(dataset_batch_path)
            logging.info(f"Converting dataset in batch sucessful "
                         f"and saved at: {dataset_batch_path}")
        except Exception as err:
            raise CustomException(err, sys) from err
        return dataset_batch_path


In [None]:
from transformers import TrainingArguments, Trainer
from transformers import DataCollatorForSeq2Seq
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from datasets import load_dataset, load_from_disk
import torch

import pandas as pd
import time

In [None]:
class ModelTrainer:
    """Fine-tune the configured seq2seq checkpoint on the tokenized dataset."""

    def __init__(self, config: ModelTrainerConfig):
        """Keep the training configuration for later use by ``train``.

        Args:
            config (ModelTrainerConfig): Paths and hyper-parameters for training.
        """
        self.config = config

    def train(self, tokenizer=None, exists_ok=False):
        """Fine-tune the model and save it together with the tokenizer.

        Args:
            tokenizer (AutoTokenizer, optional): Tokenizer used by the data
                collator and the trainer. When omitted, one is loaded from
                ``config.model_ckpt``.
            exists_ok (bool, optional): When True and the model directory
                already exists, training is skipped; the tokenizer is still
                saved. Defaults to False.

        Returns:
            Tuple (str, str): Paths of the saved model and the saved tokenizer.

        Raises:
            CustomException: Wraps any error raised while loading, training
                or saving.
        """
        try:
            if torch.cuda.is_available():
                device = "cuda"
            else:
                device = "cpu"
            logging.info(f">>>>>>>> Model trainer initiated using '{device.upper()}' <<<<<<<<<<<")
            started_at = time.time()

            cfg = self.config
            if tokenizer is None:
                tokenizer = AutoTokenizer.from_pretrained(cfg.model_ckpt)
            model_save_path = os.path.join(cfg.root_dir, "pegasus-samsum-model")

            # Short-circuit: reuse a previously saved model when the caller allows it.
            if exists_ok and os.path.exists(model_save_path):
                logging.warning(f'Model already exist in path: {model_save_path}')
                tokenizer_save_path = os.path.join(cfg.root_dir, "tokenizer")
                tokenizer.save_pretrained(tokenizer_save_path)
                return model_save_path, tokenizer_save_path

            model = AutoModelForSeq2SeqLM.from_pretrained(cfg.model_ckpt).to(device)
            collator = DataCollatorForSeq2Seq(tokenizer, model=model)

            logging.info(f"Download the pretrained model: '{self.config.model_ckpt}' completed")

            tokenized_dataset = load_from_disk(cfg.data_path)

            training_kwargs = {
                "output_dir": cfg.root_dir,
                "num_train_epochs": cfg.num_train_epochs,
                "warmup_steps": cfg.warmup_steps,
                "per_device_train_batch_size": cfg.per_device_train_batch_size,
                # The evaluation batch size deliberately mirrors the training one.
                "per_device_eval_batch_size": cfg.per_device_train_batch_size,
                "weight_decay": cfg.weight_decay,
                "logging_steps": cfg.logging_steps,
                "evaluation_strategy": cfg.evaluation_strategy,
                "eval_steps": cfg.eval_steps,
                # NOTE: literal value; cfg.save_steps is not consulted here.
                "save_steps": 1e6,
                "gradient_accumulation_steps": cfg.gradient_accumulation_steps,
            }
            training_args = TrainingArguments(**training_kwargs)

            trainer = Trainer(
                model=model,
                args=training_args,
                tokenizer=tokenizer,
                data_collator=collator,
                train_dataset=tokenized_dataset["train"],
                eval_dataset=tokenized_dataset["validation"],
            )
            logging.info("Model training started......")
            trainer.train()
            elapsed = time.time() - started_at

            logging.info(f"Model trained Successfully in {elapsed:.2f} sec")

            # Persist the fine-tuned weights, then the tokenizer next to them.
            model.save_pretrained(model_save_path)

            tokenizer_save_path = os.path.join(cfg.root_dir, "tokenizer")
            tokenizer.save_pretrained(tokenizer_save_path)

        except Exception as err:
            raise CustomException(err, sys) from err
        return model_save_path, tokenizer_save_path


In [None]:

from tqdm.auto import tqdm
from datasets import load_from_disk, load_metric

### Model Evaluation

In [None]:
class ModelEvaluation:
    """Score the fine-tuned summarization model with ROUGE."""

    def __init__(self, config: ModelEvaluationConfig):
        """Store the evaluation settings.

        Args:
            config: Dataset, model and tokenizer locations plus the metric CSV path.
        """
        self.config = config

    def generate_batch_sized_chunks(self, list_of_elements, batch_size: int):
        '''
        Split a sequence into consecutive chunks of at most ``batch_size`` items.

        Args:
            list_of_elements (List): The sequence to split.
            batch_size (int): Maximum number of items per chunk.

        Yields:
            List: The next slice of ``list_of_elements``; the final chunk may
            be shorter than ``batch_size``.

        Example:
            >>> elements = [1, 2, 3, 4, 5, 6, 7]
            >>> for batch in self.generate_batch_sized_chunks(elements, 3):
            ...     print(batch)
            [1, 2, 3]
            [4, 5, 6]
            [7]
        '''
        for index in range(0, len(list_of_elements), batch_size):
            yield list_of_elements[index: index + batch_size]

    def calculate_metric(self, dataset,
                        metric, model, tokenizer,
                        device, batch_size=16,
                        column_text='articles', column_summary='highlights'):
        """Generate summaries batch by batch and score them against references.

        Args:
            dataset: Object indexable by column name (``dataset[column_text]``).
            metric: Metric object exposing ``add_batch`` and ``compute``.
            model: Seq2seq model exposing ``generate``.
            tokenizer: Tokenizer used for encoding inputs and decoding outputs.
            device: Device the input tensors are moved to.
            batch_size: Number of examples per generation batch.
            column_text: Name of the column holding the source texts.
            column_summary: Name of the column holding the reference summaries.

        Returns:
            The value returned by ``metric.compute()``.

        Raises:
            CustomException: Wraps any error raised during generation/scoring.
        """
        try:
            article_batches = list(self.generate_batch_sized_chunks(
                dataset[column_text], batch_size)
                )
            target_batches = list(self.generate_batch_sized_chunks(
                dataset[column_summary], batch_size)
                )

            # `total` lets tqdm render a real progress bar; zip() has no length.
            for article_batch, target_batch in tqdm(zip(article_batches, target_batches),
                                                    total=len(article_batches)):

                inputs = tokenizer(article_batch, max_length=1024,  truncation=True,
                                padding="max_length", return_tensors="pt")

                summaries = model.generate(input_ids=inputs["input_ids"].to(device),
                                            attention_mask=inputs["attention_mask"].to(device),
                                            length_penalty=0.8,
                                            num_beams=8,
                                            max_length=128)

                # Decode the generated texts, replace the "<n>" token with a space,
                # and add the decoded texts with the references to the metric.
                decoded_summaries = [tokenizer.decode(summary, skip_special_tokens=True,
                                                    clean_up_tokenization_spaces=True)
                                    for summary in summaries]

                # Fixed: this used to be replace("", " "), which inserts a space
                # between every character and wrecks the n-gram overlap scores.
                decoded_summaries = [summary.replace("<n>", " ") for summary in decoded_summaries]

                metric.add_batch(predictions=decoded_summaries, references=target_batch)

            #  Finally compute and return the ROUGE scores.
            score = metric.compute()
            return score
        except Exception as err:
            raise CustomException(err, sys) from err

    def evaluate(self, device=None, num_samples=10, batch_size=2):
        """Evaluate the saved model on a slice of the test split and save ROUGE scores.

        Args:
            device: ``"cpu"`` forces CPU. Any other value (including None and
                ``"cuda"``) selects CUDA when available and CPU otherwise.
            num_samples: Number of leading test examples to score (default 10).
            batch_size: Generation batch size (default 2).

        Returns:
            pandas.DataFrame: One row (index ``'pegasus'``) with the rouge1,
            rouge2, rougeL and rougeLsum mid F-measures. The same frame is
            written to ``config.metric_file_name`` without the index column.
        """
        logging.info('Model evaluation initiated')
        # Only an explicit "cpu" request is honoured as-is; everything else auto-detects.
        if device != "cpu":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        tokenizer = AutoTokenizer.from_pretrained(self.config.tokenizer_path, use_fast=False)
        model_pegasus = AutoModelForSeq2SeqLM.from_pretrained(self.config.model_path).to(device)

        #loading data
        dataset_loader = load_from_disk(self.config.data_path)
        rouge_names = ["rouge1", "rouge2", "rougeL", "rougeLsum"]

        rouge_metric = load_metric('rouge')

        score = self.calculate_metric(
            dataset=dataset_loader['test'][0:num_samples], metric=rouge_metric,
            model=model_pegasus, tokenizer=tokenizer,
            batch_size=batch_size, device=device,
            column_text='dialogue', column_summary='summary'
        )

        # NOTE(review): `.mid.fmeasure` matches the aggregate objects returned by
        # the legacy `datasets.load_metric('rouge')` - confirm if the metric
        # backend is ever swapped.
        rouge_dict = dict((rn, score[rn].mid.fmeasure) for rn in rouge_names)
        metric_df = pd.DataFrame(rouge_dict, index=['pegasus'])
        metric_df.to_csv(self.config.metric_file_name, index=False)
        return metric_df


In [None]:
# A `!set ...` shell escape runs in a child process, so the variable never
# reaches this kernel; set it on the kernel's own environment instead.
# NOTE(review): protobuf presumably reads this when it is first imported -
# confirm this cell runs before anything that imports protobuf.
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

### Run the full pipeline

In [None]:

# Run every stage in order: ingestion -> validation -> transformation ->
# training -> evaluation. Any failure is re-raised as a CustomException.
try:
    config = ConfigurationManager()

    # Stage 1: download and extract the dataset.
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    data_ingestion.download_dataset()
    data_ingestion.extract_zip_file()

    # Stage 2: check the extracted dataset.
    data_validation_config = config.get_data_validation_config()
    data_validation = DataValidation(config=data_validation_config)
    data_validation.validate_files()

    # Stage 3: tokenize the dataset; the tokenizer is reused for training.
    data_transformation_config = config.get_data_transformation_config()
    data_transformation = DataTransformation(config=data_transformation_config)
    data_transformation.convert()
    tokenizer = data_transformation.get_tokenizer()

    # Stage 4: fine-tune the model.
    model_trainer_config = config.get_model_trainer_config()
    model_trainer = ModelTrainer(config=model_trainer_config)
    model_trainer.train(tokenizer=tokenizer)

    # Stage 5: compute ROUGE scores for the trained model.
    model_evaluation_config = config.get_model_evaluation_config()
    model_evaluation = ModelEvaluation(config=model_evaluation_config)
    model_evaluation.evaluate()
except Exception as err:
    raise CustomException(err, sys) from err
