## **_Recommender System_**

### **_Introduction_**
This project uses data from Steam, an online video game distribution service, to design a collaborative filtering recommendation model using the ALS (Alternating Least Squares) algorithm. It uses PySpark for distributed data processing and the MLflow framework for tracking and logging.

### **_Objective_**
- The main objective of this project is to build a recommendation model for Steam games based on implicit user behaviour.  
- Explore the ```steam_200k.csv``` dataset. 
- Use MLlib to train a collaborative filtering recommender system on the provided data using ALS, tune hyperparameters, evaluate model performance, and explore some of the resulting recommendations.

### _**Dataset Overview**_
Source: The dataset ```steam_200k.csv``` was loaded from the Databricks FileStore. <br>
File Format: CSV <br>
Attributes: The dataset comprises 4 columns:
- user_id: anonymous user identifier
- game_name: name of the game
- behavior: type of interaction ("play" or "purchase")
- value: hours played or purchase count<br>

Records: The dataset comprises 200,000 rows.<br>
Size: 7MB

### **_Set Up & Config_**
In this section, we set up the necessary configurations and import the libraries needed for the data processing, model training, hyperparameter tuning, and evaluation. These imports allow us to utilize PySpark's functionalities for building a recommendation system and MLflow for model tracking.

- ```import mlflow:``` Imports the MLflow library for managing the machine learning lifecycle.
- ```import mlflow.spark:``` Imports MLflow's Spark integration.
- ```import mlflow.pyspark.ml:``` Imports MLflow's PySpark ML module, which provides autologging for Spark MLlib models.
- ```import tempfile:``` Imports the tempfile module to create temporary directories.
- ```from pyspark.ml.recommendation import ALS:``` Imports the ALS algorithm for collaborative filtering.
- ```from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator:``` Imports classes for hyperparameter tuning and cross-validation.
- ```from pyspark.ml.evaluation import RegressionEvaluator:``` Imports the evaluator for regression models.
- ```from pyspark.ml.feature import StringIndexer:``` Imports the StringIndexer class to convert string columns to numerical indices.
- ```from pyspark.sql.functions import col, when, sum, count, avg, log10, expr:``` Imports various SQL functions for data manipulation.
- ```import logging:``` Imports the logging module to configure logging settings.
- ```logging.getLogger("mlflow").setLevel(logging.ERROR):``` Sets the logging level for MLflow to ERROR to suppress unnecessary logs.
- ```mlflow.pyspark.ml.autolog():``` Enables automatic logging of Spark MLlib models.
- ```SEED = 42:``` Sets the seed for reproducibility.
- ```RATING_SCALE = 5:``` Defines the rating scale.
- ```TRAIN_SPLIT = 0.85:``` Specifies the train-test split ratio.
- ```FOLDS = 3:``` Specifies the number of folds for cross-validation.

In [0]:
# === Set Up & Config ===
import mlflow
import mlflow.spark
import mlflow.pyspark.ml
import tempfile


from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col,when, sum, count, avg, log10, expr

# Suppress unwanted MLflow logs
import logging
logging.getLogger("mlflow").setLevel(logging.ERROR)

# Enable automatic logging of PySpark models with MLflow
mlflow.pyspark.ml.autolog()


# === Configuration ===
SEED = 42
RATING_SCALE = 5
TRAIN_SPLIT = 0.85
FOLDS = 3


## _**Exploratory Data Analysis**_
Exploratory data analysis helps us understand the dataset, identify key patterns and trends, and prepare the data for modeling. <br>
After loading the dataset, renaming the columns and casting playtime values to integers, we performed the following steps: <br>
- Printing schema
- Row count
- Grouping by Behavior and Counting Records
- Identifying Most Played Games
- Identifying Top Players by Playtime
- Identifying Most Purchased Games
- Identifying Top Buyers
- Calculating Mean Number of Purchased Games per User
- Calculating Mean Play Hours per User
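
The two per-user means at the end of this list are each computed in two steps: aggregate per user first, then average across users. The following is a minimal plain-Python sketch of that logic, not the Spark code itself. The rows are hypothetical and are not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical sample rows: (user_id, game_name, behavior, value)
rows = [
    (1, "Game A", "purchase", 1.0),
    (1, "Game A", "play", 10.0),
    (1, "Game B", "purchase", 1.0),
    (2, "Game A", "purchase", 1.0),
    (2, "Game A", "play", 4.0),
    (3, "Game C", "purchase", 1.0),
]

purchases_per_user = defaultdict(int)
play_hours_per_user = defaultdict(float)

for user_id, _game, behavior, value in rows:
    if behavior == "purchase":
        purchases_per_user[user_id] += 1       # count purchase records per user
    elif behavior == "play":
        play_hours_per_user[user_id] += value  # sum play hours per user

# Average the per-user aggregates. A user with no rows for a behavior
# is simply absent from that average, as with a groupBy on filtered rows.
mean_purchased_games = sum(purchases_per_user.values()) / len(purchases_per_user)
mean_play_hours = sum(play_hours_per_user.values()) / len(play_hours_per_user)

print(f"Mean number of purchased games per user: {mean_purchased_games:.2f}")
print(f"Mean play hours per user: {mean_play_hours:.2f}")
```

Note that user 3 never played anything, so the play-hours mean is taken over two users while the purchase mean is taken over three.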

In [0]:
# === Load Dataset ===
steam_df = spark.read.csv("/FileStore/tables/steam_200k.csv", header=False, inferSchema=True)

# Rename columns for clarity
steam_df = steam_df.withColumnRenamed("_c0", "user_id") \
                   .withColumnRenamed("_c1", "game_name") \
                   .withColumnRenamed("_c2", "behavior") \
                   .withColumnRenamed("_c3", "value")

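# Truncate play hours to whole numbers. The column type stays double because
# the purchase values in the otherwise() branch are left uncast.
steam_df = steam_df.withColumn("value", 
    when(col("behavior") == "play", col("value").cast("int")).otherwise(col("value"))
)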
steam_df.printSchema()
steam_df.count()

root
 |-- user_id: integer (nullable = true)
 |-- game_name: string (nullable = true)
 |-- behavior: string (nullable = true)
 |-- value: double (nullable = true)



200000

In [0]:
# 1. Count of Plays and Purchases
behavior_records_df = steam_df.groupBy("behavior").count()
display(behavior_records_df)

# 2. Top 10 Most Played Games
most_played_games_df = steam_df.filter(col("behavior") == "play") \
    .groupBy("game_name") \
    .agg(sum("value").alias("total_play_time")) \
    .orderBy(col("total_play_time").desc()) \
    .limit(10)

display(most_played_games_df)

# 3. Top 10 Players Based on Play Time
top_players_play_df = steam_df.filter(col("behavior") == "play") \
    .groupBy("user_id") \
    .agg(
        sum("value").alias("total_play_time"),
        count("behavior").alias("play_count")
    ) \
    .orderBy(col("total_play_time").desc()) \
    .limit(10)

display(top_players_play_df)

# 4. Top 10 Most Purchased Games
most_purchased_games_df = steam_df.filter(col("behavior") == "purchase") \
    .groupBy("game_name") \
    .agg(sum("value").alias("total_purchase")) \
    .orderBy(col("total_purchase").desc()) \
    .limit(10)

display(most_purchased_games_df)

# 5. Top 10 Players Based on Number of Purchased Games
top_buyers_df = steam_df.filter(col("behavior") == "purchase") \
    .groupBy("user_id") \
    .agg(sum("value").alias("total_purchase")) \
    .orderBy(col("total_purchase").desc()) \
    .limit(10)

display(top_buyers_df)

# 6. Mean Number of Purchased Games Per User
steam_purchase_df = steam_df.filter(col("behavior") == "purchase")
purchase_per_user_df = steam_purchase_df.groupBy("user_id").count()
mean_purchased_games = purchase_per_user_df.agg(avg("count")).collect()[0][0]

print(f"\nMean number of purchased games per user: {mean_purchased_games:.2f}")

# 7. Mean Play Time Per User in Hours
steam_play_df = steam_df.filter(col("behavior") == "play")
play_hours_per_user_df = steam_play_df.withColumn("play_hours", col("value").cast("float")) \
    .groupBy("user_id") \
    .agg(sum("play_hours").alias("total_play_hours"))

mean_play_hours = play_hours_per_user_df.agg(avg("total_play_hours")).collect()[0][0]

print(f"Mean play hours per user: {mean_play_hours:.2f}")


behavior,count
purchase,129511
play,70489



game_name,total_play_time
Dota 2,980325.0
Counter-Strike Global Offensive,322564.0
Team Fortress 2,172958.0
Counter-Strike,134120.0
Sid Meier's Civilization V,99738.0
Counter-Strike Source,95911.0
The Elder Scrolls V Skyrim,70779.0
Garry's Mod,49578.0
Call of Duty Modern Warfare 2 - Multiplayer,41966.0
Left 4 Dead 2,33356.0


user_id,total_play_time,play_count
73017395,11754.0,1
10599862,11607.0,178
100630947,10852.0,6
26762388,10431.0,178
153382649,9640.0,1
43684632,9509.0,122
52731290,9413.0,25
48798067,9340.0,254
42935819,8172.0,7
14544587,8118.0,57


game_name,total_purchase
Dota 2,4841.0
Team Fortress 2,2323.0
Unturned,1563.0
Counter-Strike Global Offensive,1412.0
Half-Life 2 Lost Coast,981.0
Counter-Strike Source,978.0
Left 4 Dead 2,951.0
Counter-Strike,856.0
Warframe,847.0
Half-Life 2 Deathmatch,823.0


user_id,total_purchase
62990992,1075.0
33865373,783.0
30246419,766.0
58345543,667.0
76892907,597.0
20772968,595.0
11403772,592.0
64787956,591.0
22301321,568.0
47457723,557.0



Mean number of purchased games per user: 10.45
Mean play hours per user: 301.54


## _**Feature Extraction/Engineering**_
To turn playtime into a rating, we apply a log transformation followed by min-max normalization. The log compresses the wide range of playtime values, and the normalization puts every rating on the same scale. The formula is: <br>

$$\text{Rating} = \frac{\text{log\_play\_time} - \text{min\_log}}{\text{max\_log} - \text{min\_log}} \times \text{RATING\_SCALE}, \qquad \text{log\_play\_time} = \log_{10}(\text{play\_time}) + 1$$

Here, min_log and max_log are the minimum and maximum log_play_time values, respectively, and RATING_SCALE is set to 5. The code then casts the result to an integer, so ratings are whole numbers from 0 to 5. <br>
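
The normalization can be tried out on individual values with a small plain-Python sketch. The playtime extremes used here (1 hour and 11754 hours) are assumptions chosen for illustration and are not confirmed minimum and maximum values of the dataset. Returning `None` for non-positive hours is also an assumption, meant to mimic Spark SQL's `log10`, which returns null in that case.

```python
import math

RATING_SCALE = 5


def log_play_time(play_time):
    """Return log10(play_time) + 1, or None for missing or non-positive input."""
    if play_time is None or play_time <= 0:
        return None
    return math.log10(play_time) + 1


def rating(play_time, min_log, max_log, scale=RATING_SCALE):
    """Min-max normalise the log playtime to [0, scale], then truncate to int."""
    value = log_play_time(play_time)
    if value is None:
        return None
    return int((value - min_log) / (max_log - min_log) * scale)


# Assumed extremes, for illustration only.
min_log = log_play_time(1)
max_log = log_play_time(11754)

for hours in (273, 87, 14, 3, 0):
    print(hours, rating(hours, min_log, max_log))
```

Under these assumptions, 273 hours maps to a rating of 2 and 3 hours maps to 0. Because the cast truncates rather than rounds, only the single longest playtime reaches a rating of 5.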

We also convert user IDs and game names to numerical indices using StringIndexer. This is necessary for the ALS algorithm, which requires numerical input for users and items. 
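
By default, StringIndexer orders labels by descending frequency, so the most common label receives index 0.0. The plain-Python sketch below imitates that behaviour on a few hypothetical game names. The helper `fit_string_indexer` is made up for this illustration, and the alphabetical tie-breaking is an assumption.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically here; treat that detail as an assumption.
    """
    counts = Counter(str(label) for label in labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


# Hypothetical play records (game names only).
games = ["Dota 2", "Team Fortress 2", "Dota 2", "Spore", "Dota 2", "Team Fortress 2"]
game_index = fit_string_indexer(games)
print(game_index)
```

The indices are floats, matching the `UserIndex` and `GameIndex` columns that ALS consumes.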


- ```filter(col("behavior") == "play"):``` Filters the DataFrame to include only rows where the behavior is "play".
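- ```withColumn("play_time", col("value").cast("double")):``` Adds a play_time column holding the value column cast to double.
- ```withColumn("log_play_time", log10("play_time") + 1):``` Applies a base-10 log to play_time and adds 1 to the result. Because the 1 is added after the logarithm, it shifts every value by a constant rather than guarding against zeros: Spark's log10 returns null for a play_time of 0, so those rows end up with a null rating.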
- ```selectExpr("min(log_play_time) as min_log", "max(log_play_time) as max_log"):``` Selects the minimum and maximum log-transformed playtime values.
- ```collect()[0]:``` Collects the results to the driver and accesses the first row.
- ```withColumn("Rating", expr(f"((log_play_time - {min_log}) / ({max_log} - {min_log})) * {RATING_SCALE}").cast("int")):``` Normalizes the log-transformed playtime values to create a rating scale and converts it to integer type.
- ```StringIndexer(inputCol="user_id", outputCol="UserIndex"):``` Initializes the StringIndexer to convert user_id to numerical indices.
- ```StringIndexer(inputCol="game_name", outputCol="GameIndex"):``` Initializes the StringIndexer to convert game_name to numerical indices.
- ```fit(steam_play_df):``` Fits the indexer model to the DataFrame.
- ```transform(steam_play_df):``` Transforms the DataFrame using the fitted model.


In [0]:
# === Preprocessing (Filter, Log Scale, Normalize) ===

# Assuming RATING_SCALE is 5
RATING_SCALE = 5

# Filter play behavior and calculate log of play time
steam_play_df = steam_df.filter(col("behavior") == "play") \
                        .withColumn("play_time", col("value").cast("double")) \
                        .withColumn("log_play_time", log10("play_time") + 1)

# Get the min and max of the log_play_time to normalize
stats = steam_play_df.selectExpr("min(log_play_time) as min_log", "max(log_play_time) as max_log").collect()[0]
min_log, max_log = stats["min_log"], stats["max_log"]

# Normalize log_play_time to 0-5 scale
steam_play_df = steam_play_df.withColumn(
    "Rating", 
    expr(f"((log_play_time - {min_log}) / ({max_log} - {min_log})) * {RATING_SCALE}").cast("int")
)

# Show some rows to verify
display(steam_play_df.limit(10))


user_id,game_name,behavior,value,play_time,log_play_time,Rating
151603712,The Elder Scrolls V Skyrim,play,273.0,273.0,3.436162647040756,2
151603712,Fallout 4,play,87.0,87.0,2.9395192526186182,2
151603712,Spore,play,14.0,14.0,2.146128035678238,1
151603712,Fallout New Vegas,play,12.0,12.0,2.079181246047625,1
151603712,Left 4 Dead 2,play,8.0,8.0,1.9030899869919435,1
151603712,HuniePop,play,8.0,8.0,1.9030899869919435,1
151603712,Path of Exile,play,8.0,8.0,1.9030899869919435,1
151603712,Poly Bridge,play,7.0,7.0,1.845098040014257,1
151603712,Left 4 Dead,play,3.0,3.0,1.4771212547196624,0
151603712,Team Fortress 2,play,2.0,2.0,1.3010299956639813,0


In [0]:
# === Indexing User & Game ===
# Index for user_id
user_indexer = StringIndexer(inputCol="user_id", outputCol="UserIndex")

# Index for game_name
game_indexer = StringIndexer(inputCol="game_name", outputCol="GameIndex")

# Apply transformations to the DataFrame
steam_play_df = user_indexer.fit(steam_play_df).transform(steam_play_df)
steam_play_df = game_indexer.fit(steam_play_df).transform(steam_play_df)

# Prepare ratings DataFrame
ratings_df = steam_play_df.select("UserIndex", "GameIndex", "Rating")

# Display the ratings DataFrame
ratings_df.display()

🏃 View run suave-bee-831 at: https://community.cloud.databricks.com/ml/experiments/3589594743869666/runs/d67372996bb04c1386b6179004f55277
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/3589594743869666
🏃 View run hilarious-goose-511 at: https://community.cloud.databricks.com/ml/experiments/3589594743869666/runs/d3469406602f433ca89030457fba24b4
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/3589594743869666


UserIndex,GameIndex,Rating
585.0,6.0,2.0
585.0,64.0,2.0
585.0,247.0,1.0
585.0,23.0,1.0
585.0,4.0,1.0
585.0,628.0,1.0
585.0,49.0,1.0
585.0,914.0,1.0
585.0,36.0,0.0
585.0,1.0,0.0


## _**Model Training**_

We split the data into training and test sets, ensuring that only records with non-null ratings are included. We then train the ALS model, which is designed for collaborative filtering and can handle large-scale datasets efficiently.

- ```randomSplit([TRAIN_SPLIT, 1-TRAIN_SPLIT], seed=SEED):``` Splits the DataFrame into training and test sets based on the specified ratio and seed.
- ```filter(training_df['Rating'].isNotNull()):``` Filters the training DataFrame to include only rows with non-null ratings.
- ```filter(test_df['Rating'].isNotNull()):``` Filters the test DataFrame to include only rows with non-null ratings.
- ```filter((col("rating").isNotNull()) & (~col("rating").isNaN())):``` Filters the DataFrame to include only rows with non-null and non-NaN ratings.
- ```ALS(userCol="UserIndex", itemCol="GameIndex", ratingCol="Rating", coldStartStrategy="drop"):``` Initializes the ALS model with specified parameters. The coldStartStrategy="drop" parameter ensures that rows with missing predictions are dropped.
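
To give a feel for what ALS does, here is a deliberately simplified plain-Python sketch with a single latent factor. It is not Spark's implementation: the function `als_rank1`, the toy ratings and the choice of rank 1 are all assumptions made for illustration. Each sweep fixes one side (items or users) and solves a small ridge regression for the other side in closed form.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iters=20):
    """Fit r_ui ~= u[user] * v[item] by alternating least squares (rank 1).

    ratings: list of (user, item, rating) triples for the observed entries.
    Returns (u, v, losses); losses holds the regularised squared error
    after each full sweep.
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    losses = []
    for _ in range(iters):
        # Fix v and solve the one-dimensional ridge regression for each user.
        for user in range(n_users):
            num = sum(r * v[i] for (w, i, r) in ratings if w == user)
            den = sum(v[i] ** 2 for (w, i, _r) in ratings if w == user) + reg
            u[user] = num / den if den > 0 else 0.0
        # Fix u and solve the same problem for each item.
        for item in range(n_items):
            num = sum(r * u[w] for (w, i, r) in ratings if i == item)
            den = sum(u[w] ** 2 for (w, i, _r) in ratings if i == item) + reg
            v[item] = num / den if den > 0 else 0.0
        loss = sum((r - u[w] * v[i]) ** 2 for (w, i, r) in ratings)
        loss += reg * (sum(x * x for x in u) + sum(x * x for x in v))
        losses.append(loss)
    return u, v, losses


# Toy ratings generated from user factors [1, 2, 3] and item factors [2, 1, 4].
# The entry for (user 2, item 2) is held out; its underlying value is 3 * 4 = 12.
observed = [
    (0, 0, 2.0), (0, 1, 1.0), (0, 2, 4.0),
    (1, 0, 4.0), (1, 1, 2.0), (1, 2, 8.0),
    (2, 0, 6.0), (2, 1, 3.0),
]

u, v, losses = als_rank1(observed, n_users=3, n_items=3, reg=0.0, iters=50)
print(f"Predicted rating for held-out (user 2, item 2): {u[2] * v[2]:.4f}")
```

With regularisation switched off and exactly rank-1 data, the held-out prediction should approach 12. A positive `reg` pulls the factors towards zero, which is the role `regParam` plays in the tuning grid.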

In [0]:
# Split dataset into training and test datasets
(training_df, test_df) = ratings_df.randomSplit([TRAIN_SPLIT, 1-TRAIN_SPLIT], seed=SEED)

# Remove rows with null ratings
training_df = training_df.filter(training_df['Rating'].isNotNull())
test_df = test_df.filter(test_df['Rating'].isNotNull())

# Cleaned copy of the full ratings data (null and NaN ratings removed), used later for cross-validation
ratings_df_clean = ratings_df.filter(
    (col("rating").isNotNull()) & (~col("rating").isNaN()))
    
# === ALS Setup ===
als = ALS(userCol="UserIndex", itemCol="GameIndex", ratingCol="Rating", coldStartStrategy="drop")


## _**Hyperparameter Tuning**_
To find the best hyperparameters for the ALS model, we use TrainValidationSplit. This involves defining a parameter grid with different values for rank, maximum iterations, and regularization parameter. The model is evaluated using RMSE (Root Mean Squared Error) to select the best combination of hyperparameters.

- ```ParamGridBuilder():``` Initializes the parameter grid builder.
- ```addGrid(als.rank, [18,20,22]):``` Adds a grid of values for the rank parameter.
- ```addGrid(als.maxIter, [5, 10]):``` Adds a grid of values for the maxIter parameter.
- ```addGrid(als.regParam, [0.15,0.2,0.25]):``` Adds a grid of values for the regParam parameter.
- ```RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction"):``` Initializes the evaluator with RMSE as the metric, and specifies the label and prediction columns.
- ```TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, trainRatio=TRAIN_SPLIT, seed=SEED, parallelism=2):``` Initializes the train-validation split with the ALS model, parameter grid, evaluator, train ratio, seed, and parallelism.
- ```mlflow.start_run(run_name="ALS_Model_TrainValidation"):``` Starts a new MLflow run with the specified name.
- ```train_validation_split.fit(training_df):``` Fits the train-validation split model to the training data.
- ```best_model = tvs_model.bestModel:``` Retrieves the best model from the train-validation split.
- ```mlflow.log_params({...}):``` Logs the parameter values used in the grid search.
- ```mlflow.log_param("best_rank", best_model.rank):``` Logs the best model rank.
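
The parameter grid is the Cartesian product of the listed values. The plain-Python sketch below enumerates it to show how many candidate models the search compares. It uses plain dictionaries rather than Spark's param maps, which is a simplification for illustration.

```python
from itertools import product

rank_values = [18, 20, 22]
max_iter_values = [5, 10]
reg_param_values = [0.15, 0.2, 0.25]

# One dictionary per candidate combination of hyperparameters.
param_grid = [
    {"rank": rank, "maxIter": max_iter, "regParam": reg}
    for rank, max_iter, reg in product(rank_values, max_iter_values, reg_param_values)
]

print(len(param_grid))
print(param_grid[0])
```

Three ranks, two iteration counts and three regularization values give 18 combinations, each of which is fitted and scored once on the validation portion.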

In [0]:
# === ALS & Hyperparameter Tuning ===
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [18,20,22]) \
    .addGrid(als.maxIter, [5, 10]) \
    .addGrid(als.regParam, [0.15,0.2,0.25]) \
    .build()

# Set up RMSE evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")

train_validation_split = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=TRAIN_SPLIT,
    seed=SEED,
    parallelism=2
)


In [0]:

# === Model Evaluation & Logging ===
if mlflow.active_run():
    mlflow.end_run()

with mlflow.start_run(run_name="ALS_Model_TrainValidation"):
    # Fit the TrainValidationSplit model
    tvs_model = train_validation_split.fit(training_df)
    best_model = tvs_model.bestModel

    # Log parameters used in the grid search
    mlflow.log_params({
        "rank_values": [18, 20, 22],
        "maxIter_values": [5, 10],
        "regParam_values": [0.15, 0.2, 0.25],
        "train_split": TRAIN_SPLIT
    })

    # Log the best model's hyperparameters
    mlflow.log_param("best_rank", best_model.rank)
    mlflow.log_param("best_maxIter", best_model._java_obj.parent().getMaxIter())
    mlflow.log_param("best_regParam", best_model._java_obj.parent().getRegParam())

    # Evaluate model on the test set
    test_predictions = best_model.transform(test_df)
    rmse = evaluator.evaluate(test_predictions)
    mlflow.log_metric("rmse", rmse)


    # Log the best model
    with tempfile.TemporaryDirectory() as tmp_dir:
        mlflow.spark.log_model(
            spark_model=best_model,
            artifact_path="model",
            dfs_tmpdir=tmp_dir,
            pip_requirements=["mlflow", "pyspark", "databricks-feature-engineering"]
        )

    print(f"Best Model RMSE on Test Data: {rmse}")




Best Model RMSE on Test Data: 0.8063777092246011


### **_Model Evaluation_**
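We further evaluate the model using CrossValidator. Starting from the best rank and regParam found by TrainValidationSplit, it runs cross-validation with FOLDS folds over a narrower grid and keeps the best model, which is then evaluated on the test split and its RMSE logged. Note that the cross-validator is fit on ratings_df_clean, which contains the rows that later form final_test_df, so this final RMSE is measured on data the model has already seen; it is optimistic and not directly comparable to the TrainValidationSplit RMSE. <br>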

- ```best_model_params = tvs_model.bestModel.extractParamMap():``` Extracts the parameter map of the best model from the train-validation split.
- ```best_rank = best_model.rank:``` Retrieves the best rank parameter.
- ```best_reg = best_model._java_obj.parent().getRegParam():``` Retrieves the best regularization parameter.
- ```addGrid(als.rank, [best_rank - 1, best_rank, best_rank + 1]):``` Adds a grid of values for the rank parameter around the best rank.
- ```addGrid(als.regParam, [best_reg]):``` Fixes regParam at the best regularization value found by TrainValidationSplit, so only rank is varied.
- ```CrossValidator(estimator=als, estimatorParamMaps=param_grid_cvs, evaluator=evaluator, numFolds=FOLDS, parallelism=2):``` Initializes the cross-validator with the ALS model, parameter grid, evaluator, number of folds, and parallelism.
- ```mlflow.start_run(run_name="ALS_Model_CrossValidation"):``` Starts a new MLflow run with the specified name.
- ```cross_validator.fit(ratings_df_clean):``` Fits the cross-validator model to the cleaned ratings data.
- ```final_model = cvs_model.bestModel:``` Retrieves the best model from the cross-validation.
- ```mlflow.log_params({...}):``` Logs the parameter values used in the cross-validation.
- ```predictions = final_model.transform(final_test_df):``` Transforms the final test data using the best model.
- ```predictions_clean = predictions.filter(col("prediction").isNotNull() & col("rating").isNotNull()):``` Filters the predictions to include only rows with non-null predictions and ratings.
- ```final_rmse = evaluator.evaluate(predictions_clean):``` Evaluates the cleaned predictions using RMSE.
- ```mlflow.log_metric("rmse", final_rmse):``` Logs the final RMSE metric.
- ```with tempfile.TemporaryDirectory() as tmp_dir:``` Creates a temporary directory.
- ```mlflow.spark.log_model(spark_model=final_model, artifact_path="best_cvs_model", dfs_tmpdir=tmp_dir, pip_requirements=["mlflow", "pyspark", "databricks-feature-engineering"]):``` Logs the best model to MLflow.
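
The idea behind k-fold cross-validation can be sketched in plain Python: every row is used for validation exactly once and for training in the remaining folds. The helper `k_fold_indices` is hypothetical. It produces equal-sized folds, whereas Spark assigns rows to folds randomly, so real fold sizes are only approximately equal.

```python
import random


def k_fold_indices(n_rows, num_folds, seed=42):
    """Yield (training, validation) index lists, one pair per fold."""
    rng = random.Random(seed)
    indices = list(range(n_rows))
    rng.shuffle(indices)
    # Deal the shuffled indices into num_folds groups.
    folds = [indices[i::num_folds] for i in range(num_folds)]
    for k in range(num_folds):
        validation = folds[k]
        training = [idx for j, fold in enumerate(folds) if j != k for idx in fold]
        yield training, validation


for training, validation in k_fold_indices(n_rows=9, num_folds=3):
    print(sorted(validation), len(training))
```

With three rank candidates, a single regParam value and three folds, the cross-validator fits nine candidate models before refitting the best configuration.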

In [0]:
# === CrossValidator Refinement ===
best_model_params = tvs_model.bestModel.extractParamMap()
best_rank = best_model.rank
best_reg = best_model._java_obj.parent().getRegParam()

# Refine parameter grid around best values from TrainValidationSplit
param_grid_cvs = ParamGridBuilder() \
    .addGrid(als.rank, [best_rank - 1, best_rank, best_rank + 1]) \
    .addGrid(als.regParam, [best_reg]) \
    .build()

# Set up CrossValidator
cross_validator = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid_cvs,
    evaluator=evaluator,
    numFolds=FOLDS,
    parallelism=2
)

# === CrossValidator & Final Evaluation ===
if mlflow.active_run():
    mlflow.end_run()

with mlflow.start_run(run_name="ALS_Model_CrossValidation"):
    cvs_model = cross_validator.fit(ratings_df_clean)
    final_model = cvs_model.bestModel

    # Log cross-validation parameters
    mlflow.log_params({
        "best_rank": best_rank,
        "best_reg_param": best_reg,
        "num_folds": FOLDS,
        "parallelism": 2
    })

    # Split data again for final evaluation
    final_training_df, final_test_df = ratings_df.randomSplit([TRAIN_SPLIT, 1 - TRAIN_SPLIT], seed=SEED)
    predictions = final_model.transform(final_test_df)
    
    # Drop rows where prediction or label is null
    predictions_clean = predictions.filter(col("prediction").isNotNull() & col("rating").isNotNull())
    
    # Now evaluate
    final_rmse = evaluator.evaluate(predictions_clean)
    

    # Log final RMSE metric
    mlflow.log_metric("rmse", final_rmse)

    # Log best model from CrossValidator
    with tempfile.TemporaryDirectory() as tmp_dir:
        mlflow.spark.log_model(
            spark_model=final_model,
            artifact_path="best_cvs_model",
            dfs_tmpdir=tmp_dir,
            pip_requirements=["mlflow", "pyspark", "databricks-feature-engineering"]
        )

    print(f"Final RMSE on Test Data from CrossValidation: {final_rmse}")


Final RMSE on Test Data from CrossValidation: 0.4792681591875734


### **_Generating Recommendations_**
Finally, we load the logged model and use it to score user-game pairs. We map the numerical indices back to user IDs and game names so the results are readable, then display each pair's actual rating next to its predicted rating.<br>

- ```logged_model = 'runs:/655a8ffd110d4c25b55a00fe838cf736/best_cvs_model':``` Specifies the path to the logged model in MLflow.
- ```mlflow.spark.load_model(logged_model):``` Loads the model from MLflow.
- ```recommender_df = loaded_model.transform(test_df):``` Scores the user-game pairs in the test data with the loaded model, adding a prediction column.
- ```user_map = steam_play_df.select("UserIndex", "user_id").distinct():``` Creates a mapping of user indices to user IDs.
- ```game_map = steam_play_df.select("GameIndex", "game_name").distinct():``` Creates a mapping of game indices to game names.
- ```final_preds = loaded_model.transform(test_df).join(user_map, on="UserIndex", how="left").join(game_map, on="GameIndex", how="left"):``` Transforms the test data using the loaded model and joins the user and game mappings to provide meaningful recommendations.
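
Scoring test pairs yields predicted ratings rather than a ranked list. One simple way to turn scores into recommendations is to sort a user's games by predicted rating and keep the top few. The plain-Python sketch below does this with the ten predictions shown for user 11403772 in the output further down. The helper `top_n` is hypothetical. Spark's fitted ALS model also provides `recommendForAllUsers`, which would be the more direct route inside the notebook.

```python
# (game_name, prediction) pairs copied from the displayed output for user 11403772.
predictions = [
    ("The Elder Scrolls V Skyrim", 0.7283304),
    ("Sid Meier's Civilization V", 0.7405041),
    ("Rust", 0.5181701),
    ("Dead Island", 0.3832733),
    ("Half-Life 2 Episode One", 0.041552432),
    ("Borderlands", 0.56124717),
    ("BioShock", 0.2412867),
    ("Half-Life 2 Episode Two", 0.17691106),
    ("Warhammer 40,000 Dawn of War II", 0.46816033),
    ("Mafia II", 0.512774),
]


def top_n(scored_items, n):
    """Return the n items with the highest predicted rating, best first."""
    return sorted(scored_items, key=lambda pair: pair[1], reverse=True)[:n]


for game, score in top_n(predictions, 3):
    print(f"{game}: {score:.3f}")
```

These games come from the test set, so the user has already played them. A real recommender would normally exclude games the user already owns.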


In [0]:
logged_model = 'runs:/655a8ffd110d4c25b55a00fe838cf736/best_cvs_model'

# Load model
loaded_model = mlflow.spark.load_model(logged_model)  # or use mlflow.pyfunc.load_model() if it's a general model

# Perform inference via model.transform()
recommender_df = loaded_model.transform(test_df)



In [0]:
# Create mapping DataFrames
user_map = steam_play_df.select("UserIndex", "user_id").distinct()
game_map = steam_play_df.select("GameIndex", "game_name").distinct()

# Join to add original names
final_preds = loaded_model.transform(test_df) \
    .join(user_map, on="UserIndex", how="left") \
    .join(game_map, on="GameIndex", how="left")

# Show results
final_preds.select("user_id", "game_name", "Rating", "prediction").show(10, truncate=False)

recommender_df.display()

+--------+-------------------------------+------+-----------+
|user_id |game_name                      |Rating|prediction |
+--------+-------------------------------+------+-----------+
|11403772|The Elder Scrolls V Skyrim     |1     |0.7283304  |
|11403772|Sid Meier's Civilization V     |1     |0.7405041  |
|11403772|Rust                           |1     |0.5181701  |
|11403772|Dead Island                    |1     |0.3832733  |
|11403772|Half-Life 2 Episode One        |0     |0.041552432|
|11403772|Borderlands                    |1     |0.56124717 |
|11403772|BioShock                       |1     |0.2412867  |
|11403772|Half-Life 2 Episode Two        |0     |0.17691106 |
|11403772|Warhammer 40,000 Dawn of War II|0     |0.46816033 |
|11403772|Mafia II                       |1     |0.512774   |
+--------+-------------------------------+------+-----------+
only showing top 10 rows



user_id,game_name,Rating,prediction
11403772,The Elder Scrolls V Skyrim,1,0.7283304
11403772,Sid Meier's Civilization V,1,0.7405041
11403772,Rust,1,0.5181701
11403772,Dead Island,1,0.3832733
11403772,Half-Life 2 Episode One,0,0.041552432
11403772,Borderlands,1,0.56124717
11403772,BioShock,1,0.2412867
11403772,Half-Life 2 Episode Two,0,0.17691106
11403772,"Warhammer 40,000 Dawn of War II",0,0.46816033
11403772,Mafia II,1,0.512774
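
The RMSE metric used throughout can be checked by hand on the ten rows displayed above. This plain-Python sketch covers only those ten rows, so its result is not expected to match the RMSE reported for the full test set.

```python
import math

# (Rating, prediction) pairs copied from the ten displayed rows.
pairs = [
    (1, 0.7283304),
    (1, 0.7405041),
    (1, 0.5181701),
    (1, 0.3832733),
    (0, 0.041552432),
    (1, 0.56124717),
    (1, 0.2412867),
    (0, 0.17691106),
    (0, 0.46816033),
    (1, 0.512774),
]


def rmse(pairs):
    """Root mean squared error between labels and predictions."""
    squared_errors = [(label - prediction) ** 2 for label, prediction in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


print(f"RMSE on the ten displayed rows: {rmse(pairs):.4f}")
```

For these rows the value comes out at about 0.45, driven mostly by the games rated 1 whose predictions fall well below 0.5.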
