In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

## Initiate Spark Context

In [3]:
# allows pivot to interpret more values. 130653 is the number of distinct game IDs
spark.conf.set('spark.sql.pivotMaxValues', 130653)

In [4]:
df = spark.read.csv("data/user_rating_ID_non_null.csv",header=True)
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- user: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- ID: string (nullable = true)



In [5]:
df.show()

+---+---------------+------+-----+
|_c0|           user|rating|   ID|
+---+---------------+------+-----+
|  0|        Torsten|  10.0|30549|
|  1|mitnachtKAUBO-I|  10.0|30549|
|  2|         avlawn|  10.0|30549|
|  3|     Mike Mayer|  10.0|30549|
|  4|        Mease19|  10.0|30549|
|  5|       cfarrell|  10.0|30549|
|  6| katrinacarenne|  10.0|30549|
|  7|      DSpangler|  10.0|30549|
|  8|          gregd|  10.0|30549|
|  9|     calbearfan|  10.0|30549|
| 10|        odustin|  10.0|30549|
| 11|  treece keenes|  10.0|30549|
| 12|       davecort|  10.0|30549|
| 13|      PopeBrain|  10.0|30549|
| 14|    darrensacre|  10.0|30549|
| 15|      shigeking|  10.0|30549|
| 16|         zunyer|  10.0|30549|
| 17|          Noogs|  10.0|30549|
| 18|      tcamprubi|  10.0|30549|
| 19|     BagpipeDan|  10.0|30549|
+---+---------------+------+-----+
only showing top 20 rows



In [6]:
# drop comments, index and name columns
user_rating_id = df.drop('_c0')

In [7]:
user_rating_id.show()

+---------------+------+-----+
|           user|rating|   ID|
+---------------+------+-----+
|        Torsten|  10.0|30549|
|mitnachtKAUBO-I|  10.0|30549|
|         avlawn|  10.0|30549|
|     Mike Mayer|  10.0|30549|
|        Mease19|  10.0|30549|
|       cfarrell|  10.0|30549|
| katrinacarenne|  10.0|30549|
|      DSpangler|  10.0|30549|
|          gregd|  10.0|30549|
|     calbearfan|  10.0|30549|
|        odustin|  10.0|30549|
|  treece keenes|  10.0|30549|
|       davecort|  10.0|30549|
|      PopeBrain|  10.0|30549|
|    darrensacre|  10.0|30549|
|      shigeking|  10.0|30549|
|         zunyer|  10.0|30549|
|          Noogs|  10.0|30549|
|      tcamprubi|  10.0|30549|
|     BagpipeDan|  10.0|30549|
+---------------+------+-----+
only showing top 20 rows



## Change data type to integers

In [8]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
 

In [9]:
user_rating_id = user_rating_id.withColumn('ratingInt',user_rating_id['rating'].cast(DoubleType()))
user_rating_id = user_rating_id.withColumn('IDInt',user_rating_id['ID'].cast(DoubleType()))

In [10]:
user_rating_id = user_rating_id.drop(*['rating','ID'])

In [11]:
user_rating_id.printSchema()

root
 |-- user: string (nullable = true)
 |-- ratingInt: double (nullable = true)
 |-- IDInt: double (nullable = true)



In [12]:
user_rating_id.show()

+---------------+---------+-------+
|           user|ratingInt|  IDInt|
+---------------+---------+-------+
|        Torsten|     10.0|30549.0|
|mitnachtKAUBO-I|     10.0|30549.0|
|         avlawn|     10.0|30549.0|
|     Mike Mayer|     10.0|30549.0|
|        Mease19|     10.0|30549.0|
|       cfarrell|     10.0|30549.0|
| katrinacarenne|     10.0|30549.0|
|      DSpangler|     10.0|30549.0|
|          gregd|     10.0|30549.0|
|     calbearfan|     10.0|30549.0|
|        odustin|     10.0|30549.0|
|  treece keenes|     10.0|30549.0|
|       davecort|     10.0|30549.0|
|      PopeBrain|     10.0|30549.0|
|    darrensacre|     10.0|30549.0|
|      shigeking|     10.0|30549.0|
|         zunyer|     10.0|30549.0|
|          Noogs|     10.0|30549.0|
|      tcamprubi|     10.0|30549.0|
|     BagpipeDan|     10.0|30549.0|
+---------------+---------+-------+
only showing top 20 rows



In [73]:
user_rating_id.select('IDInt').distinct().count()

19330

In [26]:
from pyspark.sql.functions import first

In [28]:
user_pivot = user_rating_id.groupby(df.user).pivot('IDInt').agg(first('ratingInt'))

## Change user into an index

