In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql import Row

In [0]:
import pandas as pd

In [0]:
import io

In [0]:
df = pd.read_csv('/content/Reviews copy.csv')

In [0]:
ratings = spark.createDataFrame(df_r)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(ratings) for column in list(set(ratings.columns)-set(['date'])) ]


pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(ratings).transform(ratings)


In [0]:
df_r.dtypes

[('Score', 'bigint'),
 ('ProductId_index', 'double'),
 ('UserId_index', 'double')]

In [0]:
columns_to_drop = ["Id_index", "ProductId", "User_Id", "Id", "Score_index"]

df_r = df_r.drop(*columns_to_drop)

In [0]:
df_r.drop("Id", "ProductId", "UserId", "Score_index").collect()

In [0]:
# Build the recommendation model using Alternating Least Squares
# split training and testing
(training, test) = df_r.randomSplit([0.8, 0.2])

In [0]:
# Building the recommendation model by ALS
als_old = ALS(userCol="UserId_index", itemCol="ProductId_index", ratingCol="Score", nonnegative=True, coldStartStrategy= "drop")
model_old = als_old.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions_old = model_old.transform(test)
evaluator_old = RegressionEvaluator(metricName="rmse", labelCol="Score",
                                predictionCol="prediction")

rmse_old = evaluator_old.evaluate(predictions_old)
print("Root-mean-square error = " + str(rmse_old))

Root-mean-square error = 1.2046699187526728


In [0]:
#Improve the performance of the model by using CROSS VALIDATION and PARAMETER TUNING
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


#create a new ALS estimator
als = ALS(userCol="UserId_index", itemCol="ProductId_index", ratingCol="Score", coldStartStrategy="drop")
#define a grid for both parameters
#this will test 9 different combinations of the 2 parameters
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [1, 5, 10, 25, 50]) \
    .addGrid(als.regParam, [.001,0.1, .01, 1, 10]) \
    .build()



trainValSplit = TrainValidationSplit(estimator = als, estimatorParamMaps=paramGrid, 
                                    evaluator = RegressionEvaluator(metricName="rmse", predictionCol="prediction", labelCol="Score"), 
                                     trainRatio = 0.9, parallelism = 2)



model = trainValSplit.fit(training)


# Finally retrieve the best model
bestModel = model.bestModel

In [0]:
# Generate top 10 movie recommendations for each user
userRecs = model_old.recommendForAllUsers(10)
userRecs.count()

219736

In [0]:
userRecs

DataFrame[UserId_index: int, recommendations: array<struct<ProductId_index:int,rating:float>>]

In [0]:
userRecs.iloc[0,0]

AttributeError: ignored