In [1]:
import sys
import os
import json
import tempfile
import mlflow
import pandas as pd
import sklearn
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from mlflow.models.signature import infer_signature
from pathlib import Path
from dataclasses import dataclass

sys.path.append('..')
from src.mlflow_utils import (
    configure_mlflow, 
    load_config, 
    find_latest_run_id_by_experiment_and_stage, 
    get_dataset
)

In [2]:
# Project configuration, loaded once here and shared by the cells below.
# Later cells read its "experiment_names", "run_names", "dataset",
# "artifacts" and "models" sections.
CONFIG = load_config()

In [3]:
@dataclass
class DatasetSplit:
    """Container for one dataset split together with descriptive metadata.

    Instances are created by ``DataPreprocessor.split_data`` and read by the
    MLflow logging helpers defined further down in this notebook.
    """
    # Payload of the split. Annotated as an ndarray, but split_data initially
    # stores the feature splits exactly as train_test_split returns them;
    # scale_features later replaces them with the scaler's transform output.
    data: np.ndarray
    # Key under which the split is stored, e.g. 'X_train' or 'y_test'.
    name: str
    # Either 'features' or 'target'; the logging helpers branch on this value.
    data_type: str
    # 'training', 'validation' or 'testing'; used as the artifact sub-folder
    # when the processed splits are logged.
    split_type: str
    # Optional feature column names; None when not provided.
    features: list[str] = None
    # Optional target column name; None when not provided.
    target: str = None

