In [87]:
from pyspark.mllib.recommendation import Rating
import numpy as np
import json, time
from operator import add

In [22]:
# Review file that is read line by line with sc.textFile and parsed as JSON.
filepath = "data/reviews.jl"
# Path prefix under which loadModel() looks for the saved vUser / vBusiness files.
save_dir = "target/"

In [24]:
# Read the review file one line at a time. Every line is decoded as a JSON
# array whose first three fields become Rating(int, int, float).
data = sc.textFile(filepath)
ratings = (
    data.map(json.loads)
    .map(lambda rec: Rating(int(rec[0]), int(rec[1]), float(rec[2])))
    .cache()
)
# Display how many ratings were parsed.
ratings.count()

2685066

In [67]:
# Driver-side lookup table: field 0 of each JSON line -> (field 1, field 2).
business_dict = (
    sc.textFile("data/business.jl")
    .map(json.loads)
    .map(lambda rec: (rec[0], (rec[1], rec[2])))
    .collectAsMap()
)

In [69]:
# Driver-side lookup table: field 0 of each JSON line -> field 1.
user_dict = (
    sc.textFile("data/user.jl")
    .map(json.loads)
    .map(lambda rec: (rec[0], rec[1]))
    .collectAsMap()
)

In [54]:
# Number of entries in the business lookup table.
len(business_dict)

26729

In [53]:
# Spot-check one entry: the value is a (name, list of categories) pair.
business_dict[1]

(u'Mr Hoagie, Dravosburg', [u'Fast Food'])

In [20]:
def loadModel(name):
    """Open the pickled feature RDDs of a previously saved model.

    name -- sub-path appended to ``save_dir``; the ``vUser`` and
            ``vBusiness`` pickle files are expected beneath it.

    Returns the tuple ``(vUser, vBusiness)`` of RDDs and prints the
    wall-clock time spent in the two ``sc.pickleFile`` calls.
    """
    t0 = time.time()
    model_root = save_dir + name
    user_rdd = sc.pickleFile(model_root + "/vUser")
    business_rdd = sc.pickleFile(model_root + "/vBusiness")
    # Only the two feature RDDs are opened; the MatrixFactorizationModel
    # stored next to them (".../cfModel") is not loaded.
    elapsed = time.time() - t0
    # Single-argument parenthesised form prints the same text as the
    # Python 2 print statement.
    print("Time Cost: %.2fs" % elapsed)
    return user_rdd, business_rdd

In [25]:
# Open the feature RDDs saved under save_dir + 'final', then display both handles.
vUser, vBusiness = loadModel('final')
vUser, vBusiness

Time Cost: 0.09s


(MapPartitionsRDD[28] at objectFile at NativeMethodAccessorImpl.java:-2,
 MapPartitionsRDD[30] at objectFile at NativeMethodAccessorImpl.java:-2)

In [26]:
# Mark the user-feature RDD as cached (the helper functions below scan it on
# every call), then display its row count.
vUser.cache()
vUser.count()

686556

In [27]:
# Mark the business-feature RDD as cached (the helper functions below scan it
# on every call), then display its row count.
vBusiness.cache()
vBusiness.count()

85539

In [82]:
def getKNN(vUser, vu, k):
    """Find the ``k`` rows of ``vUser`` whose vectors lie closest to ``vu``.

    vUser -- RDD of rows; element 0 is an id, element 1 a feature vector.
    vu    -- query vector (numpy array).
    k     -- number of neighbours to return.

    Returns a list of ``(id, distance)`` tuples, nearest first, where the
    distance is ``np.linalg.norm(vu - vector)``.
    """
    def with_distance(row):
        return (row[0], np.linalg.norm(vu - row[1]))

    distances = vUser.map(with_distance)
    # Ranking by the negated distance makes top() return the smallest ones.
    return distances.top(k, key=lambda pair: -pair[1])

def getUserFeature(vUser, uid):
    """Return the feature vector stored for user ``uid``.

    vUser -- RDD of rows; element 0 is a user id, element 1 that user's
             feature vector.
    uid   -- id to look up.

    Returns the vector as a numpy array, or None when no row carries
    that id.
    """
    # take(1) both answers "is there a match?" and fetches it in a single
    # Spark action; the earlier isEmpty() + first() pair ran the filter twice.
    matches = vUser.filter(lambda row: row[0] == uid).take(1)
    if not matches:
        return None
    return np.array(matches[0][1])

def getProductFeature(vBusiness, bid):
    """Return the feature vector stored for business ``bid``.

    vBusiness -- RDD of rows; element 0 is a business id, element 1 that
                 business's feature vector.
    bid       -- id to look up.

    Returns the vector as a numpy array, or None when no row carries
    that id.
    """
    # take(1) both answers "is there a match?" and fetches it in a single
    # Spark action; the earlier isEmpty() + first() pair ran the filter twice.
    matches = vBusiness.filter(lambda row: row[0] == bid).take(1)
    if not matches:
        return None
    return np.array(matches[0][1])

