In [69]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()

In [117]:
schema_ratings = StructType([
    StructField("Id",IntegerType()),
    StructField("userId",IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", FloatType()),
    StructField("timestamp",StringType())    
])
ratings_df = spark.read.format("csv").option("header", "true").schema(schema_ratings).load("file:////home/nhatthanh123bk/Desktop/app/data/ratings.csv")
ratings_df.show(10)
ratings_df.printSchema()

+---+------+-------+------+---------+
| Id|userId|movieId|rating|timestamp|
+---+------+-------+------+---------+
|  0|     1|   1193|   5.0|978300760|
|  1|     1|    661|   3.0|978302109|
|  2|     1|    914|   3.0|978301968|
|  3|     1|   3408|   4.0|978300275|
|  4|     1|   2355|   5.0|978824291|
|  5|     1|   1197|   3.0|978302268|
|  6|     1|   1287|   5.0|978302039|
|  7|     1|   2804|   5.0|978300719|
|  8|     1|    594|   4.0|978302268|
|  9|     1|    919|   4.0|978301368|
+---+------+-------+------+---------+
only showing top 10 rows

root
 |-- Id: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: string (nullable = true)



In [93]:
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator

In [94]:
ratings = ratings_df.select("userId","movieId","rating").cache()

In [95]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [112]:
training_data,testing_data = ratings.randomSplit([0.8,0.2])
training_data.write.csv('file:////home/nhatthanh123bk/Desktop/app/data/training_data.csv')
testing_data.write.csv('file:////home/nhatthanh123bk/Desktop/app/data/testing.csv')

In [116]:
training_data = spark.read.format("csv").load("file:////home/nhatthanh123bk/Desktop/app/data/training_data.csv")


In [108]:
als = ALS(rank=10,maxIter=10, regParam=0.01, 
          userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop",
          implicitPrefs=False)
model = als.fit(training_data)
predictions = model.transform(testing_data)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.889243228137
