In [1]:
from pyspark import SparkConf,SparkContext
from pyspark import rdd
conf=SparkConf().setMaster("local[*]").setAppName("First_APP")
sc=SparkContext(conf=conf)

In [2]:
movies = sc.textFile('E:/machine_data/spark_test_data/ml-100k/u.item')
movies.take(1)

['1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0']

-----------------------
1.1 提取电影的题材标签
-----------------------

In [3]:
genres = sc.textFile('E:/machine_data/spark_test_data/ml-100k/u.genre')
print (genres.take(5))

for line in genres.take(5):
    print (line)

['unknown|0', 'Action|1', 'Adventure|2', 'Animation|3', "Children's|4"]
unknown|0
Action|1
Adventure|2
Animation|3
Children's|4


In [4]:
#为电影题材编码
genre_map = genres.filter(lambda x: len(x) > 0).map(lambda line : line.split('|')).map(lambda x:(x[1],x[0])).collectAsMap()
print ('构造出的电影题材的编码字典:',genre_map)

构造出的电影题材的编码字典: {'5': 'Comedy', '3': 'Animation', '16': 'Thriller', '1': 'Action', '2': 'Adventure', '11': 'Horror', '13': 'Mystery', '6': 'Crime', '9': 'Fantasy', '15': 'Sci-Fi', '4': "Children's", '17': 'War', '0': 'unknown', '10': 'Film-Noir', '18': 'Western', '12': 'Musical', '7': 'Documentary', '8': 'Drama', '14': 'Romance'}


In [5]:
movies=sc.textFile('E:/machine_data/spark_test_data/ml-100k/u.item')
print ('电影数据集的第一条数据:',movies.first())

#查看电影的标题
movies_title  = movies.map(lambda x: x.split('|')).map(lambda x: x[1])
print ('电影标题:',movies_title.take(5))

#查看电影的题材, 0表示不属于该题材, 1表示属于该题材
movies_genre = movies.map(lambda x: x.split('|')).map(lambda x: x[5:])
print ('电影的题材:')
print (movies_genre.take(5))

电影数据集的第一条数据: 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
电影标题: ['Toy Story (1995)', 'GoldenEye (1995)', 'Four Rooms (1995)', 'Get Shorty (1995)', 'Copycat (1995)']
电影的题材:
[['0', '0', '0', '1', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'], ['0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0'], ['0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0'], ['0', '1', '0', '0', '0', '1', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'], ['0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0']]


In [6]:
#根据电影的题材编码字典genre_map，从上述结果可以知道，第一部电影属于Animation，Children's，Comedy题材.
#下面看一看，各部电影各自属于哪种类型
def func(rdd):
    genres = rdd[5:]     #提取题材特征
    genres_assigned = zip(genres, range(len(genres)))
    index = []           #存储题材特征数值为1的特征索引号
    for genre,idx in genres_assigned:
        if genre=='1':
            index.append(idx)
    index_val = [genre_map[str(i)] for i in index]   #根据编码字典找出索引的相应题材名
    index_val_str = ','.join(index_val)
    return (int(rdd[0]),rdd[1]+','+index_val_str)
titles_and_genres = movies.map(lambda x: x.split('|')).map(lambda x:func(x))
print ('前5部电影的标题和相应的题材类型:',titles_and_genres.take(5))

前5部电影的标题和相应的题材类型: [(1, "Toy Story (1995),Animation,Children's,Comedy"), (2, 'GoldenEye (1995),Action,Adventure,Thriller'), (3, 'Four Rooms (1995),Thriller'), (4, 'Get Shorty (1995),Action,Comedy,Drama'), (5, 'Copycat (1995),Crime,Drama,Thriller')]


--------------------
1.2 训练推荐模型
--------------------

In [7]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import Rating

raw_data = sc.textFile("E:/machine_data/spark_test_data/ml-100k/u.data")
#数据集u.data中四个字段分别表示用户ID, 电影ID, 评分, 时间戳
print ('raw data sample:', raw_data.map(lambda x : x.split('\t')).take(3))

raw_ratings = raw_data.map(lambda x:x.split('\t')[:3])
ratings = raw_ratings.map(lambda x: Rating(x[0], x[1], x[2]))
ratings.cache()
print ('rating data sample:',ratings.take(3))

#训练推荐模型
als_model = ALS.train(ratings,50,5,0.1)

