In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Load data
import os

# The SQL Server password is read from the environment so that the secret is
# not stored in the notebook source.
# NOTE(review): the password that used to be hardcoded in this cell should be
# considered exposed and rotated.
_db_password = os.environ.get("LIBRARY_DB_PASSWORD")
if not _db_password:
    raise RuntimeError(
        "LIBRARY_DB_PASSWORD is not set; define it (for example from a secret "
        "store) before running this notebook."
    )


def _read_library_table(table_name):
    """Read one table of the `library` database into a Spark DataFrame.

    Args:
        table_name: Value for the connector's "dbtable" option, e.g.
            "dbo.Rating" (if schemaName not provided, default to "dbo").

    Returns:
        The DataFrame loaded through the "sqlserver" data source.
    """
    return (
        spark.read
        .format("sqlserver")
        .option("host", "library-usco.database.windows.net")
        .option("port", "1433")  # optional, can use default port 1433 if omitted
        .option("user", "admin_library@library-usco")
        .option("password", _db_password)
        .option("database", "library")
        .option("dbtable", table_name)
        .load()
    )


# The full tables stay registered as temp views named 'ratings' and 'books';
# the Python variables only keep the columns used further down.
_ratings_table = _read_library_table("dbo.Rating")
_ratings_table.createOrReplaceTempView('ratings')
ratings = _ratings_table.select("UserId", "ISBN", "BookRating")

_books_table = _read_library_table("dbo.Book")
_books_table.createOrReplaceTempView('books')
books = _books_table.select(
    "ISBN", "BookTitle", "BookAuthor", "YearOfPublication", "Publisher"
)

print("Carga de libros y ratings--")

Carga de libros y ratings--


In [0]:
# Fixed seed so that the train/test split and the ALS initialisation are
# repeatable for the same input data.
SEED = 42

# Convert the string ISBN to a numeric index for ALS.
# `ratings` is reassigned with the extra 'ISBN_int' column below, so a second
# run of this cell would hand StringIndexer a frame that already contains its
# output column. Dropping it first (a no-op on the first run) keeps the cell
# re-runnable.
ratings = ratings.drop('ISBN_int')
stringToInt = StringIndexer(inputCol='ISBN', outputCol='ISBN_int').fit(ratings)
ratings = stringToInt.transform(ratings)

# Split data into training and test datasets (80% / 20%).
train_df, test_df = ratings.randomSplit([0.8, 0.2], seed=SEED)

# ALS model; coldStartStrategy="drop" removes rows whose prediction is NaN
# so they do not poison the RMSE.
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='UserId',
    itemCol='ISBN_int',
    ratingCol='BookRating',
    nonnegative=True,
    coldStartStrategy="drop",
    seed=SEED,
)
rec_model = als.fit(train_df)

# Predictions on the held-out test set.
predicted_ratings = rec_model.transform(test_df)

# RMSE between predicted and actual ratings; printed so the evaluation job
# is not run for nothing.
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='BookRating')
rmse = evaluator.evaluate(predicted_ratings)
print(f"Test RMSE: {rmse:.4f}")

# Recommend the top-n books for one user with the trained model
def recommend_for_user(user_id, n):
    """Return up to `n` book rows for `user_id`, highest predicted rating first.

    Only this user's rows of `ratings` with a nonzero BookRating are scored,
    so the candidates are books the user has already rated rather than the
    whole catalogue.

    Reads the module-level `ratings`, `rec_model` and `books`.

    Args:
        user_id: Value compared against the `UserId` column of `ratings`.
        n: Maximum number of rows to return.

    Returns:
        DataFrame with the columns of `books`, ordered by descending
        predicted rating (the prediction column itself is dropped) and
        limited to `n` rows.
    """
    rated_by_user = ratings.where(
        (col('UserId') == user_id) & (col('BookRating') != 0)
    )
    scored = rec_model.transform(rated_by_user).select('ISBN', 'prediction')
    ranked = books.join(scored, on='ISBN').orderBy(col('prediction').desc())
    return ranked.drop('prediction').limit(n)

# Collect the distinct user ids to the driver (one value per row)
user_list = [row[0] for row in ratings.select("UserId").distinct().collect()]

# Build 5 recommendations for each user, in the same order as user_list
recommendations = [recommend_for_user(user_id, 5) for user_id in user_list]

In [0]:
from functools import reduce
from pyspark.sql import DataFrame

In [0]:
from pyspark.sql.functions import lit
from datetime import datetime

# Compute the run date once: it does not change per user, and evaluating it
# inside the loop could stamp different dates if the loop crosses midnight.
date = datetime.now().strftime("%Y-%m-%d")

# Tag each user's recommendations with the user id and the run date, and drop
# the descriptive book columns. `recommendations` was built by iterating over
# `user_list`, so pairing the two by position gives each frame its own user.
dfs = [
    recs_user
    .withColumn("UserId", lit(user_id))
    .withColumn("date", lit(date))
    .drop("BookTitle", "BookAuthor", "YearOfPublication", "Publisher")
    for user_id, recs_user in zip(user_list, recommendations)
]

In [0]:
# Fold the per-user DataFrames in `dfs` into a single DataFrame, left to right.
df_complete = reduce(lambda stacked, part: stacked.union(part), dfs)

In [0]:
import os

# The SQL Server password is read from the environment so that the secret is
# not stored in the notebook source.
# NOTE(review): the password that used to be hardcoded in this cell should be
# considered exposed and rotated.
_write_password = os.environ.get("LIBRARY_DB_PASSWORD")
if not _write_password:
    raise RuntimeError(
        "LIBRARY_DB_PASSWORD is not set; define it (for example from a secret "
        "store) before running this notebook."
    )

# Persist the recommendations to dbo.User_Recomendation using save mode
# "overwrite".
(
    df_complete.write
    .format("sqlserver")
    .mode("overwrite")
    .option("host", "library-usco.database.windows.net")
    .option("port", "1433")
    .option("user", "admin_library@library-usco")
    .option("password", _write_password)
    .option("database", "library")
    .option("dbtable", "dbo.User_Recomendation")
    .save()
)