In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os
from pyspark.mllib.recommendation import ALS

In [2]:
from pyspark.sql.functions import desc, collect_list

In [None]:
# Spark configuration that also asks Spark to fetch the MongoDB Spark
# connector (mongo-spark-connector_2.12:3.0.1) through spark.jars.packages.
conf = (
    SparkConf()
    .setAppName('Recommendation service - Recommender')
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')
)
# getOrCreate() returns the context already running in this kernel instead of
# raising ValueError when the cell is re-run. An existing context keeps its
# own configuration; `conf` is only applied when a new context is created.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Spark configuration carrying only the application name.
conf = SparkConf().setAppName('Recommendation service - Recommender')
# getOrCreate() reuses a SparkContext that is already running in this kernel;
# SparkContext(conf=conf) would raise ValueError if one exists (for example
# when this cell is re-run). `conf` only takes effect for a new context.
sc = SparkContext.getOrCreate(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/04 17:55:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Display the version of Spark backing the active SparkContext.
sc.version

'3.2.0'

In [None]:
# Connection settings for the MongoDB sink that receives the recommendations.
# MONGO_CONNECT_STRING = 'mongodb://192.168.64.16:27017,192.168.64.17:27017'
# A MongoDB connection string must start with the 'mongodb://' scheme; the
# previous value ('mongodb:mongos1:...') was missing the '//'.
MONGO_CONNECT_STRING = 'mongodb://mongos1:27017,mongos2:27017'
MONGO_DATABASE = 'recommender'
MONGO_COLLECTION = 'recommendations'

In [None]:
# Build (or fetch) the SparkSession with the MongoDB output URI, database and
# collection supplied as session configuration.
spark_session = (
    SparkSession.builder
    .appName('Recommendation service - Recommender')
    .config('spark.mongodb.output.uri', MONGO_CONNECT_STRING)
    .config('spark.mongodb.output.database', MONGO_DATABASE)
    .config('spark.mongodb.output.collection', MONGO_COLLECTION)
    .getOrCreate()
)

In [None]:
# Build (or fetch) a SparkSession that only sets the application name.
spark_session = (
    SparkSession.builder
    .appName('Recommendation service - Recommender')
    .getOrCreate()
)

In [None]:
 # Display the kernel's current working directory.
 os.getcwd()

In [None]:
# Absolute path to the ratings CSV.
# NOTE(review): the following cell reassigns ratings_file from os.getcwd(),
# so this value only matters if that cell is skipped.
ratings_file = '/opt/work/jupyter-notebook/work/ratings_100.csv'

In [None]:
# Locate the ratings CSV inside the kernel's current working directory.
datasets_path = os.getcwd()
ratings_file = os.path.join(
    datasets_path,
    'ratings_100.csv',
)

In [None]:
# Load the ratings CSV from the local filesystem as an RDD of text lines.
ratings_raw_data = sc.textFile('file:///' + ratings_file)
# The first line is the CSV header. first() is the idiomatic form of
# take(1)[0] and raises a descriptive ValueError (rather than IndexError)
# when the file is empty.
ratings_raw_data_header = ratings_raw_data.first()

In [None]:
# Display the header line captured from the ratings file.
ratings_raw_data_header

In [None]:
def _parse_rating_line(line):
    """Turn one comma-separated line into an (int, int, int) tuple.

    Only the first three fields are used. The third goes through
    int(float(...)), so a fractional value is truncated toward zero.
    """
    fields = line.split(",")
    return (int(fields[0]), int(fields[1]), int(float(fields[2])))


# Drop every line equal to the header, parse the rest and keep the result cached.
full_ratings_data = (
    ratings_raw_data
    .filter(lambda line: line != ratings_raw_data_header)
    .map(_parse_rating_line)
    .cache()
)

In [None]:
# Target number of ratings to keep when sampling.
sample_size = 100000
total_ratings = full_ratings_data.count()
# RDD.sample expects a fraction in [0, 1]: clamp it when the dataset holds
# fewer than sample_size rows, and avoid dividing by zero on an empty RDD.
fraction = min(1.0, sample_size / total_ratings) if total_ratings else 0.0

In [None]:
# Draw a sample without replacement using a fixed seed.
ratings_data = full_ratings_data.sample(
    withReplacement=False,
    fraction=fraction,
    seed=1001,
)

In [None]:
# Split the full ratings RDD into training, validation and test sets using
# relative weights 6:2:2 and a fixed seed.
(rdd_training,
 rdd_validating,
 rdd_testing) = full_ratings_data.randomSplit([6, 2, 2], seed=1001)

In [None]:
# Validation records reduced to their first two fields.
rdd_validating_no_ratings = rdd_validating.map(
    lambda rec: (int(rec[0]), int(rec[1]))
)
# Validation records keyed by their first two fields, with the third field
# as a float value.
rdd_validating_with_ratings = rdd_validating.map(
    lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2]))
)

