In [1]:
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"
import pandas as pd
from pyspark.sql import *
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.shell import sc,spark
from pyspark.sql.functions import desc,asc

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 3.6.6 (default, Sep 12 2018 18:26:19)
SparkSession available as 'spark'.


In [2]:
df = spark.read.csv("the-movies-dataset/ratings_small.csv",header=True)
df.show()
df.printSchema

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



<bound method DataFrame.printSchema of DataFrame[userId: string, movieId: string, rating: string, timestamp: string]>

In [3]:
from pyspark.sql.functions import expr
rate = df.select("userId","movieId","rating")
rate = rate.withColumn("userId", expr("CAST(userId AS INTEGER)")).withColumn("movieId", expr(
        "CAST(movieId AS INTEGER)")).withColumn("rating", expr("CAST(rating AS FLOAT)"))
rate.filter("userId = 1").sort(desc('rating')).show()
training, test = rate.randomSplit([0.8, 0.2])

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   2105|   4.0|
|     1|   1953|   4.0|
|     1|   1172|   4.0|
|     1|   1339|   3.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   2150|   3.0|
|     1|   3671|   3.0|
|     1|     31|   2.5|
|     1|   1371|   2.5|
|     1|   2455|   2.5|
|     1|   1263|   2.0|
|     1|   1343|   2.0|
|     1|   1129|   2.0|
|     1|   1287|   2.0|
|     1|   2193|   2.0|
|     1|   1293|   2.0|
|     1|   2294|   2.0|
|     1|   1405|   1.0|
|     1|   2968|   1.0|
+------+-------+------+



In [4]:
from pyspark.mllib.recommendation import ALS
# ratings = sc.parallelize(ratings)
# model = ALS.train(training,10,10)
# model.recommendProducts(1,10)


from pyspark.ml.recommendation import ALS
als = ALS().setMaxIter(19).setRegParam(0.18).setRank(10).setColdStartStrategy("drop").setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
alsModel = als.fit(training)
print(als.explainParams())

result = alsModel.recommendForAllUsers(10).selectExpr("userId", "explode(recommendations) as recommendations ")
tmp = result.select("userId","recommendations.movieId","recommendations.rating").filter("userId = 1")
# tmp = result.filter('userId = 1').sort(desc('recommendations.rating'))
rate = rate.filter("userId = 1").union(tmp)
rate = rate.sort(desc('rating')).distinct().show(40)
# print(type(rate))
# print(len(rate.collect()))



alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)
maxIter: max number of i

In [5]:
# evaluation
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
predictions = alsModel.transform(test)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %f"%rmse)

Root-mean-square error = 0.905108


In [54]:
df = spark.range(500).toDF("number")
tmp = df.select(df["number"]+10)
tmp.show()

+-------------+
|(number + 10)|
+-------------+
|           10|
|           11|
|           12|
|           13|
|           14|
|           15|
|           16|
|           17|
|           18|
|           19|
|           20|
|           21|
|           22|
|           23|
|           24|
|           25|
|           26|
|           27|
|           28|
|           29|
+-------------+
only showing top 20 rows

