# MovieLens Recommendation System Training

This notebook demonstrates how to train a hybrid matrix factorization recommendation system on the MovieLens 10M dataset using `pybpr`.

## 1. Import the required packages + set configuration

In [12]:
import os
import itertools
from functools import partial

import hero
import torch
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from pathos.multiprocessing import ProcessPool

from pybpr import RecSys, UserItemData, HybridMF
from pybpr import bpr_loss, hinge_loss, bpr_loss_v2
from pybpr.movielens_downloader import MovieLensDownloader

load_dotenv()
torch.set_num_threads(1)  # Limit threads per process

In [13]:
hero_client = hero.HeroClient()
model_registry = hero_client.MLModelRegistry()
mlflow = model_registry.get_patched_mlflow()
mlflow.set_tracking_uri(model_registry.get_tracking_uri())

experiment_name = "MovieLens BPR Experiments"
experiment = model_registry.read_or_create_experiment(experiment_name)
print(f"Tracking URI: {model_registry.get_tracking_uri()}")
print(f"Experiment: {experiment.name} (ID: {experiment.experiment_id})")

Tracking URI: https://dev-hero.nrel.gov/ml-model-registry/api/v1/proxy/dev-recommender-system-app
Experiment: MovieLens BPR Experiments (ID: 133)


## 2. Load and prepare the data

We use `MovieLensDownloader` to download the MovieLens 20M dataset (which includes ratings and tag genome scores). Items (movies) without tag genome data are filtered out, and tags with relevance below 0.8 are dropped.

In [14]:
# Download and load the MovieLens 20M dataset (includes genome scores)
downloader = MovieLensDownloader(cache_dir='./data', log_level=20)
data = downloader.load_dataset_with_tags('ml-20m')

print(f"Available keys: {list(data.keys())}")
assert 'genome_scores' in data, (
    "genome_scores not found in loaded data. "
    "Check that genome-scores.csv exists in the downloaded dataset directory."
)

# Ratings
rdf = data['ratings'].rename(columns={
    'userId': 'UserID',
    'movieId': 'MovieID',
    'rating': 'Rating',
    'timestamp': 'Timestamp'
})

# Tag genome scores (movieId, tagId, relevance)
tdf = data['genome_scores'].rename(columns={
    'movieId': 'MovieID',
    'tagId': 'TagID',
    'relevance': 'Relevance'
})
tdf.drop(index=tdf.index[tdf.Relevance < 0.8], inplace=True)

# Filter ratings to only include movies with tag data
rdf = rdf[rdf.MovieID.isin(tdf.MovieID.unique())]

print(f"Ratings: {len(rdf):,} rows")
print(f"Tags: {len(tdf):,} rows")
print(f"Unique users: {rdf.UserID.nunique():,}")
print(f"Unique movies: {rdf.MovieID.nunique():,}")

MovieLensDownloader-INFO-Cache directory: data/movielens
MovieLensDownloader-INFO-Dataset ml-20m already exists at data/movielens/ml-20m
MovieLensDownloader-INFO-Loaded 20,000,263 ratings
MovieLensDownloader-INFO-Loaded 27,278 movies
MovieLensDownloader-INFO-Loaded 465,564 tags
MovieLensDownloader-INFO-Loaded 27,278 links
MovieLensDownloader-INFO-Loaded 11,709,768 genome_scores
MovieLensDownloader-INFO-Loaded 1,128 genome_tags
MovieLensDownloader-INFO-Loaded 6 datasets: ['ratings', 'movies', 'tags', 'links', 'genome_scores', 'genome_tags']
Available keys: ['ratings', 'movies', 'tags', 'links', 'genome_scores', 'genome_tags']
Ratings: 19,738,489 rows
Tags: 103,052 rows
Unique users: 138,493
Unique movies: 10,150


## 3. Define the model building function

The `build_recsys` function constructs a `UserItemData` object, configures positive/negative interactions and item features based on the experiment parameters, splits into train/test, and trains a `HybridMF` model using BPR.