def getRecommedProduct(vBusiness, vu, num=10, catalog=None):
    """Return the ids of the ``num`` businesses that score highest for ``vu``.

    vBusiness -- RDD of rows; element 0 is a business id, element 1 its
                 feature vector.
    vu        -- user feature vector; a business's score is the dot
                 product of its vector with ``vu``.
    num       -- how many ids to return (default 10).
    catalog   -- optional container of eligible business ids (a dict keyed
                 by id works). Defaults to the notebook-level
                 ``business_dict``; rows whose id is not in it are skipped.

    Returns a list of business ids, best score first.
    """
    if catalog is None:
        catalog = business_dict
    # Only the ids are needed for the membership test, so ship a compact
    # frozenset to the workers instead of the whole lookup table.
    known_ids = frozenset(catalog)
    scored = vBusiness.filter(lambda row: row[0] in known_ids)\
        .map(lambda row: (row[0], np.array(row[1]).dot(vu)))
    best = scored.top(num, key=lambda pair: pair[1])
    # Build a real list: callers slice the result, which a lazy map object
    # would not support.
    return [pair[0] for pair in best]

In [83]:
def getTaste(uid):
    """Build the recommendation summary for user ``uid``.

    Uses the notebook-level ``vUser``, ``vBusiness``, ``business_dict``,
    ``user_dict`` and ``sc``.

    Returns a tuple ``(recomd, users, taste)``:
      recomd -- first element of the ``business_dict`` entry for each of
                the five best-scoring businesses.
      users  -- ``user_dict`` entries of the five nearest other users.
      taste  -- up to 50 ``(category, count)`` pairs, most frequent first,
                counted over the categories of the 1000 best-scoring
                businesses.

    Raises ValueError when ``vUser`` holds no vector for ``uid``.
    Prints the elapsed wall-clock time before returning.
    """
    start_time = time.time()
    vu = getUserFeature(vUser, uid)
    if vu is None:
        # Fail early with a clear message; passing None on would only
        # surface later as an error inside a Spark task.
        raise ValueError("no feature vector found for user id %r" % (uid,))
    # list() keeps the slice below valid even if a lazy iterable comes back.
    res = list(getRecommedProduct(vBusiness, vu, 1000))
    recomd = [business_dict[bid][0] for bid in res[:5]]
    # Ask for one neighbour more than needed and drop the queried user by
    # id, rather than assuming it is always ranked first (another user with
    # an identical vector would tie at distance 0).
    neighbour_ids = [pair[0] for pair in getKNN(vUser, vu, 6) if pair[0] != uid][:5]
    users = [user_dict[nid] for nid in neighbour_ids]
    taste = sc.parallelize(res).flatMap(lambda bid: business_dict[bid][1])\
        .map(lambda cat: (cat, 1)).reduceByKey(add).takeOrdered(50, key=lambda kv: -kv[1])
    end_time = time.time()
    print("Time Cost: %.2fs" % (end_time - start_time))
    return recomd, users, taste

In [85]:
# Run the whole pipeline for user id 9.
recomd, users, taste = getTaste(9)

Time Cost: 13.86s


In [86]:
# Serialize the (category, count) pairs as a JSON string.
json.dumps(taste)

'[["Pizza", 117], ["Mexican", 114], ["Sandwiches", 108], ["American (Traditional)", 78], ["Italian", 77], ["Fast Food", 74], ["American (New)", 69], ["Chinese", 65], ["Japanese", 51], ["Burgers", 45], ["Cafes", 45], ["Mediterranean", 38], ["Sushi Bars", 35], ["Delis", 35], ["Thai", 31], ["Seafood", 30], ["Asian Fusion", 27], ["Coffee & Tea", 27], ["Steakhouses", 23], ["Barbeque", 23], ["Chicken Wings", 22], ["Middle Eastern", 20], ["Salad", 19], ["Bakeries", 18], ["Indian", 18], ["Greek", 17], ["Vegan", 17], ["French", 17], ["Vegetarian", 17], ["Latin American", 16], ["Hot Dogs", 16], ["Juice Bars & Smoothies", 15], ["Sports Bars", 14], ["Hawaiian", 14], ["Vietnamese", 14], ["Food Trucks", 14], ["Gluten-Free", 13], ["Desserts", 13], ["Specialty Food", 13], ["Pubs", 10], ["Ice Cream & Frozen Yogurt", 10], ["Cocktail Bars", 10], ["Korean", 9], ["Wine Bars", 9], ["Ethnic Food", 8], ["Grocery", 8], ["Soul Food", 8], ["Caribbean", 7], ["Soup", 7], ["Tex-Mex", 7]]'