基于Spark内置的ALS算法构建推荐模型，进行实时推荐demo

In [1]:
from pyspark import SparkContext, SparkConf # 导入相关工具包
from pyspark.mllib.recommendation import ALS
from math import sqrt
from operator import add
import itertools

# 下面我们先来看系统整体的构建运行脉络

# 方法模板
def initSpark(): pass  # 初试化Spark上下文
def parseData(sparkContext, ratingsFile, moviesFile): return (None, None, None, None)  # 解析预处理数据
def trainModel(trainingData, validationData, testData, iterations=5, lambda_=0.01, blocks=-1): pass  # 训练得到最佳模型
def predict(model, moviesData, rating, user_id): pass  # 根据模型和测试数据给用户进行推荐


# 模型构建运行脉络
sc = initSpark()  # 初试化Spark上下文
trainingData, validationData, testData, moviesData = parseData(sc, "file:///root/notebook/data/ratings.dat",
                                                               "file:///root/notebook/data/movies.dat")  # 解析预处理数据
model = trainModel(trainingData, validationData, testData)  # 得到最佳模型
predict(model, moviesData, testData, 1)  # 根据模型和测试数据给用户进行推荐
# sc.stop()  # 终止回收Spark上下文

In [2]:
# 初始化Spark上下文
# local为本地调试模式，具体集群方式参照http://spark.apache.org/docs/latest/cluster-overview.html
def initSpark():
    conf = SparkConf().setAppName("CF").setMaster("local")
    sc = SparkContext(conf=conf)
    print ("init complete : sc = ", sc)
    return sc

In [3]:
def parseData(sparkContext, ratingsFile, moviesFile):
    # 导入数据，数据格式为：user_id::movies_id::rating::time
    ratings = sparkContext.textFile(ratingsFile).map(lambda line: line.strip().split("::"))
    print("data.count() = %d" % ratings.count())
    # 对应的电影文件的格式为movieId::movieTitle
    movies = sparkContext.textFile(moviesFile).map(lambda line: line.strip().split("::"))

    # 数据预处理，根据评论时间戳最后一位把整个数据集分成训练集(60%), 交叉验证集(20%), 和评估集(20%)
    ratingsData = ratings.map(lambda fields: (long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))))
    trainingData = ratingsData.filter(lambda x: x[0] < 6).values()
    validationData = ratingsData.filter(lambda x: x[0] >= 6 and x[0] < 8).values()
    testData = ratingsData.filter(lambda x: x[0] >= 8).values()

    print("training.count()=%d,validation.count()=%d,test.count()=%d" % (
        trainingData.count(), validationData.count(), testData.count()))

    moviesData = movies.map(lambda fields: (int(fields[0]), fields[1]))
    return trainingData, validationData, testData, moviesData


# 模型参数介绍
在构建训练模型时，我们需要用到以下参数：
- numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).  
- rank is the number of latent factors in the model.  
- iterations is the number of iterations to run.  
- lambda specifies the regularization parameter in ALS.  
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.  
- alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [4]:
# 训练模型，注意，为了更好的调整参数，每个参数都使用了两个值最为备选值，通过
# 使模型在用于调参的数据上的误差最小选取参数，这个可以参数表可以自己设置。
# train的参数有lambda_是正则化项，blocks表示分区数，设置为-1为使用默认配置
# iterations是迭代次数，rank是每一个user或movies的隐含因素的维数。注意，
# rank过大或lambda过小都可能导致过拟合，可能导致预测结果偏小
def trainModel(trainingData, validationData, testData, iterations=5, lambda_=0.01, blocks=-1):
    ranks = [8, 12]
    lambdas = [1.0, 10.0]
    numIters = [10, 20]
    bestModel = None
    bestValidationRmse = float("inf")
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1

    # 计算model在data数据集上的均方误差(Mean Squared Error)
    def computeRmse(model, data):
        newData = data.map(lambda r: (r[0], r[1]))  # 规整校验数据成（用户,电影），用模型进行评分预测
        predictions = model.predictAll(newData).map(lambda r: ((r[0], r[1]), r[2]))  # 预测后返回(用户，电影，评分预测对)
        ratesAndPreds = data.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(
            predictions)  # 根据(用户，电影)配对连接数据，对应每队(用户，电影)有（评分，预测评分项）
        return ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()  # 计算评分和预测评分的均方误差

    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):  # 两两组合，形成2*2*2=8组参数循环
        model = ALS.train(trainingData, rank, numIter, lmbda, blocks)  # 通过train方法和参数建立ALS训练模型,并通过训练集进行训练
        validationRmse = computeRmse(model, validationData)  # 通过校验集，计算训练好的模型进行预测的均方误差
        print "RMSE (validation) = %f for the model trained with " % validationRmse + \
              "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        if (validationRmse < bestValidationRmse):  # 通过比较校验集历史均方误差，选出最好的模型
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter

    testRmse = computeRmse(bestModel, testData)  # 在得到的最好模型，对测试集进行测试

    print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
          + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)
    return bestModel