raw data sample: [['196', '242', '3', '881250949'], ['186', '302', '3', '891717742'], ['22', '377', '1', '878887116']]
rating data sample: [Rating(user=196, product=242, rating=3.0), Rating(user=186, product=302, rating=3.0), Rating(user=22, product=377, rating=1.0)]


In [10]:
from pyspark.mllib.linalg import Vectors

print ('productFeatures的第一条数据:',als_model.productFeatures().take(1))

movie_factors = als_model.productFeatures().map(lambda kv: (kv[0],Vectors.dense(kv[1])))
print ('movie_factors的第一条数据:',movie_factors.first())
movie_vectors = movie_factors.map(lambda kv:kv[1])

user_factors = als_model.userFeatures().map(lambda kv:(kv[0],Vectors.dense(kv[1])))
print ('user_factors的第一条数据:',user_factors.first())
user_vectors = user_factors.map(lambda kv:kv[1])

productFeatures的第一条数据: [(4, array('d', [0.14387322962284088, 0.14869925379753113, -0.2368069589138031, -0.014290721155703068, -0.12111742049455643, -0.09147009998559952, -0.09890703856945038, -0.160025492310524, 0.5735045075416565, -0.13811266422271729, -0.05158152058720589, -0.2939601242542267, -0.22409386932849884, 0.09925245493650436, -0.08321025967597961, 0.053931429982185364, 0.10029783099889755, 0.20180019736289978, -0.21017244458198547, 0.18643592298030853, -0.3217353820800781, 0.18122124671936035, -0.2510859966278076, -0.15965671837329865, -0.2223781943321228, 0.3551555275917053, -0.34012436866760254, -0.14012393355369568, 0.1685008704662323, -0.42351454496383667, 0.14878679811954498, 0.3108901381492615, -0.23603136837482452, -0.006249609403312206, 0.0655541718006134, 0.13887828588485718, -0.17523664236068726, 0.3279019892215729, 0.5693528652191162, 0.062116168439388275, 0.11561039090156555, -0.22767698764801025, -0.2499641329050064, -0.06678327918052673, 0.7092862725257874, -0

In [11]:
#归一化
from pyspark.mllib.linalg.distributed import RowMatrix

moive_matrix = RowMatrix(movie_vectors)
user_matrix = RowMatrix(user_vectors)


from pyspark.mllib.stat import MultivariateStatisticalSummary
desc_moive_matrix = MultivariateStatisticalSummary(moive_matrix.rows)
desc_user_matrix = MultivariateStatisticalSummary(user_matrix.rows)
print ('Movie factors mean:',desc_moive_matrix.mean())
print ('Movie factors variance:',desc_user_matrix.mean())
print ('User factors mean:',desc_moive_matrix.variance())
print ('User factors variance:',desc_user_matrix.variance())


Movie factors mean: [ 0.01810578  0.01361871 -0.26275148 -0.02820166 -0.08286296 -0.02365649
 -0.05980769 -0.05652419  0.37480141 -0.10572021  0.00669056 -0.10592778
 -0.09410807  0.06773259  0.09730877  0.1649335   0.0750938   0.27019998
 -0.11571402  0.13084132 -0.18573318  0.08539546 -0.21946581 -0.10670612
 -0.25077449  0.31152667 -0.29765428 -0.14251722 -0.07079642 -0.47092918
  0.16858506  0.12324861 -0.21732101 -0.05691996 -0.06235325  0.1645198
 -0.17009505  0.27837901  0.25721334 -0.06387119  0.10052837 -0.15240843
 -0.14915848  0.06086447  0.2678296  -0.14952754 -0.18920296 -0.16620377
  0.05229777 -0.3870449 ]
Movie factors variance: [ 0.05714024 -0.00647375 -0.45128601 -0.02788648 -0.17772425 -0.07894654
 -0.13689418 -0.13390418  0.59879596 -0.23217939  0.0178455  -0.13520601
 -0.18257522  0.15892019  0.10053736  0.2662292   0.21823402  0.40788709
 -0.20914878  0.20693222 -0.30303271  0.16251006 -0.41000196 -0.17527211
 -0.47297237  0.52718542 -0.44955608 -0.23641036 -0.115

In [20]:
print (moive_matrix.rows.first())

[0.143873229623,0.148699253798,-0.236806958914,-0.0142907211557,-0.121117420495,-0.0914700999856,-0.0989070385695,-0.160025492311,0.573504507542,-0.138112664223,-0.0515815205872,-0.293960124254,-0.224093869328,0.0992524549365,-0.083210259676,0.0539314299822,0.100297830999,0.201800197363,-0.210172444582,0.18643592298,-0.32173538208,0.181221246719,-0.251085996628,-0.159656718373,-0.222378194332,0.355155527592,-0.340124368668,-0.140123933554,0.168500870466,-0.423514544964,0.14878679812,0.310890138149,-0.236031368375,-0.00624960940331,0.0655541718006,0.138878285885,-0.175236642361,0.327901989222,0.569352865219,0.0621161684394,0.115610390902,-0.227676987648,-0.249964132905,-0.0667832791805,0.709286272526,-0.353281587362,-0.0523515082896,-0.123303487897,-0.00920728128403,-0.154596164823]


----------------------
2 训练聚类模型
----------------------

In [21]:
from pyspark.mllib.clustering import KMeans
num_clusters = 5
num_iterations = 20
num_runs =3
movie_cluster_model = KMeans.train(movie_vectors,num_clusters, num_iterations, num_runs)
movie_cluster_model_coverged = KMeans.train(movie_vectors,num_clusters,100)
user_cluster_model = KMeans.train(user_vectors,num_clusters,num_iterations, num_runs)
predictions = movie_cluster_model.predict(movie_vectors)
print ('对前十个样本的预测标签为:'+",".join([str(i) for i in predictions.take(10)]))



对前十个样本的预测标签为:3,3,3,2,4,2,3,2,1,0


In [22]:
print ('movie_factors的第一条数据:',movie_factors.first())
print ('========================')
print ('titles_and_genres的第一条数据:',titles_and_genres.first())

titles_factors = titles_and_genres.join(movie_factors)
print ('========================')
print ('titles_factors的第一条数据:',titles_factors.first())

movie_factors的第一条数据: (4, DenseVector([0.1439, 0.1487, -0.2368, -0.0143, -0.1211, -0.0915, -0.0989, -0.16, 0.5735, -0.1381, -0.0516, -0.294, -0.2241, 0.0993, -0.0832, 0.0539, 0.1003, 0.2018, -0.2102, 0.1864, -0.3217, 0.1812, -0.2511, -0.1597, -0.2224, 0.3552, -0.3401, -0.1401, 0.1685, -0.4235, 0.1488, 0.3109, -0.236, -0.0062, 0.0656, 0.1389, -0.1752, 0.3279, 0.5694, 0.0621, 0.1156, -0.2277, -0.25, -0.0668, 0.7093, -0.3533, -0.0524, -0.1233, -0.0092, -0.1546]))
titles_and_genres的第一条数据: (1, "Toy Story (1995),Animation,Children's,Comedy")
titles_factors的第一条数据: (1536, ('Aiqing wansui (1994),Drama', DenseVector([0.155, -0.0353, -0.2588, 0.0703, -0.0347, 0.182, 0.2314, 0.1831, 0.5017, -0.0612, -0.095, 0.0282, 0.0526, 0.1227, 0.2289, -0.0858, -0.4768, 0.2053, 0.1029, 0.164, -0.4535, 0.2508, -0.2892, 0.2077, -0.3039, 0.5213, -0.4001, -0.179, 0.0578, -0.3276, 0.0444, -0.0119, -0.3677, -0.0436, 0.0081, 0.093, -0.5312, 0.2834, 0.5837, 0.0817, 0.0533, 0.0906, -0.3159, -0.1285, 0.7609, -0.1049, -0.3

In [23]:
#对每个电影计算其特征向量与类簇中心向量的距离
def func2(rdd):
    id,(name_genres,vec) = rdd
    pred = movie_cluster_model.predict(vec)
    cluster_center = movie_cluster_model.clusterCenters[pred]
    cluster_center_vec = Vectors.dense(cluster_center)
    dist = vec.squared_distance(cluster_center_vec)
    return u'电影' + str(id) + u'的题材类型是' + name_genres + ',' + u'聚类模型预测的标签是' + str(pred)+ ',' + \
           u'与聚类所属类别中心的距离是' + str(dist)

movies_assigned = titles_factors.map(lambda x:func2(x))  
for i in movies_assigned.take(5):
    print (i)

电影1536的题材类型是Aiqing wansui (1994),Drama,聚类模型预测的标签是2,与聚类所属类别中心的距离是0.918217802304
电影1026的题材类型是Lay of the Land, The (1997),Comedy,Drama,聚类模型预测的标签是1,与聚类所属类别中心的距离是1.41027486856
电影516的题材类型是Local Hero (1983),Comedy,聚类模型预测的标签是2,与聚类所属类别中心的距离是1.15395962633
电影6的题材类型是Shanghai Triad (Yao a yao yao dao waipo qiao) (1995),Drama,聚类模型预测的标签是4,与聚类所属类别中心的距离是2.33645184929
电影1032的题材类型是Little Big League (1994),Children's,Comedy,聚类模型预测的标签是0,与聚类所属类别中心的距离是1.10033642729


--------------------------
3 评估聚类模型的性能
--------------------------

3.1 内部评价指标

 通用的内部评价指标包括WCSS（我们之前提过的K-元件的目标函数）、Davies-Bouldin指数、Dunn指数和轮廓系数（silhouette coefficient）。所有这些度量指标都是使类簇内部的样本距离尽可能接近，不同类簇的样本相对较远。





3.2 外部评价指标

 因为聚类被认为是无监督分类，如果有一些带标注的数据，便可以用这些标签来评估聚类模型。可以使用聚类模型预测类簇（类标签），使用分类模型中类似的方法评估预测值和真实标签的误差（即真假阳性率和真假阴性率）。
 具体方法包括Rand measure、F-measure、雅卡尔系数（Jaccard index）等。


In [24]:
movie_cost = movie_cluster_model.computeCost(movie_vectors)
user_cost = user_cluster_model.computeCost(user_vectors)
print ("WCSS for movies: %f"%movie_cost)
print ("WCSS for users: %f"%user_cost)

WCSS for movies: 2173.614888
WCSS for users: 1492.128578


----------------------
4 聚类模型参数调优
----------------------

不同于以往的模型，K-均值模型只有一个可以调的参数，就是K，即类中心数目。通过交叉验证选择K
 类似分类和回归模型，我们可以应用交叉验证来选择模型最优的类中心数目。这和监督学习的过程一样。需要将数据集分割为训练集和测试集，然后在训练集上训练模型，在测试集上评估感兴趣的指标的性能。如下代码用60/40划分得到训练集和测试集，并使用MLlib内置的WCSS类方法评估聚类模型的性能：

In [26]:
train_test_split_movies = movie_vectors.randomSplit([0.6,0.4],123)
train_movies = train_test_split_movies[0]
test_movies = train_test_split_movies[1]
for k in [2,3,4,5,10,20,30]:
    k_model = KMeans.train(train_movies, num_iterations, k, num_runs)
    cost = k_model.computeCost(test_movies)
    print ('WCSS for k=%d : %f'%(k,cost))



WCSS for k=2 : 782.136925
WCSS for k=3 : 773.803139
WCSS for k=4 : 774.105182
WCSS for k=5 : 769.060786
WCSS for k=10 : 768.085290
WCSS for k=20 : 767.425721
WCSS for k=30 : 766.717561



 从结果可以看出，随着类中心数目增加，WCSS值会出现下降，然后又开始增大。另外一个现象，K-均值在交叉验证的情况，WCSS随着K的增大持续减小，但是达到某个值后，下降的速率突然会变得很平缓。这时的K通常为最优的K值（这称为拐点）。根据预测结果，我们选择最优的K=10??。需要说明是，模型计算的类簇需要人工解释（比如前面提到的电影或者顾客聚类的例子），并且会影响K的选择。尽管较大的K值从数学的角度可以得到更优的解，但是类簇太多就会变得难以理解和解释。为了实验的完整性，我们还计算了用户聚类在交叉验证下的性能：

In [27]:
train_test_split_movies = user_vectors.randomSplit([0.6,0.4],123)
train_users = train_test_split_movies[0]
test_users = train_test_split_movies[1]
for k in [2,3,4,5,10,20]:
    k_model = KMeans.train(train_users,num_iterations,k,num_runs)
    cost = k_model.computeCost(test_users)
    print ('WCSS for k=%d : %f'%(k,cost))



WCSS for k=2 : 574.152900
WCSS for k=3 : 574.217316
WCSS for k=4 : 569.361084
WCSS for k=5 : 570.712942
WCSS for k=10 : 573.570436
WCSS for k=20 : 578.410961
