In [1]:
%pwd

'/Users/stephane/Documents/Professionnel/Formation/Certificats/Datascientest/1 - Cours/MLOps/02 - MasterClass1 - OverViewMLOPS/MLOps_Wines_Model/notebooks'

In [2]:
import os
os.chdir('..')
%pwd

'/Users/stephane/Documents/Professionnel/Formation/Certificats/Datascientest/1 - Cours/MLOps/02 - MasterClass1 - OverViewMLOPS/MLOps_Wines_Model'

In [3]:
from src.common_utils import read_yaml, create_directories

In [4]:
from src.config import CONFIG_FILE_PATH, PARAMS_FILE_PATH, SCHEMA_FILE_PATH

### Step 1: Defining configuration classes for each stage

In [5]:
from dataclasses import dataclass
from pathlib import Path

In [6]:
@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable settings consumed by the data ingestion step (DataIngestion)."""
    # NOTE(review): fields are annotated as Path, but ConfigurationManager passes
    # the raw config values through unchanged — confirm the actual runtime type.

    # Working directory of the step; created by ConfigurationManager before use.
    root_dir: Path
    # Address handed to urlretrieve as `url` when downloading the archive.
    source_url: str
    # Local file the download is written to and later opened as a zip archive.
    local_data_file: Path
    # Directory the archive is extracted into.
    unzip_dir: Path

@dataclass(frozen=True)
class DataValidationConfig:
    """Immutable settings consumed by the data validation step (DataValidation)."""
    # Working directory of the step; created by ConfigurationManager before use.
    root_dir: Path
    # File the "Validation status: ..." line is written to.
    STATUS_FILE: str
    # Passed directly to pd.read_csv, so despite the name it must point at a CSV
    # file; ConfigurationManager fills it from the `unzip_dir` config key.
    unzip_data_dir: Path
    # COLUMNS section of the schema file; only its keys are used for validation.
    all_schema: dict

@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings consumed by the data transformation step (DataTransformation)."""
    # Directory the X_train/y_train/X_test/y_test CSV files are written to;
    # created by ConfigurationManager before use.
    root_dir: Path
    # CSV file that is loaded and split into train and test sets.
    data_path: Path

@dataclass(frozen=True)
class ModelTrainerConfig:
    """Immutable settings consumed by the model training step (ModelTrainer)."""
    # Directory the fitted model is dumped into; created by ConfigurationManager.
    root_dir: Path
    # CSV files read by ModelTrainer.train to fit the model.
    X_train_path: Path
    y_train_path: Path
    # Carried in the config but not read by ModelTrainer.train in this notebook.
    X_test_path: Path
    y_test_path: Path
    # File name joined to root_dir when the model is saved with joblib.
    model_name: str
    # ElasticNet hyper-parameters, taken from the `ElasticNet` section of the params file.
    alpha: float
    l1_ratio: float

@dataclass(frozen=True)
class ModelEvaluationConfig:
    """Immutable settings consumed by the model evaluation step (ModelEvaluation).

    Field order is part of the interface (positional construction), so it is
    kept exactly as before; only the redundant second declaration of
    ``metric_file_name`` was dropped.
    """
    # Working directory of the step; created by ConfigurationManager before use.
    root_dir: Path
    # CSV files holding the held-out features and targets.
    X_test_path: Path
    y_test_path: Path
    # Serialized model loaded with joblib.
    model_path: Path
    # File the computed metrics are saved to as JSON.
    metric_file_name: Path
    # Hyper-parameters logged to MLflow (the `ElasticNet` section of the params file).
    all_params: dict
    # URI handed to mlflow.set_registry_uri.
    mlflow_uri: str

### Step 2: Create a configuration manager that builds the configuration object for each step

