# Unit 4: Building an Automated ML Retraining Pipeline with Apache Airflow


Welcome to the fourth lesson of our "Automating Retraining with Apache Airflow" course! You've made great progress, and now it's time to put everything you've learned into practice. In this lesson, we will build a complete, automated ML retraining pipeline using Airflow and the modular components you've already worked with. This is where task definition and workflow orchestration come together: the result is a single scheduled workflow that handles everything from loading the data to saving the trained model.

By the end of this lesson, you will be able to connect all the pieces into a robust pipeline that automatically retrains, evaluates, and saves your model. Let's get started.

-----

### Recap: Interfaces of Our ML Modules

To build our Airflow pipeline, we'll use the functions from our custom data, model, and evaluation modules. Here is a quick refresher on the key functions we'll be using:

  * **`load_diamonds_data(file_path)`**: Loads the diamonds dataset from a CSV file.
  * **`preprocess_diamonds_data(df)`**: Preprocesses the data, splits it into training and test sets, and returns `X_train, X_test, y_train, y_test, preprocessor`, in that order.
  * **`train_model(X_train, y_train, model_type, **params)`**: Trains a machine learning model of the given type, using any extra keyword arguments as hyperparameters.
  * **`evaluate_model(model, X_test, y_test)`**: Evaluates the model on the test set and returns a tuple whose first element is a dictionary of metrics (the pipeline ignores the second element).
  * **`save_model(model, preprocessor, model_dir, model_name, metadata)`**: Saves the model, preprocessor, and metadata to disk and returns the path of the saved model.

By chaining these functions as Airflow tasks, we can automate the entire retraining process.
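To see the hand-offs before Airflow enters the picture, here is a minimal sketch of the same chain as plain Python. The function bodies are hypothetical stand-ins that only mimic the signatures listed above; the real modules read the CSV, fit a preprocessor and a model, and write files. Treating the second value returned by `evaluate_model` as predictions is an assumption made only for this sketch.

```python
# Hypothetical stand-ins that only mirror the signatures in the recap above.
# They are NOT the course's data/model/evaluation modules.

def load_diamonds_data(file_path):
    # Pretend rows of (carat, price); the real function reads the CSV at file_path.
    return [(0.3, 400.0), (0.5, 1200.0), (0.7, 2500.0), (1.0, 5000.0)]

def preprocess_diamonds_data(df):
    # Fixed 3/1 split; the real function also fits a preprocessor.
    X = [[carat] for carat, _ in df]
    y = [price for _, price in df]
    preprocessor = {"features": ["carat"]}
    return X[:3], X[3:], y[:3], y[3:], preprocessor

def train_model(X_train, y_train, model_type, **params):
    # Toy "model" that always predicts the mean training price.
    return {"type": model_type, "params": params, "mean": sum(y_train) / len(y_train)}

def evaluate_model(model, X_test, y_test):
    preds = [model["mean"] for _ in X_test]
    mae = sum(abs(p - t) for p, t in zip(preds, y_test)) / len(y_test)
    # Second element assumed to be predictions; the DAG discards it with `_`.
    return {"mae": mae}, preds

def save_model(model, preprocessor, model_dir, model_name, metadata):
    # The real function writes files; this one only builds the path it would return.
    return f"{model_dir}/{model_name}"

# The chain the DAG expresses as tasks: each output feeds the next call.
df = load_diamonds_data("diamonds.csv")
X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
model = train_model(X_train, y_train, model_type="random_forest", n_estimators=10)
metrics, _ = evaluate_model(model, X_test, y_test)
path = save_model(model, preprocessor, "./saved_models", "demo_model", {"metrics": metrics})
print(path)  # ./saved_models/demo_model
```

Each Airflow task in the rest of this lesson wraps exactly one of these calls, and each variable passed between calls becomes a value passed between tasks.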

-----

### Structuring the Airflow DAG

We'll start by setting up the basic structure of our Airflow DAG. The DAG defines the workflow and the order of task execution. The following Python code shows the necessary imports and configuration, including the DAG's default arguments and constants for file paths.

```python
from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom ML modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Set default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define paths for data and model storage
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"
os.makedirs(MODEL_DIR, exist_ok=True)  # Ensure the model directory exists
```
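With `'retries': 1` and `'retry_delay': timedelta(minutes=5)`, a failing task is attempted at most twice, with a five-minute wait between the attempts. The sketch below imitates that behavior in plain Python to make the arithmetic concrete. `run_with_retries` and `flaky_load` are hypothetical helpers, not part of Airflow, and the helper records the delays instead of actually sleeping.

```python
from datetime import timedelta

def run_with_retries(fn, retries, retry_delay):
    """Hypothetical helper: call fn until it succeeds or retries run out.

    Returns (result, attempts, waits), where waits lists the delays that
    would have been slept between attempts.
    """
    waits = []
    for attempt in range(1, retries + 2):  # first try + `retries` retries
        try:
            return fn(), attempt, waits
        except Exception:
            if attempt == retries + 1:
                raise  # out of retries: the task instance fails
            waits.append(retry_delay)

calls = {"n": 0}

def flaky_load():
    # Fails on the first call and succeeds on the second (e.g., a late file upload).
    calls["n"] += 1
    if calls["n"] == 1:
        raise OSError("diamonds.csv not available yet")
    return "loaded"

result, attempts, waits = run_with_retries(
    flaky_load, retries=1, retry_delay=timedelta(minutes=5)
)
print(result, attempts, waits)  # loaded 2 [datetime.timedelta(seconds=300)]
```

Because `default_args` applies to every task, the same retry policy covers all steps of the pipeline unless a task overrides it.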

-----

### Task Chaining and Data Passing

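Next, we'll define the pipeline's steps as tasks and chain them together. A key feature of Airflow is its ability to pass data between tasks using **XComs**: with the TaskFlow API, a task's return value is stored as an XCom and delivered to any task that takes it as an argument. XComs are kept in Airflow's metadata database and are intended for relatively small values. Airflow does not pickle XCom values by default, so passing arbitrary Python objects such as the scikit-learn model and preprocessor directly, as this lesson does, assumes pickling has been enabled (in Airflow 2.x, `enable_xcom_pickling = True` in the `[core]` configuration section). Pickle is convenient for small-scale pipelines, but unpickling can execute arbitrary code and ties stored values to specific library versions. For production environments, it is recommended to use a safer serialization format or to pass file paths between tasks instead of the data itself.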
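As a sketch of the file-path alternative, each step can write its artifact to storage that every worker can reach and return only the path, which is a plain string that serializes as JSON. The helpers below are hypothetical; a temporary directory stands in for shared storage, and `pickle` is used only for the artifact on disk.

```python
import json
import os
import pickle
import tempfile

ARTIFACT_DIR = tempfile.mkdtemp()  # stand-in for shared storage (e.g., a mounted volume)

def save_artifact(obj, name):
    """Write obj to disk and return its path (the only thing passed via XCom)."""
    path = os.path.join(ARTIFACT_DIR, f"{name}.pkl")
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    return path

def load_artifact(path):
    """Reload an artifact written by save_artifact."""
    with open(path, "rb") as f:
        return pickle.load(f)

# Upstream task: persist the split and hand back a small, JSON-friendly value.
split = {"X_train": [[0.3], [0.5]], "y_train": [400.0, 1200.0]}
xcom_value = {"split_path": save_artifact(split, "processed_data")}

# What would be stored as the XCom: a short JSON string, not the data itself.
serialized = json.dumps(xcom_value)

# Downstream task: decode the XCom value and reload the artifact from disk.
restored = load_artifact(json.loads(serialized)["split_path"])
print(restored == split)  # True
```

This keeps the metadata database small; the trade-off is that the storage location must be shared by every worker that runs the pipeline's tasks.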

For this lesson, we will use pickle-based XComs for simplicity. Below is the code for the first two tasks: `load_data` and `process_data`.

```python
@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    @task(task_id="load_data")
    def task_load_data():
        # Load the diamonds dataset from CSV
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df

    @task(task_id="process_data")
    def task_process_data(df):
        # Preprocess the data and split into train/test sets
        print("Preprocessing data")
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
        print(f"Data preprocessing complete. Training set size: {X_train.shape[0]}")
        return {
            "X_train": X_train,
            "X_test": X_test,
            "y_train": y_train,
            "y_test": y_test,
            "preprocessor": preprocessor
        }
```
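Because each task may run in a separate process or on a different worker, `task_process_data` does not receive the DataFrame object that `task_load_data` returned; it receives a copy rebuilt from the stored XCom value. The sketch below imitates that round trip with `pickle.dumps` and `pickle.loads`, which is roughly what happens when XCom pickling is enabled; lists stand in for the real DataFrames, and the storage layer itself is not modeled.

```python
import pickle

# Output of a hypothetical process_data run (lists stand in for DataFrames).
processed = {
    "X_train": [[0.3], [0.5]],
    "X_test": [[0.7]],
    "y_train": [400.0, 1200.0],
    "y_test": [2500.0],
    "preprocessor": {"features": ["carat"]},
}

# Upstream side: the return value is serialized when the task finishes.
stored = pickle.dumps(processed)

# Downstream side: the next task gets a fresh object built from those bytes.
received = pickle.loads(stored)

print(received == processed)  # True: same contents
print(received is processed)  # False: a different object

# Mutating the copy downstream does not change what the upstream task produced.
received["y_train"].append(9999.0)
print(len(processed["y_train"]))  # 2
```

A practical consequence is that every value a task returns is serialized and stored, so large datasets make each hand-off more expensive.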

-----

### Model Training

With the data prepared, the `train_model` task trains a Random Forest regressor on the training split. It returns a dictionary containing both the trained model and the full `processed_data`, so the evaluation task can access the test split and the preprocessor.

```python
    @task(task_id="train_model")
    def task_train_model(processed_data):
        # Train a Random Forest model using the training set
        print("Training model")
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}
```
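The hyperparameters above are hard-coded in the task. One way to make them easier to change is to collect them in a module-level dictionary and forward it through the `**params` argument of `train_model`. The sketch below assumes `train_model` passes extra keyword arguments on to the estimator, as its signature in the recap suggests; `TRAIN_CONFIG` and the stub `train_model` are hypothetical.

```python
# Hypothetical central config; in the DAG file this could sit next to DATA_PATH.
TRAIN_CONFIG = {
    "model_type": "random_forest",
    "params": {"n_estimators": 10, "max_depth": 5, "random_state": 42},
}

def train_model(X_train, y_train, model_type, **params):
    # Stub: records what it was called with instead of fitting an estimator.
    return {"model_type": model_type, "params": params, "n_rows": len(X_train)}

def task_train_model(processed_data, config=TRAIN_CONFIG):
    # Same shape as the Airflow task body: train, then pass the data through.
    model = train_model(
        processed_data["X_train"],
        processed_data["y_train"],
        model_type=config["model_type"],
        **config["params"],
    )
    return {"model": model, "processed_data": processed_data}

result = task_train_model({"X_train": [[0.3], [0.5]], "y_train": [400.0, 1200.0]})
print(result["model"]["params"])  # {'n_estimators': 10, 'max_depth': 5, 'random_state': 42}
```

Returning `processed_data` alongside the model keeps the evaluation task's signature simple, at the cost of storing the split a second time as an XCom. An alternative design is to give `task_evaluate_model` two parameters and pass the model and the processed data to it separately.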

-----

### Model Evaluation

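The `evaluate_model` task scores the trained model on the held-out test set. Its metrics let you track model quality from one run to the next and spot regressions. It returns the model, the preprocessor, and the metrics together, so the saving step has everything it needs.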

```python
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        # Evaluate the trained model on the test set
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        print("Evaluating model")
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
```
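The lesson does not show which metrics `evaluate_model` computes. For a regression target such as price, common choices are MAE, RMSE, and R², and the quality gate later in this unit relies on R². Below is a minimal pure-Python sketch of those formulas, assuming that metric set; the key names `mae`, `rmse`, and `r2` are hypothetical.

```python
import math

def regression_metrics(y_true, y_pred):
    """Return MAE, RMSE and R^2 for two equal-length sequences of numbers."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)  # residual sum of squares
    rmse = math.sqrt(ss_res / n)
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot  # undefined when all targets are equal (ss_tot == 0)
    return {"mae": mae, "rmse": rmse, "r2": r2}

metrics = regression_metrics(
    [400.0, 1200.0, 2500.0, 5000.0],
    [500.0, 1100.0, 2700.0, 4800.0],
)
print(metrics["mae"])  # 150.0
```

R² compares the model's squared error with that of always predicting the mean price: 1.0 is a perfect fit and 0.0 is no better than the mean. If the evaluation module uses scikit-learn, `sklearn.metrics.r2_score(y_true, y_pred)` computes the same quantity.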

-----

### Model Saving and Versioning

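The last task saves the trained model, the preprocessor, and metadata (the evaluation metrics, the model type, and a timestamp). Because each model name includes a timestamp, every retraining run produces a separate, traceable version instead of overwriting the previous one.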

```python
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        # Save the trained model, preprocessor, and metadata
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        print(f"Saving model to {MODEL_DIR}")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        print(f"Model saved to {model_path}")
        return model_path
```
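The course's `save_model` implementation is not shown. As a hypothetical sketch of the versioning idea, the helper below writes each run into its own timestamped directory with a human-readable `metadata.json` next to the pickled artifacts, so metrics can be inspected without unpickling anything. The directory layout, the file names, and the made-up metrics in the usage example are all assumptions.

```python
import json
import os
import pickle
import tempfile
from datetime import datetime

def save_versioned(model, preprocessor, model_dir, metadata, now=None):
    """Hypothetical saver writing model.pkl, preprocessor.pkl and metadata.json."""
    timestamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    run_dir = os.path.join(model_dir, f"diamond_price_model_{timestamp}")
    os.makedirs(run_dir)  # raises instead of silently overwriting an existing version
    with open(os.path.join(run_dir, "model.pkl"), "wb") as f:
        pickle.dump(model, f)
    with open(os.path.join(run_dir, "preprocessor.pkl"), "wb") as f:
        pickle.dump(preprocessor, f)
    with open(os.path.join(run_dir, "metadata.json"), "w") as f:
        json.dump({**metadata, "timestamp": timestamp}, f, indent=2)
    return run_dir

base = tempfile.mkdtemp()
path = save_versioned(
    {"weights": [1, 2]},
    {"features": ["carat"]},
    base,
    {"metrics": {"r2": 0.91}, "model_type": "RandomForestRegressor"},
    now=datetime(2025, 1, 2, 3, 4, 5),
)
print(os.path.basename(path))  # diamond_price_model_20250102_030405
```

Using `datetime.now()`, as the task does, gives each retry a new name; deriving the name from the DAG run's logical date instead would tie each saved version to a specific run.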

-----

### Building the Complete Workflow

The final step is to link all the tasks together by passing the output of one task as the input to the next. This defines the dependencies and execution order of the entire pipeline.

```python
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    final_model_path = task_save_model(evaluation_result)
    
diamond_training_pipeline()
```

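Because each task consumes the previous task's return value, the TaskFlow API infers the dependencies automatically, with no explicit `>>` operators, and the result is a linear pipeline: load, process, train, evaluate, save. This structure makes it straightforward to add new steps or swap components as needed, and it gives you a complete, scheduled ML retraining workflow.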
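To make the implied ordering concrete, the sketch below models the pipeline's dependency graph with Python's standard-library `graphlib` module (Python 3.9+). It represents only the dependency structure that the task calls create, not how Airflow's scheduler works, and it shows that inserting a step such as a quality check changes only the edges around it.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose output it consumes.
pipeline = {
    "load_data": set(),
    "process_data": {"load_data"},
    "train_model": {"process_data"},
    "evaluate_model": {"train_model"},
    "save_model": {"evaluate_model"},
}
print(list(TopologicalSorter(pipeline).static_order()))
# ['load_data', 'process_data', 'train_model', 'evaluate_model', 'save_model']

# Adding a step between evaluation and saving rewires only two entries.
with_gate = dict(
    pipeline,
    check_model_quality={"evaluate_model"},
    save_model={"check_model_quality"},
)
print(list(TopologicalSorter(with_gate).static_order()))
```

Since every task has exactly one upstream dependency, both graphs are chains, so their topological order is unique.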

## Unpacking Data for ML Pipelines

You’ve just seen how each part of the ML pipeline fits together in an Airflow DAG and how tasks pass data from one step to the next. Now, let’s focus on the data processing step.

In this exercise, you’ll complete the `task_process_data` function inside the Airflow DAG. Your job is to call `preprocess_diamonds_data(df)` and unpack its return values into the variables `X_train`, `X_test`, `y_train`, `y_test`, and `preprocessor`. Then, make sure these variables are included in the dictionary that the function returns.

This step is important for ensuring the right data is passed to the next tasks in your pipeline. Give it a try!
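Tuple unpacking is positional, so the variable names must follow the order in which `preprocess_diamonds_data` returns its values (`X_train, X_test, y_train, y_test, preprocessor` in the lesson code). The sketch below uses a hypothetical stand-in, `preprocess_stub`, to show that a wrong number of names fails immediately while a wrong order fails silently.

```python
def preprocess_stub(df):
    # Hypothetical stand-in: same return order as preprocess_diamonds_data in the lesson.
    X_train, X_test = [[0.3], [0.5]], [[0.7]]
    y_train, y_test = [400.0, 1200.0], [2500.0]
    preprocessor = {"features": ["carat"]}
    return X_train, X_test, y_train, y_test, preprocessor

# Unpack positionally, then repackage under explicit keys for the next task.
X_train, X_test, y_train, y_test, preprocessor = preprocess_stub(df=None)
processed = {
    "X_train": X_train,
    "X_test": X_test,
    "y_train": y_train,
    "y_test": y_test,
    "preprocessor": preprocessor,
}

# A wrong *count* of names raises ValueError right away...
try:
    a, b, c, d = preprocess_stub(df=None)
    count_error = False
except ValueError:
    count_error = True

# ...but a wrong *order* does not: the swapped names simply hold the wrong data.
X_train_bad, y_train_bad, X_test_bad, y_test_bad, _ = preprocess_stub(df=None)
print(count_error, y_train_bad)  # True [[0.7]]
```

Keying the returned dictionary by name means downstream tasks no longer depend on position, which is why the pipeline converts the tuple into a dict at this step.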

```python
"""
ML Model Training Pipeline DAG

This module defines an Airflow DAG that orchestrates a complete ML training pipeline
for the diamonds dataset, using functions from our ML modules.
"""

from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Create model directory if it doesn't exist
os.makedirs(MODEL_DIR, exist_ok=True)

@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    """
    This DAG implements a complete ML training pipeline for diamond price prediction.
    
    Tasks:
    1. Load data: Load the diamonds dataset
    2. Process data: Preprocess the data for model training
    3. Train model: Train a random forest model
    4. Evaluate model: Evaluate the model on test data
    5. Save model: Save the trained model and preprocessor
    """
    
    @task(task_id="load_data")
    def task_load_data():
        """Load the diamonds dataset."""
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df
    
    @task(task_id="process_data")
    def task_process_data(df):
        """Preprocess the data for model training."""
        print("Preprocessing data")
        # TODO: Call preprocess_diamonds_data and unpack its return values
        # into X_train, X_test, y_train, y_test, and preprocessor

        # TODO: return a dict with the unpacked variables
        
    
    @task(task_id="train_model")
    def task_train_model(processed_data):
        """Train a random forest model."""
        print("Training model")
        # Train the model
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}
    
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        """Evaluate the trained model."""
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        
        print("Evaluating model")
        # Evaluate the model
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
    
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        """Save the trained model and preprocessor."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        # Create a timestamp for the model name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        
        # Create metadata
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        # Save the model
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path
    
    # Define the task dependencies
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    final_model_path = task_save_model(evaluation_result)

# Create the DAG
diamond_training_pipeline()

```

One possible completed version of the DAG, with `task_process_data` filled in:

```python
"""
ML Model Training Pipeline DAG

This module defines an Airflow DAG that orchestrates a complete ML training pipeline
for the diamonds dataset, using functions from our ML modules.
"""

from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Create model directory if it doesn't exist
os.makedirs(MODEL_DIR, exist_ok=True)

@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    """
    This DAG implements a complete ML training pipeline for diamond price prediction.
    
    Tasks:
    1. Load data: Load the diamonds dataset
    2. Process data: Preprocess the data for model training
    3. Train model: Train a random forest model
    4. Evaluate model: Evaluate the model on test data
    5. Save model: Save the trained model and preprocessor
    """
    
    @task(task_id="load_data")
    def task_load_data():
        """Load the diamonds dataset."""
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df
    
    @task(task_id="process_data")
    def task_process_data(df):
        """Preprocess the data for model training."""
        print("Preprocessing data")
        # Unpack the train/test splits and the fitted preprocessor
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)

        # Return everything downstream tasks need as a single dict
        print(f"Data preprocessing complete. Training set size: {X_train.shape[0]}")
        return {
            "X_train": X_train,
            "X_test": X_test,
            "y_train": y_train,
            "y_test": y_test,
            "preprocessor": preprocessor
        }
    
    @task(task_id="train_model")
    def task_train_model(processed_data):
        """Train a random forest model."""
        print("Training model")
        # Train the model
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}
    
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        """Evaluate the trained model."""
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        
        print("Evaluating model")
        # Evaluate the model
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
    
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        """Save the trained model and preprocessor."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        # Create a timestamp for the model name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        
        # Create metadata
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        # Save the model
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path
    
    # Define the task dependencies
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    final_model_path = task_save_model(evaluation_result)

# Create the DAG
diamond_training_pipeline()
```

## Ensuring Data Flow in Model Training

You’ve just practiced how to pass multiple pieces of data between Airflow tasks in your ML pipeline. Now, let’s ensure that each task provides everything the next step needs.

In this exercise, you’ll work on the model training step. The current implementation of this step contains a bug that prevents the pipeline from running correctly. Can you find it and make the pipeline run smoothly from start to finish? Enjoy the challenge!
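When one task's output feeds the next, the downstream code assumes a contract about its shape: `task_evaluate_model` reads `training_result["model"]` and `training_result["processed_data"]`. The sketch below uses a hypothetical stand-in model object to show what happens when the upstream value breaks that contract (the exact error text depends on the estimator class), and a small hypothetical `require_keys` helper that reports the same mistake with more context.

```python
class FakeModel:
    """Hypothetical stand-in for a fitted estimator (no dict-style lookups)."""

def require_keys(value, keys, task_name):
    """Hypothetical helper: fail with a message naming the task and expected keys."""
    if not isinstance(value, dict) or not set(keys).issubset(value):
        raise TypeError(
            f"{task_name} expected a dict with keys {sorted(keys)}, "
            f"got {type(value).__name__}"
        )
    return value

# Indexing a bare model like a dict fails with a generic TypeError.
try:
    FakeModel()["model"]
    plain_error = None
except TypeError as exc:
    plain_error = exc  # e.g. "'FakeModel' object is not subscriptable"

# The same mistake, reported by the contract check with more context.
try:
    require_keys(FakeModel(), {"model", "processed_data"}, "evaluate_model")
    contract_error = None
except TypeError as exc:
    contract_error = exc
print(contract_error)
# evaluate_model expected a dict with keys ['model', 'processed_data'], got FakeModel
```

A check like this at the top of a task turns a confusing failure deep inside the task into an error that points at the mismatched hand-off.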

```python
"""
ML Model Training Pipeline DAG

This module defines an Airflow DAG that orchestrates a complete ML training pipeline
for the diamonds dataset, using functions from our ML modules.
"""

from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Create model directory if it doesn't exist
os.makedirs(MODEL_DIR, exist_ok=True)

@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    """
    This DAG implements a complete ML training pipeline for diamond price prediction.
    
    Tasks:
    1. Load data: Load the diamonds dataset
    2. Process data: Preprocess the data for model training
    3. Train model: Train a random forest model
    4. Evaluate model: Evaluate the model on test data
    5. Save model: Save the trained model and preprocessor
    """
    
    @task(task_id="load_data")
    def task_load_data():
        """Load the diamonds dataset."""
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df
    
    @task(task_id="process_data")
    def task_process_data(df):
        """Preprocess the data for model training."""
        print("Preprocessing data")
        # Preprocess the data
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
        print(f"Data preprocessing complete. Training set size: {X_train.shape[0]}")
        return {"X_train": X_train, "X_test": X_test, "y_train": y_train, 
                "y_test": y_test, "preprocessor": preprocessor}
    
    @task(task_id="train_model")
    def task_train_model(processed_data):
        """Train a random forest model."""
        print("Training model")
        # Train the model
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return model
    
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        """Evaluate the trained model."""
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        
        print("Evaluating model")
        # Evaluate the model
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
    
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        """Save the trained model and preprocessor."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        # Create a timestamp for the model name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        
        # Create metadata
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        # Save the model
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path
    
    # Define the task dependencies
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    final_model_path = task_save_model(evaluation_result)

# Create the DAG
diamond_training_pipeline()

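```

The corrected DAG, in which `task_train_model` returns both the model and the processed data that `task_evaluate_model` expects:

```python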
"""
ML Model Training Pipeline DAG

This module defines an Airflow DAG that orchestrates a complete ML training pipeline
for the diamonds dataset, using functions from our ML modules.
"""

from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Create model directory if it doesn't exist
os.makedirs(MODEL_DIR, exist_ok=True)

@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    """
    This DAG implements a complete ML training pipeline for diamond price prediction.
    
    Tasks:
    1. Load data: Load the diamonds dataset
    2. Process data: Preprocess the data for model training
    3. Train model: Train a random forest model
    4. Evaluate model: Evaluate the model on test data
    5. Save model: Save the trained model and preprocessor
    """
    
    @task(task_id="load_data")
    def task_load_data():
        """Load the diamonds dataset."""
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df
    
    @task(task_id="process_data")
    def task_process_data(df):
        """Preprocess the data for model training."""
        print("Preprocessing data")
        # Preprocess the data
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
        print(f"Data preprocessing complete. Training set size: {X_train.shape[0]}")
        return {"X_train": X_train, "X_test": X_test, "y_train": y_train, 
                "y_test": y_test, "preprocessor": preprocessor}
    
    @task(task_id="train_model")
    def task_train_model(processed_data):
        """Train a random forest model."""
        print("Training model")
        # Train the model
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}
    
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        """Evaluate the trained model."""
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        
        print("Evaluating model")
        # Evaluate the model
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
    
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        """Save the trained model and preprocessor."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        # Create a timestamp for the model name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        
        # Create metadata
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        # Save the model
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path
    
    # Define the task dependencies
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    final_model_path = task_save_model(evaluation_result)

# Create the DAG
diamond_training_pipeline()
```

## Adding a Model Quality Gate

You’ve just seen how to evaluate your model’s performance in the pipeline. Now, let’s make your workflow a bit smarter by adding a quality check before saving the model.

Your task is to add a new step to the Airflow DAG that checks if the model’s R2 score is good enough before moving on. This helps ensure you only keep models that meet your standards.

Here’s what you need to do:

  * Create a new task called `check_model_quality` in the pipeline.
  * In this task, check whether the R2 score from the evaluation metrics is above a set threshold (for example, 0.8).
  * If the R2 score is missing or too low, raise an exception with a clear message so the pipeline stops.
  * If the model passes the check, return the evaluation result so the next step can use it.
  * Update the task dependencies so that the model is saved only if it passes the quality check.

This step will help you build more reliable and trustworthy ML pipelines.
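The check itself is ordinary Python, so it can be written and tested without Airflow. Below is a minimal sketch of the gate logic; the metric key `"r2"` and the `ModelQualityError` class are assumptions, so match the key to whatever `evaluate_model` actually returns. Inside the DAG, this body would go in a `@task`-decorated `check_model_quality`, and passing its return value to `task_save_model` is what makes saving depend on the check. Note that with `'retries': 1` in `default_args`, a plain exception makes Airflow retry the task once before failing it; in Airflow 2.x, raising `airflow.exceptions.AirflowFailException` instead fails the task without retrying.

```python
R2_THRESHOLD = 0.8  # example threshold from the exercise text

class ModelQualityError(Exception):
    """Hypothetical exception for models that miss the quality bar."""

def check_model_quality(evaluation_result, threshold=R2_THRESHOLD):
    """Return evaluation_result unchanged if R2 is above threshold, else raise."""
    r2 = evaluation_result.get("metrics", {}).get("r2")  # key name "r2" is an assumption
    if r2 is None:
        raise ModelQualityError("R2 score is missing from the evaluation metrics")
    if r2 <= threshold:
        raise ModelQualityError(f"R2 score {r2:.3f} is not above the threshold {threshold}")
    return evaluation_result

passing = {"model": "m", "preprocessor": "p", "metrics": {"r2": 0.93}}
print(check_model_quality(passing) is passing)  # True

for failing in ({"metrics": {"r2": 0.42}}, {"metrics": {}}):
    try:
        check_model_quality(failing)
    except ModelQualityError as exc:
        print(exc)
```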


```python
"""
ML Model Training Pipeline DAG

This module defines an Airflow DAG that orchestrates a complete ML training pipeline
for the diamonds dataset, using functions from our ML modules.
"""

from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Create model directory if it doesn't exist
os.makedirs(MODEL_DIR, exist_ok=True)

@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    """
    This DAG implements a complete ML training pipeline for diamond price prediction.
    
    Tasks:
    1. Load data: Load the diamonds dataset
    2. Process data: Preprocess the data for model training
    3. Train model: Train a random forest model
    4. Evaluate model: Evaluate the model on test data
    5. Save model: Save the trained model and preprocessor
    """
    
    @task(task_id="load_data")
    def task_load_data():
        """Load the diamonds dataset."""
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df
    
    @task(task_id="process_data")
    def task_process_data(df):
        """Preprocess the data for model training."""
        print("Preprocessing data")
        # Preprocess the data
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
        print(f"Data preprocessing complete. Training set size: {X_train.shape[0]}")
        return {"X_train": X_train, "X_test": X_test, "y_train": y_train, 
                "y_test": y_test, "preprocessor": preprocessor}
    
    @task(task_id="train_model")
    def task_train_model(processed_data):
        """Train a random forest model."""
        print("Training model")
        # Train the model
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}
    
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        """Evaluate the trained model."""
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        
        print("Evaluating model")
        # Evaluate the model
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
    
    # TODO: Define a new task called check_model_quality that:
    #   - sets the minimum R2 score required (e.g., 0.8)
    #   - gets the R2 score from evaluation_result["metrics"]
    #   - raises an Exception with a helpful message if the R2 score is missing or too low
    #   - returns evaluation_result if the check passes
    
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        """Save the trained model and preprocessor."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        # Create a timestamp for the model name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        
        # Create metadata
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        # Save the model
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path
    
    # Define the task dependencies
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    # TODO: Insert the check_model_quality task here so it runs after evaluate_model,
    # then pass its result (instead of evaluation_result) to task_save_model

    final_model_path = task_save_model(evaluation_result)

# Create the DAG
diamond_training_pipeline()

```

```python
"""
ML Model Training Pipeline DAG

This module defines an Airflow DAG that orchestrates a complete ML training pipeline
for the diamonds dataset, using functions from our ML modules.
"""

from datetime import datetime, timedelta
import os
from airflow.decorators import dag, task

# Import our custom modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Create model directory if it doesn't exist
os.makedirs(MODEL_DIR, exist_ok=True)

@dag(
    dag_id='mlops_pipeline',
    description='ML model training pipeline for diamond price prediction',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training', 'diamonds'],
)
def diamond_training_pipeline():
    """
    This DAG implements a complete ML training pipeline for diamond price prediction.
    
    Tasks:
    1. Load data: Load the diamonds dataset
    2. Process data: Preprocess the data for model training
    3. Train model: Train a random forest model
    4. Evaluate model: Evaluate the model on test data
    5. Save model: Save the trained model and preprocessor
    """
    
    @task(task_id="load_data")
    def task_load_data():
        """Load the diamonds dataset."""
        print(f"Loading diamonds data from {DATA_PATH}")
        df = load_diamonds_data(DATA_PATH)
        print(f"Dataset loaded with shape: {df.shape}")
        return df
    
    @task(task_id="process_data")
    def task_process_data(df):
        """Preprocess the data for model training."""
        print("Preprocessing data")
        # Preprocess the data
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
        print(f"Data preprocessing complete. Training set size: {X_train.shape[0]}")
        return {"X_train": X_train, "X_test": X_test, "y_train": y_train, 
                "y_test": y_test, "preprocessor": preprocessor}
    
    @task(task_id="train_model")
    def task_train_model(processed_data):
        """Train a random forest model."""
        print("Training model")
        # Train the model
        model = train_model(
            processed_data["X_train"],
            processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}
    
    @task(task_id="evaluate_model")
    def task_evaluate_model(training_result):
        """Evaluate the trained model."""
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        
        print("Evaluating model")
        # Evaluate the model
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }
    
    @task(task_id="check_model_quality")
    def task_check_model_quality(evaluation_result):
        """Check if the model meets the quality threshold."""
        r2_score = evaluation_result["metrics"].get("r2_score")
        min_r2_score = 0.8
        
        if r2_score is None or r2_score < min_r2_score:
            raise ValueError(
                f"Model quality check failed. R2 score ({r2_score}) is below the threshold ({min_r2_score})."
            )
        
        print(f"Model quality check passed with R2 score: {r2_score}")
        return evaluation_result
    
    @task(task_id="save_model")
    def task_save_model(evaluation_result):
        """Save the trained model and preprocessor."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        # Create a timestamp for the model name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        
        # Create metadata
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        # Save the model
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path
    
    # Define the task dependencies
    df = task_load_data()
    processed_data = task_process_data(df)
    training_result = task_train_model(processed_data)
    evaluation_result = task_evaluate_model(training_result)
    
    # Insert the new task in the dependency chain
    quality_check_result = task_check_model_quality(evaluation_result)
    
    final_model_path = task_save_model(quality_check_result)

# Create the DAG
diamond_training_pipeline()
```
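Because the quality gate is ordinary Python, one option is to keep its rule in a plain helper that the `@task` function calls, so you can exercise it without a scheduler. The sketch below assumes, as the task above does, that the metrics dictionary stores the score under `r2_score`; `passes_quality_gate` and `MIN_R2_SCORE` are hypothetical names, not part of the course modules.

```python
from typing import Any, Mapping

MIN_R2_SCORE = 0.8  # same threshold as task_check_model_quality above


def passes_quality_gate(metrics: Mapping[str, Any], min_r2: float = MIN_R2_SCORE) -> float:
    """Return the R2 score if it meets the threshold; raise ValueError otherwise.

    Hypothetical helper that mirrors task_check_model_quality so the rule
    can be tested without running Airflow.
    """
    r2 = metrics.get("r2_score")
    if r2 is None:
        raise ValueError("Model quality check failed: no 'r2_score' in metrics.")
    if r2 < min_r2:
        raise ValueError(
            f"Model quality check failed. R2 score ({r2}) is below the threshold ({min_r2})."
        )
    return r2


if __name__ == "__main__":
    print(passes_quality_gate({"r2_score": 0.93}))  # meets the threshold
    for bad in ({"r2_score": 0.42}, {"rmse": 512.0}):
        try:
            passes_quality_gate(bad)
        except ValueError as exc:
            print(f"Rejected: {exc}")
```

One design point to keep in mind: the `default_args` above set `retries: 1`, so a failed quality check is retried once after five minutes even though the same model will fail again. Raising `AirflowFailException` (from `airflow.exceptions`) instead of `ValueError` marks the task as failed without retrying.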

You’ve practiced building and connecting individual steps of an ML pipeline in Airflow, including adding a model quality check to make your workflow more reliable.

Now, it’s time to put everything together by creating a complete Airflow DAG that runs the full retraining process for the diamonds dataset. Your goal is to complete the provided Python script by filling in the missing pieces to define all the tasks and connect them into a working pipeline.

Here’s what you need to do:

  * Import the necessary Airflow decorators (`dag` and `task`).
  * Add the `@dag` decorator to the main pipeline function, specifying appropriate arguments (such as `dag_id`, `description`, `schedule`, and so on).
  * For each pipeline step, add the `@task` decorator and implement the function:
    * **Load Data**: Load the diamonds data from a CSV file using `load_diamonds_data` and return the DataFrame.
    * **Process Data**: Preprocess the data and split it into training and test sets using `preprocess_diamonds_data`. Return a dictionary with `X_train`, `X_test`, `y_train`, `y_test`, and `preprocessor`.
    * **Train Model**: Train a random forest model on the training data and return a dictionary with the model and the processed data.
    * **Evaluate Model**: Evaluate the model using `evaluate_model` and return a dictionary with the model, preprocessor, and metrics.
    * **Check Model Quality**: Check whether the model’s R2 score meets a minimum threshold (e.g., 0.8). If it does not, raise an exception; if it passes, return the evaluation result.
    * **Save Model**: Save the model, preprocessor, and metadata (including metrics and a timestamp) using `save_model`. Return the model path.
  * Set up the dependencies so that each task runs in the correct order and passes the right data to the next step.
  * At the end, enable the DAG by calling your pipeline function.

Make sure your pipeline passes all the necessary information between tasks and only saves models that meet your quality standards.
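Before wiring the tasks into Airflow, it can help to check the data contract between steps by calling plain functions in DAG order. The sketch below uses hypothetical stand-ins (`fake_process`, `fake_train`, `fake_evaluate`) for the course's `data`, `model`, and `evaluation` modules, with placeholder strings instead of real data and models, so it runs with only the standard library; the dictionary keys are the ones listed in the steps above, and `require_keys` is likewise a hypothetical helper.

```python
from typing import Any, Dict, Iterable


def require_keys(result: Dict[str, Any], keys: Iterable[str], step: str) -> Dict[str, Any]:
    """Raise KeyError if a step's output lacks any key the next step reads."""
    missing = [key for key in keys if key not in result]
    if missing:
        raise KeyError(f"{step} output is missing keys: {missing}")
    return result


# Hypothetical stand-ins for the course modules: strings replace real data and models.
def fake_process(df: Any) -> Dict[str, Any]:
    return {"X_train": "X_train", "X_test": "X_test", "y_train": "y_train",
            "y_test": "y_test", "preprocessor": "preprocessor"}


def fake_train(processed: Dict[str, Any]) -> Dict[str, Any]:
    return {"model": "model", "processed_data": processed}


def fake_evaluate(training_result: Dict[str, Any]) -> Dict[str, Any]:
    processed = training_result["processed_data"]
    return {"model": training_result["model"],
            "preprocessor": processed["preprocessor"],
            "metrics": {"r2_score": 0.9}}  # placeholder metric, not a real result


def dry_run(df: Any = "df") -> Dict[str, Any]:
    """Call the steps in DAG order, checking each output before passing it on."""
    processed = require_keys(fake_process(df),
                             ["X_train", "X_test", "y_train", "y_test", "preprocessor"],
                             "process_data")
    trained = require_keys(fake_train(processed), ["model", "processed_data"], "train_model")
    return require_keys(fake_evaluate(trained), ["model", "preprocessor", "metrics"],
                        "evaluate_model")
```

Swapping a stand-in for the real module function (for example, `preprocess_diamonds_data`) lets you confirm that each dictionary carries what the next task reads before Airflow is involved.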

This is your chance to show you can build a full, automated ML retraining workflow from start to finish!

```python
"""
Complete ML Retraining Pipeline DAG

This script will define an Airflow DAG for a full machine learning retraining workflow
for the diamonds dataset. You will need to fill in the tasks and set up the pipeline.
"""

from datetime import datetime, timedelta
import os
# TODO: Import the Airflow DAG and task decorators

# Import custom ML modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Ensure model directory exists
os.makedirs(MODEL_DIR, exist_ok=True)

# TODO: Add the dag decorator here with appropriate arguments (dag_id, description, schedule, ...)
# Make sure to use `mlops_pipeline` as `dag_id`.
def diamond_retraining_pipeline():
    """
    This DAG should run a full ML retraining workflow:
    1. Load data
    2. Preprocess data
    3. Train model
    4. Evaluate model
    5. Check model quality
    6. Save model and metadata
    """

    # TODO: Add the task decorator here
    def load_data_task():
        """Load the diamonds dataset from CSV."""
        # TODO: Load the diamonds data using load_diamonds_data and return the DataFrame
        pass

    # TODO: Add the task decorator here
    def process_data_task(df):
        """Preprocess the data and split into train/test sets."""
        # TODO: Preprocess the data using preprocess_diamonds_data
        # TODO: Return a dictionary with X_train, X_test, y_train, y_test, and preprocessor
        pass

    # TODO: Add the task decorator here
    def train_model_task(processed_data):
        """Train a random forest model."""
        # TODO: Train the model using train_model
        # TODO: Return a dictionary with the model and the processed_data
        pass

    # TODO: Add the task decorator here
    def evaluate_model_task(training_result):
        """Evaluate the trained model and return metrics."""
        # TODO: Evaluate the model using evaluate_model
        # TODO: Return a dictionary with the model, preprocessor, and metrics
        pass

    # TODO: Add the task decorator here
    def check_model_quality_task(evaluation_result):
        """
        Check if the model's R2 score meets the minimum threshold.
        Raise an exception if not.
        """
        # TODO: Set a minimum R2 score (e.g., 0.8)
        # TODO: Get the R2 score from evaluation_result["metrics"]
        # TODO: If the R2 score is missing or too low, raise an Exception
        # TODO: If the check passes, return evaluation_result
        pass

    # TODO: Add the task decorator here
    def save_model_task(evaluation_result):
        """Save the trained model, preprocessor, and metadata."""
        # TODO: Save the model using save_model
        # TODO: Use a timestamp for the model name and include metrics in metadata
        # TODO: Return the model path
        pass

    # TODO: Set up the task dependencies below


# TODO: Enable the DAG

```

```python
"""
Complete ML Retraining Pipeline DAG

This script will define an Airflow DAG for a full machine learning retraining workflow
for the diamonds dataset. You will need to fill in the tasks and set up the pipeline.
"""

from datetime import datetime, timedelta
import os
# Import the Airflow DAG and task decorators
from airflow.decorators import dag, task

# Import custom ML modules
from data import load_diamonds_data, preprocess_diamonds_data
from model import train_model, save_model
from evaluation import evaluate_model

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Constants
DATA_PATH = "diamonds.csv"
MODEL_DIR = "./saved_models"

# Ensure model directory exists
os.makedirs(MODEL_DIR, exist_ok=True)

# Define the DAG (the dag_id must be `mlops_pipeline`)
@dag(
    dag_id='mlops_pipeline',
    description='A complete ML retraining pipeline for diamond price prediction',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
)
def diamond_retraining_pipeline():
    """
    This DAG should run a full ML retraining workflow:
    1. Load data
    2. Preprocess data
    3. Train model
    4. Evaluate model
    5. Check model quality
    6. Save model and metadata
    """

    @task(task_id="load_data")
    def load_data_task():
        """Load the diamonds dataset from CSV."""
        print("Loading diamonds data...")
        df = load_diamonds_data(DATA_PATH)
        print("Data loaded successfully.")
        return df

    @task(task_id="process_data")
    def process_data_task(df):
        """Preprocess the data and split into train/test sets."""
        print("Preprocessing data...")
        X_train, X_test, y_train, y_test, preprocessor = preprocess_diamonds_data(df)
        print("Data preprocessing complete.")
        return {
            "X_train": X_train, 
            "X_test": X_test, 
            "y_train": y_train, 
            "y_test": y_test, 
            "preprocessor": preprocessor
        }

    @task(task_id="train_model")
    def train_model_task(processed_data):
        """Train a random forest model."""
        print("Training model...")
        model = train_model(
            X_train=processed_data["X_train"],
            y_train=processed_data["y_train"],
            model_type="random_forest",
            n_estimators=10,
            max_depth=5,
            random_state=42
        )
        print("Model training complete.")
        return {"model": model, "processed_data": processed_data}

    @task(task_id="evaluate_model")
    def evaluate_model_task(training_result):
        """Evaluate the trained model and return metrics."""
        print("Evaluating model...")
        model = training_result["model"]
        processed_data = training_result["processed_data"]
        metrics, _ = evaluate_model(model, processed_data["X_test"], processed_data["y_test"])
        print(f"Model evaluation metrics: {metrics}")
        return {
            "model": model,
            "preprocessor": processed_data["preprocessor"],
            "metrics": metrics
        }

    @task(task_id="check_model_quality")
    def check_model_quality_task(evaluation_result):
        """
        Check if the model's R2 score meets the minimum threshold.
        Raise an exception if not.
        """
        min_r2_score = 0.8
        r2_score = evaluation_result["metrics"].get("r2_score")
        
        if r2_score is None or r2_score < min_r2_score:
            raise ValueError(f"Model quality check failed. R2 score ({r2_score}) is below the threshold ({min_r2_score}).")
            
        print(f"Model quality check passed with R2 score: {r2_score}")
        return evaluation_result

    @task(task_id="save_model")
    def save_model_task(evaluation_result):
        """Save the trained model, preprocessor, and metadata."""
        model = evaluation_result["model"]
        preprocessor = evaluation_result["preprocessor"]
        metrics = evaluation_result["metrics"]
        
        print(f"Saving model to {MODEL_DIR}")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"diamond_price_model_{timestamp}"
        metadata = {
            "metrics": metrics,
            "model_type": "RandomForestRegressor",
            "timestamp": timestamp
        }
        
        model_path = save_model(
            model, 
            preprocessor,
            MODEL_DIR, 
            model_name, 
            metadata
        )
        
        print(f"Model saved to {model_path}")
        return model_path

    # Set up the task dependencies: each task's return value feeds the next task
    df = load_data_task()
    processed_data = process_data_task(df)
    training_result = train_model_task(processed_data)
    evaluation_result = evaluate_model_task(training_result)
    quality_check_result = check_model_quality_task(evaluation_result)
    final_model_path = save_model_task(quality_check_result)


# Create the DAG
diamond_retraining_pipeline()
```
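The solution above returns DataFrames, arrays, and a fitted model directly from tasks. Airflow passes task return values between tasks as XComs, which are stored in its metadata database and are meant for small pieces of data; depending on your Airflow version and configuration, large DataFrames and fitted models can be slow to store or may not serialize at all. A common alternative is to write artifacts to storage that every worker can reach and pass only the file path. The sketch below shows that pattern in plain Python with `pickle`, assuming a shared filesystem; `save_artifact`, `load_artifact`, and the directory layout are hypothetical, not part of the course modules.

```python
import os
import pickle
import tempfile
from typing import Any


def save_artifact(obj: Any, directory: str, name: str) -> str:
    """Pickle obj to directory/name.pkl and return the path, a small XCom-friendly string."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, f"{name}.pkl")
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    return path


def load_artifact(path: str) -> Any:
    """Load an object written by save_artifact. Only unpickle files you created yourself."""
    with open(path, "rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # An upstream task would return only this path string...
        path = save_artifact({"X_train": [[0.3], [0.5]], "y_train": [500.0, 1200.0]},
                             tmp, "processed_data")
        # ...and the downstream task would reload the object from it.
        processed = load_artifact(path)
        print(path.endswith("processed_data.pkl"), processed["y_train"])
```

In the DAG, `process_data_task` would return the path from `save_artifact(...)` and `train_model_task` would begin by calling `load_artifact` on the path it receives; the dependency chain itself would not change.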