In [None]:
def build_recsys(
    rdf,
    tdf,
    run,
    neg_option,
    item_option,
    n_latent,
    learning_rate,
    loss_function,
    weight_decay,
    n_iter=5,
    batch_size=1000,
    eval_every=10,
    eval_user_size=7000,
    output_dir="/kfs2/projects/zazzle/pybpr/examples/output/movielens/",
):
    """Build and train a recommendation system."""

    # Name the run
    name = f'{run}_{item_option}_{loss_function.__name__}'
    name += f'_{neg_option}'
    name += f'_ld{n_latent}_lr{int(learning_rate*1000)}'
    name += f'_wd{int(weight_decay*1000)}'
    print(f"Starting process: {name}", flush=True)

    # Build data object
    ui = UserItemData(name=name)

    ui.add_positive_interactions(
        user_ids=rdf.UserID[rdf.Rating >= 4.0],
        item_ids=rdf.MovieID[rdf.Rating >= 4.0]
    )
    if neg_option != 'neg-ignore':
        ui.add_negative_interactions(
            user_ids=rdf.UserID[rdf.Rating < 4.0],
            item_ids=rdf.MovieID[rdf.Rating < 4.0]
        )
    ui.add_user_features(
        user_ids=rdf.UserID.unique(),
        feature_ids=rdf.UserID.unique()
    )
    if item_option == 'metadata':
        ui.add_item_features(
            item_ids=tdf.MovieID,
            feature_ids=tdf.TagID
        )
    elif item_option == 'indicator':
        ui.add_item_features(
            item_ids=tdf.MovieID.unique(),
            feature_ids=tdf.MovieID.unique()
        )
    elif item_option == 'both':
        ui.add
        ui.add_item_features(
            item_ids=np.concatenate(
                (tdf.MovieID.values, tdf.MovieID.unique())),
            feature_ids=np.concatenate(
                (tdf.TagID.values, tdf.TagID.max() + tdf.MovieID.unique()))
        )
    else:
        raise ValueError(f"Unknown item_features type: {item_option}")

    # Split data into train and test sets
    ui.train_test_split(
        train_ratio_pos=0.8,
        train_ratio_neg=0.0 if neg_option == 'neg-test' else 0.8,
        show_progress=False
    )
    print(ui, flush=True)

    # Build recommender
    recommender = RecSys(
        data=ui,
        model=HybridMF(ui.n_user_features,
                       ui.n_item_features, n_latent=n_latent),
        optimizer=partial(
            torch.optim.Adam, lr=learning_rate, weight_decay=weight_decay
        ),
        loss_function=loss_function,
        output_dir=os.path.join(output_dir, ui.name),
        log_level=1
    )

    # Train the model
    recommender.fit(
        n_iter=n_iter,
        batch_size=batch_size,
        eval_every=eval_every,
        eval_user_size=eval_user_size,
        early_stopping_patience=100
    )

    print(f"Finished process: {name}")
    return recommender

## 4. Define the experiment parameter grid

We define a grid of hyperparameters to sweep over. Each combination of parameters will be run as a separate experiment. Adjust the grid to expand or narrow the search space.

In [4]:
param_grid = {
    'run': list(range(1)),
    'item_option': ['metadata', 'indicator'],
    'n_latent': [64],
    'learning_rate': [0.005],
    'loss_function': [bpr_loss],
    'weight_decay': [0],
    'neg_option': ['neg-ignore', 'neg-test', 'neg-both'],
}

# Generate all parameter combinations
all_params = []
param_names = list(param_grid.keys())
param_values = list(param_grid.values())
for values in itertools.product(*param_values):
    params = dict(zip(param_names, values))
    all_params.append(params)

print(f"Total experiments to run: {len(all_params)}")
for i, p in enumerate(all_params):
    print(f"  [{i}] item={p['item_option']}, neg={p['neg_option']}, loss={p['loss_function'].__name__}")

Total experiments to run: 6
  [0] item=metadata, neg=neg-ignore, loss=bpr_loss
  [1] item=metadata, neg=neg-test, loss=bpr_loss
  [2] item=metadata, neg=neg-both, loss=bpr_loss
  [3] item=indicator, neg=neg-ignore, loss=bpr_loss
  [4] item=indicator, neg=neg-test, loss=bpr_loss
  [5] item=indicator, neg=neg-both, loss=bpr_loss


## 5. Run experiments in parallel

We use Python's `multiprocessing.Pool` to run the experiments in parallel across available CPU cores.