In [None]:
class ConfigurationManager:
    """Builds the configuration object of every pipeline step.

    The config, params and schema files are read with ``read_yaml`` when the
    manager is created. Each ``get_*_config`` method creates the step's
    ``root_dir`` via ``create_directories`` and returns the matching dataclass.
    """

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH,
    ):
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Return the ingestion settings after creating the step directory."""
        section = self.config.data_ingestion
        create_directories([section.root_dir])

        return DataIngestionConfig(
            root_dir=section.root_dir,
            source_url=section.source_URL,
            local_data_file=section.local_data_file,
            unzip_dir=section.unzip_dir,
        )

    def get_data_validation_config(self) -> DataValidationConfig:
        """Return the validation settings, including the schema's COLUMNS section."""
        section = self.config.data_validation
        columns_schema = self.schema.COLUMNS
        create_directories([section.root_dir])

        return DataValidationConfig(
            root_dir=section.root_dir,
            STATUS_FILE=section.STATUS_FILE,
            unzip_data_dir=section.unzip_dir,
            all_schema=columns_schema,
        )

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Return the transformation settings after creating the step directory."""
        section = self.config.data_transformation
        create_directories([section.root_dir])

        return DataTransformationConfig(
            root_dir=section.root_dir,
            data_path=section.data_path,
        )

    def get_model_trainer_config(self) -> ModelTrainerConfig:
        """Return the training settings together with the ElasticNet hyper-parameters."""
        section = self.config.model_trainer
        hyper_params = self.params.ElasticNet
        create_directories([section.root_dir])

        return ModelTrainerConfig(
            root_dir=section.root_dir,
            X_train_path=section.X_train_path,
            y_train_path=section.y_train_path,
            X_test_path=section.X_test_path,
            y_test_path=section.y_test_path,
            model_name=section.model_name,
            alpha=hyper_params.alpha,
            l1_ratio=hyper_params.l1_ratio,
        )

    def get_model_evaluation_config(self) -> ModelEvaluationConfig:
        """Return the evaluation settings; the MLflow URI is hard-coded here."""
        section = self.config.model_evaluation
        hyper_params = self.params.ElasticNet
        create_directories([section.root_dir])

        return ModelEvaluationConfig(
            root_dir=section.root_dir,
            X_test_path=section.X_test_path,
            y_test_path=section.y_test_path,
            model_path=section.model_path,
            metric_file_name=section.metric_file_name,
            all_params=hyper_params,
            # Make sure to update this URI so it points at your own repository.
            mlflow_uri="https://dagshub.com/stephaneMartinez/MLOps_Wines_Model.s3",
        )

### Step 3: Define the module for each step; each module is instantiated from its configuration class.

#### Data Ingestion step

In [8]:
import urllib.request as request
import os
from custom_logger import logger
import zipfile

class DataIngestion:
    """Downloads the source archive and unpacks it, driven by a DataIngestionConfig."""

    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def download_file(self):
        """Fetch ``source_url`` into ``local_data_file`` unless that file already exists.

        Returns None; the outcome is only reported through the logger.
        """
        target = self.config.local_data_file

        # Nothing to do when a previous run already fetched the archive.
        if os.path.exists(target):
            logger.info("File already exists.")
            return

        filename, headers = request.urlretrieve(
            url=self.config.source_url,
            filename=target,
        )
        logger.info(f"{filename} download! With following info: \n{headers}")

    def extract_zip_file(self):
        """Extract ``local_data_file`` into ``unzip_dir``, creating the directory if needed.

        Returns None.
        """
        destination = self.config.unzip_dir
        os.makedirs(destination, exist_ok=True)

        with zipfile.ZipFile(self.config.local_data_file, 'r') as archive:
            archive.extractall(destination)

#### Data Validation step

In [9]:
import pandas as pd
from src.config_manager import DataValidationConfig

class DataValidation:
    """Checks the columns of the ingested CSV against the configured schema."""

    def __init__(self, config: DataValidationConfig):
        self.config = config

    def validate_all_columns(self) -> bool:
        """Validate that every column of the data file is declared in the schema.

        The CSV at ``config.unzip_data_dir`` is loaded and each of its column
        names is looked up in the keys of ``config.all_schema``. The result is
        written once to ``config.STATUS_FILE`` as ``Validation status: <bool>``.

        Previously the status was overwritten on every loop iteration, so only
        the last column decided the outcome: an unknown column followed by a
        known one was reported as valid. The status now reflects all columns.

        Returns:
            True when the file has at least one column and all of them are
            declared in the schema, False otherwise.

        Raises:
            Whatever ``pd.read_csv`` or ``open`` raise; errors are not caught.
        """
        data = pd.read_csv(self.config.unzip_data_dir)
        all_cols = list(data.columns)
        all_schema = self.config.all_schema.keys()

        unexpected_cols = [col for col in all_cols if col not in all_schema]

        # A frame without columns cannot satisfy the schema; this also ensures
        # a bool is always returned (the old code returned None in that case).
        validation_status = len(all_cols) > 0 and not unexpected_cols

        with open(self.config.STATUS_FILE, 'w') as f:
            f.write(f"Validation status: {validation_status}")

        return validation_status

#### Data Transformation step

In [10]:
from sklearn.model_selection import train_test_split
from custom_logger import logger
from src.entity import DataTransformationConfig

class DataTransformation:
    """Splits the dataset into train and test CSV files."""

    def __init__(self, config: DataTransformationConfig):
        self.config = config

    def train_test_splitting(self):
        """Split ``data_path`` 80/20 on the "quality" target and save the four parts.

        The feature and target splits are written to ``root_dir`` as
        X_train.csv, y_train.csv, X_test.csv and y_test.csv (without index).
        The resulting shapes are both logged and printed. Returns None.
        """
        frame = pd.read_csv(self.config.data_path)

        features = frame.drop(columns=["quality"])
        target = frame["quality"]

        X_train, X_test, y_train, y_test = train_test_split(
            features, target, test_size=0.2, random_state=42
        )

        # Same files, same write order as before.
        outputs = (
            (X_train, "X_train.csv"),
            (y_train, "y_train.csv"),
            (X_test, "X_test.csv"),
            (y_test, "y_test.csv"),
        )
        for split, file_name in outputs:
            split.to_csv(os.path.join(self.config.root_dir, file_name), index = False)

        logger.info("Splitted data into training and test sets")
        logger.info(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
        logger.info(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

        print(X_train.shape, y_train.shape)
        print(X_test.shape, y_test.shape)

#### Model Trainer step

In [11]:
import joblib
from sklearn.linear_model import ElasticNet
from src.entity import ModelTrainerConfig

class ModelTrainer:
    """Fits an ElasticNet model on the training split and saves it with joblib."""

    def __init__(self, config: ModelTrainerConfig):
        self.config = config

    def train(self):
        """Train on ``X_train_path`` / ``y_train_path`` and dump the model.

        The model is written to ``root_dir`` under ``model_name``. Returns None.
        """
        settings = self.config

        features = pd.read_csv(settings.X_train_path)
        target = pd.read_csv(settings.y_train_path)

        model = ElasticNet(
            alpha=settings.alpha,
            l1_ratio=settings.l1_ratio,
            random_state=42,
        )
        model.fit(features, target)

        output_path = os.path.join(settings.root_dir, settings.model_name)
        joblib.dump(model, output_path)

#### Model Evaluation step

In [12]:
import numpy as np
import mlflow
import mlflow.sklearn
import dagshub
import joblib
from urllib.parse import urlparse
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from src.entity import ModelEvaluationConfig
from src.common_utils import save_json

# Runs as soon as this cell is executed, before any ModelEvaluation object exists.
# NOTE(review): 'your_username' / 'your_repo' look like template placeholders —
# replace them with the real DagsHub owner and repository before running. The
# mlflow_uri hard-coded in ConfigurationManager.get_model_evaluation_config
# refers to stephaneMartinez/MLOps_Wines_Model; confirm both should match.
dagshub.init(repo_owner='your_username', repo_name='your_repo', mlflow=True)

class ModelEvaluation:
    """Scores the saved model on the test split and records the run in MLflow."""

    def __init__(self, config: ModelEvaluationConfig):
        self.config = config

    def eval_metrics(self, actual, pred):
        """Return ``(rmse, mae, r2)`` for the given targets and predictions."""
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        return rmse, mae, r2

    def log_into_mlflow(self):
        """Evaluate the model, save the metrics locally and log everything to MLflow.

        Inside a single MLflow run this saves the scores as JSON to
        ``metric_file_name``, logs ``all_params`` and the three metrics, and
        logs the model. The model is also registered as "ElasticnetModel"
        unless the tracking URI uses the "file" scheme. Returns None.
        """
        X_test = pd.read_csv(self.config.X_test_path)
        y_test = pd.read_csv(self.config.y_test_path)
        model = joblib.load(self.config.model_path)

        mlflow.set_registry_uri(self.config.mlflow_uri)
        tracking_scheme = urlparse(mlflow.get_tracking_uri()).scheme

        with mlflow.start_run():
            predictions = model.predict(X_test)
            rmse, mae, r2 = self.eval_metrics(y_test, predictions)

            # Keep a local copy of the metrics next to the MLflow record.
            scores = {"rmse": rmse, "mae": mae, "r2": r2}
            save_json(path=Path(self.config.metric_file_name), data=scores)

            mlflow.log_params(self.config.all_params)

            for metric_name, metric_value in (("rmse", rmse), ("mae", mae), ("r2", r2)):
                mlflow.log_metric(metric_name, metric_value)

            # Model registry does not work with file store, so the model is
            # only registered for other tracking back ends.
            if tracking_scheme == "file":
                registry_kwargs = {}
            else:
                registry_kwargs = {"registered_model_name": "ElasticnetModel"}

            mlflow.sklearn.log_model(model, "model", **registry_kwargs)

### Step 4: Pipeline steps to instantiate the classes and call each of the processes

#### Data Ingestion step

In [13]:
# Build the ingestion settings (this also creates the step's root_dir),
# then download the archive and unpack it.
config = ConfigurationManager()
data_ingestion_config = config.get_data_ingestion_config()
data_ingestion = DataIngestion(config = data_ingestion_config)
# The download is skipped when local_data_file already exists.
data_ingestion.download_file()
data_ingestion.extract_zip_file()

#### Data Validation step

In [14]:
# Build the validation settings and check the data columns against the schema.
config = ConfigurationManager()
data_validation_config = config.get_data_validation_config()
data_validation = DataValidation(config=data_validation_config)
# Last expression of the cell: the returned status is shown as the cell output
# and is also written to the configured STATUS_FILE.
data_validation.validate_all_columns()

True

#### Data Transformation step

In [15]:
# Gate the split on the outcome recorded by the validation step.
# Errors are deliberately NOT caught and printed any more: with a swallowed
# exception a "Run All" would carry on to training with stale or missing
# split files. A missing status file or an invalid schema now stops the run.
# NOTE(review): the path is hard-coded; confirm it matches the STATUS_FILE
# entry of the data_validation config.
# strip() tolerates a trailing newline in the status file.
status = Path("data/status.txt").read_text().strip().split(" ")[-1]

if status != "True":
    raise ValueError("Your data schema is not valid")

config = ConfigurationManager()
data_transformation_config = config.get_data_transformation_config()
data_transformation = DataTransformation(config = data_transformation_config)
data_transformation.train_test_splitting()

(1279, 11) (1279,)
(320, 11) (320,)


#### Model trainer step

In [16]:
# Build the training settings (paths plus ElasticNet hyper-parameters),
# then fit the model and save it under the step's root_dir.
config = ConfigurationManager()
model_trainer_config = config.get_model_trainer_config()
model_trainer = ModelTrainer(config= model_trainer_config)
model_trainer.train()

#### Model evaluation step

In [None]:
# Build the evaluation settings, then score the saved model on the test split
# and log parameters, metrics and the model to MLflow.
# NOTE(review): this cell has no execution count, so it was apparently not run
# in the saved notebook — confirm the DagsHub/MLflow settings before executing.
config = ConfigurationManager()
model_evaluation_config = config.get_model_evaluation_config()
model_evaluation = ModelEvaluation(config = model_evaluation_config)
model_evaluation.log_into_mlflow()