<a href="https://colab.research.google.com/github/kulkarnisunil/AI_Assistant/blob/main/model_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [53]:
'''
crypto-volatility-prediction/
│
├── config.yaml
├── requirements.txt
├── setup.py
├── .gitignore
│
├── src/
│   ├── __init__.py
│   ├── data/
│   │   ├── __init__.py
│   │   ├── loader.py
│   │   ├── preprocessor.py
│   │   └── feature_engineering.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── factory.py
│   │   ├── trainer.py
│   │   └── evaluator.py
│   ├── pipeline/
│   │   ├── __init__.py
│   │   └── training_pipeline.py
│   └── utils/
│       ├── __init__.py
│       ├── config_loader.py
│       └── logger.py
│
├── tests/
│   ├── __init__.py
│   ├── test_data/
│   │   ├── test_loader.py
│   │   └── test_preprocessor.py
│   ├── test_models/
│   │   ├── test_factory.py
│   │   └── test_trainer.py
│   └── test_pipeline/
│       └── test_training_pipeline.py
│
├── notebooks/
│   ├── exploration.ipynb
│   └── modeling.ipynb
│
├── data/
│   ├── raw/
│   │   └── dataset.csv
│   ├── processed/
│   └── features/
│
├── models/
│   ├── best_model.pkl
│   ├── scaler.pkl
│   └── feature_columns.pkl
│
├── mlruns/  # MLflow tracking
├── logs/
├── api/
│   ├── app.py
│   └── schemas.py
│
├── monitoring/
│   ├── drift_detector.py
│   └── performance_monitor.py
│
├── deployment/
│   ├── Dockerfile
│   ├── docker-compose.yml
│   └── kubernetes/
│       ├── deployment.yaml
│       └── service.yaml
│
└── scripts/
    ├── train.py
    ├── predict.py
    └── serve.py'''

'\ncrypto-volatility-prediction/\n│\n├── config.yaml\n├── requirements.txt\n├── setup.py\n├── .gitignore\n│\n├── src/\n│   ├── __init__.py\n│   ├── data/\n│   │   ├── __init__.py\n│   │   ├── loader.py\n│   │   ├── preprocessor.py\n│   │   └── feature_engineering.py\n│   ├── models/\n│   │   ├── __init__.py\n│   │   ├── factory.py\n│   │   ├── trainer.py\n│   │   └── evaluator.py\n│   ├── pipeline/\n│   │   ├── __init__.py\n│   │   └── training_pipeline.py\n│   └── utils/\n│       ├── __init__.py\n│       ├── config_loader.py\n│       └── logger.py\n│\n├── tests/\n│   ├── __init__.py\n│   ├── test_data/\n│   │   ├── test_loader.py\n│   │   └── test_preprocessor.py\n│   ├── test_models/\n│   │   ├── test_factory.py\n│   │   └── test_trainer.py\n│   └── test_pipeline/\n│       └── test_training_pipeline.py\n│\n├── notebooks/\n│   ├── exploration.ipynb\n│   └── modeling.ipynb\n│\n├── data/\n│   ├── raw/\n│   │   └── dataset.csv\n│   ├── processed/\n│   └── features/\n│\n├── models/\n│   ├

In [54]:
!pip install scikit-learn xgboost mlflow fastapi uvicorn pyyaml pandas numpy



In [55]:
%%writefile config.yaml
project:
  name: "crypto-volatility-prediction"
  version: "1.0.0"
  description: "Machine Learning pipeline for predicting cryptocurrency volatility"

data:
  raw_path: "data/raw/dataset.csv"
  processed_path: "data/processed/"
  features_path: "data/features/"

  # Crypto selection criteria
  min_days: 500
  min_avg_volume: 1000000
  top_n_coins: 20

  # Selected cryptocurrencies
  selected_coins:
    - Bitcoin
    - Ethereum
    - Polkadot
    - BNB
    - Solana
    - Cardano
    - Tether
    - XRP
    - USD_Coin
    - Bitcoin_Cash
    - Uniswap
    - Avalanche
    - Shiba_Inu
    - Binance_USD
    - Terra_Classic
    - EOS
    - Wrapped_Bitcoin
    - Dogecoin
    - Chainlink
    - Litecoin

features:
  base_features:
    - open
    - high
    - low
    - close
    - volume
    - marketCap

  engineered_features:
    returns:
      - log_return
      - simple_return
    volatility:
      - rolling_vol_7d
      - rolling_vol_14d
      - rolling_vol_30d
    technical_indicators:
      - atr_14
      - bb_width
    liquidity:
      - volume_pct_change
      - volume_to_marketcap
    trend:
      - close_ma_7
      - close_ma_14
      - vol_ratio_7_30

  target_column: "target_volatility_next_1d"