In [4]:
class DataPreprocessor:
    """Load, split, scale and sanity-check the dataset described by ``config``.

    Intended call order (as used by ``preprocessing_pipeline``):
    ``load_data`` -> ``split_data`` -> ``scale_features`` ->
    ``validate_splits`` -> ``get_processed_data``.

    Attributes:
        config: The configuration mapping passed to the constructor.
        raw_data: Dataset returned by ``get_dataset``; ``None`` until
            ``load_data`` has run.
        scaler: ``StandardScaler`` fitted on the training features by
            ``scale_features``.
        splits: Mapping of split name (``'X_train'``, ``'y_val'``, ...) to
            ``DatasetSplit``; empty until ``split_data`` has run.
    """

    def __init__(self, config: dict):
        """Store ``config`` and read the split settings from it.

        Raises:
            KeyError: If ``config["dataset"]["split"]`` lacks ``test_size``,
                ``val_size`` or ``random_state``.
        """
        self.config = config
        self.raw_data = None
        self.scaler = None
        self.splits = {}
        self._initialize_components()

    def _initialize_components(self) -> None:
        """Create the scaler and cache the split parameters from the config."""
        self.scaler = StandardScaler()
        split_cfg = self.config["dataset"]["split"]
        self.test_size = split_cfg["test_size"]
        # Passed as ``test_size`` to the *second* train_test_split call, so it
        # applies to the train+validation remainder, not to the full dataset.
        self.val_size = split_cfg["val_size"]
        self.random_state = split_cfg["random_state"]

    def load_data(self) -> None:
        """Fetch the raw dataset logged by the EDA stage and validate it.

        The run lookup and the download are delegated to helpers from
        ``src.mlflow_utils``; their exact semantics are defined there.

        Raises:
            ValueError, KeyError: Propagated from ``_validate_raw_data``.
        """
        eda_run_id = find_latest_run_id_by_experiment_and_stage(
            self.config["experiment_names"]["eda"],
            self.config["run_names"]["eda"]
        )
        self.raw_data = get_dataset(eda_run_id, self.config["artifacts"]["data"]["raw"])
        self._validate_raw_data()

    def _validate_raw_data(self) -> None:
        """Reject raw data that is missing, empty, unlabeled or incomplete.

        Raises:
            ValueError: If nothing was loaded, the frame is empty, or it
                contains missing values.
            KeyError: If the ``'Class'`` target column is absent.
        """
        # Fail with a clear message instead of an AttributeError on `.empty`
        # when the loader handed back nothing.
        if self.raw_data is None:
            raise ValueError("Raw data has not been loaded")
        if self.raw_data.empty:
            raise ValueError("Loaded raw data is empty")
        if 'Class' not in self.raw_data.columns:
            raise KeyError("Target column 'Class' missing in raw data")
        if self.raw_data.isnull().sum().sum() > 0:
            raise ValueError("Raw data contains missing values")

    def split_data(self) -> None:
        """Create stratified train / validation / test splits.

        The ``'Class'`` column is the target; ``'Class'`` and ``'Time'`` are
        both excluded from the features. Results are stored in ``self.splits``
        under the keys ``X_train``, ``X_val``, ``X_test``, ``y_train``,
        ``y_val`` and ``y_test``.
        """
        X = self.raw_data.drop(columns=['Class', 'Time'])
        y = self.raw_data['Class']
        # Remember the column labels: scaling replaces the frames with bare
        # arrays, so this metadata is the only place the names survive.
        feature_names = list(X.columns)

        # Initial train/test split
        X_train_val, X_test, y_train_val, y_test = train_test_split(
            X, y,
            test_size=self.test_size,
            stratify=y,
            random_state=self.random_state
        )

        # Secondary validation split, carved out of the remaining data
        X_train, X_val, y_train, y_val = train_test_split(
            X_train_val, y_train_val,
            test_size=self.val_size,
            stratify=y_train_val,
            random_state=self.random_state
        )

        # Store splits
        self.splits = {
            'X_train': DatasetSplit(X_train, 'X_train', 'features', 'training',
                                    features=feature_names),
            'X_val': DatasetSplit(X_val, 'X_val', 'features', 'validation',
                                  features=feature_names),
            'X_test': DatasetSplit(X_test, 'X_test', 'features', 'testing',
                                   features=feature_names),
            'y_train': DatasetSplit(y_train.values, 'y_train', 'target', 'training',
                                    target='Class'),
            'y_val': DatasetSplit(y_val.values, 'y_val', 'target', 'validation',
                                  target='Class'),
            'y_test': DatasetSplit(y_test.values, 'y_test', 'target', 'testing',
                                   target='Class')
        }

    def scale_features(self) -> None:
        """Fit the scaler on the training features and transform every X split.

        The transformed data overwrites ``split.data`` in place, so this method
        is meant to be called once per ``split_data`` call: a second call would
        refit the scaler on already-scaled data.
        """
        # Fit on training data only, so validation/test statistics do not leak
        self.scaler.fit(self.splits['X_train'].data)

        # Transform all feature splits
        for split in ['X_train', 'X_val', 'X_test']:
            scaled_data = self.scaler.transform(self.splits[split].data)
            self.splits[split].data = scaled_data

    def validate_splits(self, max_class_shift: float = 0.05) -> None:
        """Check split integrity and data quality.

        Args:
            max_class_shift: Largest tolerated absolute difference between the
                mean of ``y_train`` and the mean of ``y_test`` before a warning
                is issued.

        Raises:
            ValueError: If the feature splits disagree on the number of
                columns, or if any split contains missing values.

        Warns:
            UserWarning: If the class balance differs between train and test
                by more than ``max_class_shift``.
        """
        import warnings  # local import: not part of the notebook's import cell

        # Explicit raise rather than ``assert`` so the check survives ``-O``.
        expected_width = self.splits['X_train'].data.shape[1]
        for name in ('X_val', 'X_test'):
            width = self.splits[name].data.shape[1]
            if width != expected_width:
                raise ValueError(
                    f"Feature dimension mismatch: X_train has {expected_width} "
                    f"columns, {name} has {width}"
                )

        # Check for NaNs. Converting to an array first makes the reduction a
        # single boolean for DataFrame payloads too (i.e. before scaling).
        for name, split in self.splits.items():
            if pd.isna(np.asarray(split.data)).any():
                raise ValueError(f"NaN values detected in {name}")

        # Check class distributions. This is advisory, so it is emitted as a
        # warning instead of being raised and aborting the pipeline.
        train_pos = np.mean(self.splits['y_train'].data)
        test_pos = np.mean(self.splits['y_test'].data)
        if abs(train_pos - test_pos) > max_class_shift:
            warnings.warn(
                "Significant class distribution shift between splits",
                UserWarning,
                stacklevel=2,
            )

    def get_processed_data(self) -> "dict[str, DatasetSplit]":
        """Return the mapping of split name to ``DatasetSplit``."""
        return self.splits

