In [1]:
import os
import sys
import logging
import json
import joblib
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import glob
import zipfile
import shutil
import joblib as jb
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import Any, NamedTuple
from collections import Counter,namedtuple
import os
from kfp import dsl
from kfp.dsl import (
    Input,
    Output,
    Artifact,
    Model,
    Metrics,
    component
)
from box.exceptions import BoxValueError
import yaml
import json
import joblib
from ensure import ensure_annotations
from box import ConfigBox
from pathlib import Path
from typing import Any
# ML Libraries
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report, roc_curve, auc,
    precision_recall_curve, average_precision_score, matthews_corrcoef
)
from imblearn.combine import SMOTEENN

try:
    from google.cloud import storage
    from google.cloud.storage import transfer_manager
    CLOUD_AVAILABLE = True
except ImportError:
    CLOUD_AVAILABLE = False
    print("Google Cloud Storage not available - install with: pip install google-cloud-storage")

# ================================================================
# LOGGING SETUP
# ================================================================
import os
import sys
import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s]: %(message)s:')
logger = logging.getLogger(__name__)
logs_dir = "logs"
log_filepath = os.path.join(logs_dir, "running_logs.log")
os.makedirs(logs_dir, exist_ok=True)

logging.basicConfig(
    filename=log_filepath,
    format="[%(asctime)s]: %(levelname)s: %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler(log_filepath),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger("MLopsLogger")

In [2]:
@ensure_annotations
def read_yaml(path_to_yaml: Path) -> ConfigBox:
    """reads yaml file and returns

    Args:
        path_to_yaml (str): path like input

    Raises:
        ValueError: if yaml file is empty
        e: empty file

    Returns:
        ConfigBox: ConfigBox type
    """
    try:
        with open(path_to_yaml) as yaml_file:
            content = yaml.safe_load(yaml_file)
            logger.info(f"yaml file: {path_to_yaml} loaded successfully")
            return ConfigBox(content)
    except BoxValueError:
        raise ValueError("yaml file is empty")
    except Exception as e:
        raise e
    


@ensure_annotations
def create_directories(path_to_directories: list, verbose=True):
    """create list of directories

    Args:
        path_to_directories (list): list of path of directories
        ignore_log (bool, optional): ignore if multiple dirs is to be created. Defaults to False.
    """
    for path in path_to_directories:
        os.makedirs(path, exist_ok=True)
        if verbose:
            logger.info(f"created directory at: {path}")


@ensure_annotations
def save_json(path: Path, data: dict):
    """save json data

    Args:
        path (Path): path to json file
        data (dict): data to be saved in json file
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=4)

    logger.info(f"json file saved at: {path}")




@ensure_annotations
def load_json(path: Path) -> ConfigBox:
    """load json files data

    Args:
        path (Path): path to json file

    Returns:
        ConfigBox: data as class attributes instead of dict
    """
    with open(path) as f:
        content = json.load(f)

    logger.info(f"json file loaded succesfully from: {path}")
    return ConfigBox(content)


@ensure_annotations
def save_bin(data: Any, path: Path):
    """save binary file

    Args:
        data (Any): data to be saved as binary
        path (Path): path to binary file
    """
    joblib.dump(value=data, filename=path)
    logger.info(f"binary file saved at: {path}")


@ensure_annotations
def load_bin(path: Path) -> Any:
    """load binary data

    Args:
        path (Path): path to binary file

    Returns:
        Any: object stored in the file
    """
    data = joblib.load(path)
    logger.info(f"binary file loaded from: {path}")
    return data



@ensure_annotations
def get_size(path: Path) -> str:
    """get size in KB

    Args:
        path (Path): path of the file

    Returns:
        str: size in KB
    """
    size_in_kb = round(os.path.getsize(path)/1024)
    return f"~ {size_in_kb} KB"

In [3]:
def upload_many_blobs_with_transfer_manager(
    bucket_name, filenames, source_directory="", workers=8, credentials_path=None
):
    """Upload multiple files to Google Cloud Storage with proper error handling."""
    if not CLOUD_AVAILABLE:
        logger.warning("Google Cloud Storage not available - skipping upload")
        return
        
    try:
        # Set GCP credentials if provided
        if credentials_path:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
        
        if isinstance(filenames, (Path, str)):
            filenames = [str(filenames)]
        else:
            filenames = [str(f) for f in filenames]
        
        for filename in filenames:
            file_path = os.path.join(source_directory, filename) if source_directory else filename
            if not os.path.exists(file_path):
                logger.warning(f"File does not exist for upload: {file_path}")
                continue
        
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        results = transfer_manager.upload_many_from_filenames(
            bucket, filenames, source_directory=source_directory, max_workers=workers, blob_name_prefix="churn_data_store/"
        )

        for name, result in zip(filenames, results):
            if isinstance(result, Exception):
                logger.error(f"Failed to upload {name}: {result}")
            else:
                logger.info(f"Uploaded {name} to bucket {bucket.name}.")
                
    except Exception as e:
        logger.error(f"Cloud storage upload failed: {e}")
        raise e

In [4]:
@dataclass(frozen=True)
class DataIngestionConfig:
    root_dir: Path
    data_version_dir: Path
    local_data_file: Path
    test_size: float
    random_state: int

@dataclass(frozen=True)
class PrepareBaseModelConfig:
    model_version_dir: Path
    data_version_dir: Path
    random_state: int
    n_estimators: int
    criterion: str
    max_depth: int
    max_features: str
    min_samples_leaf: int

@dataclass(frozen=True)
class TrainingConfig:
    model_version_dir: Path
    data_version_dir: Path

@dataclass(frozen=True)
class EvaluationConfig:
    model_version_dir: Path
    data_version_dir: Path
    plots_dir: Path

@dataclass(frozen=True)
class CloudStoragePushConfig:
    root_dir: Path
    bucket_name: str
    credentials_path: str
    data_version_dir: Path
    evaluation_dir: Path
    workers: int


In [5]:
class ConfigurationManager:
    def __init__(
        self,
        config_filepath=Path(r"C:\Users\Admin\Desktop\Data projects\python\Decision-making-system\churn_mlops\config\config.yaml")
    ):
        self.config = read_yaml(config_filepath)
        create_directories([self.config.artifacts_root])


    def get_data_ingestion_config(self) -> DataIngestionConfig:
        config = self.config.data_ingestion
        
        create_directories([config.root_dir, config.data_version_dir])

        data_ingestion_config = DataIngestionConfig(
            root_dir=Path(config.root_dir),
            local_data_file=Path(config.local_data_file),
            test_size=config.test_size,
            random_state=config.random_state,
            data_version_dir=Path(config.data_version_dir)
        )

        logger.info(f"Data Ingestion config: {config}")
        return data_ingestion_config
        
    def get_prepare_base_model_config(self) -> PrepareBaseModelConfig:
        config = self.config.prepare_base_model
        
        create_directories([config.model_version_dir, config.data_version_dir])

        prepare_base_model_config = PrepareBaseModelConfig(
            model_version_dir=Path(config.model_version_dir),
            data_version_dir=Path(config.data_version_dir),
            n_estimators=config.n_estimators,
            random_state=config.random_state,
            criterion=config.criterion,
            max_depth=config.max_depth,
            max_features=config.max_features,
            min_samples_leaf=config.min_samples_leaf
        )

        logger.info(f"Prepare base model config: {config}")
        return prepare_base_model_config


    def get_training_config(self) -> TrainingConfig:
        config = self.config.training
        
        create_directories([config.model_version_dir, config.data_version_dir])

        training_config = TrainingConfig(
            model_version_dir=Path(config.model_version_dir),
            data_version_dir=Path(config.data_version_dir),
        )

        logger.info(f"Training config: {config}")
        return training_config

    def get_evaluation_config(self) -> EvaluationConfig:
        config = self.config.evaluation
        
        create_directories([config.plots_dir, config.model_version_dir, config.data_version_dir])

        evaluation_config = EvaluationConfig(
            model_version_dir=Path(config.model_version_dir),
            data_version_dir=Path(config.data_version_dir),
            plots_dir=Path(config.plots_dir)
        )

        logger.info(f"Evaluation config: {config}")
        return evaluation_config

    def get_cloud_storage_push_config(self) -> CloudStoragePushConfig:
        config = self.config.cloud_storage_push
        
        cloud_storage_push_config = CloudStoragePushConfig(
            root_dir=Path(config.root_dir),
            bucket_name=config.bucket_name,
            credentials_path=config.credentials_path,
            data_version_dir=Path(config.data_version_dir),
            evaluation_dir=Path(config.evaluation_dir),
            workers=config.workers
        )

        logger.info(f"Cloud Storage Push config: {config}")
        return cloud_storage_push_config


In [6]:
def get_dummies(df):
    categorical_cols = df.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        for col in categorical_cols:
            if df[col].isin(['yes', 'no', 'True', 'False']).any():
                df[col] = df[col].map({'yes': 1, 'True': 1, 'no': 0, 'False': 0})
            else:
                df = pd.get_dummies(df, columns=[col])
    return df

def most_common(lst):
    counts = Counter(lst)
    if not counts:
        return None 
    return counts.most_common(1)[0][0]

In [7]:
class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config
        self.rows_processed = 0
        self.datetime_suffix = datetime.now().strftime('%Y%m%dT%H%M%S')

    def process_data_for_churn(self, df_input):
        df_input.columns = df_input.columns.map(lambda x: str(x).strip())
        cols_to_drop = {"Returns", "Age", "Total Purchase Amount"}
        df_input.drop(columns=[col for col in cols_to_drop if col in df_input.columns], inplace=True)
        df_input.dropna(inplace=True)
        if 'Price' not in df_input.columns:
            df_input['Price'] = df_input['Product Price']
        if 'Product Price' not in df_input.columns:
            raise KeyError("Required column 'Product Price' is missing from the dataset.")
        
        df_input['TotalSpent'] = df_input['Quantity'] * df_input['Price']
        df_features = df_input.groupby("customer_id", as_index=False, sort=False).agg(
            LastPurchaseDate = ("Purchase Date","max"),
            Favoured_Product_Categories = ("Product Category", lambda x: most_common(list(x))),
            Frequency = ("Purchase Date", "count"),
            TotalSpent = ("TotalSpent", "sum"),
            Favoured_Payment_Methods = ("Payment Method", lambda x: most_common(list(x))),
            Customer_Name = ("Customer Name", "first"),
            Customer_Label = ("Customer_Labels", "first"),
            Churn = ("Churn", "first"),
        )

        df_features = df_features.drop_duplicates(subset=['Customer_Name'], keep='first')
        df_features['LastPurchaseDate'] = pd.to_datetime(df_features['LastPurchaseDate'])
        df_features['LastPurchaseDate'] = df_features['LastPurchaseDate'].dt.date
        df_features['LastPurchaseDate'] = pd.to_datetime(df_features['LastPurchaseDate'])
        max_LastBuyingDate = df_features["LastPurchaseDate"].max()
        df_features['Recency'] = (max_LastBuyingDate - df_features['LastPurchaseDate']).dt.days
        df_features['LastPurchaseDate'] = df_features['LastPurchaseDate'].dt.date
        df_features['Avg_Spend_Per_Purchase'] = df_features['TotalSpent'] / df_features['Frequency'].replace(0, 1)
        df_features['Purchase_Consistency'] = df_features['Recency'] / df_features['Frequency'].replace(0, 1)
        df_features.drop(columns=["customer_id","LastPurchaseDate",'Customer_Name'], inplace=True)
        
        return df_features

    def encode_churn(self, df_features: pd.DataFrame):
        df_copy = df_features.copy()
        df_features_encode = get_dummies(df_copy)
        return df_features_encode

    def load_data(self):
        """Load the dataset from CSV file and save versioned copy."""
        try:
            logger.info(f"Loading data from {self.config.local_data_file}")
            df = pd.read_csv(self.config.local_data_file)
            
            input_data_versioned_name = f"input_raw_data_version_{self.datetime_suffix}.csv"
            input_data_versioned_path = Path(self.config.data_version_dir) / input_data_versioned_name
            if not input_data_versioned_path.exists():
                df.to_csv(input_data_versioned_path, index=False)
                logger.info(f"Created versioned input data file: {input_data_versioned_path}")
            else:
                logger.info(f"Versioned file already exists: {input_data_versioned_path}, skipping save.")

            logger.info(f"Loaded dataset with {len(df)} rows")
            logger.info(f"Columns found: {list(df.columns)}")
            return df

        except Exception as e:
            logger.error(f"Error while loading data: {e}")
            raise

    def preprocess_data(self, df_clean):
        self.rows_processed = 0
        logger.info(f"Starting preprocessing of {len(df_clean)} rows...")
        
        df_clean = self.process_data_for_churn(df_clean)
        logger.info(f"After feature engineering columns: {list(df_clean.columns)}")
        
        df_clean = self.encode_churn(df_clean)
        logger.info(f"After encoding columns: {list(df_clean.columns)}")
        
        df_clean = df_clean.dropna()
        logger.info(f"Data shape after removing NaN: {df_clean.shape}")
        
        if "Churn" not in df_clean.columns:
            logger.error(f"Churn column not found! Available columns: {list(df_clean.columns)}")
            raise KeyError("Churn column is missing after preprocessing")
        
        X = df_clean.drop("Churn", axis=1)
        y = df_clean["Churn"]
        
        logger.info(f"X shape (features): {X.shape}")
        logger.info(f"y shape (target): {y.shape}")
        
        if y.isna().sum() > 0:
            logger.warning(f"Found {y.isna().sum()} NaN values in target variable, filling with 0")
            y = y.fillna(0)
        
        nan_cols = X.columns[X.isna().any()].tolist()
        if nan_cols:
            logger.warning(f"Found NaN values in feature columns: {nan_cols}")
            X = X.fillna(0)
        
        class_distribution = y.value_counts(normalize=True)
        logger.info(f"Target variable distribution (normalized): \n{class_distribution}")

        imbalance_threshold = 0.4
        if class_distribution.min() < imbalance_threshold:
            logger.info("Target variable is imbalanced. Applying SMOTEENN...")
            smote = SMOTEENN(random_state=42)
            X_res, y_res = smote.fit_resample(X, y)
            logger.info(f"Resampled feature matrix shape: {X_res.shape}")
        else:
            logger.info("Target variable is balanced. Skipping SMOTEENN.")
            X_res, y_res = X, y
        return X_res, y_res

    def split_data(self, X_res, y_res):
        """Split data into training and testing sets."""
        logger.info("Splitting data into train and test sets")
        X_train, X_test, y_train, y_test = train_test_split(
            X_res, y_res, test_size=self.config.test_size, random_state=self.config.random_state
        )
        logger.info(f"Train data: {X_train.shape}, Test data: {X_test.shape}")
        return X_train, X_test, y_train, y_test

    def save_data(self, X_train, X_test, y_train, y_test):
        """Save training and testing data to CSV files."""
        logger.info("Saving processed feature data to versioned directory")
        
        os.makedirs(self.config.data_version_dir, exist_ok=True)
        
        train_feature_path = os.path.join(self.config.data_version_dir, f"train_feature_version_{self.datetime_suffix}.csv")
        test_feature_path = os.path.join(self.config.data_version_dir, f"test_feature_version_{self.datetime_suffix}.csv")
        train_target_path = os.path.join(self.config.data_version_dir, f"train_target_version_{self.datetime_suffix}.csv")
        test_target_path = os.path.join(self.config.data_version_dir, f"test_target_version_{self.datetime_suffix}.csv")
        
        X_train.to_csv(train_feature_path, index=False)
        X_test.to_csv(test_feature_path, index=False)
        y_train.to_csv(train_target_path, index=False, header=['Churn'])
        y_test.to_csv(test_target_path, index=False, header=['Churn'])
        
        try:
            versioned_files = [
                f"train_feature_version_{self.datetime_suffix}.csv",
                f"test_feature_version_{self.datetime_suffix}.csv", 
                f"train_target_version_{self.datetime_suffix}.csv",
                f"test_target_version_{self.datetime_suffix}.csv"
            ]
            
            upload_many_blobs_with_transfer_manager(
                bucket_name="churn_data_version",  # You can modify this
                filenames=versioned_files,
                source_directory=str(self.config.data_version_dir),
                workers=8
            )
            logger.info("Successfully uploaded versioned training data to Google Cloud Storage")

        except Exception as cloud_error:
            logger.warning(f"Failed to upload training data to cloud storage: {cloud_error}")
        
        logger.info(f"Processed features saved to:")
        logger.info(f"  X_train: {train_feature_path}")
        logger.info(f"  X_test: {test_feature_path}")
        logger.info(f"  y_train: {train_target_path}")
        logger.info(f"  y_test: {test_target_path}")
        
        return train_feature_path, test_feature_path, train_target_path, test_target_path

    def data_ingestion_pipeline(self):
        """Main method to perform data ingestion."""
        logger.info("Initiating data ingestion")
        df = self.load_data()
        X, y = self.preprocess_data(df)
        X_train, X_test, y_train, y_test = self.split_data(X, y)
        train_target, test_target, y_train_path, y_test_path = self.save_data(X_train, X_test, y_train, y_test)
        
        logger.info("Data ingestion completed successfully")
        return X_train, X_test, y_train, y_test, train_target, test_target, y_train_path, y_test_path

In [8]:
class PrepareBaseModel:
    def __init__(self, config: PrepareBaseModelConfig):
        self.config = config
        # Generate datetime suffix for this run
        self.datetime_suffix = datetime.now().strftime('%Y%m%dT%H%M%S')
        
    def get_base_model(self):
        """Create and return a base Random Forest model for churn prediction."""
        logger.info("Creating base Random Forest model")
        
        model = RandomForestClassifier(
            n_estimators=self.config.n_estimators, 
            random_state=self.config.random_state,
            criterion=self.config.criterion,
            max_depth=self.config.max_depth,
            max_features=self.config.max_features,
            min_samples_leaf=self.config.min_samples_leaf
        )
        logger.info(f"Model params:{self.config.n_estimators}, {self.config.random_state}, {self.config.criterion}, {self.config.max_depth}, {self.config.max_features}, {self.config.min_samples_leaf}")
        return model
    
    def scaler(self, train_path, test_path, y_train_path, y_test_path):
        """Prepare and save scaler based on training data."""
        logger.info("Creating scaler from training data")
        
        os.makedirs(self.config.data_version_dir, exist_ok=True)
        os.makedirs(self.config.model_version_dir, exist_ok=True)
        
        scaled_test_data_path = os.path.join(self.config.data_version_dir, f"test_feature_scaled_version_{self.datetime_suffix}.csv")
        scaled_train_data_path = os.path.join(self.config.data_version_dir, f"train_feature_scaled_version_{self.datetime_suffix}.csv")
        scaler_path = os.path.join(self.config.model_version_dir, f"scaler_churn_version_{self.datetime_suffix}.pkl")
        
        try:
            X_train = pd.read_csv(train_path)
            X_test = pd.read_csv(test_path)
            y_train = pd.read_csv(y_train_path)
            y_test = pd.read_csv(y_test_path)
            
            scaler = RobustScaler()
            X_train_scaled = scaler.fit_transform(X_train)
            X_test_scaled = scaler.transform(X_test)
            
            jb.dump(scaler, scaler_path)
            
            X_train_scaled_df = pd.DataFrame(X_train_scaled, columns=X_train.columns)
            X_test_scaled_df = pd.DataFrame(X_test_scaled, columns=X_test.columns)
    
            X_train_scaled_df.to_csv(scaled_train_data_path, index=False)
            X_test_scaled_df.to_csv(scaled_test_data_path, index=False)
            
            logger.info(f"Scalers and scaled data saved:")
            logger.info(f"  Scaler: {scaler_path}")
            logger.info(f"  X_train_scaled: {scaled_train_data_path}")
            logger.info(f"  X_test_scaled: {scaled_test_data_path}")
            
            return scaler, X_train, X_test, y_train, y_test, scaled_train_data_path, scaled_test_data_path, scaler_path
 
        except Exception as e:
            logger.error(f"Error in preparing scaler: {e}")
            raise e
    
    def full_model(self, train_path, test_path, y_train_path, y_test_path):
        """Create the base model and scaler."""
        logger.info("Creating base model and scaler")
        
        model = self.get_base_model()
        scaler, X_train, X_test, y_train, y_test, scaled_train_path, scaled_test_path, scaler_path = self.scaler(
            train_path, test_path, y_train_path, y_test_path
        )
        
        base_model_path = os.path.join(self.config.model_version_dir, f"base_model_churn_{self.datetime_suffix}.pkl")
        
        # Save base model
        jb.dump(model, base_model_path)
        
        logger.info(f"Base model saved: {base_model_path}")

        return model, scaler, X_train, X_test, y_train, y_test, base_model_path, scaled_train_path, scaled_test_path, scaler_path

In [9]:
class TrainAndEvaluateModel:
    def __init__(self, config_train: TrainingConfig, config_eval: EvaluationConfig = None):
        self.train_config = config_train
        self.eval_config = config_eval
        self.datetime_suffix = datetime.now().strftime('%Y%m%dT%H%M%S')
        self.model_name = f"model_churn_{self.datetime_suffix}"
        self.fine_tuned_model_name = f"finetuned_churn_{self.datetime_suffix}"
    
    def train(self, X_train_scaled, y_train, base_model_path):
        """Train the model."""
        logger.info(f"Loading model from: {base_model_path}")
        model = jb.load(base_model_path)
        logger.info(f"Starting model training for {self.model_name}")
        model = model.fit(X_train_scaled, y_train)

        logger.info(f"Model training for {self.model_name} completed")
        model_version_dir = str(self.train_config.model_version_dir)
        os.makedirs(model_version_dir, exist_ok=True)
        
        trained_model_path_versioned = os.path.join(model_version_dir, f"model_churn_version_{self.datetime_suffix}.pkl")
        
        jb.dump(model, trained_model_path_versioned)
        logger.info(f"  trained model file (for future use): {trained_model_path_versioned}")
        return model, trained_model_path_versioned
    
    def fine_tune(self, model, X_train_scaled, y_train):
        """Fine-tune the exact trained model with hyperparameter search."""
        logger.info("Starting fine-tuning of the trained model")
        
        rf_params = {
            'n_estimators': [100, 200, 300, 400, 500, 700, 1000],  
            'criterion': ['gini', 'entropy', 'log_loss'],          # log_loss for classification since sklearn 1.1+
            'max_depth': [None, 10, 20, 30, 50, 70],                # Include deeper trees
            'min_samples_split': [2, 5, 10, 15],                    # More control over overfitting
            'min_samples_leaf': [1, 2, 4, 6],                       # Helps with generalization
            'max_features': ['sqrt', 'log2', None],                # 'auto' is deprecated; None = all features
            'bootstrap': [True, False],                             # Evaluate both bootstrapped and full datasets
            'class_weight': [None, 'balanced', 'balanced_subsample']  # Handles imbalanced datasets
        }

        logger.info(f"Fine-tuning model with current parameters: n_estimators={model.n_estimators}, criterion={model.criterion}")
        random_search = RandomizedSearchCV(model, rf_params, cv=5, n_jobs=1, n_iter=20, random_state=42)
        random_search.fit(X_train_scaled, y_train)
        
        best_model = RandomForestClassifier(**random_search.best_params_, random_state=42)
        logger.info(f"Best parameters found: {random_search.best_params_}")
        logger.info(f"Best cross-validation score: {random_search.best_score_:.4f}")
        
        # Fit the best model with the training data
        best_model = best_model.fit(X_train_scaled, y_train)
        logger.info("Best model fitted with training data")

        model_version_dir = str(self.train_config.model_version_dir)
        os.makedirs(model_version_dir, exist_ok=True)
        
        fine_tuned_model_path_versioned = os.path.join(model_version_dir, f"finetuned_churn_{self.datetime_suffix}.pkl")
        
        jb.dump(best_model, fine_tuned_model_path_versioned)
        
        logger.info(f"Fine-tuned models saved:")
        logger.info(f"  Versioned file (for future use): {fine_tuned_model_path_versioned}")
        
        return best_model, fine_tuned_model_path_versioned
    
    def perform_detailed_evaluation(self, model, X_test_scaled, y_test):
        """Evaluate the model in detail and save metrics."""
        logger.info("Performing detailed evaluation on test data")

        y_pred = model.predict(X_test_scaled)
        y_pred_prob = model.predict_proba(X_test_scaled)[:, 1]
        
        # Calculate metrics
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        roc_auc = roc_auc_score(y_test, y_pred_prob)
        mcc = matthews_corrcoef(y_test, y_pred)
        avg_precision = average_precision_score(y_test, y_pred_prob)
        
        metrics = {
            "precision": float(precision),
            "recall": float(recall),
            "f1_score": float(f1),
            "roc_auc": float(roc_auc),
            "mcc": float(mcc),
            "avg_precision": float(avg_precision)
        }
        
        # Generate classification report
        report = classification_report(y_test, y_pred, output_dict=True)
        metrics["classification_report"] = report
        
        metrics_file_versioned = Path(self.eval_config.plots_dir) / f"metrics_{self.datetime_suffix}.json"
        
        save_json(metrics_file_versioned, metrics)
        
        logger.info(f"Metrics saved:")
        logger.info(f"  Versioned file (for future use): {metrics_file_versioned}")
        
        return metrics, y_pred, y_pred_prob
    
    def plot_confusion_matrix(self, y_test, y_pred):
        """Plot and save confusion matrix."""
        logger.info("Creating confusion matrix plot")
        
        cm = confusion_matrix(y_test, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        
        # Save the plot with datetime naming
        cm_path = os.path.join(self.eval_config.plots_dir, f"confusion_matrix_{self.datetime_suffix}.png")
        plt.savefig(cm_path)
        plt.close()
        logger.info(f"Confusion matrix saved to: {cm_path}")
        
        return cm_path
    def plot_precision_recall_curve(self, y_test, y_pred_prob):
        """Plot and save Precision-Recall curve."""
        logger.info("Creating Precision-Recall curve plot")

        precision_vals, recall_vals, _ = precision_recall_curve(y_test, y_pred_prob)
        avg_precision = average_precision_score(y_test, y_pred_prob)

        plt.figure(figsize=(8, 6))
        plt.plot(recall_vals, precision_vals, color='purple', lw=2,
                label=f'Precision-Recall curve (AP = {avg_precision:.3f})')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        plt.legend(loc='lower left')

        # Save plot with datetime suffix naming
        pr_path = os.path.join(self.eval_config.plots_dir, f"precision_recall_curve_{self.datetime_suffix}.png")
        plt.savefig(pr_path)
        plt.close()
        logger.info(f"Precision-Recall curve saved to: {pr_path}")

        return pr_path
    def plot_roc_curve(self, y_test, y_pred_prob):
        """Plot and save ROC curve."""
        logger.info("Creating ROC curve plot")
        
        fpr, tpr, thresholds = roc_curve(y_test, y_pred_prob)
        roc_auc = auc(fpr, tpr)
        
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (AUC = {roc_auc:.3f})')
        plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc="lower right")
        
        # Save the plot with datetime naming
        roc_path = os.path.join(self.eval_config.plots_dir, f"roc_curve_{self.datetime_suffix}.png")
        plt.savefig(roc_path)
        plt.close()
        logger.info(f"ROC curve saved to: {roc_path}")
        
        return roc_path
    
    def train_and_evaluate(self, base_model_path, scaled_train_path, scaled_test_path, y_train_path, y_test_path):
        """Main method to train and evaluate the model."""
        logger.info("Initiating model training and evaluation")
        
        X_train_scaled = pd.read_csv(scaled_train_path)
        X_test_scaled = pd.read_csv(scaled_test_path)

        try:
            y_train = pd.read_csv(y_train_path)['Churn']
            y_test = pd.read_csv(y_test_path)['Churn']
            
            logger.info(f"Loaded y_train shape: {y_train.shape}")
            logger.info(f"Loaded y_test shape: {y_test.shape}")
            
            # Handle NaN values in target variables
            logger.info(f"NaN count in y_train: {y_train.isna().sum()}")
            logger.info(f"NaN count in y_test: {y_test.isna().sum()}")
            
            if y_train.isna().sum() > 0:
                logger.warning(f"Found {y_train.isna().sum()} NaN values in y_train, filling with 0")
                y_train = y_train.fillna(0)
                
            if y_test.isna().sum() > 0:
                logger.warning(f"Found {y_test.isna().sum()} NaN values in y_test, filling with 0")
                y_test = y_test.fillna(0)
                
        except FileNotFoundError as e:
            logger.error(f"Target files not found: {e}")
            raise e

        logger.info(f"Final X_train_scaled shape: {X_train_scaled.shape}")
        logger.info(f"Final y_train shape: {y_train.shape}")
        logger.info(f"Final X_test_scaled shape: {X_test_scaled.shape}")
        logger.info(f"Final y_test shape: {y_test.shape}")

        if X_train_scaled.isna().sum().sum() > 0:
            logger.warning(f"Found NaN values in X_train_scaled, filling with 0")
            X_train_scaled = X_train_scaled.fillna(0)
            
        if X_test_scaled.isna().sum().sum() > 0:
            logger.warning(f"Found NaN values in X_test_scaled, filling with 0")
            X_test_scaled = X_test_scaled.fillna(0)

        if y_train.isna().sum() > 0:
            logger.error(f"Still have NaN values in y_train: {y_train.isna().sum()}")
            y_train = y_train.fillna(0)
            
        if y_test.isna().sum() > 0:
            logger.error(f"Still have NaN values in y_test: {y_test.isna().sum()}")
            y_test = y_test.fillna(0)

        model, trained_model_path_static = self.train(X_train_scaled, y_train, base_model_path)
        accuracy = model.score(X_test_scaled, y_test)
        logger.info(f"Model accuracy on test data: {accuracy}")
        
        if accuracy < 0.85:
            logger.info("Model accuracy is less than 85%, fine-tuning needed")
            fine_tuned_model, fine_tuned_model_path_static = self.fine_tune(model, X_train_scaled, y_train)
            os.makedirs(self.eval_config.plots_dir, exist_ok=True)

            detailed_metrics, y_pred, y_pred_prob = self.perform_detailed_evaluation(fine_tuned_model, X_test_scaled, y_test)

            cm_path = self.plot_confusion_matrix(y_test, y_pred)
            per_path = self.plot_precision_recall_curve(y_test, y_pred_prob)
            roc_path = self.plot_roc_curve(y_test, y_pred_prob)
            accuracy = fine_tuned_model.score(X_test_scaled, y_test)
            logger.info(f"Model accuracy on test data after fine-tuned: {accuracy}")

            return fine_tuned_model, detailed_metrics, fine_tuned_model_path_static
        
        else:
            logger.info("Model accuracy is 85% or above, not needed finetuned")
            os.makedirs(self.eval_config.plots_dir, exist_ok=True)

            detailed_metrics, y_pred, y_pred_prob = self.perform_detailed_evaluation(model, X_test_scaled, y_test)
            
            cm_path = self.plot_confusion_matrix(y_test, y_pred)
            per_path = self.plot_precision_recall_curve(y_test, y_pred_prob)
            roc_path = self.plot_roc_curve(y_test, y_pred_prob)

            return model, detailed_metrics, trained_model_path_static

In [10]:
class CloudStoragePush:
    def __init__(self, config: CloudStoragePushConfig):
        self.config = config

    def validate_bucket_exists(self):
        """Validate that the GCP bucket exists before attempting upload."""
        try:
            # Set GCP credentials from configuration
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.config.credentials_path
            
            from google.cloud import storage
            client = storage.Client()
            bucket = client.bucket(self.config.bucket_name)
            bucket.reload()
            logger.info(f"Bucket '{self.config.bucket_name}' exists and is accessible")
            return True
        except Exception as e:
            logger.error(f"Bucket validation failed: {e}")
            logger.error(f"Bucket '{self.config.bucket_name}' does not exist or is not accessible")
            return False

    def get_files_to_upload(self):
        """Get all files that need to be uploaded to cloud storage."""
        files_to_upload = []
        
        # Only upload from data_version_dir
        if os.path.exists(self.config.data_version_dir):
            data_files = glob.glob(os.path.join(self.config.data_version_dir, "**"), recursive=True)
            for file_path in data_files:
                if os.path.isfile(file_path):
                    files_to_upload.append(file_path)
                    
        # Only upload from evaluation_dir
        if os.path.exists(self.config.evaluation_dir):
            eval_files = glob.glob(os.path.join(self.config.evaluation_dir, "**"), recursive=True)
            for file_path in eval_files:
                if os.path.isfile(file_path):
                    files_to_upload.append(file_path)
        
        return files_to_upload

    def push_to_cloud_storage(self):
        """Push all artifacts to GCP cloud storage."""
        try:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.config.credentials_path
            
            logger.info("Starting cloud storage push process...")
            
            if not self.validate_bucket_exists():
                error_msg = (
                    f"CLOUD STORAGE SETUP REQUIRED:\n"
                    f"1. Create GCP bucket '{self.config.bucket_name}' in your Google Cloud project\n"
                    f"2. Set up GCP credentials using one of these methods:\n"
                    f"   - Run: gcloud auth application-default login\n"
                    f"   - Set GOOGLE_APPLICATION_CREDENTIALS environment variable\n"
                    f"   - Use service account key file\n"
                    f"3. Ensure you have storage.objects.create permission on the bucket"
                )
                raise Exception(error_msg)
            
            files_to_upload = self.get_files_to_upload()
            
            if not files_to_upload:
                logger.warning("No files found to upload to cloud storage.")
                return
                
            logger.info(f"Found {len(files_to_upload)} files to upload to cloud storage.")
            
            relative_files = []
            for file_path in files_to_upload:
                rel_path = os.path.relpath(file_path, self.config.root_dir)
                relative_files.append(rel_path)
            
            upload_many_blobs_with_transfer_manager(
                bucket_name=self.config.bucket_name,
                filenames=relative_files,
                source_directory=self.config.root_dir,
                workers=self.config.workers,
                credentials_path=self.config.credentials_path
            )
            
            logger.info(f"Successfully uploaded {len(files_to_upload)} files to GCP bucket: {self.config.bucket_name}")
            
        except Exception as e:
            logger.error(f"Failed to push to cloud storage: {e}")
            raise e 

In [11]:
def cleanup_temp_files():
    """Clean up temporary versioned files after pipeline completion."""
    logger.info("=" * 50)
    logger.info("CLEANUP: Removing temporary versioned files")
    logger.info("=" * 50)
    config_manager = ConfigurationManager()
    config_manager.config.evaluation.plots_dir
    data_version_dir = config_manager.config.data_ingestion.data_version_dir
    evaluation_dir = config_manager.config.evaluation.plots_dir
    try:
        # Pattern: *_version_YYYYMMDDTHHMMSS.csv
        data_version_files = glob.glob(os.path.join(data_version_dir, "*_version_????????T??????.csv"))
        logger.info(f"Found {len(data_version_files)} timestamp-versioned data files to clean")
        
        for file_path in data_version_files:
            try:
                os.remove(file_path)
                logger.info(f"Deleted temporary file: {os.path.basename(file_path)}")
            except Exception as e:
                logger.warning(f"Failed to delete file {file_path}: {e}")
        
        # Pattern: *_YYYYMMDDTHHMMSS.json and *_YYYYMMDDTHHMMSS.png
        eval_json_files = glob.glob(os.path.join(evaluation_dir, "*_????????T??????.json"))
        eval_png_files = glob.glob(os.path.join(evaluation_dir, "*_????????T??????.png"))
        eval_files = eval_json_files + eval_png_files
        logger.info(f"Found {len(eval_files)} timestamp-versioned evaluation files to clean")
        
        for file_path in eval_files:
            try:
                os.remove(file_path)
                logger.info(f"Deleted temporary file: {os.path.basename(file_path)}")
            except Exception as e:
                logger.warning(f"Failed to delete file {file_path}: {e}")
        
        remaining_data_files = len(glob.glob(os.path.join(data_version_dir, "*.csv")))
        remaining_eval_files = len(glob.glob(os.path.join(evaluation_dir, "*")))
        logger.info(f"Kept {remaining_data_files} essential data files for DVC tracking")
        logger.info(f"Kept {remaining_eval_files} essential evaluation files for DVC tracking")
                
    except Exception as e:
        logger.warning(f"Error during cleanup: {e}")

In [None]:


@component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "numpy", "joblib"]
)
def data_preparation_component(
    train_data: Output[Artifact],
    test_data: Output[Artifact], 
    y_train_data: Output[Artifact],
    y_test_data: Output[Artifact]
) -> NamedTuple('Outputs', [('train_path', str), ('test_path', str), ('y_train_path', str), ('y_test_path', str)]):
    """Data preparation component for churn prediction using existing DataPreparationStage"""
    import os
    import sys
    import logging
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s]: %(message)s:')
    logger = logging.getLogger(__name__)
    logs_dir = "logs"
    log_filepath = os.path.join(logs_dir, "running_logs.log")
    os.makedirs(logs_dir, exist_ok=True)

    logging.basicConfig(
        filename=log_filepath,
        format="[%(asctime)s]: %(levelname)s: %(message)s",
        level=logging.INFO,
        handlers=[
            logging.FileHandler(log_filepath),
            logging.StreamHandler(sys.stdout)
        ]
    )
    logger = logging.getLogger("MLopsLogger")
    
    logger.info(">>> Stage Data Ingestion started <<<")
    STAGE_NAME = "Data Ingestion stage"

    logger.info(f">>> Stage {STAGE_NAME} started <<<")
    
    class ConfigurationManager:
        def __init__(
            self,
            config_filepath=Path(r"C:\Users\Admin\Desktop\Data projects\python\Decision-making-system\churn_mlops\config\config.yaml")
        ):
            self.config = read_yaml(config_filepath)
            create_directories([self.config.artifacts_root])


        def get_data_ingestion_config(self) -> DataIngestionConfig:
            config = self.config.data_ingestion
            
            create_directories([config.root_dir, config.data_version_dir])

            data_ingestion_config = DataIngestionConfig(
                root_dir=Path(config.root_dir),
                local_data_file=Path(config.local_data_file),
                test_size=config.test_size,
                random_state=config.random_state,
                data_version_dir=Path(config.data_version_dir)
            )

            logger.info(f"Data Ingestion config: {config}")
            return data_ingestion_config
            
        def get_prepare_base_model_config(self) -> PrepareBaseModelConfig:
            config = self.config.prepare_base_model
            
            create_directories([config.model_version_dir, config.data_version_dir])

            prepare_base_model_config = PrepareBaseModelConfig(
                model_version_dir=Path(config.model_version_dir),
                data_version_dir=Path(config.data_version_dir),
                n_estimators=config.n_estimators,
                random_state=config.random_state,
                criterion=config.criterion,
                max_depth=config.max_depth,
                max_features=config.max_features,
                min_samples_leaf=config.min_samples_leaf
            )

            logger.info(f"Prepare base model config: {config}")
            return prepare_base_model_config


        def get_training_config(self) -> TrainingConfig:
            config = self.config.training
            
            create_directories([config.model_version_dir, config.data_version_dir])

            training_config = TrainingConfig(
                model_version_dir=Path(config.model_version_dir),
                data_version_dir=Path(config.data_version_dir),
            )

            logger.info(f"Training config: {config}")
            return training_config

        def get_evaluation_config(self) -> EvaluationConfig:
            config = self.config.evaluation
            
            create_directories([config.plots_dir, config.model_version_dir, config.data_version_dir])

            evaluation_config = EvaluationConfig(
                model_version_dir=Path(config.model_version_dir),
                data_version_dir=Path(config.data_version_dir),
                plots_dir=Path(config.plots_dir)
            )

            logger.info(f"Evaluation config: {config}")
            return evaluation_config

        def get_cloud_storage_push_config(self) -> CloudStoragePushConfig:
            config = self.config.cloud_storage_push
            
            cloud_storage_push_config = CloudStoragePushConfig(
                root_dir=Path(config.root_dir),
                bucket_name=config.bucket_name,
                credentials_path=config.credentials_path,
                data_version_dir=Path(config.data_version_dir),
                evaluation_dir=Path(config.evaluation_dir),
                workers=config.workers
            )

            logger.info(f"Cloud Storage Push config: {config}")
            return cloud_storage_push_config

    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    
    train_path_local, test_path_local, y_train_path_local, y_test_path_local = data_ingestion.data_ingestion_pipeline()
    
    logger.info(f">>> Stage {STAGE_NAME} completed <<<")
    
    
    # Copy data to KFP artifacts
    shutil.copy2(train_path_local, train_data.path)
    shutil.copy2(test_path_local, test_data.path)
    shutil.copy2(y_train_path_local, y_train_data.path)
    shutil.copy2(y_test_path_local, y_test_data.path)
    
    logger.info(">>> Stage Data Ingestion completed <<<")
    
    outputs = namedtuple('Outputs', ['train_path', 'test_path', 'y_train_path', 'y_test_path'])
    return outputs(train_data.path, test_data.path, y_train_data.path, y_test_data.path)



@component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "numpy", "joblib"]
)
def model_preparation_component(
    train_data: Input[Artifact],
    test_data: Input[Artifact],
    y_train_data: Input[Artifact],
    y_test_data: Input[Artifact],
    base_model: Output[Model],
    scaler_artifact: Output[Artifact],
    scaled_train: Output[Artifact],
    scaled_test: Output[Artifact]
) -> NamedTuple('Outputs', [('base_model_path', str), ('scaled_train_path', str), ('scaled_test_path', str), ('scaler_path', str)]):
    """Model preparation component using existing ModelPreparationStage"""
    import os
    import sys
    import logging
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s]: %(message)s:')
    logger = logging.getLogger(__name__)
    logs_dir = "logs"
    log_filepath = os.path.join(logs_dir, "running_logs.log")
    os.makedirs(logs_dir, exist_ok=True)

    logging.basicConfig(
        filename=log_filepath,
        format="[%(asctime)s]: %(levelname)s: %(message)s",
        level=logging.INFO,
        handlers=[
            logging.FileHandler(log_filepath),
            logging.StreamHandler(sys.stdout)
        ]
    )
    logger = logging.getLogger("MLopsLogger")

    logger.info(">>> Stage Prepare base model started <<<")
    STAGE_NAME = "Prepare base model"

    logger.info(f">>> Stage {STAGE_NAME} started <<<")
    class ConfigurationManager:
        def __init__(
            self,
            config_filepath=Path(r"C:\Users\Admin\Desktop\Data projects\python\Decision-making-system\churn_mlops\config\config.yaml")
        ):
            self.config = read_yaml(config_filepath)
            create_directories([self.config.artifacts_root])

            
        def get_prepare_base_model_config(self) -> PrepareBaseModelConfig:
            config = self.config.prepare_base_model
            
            create_directories([config.model_version_dir, config.data_version_dir])

            prepare_base_model_config = PrepareBaseModelConfig(
                model_version_dir=Path(config.model_version_dir),
                data_version_dir=Path(config.data_version_dir),
                n_estimators=config.n_estimators,
                random_state=config.random_state,
                criterion=config.criterion,
                max_depth=config.max_depth,
                max_features=config.max_features,
                min_samples_leaf=config.min_samples_leaf
            )

            logger.info(f"Prepare base model config: {config}")
            return prepare_base_model_config





    prepare_base_model_config = ConfigurationManager().get_prepare_base_model_config()
    prepare_base_model = PrepareBaseModel(config=prepare_base_model_config)

    base_model_path_local, scaled_train_path_local, scaled_test_path_local, scaler_path_local = prepare_base_model.full_model(
        train_path=train_data.path,
        test_path=test_data.path,
        y_train_path=y_train_data.path,
        y_test_path=y_test_data.path
    )

    logger.info(f">>> Stage {STAGE_NAME} completed <<<")
    # Call the existing pipeline function - matches main_pipeline.py exactly
    
    shutil.copy2(base_model_path_local, base_model.path)
    shutil.copy2(scaler_path_local, scaler_artifact.path)
    shutil.copy2(scaled_train_path_local, scaled_train.path)
    shutil.copy2(scaled_test_path_local, scaled_test.path)
    
    logger.info(">>> Stage Prepare base model completed <<<")
    
    outputs = namedtuple('Outputs', ['base_model_path', 'scaled_train_path', 'scaled_test_path', 'scaler_path'])
    return outputs(base_model.path, scaled_train.path, scaled_test.path, scaler_artifact.path)



@component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "numpy", "joblib", "matplotlib", "seaborn"]
)
def train_evaluation_component(
    base_model: Input[Model],
    scaled_train: Input[Artifact],
    scaled_test: Input[Artifact],
    y_train_data: Input[Artifact],
    y_test_data: Input[Artifact],
    final_model: Output[Model],
    metrics: Output[Metrics],
    evaluation_plots: Output[Artifact]
) -> NamedTuple('Outputs', [('final_model_path', str), ('accuracy', float), ('roc_auc', float)]):
    """Training and evaluation component using existing TrainEvaluationStage"""
    import os
    import sys
    import logging
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s]: %(message)s:')
    logger = logging.getLogger(__name__)
    logs_dir = "logs"
    log_filepath = os.path.join(logs_dir, "running_logs.log")
    os.makedirs(logs_dir, exist_ok=True)

    logging.basicConfig(
        filename=log_filepath,
        format="[%(asctime)s]: %(levelname)s: %(message)s",
        level=logging.INFO,
        handlers=[
            logging.FileHandler(log_filepath),
            logging.StreamHandler(sys.stdout)
        ]
    )
    logger = logging.getLogger("MLopsLogger")
    
    logger.info(">>> Stage TRAIN_AND_EVALUATE_MODEL started <<<")
    STAGE_NAME = "TRAIN_AND_EVALUATE_MODEL"

    logger.info(f">>> Stage {STAGE_NAME} started <<<")
    class ConfigurationManager:
        def __init__(
            self,
            config_filepath=Path(r"C:\Users\Admin\Desktop\Data projects\python\Decision-making-system\churn_mlops\config\config.yaml")
        ):
            self.config = read_yaml(config_filepath)
            create_directories([self.config.artifacts_root])

        def get_training_config(self) -> TrainingConfig:
            config = self.config.training
            
            create_directories([config.model_version_dir, config.data_version_dir])

            training_config = TrainingConfig(
                model_version_dir=Path(config.model_version_dir),
                data_version_dir=Path(config.data_version_dir),
            )

            logger.info(f"Training config: {config}")
            return training_config

        def get_evaluation_config(self) -> EvaluationConfig:
            config = self.config.evaluation
            
            create_directories([config.plots_dir, config.model_version_dir, config.data_version_dir])

            evaluation_config = EvaluationConfig(
                model_version_dir=Path(config.model_version_dir),
                data_version_dir=Path(config.data_version_dir),
                plots_dir=Path(config.plots_dir)
            )

            logger.info(f"Evaluation config: {config}")
            return evaluation_config



    training_config = ConfigurationManager().get_training_config()
    evaluation_config = ConfigurationManager().get_evaluation_config()

    model_processor = TrainAndEvaluateModel(
        config_train=training_config,
        config_eval=evaluation_config
    )
    
    model, metrics_dict, final_model_path_local = model_processor.train_and_evaluate(
        base_model_path=base_model.path,
        scaled_train_path=scaled_train.path,
        scaled_test_path=scaled_test.path,
        y_train_path=y_train_data.path,
        y_test_path=y_test_data.path
    )
    
    logger.info(f">>> Stage {STAGE_NAME} completed <<<")

    
    # Copy final model to KFP artifact
    shutil.copy2(final_model_path_local, final_model.path)
    
    # Log metrics to KFP
    metrics.log_metric('accuracy', metrics_dict.get('accuracy', 0.0))
    metrics.log_metric('roc_auc', metrics_dict.get('roc_auc', 0.0))
    metrics.log_metric('precision', metrics_dict.get('precision', 0.0))
    metrics.log_metric('recall', metrics_dict.get('recall', 0.0))
    metrics.log_metric('f1_score', metrics_dict.get('f1_score', 0.0))
    
    # Create evaluation plots zip file from the plots directory
    config_manager = ConfigurationManager()
    eval_dir = config_manager.get_evaluation_config().plots_dir

    if os.path.exists(eval_dir):
        with zipfile.ZipFile(evaluation_plots.path, 'w') as zipf:
            for root, _, files in os.walk(eval_dir):
                for file in files:
                    if file.endswith('.png') or file.endswith('.json'):
                        file_path = os.path.join(root, file)
                        arcname = os.path.relpath(file_path, eval_dir)
                        zipf.write(file_path, arcname)
    else:
        logger.warning(f"Evaluation directory {eval_dir} not found. No plots will be zipped.")
    
    logger.info(">>> Stage TRAIN_AND_EVALUATE_MODEL completed <<<")

    outputs = namedtuple('Outputs', ['final_model_path', 'accuracy', 'roc_auc'])
    return outputs(
        final_model.path,
        metrics_dict.get('accuracy', 0.0),
        metrics_dict.get('roc_auc', 0.0)
    )


@component(
    base_image="python:3.11",
    packages_to_install=["boto3", "google-cloud-storage"]
)
def cloud_storage_push_component() -> str:
    """Cloud storage push component using existing CloudStoragePushPipeline - matches main_pipeline.py exactly"""
    
    import os
    import sys
    import logging
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s]: %(message)s:')
    logger = logging.getLogger(__name__)
    logs_dir = "logs"
    log_filepath = os.path.join(logs_dir, "running_logs.log")
    os.makedirs(logs_dir, exist_ok=True)

    logging.basicConfig(
        filename=log_filepath,
        format="[%(asctime)s]: %(levelname)s: %(message)s",
        level=logging.INFO,
        handlers=[
            logging.FileHandler(log_filepath),
            logging.StreamHandler(sys.stdout)
        ]
    )
    logger = logging.getLogger("MLopsLogger")
    
    logger.info(">>> Stage Cloud Storage Push started <<<")
    logger.info("Starting cloud storage push component")
    class ConfigurationManager:
        def __init__(
            self,
            config_filepath=Path(r"C:\Users\Admin\Desktop\Data projects\python\Decision-making-system\churn_mlops\config\config.yaml")
        ):
            self.config = read_yaml(config_filepath)
            create_directories([self.config.artifacts_root])

        def get_cloud_storage_push_config(self) -> CloudStoragePushConfig:
            config = self.config.cloud_storage_push
            
            cloud_storage_push_config = CloudStoragePushConfig(
                root_dir=Path(config.root_dir),
                bucket_name=config.bucket_name,
                credentials_path=config.credentials_path,
                data_version_dir=Path(config.data_version_dir),
                evaluation_dir=Path(config.evaluation_dir),
                workers=config.workers
            )

            logger.info(f"Cloud Storage Push config: {config}")
            return cloud_storage_push_config

    config = ConfigurationManager()
    cloud_storage_push_config = config.get_cloud_storage_push_config()
    cloud_storage_push = CloudStoragePush(config=cloud_storage_push_config)
    cloud_storage_push.push_to_cloud_storage()
    
    logger.info("Cloud storage push component completed")

    
    
    logger.info(">>> Stage Cloud Storage Push completed <<<")
    return "Cloud storage push completed successfully"


@component(
    base_image="python:3.11"
)
def cleanup_component() -> str:
    """Cleanup component using existing cleanup logic from main_pipeline.py"""

    import os
    import sys
    import logging
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s]: %(message)s:')
    logger = logging.getLogger(__name__)
    logs_dir = "logs"
    log_filepath = os.path.join(logs_dir, "running_logs.log")
    os.makedirs(logs_dir, exist_ok=True)

    logging.basicConfig(
        filename=log_filepath,
        format="[%(asctime)s]: %(levelname)s: %(message)s",
        level=logging.INFO,
        handlers=[
            logging.FileHandler(log_filepath),
            logging.StreamHandler(sys.stdout)
    ]
    )
    logger = logging.getLogger("MLopsLogger")
    logger.info("=" * 50)
    logger.info("CLEANUP: Removing temporary versioned files")
    logger.info("=" * 50)
    
    try:
        class ConfigurationManager:
            def __init__(
                self,
                config_filepath=Path(r"C:\Users\Admin\Desktop\Data projects\python\Decision-making-system\churn_mlops\config\config.yaml")
            ):
                self.config = read_yaml(config_filepath)
                create_directories([self.config.artifacts_root])


            def get_data_ingestion_config(self) -> DataIngestionConfig:
                config = self.config.data_ingestion
                
                create_directories([config.root_dir, config.data_version_dir])

                data_ingestion_config = DataIngestionConfig(
                    root_dir=Path(config.root_dir),
                    local_data_file=Path(config.local_data_file),
                    test_size=config.test_size,
                    random_state=config.random_state,
                    data_version_dir=Path(config.data_version_dir)
                )

                logger.info(f"Data Ingestion config: {config}")
                return data_ingestion_config
                
            def get_prepare_base_model_config(self) -> PrepareBaseModelConfig:
                config = self.config.prepare_base_model
                
                create_directories([config.model_version_dir, config.data_version_dir])

                prepare_base_model_config = PrepareBaseModelConfig(
                    model_version_dir=Path(config.model_version_dir),
                    data_version_dir=Path(config.data_version_dir),
                    n_estimators=config.n_estimators,
                    random_state=config.random_state,
                    criterion=config.criterion,
                    max_depth=config.max_depth,
                    max_features=config.max_features,
                    min_samples_leaf=config.min_samples_leaf
                )

                logger.info(f"Prepare base model config: {config}")
                return prepare_base_model_config


            def get_training_config(self) -> TrainingConfig:
                config = self.config.training
                
                create_directories([config.model_version_dir, config.data_version_dir])

                training_config = TrainingConfig(
                    model_version_dir=Path(config.model_version_dir),
                    data_version_dir=Path(config.data_version_dir),
                )

                logger.info(f"Training config: {config}")
                return training_config

            def get_evaluation_config(self) -> EvaluationConfig:
                config = self.config.evaluation
                
                create_directories([config.plots_dir, config.model_version_dir, config.data_version_dir])

                evaluation_config = EvaluationConfig(
                    model_version_dir=Path(config.model_version_dir),
                    data_version_dir=Path(config.data_version_dir),
                    plots_dir=Path(config.plots_dir)
                )

                logger.info(f"Evaluation config: {config}")
                return evaluation_config

            def get_cloud_storage_push_config(self) -> CloudStoragePushConfig:
                config = self.config.cloud_storage_push
                
                cloud_storage_push_config = CloudStoragePushConfig(
                    root_dir=Path(config.root_dir),
                    bucket_name=config.bucket_name,
                    credentials_path=config.credentials_path,
                    data_version_dir=Path(config.data_version_dir),
                    evaluation_dir=Path(config.evaluation_dir),
                    workers=config.workers
                )

                logger.info(f"Cloud Storage Push config: {config}")
                return cloud_storage_push_config

        config_manager = ConfigurationManager()
        data_version_dir = config_manager.config.data_ingestion.data_version_dir
        evaluation_dir = config_manager.config.evaluation.plots_dir
        
        # Pattern: *_version_YYYYMMDDTHHMMSS.csv
        data_version_files = glob.glob(os.path.join(data_version_dir, "*_version_????????T??????.csv"))
        logger.info(f"Found {len(data_version_files)} timestamp-versioned data files to clean")
        
        for file_path in data_version_files:
            try:
                os.remove(file_path)
                logger.info(f"Deleted temporary file: {os.path.basename(file_path)}")
            except Exception as e:
                logger.warning(f"Failed to delete file {file_path}: {e}")
        
        # Pattern: *_YYYYMMDDTHHMMSS.json and *_YYYYMMDDTHHMMSS.png
        eval_json_files = glob.glob(os.path.join(evaluation_dir, "*_????????T??????.json"))
        eval_png_files = glob.glob(os.path.join(evaluation_dir, "*_????????T??????.png"))
        eval_files = eval_json_files + eval_png_files
        logger.info(f"Found {len(eval_files)} timestamp-versioned evaluation files to clean")
        
        for file_path in eval_files:
            try:
                os.remove(file_path)
                logger.info(f"Deleted temporary file: {os.path.basename(file_path)}")
            except Exception as e:
                logger.warning(f"Failed to delete file {file_path}: {e}")
        
        remaining_data_files = len(glob.glob(os.path.join(data_version_dir, "*.csv")))
        remaining_eval_files = len(glob.glob(os.path.join(evaluation_dir, "*")))
        logger.info(f"Kept {remaining_data_files} essential data files for DVC tracking")
        logger.info(f"Kept {remaining_eval_files} essential evaluation files for DVC tracking")
                
    except Exception as e:
        logger.warning(f"Error during cleanup: {e}")
    
    logger.info("Cleanup completed")
    return "Cleanup completed successfully"


@dsl.pipeline(
    name='churn-prediction-pipeline',
    description='Churn prediction pipeline with artifact tracking - matches main_pipeline.py logic'
)
def churn_prediction_pipeline():
    """Main pipeline definition with proper artifact tracking and same logic as main_pipeline.py"""
    
    # Step 1: Data preparation
    data_prep_task = data_preparation_component()
    
    # Step 2: Model preparation
    model_prep_task = model_preparation_component(
        train_data=data_prep_task.outputs['train_data'],
        test_data=data_prep_task.outputs['test_data'],
        y_train_data=data_prep_task.outputs['y_train_data'],
        y_test_data=data_prep_task.outputs['y_test_data']
    )
    model_prep_task.after(data_prep_task)
    
    # Step 3: Training and evaluation
    train_eval_task = train_evaluation_component(
        base_model=model_prep_task.outputs['base_model'],
        scaled_train=model_prep_task.outputs['scaled_train'],
        scaled_test=model_prep_task.outputs['scaled_test'],
        y_train_data=data_prep_task.outputs['y_train_data'],
        y_test_data=data_prep_task.outputs['y_test_data']
    )
    train_eval_task.after(model_prep_task)
    
    # Step 4: Cloud storage push
    cloud_push_task = cloud_storage_push_component()
    cloud_push_task.after(train_eval_task)
    
    # Step 5: Cleanup
    cleanup_task = cleanup_component()
    cleanup_task.after(cloud_push_task)

In [13]:

import time
from kfp.compiler import Compiler
from kfp import Client


def reset_kubeflow_metadata():
    """Reset Kubeflow metadata to resolve context issues"""
    try:
        logger.info("Resetting Kubeflow metadata store...")
        
        # Connect to client and try to clear any stuck contexts
        client = Client(host="http://localhost:8080")
        
        # Get all experiments and clean up if needed
        experiments = client.list_experiments()
        logger.info(f"Found {experiments.total_size if experiments else 0} experiments")
        
        logger.info("Metadata reset completed")
        return True
        
    except Exception as e:
        logger.warning(f"Could not reset metadata (this is often normal): {e}")
        return True  # Continue anyway

def compile_pipeline():
    """Compile the Kubeflow pipeline to YAML"""
    try:
        logger.info("Compiling Kubeflow pipeline...")
        
        compiler = Compiler()
        compiler.compile(
            pipeline_func=churn_prediction_pipeline,
            package_path="churn_pipeline.yaml"
        )
        
        logger.info("Pipeline compiled successfully: churn_pipeline.yaml")
        return True
        
    except Exception as e:
        logger.error(f"Error compiling pipeline: {e}")
        return False


def deploy_pipeline():
    """deployment with enhanced error handling and retry logic"""
    max_retries = 3
    retry_delay = 5
    
    for attempt in range(max_retries):
        try:
            logger.info(f"Deploying pipeline to Kubeflow (attempt {attempt + 1}/{max_retries})...")
            
            # Connect to Kubeflow with timeout
            client = Client(host="http://localhost:8080")
            
            # Create unique run name with timestamp
            run_name = f"churn-prediction-{int(time.time())}-{attempt}"
            
            # Create and run the pipeline
            run = client.create_run_from_pipeline_package(
                pipeline_file="churn_pipeline.yaml",
                arguments={},
                run_name=run_name,
                experiment_name="churn-prediction-experiments"  # Create/use specific experiment
            )
            
            logger.info(f"✅ Pipeline run created successfully: {run.run_id}")
            logger.info(f"Monitor progress at: http://localhost:8080/#/runs/details/{run.run_id}")
            logger.info("Pipeline submitted to Kubeflow - check UI for status")
            
            return True
            
        except Exception as e:
            logger.error(f"Error deploying pipeline (attempt {attempt + 1}): {e}")
            
            if "Cannot find context" in str(e) or "PipelineRun" in str(e):
                logger.info("Detected metadata context issue. Attempting reset...")
                reset_kubeflow_metadata()
                
            if attempt < max_retries - 1:
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                logger.error("All retry attempts failed")
                return False
    
    return False

def run_kubeflow_pipeline():
    """Main function to compile and run the Kubeflow pipeline with error handling"""
    logger.info("=" * 60)
    logger.info("STARTING KUBEFLOW PIPELINE EXECUTION")
    logger.info("=" * 60)
    
    # Step 1: Reset metadata if needed
    reset_kubeflow_metadata()
    
    # Step 2: Compile pipeline
    if not compile_pipeline():
        logger.error("Pipeline compilation failed. Exiting.")
        return False
    
    # Step 3: Deploy pipeline with retries
    if not deploy_pipeline():
        logger.error("Pipeline deployment failed after all retries. Exiting.")
        return False
    
    logger.info("=" * 60)
    logger.info("KUBEFLOW PIPELINE DEPLOYMENT COMPLETED")
    logger.info("Check Kubeflow UI at http://localhost:8080 for execution status")
    logger.info("=" * 60)
    
    return True



In [14]:
run_kubeflow_pipeline()

[2025-06-03 16:22:56,453]: STARTING KUBEFLOW PIPELINE EXECUTION:
[2025-06-03 16:22:56,453]: Resetting Kubeflow metadata store...:
[2025-06-03 16:22:56,489]: Found 3 experiments:
[2025-06-03 16:22:56,490]: Metadata reset completed:
[2025-06-03 16:22:56,490]: Compiling Kubeflow pipeline...:
[2025-06-03 16:22:56,512]: Pipeline compiled successfully: churn_pipeline.yaml:
[2025-06-03 16:22:56,513]: Deploying pipeline to Kubeflow (attempt 1/3)...:


[2025-06-03 16:22:56,690]: ✅ Pipeline run created successfully: d0cfe869-a06d-4b9a-809a-36b8905d6a92:
[2025-06-03 16:22:56,690]: Monitor progress at: http://localhost:8080/#/runs/details/d0cfe869-a06d-4b9a-809a-36b8905d6a92:
[2025-06-03 16:22:56,692]: Pipeline submitted to Kubeflow - check UI for status:
[2025-06-03 16:22:56,693]: KUBEFLOW PIPELINE DEPLOYMENT COMPLETED:
[2025-06-03 16:22:56,693]: Check Kubeflow UI at http://localhost:8080 for execution status:


True

In [1]:
from google.cloud import storage

def download_file_from_gcs(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(f"Downloaded {source_blob_name} from bucket {bucket_name} to {destination_file_name}")

# Replace these with your bucket and file details
bucket_name = "churn_data_version"
source_blob_name = "llmops-460406-f379299f4261.json"
destination_file_name = "llmops-460406-f379299f4261.json"

download_file_from_gcs(bucket_name, source_blob_name, destination_file_name)


DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

In [3]:
from google.cloud import storage
from datetime import datetime
import os

def download_gcs_file_with_secret():
    key_path = "/secrets/gcp/key.json"
    
    # Check if the key file exists (to debug mount issues)
    if not os.path.exists(key_path):
        raise FileNotFoundError(f"Service account key not found at {key_path}")

    # Initialize GCS client using the service account JSON
    client = storage.Client.from_service_account_json(key_path)

    # Define bucket and file names
    bucket_name = "churn_data_version"
    source_blob_name = "input_raw.csv"

    # Create output filename with timestamp suffix
    datetime_suffix = datetime.now().strftime('%Y%m%dT%H%M%S')
    destination_file_name = f"input_raw_{datetime_suffix}.csv"

    # Download blob
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(f"Downloaded '{source_blob_name}' from bucket '{bucket_name}' to local file '{destination_file_name}'")


In [7]:
import pandas as pd
import os
import certifi


In [8]:
f_input = pd.read_csv("https://raw.githubusercontent.com/Teungtran/churn_mlops/main/artifacts/data_ingestion/input_raw.csv")

In [9]:
f_input

Unnamed: 0,customer_id,Purchase Date,Product Category,Product Price,Quantity,Total Purchase Amount,Payment Method,Customer Age,Returns,Customer Name,Age,Gender,Churn,Customer_Labels
0,KH46251,2020-09-08 09:38:32,Electronics,12,3,740,Credit Card,37,0.0,Christine Hernandez,37,Male,0,Lapsed Customers
1,KH46251,2022-03-05 12:56:35,Home,468,4,2739,PayPal,37,0.0,Christine Hernandez,37,Male,0,Lapsed Customers
2,KH46251,2022-05-23 18:18:01,Home,288,2,3196,PayPal,37,0.0,Christine Hernandez,37,Male,0,Lapsed Customers
3,KH46251,2020-11-12 13:13:29,Clothing,196,1,3509,PayPal,37,0.0,Christine Hernandez,37,Male,0,Lapsed Customers
4,KH13593,2020-11-27 17:55:11,Home,449,1,3452,Credit Card,49,0.0,James Grant,49,Female,1,Regular Customers
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
204158,KH17165,2020-02-25 13:38:16,Clothing,230,4,3664,Credit Card,18,0.0,Dawn Perez,18,Male,0,Lapsed Customers
204159,KH45397,2022-02-18 04:18:18,Books,95,2,3397,Cash,54,,Scott Lindsey,54,Male,0,Lapsed Customers
204160,KH45410,2021-05-30 15:37:15,Home,311,2,3302,Credit Card,50,1.0,Johnny Riley,50,Male,0,Lapsed Customers
204161,KH48835,2021-11-23 01:30:42,Home,27,1,3615,Credit Card,42,1.0,Jeremy Rush,42,Female,1,Lapsed Customers
