In [None]:
#如果数据量很大，需要用spark分布式平台来完成协同过滤的训练

## User-based协同过滤

In [None]:
#-*- coding:utf8 -*-
#pySpark实现的基于用户的协同过滤
#使用的是余弦相似度

import sys
from collections import defaultdict
from itertools import combinations
import random
import numpy as np
import pdb

from pyspark import SparkContext

def parseVectorOnUser(line):
    """
    Split one "user|item|rating" record and key it by user.

    Returns ``(user, (item, rating))`` where ``rating`` is the third field
    converted with ``float``. Fields after the third are ignored.
    """
    fields = line.split("|")
    user_id = fields[0]
    item_and_rating = (fields[1], float(fields[2]))
    return user_id, item_and_rating

def parseVectorOnItem(line):
    """
    Split one "user|item|rating" record and key it by item.

    Returns ``(item, (user, rating))`` where ``rating`` is the third field
    converted with ``float``. Fields after the third are ignored.
    """
    fields = line.split("|")
    item_id = fields[1]
    user_and_rating = (fields[0], float(fields[2]))
    return item_id, user_and_rating

def sampleInteractions(item_id,users_with_rating,n):
    """
    Down-sample the interactions of an item that has very many ratings.

    Args:
        item_id: key of the item; returned unchanged.
        users_with_rating: sized collection of (user, rating) entries.
        n: maximum number of entries to keep.

    Returns:
        ``(item_id, entries)``. When more than ``n`` entries are present,
        ``entries`` is a list of ``n`` randomly chosen ones; otherwise it is
        the collection that was passed in.
    """
    if len(users_with_rating) > n:
        # random.sample only accepts sequences, so materialise the collection
        # first; this lets sized non-sequence iterables be sampled as well.
        return item_id, random.sample(list(users_with_rating), n)
    return item_id, users_with_rating
    
def findUserPairs(item_id,users_with_rating):
    """
    Build a user-pair record from the ratings of a single item.

    Takes the first 2-combination of ``users_with_rating`` (entries are indexed
    as ``(user, rating)``) and returns ``((user1, user2), (rating1, rating2))``.
    Only that first combination is produced; with fewer than two entries the
    result is ``None``. ``item_id`` is not used.

    NOTE(review): callers that need every co-rating pair of an item have to
    expand the combinations themselves.
    """
    first_pair = next(combinations(users_with_rating, 2), None)
    if first_pair is None:
        return None
    left, right = first_pair
    return (left[0], right[0]), (left[1], right[1])
    
def calcSim(user_pair,rating_pairs):
    """
    Cosine similarity of two users over the items they both rated.

    Args:
        user_pair: key identifying the two users; returned unchanged.
        rating_pairs: iterable of (rating_of_first_user, rating_of_second_user),
            one entry per co-rated item.

    Returns:
        ``(user_pair, (cos_sim, n))`` where ``cos_sim`` is the value returned by
        ``cosine`` and ``n`` is the number of rating pairs that were consumed.
    """
    sum_xx = sum_xy = sum_yy = 0.0
    n = 0
    for rating_pair in rating_pairs:  # one iteration per co-rated item
        # Built-in float replaces the np.float alias, which NumPy 1.24 removed.
        x = float(rating_pair[0])
        y = float(rating_pair[1])
        sum_xx += x * x
        sum_yy += y * y
        sum_xy += x * y
        n += 1
    # cosine receives the dot product and the two vector norms.
    cos_sim = cosine(sum_xy, np.sqrt(sum_xx), np.sqrt(sum_yy))
    # n travels with the similarity so later stages can see how many
    # co-rated items the value is based on.
    return user_pair, (cos_sim, n)

def cosine(dot_product,rating_norm_squared,rating2_norm_squared):
    """
    Cosine similarity of two vectors A and B:
    dotProduct(A, B) / (norm(A) * norm(B))

    Args:
        dot_product: dot product of the two vectors.
        rating_norm_squared: norm of the first vector. Despite the name,
            calcSim passes the square root of the sum of squares here.
        rating2_norm_squared: norm of the second vector (same remark).

    Returns:
        The similarity as a float, or 0.0 when either norm is zero.
    """
    # Both norms must take part in the denominator; the previous version
    # multiplied the first norm by itself and ignored the second.
    denominator = rating_norm_squared * rating2_norm_squared
    if not denominator:
        return 0.0
    return dot_product / float(denominator)

def keyOnFirstUser(user_pair,item_sim_data):
    """
    Re-key a user-user record by its first user.

    ``user_pair`` must unpack into exactly two ids. The result is
    ``(first_user, (second_user, item_sim_data))``.
    """
    first_user, second_user = user_pair
    neighbor_entry = (second_user, item_sim_data)
    return first_user, neighbor_entry

def nearestNeighbors(user,users_and_sims,n):
    """
    Keep the n most similar neighbors of a single user.

    Args:
        user: id of the user the neighbors belong to; returned unchanged.
        users_and_sims: iterable of ``(other_user, (similarity, count))``.
        n: number of neighbors to keep.

    Returns:
        ``(user, neighbors)`` where ``neighbors`` is a list of at most ``n``
        entries ordered by similarity, highest first.
    """
    # sorted() accepts any iterable (not only lists) and leaves the caller's
    # collection untouched; the previous version also returned an undefined name.
    ranked = sorted(users_and_sims, key=lambda entry: entry[1][0], reverse=True)
    return user, ranked[:n]

def topNRecommendations(user_id,user_sims,users_with_rating,n):
    """
    Recommend items to a user from the ratings of the nearest neighbors.

    Each candidate item is scored with the similarity-weighted average of the
    neighbors' ratings: sum(sim * rating) / sum(sim).

    Args:
        user_id: id of the user to recommend for; returned unchanged.
        user_sims: iterable of ``(neighbor, (similarity, count))``.
        users_with_rating: mapping ``user -> iterable of (item, rating)``.
        n: maximum number of items to return.

    Returns:
        ``(user_id, items)`` where ``items`` holds at most ``n`` item ids
        ordered by descending score. Items the user has already rated and
        items whose similarity sum is zero are left out.
    """
    totals = defaultdict(float)    # item -> sum of sim * rating
    sim_sums = defaultdict(float)  # item -> sum of sim

    # Items the target user already rated are not recommended again.
    own_ratings = users_with_rating.get(user_id, None) or ()
    already_rated = set(item for (item, _rating) in own_ratings)

    for (neighbor, (sim, count)) in user_sims:
        neighbor_items = users_with_rating.get(neighbor, None)
        if not neighbor_items:
            continue
        for (item, rating) in neighbor_items:
            if item in already_rated:
                continue
            # Accumulate per item (not per neighbor) so the result ranks items.
            totals[item] += sim * rating
            sim_sums[item] += sim

    # Normalise; a zero similarity sum would divide by zero, so skip those items.
    scored_items = [(total / sim_sums[item], item)
                    for item, total in totals.items() if sim_sums[item]]
    # Highest score first.
    scored_items.sort(reverse=True)
    ranked_items = [item for (_score, item) in scored_items]
    return user_id, ranked_items[:n]


In [None]:
if __name__ == "__main__":
    if len(sys.argv) < 3:
        # Write the usage text to stderr; print(sys.stderr, ...) printed the
        # stream object itself to stdout.
        sys.stderr.write("Usage:PythonUserCF <master> <file>\n")
        sys.exit(-1)

    MAX_USERS_PER_ITEM = 500    # sampling cap handed to sampleInteractions
    NUM_NEIGHBORS = 50          # neighbors kept per user
    NUM_RECOMMENDATIONS = 100   # items kept per user

    # argv[1] is the Spark master, argv[2] the input file; no explicit SparkConf is set.
    sc = SparkContext(sys.argv[1],"PythonUserCF")
    lines = sc.textFile(sys.argv[2])

    # Sparse item-user matrix:
    #   item_id -> [(user_1, rating), (user_2, rating), ...]
    # The grouped values are turned into a list so they can be sampled.
    item_user_pairs = lines.map(parseVectorOnItem).groupByKey().map(
        lambda p: sampleInteractions(p[0], list(p[1]), MAX_USERS_PER_ITEM)).cache()

    # Rating pairs of every two users who rated the same item:
    #   (user1_id, user2_id) -> [(rating1, rating2), (rating1, rating2), ...]
    # Every combination of an item's raters is emitted, hence flatMap.
    # findUserPairs returns only the first combination of an item, so the
    # pairs are expanded here.
    pairwise_users = item_user_pairs.filter(
        lambda p: len(p[1]) > 1).flatMap(
        lambda p: [((u1[0], u2[0]), (u1[1], u2[1]))
                   for u1, u2 in combinations(p[1], 2)]).groupByKey()

    # Cosine similarity per user pair, then the nearest neighbors per user:
    #   (user1, user2) -> (similarity, co_raters_count)
    #   user1 -> [(user2, (similarity, co_raters_count)), ...]
    user_sims = pairwise_users.map(
        lambda p: calcSim(p[0], p[1])).map(
        lambda p: keyOnFirstUser(p[0], p[1])).groupByKey().map(
        lambda p: nearestNeighbors(p[0], list(p[1]), NUM_NEIGHBORS))

    # Rating history per user, collected to the driver:
    #   user_id -> [(item_id_1, rating_1), (item_id_2, rating_2), ...]
    user_item_hist = lines.map(parseVectorOnUser).groupByKey().collect()
    ui_dict = {user: list(items) for (user, items) in user_item_hist}
    # Broadcast the dictionary so every executor can read it.
    uib = sc.broadcast(ui_dict)

    # user_id -> [recommended item ids]
    # map() is a lazy transformation: user_item_recs is only computed once an
    # action such as collect() or saveAsTextFile() is called on it.
    user_item_recs = user_sims.map(
        lambda p: topNRecommendations(p[0], p[1], uib.value, NUM_RECOMMENDATIONS))

In [None]:
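#Open question: the operations above are all map-style steps and there is no reduce step - is that intended?
#Open question: is it enough to call reduce() only once?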