In [5]:
def log_data_artifacts(data: pd.DataFrame, sample_size: int = 1000, random_state=None) -> None:
    """Log a CSV sample of ``data`` and its table schema to the active MLflow run.

    Args:
        data: Frame to describe.
        sample_size: Maximum number of rows in the logged sample. Frames with
            fewer rows are logged in full rather than raising.
        random_state: Forwarded to ``DataFrame.sample``; pass a seed to make
            the logged sample reproducible. ``None`` keeps it random.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_root = Path(tmp_dir)

        # Log raw dataset sample. Sampling without replacement cannot draw
        # more rows than exist, so cap the request at the frame length.
        sample_path = tmp_root / "data_sample.csv"
        n_rows = min(sample_size, len(data))
        data.sample(n=n_rows, random_state=random_state).to_csv(sample_path, index=False)
        mlflow.log_artifact(str(sample_path))

        # Log dataset schema
        schema = pd.io.json.build_table_schema(data)
        schema_path = tmp_root / "data_schema.json"
        schema_path.write_text(json.dumps(schema, indent=2))
        mlflow.log_artifact(str(schema_path))

def log_split_characteristics(splits: dict[str, DatasetSplit]) -> None:
    """Record size and class-count metrics for every split in the active run.

    Each split contributes ``<name>_samples``. Feature splits add
    ``<name>_features``; target splits add ``<name>_positive`` and
    ``<name>_negative``.
    """
    collected = {}
    for split_name, part in splits.items():
        kind = part.data_type
        holds_features = kind == 'features'

        # Row count: taken from the shape for feature data, from len() otherwise.
        collected[f"{split_name}_samples"] = (
            part.data.shape[0] if holds_features else len(part.data)
        )

        if holds_features:
            collected[f"{split_name}_features"] = part.data.shape[1]
        elif kind == 'target':
            # The sum of the labels is used as the positive count.
            positives = np.sum(part.data)
            collected[f"{split_name}_positive"] = positives
            collected[f"{split_name}_negative"] = len(part.data) - positives

    mlflow.log_metrics(collected)

def log_processed_splits(splits: dict[str, DatasetSplit], artifact_path: str) -> None:
    """Write every split to Parquet and log it under ``artifact_path``.

    Each file is named ``<split name>.parquet`` and is placed in a sub-folder
    named after the split's ``split_type``.

    Args:
        splits: Mapping of split name to ``DatasetSplit``.
        artifact_path: Base directory inside the run's artifact store.
    """
    import posixpath  # local import: not part of the notebook's import cell

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_root = Path(tmp_dir)
        for name, split in splits.items():
            file_path = tmp_root / f"{name}.parquet"
            pd.DataFrame(split.data).to_parquet(file_path)
            # The destination is a logical artifact path, not a local one:
            # always join with '/', which os.path.join would not do on Windows.
            destination = posixpath.join(artifact_path, split.split_type)
            mlflow.log_artifact(str(file_path), destination)

def log_scaler_model(scaler: StandardScaler, X_sample: pd.DataFrame, config: dict) -> None:
    """Log the fitted scaler as an MLflow sklearn model and record its statistics.

    The model signature is inferred from ``X_sample`` and its scaled
    counterpart. Artifact and registry names come from
    ``config["models"]["scaler"]``. The fitted ``mean_`` and ``scale_`` vectors
    are stored as JSON-encoded run parameters.
    """
    transformed = scaler.transform(X_sample)
    model_signature = infer_signature(X_sample, transformed)

    scaler_cfg = config["models"]["scaler"]
    mlflow.sklearn.log_model(
        sk_model=scaler,
        artifact_path=scaler_cfg["name"],
        signature=model_signature,
        registered_model_name=scaler_cfg["registered_model_name"],
    )

    # Fitted statistics, serialised as JSON lists so they fit in string params.
    fitted_stats = {
        "scaler_mean": scaler.mean_,
        "scaler_scale": scaler.scale_,
    }
    mlflow.log_params(
        {key: json.dumps(values.tolist()) for key, values in fitted_stats.items()}
    )

In [6]:
def preprocessing_pipeline(config: dict) -> None:
    """Run the full preprocessing flow and log its outputs to the active run.

    Steps, in order: load the raw data and log a sample and schema; split,
    scale and validate; log split metrics and the processed splits; log the
    fitted scaler; record library versions.
    """
    prep = DataPreprocessor(config)

    # 1. Raw data: fetch it, then describe it in MLflow.
    prep.load_data()
    log_data_artifacts(prep.raw_data)

    # 2. Transformations, executed strictly in this order.
    for stage in (prep.split_data, prep.scale_features, prep.validate_splits):
        stage()

    # 3. Processed outputs.
    splits = prep.get_processed_data()
    log_split_characteristics(splits)
    processed_dir = config["artifacts"]["data"]["processed"]
    log_processed_splits(splits, processed_dir)

    # 4. Scaler model; the signature is inferred from the unscaled feature frame.
    fitted_scaler = prep.scaler
    feature_frame = prep.raw_data.drop(columns=['Class', 'Time'])
    log_scaler_model(fitted_scaler, feature_frame, config)

    # 5. Environment details, to help reproduce the run later.
    library_versions = {
        "pandas_version": pd.__version__,
        "sklearn_version": sklearn.__version__,
        "mlflow_version": mlflow.__version__,
    }
    mlflow.log_params(library_versions)

In [7]:
if __name__ == "__main__":
    # Entry point: run the preprocessing stage inside a single MLflow run.
    experiment_name = CONFIG["experiment_names"]["preprocessing"]
    run_name = CONFIG["run_names"]["preprocessing"]

    configure_mlflow(experiment_name)

    with mlflow.start_run(run_name=run_name) as run:
        # Failures are handled INSIDE the ``with`` block so the details land
        # on this run. Once the context manager has exited, the run is
        # already closed and any further log call would open a new run.
        try:
            mlflow.set_tags({
                "stage": "preprocessing",
                "dataset_type": CONFIG["dataset"]["type"],
                "task": CONFIG["dataset"]["task"]
            })

            mlflow.log_dict(CONFIG, "preprocessing_config.json")
            preprocessing_pipeline(CONFIG)
        except Exception as e:
            # Best effort: a failure while recording the error must not hide
            # the original exception.
            try:
                mlflow.set_tag("status", "failed")
                # MLflow caps the length of param values (the exact limit
                # depends on the MLflow version), so keep the message short.
                mlflow.log_param("error", str(e)[:250])
            except Exception as log_error:
                print(f"Could not record failure details in MLflow: {log_error}")
            # Re-raise the original error; leaving the ``with`` block with an
            # exception lets the context manager close the run as failed.
            raise

        mlflow.set_tag("status", "completed")
        print(f"Preprocessing completed. Run ID: {run.info.run_id}")



Preprocessing completed. Run ID: d3fa3913137d454f985b6dda41044f87


Registered model 'CreditCardScaler' already exists. Creating a new version of this model...
Created version '26' of model 'CreditCardScaler'.