In [13]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [14]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(user_rating_id.columns)-set(['ratingInt'])) ]




pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(user_rating_id).transform(user_rating_id)
transformed.show()

+---------------+---------+-------+-----------+----------+
|           user|ratingInt|  IDInt|IDInt_index|user_index|
+---------------+---------+-------+-----------+----------+
|        Torsten|     10.0|30549.0|        0.0|     189.0|
|mitnachtKAUBO-I|     10.0|30549.0|        0.0|    5357.0|
|         avlawn|     10.0|30549.0|        0.0|     486.0|
|     Mike Mayer|     10.0|30549.0|        0.0|    1388.0|
|        Mease19|     10.0|30549.0|        0.0|    5859.0|
|       cfarrell|     10.0|30549.0|        0.0|     106.0|
| katrinacarenne|     10.0|30549.0|        0.0|  334120.0|
|      DSpangler|     10.0|30549.0|        0.0|  199515.0|
|          gregd|     10.0|30549.0|        0.0|   18651.0|
|     calbearfan|     10.0|30549.0|        0.0|   73238.0|
|        odustin|     10.0|30549.0|        0.0|  197741.0|
|  treece keenes|     10.0|30549.0|        0.0|   19725.0|
|       davecort|     10.0|30549.0|        0.0|  132334.0|
|      PopeBrain|     10.0|30549.0|        0.0|    1200.

In [76]:
transformed.printSchema()

root
 |-- user: string (nullable = true)
 |-- ratingInt: double (nullable = true)
 |-- IDInt: double (nullable = true)
 |-- user_index: double (nullable = false)
 |-- IDInt_index: double (nullable = false)



## Regression Evaluator

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [16]:
(training,test)=transformed.randomSplit([0.8, 0.2])

## ALS Model

In [47]:
als=ALS(maxIter=5,regParam=0.09,userCol="user_index",itemCol="IDInt_index",ratingCol="ratingInt",coldStartStrategy="drop",nonnegative=True)

model=als.fit(training)

KeyboardInterrupt: 

## Baseline Average Model

In [34]:
training.columns

['user', 'ratingInt', 'IDInt', 'IDInt_index', 'user_index']

In [23]:
averages = training.groupby('IDInt').mean()

In [27]:
averages.columns

['IDInt',
 'avg(ratingInt)',
 'avg(IDInt)',
 'avg(IDInt_index)',
 'avg(user_index)']

In [36]:
average_model = averages.select(['IDInt','avg(ratingInt)'])

In [38]:
baseline_train = training

In [42]:
baseline_train = baseline_train.join(average_model,'IDInt','left').select(['user_index', 'IDInt_index','ratingInt', 'avg(ratingInt)' ])

In [43]:
baseline_train.show()

+----------+-----------+---------+-----------------+
|user_index|IDInt_index|ratingInt|   avg(ratingInt)|
+----------+-----------+---------+-----------------+
|   14813.0|     3416.0|      7.0|7.279879516441007|
|  151390.0|     3416.0|      7.0|7.279879516441007|
|   29970.0|     3416.0|      8.0|7.279879516441007|
|  114054.0|     3416.0|      4.0|7.279879516441007|
|   68210.0|     3416.0|      8.0|7.279879516441007|
|   85737.0|     3416.0|      9.0|7.279879516441007|
|    8782.0|     3416.0|      7.0|7.279879516441007|
|   82839.0|     3416.0|      7.0|7.279879516441007|
|   55896.0|     3416.0|      6.0|7.279879516441007|
|   73772.0|     3416.0|      5.3|7.279879516441007|
|  119202.0|     3416.0|      9.0|7.279879516441007|
|  223460.0|     3416.0|      8.0|7.279879516441007|
|   84273.0|     3416.0|     10.0|7.279879516441007|
|  155582.0|     3416.0|     10.0|7.279879516441007|
|   74922.0|     3416.0|      8.0|7.279879516441007|
|   95601.0|     3416.0|      9.0|7.2798795164

In [44]:
baseline_test = test.join(average_model,'IDInt','left').select(['user_index', 'IDInt_index','ratingInt', 'avg(ratingInt)' ])

In [48]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="ratingInt",predictionCol='avg(ratingInt)')

# predictions=model.transform(test)

rmse=evaluator.evaluate(baseline_test)

print("RMSE="+str(rmse))

baseline_test.show()

