
# 01 — Clean pipeline (SWAP‑SAFE): Data → ALS → Metrics → MLflow

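This version is **robust** to the factor **swap** seen with `implicit`, where the item and user
factor matrices come back flipped (for example, when the installed release expects a user×item matrix in `fit()` but is given item×user). It also limits BLAS threads to avoid warnings and performance issues.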


In [1]:

import os, io, zipfile, pathlib, json, warnings
import numpy as np
import pandas as pd

from scipy.sparse import csr_matrix
from sklearn.model_selection import train_test_split
from collections import defaultdict

# BLAS/threading hygiene: default both thread-count variables to "1" without
# overriding values that are already present in the environment.
# NOTE(review): numpy is imported above this point, so a BLAS library that is
# already loaded may not re-read these variables — the threadpoolctl call below
# is presumably what takes effect at runtime; confirm.
if "OPENBLAS_NUM_THREADS" not in os.environ:
    os.environ["OPENBLAS_NUM_THREADS"] = "1"
if "OMP_NUM_THREADS" not in os.environ:
    os.environ["OMP_NUM_THREADS"] = "1"

# Best effort: cap BLAS thread pools when threadpoolctl is available; any
# failure (including the package being absent) is deliberately ignored.
try:
    from threadpoolctl import threadpool_limits
    threadpool_limits(limits=1, user_api="blas")
except Exception:
    pass

# Core libs
from implicit.nearest_neighbours import bm25_weight
from implicit.als import AlternatingLeastSquares

import mlflow
# Show the MLflow tracking URI currently in effect, so the reader can see where runs will be sent.
print("MLflow tracking URI:", mlflow.get_tracking_uri())


MLflow tracking URI: http://mlflow:5001


In [2]:

# Project layout: raw data lives under <ROOT>/data/raw.
ROOT = pathlib.Path("/workspace")
DATA = ROOT / "data" / "raw"
DATA.mkdir(parents=True, exist_ok=True)

URL = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
TARGET = DATA / "ml-latest-small"
RATINGS_CSV = TARGET / "ratings.csv"
MOVIES_CSV  = TARGET / "movies.csv"

# Download only when one of the two CSVs read below is missing on disk.
if not (RATINGS_CSV.exists() and MOVIES_CSV.exists()):
    import requests  # lazy import: only needed when the archive must be fetched
    print("Downloading MovieLens (ml-latest-small)…")
    response = requests.get(URL, timeout=60)
    # Fail loudly on an HTTP error instead of handing an error page to ZipFile.
    response.raise_for_status()
    # Context manager guarantees the archive handle is closed after extraction.
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        archive.extractall(DATA)

ratings = pd.read_csv(RATINGS_CSV)
movies  = pd.read_csv(MOVIES_CSV)

print("ratings:", ratings.shape, "movies:", movies.shape)
ratings.head(3)


ratings: (100836, 4) movies: (9742, 3)


Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224


In [3]:

# Implicit feedback: keep only ratings of 4.0 or more and give each kept
# (user, movie) pair a unit weight.
pos = ratings.loc[ratings["rating"] >= 4.0, ["userId", "movieId"]].copy()
pos["rating"] = 1.0

# Categorical encoding turns the raw ids into contiguous 0-based codes.
u_cat = pos["userId"].astype("category")
i_cat = pos["movieId"].astype("category")

u_idx = u_cat.cat.codes.to_numpy()
i_idx = i_cat.cat.codes.to_numpy()
vals  = pos["rating"].to_numpy(dtype=np.float32)

n_users = len(u_cat.cat.categories)
n_items = len(i_cat.cat.categories)

# Rows are user codes, columns are item codes.
UI = csr_matrix(
    (vals, (u_idx, i_idx)),
    shape=(n_users, n_items),
    dtype=np.float32,
)
print("UI shape (users, items):", UI.shape, "nnz:", UI.nnz)

# Lookup tables from matrix index back to the original ids (for API/UI);
# the item table also carries the movie title where one is available.
users_map = pd.DataFrame(
    {
        "user_index": np.arange(n_users),
        "userId": u_cat.cat.categories.astype(int),
    }
)
items_map = pd.DataFrame(
    {
        "item_index": np.arange(n_items),
        "movieId": i_cat.cat.categories.astype(int),
    }
).merge(movies[["movieId", "title"]], on="movieId", how="left")
users_map.head(), items_map.head()


UI shape (users, items): (609, 6298) nnz: 48580


(   user_index  userId
 0           0       1
 1           1       2
 2           2       3
 3           3       4
 4           4       5,
    item_index  movieId                               title
 0           0        1                    Toy Story (1995)
 1           1        2                      Jumanji (1995)
 2           2        3             Grumpier Old Men (1995)
 3           3        5  Father of the Bride Part II (1995)
 4           4        6                         Heat (1995))

In [4]:

# Row/column coordinates of the non-zero entries of UI; build_sparse() indexes
# these together with UI.data.
rows, cols = UI.nonzero()
# One integer position per stored interaction.
idx_all = np.arange(UI.nnz, dtype=np.int64)
# Random 80/20 split over interaction positions (not over users), with a fixed seed.
train_idx, test_idx = train_test_split(idx_all, test_size=0.2, random_state=42)

def build_sparse(indices):
    """Build a float32 CSR matrix with UI's shape from a subset of interactions.

    `indices` selects positions in the module-level `rows`, `cols` and
    `UI.data` arrays; the selected (row, col, value) triples become the
    entries of the returned matrix.
    """
    picked_rows = rows[indices]
    picked_cols = cols[indices]
    picked_vals = UI.data[indices]
    return csr_matrix(
        (picked_vals, (picked_rows, picked_cols)),
        shape=UI.shape,
        dtype=np.float32,
    )

# Materialise the two halves of the split as matrices with the same shape as UI.
UI_train = build_sparse(train_idx)
UI_test  = build_sparse(test_idx)

# BM25-weight the training matrix only; UI_train itself stays unweighted and is
# what the later cells use for seen-item masking and popularity counts.
UI_train_w = bm25_weight(UI_train, K1=1.2, B=0.75).astype(np.float32)
UI_train_w.shape, UI_train_w.nnz


((609, 6298), 38864)

In [5]:

# Hyperparams
factors = 64
regularization = 0.02
iterations = 15
k_eval = 10

# Train on ITEM×USER (transpose of the weighted user×item matrix)
IU_train_w = UI_train_w.T.tocsr()
print("IU_train_w (items, users):", IU_train_w.shape)

model = AlternatingLeastSquares(
    factors=factors,
    regularization=regularization,
    iterations=iterations,
    random_state=42
)
model.fit(IU_train_w)

# Shapes reported by model
print("item_factors:", model.item_factors.shape)
print("user_factors:", model.user_factors.shape)

# Detect whether the model's user/item factor matrices are flipped relative to
# the training data.
# NOTE(review): presumably this happens when the installed implicit release
# treats the rows of the matrix passed to fit() as users — confirm against the
# library version in use.
n_items_train = UI_train.shape[1]
n_users_train = UI_train.shape[0]
n_items_model, k1 = model.item_factors.shape
n_users_model, k2 = model.user_factors.shape

shape_matches_direct = (n_items_model == n_items_train) and (n_users_model == n_users_train)
shape_matches_swapped = (n_items_model == n_users_train) and (n_users_model == n_items_train)

# Neither orientation fits: the factors do not belong to this training matrix,
# and every downstream index lookup would be meaningless.
if not (shape_matches_direct or shape_matches_swapped):
    raise ValueError(
        "Model factor shapes "
        f"(item_factors={n_items_model}, user_factors={n_users_model}) match neither "
        f"orientation of the training matrix (users={n_users_train}, items={n_items_train})."
    )

# Square data: both orientations fit, so the shape test cannot tell them apart.
if shape_matches_direct and shape_matches_swapped:
    warnings.warn(
        "n_users == n_items: swap detection by shape is ambiguous; "
        "verify the orientation expected by the installed implicit version.",
        RuntimeWarning,
    )

SWAPPED = shape_matches_swapped
print("SWAPPED detected:", SWAPPED)

def get_items_matrix():
    """Return the factor matrix to use for items, honouring the SWAPPED flag.

    When SWAPPED is true this is `model.user_factors`, otherwise
    `model.item_factors`.
    """
    if SWAPPED:
        return model.user_factors
    return model.item_factors

def get_user_vec(u: int):
    """Return row `u` of the factor matrix to use for users, honouring SWAPPED.

    The row is taken from `model.item_factors` when SWAPPED is true and from
    `model.user_factors` otherwise.
    """
    user_side = model.item_factors if SWAPPED else model.user_factors
    return user_side[u]


IU_train_w (items, users): (6298, 609)


  0%|          | 0/15 [00:00<?, ?it/s]

item_factors: (609, 64)
user_factors: (6298, 64)
SWAPPED detected: True


In [6]:

# Ground truth from TEST: maps each user row index to the set of item column
# indices that have a non-zero entry for that user in UI_test.
truth = defaultdict(set)
t_rows, t_cols = UI_test.nonzero()
for r, c in zip(t_rows, t_cols):
    truth[r].add(c)

# Popularity from TRAIN: per-item column sums of the unweighted training matrix,
# then item indices ordered from highest to lowest sum.
pop_counts = np.asarray(UI_train.sum(axis=0)).ravel()
pop_order = np.argsort(-pop_counts)

def recommend_manual(u: int, N: int = 10) -> list[int]:
    """Recommend up to `N` item indices for user index `u` by dot-product score.

    Args:
        u: Row index of the user in `UI_train`.
        N: Maximum number of items to return.

    Returns:
        Item indices ordered by descending score. Users with no training
        interactions receive the first `N` entries of `pop_order` instead.
        Items the user already has in `UI_train` are never returned, so the
        list can be shorter than `N` when fewer unseen items exist.
    """
    if N <= 0:
        return []

    # Fetch the user's training row once; it drives both the cold-start check
    # and the seen-item mask.
    train_row = UI_train.getrow(u)

    # Cold user in TRAIN → popularity
    if train_row.nnz == 0:
        return pop_order[:N].tolist()

    item_factors = get_items_matrix()
    n_items_eff = item_factors.shape[0]
    if n_items_eff == 0:
        return []

    # Dot-product scores
    scores = item_factors @ get_user_vec(u)

    # Mask seen (clip to model's item range). -inf guarantees masked items rank
    # below every real score and can be identified exactly afterwards.
    seen = train_row.indices[train_row.indices < n_items_eff]
    scores[seen] = -np.inf

    # Partial selection of the N best candidates, then an exact sort of just those.
    top = np.argpartition(-scores, min(N, n_items_eff - 1))[:N]
    top = top[np.argsort(-scores[top])]

    # When fewer than N unseen items exist, masked items would otherwise pad the list.
    top = top[scores[top] != -np.inf]
    return top.tolist()

def eval_at_k_manual(k: int = 10, sample_users: int = 500) -> tuple[float, float, float]:
    """Compute mean Precision@k, Recall@k and MAP@k over users with test items.

    Args:
        k: Cutoff for the recommendation list; must be positive.
        sample_users: Upper bound on evaluated users. When more users have test
            items, this many are drawn without replacement using a fixed seed.

    Returns:
        `(precision, recall, map)` averaged over the evaluated users. All three
        are NaN when no user has any test item inside the model's item range.

    Raises:
        ValueError: If `k` is not positive.
    """
    if k <= 0:
        raise ValueError("k must be a positive integer")

    users = np.array(list(truth.keys()))
    if len(users) > sample_users:
        rng = np.random.default_rng(42)
        users = rng.choice(users, size=sample_users, replace=False)

    # Test items are clipped to the model's item range, and only for the users
    # that are actually evaluated.
    n_items_eff = get_items_matrix().shape[0]

    precs, recs, maps = [], [], []
    for u in users:
        t = {i for i in truth.get(u, ()) if i < n_items_eff}
        if not t:
            continue
        p = recommend_manual(u, N=k)
        inter = len(set(p) & t)
        precs.append(inter / k)                # Precision@k
        recs.append(inter / len(t))            # Recall@k

        hits, score = 0, 0.0                   # MAP@k
        for rank, item in enumerate(p, start=1):
            if item in t:
                hits += 1
                score += hits / rank
        maps.append(score / min(k, len(t)))

    # No evaluable user: report NaN explicitly rather than through the
    # empty-slice RuntimeWarning that np.mean([]) would emit.
    if not precs:
        nan = float("nan")
        return nan, nan, nan
    return float(np.mean(precs)), float(np.mean(recs)), float(np.mean(maps))

# Evaluate at the configured cutoff; the metric labels follow k_eval so they
# stay truthful if the cutoff is changed.
p_atk, r_atk, map_atk = eval_at_k_manual(k=k_eval, sample_users=500)
print({
    f"precision_at_{k_eval}": p_atk,
    f"recall_at_{k_eval}": r_atk,
    f"map_at_{k_eval}": map_atk,
})


{'precision_at_10': 0.1526, 'recall_at_10': 0.1578889390139475, 'map_at_10': 0.11149441861929957}


In [7]:

# Save artifacts & Log to MLflow
ART = ROOT / "artifacts"
ART.mkdir(parents=True, exist_ok=True)

# Save the factor matrices under the model's own attribute names. They are NOT
# re-oriented here: consumers must read "swapped_detected" from the config
# below to know which matrix holds items and which holds users.
model_path = ART / "als_model.npz"
np.savez_compressed(model_path, user_factors=model.user_factors, item_factors=model.item_factors)

# Save mappings
users_map_path = ART / "users_map.csv"
items_map_path = ART / "items_map.csv"
users_map.to_csv(users_map_path, index=False)
items_map.to_csv(items_map_path, index=False)

# Save config (including SWAPPED flag)
config = {
    "model": "implicit_ALS",
    "factors": int(model.item_factors.shape[1]),
    "regularization": regularization,
    "iterations": iterations,
    "k_eval": k_eval,
    "swapped_detected": bool(SWAPPED),
}
config_path = ART / "run_config.json"
config_path.write_text(json.dumps(config, indent=2))

# Metric names follow the evaluation cutoff instead of hard-coding it.
metrics = {
    f"precision_at_{k_eval}": float(p_atk),
    f"recall_at_{k_eval}": float(r_atk),
    f"map_at_{k_eval}": float(map_atk),
}

# Log to MLflow
mlflow.set_experiment("netflix-poc")
with mlflow.start_run(run_name="ALS_swap_safe"):
    mlflow.log_params(config)
    mlflow.log_metrics(metrics)

    for artifact_path in (model_path, users_map_path, items_map_path, config_path):
        mlflow.log_artifact(str(artifact_path))

print("Artifacts saved to:", ART)


🏃 View run ALS_swap_safe at: http://mlflow:5001/#/experiments/1/runs/1090ab7632f34bc88dc0c12154d32691
🧪 View experiment at: http://mlflow:5001/#/experiments/1
Artifacts saved to: /workspace/artifacts


In [8]:

# Sample recommendations (titles): pick one user that has test items (seeded),
# falling back to user index 0 when there is none.
rng = np.random.default_rng(7)
if len(truth):
    sample_user = int(rng.choice(list(truth.keys())))
else:
    sample_user = 0
recs_idx = recommend_manual(sample_user, N=10)

# Resolve item indices to titles, skipping indices absent from the mapping.
ix = items_map.set_index("item_index")
known_recs = [i for i in recs_idx if i in ix.index]
titles = ix.loc[known_recs, "title"].fillna("(no title)").tolist()

print("Sample user index:", sample_user)
print("Recommended titles:")
for t in titles:
    print("-", t)


Sample user index: 574
Recommended titles:
- Who Framed Roger Rabbit? (1988)
- Poltergeist (1982)
- There's Something About Mary (1998)
- Ace Ventura: Pet Detective (1994)
- Wallace & Gromit: The Wrong Trousers (1993)
- Back to the Future (1985)
- Ghostbusters (a.k.a. Ghost Busters) (1984)
- Man on the Moon (1999)
- As Good as It Gets (1997)
- Doors, The (1991)
