# Movie Recommendation AI Ising Model

### Testing Parameters

In [None]:
# Tunable experiment settings; every later cell reads these names.
USER_COUNT: int = 1_000        # how many of the most active users to keep
MOVIE_COUNT: int = 250         # how many of the most-rated movies to keep
C_VALUE: float = 0.045         # C passed to the L1-penalized logistic regressions
MINIMUM_RATING: float = 4.0    # ratings at or above this count as "liked"

### Start of Code

In [4]:
import os
import zipfile
import urllib.request
import ssl
import pandas as pd
import numpy as np
import pathlib as p
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from datetime import datetime

# pyGMs library
import pyGMs as gm
import pyGMs.ising
import pyGMs.wmb

# removes deprecation warnings
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline


### Downloading the Dataset

In [None]:
dataset_url: str = "https://files.grouplens.org/datasets/movielens/ml-latest.zip"
zip_file: str = "ml-latest.zip"
extract_folder: p.Path = p.Path("model/data")

# Mac workaround (only if needed)
# SECURITY NOTE(review): this turns off TLS certificate verification for every
# HTTPS request made through urllib in this kernel, not just the download below.
# Prefer installing the system certificates and deleting this line.
ssl._create_default_https_context = ssl._create_unverified_context

# The notebook only reads these two files, so their presence (rather than the
# mere existence of the folder) decides whether the dataset is available.
required_files = [extract_folder / "ratings.csv", extract_folder / "movies.csv"]

if not all(path.exists() for path in required_files):
    print("Downloading MovieLens dataset...")
    urllib.request.urlretrieve(dataset_url, zip_file)
    print("Extracting...")
    # The archive is flattened into extract_folder, so the folder has to exist
    # before any member can be written into it.
    extract_folder.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        for member in zip_ref.namelist():
            filename = os.path.basename(member)
            if not filename:
                continue  # skip directories
            target_path = os.path.join(extract_folder, filename)
            # Stream in 1 MiB chunks: the ratings file is far too large to hold
            # in memory comfortably, and both handles are closed on exit.
            with zip_ref.open(member) as source, open(target_path, "wb") as target:
                while chunk := source.read(1 << 20):
                    target.write(chunk)
    print("Done.")
else:
    print("Dataset already exists.")


Downloading MovieLens dataset...
Extracting...
Done.


### Run History Logging

In [4]:
RUN_HISTORY_FILENAME: str = "run_history.log"
LABEL_WIDTH = 40   # width of the left-aligned label column in the log
VALUE_WIDTH = 20   # width of the right-aligned value column in the log
# Lower-cased model types whose section header was already written for the current run.
written_model_headers: set[str] = set()

def log_run_header(user_count: int, movie_count: int, c_value: float, filename: str = RUN_HISTORY_FILENAME) -> None:
    """Append a new run banner (parameters and timestamp) to the history log.

    Starting a new run also forgets which model headers were written, so the
    next `save_run_data` call for each model prints its header under this banner.

    Args:
        user_count: Value logged as ``u_count``.
        movie_count: Value logged as ``m_count``.
        c_value: Value logged as ``c_value`` with four decimals.
        filename: Log file to append to.
    """
    timestamp: str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    rule_width = LABEL_WIDTH + VALUE_WIDTH
    written_model_headers.clear()
    with open(filename, "a", encoding="utf-8") as f:
        f.write("\n" + "=" * rule_width + "\n")
        f.write(f"{'u_count:':<{LABEL_WIDTH}}{user_count:>{VALUE_WIDTH}}\n")
        f.write(f"{'m_count:':<{LABEL_WIDTH}}{movie_count:>{VALUE_WIDTH}}\n")
        f.write(f"{'c_value:':<{LABEL_WIDTH}}{c_value:>{VALUE_WIDTH}.4f}\n")
        f.write(f"{'Run at:':<{LABEL_WIDTH}}{timestamp:>{VALUE_WIDTH}}\n")
        f.write("-" * rule_width + "\n")

