In [1]:
import warnings
warnings.filterwarnings('ignore')

# Connect to Hive

In [2]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = 'team27'

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [3]:
spark

# list Hive databases

In [4]:
print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

[Database(name='default', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse'), Database(name='root_db', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db'), Database(name='team0_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse'), Database(name='team12_hive_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team12/project/hive/warehouse'), Database(name='team13_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team13/project/hive/warehouse'), Database(name='team14_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team14/project/hive/warehouse'), Database(name='team15_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team15/project/hive/warehouse'), Database(name='team16_projectdb', description

In [5]:
print(spark.catalog.listTables("team27_projectdb"))

[Table(name='authors', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='q1_results', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='q2_results', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='q3_results', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='q4_results', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='q5_results', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='reviews', database='team27_projectdb', description=None, tableType='EXTERNAL', isTemporary=False)]


# Read hive tables

In [6]:
authors = spark.read.format("avro").table('team27_projectdb.authors')

reviews = spark.read.format("avro").table('team27_projectdb.reviews')

In [7]:
authors.show(5)

+-----------------+---------+-----------+
|          steamid|num_games|num_reviews|
+-----------------+---------+-----------+
|76561198017400473|      239|        191|
|76561198012915042|      382|        168|
|76561198057849347|      394|        146|
|76561198138147736|      366|         80|
|76561198352226102|      164|         70|
+-----------------+---------+-----------+
only showing top 5 rows



In [8]:
reviews.show(5)

+--------+------+--------------+---------+--------+--------------------+-----------------+-----------------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------+-----------------------+------------------+-----------+-----------+
|      id|app_id|       appname|review_id|language|              review|time_date_created|time_date_updated|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author_steamid|playtime_forever|playtime_last_two_weeks|playtime_at_review|last_played|recommended|
+--------+------+--------------+---------+--------+--------------------+-----------------+-----------------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------+-----------------------+------------------+-----------+-----------+
|202

In [9]:
reviews.show(1, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id                          | 20270143                                                                                                                                                                                                                                                                                                                                                                                                                                            

In [10]:
print("Size of `authors` dataset:", (authors.count(), len(authors.columns)))

Size of `authors` dataset: (10000, 3)


In [11]:
print("Size of `reviews` dataset:", (reviews.count(), len(reviews.columns)))

Size of `reviews` dataset: (203518, 21)


In [12]:
data = reviews.join(authors, on=reviews.author_steamid == authors.steamid, how='left')
data.show(5)

+--------+------+--------------+---------+--------+--------------------+-----------------+-----------------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------+-----------------------+------------------+-----------+-----------+-----------------+---------+-----------+
|      id|app_id|       appname|review_id|language|              review|time_date_created|time_date_updated|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author_steamid|playtime_forever|playtime_last_two_weeks|playtime_at_review|last_played|recommended|          steamid|num_games|num_reviews|
+--------+------+--------------+---------+--------+--------------------+-----------------+-----------------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+---------

In [13]:
data.show(1, vertical=True)

-RECORD 0-------------------------------------------
 id                          | 20270143             
 app_id                      | 524220               
 appname                     | NieR:Automata™       
 review_id                   | 85124293             
 language                    | english              
 review                      | This game is so b... 
 time_date_created           | 2021-01-22           
 time_date_updated           | 2021-01-22           
 votes_helpful               | 0                    
 votes_funny                 | 0                    
 weighted_vote_score         | 0.4330708384513855   
 comment_count               | 0                    
 steam_purchase              | true                 
 received_for_free           | false                
 written_during_early_access | false                
 author_steamid              | 76561198838897166    
 playtime_forever            | 128.0                
 playtime_last_two_weeks     | 0.0            

In [14]:
data = data.fillna({
    'review': '',
    'playtime_at_review': 0,
    'playtime_forever': 0,
    'playtime_last_two_weeks': 0,
})

In [15]:
target = 'recommended' # rating

data = data.withColumn(target, data[target].cast('integer'))

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import MinMaxScaler

# creating the pipeline
user_indexer = StringIndexer(inputCol='steamid', outputCol="user_id", handleInvalid="keep")
item_indexer = StringIndexer(inputCol='app_id', outputCol="item_id", handleInvalid="keep")

pipeline = Pipeline(stages=[user_indexer, item_indexer])
pipeline_model = pipeline.fit(data)

transformed_df = pipeline_model.transform(data).select(['user_id', 'item_id', 'recommended'])
transformed_df.show(5)

+-------+-------+-----------+
|user_id|item_id|recommended|
+-------+-------+-----------+
| 9633.0|   49.0|          0|
| 1777.0|   49.0|          0|
| 7888.0|   49.0|          0|
| 7266.0|   49.0|          0|
| 1142.0|   49.0|          0|
+-------+-------+-----------+
only showing top 5 rows



## Split the dataset

In [17]:
(training_df, testing_df) = transformed_df.randomSplit([0.8, 0.2], seed=42)

print("Train data size:", training_df.count())
print("Test data size:", testing_df.count())

import os
def run(command):
    return os.popen(command).read()

training_df.select("user_id", "item_id", "recommended")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

testing_df.select("user_id", "item_id", "recommended")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

Train data size: 162716
Test data size: 40802


''

## ML modeling

For this project we will use ALS from pyspark and model using SVD

In [18]:
from pyspark.ml.recommendation import ALS

als = ALS(
    maxIter=5,
    rank=10,
    regParam=0.01,
    userCol="user_id",
    itemCol="item_id",
    ratingCol=target,
    coldStartStrategy="drop",
    seed=42,
)

model = als.fit(training_df)

In [19]:
# Make predictions on the test data
predictions = model.transform(testing_df)

predictions.show(10)

+-------+-------+-----------+------------+
|user_id|item_id|recommended|  prediction|
+-------+-------+-----------+------------+
|   11.0|   92.0|          0|   0.5735925|
|   22.0|   60.0|          0|   0.6468688|
|   28.0|   49.0|          0|  0.28481925|
|   50.0|   73.0|          0|  0.49654585|
|   78.0|   60.0|          0|   0.9384319|
|   83.0|   92.0|          0|  0.60724956|
|   93.0|   73.0|          0|  -0.3578568|
|  131.0|   92.0|          0|   0.5564618|
|  152.0|   50.0|          0|  0.47348535|
|  153.0|   50.0|          0|-0.026946994|
+-------+-------+-----------+------------+
only showing top 10 rows



In [20]:
# add calc_metrics and run functions
from pyspark.sql.functions import col, expr
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

def calc_metrics(model, predictions, flat_recommendations=True, print_recommendations=True):
    # calculate RMSE
    evaluator = RegressionEvaluator(
        metricName="rmse", labelCol="recommended", predictionCol="prediction"
    )
    
    # calculate Mean Average Precision and NDCG
    app_recommendations = model.recommendForAllUsers(10)

    # Create ground truth DataFrame containing actual interactions
    ground_truth = testing_df.filter(col(target) == 1).groupBy("user_id") \
        .agg(expr("collect_set(item_id) as actual_apps"))

    # Join recommendations with ground truth
    recommendations_with_truth = app_recommendations.join(
        ground_truth,
        ground_truth.user_id == app_recommendations.user_id,
        "inner"
    )
    recommendations_with_truth = recommendations_with_truth.select(
        [app_recommendations.user_id, 'recommendations', 'actual_apps']
    )

    if flat_recommendations:
        # Convert recommendations and actual_apps to lists
        recommendations_with_truth = recommendations_with_truth.rdd.map(
            lambda row: (
                row.user_id,
                [r.item_id for r in row.recommendations],
                row.actual_apps
            )
        ).toDF(["user_id", "recommendations", "actual_apps"])
    
    if print_recommendations:
        recommendations_with_truth.show(10, truncate=False)

    # Compute RankingMetrics
    metrics = RankingMetrics(recommendations_with_truth.rdd.map(
        lambda row: (row.recommendations, row.actual_apps))
    )
    
    # calculate all metrics
    rmse = evaluator.evaluate(predictions)
    MAP = metrics.meanAveragePrecision
    ndcg = metrics.ndcgAt(10)
    
    print("RMSE on test data:", rmse)
    print("Mean Average Precision (MAP):", MAP)
    print("NDCG at 5:", ndcg)
    
    return rmse, MAP, ndcg

In [21]:
calc_metrics(model, predictions)

+-------+----------------------------------------+----------------------------------------+
|user_id|recommendations                         |actual_apps                             |
+-------+----------------------------------------+----------------------------------------+
|769    |[41, 37, 76, 28, 65, 27, 42, 46, 77, 63]|[47.0, 72.0]                            |
|3597   |[48, 72, 79, 94, 88, 60, 31, 68, 5, 80] |[28.0, 27.0, 62.0, 13.0, 78.0]          |
|4142   |[22, 52, 43, 70, 61, 28, 44, 75, 4, 76] |[15.0, 23.0, 36.0, 44.0]                |
|5776   |[27, 30, 50, 99, 76, 28, 57, 61, 44, 69]|[98.0, 65.0, 14.0, 32.0]                |
|8649   |[32, 99, 45, 95, 67, 84, 92, 6, 91, 30] |[19.0]                                  |
|576    |[23, 34, 9, 16, 72, 82, 55, 0, 65, 38]  |[83.0, 34.0, 33.0, 82.0]                |
|1765   |[26, 92, 20, 57, 13, 18, 8, 63, 27, 1]  |[14.0, 18.0, 44.0]                      |
|5136   |[49, 67, 36, 56, 21, 1, 31, 69, 11, 14] |[40.0, 69.0, 65.0, 51.0, 14.0,

(0.4045570136725021, 0.03652132284266052, 0.0766425245578284)

# Hyperparameter optimization

In [22]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 
import numpy as np

evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="recommended", predictionCol="prediction"
)

param_grid = ParamGridBuilder() \
    .addGrid(als.maxIter, [5]) \
    .addGrid(als.rank, np.arange(5, 20+1, 5)) \
    .addGrid(als.regParam, np.linspace(0.01, 1, num=5)) \
    .addGrid(als.implicitPrefs, [False, True]) \
    .build()

cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    parallelism=5,
    numFolds=3,
    seed=42,
)

cvModel = cv.fit(training_df)
best_model1 = cvModel.bestModel
best_model1

ALSModel: uid=ALS_0999e54f8b2a, rank=5

## Select best model

In [23]:
from pprint import pprint
pprint(best_model1.extractParamMap())

{Param(parent='ALS_0999e54f8b2a', name='userCol', doc='column name for user ids. Ids must be within the integer value range.'): 'user_id',
 Param(parent='ALS_0999e54f8b2a', name='itemCol', doc='column name for item ids. Ids must be within the integer value range.'): 'item_id',
 Param(parent='ALS_0999e54f8b2a', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='ALS_0999e54f8b2a', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.'): 4096,
 Param(parent='ALS_0999e54f8b2a', name='coldStartStrategy', doc="strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'."): 'drop'}


In [24]:
best_predictions1 = best_model1.transform(testing_df)
best_predictions1.show(5)

best_predictions1.select("recommended", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

+-------+-------+-----------+----------+
|user_id|item_id|recommended|prediction|
+-------+-------+-----------+----------+
|   11.0|   92.0|          0| 0.4386111|
|   22.0|   60.0|          0| 0.6223034|
|   28.0|   49.0|          0|0.56308067|
|   50.0|   73.0|          0| 0.6733649|
|   78.0|   60.0|          0|0.91516465|
+-------+-------+-----------+----------+
only showing top 5 rows



''

In [25]:
rmse1, map1, ndcg1 = calc_metrics(best_model1, best_predictions1)

+-------+----------------------------------------+---------------------------------------------------------------------------+
|user_id|recommendations                         |actual_apps                                                                |
+-------+----------------------------------------+---------------------------------------------------------------------------+
|28     |[32, 92, 89, 36, 86, 78, 79, 16, 77, 84]|[42.0, 20.0, 6.0, 79.0, 31.0, 80.0, 29.0]                                  |
|31     |[22, 20, 4, 91, 52, 19, 83, 36, 75, 6]  |[9.0, 23.0, 5.0, 75.0, 1.0, 12.0, 52.0, 83.0, 81.0, 79.0, 13.0, 84.0, 72.0]|
|34     |[78, 12, 21, 31, 75, 40, 93, 1, 7, 46]  |[68.0, 36.0, 1.0, 48.0, 6.0, 83.0, 55.0, 31.0, 18.0, 44.0]                 |
|53     |[36, 92, 72, 78, 79, 75, 81, 4, 6, 54]  |[0.0, 2.0, 19.0, 34.0, 78.0]                                               |
|65     |[45, 73, 89, 65, 95, 67, 23, 29, 77, 54]|[15.0, 8.0, 1.0, 49.0, 95.0, 47.0, 74.0]                     

Only RMSE improved after Hyperparameter optimization
- from (0.4045) -> to (0.3686)

In [26]:
# saving model
best_model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

''

# Model2

In [27]:
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import SparseVector
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import *

class Model2:
    def __init__(
        self,
        userCol,
        itemCol,
        ratingCol,
        num_rank=10,
    ):
        self.userCol = userCol
        self.itemCol = itemCol
        self.ratingCol = ratingCol
        self.num_rank = num_rank
        
        self.id2item = {}

    def fit(self, df):
        # assign for each item and user it ids
        df = df.withColumn("model_user_id", F.dense_rank().over(Window.orderBy(self.userCol))-1)
        df = df.withColumn("model_item_id", F.dense_rank().over(Window.orderBy(self.itemCol))-1)

        # get number of items
        self.num_items = df.select(self.itemCol).distinct().count()

        # Convert the DataFrame into user-item matrix
        user_item_rdd = df.rdd.map(lambda row: (
            row['model_user_id'], (row['model_item_id'], row[self.ratingCol])
        ))
        grouped_user_item_rdd = user_item_rdd.groupByKey().mapValues(list)
        grouped_user_item_rdd = grouped_user_item_rdd.map(
            self._get_sparse_vector
        )
        
        # Compute SVD
        self.mat = RowMatrix(grouped_user_item_rdd)
        svd = self.mat.computeSVD(k=self.num_rank, computeU=True)

        # save U, s, V martrices
        self.user_vectors = np.array([vec.toArray() for vec in svd.U.rows.collect()])
        self.item_V = svd.V.toArray()
        self.singular_values = svd.s.toArray()
        self.df = df  # save this dataframe

        # save id2item, id2user, item2id, user2id dict
        self.id2item = {row[0]: row[1] for row in self.df.select(['model_item_id', self.itemCol]).collect()}
        self.id2user = {row[0]: row[1] for row in self.df.select(['model_user_id', self.userCol]).collect()}
        self.item2id = {v: k for k, v in self.id2item.items()}
        self.user2id = {v: k for k, v in self.id2user.items()}

        return self

    def _get_sparse_vector(self, row):
        _, item_ratings = row
        sparse = {}
        for item_id, rating in item_ratings:
            if rating > 0:
                sparse[item_id] = rating

        return SparseVector(self.num_items, sparse)

    def transform(self, testing_df):
        # for each user item pair predict the rating
        user_item = [(row[0], row[1], row[2]) for row in testing_df.select([self.userCol, self.itemCol, self.ratingCol]).collect()]
        df = []
        for u, i, r in user_item:
            if u not in self.user2id:
                continue
            df.append([u, i, r, float(self.recommendForUser(u, num_recommendations=self.num_items)[int(i)][0])])
        
        schema = StructType([
            StructField("user_id", FloatType(), True),
            StructField("item_id", FloatType(), True),
            StructField("recommended", IntegerType(), True),
            StructField("prediction", FloatType(), True),
        ])
        return self.df.sql_ctx.createDataFrame(df, schema=schema)

    def recommendForUser(self, user_id, num_recommendations=10):
        # check for user id
        model_user_id = self.user2id.get(user_id, None)
        if model_user_id is None:
            raise ValueError(f"given user_id ({user_id}) does not exists")

        predictions = np.dot(self.user_vectors[model_user_id], np.dot(np.diag(self.singular_values), self.item_V.T))
        predictions = sorted(zip(predictions, self.item2id.keys()), reverse=True)[:num_recommendations]
        return predictions

    def recommendForAllUsers(self, num_recommendations=10, return_scores=False):
        predictions = np.dot(self.user_vectors, np.dot(np.diag(self.singular_values), self.item_V.T))
        predictions = [sorted(zip(p, self.item2id.keys()), reverse=True)[:num_recommendations] for p in predictions]
        if return_scores:
            return predictions

        predictions = [[item for sc, item in p] for p in predictions]
        return self.df.sql_ctx.createDataFrame(zip(self.user2id.keys(), predictions), ['user_id', 'recommendations'])
    
    def __str__(self):
        return f"SVD: k={self.num_rank}"
    
    def save_weights(self, path):
        try:
            # save matrices produced after computeSVD
            self.mat.rows.toDF().write().overwrite().save(path)
        except:
            print("Could not save the weights.")

In [28]:
# train second model
model2 = Model2(
    userCol='user_id',
    itemCol='item_id',
    ratingCol='recommended',
    num_rank=10,
)

model2.fit(training_df)

<__main__.Model2 at 0x7fba5c8ed550>

In [29]:
# predict on test set
model2_predictions = model2.transform(testing_df)
model2_predictions.show(10)

+-------+-------+-----------+------------+
|user_id|item_id|recommended|  prediction|
+-------+-------+-----------+------------+
|   11.0|   92.0|          0|5.4368563E-4|
|   22.0|   60.0|          0|  0.21060102|
|   28.0|   49.0|          0|   0.2026345|
|   50.0|   73.0|          0|  0.19939877|
|   78.0|   60.0|          0|   0.1064963|
|   83.0|   92.0|          0| 0.102369994|
|   93.0|   73.0|          0|  0.14978652|
|  131.0|   92.0|          0| 0.026052501|
|  152.0|   50.0|          0|  0.15982084|
|  153.0|   50.0|          0|  0.20711921|
+-------+-------+-----------+------------+
only showing top 10 rows



In [30]:
calc_metrics(model2, model2_predictions, flat_recommendations=False)

+-------+---------------------------------------------------------+----------------------------------------------------------------------------------------------+
|user_id|recommendations                                          |actual_apps                                                                                   |
+-------+---------------------------------------------------------+----------------------------------------------------------------------------------------------+
|0.0    |[9.0, 8.0, 4.0, 1.0, 19.0, 14.0, 0.0, 25.0, 7.0, 15.0]   |[42.0, 2.0, 15.0, 37.0, 54.0, 65.0, 60.0, 18.0, 7.0, 44.0, 72.0]                              |
|1.0    |[4.0, 12.0, 21.0, 8.0, 0.0, 11.0, 5.0, 1.0, 2.0, 28.0]   |[70.0, 68.0, 36.0, 19.0, 6.0, 62.0, 55.0, 29.0, 5.0, 86.0, 27.0, 46.0, 26.0, 89.0, 81.0, 44.0]|
|2.0    |[5.0, 14.0, 4.0, 1.0, 12.0, 7.0, 11.0, 25.0, 9.0, 0.0]   |[5.0, 37.0, 73.0, 48.0, 45.0, 26.0, 14.0, 84.0, 7.0, 80.0]                                    |
|3.0    |[7.0, 13.0, 3

(0.7617843383809006, 0.061759351244201954, 0.11854677327476618)

## Hyperparameter optimization

In [31]:
from tqdm import tqdm

# parameters to optimizer
param_k = [5, 10, 15, 20, 30]

models = []
for k in tqdm(param_k, total=len(param_k)):
    model2_init = Model2(userCol='user_id', itemCol='item_id', ratingCol='recommended', num_rank=k)
    model2_init.fit(training_df)
    predictions = model2_init.transform(testing_df)
    
    rmse, MAP, NDCG = calc_metrics(model2_init, predictions, print_recommendations=False, flat_recommendations=False)
    models.append({
        'rmse': rmse,
        'MAP': MAP,
        'NDCG': NDCG,
        'k': k,
        'model': model2_init,
    })

 20%|██        | 1/5 [00:28<01:52, 28.10s/it]

RMSE on test data: 0.7604228716624235
Mean Average Precision (MAP): 0.06494935320897477
NDCG at 5: 0.12474180524892531


 40%|████      | 2/5 [00:57<01:26, 28.77s/it]

RMSE on test data: 0.7617843383826255
Mean Average Precision (MAP): 0.061759351244201954
NDCG at 5: 0.11854677327476618


 60%|██████    | 3/5 [01:32<01:03, 31.53s/it]

RMSE on test data: 0.7645104068235493
Mean Average Precision (MAP): 0.0601257213067417
NDCG at 5: 0.11536137131376646


 80%|████████  | 4/5 [02:19<00:37, 37.68s/it]

RMSE on test data: 0.7674062697022306
Mean Average Precision (MAP): 0.057699906700114933
NDCG at 5: 0.11181078544795109


100%|██████████| 5/5 [02:59<00:00, 35.94s/it]

RMSE on test data: 0.7720928761555658
Mean Average Precision (MAP): 0.05505272788068843
NDCG at 5: 0.10732104658072225





In [32]:
best_model2 = min(models, key=lambda x: x['rmse'])
rmse2 = best_model2['rmse']
map2 = best_model2['MAP']
ndcg2 = best_model2['NDCG']
k = best_model2['k']
best_model2 = best_model2['model']
print(f"metrics for k={k} rmse2={rmse2}, map2={map2}, ndcg2={ndcg2}")

metrics for k=5 rmse2=0.7604228716624235, map2=0.06494935320897477, ndcg2=0.12474180524892531


In [35]:
# predict on test set
model2_predictions = best_model2.transform(testing_df)
model2_predictions.show(10)

model2_predictions.select("recommended", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

+-------+-------+-----------+------------+
|user_id|item_id|recommended|  prediction|
+-------+-------+-----------+------------+
|   11.0|   92.0|          0| 0.028685931|
|   22.0|   60.0|          0|  0.20716113|
|   28.0|   49.0|          0|  0.20613548|
|   50.0|   73.0|          0|  0.18521795|
|   78.0|   60.0|          0|  0.09038921|
|   83.0|   92.0|          0|  0.12898695|
|   93.0|   73.0|          0|  0.14464976|
|  131.0|   92.0|          0|-0.017386159|
|  152.0|   50.0|          0|  0.17366536|
|  153.0|   50.0|          0|  0.21284482|
+-------+-------+-----------+------------+
only showing top 10 rows



''

In [33]:
best_model2.save_weights('project/models/model2')

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

Could not save the weights.


''

# Compare best models

In [38]:
# Create data frame to report performance of the models
models = [[str(best_model1).replace(",", "|"), rmse1, map1, ndcg1], [str(best_model2), rmse2, map2, ndcg2]]

evaluation_results = spark.createDataFrame(models, ["model", "RMSE", "MAP", "NDCG"])
evaluation_results.show(truncate=False)

# Save it to HDFS
evaluation_results.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

+--------------------------------------+-------------------+--------------------+-------------------+
|model                                 |RMSE               |MAP                 |NDCG               |
+--------------------------------------+-------------------+--------------------+-------------------+
|ALSModel: uid=ALS_0999e54f8b2a, rank=5|0.36869138717155947|0.031177691995329694|0.0671031605304494 |
|SVD: k=5                              |0.7604228716624235 |0.06494935320897477 |0.12474180524892531|
+--------------------------------------+-------------------+--------------------+-------------------+



''

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 36642)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/local/lib/python3.6/site-packages/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/local/lib/python3.6/site-packages/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/local/lib/python3.6/site-packages/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(self.rfile)
  File 