model:
  name: "random_forest"
  hyperparameters:
    xgboost:
      n_estimators: 200
      max_depth: 7
      learning_rate: 0.1
      subsample: 0.8
      colsample_bytree: 0.8
      random_state: 42
      n_jobs: -1
      # early_stopping_rounds is deliberately not listed here: every key of this
      # mapping is forwarded to the XGBRegressor constructor, and the trainer
      # calls fit(X, y) without an eval_set, which XGBoost rejects when early
      # stopping is enabled. The value remains under training.early_stopping_rounds.
    random_forest:
      n_estimators: 200
      max_depth: 7
      min_samples_split: 2
      min_samples_leaf: 1
      random_state: 42
      n_jobs: -1

  evaluation:
    test_size: 0.2
    metrics:
      - mae
      - mse
      - rmse
      - r2
      - mape
    cross_validation_folds: 5

training:
  random_state: 42
  early_stopping_rounds: 10
  validation_size: 0.1
  batch_size: 32
  epochs: 100

mlflow:
  tracking_uri: "mlruns"
  experiment_name: "crypto_volatility"
  enabled: true

api:
  host: "0.0.0.0"
  port: 5000
  debug: false
  workers: 4
  log_level: "info"

deployment:
  model_path: "models/best_model.pkl"
  scaler_path: "models/scaler.pkl"
  feature_columns_path: "models/feature_columns.pkl"
  model_registry: "models/registry"

logging:
  level: "INFO"
  file: "logs/pipeline.log"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

Overwriting config.yaml


In [56]:
#Create the source directory structure
# Create directories
!mkdir -p src/{data,models,pipeline,utils}
!mkdir -p tests/{test_data,test_models,test_pipeline}
!mkdir -p data/{raw,processed,features}
!mkdir -p models logs mlruns notebooks api monitoring deployment scripts

In [57]:
%%writefile src/models/factory.py
from xgboost import XGBRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.svm import SVR
import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)

class ModelFactory:
    """Factory that instantiates regressors named in the configuration.

    A registry maps a short model name to its estimator class; instances are
    built with the keyword arguments found under
    ``config['model']['hyperparameters'][<model name>]``.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and register the supported estimators.

        Args:
            config: Parsed configuration mapping. ``create_model`` reads its
                ``'model'`` section.
        """
        self.config = config
        self.model_classes = {
            'xgboost': XGBRegressor,
            'random_forest': RandomForestRegressor,
            'gradient_boosting': GradientBoostingRegressor,
            'linear_regression': LinearRegression,
            'ridge': Ridge,
            'lasso': Lasso,
            'svr': SVR,
        }

    def create_model(self, model_name: str = None):
        """Create a model instance based on the configuration.

        Args:
            model_name: Registry key of the model to build. When ``None`` the
                name is taken from ``config['model']['name']``.

        Returns:
            A new, unfitted instance of the registered estimator class.

        Raises:
            ValueError: If ``model_name`` is not a registered model.
        """
        if model_name is None:
            model_name = self.config['model']['name']

        if model_name not in self.model_classes:
            available = list(self.model_classes.keys())
            raise ValueError(f"Unknown model: {model_name}. Available: {available}")

        logger.info(f"Creating {model_name} model...")

        # A YAML entry such as "ridge:" with no children parses to None, and the
        # whole 'hyperparameters' section may be absent; both mean "use defaults".
        hyperparameters = self.config['model'].get('hyperparameters') or {}
        # Copy so the estimator never shares a dict with the live configuration.
        model_params = dict(hyperparameters.get(model_name) or {})

        model_class = self.model_classes[model_name]
        return model_class(**model_params)

    def get_available_models(self):
        """Return the list of registered model names."""
        return list(self.model_classes.keys())

Writing src/models/factory.py


In [58]:
# data_loader

In [59]:
%%writefile src/data/loader.py
import pandas as pd
import numpy as np
from pathlib import Path
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