def save_run_data(model_type: str, output_dict: dict, filename: str = RUN_HISTORY_FILENAME) -> None:
    """Append labelled results for one model to the history log.

    The model's header line is written only the first time that model type is
    seen in the current run; the comparison ignores letter case.

    Args:
        model_type: ``"independent"`` or ``"ising"`` (any case); anything else
            is logged under ``"Unknown Model"``.
        output_dict: Mapping of label -> value. Floats are written with four
            decimals, everything else via ``str``.
        filename: Log file to append to.
    """
    # Normalize once so the header lookup and the "already written" check agree.
    model_key = model_type.lower()
    header = {
                "independent": "Independent Model",
                "ising": "Ising Model"
             }.get(model_key, "Unknown Model")
    with open(filename, "a", encoding="utf-8") as f:
        if model_key not in written_model_headers:
            f.write(f"{header}\n")
            written_model_headers.add(model_key)
        for key, value in output_dict.items():
            formatted_value = f"{value:.4f}" if isinstance(value, float) else str(value)
            f.write(f"{key:<{LABEL_WIDTH}}{formatted_value:>{VALUE_WIDTH}}\n")

# Open a new section in the run-history log for this parameter combination.
log_run_header(user_count=USER_COUNT, movie_count=MOVIE_COUNT, c_value=C_VALUE)

In [None]:
# The download cell flattens the archive into extract_folder, so ratings.csv
# lives there (same place the movies cell reads movies.csv from).
ratings = pd.read_csv(extract_folder / 'ratings.csv')

# Filter top users/movies
top_users = ratings['userId'].value_counts().head(USER_COUNT).index
top_movies = ratings['movieId'].value_counts().head(MOVIE_COUNT).index

filtered = ratings[(ratings['userId'].isin(top_users)) & (ratings['movieId'].isin(top_movies))]
# One row per user, one column per movie; unrated pairs become 0 and therefore
# fall below MINIMUM_RATING in the binarization that follows.
pivot = filtered.pivot(index='userId', columns='movieId', values='rating').fillna(0)

# Binary matrix: 1 if liked (rating >= MINIMUM_RATING), else 0
X = (pivot >= MINIMUM_RATING).astype(int).to_numpy()

In [None]:
# Movie metadata: maps each movieId to its title.
movies = pd.read_csv(extract_folder / 'movies.csv')
id_to_title = {movie_id: title for movie_id, title in zip(movies['movieId'], movies['title'])}

# Build a short label dictionary for visualization
# short = {i: id_to_title[mid] for i, mid in enumerate(pivot.columns)}

# Hold out 20% of the rows of X for testing; the fixed seed keeps the split stable.
Xtr, Xte = train_test_split(X, random_state=42, test_size=0.2)
nMovies = Xtr.shape[1]

### Independent Model (trivial)

In [7]:
# Baseline: one single-variable factor per movie holding [P(not liked), P(liked)],
# estimated as the column means of the training matrix.
pXi = Xtr.mean(axis=0)
model0 = gm.GraphModel(
    [gm.Factor([gm.Var(i, 2)], [1 - pXi[i], pXi[i]]) for i in range(nMovies)]  # type: ignore
)

# Average per-row log value of the model on each split.
ind_train_ll = np.mean([model0.logValue(row) for row in Xtr])
ind_test_ll = np.mean([model0.logValue(row) for row in Xte])

independent_results = {
    "- Log-Likelihood (Train)": float(ind_train_ll),
    "- Log-Likelihood (Test)": float(ind_test_ll),
}
save_run_data("independent", independent_results)

print("Independent model Train LL:", ind_train_ll)
print("Independent model Test  LL:", ind_test_ll)


Independent model Train LL: -155.09829868656539
Independent model Test  LL: -155.68010474922153


### Start of Ising Model

In [8]:
from sklearn.linear_model import LogisticRegression