In [16]:
def run_experiment(params, rdf, tdf):
    """Run a single experiment with the given parameters."""
    try:
        recommender = build_recsys(rdf, tdf, **params)
        return {"status": "success", "recommender": recommender, "params": params}
    except Exception as e:
        return {"status": "failed", "error": str(e), "params": params}


# Create a partial function with fixed rdf and tdf
run_with_fixed_data = partial(run_experiment, rdf=rdf, tdf=tdf)

# Set the number of processes to use
num_processes = min(len(all_params), os.cpu_count())
print(f"Using {num_processes} processes")

# Run experiments in parallel using pathos (supports dill serialization)
pool = ProcessPool(nodes=num_processes)
results = pool.map(run_with_fixed_data, all_params)
pool.close()
pool.join()

Using 6 processes
Starting process: 0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0


UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Initialized UserItemData '0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0' with dtype <class 'numpy.float32'>
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Adding 9,908,333 positive interactions


Starting process: 0_metadata_bpr_loss_neg-test_ld64_lr5_wd0


UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Initialized UserItemData '0_metadata_bpr_loss_neg-test_ld64_lr5_wd0' with dtype <class 'numpy.float32'>
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Adding 9,908,333 positive interactions
UserItemData.0_metadata_bpr_loss_neg-both_ld64_lr5_wd0 - INFO - Initialized UserItemData '0_metadata_bpr_loss_neg-both_ld64_lr5_wd0' with dtype <class 'numpy.float32'>


Starting process: 0_metadata_bpr_loss_neg-both_ld64_lr5_wd0


UserItemData.0_metadata_bpr_loss_neg-both_ld64_lr5_wd0 - INFO - Adding 9,908,333 positive interactions


Starting process: 0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0


UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Initialized UserItemData '0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0' with dtype <class 'numpy.float32'>
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Adding 9,908,333 positive interactions


Starting process: 0_indicator_bpr_loss_neg-test_ld64_lr5_wd0


UserItemData.0_indicator_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Initialized UserItemData '0_indicator_bpr_loss_neg-test_ld64_lr5_wd0' with dtype <class 'numpy.float32'>


Starting process: 0_indicator_bpr_loss_neg-both_ld64_lr5_wd0


UserItemData.0_indicator_bpr_loss_neg-both_ld64_lr5_wd0 - INFO - Initialized UserItemData '0_indicator_bpr_loss_neg-both_ld64_lr5_wd0' with dtype <class 'numpy.float32'>
UserItemData.0_indicator_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Adding 9,908,333 positive interactions
UserItemData.0_indicator_bpr_loss_neg-both_ld64_lr5_wd0 - INFO - Adding 9,908,333 positive interactions
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Successfully added positive interactions. New dimensions: 138287 users × 10130 items
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Adding 138,493 user features
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Successfully added user features
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - 138493 users × 138493 user features
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Adding 103,052 item features
UserItemData.0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Successfully a

UserItemData(0_metadata_bpr_loss_neg-ignore_ld64_lr5_wd0)
  Fuser     :(138493×138493) nnz=   138,493 (0.001%), empty rows/cols=     0/     0
  Fitem     :( 10160×  1104) nnz=   103,052 (0.919%), empty rows/cols=     0/     0
  Rpos      :(138493× 10160) nnz= 9,908,333 (0.704%), empty rows/cols=   206/    30
    └─ users: min=1, max=3028 | items: min=1, max=55807
  Rneg      :(138493× 10160) nnz=         0 (0.000%), empty rows/cols=138493/ 10160
  Rpos_train:(138493× 10160) nnz= 7,926,717 (0.563%), empty rows/cols=   206/    33
    └─ users: min=1, max=2423 | items: min=1, max=44687
  Rpos_test :(138493× 10160) nnz= 1,981,616 (0.141%), empty rows/cols=  4007/   220
    └─ users: min=1, max=605 | items: min=1, max=11120
  Rneg_train:(138493× 10160) nnz=         0 (0.000%), empty rows/cols=138493/ 10160
  Rneg_test :(138493× 10160) nnz=         0 (0.000%), empty rows/cols=138493/ 10160


UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Adding 138,493 user features
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Successfully added user features
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - 138493 users × 138493 user features
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Adding 10,160 item features
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Successfully added item features
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - New dimensions: 10160 items × 10160 item features
UserItemData.0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0 - INFO - Splitting interactions: train_pos=0.80, train_neg=0.80, random_state=None
INFO - Initiating Hybrid Recommender System..
UserItemData.0_indicator_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Successfully added positive interactions. New dimensions: 138287 users × 10130 items
UserItemData.0_indicator_bpr_loss_neg-both_ld64_lr5_

UserItemData(0_indicator_bpr_loss_neg-ignore_ld64_lr5_wd0)
  Fuser     :(138493×138493) nnz=   138,493 (0.001%), empty rows/cols=     0/     0
  Fitem     :( 10160× 10160) nnz=    10,160 (0.010%), empty rows/cols=     0/     0
  Rpos      :(138493× 10160) nnz= 9,908,333 (0.704%), empty rows/cols=   206/    30
    └─ users: min=1, max=3028 | items: min=1, max=55807
  Rneg      :(138493× 10160) nnz=         0 (0.000%), empty rows/cols=138493/ 10160
  Rpos_train:(138493× 10160) nnz= 7,926,742 (0.563%), empty rows/cols=   206/    34
    └─ users: min=1, max=2440 | items: min=1, max=44751
  Rpos_test :(138493× 10160) nnz= 1,981,591 (0.141%), empty rows/cols=  3965/   235
    └─ users: min=1, max=588 | items: min=1, max=11056
  Rneg_train:(138493× 10160) nnz=         0 (0.000%), empty rows/cols=138493/ 10160
  Rneg_test :(138493× 10160) nnz=         0 (0.000%), empty rows/cols=138493/ 10160


INFO - Initiating Hybrid Recommender System..
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Successfully added negative interactions. New dimensions: 138493 users × 10150 items
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Adding 138,493 user features
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Successfully added user features
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - 138493 users × 138493 user features
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Adding 103,052 item features
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - Successfully added item features
UserItemData.0_metadata_bpr_loss_neg-test_ld64_lr5_wd0 - INFO - New dimensions: 10160 items × 1104 item features
UserItemData.0_metadata_bpr_loss_neg-both_ld64_lr5_wd0 - INFO - Successfully added negative interactions. New dimensions: 138493 users × 10150 items
UserItemData.0_metadata_bpr_loss_neg-both_ld64_lr5_wd0 - INFO -

UserItemData(0_metadata_bpr_loss_neg-both_ld64_lr5_wd0)
  Fuser     :(138493×138493) nnz=   138,493 (0.001%), empty rows/cols=     0/     0
  Fitem     :( 10160×  1104) nnz=   103,052 (0.919%), empty rows/cols=     0/     0
  Rpos      :(138493× 10160) nnz= 9,908,333 (0.704%), empty rows/cols=   206/    30
    └─ users: min=1, max=3028 | items: min=1, max=55807
  Rneg      :(138493× 10160) nnz= 9,830,156 (0.699%), empty rows/cols=   239/    12
    └─ users: min=1, max=4461 | items: min=1, max=27737
  Rpos_train:(138493× 10160) nnz= 7,926,727 (0.563%), empty rows/cols=   206/    38
    └─ users: min=1, max=2425 | items: min=1, max=44450
  Rpos_test :(138493× 10160) nnz= 1,981,606 (0.141%), empty rows/cols=  3946/   240
    └─ users: min=1, max=603 | items: min=1, max=11357
  Rneg_train:(138493× 10160) nnz= 7,864,217 (0.559%), empty rows/cols=   239/    14
    └─ users: min=1, max=3556 | items: min=1, max=22207
  Rneg_test :(138493× 10160) nnz= 1,965,939 (0.140%), empty rows/cols=  6734/

INFO - Initiating Hybrid Recommender System..


