# FM Optuna Pipeline

Run Factorization Machines hyperparameter optimization with Optuna and MLflow tracking.

In [16]:
import os
from pathlib import Path
from typing import Dict, List, Optional

import boto3
import dask
import mlflow
import optuna
import sagemaker
from mlflow.tracking import MlflowClient
from sagemaker import get_execution_role
from sagemaker.workflow.notebook_job_step import NotebookJobStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession


In [17]:
def define_steps_for_pipeline(
    config_dict: dict,
    image_uri: str,
    notebook_artifacts: str,
    input_notebook_name: str,
    kernel_name: str = "python3",
    instance_type: str = "ml.m5.xlarge",
    role: str = None,
    **params,
) -> list:
    """
    Build one NotebookJobStep per entry of ``config_dict``.

    Parameters
    ----------
    config_dict : dict
        Maps a config name to the parameters specific to that config,
        e.g., {"config_1": {"n_users": 5000, "n_games": 100}, ...}
    image_uri : str
        Container image passed to every step as ``image_uri``
    notebook_artifacts : str
        S3 location passed to every step as ``s3_root_uri``
    input_notebook_name : str
        Notebook each step executes
    kernel_name : str
        Jupyter kernel name
    instance_type : str
        SageMaker instance type
    role : str
        Execution role ARN; when None it is looked up with
        ``sagemaker.get_execution_role()``
    **params : dict
        Parameters shared by all notebook jobs. On a key collision these
        override both the per-config values and ``config_name``.

    Returns
    -------
    list
        NotebookJobStep objects, in ``config_dict`` iteration order
    """
    if role is None:
        role = sagemaker.get_execution_role()

    def _build_step(config_name, config_params):
        # The same identifier is used as step name, job name and display name
        step_id = f"fm-train-{config_name}"
        # Later entries win: shared params take precedence over config params
        job_parameters = {"config_name": config_name, **config_params, **params}
        return NotebookJobStep(
            name=step_id,
            display_name=step_id,
            notebook_job_name=step_id,
            description=f"FM Optuna training for {config_name}",
            input_notebook=input_notebook_name,
            image_uri=image_uri,
            kernel_name=kernel_name,
            instance_type=instance_type,
            role=role,
            s3_root_uri=notebook_artifacts,
            parameters=job_parameters,
            max_runtime_in_seconds=86400,  # 24 hours
            max_retry_attempts=3,
        )

    return [
        _build_step(config_name, config_params)
        for config_name, config_params in config_dict.items()
    ]


## Configuration

In [18]:
# Pipeline configuration
instance_type = "ml.m5.xlarge"
pipeline_name = "fm-optuna-pipeline"
train_notebook = "fm_train.ipynb"
bucket = "fm-gambling-recommender-dev-376337229415"
subfolder_name = "fm-training"
region = "us-east-1"
# NOTE(review): this value is a SageMaker image ARN but is passed on as an
# image URI -- confirm the notebook job step accepts an ARN here.
image_uri = f"arn:aws:sagemaker:{region}:885854791233:image/sagemaker-distribution-cpu"
kernel_name = "python3"
notebook_artifacts = f"s3://{bucket}/{subfolder_name}"

# Training parameters (kept as strings; they are forwarded as notebook job parameters)
max_trials = "20"
early_stopping = "5"
experiment_name = "fm_gambling_optuna_2025"


# mlflow server details

os.environ["MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING"] = "true"
# Built from `region` so the tracking server ARN stays consistent with the
# image ARN above when the region is changed.
os.environ["MLFLOW_TRACKING_URI"] = (
    f"arn:aws:sagemaker:{region}:376337229415:mlflow-app/app-JZITH5VWKAWZ"
)

# Different dataset configurations to test; each key becomes one pipeline step
config_dict = {
    "small": {
        "n_users": "1000",
        "n_games": "50",
        "n_days": "90",
    },
    "medium": {
        "n_users": "5000",
        "n_games": "100",
        "n_days": "180",
    },
    "large": {
        "n_users": "10000",
        "n_games": "200",
        "n_days": "365",
    },
}


## Define Pipeline Steps

In [19]:
# Parameters shared by every notebook job, whatever the dataset configuration
params = dict(
    max_trials=max_trials,
    experiment_name=experiment_name,
    early_stopping=early_stopping,
)

