In [2]:
# -*- coding: UTF-8 -*-
import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import  MatrixFactorizationModel
from pyspark.mllib.recommendation import ALS

# 初始化

sparkContext的初始化只需要一个sparkConf，sparkConf的实质是一个`键值对`.   
AppName = "Recommend_Test"   
Master = 'local[2]'   
'spark.driver.memory' = '8G'   

In [3]:
sparkConf = SparkConf().setAppName("Recommend_Test"
                    ).setMaster('local[2]'
                    ).set('spark.driver.memory', '8G')
sc = SparkContext(conf = sparkConf)
sc.master

'local[2]'

# 日志记录

**level** 是日志记录的优先级，分为 `OFF、 FATAL、 ERROR、 WARN、 INFO、 DEBUG、 ALL`或者您定义的级别。    
**Log4j** 建议只使用四个级别 ，优先级从高到低分别是 `ERROR 、 WARN、 INFO、 DEBUG`。   
通过在这里定义的级别，您可以控制到应用程序中相应级别的日志信息的开关。   
比如在这里定义了 INFO 级别，则应用程序中所有 DEBUG 级别的日志信息将不被打印出来。    
appenderName就是指 B 日志信息输出到哪个地方。您可以同时指定多个输出目的地。

参考链接[org.apache.log4j.Logger详解](https://blog.csdn.net/anlina_1984/article/details/5313023)

In [5]:
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)

# 读取文件 

本地读取文件:   
> textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")

读取hdfs中文件:   
> textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")   
> textFile = sc.textFile("/user/hadoop/word.txt")   
> textFile = sc.textFile("word.txt")


In [8]:
path = 'file:///mnt/f/AI/Book/other/spark_tmp/PythonProject/data/'
data = sc.textFile(path+'u.data')
data.cache()

file:///mnt/f/AI/Book/other/spark_tmp/PythonProject/data/u.data MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

user id \t movie id \t rating \t time

In [9]:
# 数据条数
data.count()

100000

In [10]:
# 设定分隔符
data_ratings = data.map(lambda line:line.split('\t')[:3])
# 查看前5项
data_ratings.take(5)

[['196', '242', '3'],
 ['186', '302', '3'],
 ['22', '377', '1'],
 ['244', '51', '2'],
 ['166', '346', '1']]

In [13]:
# 查看所有的user数目
data_users = data_ratings.map(lambda x:x[0])
data_users.distinct().count()

943

In [14]:
# 查看所有的movie数目
data_movies = data_ratings.map(lambda x:x[1])
data_movies.distinct().count()

1682

In [19]:
# 查看所有的打分级别
data_rating = data_ratings.map(lambda x:x[2])
set(data_rating.collect())

{'1', '2', '3', '4', '5'}

In [15]:
# 生成模型, 并训练
model = ALS.train(data_ratings, 10, 10, 0.1)
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7fa16c575b00>

In [16]:
# 输入user_id=100, 召回最高的评分的电影, 选出前10个 
model.recommendProducts(100, 10)

[Rating(user=100, product=316, rating=4.198600574309651),
 Rating(user=100, product=1192, rating=4.139422215166272),
 Rating(user=100, product=313, rating=4.11902819052346),
 Rating(user=100, product=1449, rating=4.074026850819229),
 Rating(user=100, product=315, rating=4.050482099792885),
 Rating(user=100, product=64, rating=4.033476621491211),
 Rating(user=100, product=318, rating=4.027254031922888),
 Rating(user=100, product=22, rating=4.006394939883782),
 Rating(user=100, product=272, rating=3.993697155368104),
 Rating(user=100, product=1431, rating=3.9718886340621977)]

In [17]:
# 输入movie_id=100, 召回最高的评分的用户, 选出前10个 
model.recommendUsers(100, 10)

[Rating(user=34, product=100, rating=5.38160763686137),
 Rating(user=173, product=100, rating=5.2110349791033315),
 Rating(user=928, product=100, rating=5.176931436413363),
 Rating(user=4, product=100, rating=5.091305021106637),
 Rating(user=252, product=100, rating=4.997433439107866),
 Rating(user=808, product=100, rating=4.986576493925978),
 Rating(user=628, product=100, rating=4.986243673168811),
 Rating(user=810, product=100, rating=4.928138455669418),
 Rating(user=686, product=100, rating=4.915622934709694),
 Rating(user=819, product=100, rating=4.911784602788444)]

In [20]:
# 输入user_id, movie_id, 预测打分
model.predict(100, 313)

4.11902819052346

In [21]:
# 读入item文件
item = sc.textFile(path+'u.item')
# 缓存到内存中
item.cache()

file:///mnt/f/AI/Book/other/spark_tmp/PythonProject/data/u.item MapPartitionsRDD[241] at textFile at NativeMethodAccessorImpl.java:0

In [22]:
# 查看首列
item.first()

'1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0'

In [27]:
# 生成 ( movie_id, movie_name )键值对
movie_id = item.map(lambda x:x.split('|')[:2]
                   ).map(lambda x:(int(x[0]), x[1])
                        ).collectAsMap()
movie_id[5]

'Copycat (1995)'

In [36]:
# 针对inputMovieID, 召回最高的评分的用户, 选出前10个 
inputMovieID = 100
item_list = model.recommendUsers(100, 10)
print(type(item_list[0]))
print("针对电影 id:{0}\t 电影名:{1}\t 推荐下列用户id:".format( 
        inputMovieID,movie_id[inputMovieID]))
for rmd in item_list:
    print("针对用户id:{0}\t 推荐电影:{1}\t 推荐评分:{2}".format(
            rmd[0],movie_id[rmd[1]],rmd[2]))

<class 'pyspark.mllib.recommendation.Rating'>
针对电影 id:100	 电影名:Fargo (1996)	 推荐下列用户id:
针对用户id:34	 推荐电影:Fargo (1996)	 推荐评分:5.38160763686137
针对用户id:173	 推荐电影:Fargo (1996)	 推荐评分:5.2110349791033315
针对用户id:928	 推荐电影:Fargo (1996)	 推荐评分:5.176931436413363
针对用户id:4	 推荐电影:Fargo (1996)	 推荐评分:5.091305021106637
针对用户id:252	 推荐电影:Fargo (1996)	 推荐评分:4.997433439107866
针对用户id:808	 推荐电影:Fargo (1996)	 推荐评分:4.986576493925978
针对用户id:628	 推荐电影:Fargo (1996)	 推荐评分:4.986243673168811
针对用户id:810	 推荐电影:Fargo (1996)	 推荐评分:4.928138455669418
针对用户id:686	 推荐电影:Fargo (1996)	 推荐评分:4.915622934709694
针对用户id:819	 推荐电影:Fargo (1996)	 推荐评分:4.911784602788444


In [37]:
# 保存模型到本地, 
# 若有相同文件夹名, 则无法保存, 记得事先清除
model.save(sc,path+'model66666')

In [38]:
# 导入保存好的模型
model_s = MatrixFactorizationModel.load(sc, path+"model66666")

In [40]:
# 进行预测
model_s.predict(100, 55)

3.1278692847762226

In [41]:
# 结束sc
sc.stop()