# Per movie i: nbrs[i] = indices of movies with a non-zero coefficient,
# th_ij[i] = the matching pairwise parameters, th_i[i] = the bias parameter.
nbrs, th_ij, th_i = [None] * nMovies, [None] * nMovies, np.zeros((nMovies,))
Xtmp = np.copy(Xtr)

for i in range(nMovies):
    # Blank out column i so movie i cannot be used to predict itself.
    Xtmp[:, i] = 0.
    # random_state pins liblinear's internal shuffling so reruns give the same structure.
    lr = LogisticRegression(penalty='l1', C=C_VALUE, solver='liblinear', random_state=42).fit(Xtmp, Xtr[:, i])
    # Coefficients below 1e-6 in magnitude are treated as "no edge".
    nbrs[i] = np.where(np.abs(lr.coef_) > 1e-6)[1]
    # Regression weights are halved before being stored as model parameters
    # (presumably to match the +/-t factor tables built from them — TODO confirm).
    th_ij[i] = lr.coef_[0, nbrs[i]] / 2.
    # intercept_ is a length-1 array for a binary problem; index it so a true
    # scalar is stored (assigning a 1-element array into a scalar slot is
    # deprecated in recent NumPy).
    th_i[i] = lr.intercept_[0] / 2.
    # Restore the column for the next regression.
    Xtmp[:, i] = Xtr[:, i]

# Number of selected neighbors per movie.
degrees = [len(nn) for nn in nbrs]
average_connectivity = np.mean(degrees)
std_dev_average_connectivity = np.std(degrees)

save_run_data("ising", {"- Average Connectivity" : f"{average_connectivity:.4f} +/- {std_dev_average_connectivity:.4f}"})

print("Average connectivity at C =", C_VALUE, ":", average_connectivity)

Average connectivity at C = 0.045 : 10.456


In [9]:
# Single-variable factors: exp of the table [-th_i, th_i] for every movie.
factors = []
for i, t in enumerate(th_i):
    factors.append(gm.Factor(gm.Var(i, 2), [-t, t]).exp())  # type: ignore

# Pairwise factors: one per (movie, selected neighbor) with table
# exp([[t, -t], [-t, t]]), i.e. agreement gets +t and disagreement gets -t.
for i in range(nMovies):
    for n, t in zip(nbrs[i], th_ij[i]):
        scope = [gm.Var(i, 2), gm.Var(int(n), 2)]
        factors.append(gm.Factor(scope, [[t, -t], [-t, t]]).exp())  # type: ignore

model1 = gm.GraphModel(factors)
model1.makeMinimal()


In [10]:
# Print mapping of movie indices to titles
# print("Movie Index to Title Mapping:")
# print("-" * 40)
# for var in model1.vars:
#     print(f"Movie {var.label}: {short[var.label]}")
# print("-" * 40)

# Draw graph with numeric labels
# short_labels = {var.label: var.label for var in model1.vars}
# gm.drawMarkovGraph(model1, labels=short_labels)


In [11]:
def conditional(factor, i, x):
    """Slice a factor table down to variable `i`, fixing all other variables.

    Every variable in `factor.vars` other than `i` is pinned to its value in
    `x`; variable `i` keeps its whole axis (`slice(v.states)`).

    Args:
        factor: Object exposing `.t` (indexable table) and `.vars`.
        i: The variable left free.
        x: Assignment indexed by variable.

    Returns:
        The entries of `factor.t` selected by that index tuple.
    """
    index = []
    for v in factor.vars:
        if v != i:
            index.append(x[v])
        else:
            index.append(slice(v.states))
    return factor.t[tuple(index)]

