In [1]:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [2]:
#Cargamos el fichero u.data pero solo los campos que nos interesan: user, movie_id(product) y rating 
data_file= sc.textFile('FileStore/tables/zpjo2tx51509035996987/u.data')
ratings = data_file.map(lambda l: l.split("\t")).map(lambda x:Rating(int(x[0]), int(x[1]), float(x[2])))
type(ratings)

In [3]:
ratings.take(3)

In [4]:
#Vamos a ver el total de usuarios unicos:
ratingdf = ratings.toDF() 
ratingdf.select('user').distinct().show(5)
ratingdf.cache()
ratingdf.count()

In [5]:
ratingdf.describe("rating").show()  #El rango de los ratings es 1-5 y tenemos la media de ratings, que podriamos usar para rellenar los campos vacios

In [6]:
ratingdf.select('user').distinct().count() #total usuarios unicos.

In [7]:
ratingdf.groupBy("user").count().take(5) #numero de productos recomendados por usuario

In [8]:
ratingdf.groupBy("rating").count().show()  #numero de totales de cada puntuacion de rating.

In [9]:
ratingdf.stat.crosstab("user", "rating").show() # resumen individual de ratings por usuario

In [10]:
#Para construir un modelo lo más preciso posible dividimos los datos de entrada en conjuntos de datos de entrenamiento y prueba por separado
#Para cada par (entrenamiento y prueba), itera a través del conjunto de parametros  y se ajustan al Estimador usandolos. De esta manera obtenemos el
#modelo ajustado, y evaluamos  el rendimiento del modelo utilizando el Evaluador.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
preds = cvModel.bestModel.transform(test)
#lo primero es convertir el rating RDD en un dataframe
ratingsd=sqlContext.createDataFrame(ratings)
training, test= ratingsd.randomSplit([0.8, 0.2])
training.count()

In [11]:
ratingdf.dropna().count()

In [12]:
#training.na.drop(how = 'any') #para evitar el resultado na en el cálculo del RMSE  
training.cache()

In [13]:
from pyspark.ml.recommendation import ALS
als = ALS(userCol="user", itemCol="product",ratingCol="rating", coldStartStrategy="drop")

In [14]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[als])

In [15]:
#Estalecemeos los diferentes parametros con los que el modelo ira probando hasta encontrar el mejor modelo.
paramMap = ParamGridBuilder()   \
  .addGrid(als.rank, [4, 12])    \
  .addGrid(als.maxIter, [10, 30])     \
  .addGrid(als.regParam, [0.01, 0.1])    \
  .build()

evaluatorR = RegressionEvaluator(metricName="rmse",labelCol="rating")
cvExplicit = CrossValidator(estimator=als, estimatorParamMaps=paramMap, evaluator=evaluatorR,numFolds=5)

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

In [17]:
#Entrenamos con el dataset training e incoporo comandos para ver el tiempo que tarda. He ido testeando diferentes parametros y combinanciones y he podido observar auqnue , reducir el RMSE ha sido a costa de incremento de tiempo.
from time import time
t0 = time()
cvModel = cvExplicit.fit(training)
tt = time() - t0
print "model trained in %s seconds" % round(tt,3)  #model trained in 271.262 seconds

In [18]:
#Evaluamos el modelo mirando el RMSE en los datos del test
predict = cvModel.bestModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

In [19]:
rmse = evaluator.evaluate(predict)
print 'For testing data the RMSE is %s' % (rmse)
#For testing data the RMSE is 0.922383301033

In [20]:
#miramos el error en los datos del training. Al haber más datos, deberia tener un RMSE menor.
predict = cvModel.bestModel.transform(training)
rmse = evaluator.evaluate(preds)
print 'For training data the RMSE is %s' % (rmse)
#For training data the RMSE is 0.0.819636022516-como vemos el error efectivamente ha disminuido

In [21]:
#Miro los diferentes parametros usados en el entrenamiento y sus respectivos RMSE de metrica.
metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')


In [22]:
#Ya tenemos el modelo creado. Vamos a recuperar el archivo item para sacar el titulo de la pelicula y darle un poco más de forma.
#Cargamos los datos de las peliculas archivados en el archivo u.item Cogeremos solo los datos de user_id y movie_title.
item_file= sc.textFile('FileStore/tables/zpjo2tx51509035996987/u.item')
itemRDD = item_file.map(lambda l: l.split(("|")))
movies_titles= itemRDD.map(lambda p: (int(p[0]), p[1]))
movies = sqlContext.createDataFrame(movies_titles)
movies = movies.selectExpr("_1 as product", "_2 as title")
print "There are %s movies in the u.item file" % (movies_titles.count()) #There are 1682 movies in the complete dataset
movies.cache()

In [23]:
#Volvemos a entrenar el modelo con el total del dataset.
preds = cvModel.bestModel.transform(ratingsd)

#unimos el dataframe preds con los titulos de las peliculas
predstitle = preds.join(movies, preds.product == movies.product).selectExpr("user","title", "rating", "prediction")
display(predstitle)

In [24]:
#Recomendaciones de peliculas para un usuario concreto
from pyspark.sql.functions import col
display(predstitle.where('user==3').sort(col("prediction").desc()))

In [25]:
#Recomendacion a un usuario de las peliculas predecidas con prediccion superior a N(3)
display(predstitle.filter("user =='3' and prediction > 3"))

In [26]:
#Ahora tenemos que puntuar peliculas para un nuevo usuario. Creamos un nuevo usuario "0" que no esta en el dataset inicial e incorporamos algunos ratings para el usuario. 
new_user_id = 944
new_user_ratings = [
  (944,1,4), #Toy Story
  (944,11,5), #Seven
  (944,50,4), #StarWars
  (944,71,4), #The Lion King
  (944,1569,4), #La vida es Bella
  (944,362,3), #Blues Brothers
  (944,288,2), #Scream 
  (944,301,3) #In & Out
]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print "Ratings del nuevo usuario %s" % new_user_ratings_RDD.take(8)

In [27]:
#añadimos las recomendaciones del nuevo usuario con los ratings originales. Podemos unir los RDD o crear el dataframe del usuario y luego unir los DF.
new_userdf=new_user_ratings_RDD.toDF()
complete_data_with_new_ratings_df = ratingdf.unionAll(new_userdf)
complete_data_with_new_ratings_df.count()

In [28]:
#Con la incorporacion de cierto numero de nuevos usuarios, deberia volver a entrenarse el modelo y evaluarlo nuevamente para que la maquina aprenda con los nuevos datos del dataset.

In [29]:
training_new, test_new= complete_data_with_new_ratings_df.randomSplit([0.8, 0.2])
training_new.cache()
test_new.cache()
als = ALS(userCol="user", itemCol="product",ratingCol="rating") 
evaluatorR = RegressionEvaluator(metricName="rmse",labelCol="rating")
cvExplicit = CrossValidator(estimator=als, estimatorParamMaps=paramMap, evaluator=evaluatorR,numFolds=5)
cvModel2 = cvExplicit.fit(training_new)
predictions_new_user= cvModel2.bestModel.transform(test_new)

In [30]:
#evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
#rmse = evaluator.evaluate(predictions)
#print("RSME = " + str(rmse)) 

In [31]:
display(predstitle.filter("user =='944' and prediction > 3"))