UserItemData(0_indicator_bpr_loss_neg-both_ld64_lr5_wd0)
  Fuser     :(138493×138493) nnz=   138,493 (0.001%), empty rows/cols=     0/     0
  Fitem     :( 10160× 10160) nnz=    10,160 (0.010%), empty rows/cols=     0/     0
  Rpos      :(138493× 10160) nnz= 9,908,333 (0.704%), empty rows/cols=   206/    30
    └─ users: min=1, max=3028 | items: min=1, max=55807
  Rneg      :(138493× 10160) nnz= 9,830,156 (0.699%), empty rows/cols=   239/    12
    └─ users: min=1, max=4461 | items: min=1, max=27737
  Rpos_train:(138493× 10160) nnz= 7,926,726 (0.563%), empty rows/cols=   206/    40
    └─ users: min=1, max=2456 | items: min=1, max=44610
  Rpos_test :(138493× 10160) nnz= 1,981,607 (0.141%), empty rows/cols=  4004/   214
    └─ users: min=1, max=572 | items: min=1, max=11197
  Rneg_train:(138493× 10160) nnz= 7,864,210 (0.559%), empty rows/cols=   239/    15
    └─ users: min=1, max=3558 | items: min=1, max=22191
  Rneg_test :(138493× 10160) nnz= 1,965,946 (0.140%), empty rows/cols=  6765

INFO - Initiating Hybrid Recommender System..


## 6. Log results to MLflow

After parallel training completes, we log each experiment's parameters, metrics, and artifacts to the HERO ML Model Registry.

In [18]:
for result in results:
    params = result["params"]
    run_name = f"{params['item_option']}_{params['neg_option']}_{params['loss_function'].__name__}"

    if result["status"] == "failed":
        print(f"SKIPPED (failed): {run_name} - {result['error']}")
        continue

    recommender = result["recommender"]
    metrics_list = recommender.metrics

    with mlflow.start_run(
        experiment_id=experiment.experiment_id,
        run_name=run_name,
    ):
        # Log hyperparameters
        mlflow.log_params({
            "item_option": params["item_option"],
            "neg_option": params["neg_option"],
            "n_latent": params["n_latent"],
            "learning_rate": params["learning_rate"],
            "weight_decay": params["weight_decay"],
            "loss_function": params["loss_function"].__name__,
            "run": params["run"],
        })

        # Log final epoch metrics
        final = metrics_list[-1]
        for key in ["loss", "train_auc", "train_auc_std", "train_loss",
                     "test_auc", "test_auc_std", "test_loss"]:
            if key in final:
                mlflow.log_metric(f"final_{key}", final[key])

        # Log best test AUC across all epochs
        test_aucs = [m["test_auc"] for m in metrics_list if "test_auc" in m]
        if test_aucs:
            mlflow.log_metric("best_test_auc", max(test_aucs))

        # Log per-epoch metrics as step metrics
        for m in metrics_list:
            step = m["epoch"]
            mlflow.log_metric("loss", m["loss"], step=step)
            for key in ["train_auc", "test_auc", "train_loss", "test_loss"]:
                if key in m:
                    mlflow.log_metric(key, m[key], step=step)

        # Save and log metrics JSON as artifact
        recommender.save_metrics()
        mlflow.log_artifact(
            os.path.join(recommender.output_dir, "metrics.json"),
            artifact_path="metrics"
        )

        # Log best model checkpoint as artifact
        model_files = [f for f in os.listdir(recommender.output_dir)
                       if f.startswith("best_model") and f.endswith(".torch")]
        for mf in model_files:
            mlflow.log_artifact(
                os.path.join(recommender.output_dir, mf),
                artifact_path="model"
            )

    print(f"Logged: {run_name}")

print(f"\nAll experiments logged to experiment: {experiment.name}")
print(f"Tracking URI: {model_registry.get_tracking_uri()}")

SKIPPED (failed): metadata_neg-ignore_bpr_loss - [Errno 30] Read-only file system: '/kfs2'
SKIPPED (failed): metadata_neg-test_bpr_loss - train_ratio_neg must be between 0 and 1, got 0.0
SKIPPED (failed): metadata_neg-both_bpr_loss - [Errno 30] Read-only file system: '/kfs2'
SKIPPED (failed): indicator_neg-ignore_bpr_loss - [Errno 30] Read-only file system: '/kfs2'
SKIPPED (failed): indicator_neg-test_bpr_loss - train_ratio_neg must be between 0 and 1, got 0.0
SKIPPED (failed): indicator_neg-both_bpr_loss - [Errno 30] Read-only file system: '/kfs2'

All experiments logged to experiment: MovieLens BPR Experiments
Tracking URI: https://dev-hero.nrel.gov/ml-model-registry/api/v1/proxy/dev-recommender-system-app