class DataLoader:
    """Reads the raw cryptocurrency CSV and performs basic quality checks."""

    def __init__(self, config: Dict[str, Any]):
        """Keep the config and resolve the raw / processed data locations."""
        self.config = config
        data_section = config['data']
        self.raw_path = Path(data_section['raw_path'])
        self.processed_path = Path(data_section['processed_path'])

    def load_raw_data(self) -> pd.DataFrame:
        """Read the raw CSV at ``self.raw_path``, parsing ``date`` as datetimes.

        Raises:
            FileNotFoundError: If the raw file does not exist.
        """
        source = self.raw_path
        logger.info(f"Loading raw data from {source}")

        if not source.exists():
            raise FileNotFoundError(f"Raw data file not found: {source}")

        df = pd.read_csv(source, parse_dates=['date'])

        n_rows, n_cols = len(df), len(df.columns)
        logger.info(f"Loaded {n_rows} rows, {n_cols} columns")

        earliest, latest = df['date'].min(), df['date'].max()
        logger.info(f"Date range: {earliest} to {latest}")

        return df

    def filter_coins(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of the rows whose ``coin`` is one of the configured coins.

        Spaces in the configured names are replaced by underscores before
        matching against the ``coin`` column.
        """
        wanted = [
            name.replace(' ', '_')
            for name in self.config['data']['selected_coins']
        ]

        filtered_df = df.loc[df['coin'].isin(wanted)].copy()

        n_coins = len(filtered_df['coin'].unique())
        logger.info(f"Filtered to {n_coins} coins")

        return filtered_df

    def validate_data(self, df: pd.DataFrame) -> bool:
        """Log data-quality findings; return ``False`` only when dates are missing.

        Sparse columns (>20% nulls) and duplicate rows are reported as
        warnings; null values in ``date`` are reported as an error.
        """
        logger.info("Validating data...")

        # Share of nulls per column, in percent
        missing_share = df.isnull().mean() * 100
        sparse_columns = missing_share[missing_share > 20].index.tolist()
        if sparse_columns:
            logger.warning(f"Columns with >20% missing values: {sparse_columns}")

        # Fully duplicated rows
        n_duplicates = df.duplicated().sum()
        if n_duplicates > 0:
            logger.warning(f"Found {n_duplicates} duplicate rows")

        # Every row needs a date
        n_null_dates = df['date'].isnull().sum()
        if n_null_dates > 0:
            logger.error(f"Found {n_null_dates} null dates")
            return False

        logger.info("Data validation complete")
        return True

Writing src/data/loader.py


In [60]:
#Step 6: Create feature engineering module
%%writefile src/data/feature_engineering.py
import pandas as pd
import numpy as np
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

class FeatureEngineer:
    """Feature engineering for cryptocurrency data.

    Expects a frame with ``coin``, ``date``, ``open``/``high``/``low``/``close``,
    ``volume`` and ``marketCap`` columns. Every lagged or rolling statistic is
    computed separately per ``coin`` so one coin's history never feeds another's.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration mapping."""
        self.config = config

    @staticmethod
    def _rolling_by_coin(values: pd.Series, coins: pd.Series, window: int, stat: str) -> pd.Series:
        """Per-coin rolling ``stat`` ('mean' or 'std') of ``values``, aligned to its index."""
        return values.groupby(coins).transform(
            lambda series: getattr(series.rolling(window), stat)()
        )

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create all engineered features.

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` sorted by ``coin`` and ``date`` (original index
            labels are kept) with the engineered columns appended. Rows that
            lack enough history hold NaN, and infinite values are replaced by NaN.
        """
        logger.info("Creating engineered features...")

        # Rolling windows and shifts below rely on chronological order per coin
        df_features = df.copy().sort_values(['coin', 'date'])

        df_features = self._create_return_features(df_features)
        df_features = self._create_volatility_features(df_features)
        df_features = self._create_technical_indicators(df_features)
        df_features = self._create_liquidity_features(df_features)
        df_features = self._create_trend_features(df_features)

        # Ratios with a zero denominator (zero volume, market cap or price)
        # produce +/-inf, which scalers and most estimators reject; treat them
        # as missing like the warm-up rows of the rolling windows.
        numeric_cols = df_features.select_dtypes(include=[np.number]).columns
        df_features[numeric_cols] = df_features[numeric_cols].replace(
            [np.inf, -np.inf], np.nan
        )

        logger.info(f"Created features. Final shape: {df_features.shape}")

        return df_features

    def _create_return_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add log/simple daily returns and their 7- and 30-day rolling means."""
        coins = df['coin']
        closes_by_coin = df.groupby('coin')['close']

        # Log returns
        df['log_return'] = np.log(df['close'] / closes_by_coin.shift(1))

        # Simple returns, grouped so that a coin's first row is NaN rather than
        # a bogus return against the previous coin's last close
        df['simple_return'] = closes_by_coin.pct_change()

        # Rolling returns
        df['return_7d'] = self._rolling_by_coin(df['simple_return'], coins, 7, 'mean')
        df['return_30d'] = self._rolling_by_coin(df['simple_return'], coins, 30, 'mean')

        return df

    def _create_volatility_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add rolling standard deviations of ``log_return`` over 7, 14 and 30 rows."""
        coins = df['coin']
        for window in (7, 14, 30):
            df[f'rolling_vol_{window}d'] = self._rolling_by_coin(
                df['log_return'], coins, window, 'std'
            )

        return df

    def _create_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the 14-row average true range and the 20-row Bollinger band width."""
        coins = df['coin']
        previous_close = df.groupby('coin')['close'].shift(1)

        # True range: the largest of the three candidate ranges for the day
        high_low = df['high'] - df['low']
        high_close = np.abs(df['high'] - previous_close)
        low_close = np.abs(df['low'] - previous_close)
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        df['atr_14'] = self._rolling_by_coin(tr, coins, 14, 'mean')

        # Bollinger band width: two rolling standard deviations over the rolling mean
        sma_20 = self._rolling_by_coin(df['close'], coins, 20, 'mean')
        std_20 = self._rolling_by_coin(df['close'], coins, 20, 'std')
        df['bb_width'] = (std_20 * 2) / sma_20

        return df

    def _create_liquidity_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the per-coin volume change and the volume to market-cap ratio."""
        # Volume percentage change
        df['volume_pct_change'] = df.groupby('coin')['volume'].pct_change()

        # Volume to market cap ratio
        df['volume_to_marketcap'] = df['volume'] / df['marketCap']

        return df

    def _create_trend_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add close-price moving averages and the 7/30-row volume ratio."""
        coins = df['coin']

        # Moving averages
        for window in (7, 14, 30):
            df[f'close_ma_{window}'] = self._rolling_by_coin(df['close'], coins, window, 'mean')

        # Volume ratio
        df['vol_ratio_7_30'] = (
            self._rolling_by_coin(df['volume'], coins, 7, 'mean') /
            self._rolling_by_coin(df['volume'], coins, 30, 'mean')
        )

        return df

Writing src/data/feature_engineering.py


In [61]:
#Step 7: Create trainer module
%%writefile src/models/trainer.py
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib
from pathlib import Path
import logging
from typing import Dict, Any, Tuple
import mlflow
import mlflow.sklearn

logger = logging.getLogger(__name__)

class ModelTrainer:
    """Train, evaluate and persist ML models, optionally tracking with MLflow.

    When ``config['mlflow']['enabled']`` is true, ``train`` opens an MLflow run,
    ``evaluate`` logs metrics into it and ``save_model`` closes it.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration; no model or scaler is attached yet."""
        self.config = config
        self.model = None
        self.scaler = None

    def prepare_data(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, list]:
        """Build the feature matrix and target vector from a feature frame.

        Rows holding NaN or +/-inf in any feature or in the target (for example
        the warm-up rows of rolling windows) are dropped, because scalers and
        most estimators reject them.

        Args:
            df: Frame containing the numeric feature columns and the target
                column named by ``config['features']['target_column']``.

        Returns:
            ``(X, y, feature_cols)``: the feature array, the target array and
            the feature column names in the column order of ``X``.

        Raises:
            KeyError: If the target column is not present in ``df``.
            ValueError: If no complete row is left after dropping missing values.
        """
        logger.info("Preparing data for training...")

        target_col = self.config['features']['target_column']
        if target_col not in df.columns:
            raise KeyError(
                f"Target column '{target_col}' not found in dataframe; "
                f"it must be created before training"
            )

        feature_cols = self._get_feature_columns(df)

        model_frame = df[feature_cols + [target_col]].replace([np.inf, -np.inf], np.nan)
        complete_rows = model_frame.dropna()

        dropped = len(model_frame) - len(complete_rows)
        if dropped:
            logger.warning(f"Dropped {dropped} rows with missing or infinite values")
        if complete_rows.empty:
            raise ValueError("No complete rows left after dropping missing values")

        X = complete_rows[feature_cols].values
        y = complete_rows[target_col].values

        logger.info(f"Data prepared. X shape: {X.shape}, y shape: {y.shape}")

        return X, y, feature_cols

    def train_test_split(self, X: np.ndarray, y: np.ndarray) -> Tuple:
        """Randomly split data into train and test sets.

        The test fraction comes from ``config['model']['evaluation']['test_size']``
        and the seed from ``config['training']['random_state']``.

        Returns:
            ``(X_train, X_test, y_train, y_test)``.
        """
        test_size = self.config['model']['evaluation']['test_size']
        random_state = self.config['training']['random_state']

        # Inside a method this name resolves to the imported module-level
        # function, not to this method.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

        logger.info(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")

        return X_train, X_test, y_train, y_test

    def train(self, model, X_train: np.ndarray, y_train: np.ndarray) -> Any:
        """Fit ``model`` on the training data and remember it on ``self.model``.

        With MLflow enabled a new run is started and the configured
        hyperparameters of the current model are logged. The run stays open so
        that ``evaluate`` and ``save_model`` can log into it; it is closed as
        failed if fitting raises.

        Returns:
            The fitted model (the same object that was passed in).
        """
        logger.info("Training model...")

        mlflow_enabled = self.config['mlflow']['enabled']
        if mlflow_enabled:
            # A run left open by an earlier, interrupted attempt would make
            # start_run() raise; close it first.
            if mlflow.active_run() is not None:
                mlflow.end_run()

            mlflow.set_experiment(self.config['mlflow']['experiment_name'])
            mlflow.start_run()

            # Same lookup as the model factory: models without an entry in the
            # config simply have no parameters to log.
            hyperparameters = self.config['model'].get('hyperparameters') or {}
            params = hyperparameters.get(self.config['model']['name']) or {}
            if params:
                mlflow.log_params(params)

        try:
            model.fit(X_train, y_train)
        except Exception:
            if mlflow_enabled:
                mlflow.end_run(status="FAILED")
            raise
        self.model = model

        logger.info("Model training completed")

        return model

    def evaluate(self, model, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Compute regression metrics for ``model`` on the test set.

        Returns:
            Mapping with ``mae``, ``mse``, ``rmse`` and ``r2``; ``mape`` (in
            percent) is included only when at least one target is non-zero.
            The metrics are also logged to MLflow when it is enabled.
        """
        logger.info("Evaluating model...")

        y_pred = model.predict(X_test)

        mse = mean_squared_error(y_test, y_pred)
        metrics = {
            'mae': mean_absolute_error(y_test, y_pred),
            'mse': mse,
            'rmse': np.sqrt(mse),
            'r2': r2_score(y_test, y_pred),
        }

        # MAPE is undefined where the true value is zero; skip those rows
        mask = y_test != 0
        if mask.any():
            mape = np.mean(np.abs((y_test[mask] - y_pred[mask]) / y_test[mask])) * 100
            metrics['mape'] = mape

        if self.config['mlflow']['enabled']:
            for metric_name, value in metrics.items():
                mlflow.log_metric(metric_name, value)

        logger.info("Model evaluation completed")
        for metric_name, value in metrics.items():
            logger.info(f"{metric_name}: {value:.4f}")

        return metrics

    def cross_validate(self, model, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
        """Run k-fold cross-validation scored by mean squared error.

        The fold count comes from
        ``config['model']['evaluation']['cross_validation_folds']``.

        Returns:
            Mapping with ``mean_cv_score`` (mean MSE), ``std_cv_score`` and
            ``cv_scores`` (per-fold MSE).
        """
        logger.info("Performing cross-validation...")

        cv_folds = self.config['model']['evaluation']['cross_validation_folds']
        scoring = 'neg_mean_squared_error'

        cv_scores = cross_val_score(model, X, y, cv=cv_folds, scoring=scoring)

        # The scorer returns negated MSE; flip the sign back for reporting
        cv_results = {
            'mean_cv_score': -cv_scores.mean(),
            'std_cv_score': cv_scores.std(),
            'cv_scores': -cv_scores
        }

        logger.info(f"CV Mean MSE: {cv_results['mean_cv_score']:.4f} (+/- {cv_results['std_cv_score']:.4f})")

        return cv_results

    def save_model(self, model, scaler, feature_cols, model_path: str = None):
        """Persist the model, the scaler and the feature column list with joblib.

        Args:
            model: Fitted estimator to save.
            scaler: Fitted scaler to save to ``config['deployment']['scaler_path']``.
            feature_cols: Column names saved to
                ``config['deployment']['feature_columns_path']``.
            model_path: Destination of the model; defaults to
                ``config['deployment']['model_path']``.

        With MLflow enabled the model is also logged and the active run is ended.
        """
        if model_path is None:
            model_path = self.config['deployment']['model_path']
        scaler_path = self.config['deployment']['scaler_path']
        feature_cols_path = self.config['deployment']['feature_columns_path']

        # The three artifacts may be configured to live in different folders
        for artifact_path in (model_path, scaler_path, feature_cols_path):
            Path(artifact_path).parent.mkdir(parents=True, exist_ok=True)

        joblib.dump(model, model_path)
        joblib.dump(scaler, scaler_path)
        joblib.dump(feature_cols, feature_cols_path)

        logger.info(f"Model saved to {model_path}")
        logger.info(f"Scaler saved to {scaler_path}")
        logger.info(f"Feature columns saved to {feature_cols_path}")

        if self.config['mlflow']['enabled']:
            try:
                mlflow.sklearn.log_model(model, "model")
                mlflow.log_artifact(model_path)
            finally:
                # Close the run opened by train() even if logging fails
                mlflow.end_run()

    def _get_feature_columns(self, df: pd.DataFrame) -> list:
        """Return the numeric columns of ``df`` except ``date``, ``coin`` and the target."""
        exclude_cols = ['date', 'coin', self.config['features']['target_column']]

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        feature_cols = [col for col in numeric_cols if col not in exclude_cols]

        return feature_cols

Writing src/models/trainer.py


In [62]:
#Create training pipeline
%%writefile src/pipeline/training_pipeline.py
import yaml
import logging
from pathlib import Path
import sys

# Add src to path
sys.path.append(str(Path(__file__).parent.parent.parent))

from src.data.loader import DataLoader
from src.data.feature_engineering import FeatureEngineer
from src.models.factory import ModelFactory
from src.models.trainer import ModelTrainer
from sklearn.preprocessing import StandardScaler
import mlflow

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class TrainingPipeline:
    """End-to-end training pipeline: load, featurize, split, scale, train, evaluate, save."""

    def __init__(self, config_path: str = 'config.yaml'):
        """Load the YAML configuration and build the pipeline components.

        Args:
            config_path: Path of the YAML configuration file.
        """
        self.config_path = config_path
        self.config = self.load_config()

        # All components share the same config dict, so later changes to
        # self.config are visible to them.
        self.data_loader = DataLoader(self.config)
        self.feature_engineer = FeatureEngineer(self.config)
        self.model_factory = ModelFactory(self.config)
        self.model_trainer = ModelTrainer(self.config)

        # MLflow setup
        if self.config['mlflow']['enabled']:
            mlflow.set_tracking_uri(self.config['mlflow']['tracking_uri'])

    def load_config(self):
        """Load and return the configuration from the YAML file at ``self.config_path``."""
        with open(self.config_path, 'r') as f:
            config = yaml.safe_load(f)
        return config

    def run(self):
        """Run the complete training pipeline.

        The scaler is fitted on the training split only, and cross-validation
        re-fits a scaler inside every fold, so no statistics of held-out rows
        reach the model.

        Returns:
            Dict with the fitted ``model``, the test ``metrics``, the
            ``cv_results`` and the ``feature_cols`` used.

        Raises:
            Exception: Any error of a pipeline step is logged and re-raised.
        """
        # Imported here because the module-level imports only pull in
        # sklearn.preprocessing.
        from sklearn.pipeline import make_pipeline

        logger.info("Starting training pipeline...")

        try:
            # 1. Load data
            logger.info("Step 1: Loading data...")
            df = self.data_loader.load_raw_data()
            df = self.data_loader.filter_coins(df)

            # 2. Create features
            logger.info("Step 2: Creating features...")
            df = self.feature_engineer.create_features(df)

            # 3. Prepare data
            logger.info("Step 3: Preparing data...")
            X, y, feature_cols = self.model_trainer.prepare_data(df)

            # 4. Split data (before scaling, so the test rows stay unseen)
            logger.info("Step 4: Splitting data...")
            X_train, X_test, y_train, y_test = self.model_trainer.train_test_split(X, y)

            # 5. Scale features using statistics of the training split only
            logger.info("Step 5: Scaling features...")
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)
            X_test = scaler.transform(X_test)

            # 6. Create model
            logger.info("Step 6: Creating model...")
            model = self.model_factory.create_model()

            # 7. Train model
            logger.info("Step 7: Training model...")
            model = self.model_trainer.train(model, X_train, y_train)

            # 8. Evaluate model
            logger.info("Step 8: Evaluating model...")
            metrics = self.model_trainer.evaluate(model, X_test, y_test)

            # 9. Cross-validation on the unscaled data; the scaler + model
            # pipeline is cloned and re-fitted per fold, leaving `model` untouched
            logger.info("Step 9: Cross-validation...")
            cv_results = self.model_trainer.cross_validate(
                make_pipeline(StandardScaler(), model), X, y
            )

            # 10. Save model
            logger.info("Step 10: Saving model...")
            self.model_trainer.save_model(model, scaler, feature_cols)

            logger.info("Training pipeline completed successfully!")

            return {
                'model': model,
                'metrics': metrics,
                'cv_results': cv_results,
                'feature_cols': feature_cols
            }

        except Exception as e:
            logger.error(f"Pipeline failed with error: {e}")
            # The trainer opens an MLflow run that is only closed when the model
            # is saved; do not leave it dangling after a failure.
            if self.config['mlflow']['enabled'] and mlflow.active_run() is not None:
                mlflow.end_run(status="FAILED")
            raise

if __name__ == "__main__":
    # Direct execution: train with the default config.yaml and report the test R²
    run_output = TrainingPipeline().run()
    r2_value = run_output['metrics']['r2']
    print(f"Training completed. R² score: {r2_value:.4f}")

Writing src/pipeline/training_pipeline.py


In [63]:
# Create FastAPI server
%%writefile api/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from typing import List, Dict, Any
import yaml
import sys
from pathlib import Path

# Add project root to path
sys.path.append(str(Path(__file__).parent.parent))

from src.data.feature_engineering import FeatureEngineer

# Everything below runs at import time, so the API process fails to start
# if config.yaml or any saved artifact is missing.
app = FastAPI(
    title="Crypto Volatility Prediction API",
    description="API for predicting cryptocurrency volatility",
    version="1.0.0"
)

# Load config (path is relative to the process working directory, not to this file)
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Load model and artifacts written by the training pipeline.
# NOTE(review): joblib.load unpickles the files; only load artifacts from a trusted source.
model = joblib.load(config['deployment']['model_path'])
scaler = joblib.load(config['deployment']['scaler_path'])
feature_columns = joblib.load(config['deployment']['feature_columns_path'])

# Initialize feature engineer (the same class the training pipeline uses)
feature_engineer = FeatureEngineer(config)

class PredictionRequest(BaseModel):
    """Request model for prediction: one daily market record of one coin.

    The field names match the raw columns consumed by FeatureEngineer.
    """
    coin: str  # coin identifier; used as the grouping key during feature engineering
    date: str  # record date as text; used for sorting — presumably ISO formatted, TODO confirm
    open: float
    high: float
    low: float
    close: float
    volume: float
    marketCap: float

class BatchPredictionRequest(BaseModel):
    """Request model for batch prediction: a list of single-record requests."""
    data: List[PredictionRequest]  # records to score; may span several coins and dates

class PredictionResponse(BaseModel):
    """Response model for a single prediction."""
    prediction: float  # value returned by the model for the record
    confidence: float  # currently a fixed placeholder set by the /predict handler, not a computed score
    model_version: str  # filled from config['project']['version']

@app.get("/")
async def root():
    """Landing endpoint: names the service and lists the available routes."""
    available_routes = ["/predict", "/batch_predict", "/health", "/metrics"]
    service_info = {
        "message": "Crypto Volatility Prediction API",
        "version": "1.0.0",
        "endpoints": available_routes,
    }
    return service_info

@app.get("/health")
async def health_check():
    """Liveness probe: reports whether a model object is loaded."""
    model_ready = model is not None
    return {"status": "healthy", "model_loaded": model_ready}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make a single prediction.

    The record is run through the same feature engineering and scaler as the
    training data before it is passed to the model.

    NOTE(review): the engineered features are lagged/rolling statistics, so a
    single record cannot supply the history they need and they come out as NaN.
    Consider feeding recent history for the coin — confirm the intended contract.

    Raises:
        HTTPException: 400 when engineered data lacks a column the model was
            trained on; 500 for any other failure.
    """
    try:
        # Convert request to dataframe
        df = pd.DataFrame([request.dict()])

        # Create features
        df = feature_engineer.create_features(df)

        # Check for missing columns before selecting them; selecting first
        # would raise KeyError and hide the real cause
        missing = [col for col in feature_columns if col not in df.columns]
        if missing:
            raise HTTPException(
                status_code=400,
                detail=f"Feature mismatch: missing columns {missing}"
            )

        features = df[feature_columns].values

        # Scale features
        features_scaled = scaler.transform(features)

        # Make prediction
        prediction = model.predict(features_scaled)[0]

        return PredictionResponse(
            prediction=float(prediction),
            confidence=0.95,  # Placeholder for confidence score
            model_version=config['project']['version']
        )

    except HTTPException:
        # Keep deliberate HTTP errors (such as the 400 above) as they are
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/batch_predict")
async def batch_predict(request: BatchPredictionRequest):
    """Make batch predictions.

    Returns:
        Dict with ``predictions`` (one value per submitted record, in the
        order the records were submitted) and ``model_version``.

    Raises:
        HTTPException: 400 for an empty batch or when engineered data lacks a
            column the model was trained on; 500 for any other failure.
    """
    try:
        if not request.data:
            raise HTTPException(status_code=400, detail="No data provided")

        # Convert requests to dataframe
        data = [item.dict() for item in request.data]
        df = pd.DataFrame(data)

        # Feature engineering sorts rows by coin and date but keeps the index
        # labels; sorting by index restores the order of the request so that
        # predictions[i] belongs to request.data[i]
        df = feature_engineer.create_features(df).sort_index()

        # Check for missing columns before selecting them; selecting first
        # would raise KeyError and hide the real cause
        missing = [col for col in feature_columns if col not in df.columns]
        if missing:
            raise HTTPException(
                status_code=400,
                detail=f"Feature mismatch: missing columns {missing}"
            )

        features = df[feature_columns].values

        # Scale features
        features_scaled = scaler.transform(features)

        # Make predictions
        predictions = model.predict(features_scaled)

        return {
            "predictions": predictions.tolist(),
            "model_version": config['project']['version']
        }

    except HTTPException:
        # Keep deliberate HTTP errors (such as the 400s above) as they are
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def get_metrics():
    """Describe the loaded model: its class name, feature count and version."""
    summary = {}
    summary["model_type"] = type(model).__name__
    summary["feature_count"] = len(feature_columns)
    summary["model_version"] = config['project']['version']
    return summary

if __name__ == "__main__":
    # Direct execution: serve the app with the settings from the "api" config section
    import uvicorn

    api_settings = config['api']
    uvicorn.run(
        app,
        host=api_settings['host'],
        port=api_settings['port'],
        log_level=api_settings['log_level'],
    )

Writing api/app.py


In [64]:
#Create test script
%%writefile tests/test_factory.py
import pytest
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).parent.parent))

from src.models.factory import ModelFactory

# Test configuration: a minimal stand-in for config.yaml holding only the
# 'model' section that ModelFactory reads. The hyperparameter values differ
# from config.yaml on purpose so the assertions below prove they come from here.
TEST_CONFIG = {
    'model': {
        'name': 'random_forest',  # default model used when create_model() gets no name
        'hyperparameters': {
            'xgboost': {
                'n_estimators': 100,
                'max_depth': 5,
                'learning_rate': 0.1,
                'random_state': 42
            },
            'random_forest': {
                'n_estimators': 100,
                'max_depth': 10,
                'min_samples_split': 2,
                'random_state': 42
            }
        }
    }
}

def test_create_random_forest():
    """A random forest is built with the hyperparameters from TEST_CONFIG."""
    forest = ModelFactory(TEST_CONFIG).create_model('random_forest')

    assert type(forest).__name__ == 'RandomForestRegressor'
    assert (forest.n_estimators, forest.max_depth) == (100, 10)

def test_create_xgboost():
    """An XGBoost regressor is built with the hyperparameters from TEST_CONFIG."""
    booster = ModelFactory(TEST_CONFIG).create_model('xgboost')

    assert type(booster).__name__ == 'XGBRegressor'
    assert (booster.n_estimators, booster.learning_rate) == (100, 0.1)

def test_create_default_model():
    """Without an explicit name the factory falls back to config['model']['name']."""
    default_model = ModelFactory(TEST_CONFIG).create_model()

    assert type(default_model).__name__ == 'RandomForestRegressor'

def test_unknown_model_error():
    """Asking for an unregistered model name raises ValueError."""
    unknown_name = 'unknown_model'
    factory = ModelFactory(TEST_CONFIG)

    with pytest.raises(ValueError, match="Unknown model"):
        factory.create_model(unknown_name)

if __name__ == "__main__":
    # Allow running the checks directly, without the pytest runner
    for check in (
        test_create_random_forest,
        test_create_xgboost,
        test_create_default_model,
        test_unknown_model_error,
    ):
        check()
    print("All tests passed!")

Writing tests/test_factory.py


In [65]:
#Step 11: Create the main training script (train.py)
%%writefile train.py
#!/usr/bin/env python
"""Main training script"""

import argparse
from src.pipeline.training_pipeline import TrainingPipeline

def main():
    """Parse the command line, run the training pipeline and print a report.

    ``--config`` selects the configuration file and ``--model`` optionally
    replaces the model name found in it.
    """
    cli = argparse.ArgumentParser(description='Train crypto volatility model')
    cli.add_argument('--config', type=str, default='config.yaml',
                     help='Path to configuration file')
    cli.add_argument('--model', type=str, default=None,
                     help='Model type to train (overrides config)')
    options = cli.parse_args()

    # Build the pipeline, then apply the optional model override before running
    pipeline = TrainingPipeline(options.config)
    if options.model:
        pipeline.config['model']['name'] = options.model

    results = pipeline.run()

    banner = "=" * 50
    model_label = type(results['model']).__name__
    feature_count = len(results['feature_cols'])

    print("\n" + banner)
    print("TRAINING RESULTS")
    print(banner)
    print(f"Model: {model_label}")
    print(f"Features: {feature_count}")

    print("\nMetrics:")
    for metric in results['metrics']:
        print(f"  {metric}: {results['metrics'][metric]:.4f}")

    cv_mse = results['cv_results']['mean_cv_score']
    print(f"\nCross-validation MSE: {cv_mse:.4f}")
    print(banner)

# Run the CLI only when executed as a script, not when imported
if __name__ == "__main__":
    main()

Writing train.py


In [66]:
# Create Dockerfile
%%writefile deployment/Dockerfile
# Image for serving the prediction API (api/app.py) with uvicorn.
# The COPY instructions below take requirements.txt and the application code
# from the build context, so the context must be the project root.
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies (C/C++ compilers for packages that build from source)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first and install Python dependencies
# (requirements.txt must exist in the build context)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create necessary directories
RUN mkdir -p data/raw data/processed models logs mlruns

# Expose API port (same port as in the CMD below)
EXPOSE 5000

# Command to run the API
CMD ["uvicorn", "api.app:app", "--host", "0.0.0.0", "--port", "5000"]

Writing deployment/Dockerfile


In [67]:
#Create docker-compose.yml
%%writefile deployment/docker-compose.yml
version: '3.8'

# Relative host paths in this file are resolved from the directory that holds
# it (deployment/), so project-root folders are referenced with "../".
services:
  # Prediction API built from the project root
  ml-api:
    build:
      context: ..
      dockerfile: deployment/Dockerfile
    ports:
      - "5000:5000"
    volumes:
      - ../data:/app/data
      - ../models:/app/models
      - ../mlruns:/app/mlruns
      - ../logs:/app/logs
    environment:
      - PYTHONPATH=/app
      - MLFLOW_TRACKING_URI=file:/app/mlruns
    command: uvicorn api.app:app --host 0.0.0.0 --port 5000 --reload

  # MLflow UI over the same mlruns folder, published on host port 5001
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    ports:
      - "5001:5000"
    volumes:
      - ../mlruns:/mlflow
    command: mlflow server --backend-store-uri file:/mlflow --host 0.0.0.0 --port 5000

Writing deployment/docker-compose.yml


In [68]:
#Run the pipeline in Colab
# Install dependencies
!pip install -r requirements.txt

# Run tests
!python -m pytest tests/test_factory.py -v

# Train model (you'll need to provide your dataset.csv first)
# !python train.py --model random_forest

# Start API server (in background)
# !uvicorn api.app:app --host 0.0.0.0 --port 5000 --reload &

platform linux -- Python 3.12.12, pytest-8.4.2, pluggy-1.6.0 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: /content
plugins: hydra-core-1.3.2, typeguard-4.4.4, langsmith-0.4.59, anyio-4.12.0
collected 4 items                                                              [0m

tests/test_factory.py::test_create_random_forest [32mPASSED[0m[32m                  [ 25%][0m
tests/test_factory.py::test_create_xgboost [32mPASSED[0m[32m                        [ 50%][0m
tests/test_factory.py::test_create_default_model [32mPASSED[0m[32m                  [ 75%][0m
tests/test_factory.py::test_unknown_model_error [32mPASSED[0m[32m                   [100%][0m



In [69]:
!rm -rf src tests data models logs mlruns notebooks api monitoring deployment scripts