In [5]:
# 预测。注意使用ALS算法时预测的user_id和movies都必须在训练集中。
def predict(model, moviesData, rating, user_id):
    rawMoviesData = dict(moviesData.collect())
    myRateMovieIdsRDD = rating.filter(lambda x: int(x[0]) == user_id). \
        map(lambda x: x[1]).collect()  # 从测试数据中过滤得到用户数据，再取出该用户的所有电影id
    myRateMovieIds = set(myRateMovieIdsRDD)  # 进行去重
    candidates = sc.parallelize(
        [m for m in rawMoviesData if m not in myRateMovieIds])  # 从最原始的电影数据中过滤用户评过分的电影，得到所有用户未评分的候选电影集，再用于推荐
    predictions = model.predictAll(
        candidates.map(lambda x: (user_id, x))).collect()  # 建立（userId,movieId）对，通过模型进行预测，得到预测评分。
    recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]  # 对评分进行排序，对前50项进行推荐
    print "Movies recommended for you:"
    for i in xrange(len(recommendations)):  # 输出推荐结果
        print ("%2d: %s" % (i + 1, rawMoviesData[recommendations[i][1]])).encode('ascii', 'ignore')


In [6]:
# 算法运行主逻辑
if __name__ == '__main__':
    sc = initSpark()  # 初试化Spark上下文
    trainingData, validationData, testData, moviesData = parseData(sc, "file:///root/notebook/data/ratings.dat",
                                                                   "file:///root/notebook/data/movies.dat")  # 解析预处理数据
    model = trainModel(trainingData, validationData, testData)  # 得到最佳模型
    predict(model, moviesData, testData, 1)  # 根据模型和测试数据给用户进行推荐
    sc.stop()  # 终止回收Spark上下文

('init complete\xef\xbc\x9asc = ', <SparkContext master=local appName=CF>)
data.count() = 10000
training.count()=5932,validation.count()=2001,test.count()=2067
RMSE (validation) = 1.829438 for the model trained with rank = 8, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.828488 for the model trained with rank = 8, lambda = 1.0, and numIter = 20.
RMSE (validation) = 14.678594 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 14.678594 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 1.830058 for the model trained with rank = 12, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.828441 for the model trained with rank = 12, lambda = 1.0, and numIter = 20.
RMSE (validation) = 14.678594 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 14.678594 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 an

In [None]:
# 如果需要存储、读取数据，可运行下面的模型

def persistModel(model, sparkContext, modelPath):
    model.save(sparkContext, modelPath)


def loadModel(sparkContext, modelPath):
    from pyspark.mllib.recommendation import MatrixFactorizationModel
    return MatrixFactorizationModel.load(sparkContext, modelPath)


if __name__ == '__main__':
    import os

    rawPath = "/root/notebook/data/als_model"
    modelPath = "file://%s" % rawPath
    sc = initSpark()  # 初试化Spark上下文
    if os.path.exists(rawPath):
        model = loadModel(sc, modelPath)
        print("load model %s from %s" % (model, rawPath))
    else:
        trainingData, validationData, testData, moviesData = parseData(sc, "file:///root/notebook/data/ratings.dat",
                                                                       "file:///root/notebook/data/movies.dat")  # 解析预处理数据
        model = trainModel(trainingData, validationData, testData)  # 得到最佳模型
    predict(model, moviesData, testData, 1)  # 根据模型和测试数据给用户进行推荐
    if not os.path.exists(rawPath):
        persistModel(model, sc, modelPath)  # 保存模型，方便下次直接调用
        print("save model %s to %s" % (model, rawPath))
    sc.stop()  # 终止回收Spark上下文