def pseudolikelihood(model, X):
    """Return, for each row of `X`, the sum of log conditional probabilities.

    For every column `i` and row `j`, the factors returned by
    `model.factorsWith(i, copy=False)` are sliced with `conditional` and
    multiplied together; the entry for the observed value `X[j, i]` is divided
    by the sum of that product and logged.

    Args:
        model: Object providing `factorsWith(i, copy=False)`.
        X: 2-D array of assignments (rows = data points, columns = variables).

    Returns:
        1-D array with one summed log value per row of `X`.
    """
    log_terms = np.zeros(X.shape)
    n_rows, n_cols = X.shape[0], X.shape[1]
    for i in range(n_cols):  # each variable (movie)
        touching = model.factorsWith(i, copy=False)
        for j in range(n_rows):  # each data point (user)
            row = X[j]
            unnormalized = 1.
            for f in touching:
                unnormalized = unnormalized * conditional(f, i, row)
            log_terms[j, i] = np.log(unnormalized[row[i]] / unnormalized.sum())  # type: ignore
    return log_terms.sum(axis=1)


In [12]:
# Mean per-row pseudo-likelihood of the Ising model on each split.
pseudolikelihood_tr: float = float(pseudolikelihood(model1, Xtr).mean())
pseudolikelihood_te: float = float(pseudolikelihood(model1, Xte).mean())

ising_pl_results = {
    "- Pseudo-Likelihood (Train)": pseudolikelihood_tr,
    "- Pseudo-Likelihood (Test)": pseudolikelihood_te,
}
save_run_data("ising", ising_pl_results)

print("Pseudo-likelihood (Train):", pseudolikelihood_tr)
print("Pseudo-likelihood (Test):", pseudolikelihood_te)

Pseudo-likelihood (Train): -136.74699397162544
Pseudo-likelihood (Test): -142.41137286198756


In [None]:
def impute_missing(model, Xobs):
    """Fill in the missing entries of `Xobs` using `model`.

    Entries `< 0` are treated as missing and entries `>= 0` as observed. For
    each row that has missing entries, the factors touching any missing
    variable are conditioned on the observed values, a `pyGMs.wmb.JTree` is
    built on the result, and its `argmax()` supplies the imputed values.
    Rows with nothing missing are copied through unchanged.

    Args:
        model: Graphical model providing `factorsWithAny`.
        Xobs: 2-D array; negative entries mark missing values.

    Returns:
        A copy of `Xobs` with the missing entries replaced.
    """
    m, n = Xobs.shape
    Xhat = np.copy(Xobs)
    for j in range(m):
        x_unobs = [i for i in range(n) if Xobs[j, i] < 0]
        if not x_unobs:
            # Nothing to impute: avoid building and solving an empty model.
            continue
        x_obs = {i: Xobs[j, i] for i in range(n) if Xobs[j, i] >= 0}
        cond = gm.GraphModel([f.condition(x_obs) for f in model.factorsWithAny(x_unobs)])
        for x in cond.X:
            if x.states == 0:
                x.states = 1  # fix a bug in GraphModel behavior for missing vars...
        # Original note: "0: for maximization". The near-zero weight presumably
        # approximates maximization — TODO confirm against pyGMs.
        jt = pyGMs.wmb.JTree(cond, weights=1e-6)
        x_hat = jt.argmax()
        for i in x_unobs:
            Xhat[j, i] = x_hat[i]
    return Xhat

In [14]:
# Fraction of the test entries to hide
missing_proportion = 0.2

# Seed NumPy's global generator so the same entries are hidden on every run
np.random.seed(42)

# True marks an entry that will be hidden
mask = np.random.rand(*Xte.shape) < missing_proportion

# Copy of Xte in which hidden entries carry the missing indicator -1
Xte_missing = np.where(mask, -1, Xte)

In [15]:
# Slow!  (One conditional model is constructed per test row.)
Xte_hat = impute_missing(model1, Xte_missing)

# Percentage of hidden entries whose imputed value differs from the true value
hidden_predicted = Xte_hat[mask]
hidden_actual = Xte[mask]
error_rate = np.mean(hidden_predicted != hidden_actual) * 100

save_run_data("ising", {"- Error Rate" : f"{error_rate:.4f}%"})

print(f"Error Rate: {error_rate:.4f}")

Error Rate: 25.0125
