In [37]:
# %pip targets the running kernel's environment; version pinned to the one recorded in this cell's output.
%pip install rouge_score==0.1.2

Collecting rouge_score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge_score
  Building wheel for rouge_score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge_score: filename=rouge_score-0.1.2-py3-none-any.whl size=24933 sha256=66de77ec9d922ac9d660fff86d79f36ab50f8ea3ce4318f4498a7fd047f68a90
  Stored in directory: /root/.cache/pip/wheels/5f/dd/89/461065a73be61a532ff8599a28e9beef17985c9e9c31e541b4
Successfully built rouge_score
Installing collected packages: rouge_score
Successfully installed rouge_score-0.1.2


In [5]:
# %pip targets the running kernel's environment.
# python-box provides the `box` module imported by the utilities cell below;
# no other visible cell installs it (version left unpinned -- TODO pin once confirmed).
%pip install ensure==1.0.2 python-box



In [14]:
# %pip targets the running kernel's environment. The two transformers extras are merged
# into one requirement; datasets is pinned to the version recorded in this cell's output.
# TODO: pin transformers as well once the working version is confirmed.
%pip install "transformers[sentencepiece,torch]" datasets==2.19.2

Collecting datasets
  Downloading datasets-2.19.2-py3-none-any.whl (542 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m542.1/542.1 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
Collecting requests (from transformers)
  Downloading requests-2.32.3-py3-none-any.whl (64 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.9/64.9 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m23.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl (134 kB)


In [9]:
# %pip targets the running kernel's environment; versions pinned to those recorded in this cell's output.
%pip install boto3==1.34.122 mypy-boto3-s3==1.34.120

Collecting boto3
  Downloading boto3-1.34.122-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting mypy-boto3-s3
  Downloading mypy_boto3_s3-1.34.120-py3-none-any.whl (83 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m83.9/83.9 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore<1.35.0,>=1.34.122 (from boto3)
  Downloading botocore-1.34.122-py3-none-any.whl (12.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m62.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.1-py3-none-any.whl (82 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.2/82.2 kB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
Installing collecte

In [1]:
"""This module includes logging configurations"""

import logging
import os
from datetime import datetime

# Every execution of this cell logs to its own timestamped file under LOG_DIR.
LOG_DIR = 'logs'
LOG_FILE = format(datetime.now(), '%m_%d_%Y_%H_%M_%S') + ".log"
logs_path = os.path.join(LOG_DIR, LOG_FILE)

# Create the directory first so the log file can be opened inside it.
os.makedirs(LOG_DIR, exist_ok=True)

# Root logger: DEBUG and above, written to the file computed above.
logging.basicConfig(
    level=logging.DEBUG,
    format="[ %(asctime)s ] %(name)s - %(levelname)s - %(message)s",
    filename=logs_path,
)


In [2]:
"""This module includes codes that defines custom Exception"""

import types


def error_message_detail(error, error_detail: types.ModuleType):
    """Build a one-line description of an error including where it was handled.

    :param error: the exception (or message) being reported; rendered with ``str()``
    :param error_detail: object exposing ``exc_info()``; callers in this file pass ``sys``
    :return: string of the form
        ``Error occurred python script name <file> line number <n> error message <error>``
    """
    _, _, exc_tb = error_detail.exc_info()

    # Outside an ``except`` block exc_info() yields no traceback; fall back to the
    # traceback carried by the exception object itself, if there is one.
    if exc_tb is None:
        exc_tb = getattr(error, "__traceback__", None)

    if exc_tb is None:
        file_name, line_number = "<unknown>", "<unknown>"
    else:
        file_name = exc_tb.tb_frame.f_code.co_filename
        line_number = exc_tb.tb_lineno

    # Adjacent f-strings instead of a backslash continuation inside the literal,
    # which used to embed the source indentation in the message.
    error_message = (
        f"Error occurred python script name {file_name} "
        f"line number {line_number} error message {str(error)}"
    )

    return error_message


class TextSummarizerException(Exception):
    """Exception whose string form is the detailed message built by error_message_detail."""

    def __init__(self, error_message, error_detail):
        """
        :param error_message: the original error (exception or string) being wrapped
        :param error_detail: object exposing ``exc_info()``, forwarded to
            error_message_detail; callers in this file pass the ``sys`` module
        """
        super().__init__(error_message)
        detailed_message = error_message_detail(error_message, error_detail=error_detail)
        self.error_message = detailed_message

    def __str__(self):
        # Report the detailed message rather than the raw wrapped error.
        return self.error_message

In [39]:
"""This module maps the configuration for all the constants in each pipeline"""

import os
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
from typing import List

# Captured once when this cell runs; every artifact path below is derived from it.
TIMESTAMP: str = format(datetime.now(), "%m_%d_%Y_%H_%M_%S")

# Per-run artifact root: artifacts/<TIMESTAMP>
ARTIFACTS_ROOT: str = os.path.join("artifacts", TIMESTAMP)

@dataclass
class TrainingArguments:
    """Default hyper-parameters forwarded to the model-training configuration.

    Every attribute is annotated so that it is a real dataclass field: the
    defaults are unchanged, and individual values can be overridden through
    the constructor, e.g. ``TrainingArguments(NUM_TRAIN_EPOCHS=3)``.
    """

    NUM_TRAIN_EPOCHS: int = 1
    WARMUP_STEPS: int = 500
    PER_DEVICE_TRAIN_BATCH_SIZE: int = 1
    PER_DEVICE_EVAL_BATCH_SIZE: int = 1
    WEIGHT_DECAY: float = 0.01
    LOGGING_STEPS: int = 10
    EVALUATION_STRATEGY: str = "steps"
    EVAL_STEPS: int = 50
    # Kept as a float (1e6) to match ModelTrainingConfig.save_steps, which is annotated float.
    SAVE_STEPS: float = 1e6
    GRADIENT_ACCUMULATION_STEPS: int = 16


@dataclass
class DataIngestionConstants:
    """Default paths and data sources for the data-ingestion stage."""

    # Stage directory below the per-run artifact root.
    DATA_INGESTION_ROOT_DIR: str = os.path.join(ARTIFACTS_ROOT, "DataIngestionArtifacts")
    # Name of the archive, both as the S3 key and as the local file name.
    DATA_FILE_NAME: str = "data.zip"
    # HTTP location of the archive.
    DATA_URL: str = "https://text-summer-bucket.s3.amazonaws.com/summarizer-data.zip"
    # Local path the archive is downloaded to.
    DOWNLOADED_DATA_FILE: str = os.path.join(DATA_INGESTION_ROOT_DIR, DATA_FILE_NAME)
    # The archive is extracted in place, next to the downloaded file.
    UNZIPPED_DIR: str = DATA_INGESTION_ROOT_DIR
    # S3 bucket holding the archive.
    DATA_BUCKET_NAME: str = "text-summarization-data-06062024"


@dataclass
class DataValidationConstants:
    """Default paths for the data-validation stage."""

    # Starts empty; each instance gets its own list.
    ALL_REQUIRED_FILES: List[str] = field(default_factory=list)
    # Stage directory below the per-run artifact root.
    DATA_VALIDATION_ROOT_DIR: str = os.path.join(ARTIFACTS_ROOT, "DataValidationArtifacts")
    # Full path of the file the validation result is written to.
    DATA_VALIDATION_STATUS_FILE: str = os.path.join(DATA_VALIDATION_ROOT_DIR, "status.txt")


@dataclass
class DataTransformationConstants:
    """Default paths and tokenizer settings for the data-transformation stage."""

    # Stage directory below the per-run artifact root.
    DATA_TRANSFORMATION_ROOT_DIR: str = os.path.join(ARTIFACTS_ROOT, "DataTransformationArtifacts")
    # Same directory as the data-ingestion root.
    TRANSFORMED_DATA_PATH: str = DataIngestionConstants.DATA_INGESTION_ROOT_DIR
    # Tokenizer identifier passed to AutoTokenizer.from_pretrained.
    TOKENIZER_NAME: str = "google/pegasus-cnn_dailymail"
    # Truncation limits for the input text and the target summary.
    MAX_INPUT_LENGTH: int = 1024
    MAX_TARGET_LENGTH: int = 128
    # Text prepended to every input document.
    PREFIX: str = "Summarize: "



@dataclass
class ModelTrainingConstants:
    """Default paths and checkpoint name for the model-training stage."""

    # Stage directory below the per-run artifact root.
    MODEL_TRAINING_ROOT_DIR: str = os.path.join(ARTIFACTS_ROOT, "ModelTraining")
    # Training data is read from the data-transformation stage directory.
    MODEL_TRAINING_DATA_PATH: str = DataTransformationConstants.DATA_TRANSFORMATION_ROOT_DIR
    # Pretrained checkpoint identifier.
    MODEL_CKPT: str = "google/pegasus-cnn_dailymail"
    # Sub-directories of the stage directory for the model and the tokenizer.
    MODEL_PATH: str = os.path.join(MODEL_TRAINING_ROOT_DIR, "model")
    TOKENIZER_PATH: str = os.path.join(MODEL_TRAINING_ROOT_DIR, "tokenizer")

@dataclass
class ModelEvaluationConstants:
    """Default paths for the model-evaluation stage."""

    # Stage directory below the per-run artifact root.
    MODEL_EVALUATION_ROOT_DIR: str = os.path.join(ARTIFACTS_ROOT, "ModelEvaluation")
    # Data, model and tokenizer locations are taken from the training constants.
    DATA_PATH: str = ModelTrainingConstants.MODEL_TRAINING_DATA_PATH
    SAVED_MODEL_PATH: str = ModelTrainingConstants.MODEL_PATH
    TOKENIZER_PATH: str = ModelTrainingConstants.TOKENIZER_PATH
    # CSV file for the evaluation metrics, inside the stage directory.
    METRIC_FILE_NAME: str = os.path.join(MODEL_EVALUATION_ROOT_DIR, "metrics.csv")



In [4]:
"""This module includes all the configurations for each stage of pipeline"""

from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable settings consumed by the data-ingestion stage."""

    root_dir: Path               # stage working directory
    data_url: str                # HTTP location of the archive
    downloaded_data_file: Path   # local path of the downloaded archive
    unzipped_dir: Path           # directory the archive is extracted into
    data_bucket_name: str        # S3 bucket that holds the archive
    data_file_name: str          # S3 key of the archive


@dataclass(frozen=True)
class DataValidationConfig:
    """Immutable settings consumed by the data-validation stage."""

    root_dir: Path             # stage working directory
    status_file: str           # path of the file the validation status is written to
    all_required_files: list   # file names that are expected to exist



@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings consumed by the data-transformation stage."""

    root_dir: Path                # stage working directory
    transformed_data_path: Path   # directory the CSV splits are read from
    # A tokenizer identifier such as "google/pegasus-cnn_dailymail", not a filesystem path.
    tokenizer_name: str
    max_input_length: int         # truncation limit for the input text
    max_target_length: int        # truncation limit for the target summary
    prefix: str                   # text prepended to every input document



@dataclass(frozen=True)
class ModelTrainingConfig:
    """Immutable settings consumed by the model-training stage."""

    # --- locations ---
    root_dir: Path         # stage working directory
    data_path: Path        # dataset location passed to load_from_disk
    model_ckpt: str        # pretrained checkpoint identifier
    model_path: Path       # where the model is stored
    tokenizer_path: Path   # where the tokenizer is stored

    # --- trainer hyper-parameters ---
    num_train_epochs: int
    warmup_steps: int
    per_device_train_batch_size: int
    per_device_eval_batch_size: int
    weight_decay: float
    logging_steps: int
    evaluation_strategy: str
    eval_steps: int
    save_steps: float
    gradient_accumulation_steps: int



@dataclass(frozen=True)
class ModelEvaluationConfig:
    """Immutable settings consumed by the model-evaluation stage."""

    root_dir: Path           # stage working directory
    data_path: Path          # dataset location
    saved_model_path: Path   # location of the trained model
    tokenizer_path: Path     # location of the saved tokenizer
    metric_file_name: Path   # file the evaluation metrics are written to

In [6]:
import os
from box.exceptions import BoxValueError
import yaml
from ensure import ensure_annotations
from box import ConfigBox
from pathlib import Path
from typing import Any



@ensure_annotations
def read_yaml(path_to_yaml: Path) -> ConfigBox:
    """Load a YAML file and wrap its content in a ConfigBox.

    Args:
        path_to_yaml (Path): location of the YAML file

    Raises:
        ValueError: when wrapping the parsed content raises BoxValueError
            (reported as "yaml file is empty")
        Exception: any other error is logged and re-raised

    Returns:
        ConfigBox: the parsed YAML content
    """
    try:
        with open(path_to_yaml) as yaml_file:
            parsed_content = yaml.safe_load(yaml_file)
        logging.info(f"yaml file: {path_to_yaml} loaded successfully")
        return ConfigBox(parsed_content)
    except BoxValueError:
        raise ValueError("yaml file is empty")
    except Exception as error:
        logging.error(error)
        raise error



@ensure_annotations
def create_directories(list_of_directories: list, verbose=True):
    """Create every directory in the given list.

    Directories that already exist are left untouched.

    Args:
        list_of_directories (list): paths of the directories to create
        verbose (bool, optional): log each created directory. Defaults to True.
    """
    for directory in list_of_directories:
        os.makedirs(directory, exist_ok=True)
        if not verbose:
            continue
        logging.info(f"created directory at: {directory}")



@ensure_annotations
def get_size(file_path: Path) -> str:
    """Report a file's size rounded to whole kilobytes.

    Args:
        file_path (Path): path of the file

    Returns:
        str: text of the form "~ <n> KB"
    """
    size_in_bytes = os.path.getsize(file_path)
    rounded_kb = round(size_in_bytes / 1024)
    return f"~ {rounded_kb} KB"



In [27]:
"""
This module manages the configurations for each stage of the pipeline
"""

# import TrainingArguments, DataIngestionConstants, DataTransformationConstants, DataValidationConstants, ModelTrainingConstants, ModelEvaluationConstants

# from src.text_summarization.utils.common_utils import read_yaml, create_directories

# import DataIngestionConfig, DataValidationConfig, DataTransformationConfig, ModelTrainingConfig, ModelEvaluationConfig


class ConfigurationManager:
    """Builds the per-stage config objects from the stage constants.

    One constants object per stage is created in ``__init__``; every
    ``get_*_config`` method reads from those stored objects, creates the
    stage's root directory, and returns the matching frozen config dataclass.
    """

    def __init__(self):
        self.data_ingestion_const = DataIngestionConstants()
        self.data_validation_const = DataValidationConstants()
        self.data_transformation_const = DataTransformationConstants()
        self.model_training_const = ModelTrainingConstants()
        self.model_evaluation_const = ModelEvaluationConstants()


    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Create the data-ingestion directory and return its config."""
        const = self.data_ingestion_const

        create_directories([const.DATA_INGESTION_ROOT_DIR])

        data_ingestion_config = DataIngestionConfig(
            root_dir = const.DATA_INGESTION_ROOT_DIR,
            data_url = const.DATA_URL,
            downloaded_data_file = const.DOWNLOADED_DATA_FILE,
            unzipped_dir = const.UNZIPPED_DIR,
            data_bucket_name = const.DATA_BUCKET_NAME,
            data_file_name = const.DATA_FILE_NAME
        )

        return data_ingestion_config


    def get_data_validation_config(self) -> DataValidationConfig:
        """Create the data-validation directory and return its config.

        The required-file list comes from ``ALL_REQUIRED_FILES`` when that
        constant is non-empty; otherwise the three samsum CSV splits are used,
        which is what this method always returned before.
        """
        const = self.data_validation_const

        create_directories([const.DATA_VALIDATION_ROOT_DIR])

        required_files = list(const.ALL_REQUIRED_FILES) or [
            "samsum-train.csv",
            "samsum-test.csv",
            "samsum-validation.csv",
        ]

        data_validation_config = DataValidationConfig(
            root_dir = const.DATA_VALIDATION_ROOT_DIR,
            status_file = const.DATA_VALIDATION_STATUS_FILE,
            all_required_files = required_files
        )

        return data_validation_config


    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Create the data-transformation directory and return its config."""
        const = self.data_transformation_const

        create_directories([const.DATA_TRANSFORMATION_ROOT_DIR])

        data_transformation_config = DataTransformationConfig(
            root_dir = const.DATA_TRANSFORMATION_ROOT_DIR,
            transformed_data_path = const.TRANSFORMED_DATA_PATH,
            tokenizer_name = const.TOKENIZER_NAME,
            max_input_length = const.MAX_INPUT_LENGTH,
            max_target_length = const.MAX_TARGET_LENGTH,
            prefix = const.PREFIX
        )

        return data_transformation_config


    def get_model_training_config(self) -> ModelTrainingConfig:
        """Create the model-training directory and return its config."""
        # Use the constants stored in __init__, like the other getters, instead
        # of building a second ModelTrainingConstants instance here.
        config = self.model_training_const
        params = TrainingArguments()

        create_directories([config.MODEL_TRAINING_ROOT_DIR])

        model_trainer_config = ModelTrainingConfig(
            root_dir = config.MODEL_TRAINING_ROOT_DIR,
            data_path = config.MODEL_TRAINING_DATA_PATH,
            model_ckpt = config.MODEL_CKPT,
            model_path = config.MODEL_PATH,
            tokenizer_path = config.TOKENIZER_PATH,
            num_train_epochs = params.NUM_TRAIN_EPOCHS,
            warmup_steps = params.WARMUP_STEPS,
            per_device_train_batch_size = params.PER_DEVICE_TRAIN_BATCH_SIZE,
            per_device_eval_batch_size = params.PER_DEVICE_EVAL_BATCH_SIZE,
            weight_decay = params.WEIGHT_DECAY,
            logging_steps = params.LOGGING_STEPS,
            evaluation_strategy = params.EVALUATION_STRATEGY,
            eval_steps = params.EVAL_STEPS,
            save_steps = params.SAVE_STEPS,
            gradient_accumulation_steps = params.GRADIENT_ACCUMULATION_STEPS
        )

        return model_trainer_config


    def get_model_evaluation_config(self) -> ModelEvaluationConfig:
        """Create the model-evaluation directory and return its config."""
        # Same consistency fix as in get_model_training_config.
        config = self.model_evaluation_const

        create_directories([config.MODEL_EVALUATION_ROOT_DIR])

        model_evaluation_config = ModelEvaluationConfig(
            root_dir = config.MODEL_EVALUATION_ROOT_DIR,
            data_path = config.DATA_PATH,
            saved_model_path = config.SAVED_MODEL_PATH,
            tokenizer_path = config.TOKENIZER_PATH,
            metric_file_name = config.METRIC_FILE_NAME
        )

        return model_evaluation_config


In [8]:
import os
import sys
from typing import List, Union
import pickle
from pandas import DataFrame, read_csv
import boto3
from botocore.exceptions import ClientError
from mypy_boto3_s3.service_resource import Bucket

class S3Operations:
    """Helper methods for reading from and writing to S3 buckets via boto3."""

    def __init__(self):
        self.s3_client = boto3.client("s3")
        self.s3_resource = boto3.resource("s3")


    def download_object(self, file_name, bucket_name, file_path):
        """
        Method Name :   download_object
        Description :   This method downloads the file_name key of bucket_name bucket
                        to the local path file_path
        Output      :   File is written to file_path
        """
        bucket = self.s3_resource.Bucket(bucket_name)
        bucket.download_file(Key=file_name, Filename=file_path)


    def read_object(self, object_name, decode: bool = True, make_readable: bool = False):
        """
        Method Name :   read_object
        Description :   This method reads the body of an S3 object. This class calls it from
                        load_model and get_df_from_object but previously did not define it.
        Output      :   str when decode is True, bytes otherwise; wrapped in a
                        StringIO / BytesIO buffer when make_readable is True
        """
        # Local import: the cell's import block does not bring in io.
        from io import BytesIO, StringIO

        logging.info("Inside the read_object method of S3Operations class")
        try:
            # assumes object_name exposes get() like the objects yielded by
            # bucket.objects.filter(...) -- TODO confirm against the boto3 version in use
            raw_body = object_name.get()["Body"].read()
            content = raw_body.decode() if decode else raw_body

            if make_readable:
                content = StringIO(content) if decode else BytesIO(content)

            logging.info("Completed execution of the read_object method of S3Operations class")
            return content

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def get_bucket(self, bucket_name: str) -> Bucket:
        """
        Method Name :   get_bucket
        Description :   This method gets the bucket object based on the bucket_name
        Output      :   Bucket object is returned based on the bucket name
        """
        logging.info("Inside the get_bucket method of S3Operations class")
        try:
            bucket = self.s3_resource.Bucket(bucket_name)
            logging.info("Completed execution of the get_bucket method of S3Operations class")
            return bucket

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def is_model_present(self, bucket_name: str, s3_model_key: str) -> bool:
        """
        Method Name :   is_model_present
        Description :   This method validates whether model is present in the s3 bucket or not.
        Output      :   True if at least one object key starts with s3_model_key, else False
        """
        try:
            bucket = self.get_bucket(bucket_name)

            return any(True for _ in bucket.objects.filter(Prefix=s3_model_key))

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def get_file_object(self, filename: str, bucket_name: str) -> Union[List[object], object]:
        """
        Method Name :   get_file_object
        Description :   This method gets the file object from bucket_name bucket based on filename
        Output      :   The single matching object when exactly one key starts with filename,
                        otherwise the (possibly empty) list of matching objects
        """
        logging.info("Inside the get_file_object method of S3Operations class")
        try:
            bucket = self.get_bucket(bucket_name)
            object_list = list(bucket.objects.filter(Prefix=filename))
            file_objs = object_list[0] if len(object_list) == 1 else object_list
            logging.info("Completed execution the get_file_object method of S3Operations class")
            return file_objs

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def load_model(self, model_name: str, bucket_name: str, model_dir=None) -> object:
        """
        Method Name :   load_model
        Description :   This method loads the pickled model_name object from bucket_name bucket,
                        optionally below the model_dir key prefix
        Output      :   The unpickled model object
        """
        logging.info("Inside the load_model method of S3Operations class")

        try:
            if model_dir is None:
                model_file = model_name
            else:
                # S3 keys always use "/", so build the key explicitly instead of
                # with os.path.join, which would use "\\" on Windows.
                model_file = model_dir.rstrip("/") + "/" + model_name

            file_object = self.get_file_object(model_file, bucket_name)
            model_bytes = self.read_object(file_object, decode=False)
            # NOTE(security): unpickling executes arbitrary code; only load
            # objects from buckets you control.
            # pickle.loads because read_object returns bytes, not a file object.
            model = pickle.loads(model_bytes)
            logging.info("Completed execution of load_model method of S3Operations class")
            return model

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def create_folder(self, folder_name: str, bucket_name: str) -> None:
        """
        Method Name :   create_folder
        Description :   This method creates a folder_name folder in bucket_name bucket
        Output      :   A "folder_name/" key is created when the folder_name lookup returns 404;
                        any other ClientError is logged and not raised
        """
        logging.info("Inside the create_folder method of S3Operations class")

        try:
            self.s3_resource.Object(bucket_name, folder_name).load()

        except ClientError as error:
            if error.response["Error"]["Code"] == "404":
                folder_obj = folder_name + "/"
                self.s3_client.put_object(Bucket=bucket_name, Key=folder_obj)
            else:
                # Still best-effort (no raise), but no longer silent.
                logging.error(error)

        logging.info("Completed execution of the create_folder method of S3Operations class")


    def upload_file(self, local_file_path: str, file_name: str, bucket_name: str,
                    remove: bool = True) -> None:
        """
        Method Name :   upload_file
        Description :   This method uploads the local_file_path file to bucket_name bucket with
                        file_name as bucket key
        Output      :   File is uploaded; the local copy is deleted when remove is True
        """
        logging.info("Inside the upload_file method of S3Operations class")
        try:
            logging.info(
                f"Uploading {local_file_path} file to {file_name} file in {bucket_name} bucket"
            )

            self.s3_resource.Bucket(bucket_name).upload_file(local_file_path, file_name)

            logging.info(
                f"Uploaded {local_file_path} file to {file_name} file in {bucket_name} bucket"
            )

            if remove is True:
                os.remove(local_file_path)
                logging.info(f"Remove is set to {remove}, deleted the file")
            else:
                logging.info(f"Remove is set to {remove}, not deleted the file")
            logging.info("Completed execution of the upload_file method of S3Operations class")

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def upload_folder(self, folder_name: str, bucket_name: str) -> None:
        """
        Method Name :   upload_folder
        Description :   This method uploads every entry of the local folder_name directory to
                        bucket_name bucket, using the entry name as bucket key
        Output      :   Entries are uploaded; local copies are kept
        """
        logging.info("Inside the upload_folder method of S3Operations class")
        try:
            for entry in os.listdir(folder_name):
                local_file_path = os.path.join(folder_name, entry)
                self.upload_file(local_file_path, entry, bucket_name, remove=False)
            logging.info("Completed execution of the upload_folder method of S3Operations class")

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def upload_df_as_csv(self,
                         data_frame: DataFrame,
                         local_file_path: str,
                         bucket_file_name: str,
                         bucket_name: str) -> None:
        """
        Method Name :   upload_df_as_csv
        Description :   This method writes the dataframe to local_file_path as csv and uploads it
                        to bucket_name bucket as bucket_file_name
        Output      :   CSV is uploaded; the local file is deleted afterwards
                        (upload_file is called with its default remove=True)
        """
        logging.info("Inside the upload_df_as_csv method of S3Operations class")
        try:
            data_frame.to_csv(local_file_path, index=None, header=True)
            self.upload_file(local_file_path, bucket_file_name, bucket_name)
            logging.info("Completed execution of the upload_df_as_csv method of S3Operations class")

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def get_df_from_object(self, object_: object) -> DataFrame:
        """
        Method Name :   get_df_from_object
        Description :   This method gets the dataframe from the object_ S3 object
        Output      :   DataFrame parsed from the object's csv content ("na" is read as missing)
        """
        logging.info("Inside the get_df_from_object method of S3Operations class")

        try:
            # read_csv needs a file-like object; a plain string would be treated as a path.
            content = self.read_object(object_, make_readable=True)
            read_csv_df = read_csv(content, na_values="na")
            logging.info("Completed execution of the get_df_from_object method of S3Operations class")
            return read_csv_df

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


    def read_csv(self, file_name: str, bucket_name: str) -> DataFrame:
        """
        Method Name :   read_csv
        Description :   This method reads the file_name csv object of bucket_name bucket
        Output      :   DataFrame with the csv content
        """
        logging.info("Inside the read_csv method of S3Operations class")
        try:
            csv_obj = self.get_file_object(file_name, bucket_name)
            read_csv_df = self.get_df_from_object(csv_obj)
            logging.info("Completed execution of the read_csv method of S3Operations class")
            return read_csv_df

        except Exception as error:
            logging.error(error)
            raise TextSummarizerException(error, sys) from error


In [22]:
import os, sys
import urllib.request as request
import zipfile
from pathlib import Path


class DataIngestion:
    """Fetches the dataset archive (from a URL or from S3) and extracts it."""

    def __init__(self, config: DataIngestionConfig):
        self.config = config
        self.s3_storage = S3Operations()


    def get_file_from_url(self):
        """Download the archive from ``config.data_url`` unless it already exists locally.

        Returns:
            bool: always True
        """
        if not os.path.exists(self.config.downloaded_data_file):
            filename, headers = request.urlretrieve(
                url = self.config.data_url,
                filename = self.config.downloaded_data_file
            )
            logging.info(f"{filename} download! with following info: \n{headers}")
        else:
            logging.info(f"File already exists of size: {get_size(Path(self.config.downloaded_data_file))}")

        return True


    def extract_zip_file(self):
        """Extract the downloaded archive into ``config.unzipped_dir``.

        The target directory is created when missing. Returns None.
        """
        unzip_path = self.config.unzipped_dir
        os.makedirs(unzip_path, exist_ok=True)
        with zipfile.ZipFile(self.config.downloaded_data_file, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)


    def download_data_from_s3(self) -> bool:
        """Download the archive from the configured S3 bucket.

        Returns:
            bool: True when the downloaded file exists afterwards, else False

        Raises:
            TextSummarizerException: wraps any error raised during the download
        """
        try:
            zip_download_dir = self.config.root_dir
            os.makedirs(zip_download_dir, exist_ok=True)

            logging.info(f"Downloading data from s3 into file {zip_download_dir}")

            self.s3_storage.download_object(
                file_name = self.config.data_file_name,
                bucket_name = self.config.data_bucket_name,
                file_path = self.config.downloaded_data_file,
            )

            if os.path.exists(self.config.downloaded_data_file):
                logging.info(f"Downloaded data from s3 into file {self.config.downloaded_data_file}")
                return True

            return False

        except Exception as error:
            logging.exception(error)
            raise TextSummarizerException(error, sys) from error


In [10]:
import os


class DataValiadtion:
    """Checks that the ingested data directory contains every required file."""

    def __init__(self, config: DataValidationConfig):
        self.config = config


    def validate_all_files_exist(self) -> bool:
        """Verify that every required file exists in the unzipped data directory.

        The directory may hold additional files (for example the downloaded
        archive); only the presence of ``config.all_required_files`` matters.
        The outcome is written once to ``config.status_file`` as
        ``Validation status: <True|False>``.

        Returns:
            bool: True when no required file is missing, else False
        """
        present_files = set(os.listdir(DataIngestionConstants.UNZIPPED_DIR))
        missing_files = [
            name for name in self.config.all_required_files if name not in present_files
        ]
        validation_status = not missing_files

        if missing_files:
            logging.warning(f"Missing required files: {missing_files}")

        with open(self.config.status_file, 'w') as f:
            f.write(f"Validation status: {validation_status}")

        return validation_status


In [11]:
import os
from transformers import AutoTokenizer
from datasets import load_dataset, load_from_disk


class DataTransformation:
    """Loads the samsum CSV splits, tokenizes them and saves the result to disk."""

    def __init__(self, config: DataTransformationConfig):
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.tokenizer_name)


    def convert_examples_to_features(self, example_batch):
        """Tokenize a batch of dialogues and summaries.

        Lengths come from the config instead of hard-coded 1024 / 128, and the
        targets are tokenized through ``text_target`` (as preprocess_dataset
        already does) rather than the ``as_target_tokenizer`` context manager.

        Returns:
            dict: ``input_ids``, ``attention_mask`` and ``labels`` for the batch
        """
        input_encodings = self.tokenizer(example_batch['dialogue'],
                                         max_length = self.config.max_input_length,
                                         truncation = True
                                        )

        target_encodings = self.tokenizer(text_target = example_batch['summary'],
                                          max_length = self.config.max_target_length,
                                          truncation = True
                                         )

        return {
            'input_ids' : input_encodings['input_ids'],
            'attention_mask': input_encodings['attention_mask'],
            'labels': target_encodings['input_ids']
        }


    def load_samsum_dataset(self):
        """Load the train / validation / test CSV files from ``config.transformed_data_path``.

        Returns:
            The object returned by ``load_dataset("csv", ...)`` with the splits
            ``train``, ``validation`` and ``test``.
        """
        data_dir = self.config.transformed_data_path
        data_files = {
            "train": os.path.join(data_dir, "samsum-train.csv"),
            "validation": os.path.join(data_dir, "samsum-validation.csv"),
            "test": os.path.join(data_dir, "samsum-test.csv")
            }
        logging.info(f"Data files that will be loaded - {data_files}")

        dataset = load_dataset("csv", data_files= data_files)
        logging.info(f" After loading csv in dataset format- {dataset}")

        # Logging the sizes of the datasets
        logging.info(f"Train dataset size: {len(dataset['train'])}")
        logging.info(f"Test dataset size: {len(dataset['test'])}")
        logging.info(f"Validation dataset size: {len(dataset['validation'])}")

        return dataset


    def preprocess_dataset(self, examples):
        """Prefix and tokenize a batch; used as the ``map`` function in convert.

        The prefix is taken from ``config.prefix`` instead of a hard-coded copy
        of the same text.

        Returns:
            The tokenizer output for the inputs with an added ``labels`` entry
            holding the tokenized summaries.
        """
        prefix = self.config.prefix
        inputs = [prefix + doc for doc in examples["dialogue"]]

        model_inputs = self.tokenizer(inputs,
                                      max_length= self.config.max_input_length,
                                      truncation=True)

        # Setup the tokenizer for targets
        labels = self.tokenizer(text_target = examples["summary"],
                                max_length = self.config.max_target_length,
                                truncation=True
                                )

        model_inputs["labels"] = labels["input_ids"]

        return model_inputs

    def convert(self):
        """Load the dataset, tokenize every split and save it under ``config.root_dir``."""
        dataset_samsum = self.load_samsum_dataset()

        logging.info(f"Logging to asses the dataset - {dataset_samsum}")

        dataset_samsum_pt = dataset_samsum.map(self.preprocess_dataset, batched=True)

        logging.info(f"Logging to asses the dataset after pre-processing - {dataset_samsum_pt}")

        logging.info(f"Model Inputs for training datasets \n {dataset_samsum_pt['train'][:2]}")
        logging.info(f"Model Inputs for test datasets \n {dataset_samsum_pt['test'][:2]}")
        logging.info(f"Model Inputs for validation datasets \n {dataset_samsum_pt['validation'][:2]}")

        dataset_samsum_pt.save_to_disk(self.config.root_dir)



In [12]:
"""This module is used for training model on custom data"""
from transformers import DataCollatorForSeq2Seq
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer
from datasets import load_dataset, load_from_disk
import torch
import os
# from accelerate import Accelerator
# accelerator = Accelerator()


class ModelTraining:
    """Fine-tune a pretrained seq2seq checkpoint on the tokenized dataset.

    The checkpoint name, data path, output directory and training
    hyper-parameters are all read from the supplied config object.
    """

    def __init__(self, config: ModelTrainingConfig):
        """Keep a reference to the training configuration."""
        self.config = config

    def train(self):
        """Train the model, then save the model and tokenizer under ``root_dir``.

        Loads the tokenizer and model from ``config.model_ckpt`` and the dataset
        from ``config.data_path``, trains on the "train" split while evaluating
        on the "validation" split, and finally writes the model to
        ``<root_dir>/model`` and the tokenizer to ``<root_dir>/tokenizer``.
        """
        logging.info("Inside ModelTraining.train of model_trainer module")

        cuda_available = torch.cuda.is_available()
        logging.info(f"Validating cuda availability - {cuda_available}")
        # The device name is only reported here; it is not passed to the model
        # or to the trainer anywhere in this method.
        device = "cuda" if cuda_available else "cpu"
        logging.info(f"Device is set to - {device}")

        logging.info("Setting tokenizer, Model and data collator")

        tokenizer = AutoTokenizer.from_pretrained(self.config.model_ckpt)
        logging.info("Completed setting Tokenizer")

        model = AutoModelForSeq2SeqLM.from_pretrained(self.config.model_ckpt)
        logging.info("Completed setting Model")

        seq2seq_data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)
        logging.info("Completed setting data collator")

        logging.info(f"Loading data from disk {self.config.data_path}")
        dataset_pt = load_from_disk(self.config.data_path)

        logging.info(dataset_pt)

        # Use a dedicated eval batch size when the config defines one; otherwise
        # keep the previous behavior of reusing the train batch size.
        eval_batch_size = getattr(
            self.config,
            "per_device_eval_batch_size",
            self.config.per_device_train_batch_size,
        )

        logging.info("Setting TrainingArguments for model training ")
        trainer_args = Seq2SeqTrainingArguments(
            output_dir=self.config.root_dir,
            num_train_epochs=self.config.num_train_epochs,
            warmup_steps=self.config.warmup_steps,
            per_device_train_batch_size=self.config.per_device_train_batch_size,
            per_device_eval_batch_size=eval_batch_size,
            weight_decay=self.config.weight_decay,
            logging_steps=self.config.logging_steps,
            evaluation_strategy=self.config.evaluation_strategy,
            eval_steps=self.config.eval_steps,
            save_steps=self.config.save_steps,
            gradient_accumulation_steps=self.config.gradient_accumulation_steps,
        )

        logging.info("Setting Model Parameters for Trainer ")
        trainer = Seq2SeqTrainer(
            model=model,
            args=trainer_args,
            tokenizer=tokenizer,
            data_collator=seq2seq_data_collator,
            train_dataset=dataset_pt["train"],
            eval_dataset=dataset_pt["validation"],
        )

        logging.info("Model Training Started....")
        trainer.train()
        logging.info("Model Training completed.")

        logging.info(f"Saving Trained Model - {self.config.root_dir}")
        model.save_pretrained(os.path.join(self.config.root_dir, "model"))

        logging.info(f"Saving tokenizer - {self.config.root_dir}")
        tokenizer.save_pretrained(os.path.join(self.config.root_dir, "tokenizer"))


In [25]:
"""This module is used for Evaluating the models after training"""
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from datasets import load_dataset, load_from_disk, load_metric
import torch
import pandas as pd
from tqdm import tqdm


class ModelEvaluation:
    """Compute ROUGE scores for the fine-tuned summarization model."""

    def __init__(self, config: ModelEvaluationConfig):
        """Keep a reference to the evaluation configuration."""
        self.config = config


    def generate_batch_sized_chunks(self, list_of_elements, batch_size):
        """Yield successive ``batch_size``-sized slices of ``list_of_elements``.

        The final slice is shorter when ``len(list_of_elements)`` is not a
        multiple of ``batch_size``.
        """
        for start in range(0, len(list_of_elements), batch_size):
            yield list_of_elements[start : start + batch_size]


    def calculate_metric_on_test_ds(self,dataset, metric, model, tokenizer,
                               batch_size=16,
                               device="cuda" if torch.cuda.is_available() else "cpu",
                               column_text="article",
                               column_summary="highlights"):
        """Generate summaries batch by batch and score them with ``metric``.

        Args:
            dataset: Mapping-like object; ``dataset[column_text]`` and
                ``dataset[column_summary]`` must support ``len`` and slicing.
            metric: Object exposing ``add_batch(predictions=, references=)``
                and ``compute()``.
            model: Model exposing ``generate``. It is not moved here, so it
                presumably must already live on ``device`` - confirm at the
                call site.
            tokenizer: Callable tokenizer that also exposes ``decode``.
            batch_size: Number of rows processed per ``generate`` call.
            device: Device the input tensors are moved to. The default is
                evaluated once, when this method is defined.
            column_text: Key of the source texts in ``dataset``.
            column_summary: Key of the reference summaries in ``dataset``.

        Returns:
            Whatever ``metric.compute()`` returns after all batches were added.
        """
        article_batches = list(self.generate_batch_sized_chunks(dataset[column_text], batch_size))
        target_batches = list(self.generate_batch_sized_chunks(dataset[column_summary], batch_size))

        for article_batch, target_batch in tqdm(
            zip(article_batches, target_batches), total=len(article_batches)):

            inputs = tokenizer(article_batch, max_length=1024,  truncation=True,
                            padding="max_length", return_tensors="pt")

            # The length penalty is meant to keep the model from generating
            # sequences that are too long.
            summaries = model.generate(input_ids=inputs["input_ids"].to(device),
                            attention_mask=inputs["attention_mask"].to(device),
                            length_penalty=0.8, num_beams=8, max_length=128)

            # Decode the generated ids back to text.
            decoded_summaries = [tokenizer.decode(s, skip_special_tokens=True,
                                    clean_up_tokenization_spaces=True)
                for s in summaries]

            # Replace the "<n>" token with a space. The previous pattern was the
            # empty string, which makes str.replace insert a space between every
            # single character of each summary and so corrupts the metric input.
            # NOTE(review): "<n>" is presumably the Pegasus line-break token
            # whose name was lost from the original comment - confirm.
            decoded_summaries = [d.replace("<n>", " ") for d in decoded_summaries]

            metric.add_batch(predictions=decoded_summaries, references=target_batch)

        #  Finally compute and return the ROUGE scores.
        score = metric.compute()
        return score


    def evaluate(self):
        """Score the saved model on the first ten test rows and write a CSV.

        Reads the tokenizer from ``config.tokenizer_path``, the model from
        ``config.saved_model_path`` and the dataset from ``config.data_path``.
        The mid F-measure of each ROUGE variant is written as a single row to
        ``config.metric_file_name`` (without the index column).
        """
        device = "cuda" if torch.cuda.is_available() else "cpu"
        tokenizer = AutoTokenizer.from_pretrained(self.config.tokenizer_path)
        model_pegasus = AutoModelForSeq2SeqLM.from_pretrained(self.config.saved_model_path).to(device)

        #loading data
        dataset_pt = load_from_disk(self.config.data_path)

        rouge_names = ["rouge1", "rouge2", "rougeL", "rougeLsum"]

        # NOTE(review): `datasets.load_metric` is deprecated in recent releases
        # of `datasets` in favor of the separate `evaluate` package - confirm
        # before upgrading.
        rouge_metric = load_metric('rouge')

        # Only rows 0-9 of the test split are scored, two rows per batch.
        score = self.calculate_metric_on_test_ds(
            dataset_pt['test'][0:10], rouge_metric, model_pegasus, tokenizer,
            batch_size=2, device=device,
            column_text='dialogue', column_summary='summary'
        )

        rouge_dict = {rn: score[rn].mid.fmeasure for rn in rouge_names}

        df = pd.DataFrame(rouge_dict, index=['pegasus'])
        df.to_csv(self.config.metric_file_name, index=False)




In [14]:
class DataIngestionPipeline:
    """This class contains the methods that triggers the Data Ingestion Pipeline"""
    def __init__(self):
        pass

    def main(self):
        """Run the ingestion steps in order, stopping at the first failed step.

        Steps: ``get_file_from_url`` -> ``download_data_from_s3`` ->
        ``extract_zip_file``. A falsy result from either of the first two steps
        is logged as an error and the remaining steps are skipped; no exception
        is raised in that case.
        """
        logging.info("Inside DataIngestionPipeline.main of model_training_pipeline module")
        config = ConfigurationManager()
        data_ingestion_config = config.get_data_ingestion_config()
        data_ingestion = DataIngestion(config=data_ingestion_config)

        # These failures are reported with logging.error(): logging.exception()
        # is meant to be called from an exception handler, and there is no
        # active exception at this point.
        if not data_ingestion.get_file_from_url():
            logging.error('Failed to get files from Open Source')
            return

        logging.info('Calling download_data_from_s3...')
        if not data_ingestion.download_data_from_s3():
            logging.error('Failed to download files from S3')
            return

        logging.info('Calling extract_zip_file...')
        data_ingestion.extract_zip_file()
        logging.info("Completed execution of DataIngestionPipeline.main of model_training_pipeline module")


class DataValidationPipeline:
    """Entry point that runs the data validation stage."""

    def __init__(self):
        """Nothing to set up; the configuration is resolved inside ``main``."""

    def main(self):
        """Build the validation config and run ``validate_all_files_exist``."""
        logging.info("Inside DataValidationPipeline.main of model_training_pipeline module")

        validation_config = ConfigurationManager().get_data_validation_config()
        # NOTE(review): `DataValiadtion` looks like a misspelling of
        # "DataValidation"; the name is used as-is because the class is defined
        # outside this block.
        validator = DataValiadtion(config=validation_config)
        validator.validate_all_files_exist()

        logging.info("Completed execution of DataValidationPipeline.main of model_training_pipeline module")



class DataTransformationPipeline:
    """Entry point that runs the data transformation stage."""

    def __init__(self):
        """Nothing to set up; the configuration is resolved inside ``main``."""

    def main(self):
        """Build the transformation config and run ``DataTransformation.convert``."""
        logging.info("Inside DataTransformationPipeline.main of model_training_pipeline module")

        transformation_config = ConfigurationManager().get_data_transformation_config()
        transformer = DataTransformation(config=transformation_config)
        transformer.convert()

        logging.info("Completed execution of DataTransformationPipeline.main of model_training_pipeline module")


class ModelTrainingPipeline:
    """Entry point that runs the model training stage."""

    def __init__(self):
        """Nothing to set up; the configuration is resolved inside ``main``."""

    def main(self):
        """Build the training config, hand it to ``ModelTraining`` and train."""
        logging.info("Inside ModelTrainingPipeline.main of model_training_pipeline module")

        training_config = ConfigurationManager().get_model_training_config()
        # Separate names for the config and for the trainer built from it.
        trainer = ModelTraining(config=training_config)
        trainer.train()

        logging.info("Completed execution of ModelTrainingPipeline.main of model_training_pipeline module")


class ModelEvaluationPipeline:
    """Entry point that runs the model evaluation stage."""

    def __init__(self):
        """Nothing to set up; the configuration is resolved inside ``main``."""

    def main(self):
        """Build the evaluation config, hand it to ``ModelEvaluation`` and evaluate."""
        logging.info("Inside ModelEvaluationPipeline.main of model_training_pipeline module")

        evaluation_config = ConfigurationManager().get_model_evaluation_config()
        # Separate names for the config and for the evaluator built from it.
        evaluator = ModelEvaluation(config=evaluation_config)
        evaluator.evaluate()

        logging.info("Completed execution of ModelEvaluationPipeline.main of model_training_pipeline module")


In [15]:
from transformers import AutoTokenizer
from transformers import pipeline


class PredictionPipeline:
    """Summarize text with the saved model and tokenizer."""

    def __init__(self):
        """Read the model-evaluation config, which holds the model and tokenizer paths."""
        self.config = ConfigurationManager().get_model_evaluation_config()
        # Built lazily by _get_summarizer() and reused across predict() calls.
        self._summarizer = None


    def _get_summarizer(self):
        """Build the summarization pipeline on first use and reuse it afterwards."""
        summarizer = getattr(self, "_summarizer", None)
        if summarizer is None:
            # `model_path` is still preferred when the config has it. The
            # evaluation code reads the weights from `saved_model_path` on the
            # same kind of config, so that attribute is the fallback instead of
            # failing with an AttributeError.
            model_path = getattr(self.config, "model_path", None) or self.config.saved_model_path
            tokenizer = AutoTokenizer.from_pretrained(self.config.tokenizer_path)
            summarizer = pipeline("summarization", model=model_path, tokenizer=tokenizer)
            self._summarizer = summarizer
        return summarizer


    def predict(self,text):
        """Print ``text`` and its generated summary, and return the summary.

        Args:
            text: Input passed to the summarization pipeline.

        Returns:
            The "summary_text" entry of the first pipeline result.
        """
        gen_kwargs = {"length_penalty": 0.8, "num_beams":8, "max_length": 128}

        pipe = self._get_summarizer()

        print("Dialogue:")
        print(text)

        output = pipe(text, **gen_kwargs)[0]["summary_text"]
        print("\nModel Summary:")
        print(output)

        return output

In [20]:
from google.colab import userdata
import os

# The AWS credentials are read from Colab's `userdata` store instead of being
# written into the notebook; the region is a fixed value.
for _secret_name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    os.environ[_secret_name] = userdata.get(_secret_name)
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

In [23]:
def run_stage(stage_name, pipeline_cls, banner=False):
    """Run one pipeline stage with start/completion logging.

    Args:
        stage_name: Label used in the log lines.
        pipeline_cls: Zero-argument callable returning an object with ``main()``.
        banner: When true, log a row of asterisks before the start line.

    Returns:
        The pipeline object, after its ``main()`` has finished.

    Raises:
        Exception: Whatever the stage raised; it is logged with its traceback
            and then re-raised unchanged.
    """
    try:
        if banner:
            logging.info("*******************")
        logging.info(f">>>>>> stage {stage_name} started <<<<<<")
        stage_pipeline = pipeline_cls()
        stage_pipeline.main()
        logging.info(f">>>>>> stage {stage_name} completed <<<<<<\n\nx==========x")
    except Exception as e:
        logging.exception(e)
        # Bare `raise` re-raises the active exception as-is.
        raise
    return stage_pipeline


# The stages run in order; a failure in one stage aborts the cell, so the
# following stages do not start.
STAGE_NAME = "Data Ingestion stage"
data_ingestion = run_stage(STAGE_NAME, DataIngestionPipeline)

STAGE_NAME = "Data Validation stage"
data_validation = run_stage(STAGE_NAME, DataValidationPipeline)

STAGE_NAME = "Data Transformation stage"
data_transformation = run_stage(STAGE_NAME, DataTransformationPipeline)

STAGE_NAME = "Model Training stage"
model_trainer = run_stage(STAGE_NAME, ModelTrainingPipeline, banner=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/88.0 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/1.12k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/1.91M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/65.0 [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Map:   0%|          | 0/14731 [00:00<?, ? examples/s]

Map:   0%|          | 0/818 [00:00<?, ? examples/s]

Map:   0%|          | 0/819 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/14731 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/818 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/819 [00:00<?, ? examples/s]

pytorch_model.bin:   0%|          | 0.00/2.28G [00:00<?, ?B/s]

Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at google/pegasus-cnn_dailymail and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


generation_config.json:   0%|          | 0.00/280 [00:00<?, ?B/s]



Step,Training Loss,Validation Loss
50,2.7333,2.331003
100,2.4147,1.978132
150,2.0831,1.77172
200,1.9369,1.669919
250,1.9378,1.613858
300,1.839,1.572148
350,1.6543,1.541396
400,1.6725,1.51459
450,1.6759,1.494883
500,1.6081,1.478753


Non-default generation parameters: {'max_length': 128, 'min_length': 32, 'num_beams': 8, 'length_penalty': 0.8, 'forced_eos_token_id': 1}


In [38]:
# Run the evaluation stage. Any failure is logged together with its traceback
# and then re-raised so the cell still fails visibly.
STAGE_NAME = "Model Evaluation stage"
try:
    logging.info("*******************")
    logging.info(f">>>>>> stage {STAGE_NAME} started <<<<<<")
    model_evaluation = ModelEvaluationPipeline()
    model_evaluation.main()
    logging.info(f">>>>>> stage {STAGE_NAME} completed <<<<<<\n\nx==========x")
except Exception as stage_error:
    logging.exception(stage_error)
    raise stage_error

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this metric from the next major release of `datasets`.
100%|██████████| 5/5 [00:15<00:00,  3.03s/it]


In [40]:
# Mount Google Drive at /content/drive so files can be copied to it from this
# Colab session.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [41]:
%pwd

'/content'

In [43]:
%ls -ltr /content/artifacts/06_10_2024_18_29_01/ModelTraining/model/

total 2230144
-rw-r--r-- 1 root root       1316 Jun 10 19:39 config.json
-rw-r--r-- 1 root root        275 Jun 10 19:39 generation_config.json
-rw-r--r-- 1 root root 2283652852 Jun 10 19:39 model.safetensors


In [54]:
# Copy the trained weights file to Google Drive.
# NOTE(review): only model.safetensors is copied; the config.json and
# generation_config.json shown in the directory listing are left behind.
# Loading the model from the Drive folder presumably needs them too - confirm.
# The source path contains the timestamp of this particular run.
%cp /content/artifacts/06_10_2024_18_29_01/ModelTraining/model/model.safetensors /content/drive/MyDrive/ColabNotebooks/Projects-NLP/Text-Summarizer-using-K8s/