# Keyword arguments make the mapping onto the function signature explicit
pipeline_steps = define_steps_for_pipeline(
    config_dict=config_dict,
    image_uri=image_uri,
    notebook_artifacts=notebook_artifacts,
    input_notebook_name=train_notebook,
    kernel_name=kernel_name,
    instance_type=instance_type,
    **params,
)

print(f"Created {len(pipeline_steps)} pipeline steps")

Created 3 pipeline steps


## Local Mode Execution (Testing)

Use local mode to test the pipeline before running on SageMaker.

In [20]:

# Build the pipeline on a local session so the steps run locally
session = LocalPipelineSession()
pipeline = Pipeline(
    name=pipeline_name,
    steps=pipeline_steps,
    sagemaker_session=session,
)

# Register the pipeline under the execution role, then start a run
role = sagemaker.get_execution_role()
pipeline.create(role_arn=role)
execution = pipeline.start()

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker.local.entities:Starting execution for pipeline fm-optuna-pipeline. Execution ID is 74b37890-6f17-4dd7-bcd6-aa53f1e5ca70
INFO:sagemaker.local.entities:Starting pipeline step: 'fm-train-large'
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#

## SageMaker Pipeline Execution

In [None]:
# Execute on SageMaker

session = PipelineSession()
pipeline = Pipeline(name=pipeline_name, steps=pipeline_steps, sagemaker_session=session)

# Resolve the role in this cell: `role` is otherwise only assigned by the
# optional local-mode cell, so testing it for None would raise NameError on a
# fresh kernel when that cell is skipped.
role = sagemaker.get_execution_role()

# upsert creates the pipeline or updates an existing one with the same name
pipeline.upsert(role)
execution = pipeline.start()
print(execution.describe())


## Aggregate Results

After the pipeline completes, aggregate the Optuna studies from all runs.

In [None]:


def copy_optuna_study_to_db(source_db_file, target_db_file):
    """Copy one study from a SQLite Optuna database into another.

    Only the first study name reported for the source database is copied.
    An ``IndexError`` is raised when the source database lists no studies.

    Parameters
    ----------
    source_db_file : str or Path
        SQLite file to read the study from
    target_db_file : str or Path
        SQLite file to copy the study into
    """
    source_storage = f"sqlite:///{source_db_file}"
    target_storage = f"sqlite:///{target_db_file}"

    available_studies = optuna.study.get_all_study_names(storage=source_storage)
    first_study = available_studies[0]

    optuna.copy_study(
        from_study_name=first_study,
        from_storage=source_storage,
        to_storage=target_storage,
    )


def copy_all_studies_to_new_file(target_db_file, source_db_dir):
    """Rebuild ``target_db_file`` from every ``.db`` file in ``source_db_dir``.

    An existing target file is deleted first, so the result only contains
    what is copied during this call. Only direct children of
    ``source_db_dir`` are considered (no recursion).

    Parameters
    ----------
    target_db_file : str or Path
        SQLite file that receives the copied studies
    source_db_dir : str or Path
        Directory holding the per-run SQLite files
    """
    target_path = Path(target_db_file)
    if target_path.exists():
        print(f"Removing existing optuna target db {target_db_file}")
        target_path.unlink()

    # Snapshot and sort the listing before copying anything: the copy order
    # becomes deterministic, and a target created inside the source directory
    # by the first copy can never be picked up as a source itself.
    source_files = sorted(
        entry for entry in Path(source_db_dir).iterdir() if entry.suffix == ".db"
    )
    for source_file in source_files:
        copy_optuna_study_to_db(source_file, target_db_file)


