In [1]:
import os
os.environ['PYSPARK_PYTHON']        = '/usr/local/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3'
os.environ['JAVA_HOME']             = '/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home' # Java 10 is problematic
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir('checkpoint/') # https://stackoverflow.com/a/31484461

# explicit rating model

In [None]:
# !wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!curl -O http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip ml-latest-small.zip

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.types import *


schema = StructType([
    StructField('user'     , IntegerType()),
    StructField('movie'    , IntegerType()),
    StructField('rating'   , DoubleType() ),
    StructField('timestamp', LongType()   ),
])

ratings = spark.read.csv('ml-latest-small/ratings.csv', header=True, schema=schema).select(['user', 'movie', 'rating'])
ratings = ratings.limit(500)
ratings.show()

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|   1|    1|   4.0|
|   1|    3|   4.0|
|   1|    6|   4.0|
|   1|   47|   5.0|
|   1|   50|   5.0|
|   1|   70|   3.0|
|   1|  101|   5.0|
|   1|  110|   4.0|
|   1|  151|   5.0|
|   1|  157|   5.0|
|   1|  163|   5.0|
|   1|  216|   5.0|
|   1|  223|   3.0|
|   1|  231|   5.0|
|   1|  235|   4.0|
|   1|  260|   5.0|
|   1|  296|   3.0|
|   1|  316|   3.0|
|   1|  333|   5.0|
|   1|  349|   4.0|
+----+-----+------+
only showing top 20 rows



In [4]:
train, test = ratings.randomSplit([0.8, 0.2])

# Params Grid Search
Cool way to do gridsearch is `TrainValidationSplit` and `ParamGridBuilder`  
but it is broken for many models including `ALS`  
it doesn't save all params to `bestModel`  
therefore I use my own grid search implementation

see [modeling - How to extract model hyper-parameters from spark.ml in PySpark? - Stack Overflow](https://stackoverflow.com/questions/36697304/how-to-extract-model-hyper-parameters-from-spark-ml-in-pyspark)

In [65]:
param_grid = {
    'rank'    : [12, 13, 14],
    'maxIter' : [18, 19, 20],
    'regParam': [0.17, 0.18],
}

In [52]:
param_grid.keys()

dict_keys(['rank', 'maxIter', 'regParam'])

In [51]:
for param in param_grid.keys():
    print(param)

rank
maxIter
regParam


In [41]:
param_grid.values()

dict_values([[12, 13, 14], [18, 19, 20], [0.17, 0.18]])

In [61]:
pks = ('rank', 'maxIter', 'regParam')
pvs = (12, 18, 0.17)

In [62]:
dict(zip(pks, pvs))

{'rank': 12, 'maxIter': 18, 'regParam': 0.17}

In [70]:
pf = {'pf1': 0, 'pf2': 5}




{'rank': 12, 'maxIter': 18, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 12, 'maxIter': 18, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 12, 'maxIter': 19, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 12, 'maxIter': 19, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 12, 'maxIter': 20, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 12, 'maxIter': 20, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 13, 'maxIter': 18, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 13, 'maxIter': 18, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 13, 'maxIter': 19, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 13, 'maxIter': 19, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 13, 'maxIter': 20, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 13, 'maxIter': 20, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 14, 'maxIter': 18, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 14, 'maxIter': 18, 'regParam': 0.18, 'pf1': 0, 'pf2': 5}
{'rank': 14, 'maxIter': 19, 'regParam': 0.17, 'pf1': 0, 'pf2': 5}
{'rank': 1

In [30]:
from itertools import product

In [31]:
product??

In [None]:
als = ALS(rank=12, maxIter=15, regParam=0.17, userCol='user', itemCol='movie', ratingCol='rating', coldStartStrategy='drop', nonnegative=True)




In [26]:
def m(**kwargs):
    print(kwargs)

In [27]:
pf = {'f': 0, 'g': 5}
m(z=1, r=4, **pf)

{'z': 1, 'r': 4, 'f': 0, 'g': 5}


In [22]:
for param, vals in params.items():
    print(param, vals)

ALS_4d25b8f746262162414f__rank [12, 13, 14]
ALS_4d25b8f746262162414f__maxIter [18, 19, 20]
ALS_4d25b8f746262162414f__regParam [0.17, 0.18, 0.19]


# todo error histogram

In [87]:
reduce(mul, map(len, param_grid.values()))

18

In [78]:
len(list(product(*param_grid.values())))

18

In [75]:
from itertools import product
from functools import reduce
from operator import mul

def grid_search(model_class, param_grid, param_fixed, evaluator, train, test):
    '''todo: multithreading'''
    n_combs = reduce(mul, map(len, param_grid.values()))
    best_score = float('inf')
    best_params = None
    for i, param_comb in enumerate(product(*param_grid.values())):
        params = dict(zip(param_grid.keys(), param_comb))
        print('params', params)
        model = model_class(**params, **param_fixed)
        predictions = model.fit(train).transform(test)
        score = evaluator.evaluate(predictions)
        print('score', score)
        if score < best_score:
            best_score = score
            best_params = params
            print('new best params found')
        print('progress:', i + 1, '/', n_combs)
    return best_params

In [99]:
param_fixed = {
    'userCol'          : 'user', 
    'itemCol'          : 'movie', 
    'ratingCol'        : 'rating', 
    'coldStartStrategy': 'drop', 
    'nonnegative'      : True,
}

param_grid = {
    'rank'    : [12, 13, 14],
    'maxIter' : [18, 19, 20],
    'regParam': [0.17, 0.18],
}

pgrid = [
    dict(zip(param_grid.keys(), param_comb))
    for param_comb in product(*param_grid.values())
]

In [None]:
from multiprocessing.dummy import Pool # dummy means threads, not real processes

def evaluate_params(model_class, params, param_fixed, evaluator, train, test):
    model = model_class(**params, **param_fixed)
    predictions = model.fit(train).transform(test)
    score = evaluator.evaluate(predictions)
    return (params, score)


n_combs = reduce(mul, map(len, param_grid.values()))

pool = Pool(processes=n_combs)
async_result = pool.map_async(evaluate_params, pgrid)

z = async_result.get()
z

In [None]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
best_params = grid_search(ALS, param_grid, param_fixed, evaluator, train, test)

In [90]:
best_params = {'rank': 14, 'maxIter': 20, 'regParam': 0.18}
als = ALS(**best_params, **param_fixed)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49824)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:49824)