In [None]:
# Each count() launches a Spark job, so compute every size once and reuse it
# in the summary line instead of counting rdd_testing a second time.
nb_training   = rdd_training.count()
nb_validating = rdd_validating.count()
nb_testing    = rdd_testing.count()

print("Training: %d, validation: %d, test: %d" % (nb_training, nb_validating, nb_testing))

In [None]:
def get_rmse(actual, size_actual, predictions):
    """Root-mean-square error between predicted and actual values.

    Args:
        actual: pair RDD of (key, actual_value).
        size_actual: divisor for the mean; expected to be the number of
            records in `actual`.
        predictions: pair RDD of (key, predicted_value), keyed like `actual`.

    Returns:
        sqrt(sum((predicted - actual) ** 2 over joined keys) / size_actual).
    """
    # Imported here so the function does not depend on names that the
    # notebook never imports (sqrt and add were previously undefined).
    from math import sqrt
    from operator import add

    # The join keeps keys present on both sides; values() yields
    # (predicted, actual) pairs.
    predictions_and_ratings = predictions.join(actual).values()
    squared_error_sum = predictions_and_ratings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add)
    return sqrt(squared_error_sum / float(size_actual))

In [None]:
# Every record of the full dataset reduced to its first two fields.
full_data_no_ratings = full_ratings_data.map(
    lambda rec: (int(rec[0]), int(rec[1]))
)

In [None]:
# Hyper-parameters passed to ALS.trainImplicit.
final_rank = 5       # rank
final_iter = 20      # number of iterations
final_regul = 0.1    # regularization value
final_alpha = 40     # alpha

# Not referenced by the visible cells.
final_dist = 2.426742063099514

In [None]:
# Train the implicit-feedback ALS model on the full ratings RDD.
# NOTE(review): no seed is passed, so results may differ between runs.
model = ALS.trainImplicit(
    full_ratings_data,
    final_rank,
    iterations=final_iter,
    lambda_=float(final_regul),
    alpha=float(final_alpha),
)

In [None]:
# Ask the trained model for a prediction for every pair in full_data_no_ratings.
predictions = model.predictAll(full_data_no_ratings)

In [None]:
# Wrap the existing SparkContext in a SparkSession. This rebinds
# spark_session, replacing whatever an earlier cell assigned to it.
# NOTE(review): confirm the MongoDB settings from the builder-based cell are
# still visible through this session.
spark_session = SparkSession(sc)

In [None]:
# Small hand-written DataFrame with user_id / film_id / score columns.
data = [
    (12, 3000, 0.05),
    (2485, 408891, 0.054789),
    (88966, 6200, 0.369),
]
df = spark_session.createDataFrame(data, schema=['user_id', 'film_id', 'score'])
df.show()

In [None]:
# Convert the predictions RDD to a DataFrame, rename 'user' to 'user_id',
# and sort by user_id, then by rating descending.
predictions_df = (
    spark_session.createDataFrame(predictions)
    .withColumnRenamed('user', 'user_id')
    .orderBy('user_id', desc('rating'))
)

In [None]:
# Print the first 10 rows of the predictions DataFrame.
predictions_df.show(10)

In [None]:
# Collect each user's product ids into a single list column.
# alias() must be applied to the aggregated column: calling it on the
# DataFrame returned by agg() only names the DataFrame and leaves the column
# called 'collect_list(product)'.
# NOTE(review): collect_list after groupBy may not keep the earlier orderBy
# ordering — confirm whether the list order matters.
group_by_user = predictions_df.groupBy('user_id').agg(
    collect_list('product').alias('movies_id')
)

In [None]:
# Print the first 10 rows of the per-user aggregation.
group_by_user.show(10)

In [None]:
# Write the predictions DataFrame to MongoDB through the Spark connector.
# The target database comes from MONGO_DATABASE; the previous
# MONGO_DATABASES['db_data'] referred to a name that is never defined in this
# notebook and raised NameError.
(
    predictions_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option('spark.mongodb.output.uri', MONGO_CONNECT_STRING)
    .option('spark.mongodb.output.database', MONGO_DATABASE)
    .option('spark.mongodb.output.collection', MONGO_COLLECTION)
    .save()
)