def download_single_artifact(run_info: Dict, folder, base_path: str) -> Optional[str]:
    """Download the artifacts of a single MLflow run whose path contains ``folder``.

    Failures are reported through the return value instead of being raised,
    so one failing run does not abort the downloads of the other runs.

    Parameters
    ----------
    run_info : Dict
        Run record; its ``"run_id"`` entry identifies the run
    folder : str
        Text that must occur in a root-level artifact path for it to be downloaded
    base_path : str
        Local directory the artifacts are downloaded into (created if missing)

    Returns
    -------
    Optional[str]
        None on success, otherwise a message describing the failure
    """
    # Resolve the run id before entering the try block: the error message in
    # the handler references it, so it must always be bound.
    run_id = run_info.get("run_id")
    if run_id is None:
        return "Error downloading artifacts: run record has no 'run_id'"

    try:
        run_path = Path(base_path)
        run_path.mkdir(parents=True, exist_ok=True)
        client = mlflow.tracking.MlflowClient()
        # NOTE(review): every run downloads into the same base_path, so
        # identically named artifacts from different runs may overwrite each
        # other -- confirm artifact file names are unique per run.
        for artifact in client.list_artifacts(run_id):
            # Substring match: any root-level artifact whose path contains `folder`
            if folder in artifact.path:
                client.download_artifacts(run_id, artifact.path, str(run_path))
    except Exception as e:
        return f"Error downloading artifacts for run {run_id}: {str(e)}"
    return None


def get_experiment_runs(experiment_name) -> List[Dict]:
    """Return every run of the named MLflow experiment as a list of records.

    Raises
    ------
    ValueError
        If no experiment with that name is found
    """
    experiment = mlflow.get_experiment_by_name(experiment_name)
    if experiment:
        runs_table = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
        # One dict per run, keyed by column name
        return runs_table.to_dict("records")
    raise ValueError(f"Experiment {experiment_name} not found")


def download_artifacts_across_runs(experiment_name: str, folder: str, local_path: str):
    """Download the matching artifacts of every run in an MLflow experiment.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment whose runs are scanned
    folder : str
        Artifact folder name to look for in each run
    local_path : str
        Local directory the artifacts are downloaded into (created if missing)
    """
    runs = get_experiment_runs(experiment_name)
    local_path = Path(local_path)
    local_path.mkdir(parents=True, exist_ok=True)
    # Create delayed objects for each download task
    delayed_tasks = [
        dask.delayed(download_single_artifact)(run, folder, str(local_path))
        for run in runs
    ]
    # Compute all tasks
    outcomes = dask.compute(*delayed_tasks)

    # download_single_artifact reports a failure by returning a message rather
    # than raising; surface those messages instead of silently dropping them.
    for outcome in outcomes:
        if outcome:
            print(outcome)


def execute_study_agg_pipeline(
    experiment_name: str,
    optuna_target_db: str = "all_studies.db",
    local_folder: str = "results",
    mlflow_artifact_folder: str = "optuna_db",
) -> None:
    """
    Aggregate Optuna studies from MLflow experiment runs.

    Downloads the Optuna database artifacts of every run in the experiment
    into ``local_folder`` and copies the studies found in
    ``{local_folder}/{mlflow_artifact_folder}`` into a single database at
    ``{local_folder}/{optuna_target_db}``.

    Parameters
    ----------
    experiment_name : str
        MLflow experiment name
    optuna_target_db : str
        Target database filename for aggregated studies
    local_folder : str
        Local folder to download artifacts to
    mlflow_artifact_folder : str
        MLflow artifact folder name containing Optuna DBs
    """
    # parents=True so a nested local_folder (e.g. "out/results") also works
    Path(local_folder).mkdir(parents=True, exist_ok=True)

    target_db_path = f"{local_folder}/{optuna_target_db}"
    source_db_dir = f"{local_folder}/{mlflow_artifact_folder}"

    # Download Optuna DB artifacts from all runs
    download_artifacts_across_runs(
        experiment_name, mlflow_artifact_folder, local_folder
    )

    # Combine all studies into single database
    copy_all_studies_to_new_file(target_db_path, source_db_dir)

    print(f"Aggregated studies saved to {target_db_path}")


In [None]:
# Collect the Optuna databases of all runs into results/all_fm_studies.db
execute_study_agg_pipeline(
    experiment_name,
    optuna_target_db="all_fm_studies.db",
    local_folder="results",
    mlflow_artifact_folder="optuna_db",
)

## View Best Results

In [None]:
# Open the "fm_gambling" study stored in the aggregated database
aggregated_storage = "sqlite:///results/all_fm_studies.db"
study = optuna.load_study(study_name="fm_gambling", storage=aggregated_storage)

# NOTE(review): the sign flip presumably means the objective is the negated
# RMSE -- confirm against the training notebook.
best_rmse = -study.best_value
print(f"Best RMSE: {best_rmse:.4f}")
print(f"Best params: {study.best_params}")