[learn all params](http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#module-pyspark.ml.recommendation)

In [5]:
als = ALS(userCol='user', itemCol='movie', ratingCol='rating', coldStartStrategy='drop', nonnegative=True)

param_grid = ParamGridBuilder()                \
    .addGrid(als.rank    , [12, 13, 14])       \
    .addGrid(als.maxIter , [18, 19, 20])       \
    .addGrid(als.regParam, [0.17, 0.18, 0.19]) \
    .build()

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

tvs = TrainValidationSplit(
    estimator = als,
    estimatorParamMaps = param_grid,
    evaluator = evaluator
)

In [None]:
best_model = tvs.fit(train).bestModel

In [None]:
print('best_model.rank'    , best_model.rank    )
print('best_model.maxIter' , best_model.maxIter )
print('best_model.regParam', best_model.regParam)

In [None]:
# model = tvs.fit(train)
# best_model = model.bestModel

In [10]:
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
print('rmse', rmse)
print('best_model', best_model)

In [None]:
predictions.show()

In [None]:
# display(predictions.sort('user', 'rating')) # long running

In [None]:
# best_model.recommendForAllUsers(10).show() # Returns top numItems items recommended for each user, for all users.

In [None]:
# TrainValidationSplit vs CrossValidator pyspark
# https://spark.apache.org/docs/2.2.0/ml-tuning.html

In [None]:
param_fixed = {
    'userCol'          : 'user', 
    'itemCol'          : 'movie', 
    'ratingCol'        : 'rating', 
    'coldStartStrategy': 'drop', 
    'nonnegative'      : True,
}

param_grid = {
    'rank'    : [12, 13, 14],
    'maxIter' : [18, 19, 20],
    'regParam': [0.17, 0.18],
}

pgrid = [
    dict(zip(param_grid.keys(), param_comb))
    for param_comb in product(*param_grid.values())
]

In [None]:
def evaluate_params(params, model_class, param_fixed, evaluator, train, test):
    model = model_class(**params, **param_fixed)
    predictions = model.fit(train).transform(test)
    score = evaluator.evaluate(predictions)
    return (params, score)

def evaluate_als(params):
    return evaluate_params(
        params      = params     ,
        model_class = ALS        ,
        param_fixed = param_fixed,
        evaluator   = evaluator  ,
        train       = train      ,
        test        = test       ,
    )

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [None]:
from multiprocessing.dummy import Pool # dummy means threads, not real processes

pool = Pool(processes=len(pgrid))
z = pool.map(evaluate_als, pgrid)

print(sorted(z, key = lambda x: x[1]))

In [105]:
for i in sorted(z, key = lambda x: x[1]):
    print(i)

({'rank': 13, 'maxIter': 20, 'regParam': 0.17}, 0.8566730359915185)
({'rank': 13, 'maxIter': 19, 'regParam': 0.17}, 0.8567577889920407)
({'rank': 13, 'maxIter': 18, 'regParam': 0.17}, 0.8568637417151008)
({'rank': 12, 'maxIter': 20, 'regParam': 0.17}, 0.8569176957675673)
({'rank': 12, 'maxIter': 19, 'regParam': 0.17}, 0.8569885172704061)
({'rank': 12, 'maxIter': 18, 'regParam': 0.17}, 0.8570723816543611)
({'rank': 14, 'maxIter': 20, 'regParam': 0.17}, 0.8583020230781222)
({'rank': 14, 'maxIter': 19, 'regParam': 0.17}, 0.8584015331269317)
({'rank': 13, 'maxIter': 20, 'regParam': 0.18}, 0.8585022356083378)
({'rank': 14, 'maxIter': 18, 'regParam': 0.17}, 0.8585182962869441)
({'rank': 13, 'maxIter': 19, 'regParam': 0.18}, 0.8585910701232548)
({'rank': 13, 'maxIter': 18, 'regParam': 0.18}, 0.8586977968580285)
({'rank': 12, 'maxIter': 20, 'regParam': 0.18}, 0.8587364482831222)
({'rank': 12, 'maxIter': 19, 'regParam': 0.18}, 0.8588040055491347)
({'rank': 12, 'maxIter': 18, 'regParam': 0.18}, 

In [113]:
param_grid = {
    'rank'    : range(4, 12),
    'maxIter' : range(2, 20, 2),
    'regParam': np.arange(0.1, 0.4, 0.1),
}

pgrid = [
    dict(zip(param_grid.keys(), param_comb))
    for param_comb in product(*param_grid.values())
]

# random search instead of grid search
from random import shuffle
shuffle(pgrid)
pgrid = pgrid[:100]

In [114]:
pgrid

[{'rank': 9, 'maxIter': 8, 'regParam': 0.2},
 {'rank': 7, 'maxIter': 14, 'regParam': 0.30000000000000004},
 {'rank': 5, 'maxIter': 8, 'regParam': 0.2},
 {'rank': 6, 'maxIter': 8, 'regParam': 0.2},
 {'rank': 6, 'maxIter': 16, 'regParam': 0.2},
 {'rank': 6, 'maxIter': 6, 'regParam': 0.4},
 {'rank': 6, 'maxIter': 18, 'regParam': 0.2},
 {'rank': 10, 'maxIter': 4, 'regParam': 0.30000000000000004},
 {'rank': 5, 'maxIter': 14, 'regParam': 0.1},
 {'rank': 11, 'maxIter': 14, 'regParam': 0.1},
 {'rank': 10, 'maxIter': 2, 'regParam': 0.30000000000000004},
 {'rank': 4, 'maxIter': 18, 'regParam': 0.2},
 {'rank': 10, 'maxIter': 2, 'regParam': 0.4},
 {'rank': 5, 'maxIter': 18, 'regParam': 0.4},
 {'rank': 11, 'maxIter': 14, 'regParam': 0.30000000000000004},
 {'rank': 7, 'maxIter': 16, 'regParam': 0.2},
 {'rank': 5, 'maxIter': 12, 'regParam': 0.1},
 {'rank': 5, 'maxIter': 4, 'regParam': 0.4},
 {'rank': 10, 'maxIter': 6, 'regParam': 0.4},
 {'rank': 9, 'maxIter': 4, 'regParam': 0.4},
 {'rank': 11, 'maxIt