RMSE=1.4084991683214756
+----------+-----------+---------+-----------------+
|user_index|IDInt_index|ratingInt|   avg(ratingInt)|
+----------+-----------+---------+-----------------+
|   88824.0|     3416.0|      9.0|7.279879516441007|
|   52340.0|     3416.0|     10.0|7.279879516441007|
|  136910.0|     3416.0|      6.0|7.279879516441007|
|   33727.0|     3416.0|      7.0|7.279879516441007|
|     700.0|     3416.0|      6.0|7.279879516441007|
|    5852.0|     3416.0|      7.0|7.279879516441007|
|   35857.0|     3416.0|      2.0|7.279879516441007|
|  111844.0|     3416.0|     10.0|7.279879516441007|
|   39482.0|     3416.0|      9.0|7.279879516441007|
|   51716.0|     3416.0|      8.0|7.279879516441007|
|   14219.0|     3416.0|      7.0|7.279879516441007|
|   37493.0|     3416.0|      7.0|7.279879516441007|
|    1564.0|     3416.0|      3.0|7.279879516441007|
|  103300.0|     3416.0|     10.0|7.279879516441007|
|   24037.0|     3416.0|      8.0|7.279879516441007|
|  148159.0|     3416.

## Evaluate ALS

In [81]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="ratingInt",predictionCol="prediction")

predictions=model.transform(test)

rmse=evaluator.evaluate(predictions)

print("RMSE="+str(rmse))

predictions.show()

RMSE=1.2888062261007116
+--------------------+---------+--------+----------+-----------+----------+
|                user|ratingInt|   IDInt|user_index|IDInt_index|prediction|
+--------------------+---------+--------+----------+-----------+----------+
|      Bookywankenobi|      7.0|209685.0|     471.0|      148.0| 6.6209335|
|               Plush|      7.5|209685.0|     833.0|      148.0| 6.4387417|
|        MegaMushroom|      7.0|209685.0|    2866.0|      148.0|  7.162261|
|           ashermarx|      9.1|209685.0|    3794.0|      148.0|  7.926335|
|         NeverSummer|      9.0|209685.0|   10206.0|      148.0| 7.7223186|
|            Drjack32|      6.5|209685.0|   10362.0|      148.0| 7.5634317|
|               Lrdjr|      8.0|209685.0|   11141.0|      148.0| 7.0467916|
|           decterous|      7.0|209685.0|   12046.0|      148.0| 6.3835196|
|           guigtexas|      5.0|209685.0|   18498.0|      148.0| 6.4452567|
|          TeaShop726|     10.0|209685.0|   28024.0|      148.0|

In [83]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

userRecs.show(10)

+----------+--------------------+
|user_index|     recommendations|
+----------+--------------------+
|       148|[[16501, 10.18137...|
|       463|[[16501, 13.24292...|
|       471|[[16501, 13.10085...|
|       496|[[17112, 13.14176...|
|       833|[[17296, 11.87113...|
|      1088|[[17296, 11.79920...|
|      1238|[[17112, 14.0364]...|
|      1342|[[16501, 12.80586...|
|      1580|[[13042, 11.50758...|
|      1591|[[16501, 13.03294...|
+----------+--------------------+
only showing top 10 rows



In [84]:
movieRecs.show(10)

+-----------+--------------------+
|IDInt_index|     recommendations|
+-----------+--------------------+
|       1580|[[270636, 10.8434...|
|       4900|[[258206, 13.0238...|
|       5300|[[214105, 12.3621...|
|       6620|[[270636, 12.7623...|
|       7240|[[270636, 11.3938...|
|       7340|[[234317, 10.3982...|
|       7880|[[193279, 11.2696...|
|       9900|[[274550, 12.0367...|
|      12940|[[251652, 9.41025...|
|      13840|[[315997, 13.6469...|
+-----------+--------------------+
only showing top 10 rows



## Try Gridsearch to tune Hyper Parameters

In [None]:
als=ALS(maxIter=5,regParam=0.09,
                    userCol="user_index",
                    itemCol="IDInt_index",
                    ratingCol="ratingInt",
                    coldStartStrategy="drop",
                    nonnegative=True)

model=als.fit(training)

In [102]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.regression import LinearRegression

In [103]:

als2 = ALS(
                    userCol="user_index",
                    itemCol="IDInt_index",
                    ratingCol="ratingInt",
                    coldStartStrategy="drop",
)


paramGrid = ParamGridBuilder()\
    .addGrid(als2.rank, [5,10,15])\
    .addGrid(als2.maxIter, [5, 10, 15])\
    .addGrid(als2.regParam, [0.01, 0.09,1])\
    .build()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35503)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:35503)

In [None]:
## Cross validation

crossval = CrossValidator(estimator=als2,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35503)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35503)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception

In [92]:
tvs = TrainValidationSplit(estimator=als2,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [None]:
model = cvModel.fit(training)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Prepare training and test data.
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.9, 0.1], seed=12345)

lr = LinearRegression(maxIter=10)

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)\
    .select("features", "label", "prediction")\
    .show()