# 1. Preparation
* Environment:
    * hadoop-3.0.3
    * spark-2.3.1
    * ubuntu18.04

## 1.1 Launching Jupyter Notebook on Spark Standalone
```
cd ~/eclipse-workspace/
```
```
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" \
MASTER=spark://ubuntu:7077 pyspark --num-executors 1 \
--total-executor-cores 2 --executor-memory 512m
```
## 1.2 Downloading and Extracting the Data
```
wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
unzip ml-latest-small.zip 
rm ml-latest-small.zip
mv ml-latest-small FilmRecommend/data/
cd FilmRecommend/data
```

## 1.3 Inspecting the Data

In [4]:
%ls ml-latest-small/

links.csv  movies.csv  ratings.csv  README.txt  tags.csv


In [20]:
# %cd ml-latest-small/
%cat links.csv | head -10
# links.csv holds each movie's link IDs, i.e. its IDs on different websites

movieId,imdbId,tmdbId
1,0114709,862
2,0113497,8844
3,0113228,15602
4,0114885,31357
5,0113041,11862
6,0113277,949
7,0114319,11860
8,0112302,45325
9,0114576,9091
cat: write error: Broken pipe


In [21]:
%cat movies.csv |head -10
# movies.csv is the movie information file: movie ID, title, genres

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
cat: write error: Broken pipe


In [22]:
%cat ratings.csv | head -10
# ratings.csv is the rating data: user ID, movie ID, rating, timestamp

userId,movieId,rating,timestamp
1,31,2.5,1260759144
1,1029,3.0,1260759179
1,1061,3.0,1260759182
1,1129,2.0,1260759185
1,1172,4.0,1260759205
1,1263,2.0,1260759151
1,1287,2.0,1260759187
1,1293,2.0,1260759148
1,1339,3.5,1260759125
cat: write error: Broken pipe


In [24]:
%cat tags.csv|head -10
# tags.csv is the tag data: user ID, movie ID, tag, timestamp

userId,movieId,tag,timestamp
15,339,sandra 'boring' bullock,1138537770
15,1955,dentist,1193435061
15,7478,Cambodia,1170560997
15,32892,Russian,1170626366
15,34162,forgettable,1141391765
15,35957,short,1141391873
15,37729,dull story,1141391806
15,45950,powerpoint,1169616291
15,100365,activist,1425876220
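
The broken-pipe messages printed by `cat` above are harmless: `head` closes the pipe after ten lines while `cat` is still writing. As an alternative, the following sketch previews a file in plain Python without a pipe. The helper name `preview` is hypothetical and not part of the original notebook.

```python
from itertools import islice


def preview(path, n=10):
    """Return the first n lines of a text file without reading the rest."""
    with open(path, encoding="utf-8") as f:
        # islice stops after n lines, so large files are not loaded fully
        return [line.rstrip("\n") for line in islice(f, n)]


# Example usage (assumes links.csv is in the current working directory):
# for line in preview("links.csv"):
#     print(line)
```

Inside the notebook, `!head -10 links.csv` would likely avoid the message as well, since no pipe is involved.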


# 2. Recommending Movies with the ALS Algorithm in Spark MLlib

## 2.1 Reading the Data
* Create the target directory on HDFS

    ```hadoop fs -mkdir -p /sparkproject/FilmRecommend/data```


* Copy the local data files to HDFS

    ```hadoop fs -copyFromLocal ml-latest-small /sparkproject/FilmRecommend/data/```

In [2]:
# Choose the data path: local file system in local mode, HDFS otherwise
global Path
if sc.master[0:5]=='local':
    Path='file:/home/hadoop/eclipse-workspace/FilmRecommend/data/'
else:
    Path='hdfs://ubuntu:9000/sparkproject/FilmRecommend/data/'
# sc.master
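
The path selection above can also be wrapped in a small function, which makes it easy to check outside of Spark. This is only a sketch: the function name `data_root` and its parameter names are hypothetical, while the two default paths are the ones used in this notebook.

```python
def data_root(master,
              local_root="file:/home/hadoop/eclipse-workspace/FilmRecommend/data/",
              hdfs_root="hdfs://ubuntu:9000/sparkproject/FilmRecommend/data/"):
    """Pick the data root from the Spark master URL.

    A master URL starting with 'local' means the driver runs locally,
    so the local file system is used; anything else reads from HDFS.
    """
    return local_root if master.startswith("local") else hdfs_root


# In the notebook this would be called as: Path = data_root(sc.master)
print(data_root("local[*]"))
print(data_root("spark://ubuntu:7077"))
```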

In [3]:
# Read the ratings file (100005 lines including the header, i.e. 100004 ratings)
ratData = sc.textFile(Path+'ml-latest-small/ratings.csv')
ratData.count() 

100005

In [4]:
ratData.take(2)

['userId,movieId,rating,timestamp', '1,31,2.5,1260759144']

## 2.2 Importing the Module

In [5]:
from pyspark.mllib.recommendation import Rating

## 2.3 Converting the Data to the Format ALS Requires (userId,movieId,rating)

In [6]:
# The file is comma-separated, so a single split(',') is enough
ratRDD = ratData.map(lambda line: line.split(',')) \
                .map(lambda y: (y[0], y[1], y[2]))
# Drop the header row
header = ratRDD.first()
ratRDD = ratRDD.filter(lambda x: x != header)

print('Converted data format: (userId,movieId,rating)')
ratRDD.take(5)

Converted data format: (userId,movieId,rating)


[('1', '31', '2.5'),
 ('1', '1029', '3.0'),
 ('1', '1061', '3.0'),
 ('1', '1129', '2.0'),
 ('1', '1172', '4.0')]
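
The tuples above still hold strings. As a sketch of a stricter variant, the per-line conversion can be written as a plain function that returns numeric types and is testable without a cluster. The name `parse_rating` is hypothetical; the sample lines are copied from `ratings.csv` as shown earlier.

```python
def parse_rating(line):
    """Convert one CSV line into (userId, movieId, rating) with numeric types."""
    user_id, movie_id, rating = line.split(",")[:3]  # ignore the timestamp
    return int(user_id), int(movie_id), float(rating)


lines = [
    "userId,movieId,rating,timestamp",
    "1,31,2.5,1260759144",
    "1,1029,3.0,1260759179",
]
header = lines[0]
parsed = [parse_rating(l) for l in lines if l != header]
print(parsed)  # [(1, 31, 2.5), (1, 1029, 3.0)]
```

With an RDD, the same function could be applied as `ratData.filter(lambda l: l != header).map(parse_rating)`, where `header` is assumed to be `ratData.first()`.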

In [7]:
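# Count distinct users and distinct movies
numUsers = ratRDD.map(lambda x: x[0]).distinct().count()
numMovies = ratRDD.map(lambda x: x[1]).distinct().count()
print('Total number of users who rated:', numUsers, '\nTotal number of movies rated:', numMovies)

Total number of users who rated: 671 
Total number of movies rated: 9066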


## 2.4 Training the Model

In [8]:
from pyspark.mllib.recommendation import ALS
# ALS.train(ratings, rank, iterations=5, lambda_=0.01)          # train on explicit ratings
# ALS.trainImplicit(ratings, rank, iterations=5, lambda_=0.01)  # train on implicit feedback
model = ALS.train(ratRDD, 10, 10, 0.01)  # rank=10, iterations=10, lambda_=0.01
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7faf049f2d68>

## 2.5 Making Recommendations

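### 2.5.1 Recommending Movies to a User: User-Based Collaborative Filtering (Items to People)
```model.recommendProducts(userId,nProducts)``` recommends the top `nProducts` products to the user `userId`.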

In [14]:
model.recommendProducts(666,5)
# model.predict(666,4835) # predicted rating of user 666 for movie ID 4835

[Rating(user=666, product=3684, rating=7.875212404462965),
 Rating(user=666, product=6223, rating=7.626978985374312),
 Rating(user=666, product=4835, rating=7.620037603031445),
 Rating(user=666, product=6618, rating=7.425246691947353),
 Rating(user=666, product=71033, rating=7.109216173420328)]

### 2.5.2 Recommending a Movie to Users: Item-Based Collaborative Filtering (People for an Item)

```model.recommendUsers(product,num)``` recommends `product` to the top `num` users.

In [18]:
model.recommendUsers(4835,5)

[Rating(user=207, product=4835, rating=11.582046907051225),
 Rating(user=259, product=4835, rating=10.94243427958982),
 Rating(user=308, product=4835, rating=10.204406952908482),
 Rating(user=375, product=4835, rating=10.177399472876523),
 Rating(user=32, product=4835, rating=10.064223699901323)]
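
The predicted scores in both outputs exceed the 0.5 to 5 range of the MovieLens ratings. A matrix factorization model of the kind ALS trains scores a (user, product) pair as the inner product of two latent factor vectors, and nothing bounds that value to the rating scale. The sketch below illustrates this with made-up factor vectors; `predict_score`, `clip_rating`, and the numbers are hypothetical and do not come from the trained model.

```python
def predict_score(user_factors, product_factors):
    """Inner product of a user's and a product's latent factor vectors."""
    return sum(u * p for u, p in zip(user_factors, product_factors))


def clip_rating(score, low=0.5, high=5.0):
    """Clamp a raw score to the rating scale (an optional post-processing step)."""
    return max(low, min(high, score))


# Hypothetical rank-2 factors, chosen only for illustration
user_vec = [1.0, 2.0]
product_vec = [3.0, 4.0]

raw = predict_score(user_vec, product_vec)
print(raw)               # 11.0
print(clip_rating(raw))  # 5.0
```

For ranking purposes the raw scores are usually sufficient, since clipping does not change the order among unclipped items. Scores far above the scale may also hint that the regularization parameter (`0.01` here) is on the low side for this dataset.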

### 2.5.3 Displaying the Titles of Recommended Movies

In [25]:
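# Read the movie information file movies.csv
movData = sc.textFile(Path+'ml-latest-small/movies.csv')
print('movies.csv has %i lines (including the header)' % movData.count())
movData.take(5)

movies.csv has 9126 lines (including the header)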


['movieId,title,genres',
 '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance']

In [75]:
# Build a dict that maps movie ID to title
# Note: splitting on ',' truncates quoted titles that themselves contain commas
movRDD = movData.map(lambda line: line.split(',')) \
                .map(lambda y: (y[0], y[1]))
header = movRDD.first()
movRDD = movRDD.filter(lambda z: z != header).collectAsMap()  # now a plain dict, not an RDD

In [74]:
recommendP = model.recommendProducts(666,5)
for p in recommendP:
    print("Recommended to user %s: %s\n     Predicted rating: %s" % (p[0],movRDD[str(p[1])],p[2]))

Recommended to user 666: "Fabulous Baker Boys
     Predicted rating: 7.875212404462965
Recommended to user 666: Spun (2001)
     Predicted rating: 7.626978985374312
Recommended to user 666: Coal Miner's Daughter (1980)
     Predicted rating: 7.620037603031445
Recommended to user 666: Shaolin Soccer (Siu lam juk kau) (2001)
     Predicted rating: 7.425246691947353
Recommended to user 666: "Secret in Their Eyes
     Predicted rating: 7.109216173420328
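
Two of the titles above start with a stray quote and are cut short. This happens because `movies.csv` quotes titles that contain a comma, and a plain `split(',')` breaks such a title at the first comma inside the quotes. One possible fix, sketched below, is to parse each line with Python's standard `csv` module. The helper name `parse_movie` is hypothetical, and the sample line is an illustrative row in the file's format rather than one copied from the outputs above.

```python
import csv


def parse_movie(line):
    """Parse one movies.csv line into (movieId, title), honoring quoted commas."""
    fields = next(csv.reader([line]))
    return fields[0], fields[1]


# Illustrative line in the movies.csv format (title contains a comma)
sample = '11,"American President, The (1995)",Comedy|Drama|Romance'

print(sample.split(",")[1])    # "American President   (truncated, quote kept)
print(parse_movie(sample)[1])  # American President, The (1995)
```

In the notebook, the dict could then be built along the lines of `movData.filter(lambda l: l != header_line).map(parse_movie).collectAsMap()`, where `header_line` is assumed to be `movData.first()`.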
