In [None]:
import os

In [None]:
%pwd

In [None]:
# Move the kernel's working directory one level up (presumably to the project
# root so that relative config/artifact paths resolve -- confirm).
# NOTE: the target is relative to the *current* directory, so re-running this
# cell without restarting the kernel climbs one more level each time.
os.chdir("../")

In [None]:
%pwd

In [None]:
from dataclasses import dataclass
from pathlib import Path

In [None]:
@dataclass(frozen=True)
class ModelTrainerConfig:
    """Immutable bundle of settings consumed by the model-training stage.

    Instances are frozen, so attributes cannot be reassigned after creation.
    Field order is part of the constructor signature and must not change.
    """

    # --- filesystem locations ---
    root_dir: Path
    train_heart_rate_data_path: Path
    test_heart_rate_data_path: Path
    train_is_anomaly_data_path: Path
    test_is_anomaly_data_path: Path
    data_transformation_dir: Path

    # --- names under which the two models are identified ---
    heart_rate_predictor_model_name: str
    anomaly_detector_model_name: str

    # --- target (label) column of each model's dataset ---
    heart_rate_target_column: str
    anomaly_target_column: str

In [None]:
from Heart_Rate_Anomaly_Detector.constants import *
from Heart_Rate_Anomaly_Detector.utils.common import read_yaml, create_directories

In [None]:
class ConfigurationManager:
    """Reads the config, params and schema YAML files and exposes stage settings.

    The three files are parsed once in ``__init__`` and kept on the instance as
    ``config``, ``params`` and ``schema``. The objects returned by ``read_yaml``
    are accessed by attribute below, so they are presumably attribute-style
    mappings (e.g. a ConfigBox) -- confirm against ``utils.common.read_yaml``.
    """

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH):
        """Parse the three YAML files and create the artifacts root directory.

        Args:
            config_filepath: Path of the main configuration YAML.
            params_filepath: Path of the model hyper-parameter YAML.
            schema_filepath: Path of the data/model schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    def get_model_trainer_config(self) -> ModelTrainerConfig:
        """Build the ``ModelTrainerConfig`` for the training stage.

        Creates the trainer's ``root_dir`` as a side effect. Only the config
        and schema files are consulted here; hyper-parameters are read later
        by the trainer itself from ``self.params``.

        Returns:
            A frozen ``ModelTrainerConfig`` populated from the ``model_trainer``
            section of the config and the per-model target columns of the schema.
        """
        config = self.config.model_trainer
        data_paths = config.data_path
        model_names = config.model_name
        model_schemas = self.schema.models

        create_directories([config.root_dir])

        return ModelTrainerConfig(
            root_dir=Path(config.root_dir),
            train_heart_rate_data_path=Path(data_paths.train_heart_rate),
            test_heart_rate_data_path=Path(data_paths.test_heart_rate),
            train_is_anomaly_data_path=Path(data_paths.train_is_anomaly),
            test_is_anomaly_data_path=Path(data_paths.test_is_anomaly),
            data_transformation_dir=Path(self.config.data_transformation.root_dir),
            heart_rate_predictor_model_name=model_names.heart_rate_predictor,
            anomaly_detector_model_name=model_names.anomaly_detector,
            heart_rate_target_column=model_schemas.HeartRatePredictor.target_column,
            anomaly_target_column=model_schemas.AnomalyDetector.target_column,
        )

    def get_heart_rate_features(self) -> list:
        """Return the ``features`` entry of the HeartRatePredictor schema."""
        return self.schema.models.HeartRatePredictor.features

    def get_anomaly_features(self) -> list:
        """Return the ``features`` entry of the AnomalyDetector schema."""
        return self.schema.models.AnomalyDetector.features

    def get_column_dtypes(self) -> dict:
        """Return the ``columns`` section of the schema file."""
        return self.schema.columns

In [None]:
import pandas as pd
import os
import numpy as np
from datetime import datetime
import joblib
from Heart_Rate_Anomaly_Detector import logger
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
from sklearn.metrics import accuracy_score, f1_score, r2_score

In [None]:
class ModelTrainer:
    """Trains, evaluates and persists one of the two project models.

    ``model_name`` selects the behaviour throughout the class:
    ``"HeartRatePredictor"`` trains a ``RandomForestRegressor`` and
    ``"AnomalyDetector"`` trains an ``IsolationForest``.
    """

    def __init__(self, config, schema_config, params, model_name):
        """Store the settings and resolve the model's target and feature columns.

        Args:
            config: Object exposing the data paths and ``root_dir`` read below
                (e.g. a ``ModelTrainerConfig``).
            schema_config: Schema object; ``schema_config.models[model_name]``
                must provide ``target_column`` and ``features``.
            params: Hyper-parameter object exposing ``HEART_RATE_PREDICTOR``
                and ``ANOMALY_DETECTOR`` sections.
            model_name: ``"HeartRatePredictor"`` or ``"AnomalyDetector"``.
        """
        self.config = config
        self.schema_config = schema_config
        self.params = params
        self.model_name = model_name
        # No scaler is fitted anywhere in this class; the attribute is only
        # stored (as None) inside the saved artifact bundle.
        self.scaler = None
        # Populated by prepare_features() with the final (encoded) column list.
        self.feature_columns = None

        model_schema = schema_config.models[model_name]
        self.target_column = model_schema.target_column
        self.features = model_schema.features

    def load_data(self):
        """Read the train/test CSV files that belong to ``self.model_name``.

        Returns:
            Tuple ``(train_data, test_data)`` of DataFrames.

        Raises:
            ValueError: If ``self.model_name`` is not one of the two known names.
        """
        if self.model_name == "HeartRatePredictor":
            train_data = pd.read_csv(self.config.train_heart_rate_data_path)
            test_data = pd.read_csv(self.config.test_heart_rate_data_path)
        elif self.model_name == "AnomalyDetector":
            train_data = pd.read_csv(self.config.train_is_anomaly_data_path)
            test_data = pd.read_csv(self.config.test_is_anomaly_data_path)
        else:
            raise ValueError(f"Unknown model name: {self.model_name}")

        logger.info(f"Data loaded for {self.model_name}")
        return train_data, test_data

    def prepare_features(self, train_data, test_data):
        """Split both frames into features/target and one-hot encode object columns.

        The train split defines the encoded column layout; the test split is
        aligned to it. Also records the final column list in
        ``self.feature_columns``.

        Returns:
            Tuple ``(train_x, test_x, train_y, test_y)``.
        """
        train_x = train_data[self.features].copy()
        test_x = test_data[self.features].copy()

        train_y = train_data[self.target_column]
        test_y = test_data[self.target_column]

        cat_cols = train_x.select_dtypes(include=['object']).columns.tolist()
        if cat_cols:
            logger.info(f"Encoding categorical columns: {cat_cols}")
            train_x = pd.get_dummies(train_x, columns=cat_cols, drop_first=True)
            # Encode the test split WITHOUT drop_first: the level to drop must
            # be the one dropped on the train split, not whichever level sorts
            # first among the values that happen to occur in the test split.
            # Otherwise a level present in train but dropped here would be
            # zero-filled by reindex and silently read as the baseline level.
            test_x = pd.get_dummies(test_x, columns=cat_cols)
            # Align to the train layout: extra test-only columns (including the
            # train baseline level) are discarded, missing ones are added as 0.
            test_x = test_x.reindex(columns=train_x.columns, fill_value=0)

        self.feature_columns = train_x.columns.tolist()
        return train_x, test_x, train_y, test_y

    def get_model(self):
        """Instantiate the estimator configured for ``self.model_name``.

        Raises:
            ValueError: If the model name is unknown or its configured
                ``algorithm`` is not the one supported for that model.
        """
        if self.model_name == "HeartRatePredictor":
            algo_config = self.params.HEART_RATE_PREDICTOR
            if algo_config.algorithm == "random_forest":
                return RandomForestRegressor(
                    n_estimators=algo_config.n_estimators,
                    max_depth=algo_config.max_depth,
                    random_state=algo_config.random_state,
                    n_jobs=-1
                )
        elif self.model_name == "AnomalyDetector":
            algo_config = self.params.ANOMALY_DETECTOR
            if algo_config.algorithm == "isolation_forest":
                return IsolationForest(
                    contamination=algo_config.contamination,
                    n_estimators=algo_config.n_estimators,
                    random_state=algo_config.random_state
                )
        raise ValueError(f"Unsupported algorithm for {self.model_name}")

    def save_model_artifacts(self, model, metrics=None, test_data=None):
        """Persist the model bundle and optional extras under ``config.root_dir``.

        Writes ``<model_name>.joblib`` (model plus metadata), optionally
        ``<model_name>_test_data.joblib``, and - when the model exposes
        ``feature_importances_`` - ``<model_name>_feature_importance.csv``.

        Args:
            model: Fitted estimator to store.
            metrics: Optional metrics dict stored inside the bundle.
            test_data: Optional object (here a DataFrame) dumped alongside.
        """
        os.makedirs(self.config.root_dir, exist_ok=True)

        model_artifacts = {
            'model': model,
            'scaler': self.scaler,
            'feature_columns': self.feature_columns,
            'target_column': self.target_column,
            'model_type': type(model).__name__,
            'timestamp': datetime.now().isoformat(),
            'metrics': metrics
        }

        model_path = os.path.join(self.config.root_dir, f"{self.model_name}.joblib")
        joblib.dump(model_artifacts, model_path)
        logger.info(f"Model artifacts saved at: {model_path}")

        if test_data is not None:
            test_data_path = os.path.join(self.config.root_dir, f"{self.model_name}_test_data.joblib")
            joblib.dump(test_data, test_data_path)
            logger.info(f"Test data saved at: {test_data_path}")

        if hasattr(model, 'feature_importances_') and self.feature_columns:
            feature_importance = pd.DataFrame({
                'feature': self.feature_columns,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)

            importance_path = os.path.join(self.config.root_dir, f"{self.model_name}_feature_importance.csv")
            feature_importance.to_csv(importance_path, index=False)
            logger.info(f"Feature importance saved at: {importance_path}")

            logger.info("=== Top 10 Important Features ===")
            for _, row in feature_importance.head(10).iterrows():
                logger.info(f"{row['feature']}: {row['importance']:.4f}")

    def _evaluate_heart_rate(self, model, train_x, test_x, train_y, test_y):
        """Compute and print RMSE / R² for the regressor; return (metrics, test_preds)."""
        train_preds = model.predict(train_x)
        test_preds = model.predict(test_x)

        train_rmse = np.sqrt(mean_squared_error(train_y, train_preds))
        test_rmse = np.sqrt(mean_squared_error(test_y, test_preds))
        test_r2 = r2_score(test_y, test_preds)

        metrics = {
            "train_rmse": train_rmse,
            "test_rmse": test_rmse,
            "test_r2": test_r2
        }

        print("=== Regression Results ===")
        print(f"Train RMSE: {train_rmse:.4f}")
        print(f"Test RMSE: {test_rmse:.4f}")
        print(f"Test R²: {test_r2:.4f}")
        return metrics, test_preds

    def _evaluate_anomaly(self, model, train_x, test_x, train_y, test_y):
        """Compute and print accuracy / F1 for the detector; return (metrics, test_preds)."""
        # IsolationForest.predict yields -1 for outliers and 1 for inliers;
        # map that to 1 = anomaly, 0 = normal. This assumes the target column
        # uses the same 0/1 convention -- confirm against the dataset.
        train_preds = (model.predict(train_x) == -1).astype(int)
        test_preds = (model.predict(test_x) == -1).astype(int)

        train_acc = accuracy_score(train_y, train_preds)
        test_acc = accuracy_score(test_y, test_preds)
        test_f1 = f1_score(test_y, test_preds)

        metrics = {
            "train_acc": train_acc,
            "test_acc": test_acc,
            "test_f1": test_f1
        }

        print("=== Classification Results ===")
        print(f"Train Accuracy: {train_acc:.4f}")
        print(f"Test Accuracy: {test_acc:.4f}")
        print(f"Test F1 Score: {test_f1:.4f}")
        return metrics, test_preds

    def train_model(self, train_data=None, test_data=None):
        """Run the full pipeline: load, encode, fit, evaluate, persist.

        Args:
            train_data: Optional pre-loaded training DataFrame.
            test_data: Optional pre-loaded test DataFrame. When both are
                omitted the data is read from disk via ``load_data()``.

        Returns:
            Tuple ``(model, test_preds)`` - the fitted estimator and its
            predictions on the test features (0/1 flags for the detector).

        Raises:
            ValueError: If only one of ``train_data`` / ``test_data`` is given,
                or the model name / algorithm is not supported.
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            if train_data is None and test_data is None:
                train_data, test_data = self.load_data()
            elif train_data is None or test_data is None:
                raise ValueError("train_data and test_data must be provided together")

            train_x, test_x, train_y, test_y = self.prepare_features(train_data, test_data)
            model = self.get_model()
            # IsolationForest ignores the target passed to fit(); it is passed
            # for both models so the call site stays uniform.
            model.fit(train_x, train_y)

            if self.model_name == "HeartRatePredictor":
                metrics, test_preds = self._evaluate_heart_rate(
                    model, train_x, test_x, train_y, test_y
                )
                done_message = "Model training for heart rate is completed successfully!"
            elif self.model_name == "AnomalyDetector":
                metrics, test_preds = self._evaluate_anomaly(
                    model, train_x, test_x, train_y, test_y
                )
                done_message = "Model training for is anomaly completed successfully!"
            else:
                raise ValueError(f"Unknown model name: {self.model_name}")

            test_data_to_save = pd.concat([test_x, test_y], axis=1)
            self.save_model_artifacts(model, metrics=metrics, test_data=test_data_to_save)
            logger.info(done_message)

            return model, test_preds

        except Exception as e:
            logger.error(f"Error in model training: {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise

In [None]:
try:
    # Parse the YAML files once; the manager exposes the parsed schema and
    # params as attributes, which the trainers need alongside the stage config.
    config = ConfigurationManager()
    model_trainer_config = config.get_model_trainer_config()

    # Heart-rate regressor. train_model() reads its own train/test CSVs from
    # the paths in model_trainer_config, so no data is passed in here.
    hr_trainer = ModelTrainer(
        config=model_trainer_config,
        schema_config=config.schema,
        params=config.params,
        model_name="HeartRatePredictor"
    )
    hr_model, hr_predictions = hr_trainer.train_model()

    # Anomaly detector, trained the same way on the is-anomaly datasets.
    anomaly_trainer = ModelTrainer(
        config=model_trainer_config,
        schema_config=config.schema,
        params=config.params,
        model_name="AnomalyDetector"
    )
    anomaly_model, anomaly_predictions = anomaly_trainer.train_model()

except Exception as e:
    print(f"Error occurred: {str(e)}")
    # Bare raise keeps the original traceback intact.
    raise