In [1]:
import glob
import os
import sys

# Root of the local Spark installation; every path below is derived from it.
spark_path = "/opt/spark/"

os.environ['SPARK_HOME'] = spark_path
# NOTE(review): pointing HADOOP_HOME at the Spark root is kept from the original
# setup; confirm it is actually needed on this machine.
os.environ['HADOOP_HOME'] = spark_path


def _add_to_sys_path(path):
    """Append ``path`` to sys.path unless already present, so re-running this cell is idempotent."""
    if path not in sys.path:
        sys.path.append(path)


# `python/` provides the `pyspark` package itself.  The former `bin`,
# `python/lib` and `python/pyspark` entries are not added: `bin` holds launcher
# scripts and `python/lib` only holds zip archives (neither is importable as a
# directory), while `python/pyspark` would make pyspark's internal submodules
# importable as top-level modules.
_add_to_sys_path(os.path.join(spark_path, "python"))
_add_to_sys_path(os.path.join(spark_path, "python", "lib", "pyspark.zip"))

# Locate the bundled py4j archive instead of hard-coding its version; fall back
# to the 0.9 archive name this notebook was originally written against.
_py4j_zips = sorted(glob.glob(os.path.join(spark_path, "python", "lib", "py4j-*-src.zip")))
_add_to_sys_path(_py4j_zips[-1] if _py4j_zips
                 else os.path.join(spark_path, "python", "lib", "py4j-0.9-src.zip"))

In [2]:
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
# Single-threaded local Spark context named "test".  getOrCreate() returns the
# already-running context when this cell is re-executed, instead of failing on
# an attempt to start a second SparkContext in the same process.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("test"))

In [4]:
# Display the context to confirm it was created.
sc

<pyspark.context.SparkContext at 0x7f5a1cc5b8d0>

In [5]:
# Raw input: one CSV line per customer/product interaction.  Its header row
# (shown below) has an unnamed leading index column followed by
# product_id, cust_id, rating.  The location can be overridden with the
# CUST_PROD_PATH environment variable instead of editing this absolute path.
DATA_PATH = os.environ.get("CUST_PROD_PATH", "/home/ubuntu/client3_cust_prod")
data = sc.textFile(DATA_PATH)

In [6]:
# Number of lines in the raw file, header row included.
data.count()

406830

In [7]:
# The first line is the CSV header, not a data row.
data.first()

u',product_id,cust_id,rating'

In [8]:
# Peek at the first raw lines (header included).
data.take(5)

[u',product_id,cust_id,rating',
 u'0,3249,17850,1',
 u'1,2649,17850,1',
 u'2,2855,17850,1',
 u'3,2803,17850,1']

In [9]:
# Drop the CSV header row by matching its content rather than its position.
# The old `zipWithIndex` + "index > 0" version dropped the first line on every
# run, so re-executing this cell silently discarded a real data row each time.
# It could also trigger an extra Spark job for multi-partition inputs.  A
# content match is idempotent.
data = data.filter(
    lambda line: line.split(',')[1:] != [u'product_id', u'cust_id', u'rating'])

In [10]:
# Header gone: the first row is now the record with index 0.
data.take(5)

[u'0,3249,17850,1',
 u'1,2649,17850,1',
 u'2,2855,17850,1',
 u'3,2803,17850,1',
 u'4,2802,17850,1']

In [11]:
# Should be exactly one less than the raw count (only the header removed).
data.count()

406829

In [12]:
# Split every CSV line into its string fields: [index, product_id, cust_id, rating].
clean_data = data.map(lambda line: line.split(","))

In [13]:
# Each row is now a list of four strings: index, product_id, cust_id, rating.
clean_data.take(5)

[[u'0', u'3249', u'17850', u'1'],
 [u'1', u'2649', u'17850', u'1'],
 [u'2', u'2855', u'17850', u'1'],
 [u'3', u'2803', u'17850', u'1'],
 [u'4', u'2802', u'17850', u'1']]

In [14]:
# Fourth field (position 3) holds the rating; convert it to an integer.
rating = clean_data.map(lambda fields: int(fields[3]))

In [15]:
# Sample of the parsed integer ratings.
rating.take(5)

[1, 1, 1, 1, 1]

In [16]:
# Third field (position 2) is the customer id, one per interaction row.
cust_id = clean_data.map(lambda fields: int(fields[2]))

In [17]:
# One entry per interaction row, so this equals the row count, not the number of customers.
cust_id.count()

406829

In [18]:
# Sample of customer ids.
cust_id.take(5)

[17850, 17850, 17850, 17850, 17850]

In [19]:
# Deduplicate to the set of distinct customers.
cust_unique = cust_id.distinct()

In [20]:
# Number of distinct customers.
cust_unique.count()

4372

In [21]:
# Second field (position 1) is the product id, one per interaction row.
product_id = clean_data.map(lambda fields: int(fields[1]))

In [22]:
# One entry per interaction row.
product_id.count()

406829

In [23]:
# Deduplicate to the set of distinct products.
prod_unique = product_id.distinct()

In [24]:
# Number of distinct products.
prod_unique.count()

3684

In [25]:
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel,Rating

In [26]:
# Parsed rows for the recommender.  This is the same comma split that produced
# `clean_data`, so reuse that RDD rather than re-declaring the parse logic.
rtl = clean_data

In [27]:
# Same rows as clean_data: [index, product_id, cust_id, rating] strings.
rtl.take(5)

[[u'0', u'3249', u'17850', u'1'],
 [u'1', u'2649', u'17850', u'1'],
 [u'2', u'2855', u'17850', u'1'],
 [u'3', u'2803', u'17850', u'1'],
 [u'4', u'2802', u'17850', u'1']]

In [28]:
# Build MLlib Rating records: the customer is the "user", the product is the
# "product", and the CSV rating becomes a float score.
ratings_rtl = rtl.map(
    lambda fields: Rating(user=int(fields[2]),
                          product=int(fields[1]),
                          rating=float(fields[3])))

In [29]:
# Sample of Rating(user=cust_id, product=product_id, rating=...) records.
ratings_rtl.take(5)

[Rating(user=17850, product=3249, rating=1.0),
 Rating(user=17850, product=2649, rating=1.0),
 Rating(user=17850, product=2855, rating=1.0),
 Rating(user=17850, product=2803, rating=1.0),
 Rating(user=17850, product=2802, rating=1.0)]

In [30]:
# Reproducible 70/30 train/test split of the Rating records.
TRAIN_TEST_WEIGHTS = [0.7, 0.3]
SPLIT_SEED = 7856
train, test = ratings_rtl.randomSplit(TRAIN_TEST_WEIGHTS, SPLIT_SEED)
# cache() is lazy and returns the same RDD.  Marking both splits here, before
# the first action, lets the counts in the next cell populate the cache.
# Otherwise the source is sampled once for the counts and again for training.
train, test = train.cache(), test.cache()

In [31]:
# Sizes of the two splits (roughly 70/30 of all rows).
train.count(), test.count()

(284691, 122138)

In [32]:
# Mark both splits for in-memory caching; cache() is lazy, so they are stored
# on the next action that computes them (e.g. ALS training below).
train.cache(), test.cache()

(PythonRDD[26] at RDD at PythonRDD.scala:48,
 PythonRDD[27] at RDD at PythonRDD.scala:48)

In [33]:
# ALS hyper-parameters: latent-factor dimension, and number of ALS iterations.
rank, numIterations = 5, 10

In [34]:
# Fit an implicit-feedback ALS model on the training split.  The ratings
# sampled above are all 1, so they are treated as interaction signals rather
# than explicit scores.  A fixed seed keeps the factor initialisation repeatable.
model = ALS.trainImplicit(train, rank, iterations=numIterations, seed=1234)

In [35]:
# (product_id, latent vector of length `rank`) for one product.
model.productFeatures().first()

(0,
 array('d', [0.045389577746391296, 0.024339398369193077, -0.07351785153150558, 0.0031970127020031214, 0.0012319571105763316]))

In [36]:
# (cust_id, latent vector of length `rank`) for one customer.
model.userFeatures().first()

(12346,
 array('d', [-0.00394428800791502, 0.004793320782482624, 0.0019207525765523314, -0.0037742292042821646, -0.006188021972775459]))

In [37]:
# Top 10 customers the model scores highest for product 3249.
model.recommendUsers(3249,10)

[Rating(user=14911, product=3249, rating=15.458383128736351),
 Rating(user=13263, product=3249, rating=14.45150447670059),
 Rating(user=14096, product=3249, rating=10.29650553219843),
 Rating(user=12748, product=3249, rating=9.791251123564631),
 Rating(user=17841, product=3249, rating=8.620497548967652),
 Rating(user=13089, product=3249, rating=8.00678643917607),
 Rating(user=15005, product=3249, rating=6.59613590707027),
 Rating(user=15039, product=3249, rating=6.4491149271267),
 Rating(user=14796, product=3249, rating=6.031171506119956),
 Rating(user=18118, product=3249, rating=5.795933724007903)]

In [38]:
# Top 10 products the model scores highest for customer 17850.
model.recommendProducts(17850, 10)

[Rating(user=17850, product=3249, rating=4.987052325472023),
 Rating(user=17850, product=2776, rating=2.905448986042421),
 Rating(user=17850, product=399, rating=2.738451424495606),
 Rating(user=17850, product=2780, rating=2.675310337534521),
 Rating(user=17850, product=401, rating=2.599190302017644),
 Rating(user=17850, product=393, rating=2.4715449821036937),
 Rating(user=17850, product=3280, rating=2.445053456930278),
 Rating(user=17850, product=2779, rating=2.0154964375845648),
 Rating(user=17850, product=2777, rating=1.917653719659559),
 Rating(user=17850, product=772, rating=1.8497962613156722)]

In [39]:
# Disabled: would compute the top-25 product recommendations for every
# customer and collect() the whole result onto the driver.
# NOTE(review): confirm whether this is still needed; otherwise delete the cell.
####cust_rec = model.recommendProductsForUsers(25).collect()

In [40]:
# (user, product) keys of every training interaction, to be scored by the model.
pred_input = train.map(lambda r: (r.user, r.product))

In [52]:
# Score every training (user, product) pair.  distinct() removes duplicate
# predictions, presumably from customers who bought the same product more than once.
pred = model.predictAll(pred_input).distinct()

In [53]:
# Sample of predicted preference scores for training pairs.
pred.take(10)

[Rating(user=13742, product=180, rating=0.16687689437607428),
 Rating(user=16771, product=478, rating=0.023795010940705408),
 Rating(user=15545, product=1949, rating=0.00701360065925441),
 Rating(user=16710, product=631, rating=0.16558957876001912),
 Rating(user=16110, product=1780, rating=0.02887227291732835),
 Rating(user=12731, product=2174, rating=0.13622175701575112),
 Rating(user=16931, product=2336, rating=0.07498754215719472),
 Rating(user=16914, product=182, rating=0.01663150645869567),
 Rating(user=14472, product=1335, rating=0.7529000798096375),
 Rating(user=14903, product=1637, rating=0.19956134391790292)]

In [46]:
# Key each observed training rating by (user, product) for joining with predictions.
true_reorg = train.map(lambda r: ((r.user, r.product), r.rating))

In [54]:
# ((user, product), observed rating) samples.
true_reorg.take(5)

[((17850, 3249), 1.0),
 ((17850, 2649), 1.0),
 ((17850, 2855), 1.0),
 ((17850, 2803), 1.0),
 ((17850, 1605), 1.0)]

In [55]:
# Key each training-pair prediction by (user, product), mirroring true_reorg.
pred_reorg = pred.map(lambda r: ((r.user, r.product), r.rating))

In [56]:
# ((user, product), predicted score) samples for the training pairs.
pred_reorg.take(5)

[((13742, 180), 0.16687689437607428),
 ((16771, 478), 0.023795010940705408),
 ((15545, 1949), 0.00701360065925441),
 ((16710, 631), 0.16558957876001912),
 ((16110, 1780), 0.02887227291732835)]

In [59]:
# Pair each observed training rating with the model's prediction for the same
# (user, product).  The predictions are keyed straight from `pred` rather than
# read from `pred_reorg`, because that name is later rebound to the *test*
# predictions.  Re-running this cell afterwards would otherwise silently join
# training truths with test predictions.
true_pred = true_reorg.join(
    pred.map(lambda r: ((r.user, r.product), r.rating))).distinct()

In [60]:
# ((user, product), (observed rating, predicted score)) samples.
true_pred.take(5)

[((15867, 2291), (1.0, 0.04670583425520963)),
 ((17889, 823), (1.0, 0.00811205910549957)),
 ((18122, 288), (1.0, 0.207960294559173)),
 ((13136, 2119), (1.0, 0.06098274218979988)),
 ((12367, 646), (1.0, 0.004354706464030404))]

In [61]:
from math import sqrt

In [62]:
# Mean squared difference between observed rating and predicted score on the
# training pairs.
# NOTE(review): trainImplicit produces preference scores rather than rating
# estimates, and the sampled labels are all 1.0.  This error therefore mostly
# measures distance from 1; a ranking metric may be more informative.
MSE = true_pred.values().map(
    lambda actual_predicted: (actual_predicted[0] - actual_predicted[1]) ** 2).mean()

In [63]:
# Root mean squared error on the training pairs.
RMSE = sqrt(MSE)

In [64]:
# Training RMSE.
RMSE

1.0135974619496528

In [65]:
# (user, product) keys of every held-out interaction, to be scored by the model.
test_input = test.map(lambda r: (r.user, r.product))

In [66]:
# Sample of held-out (user, product) pairs.
test_input.take(5)

[(17850, 2802), (13047, 1598), (13047, 3124), (13047, 1480), (13047, 771)]

In [72]:
# Score the held-out pairs; distinct() drops duplicate predictions.
# NOTE(review): pairs whose user or product never appeared in the training
# split presumably get no prediction and therefore drop out of the join below.
pred_test = model.predictAll(test_input).distinct()

In [73]:
# Sample of predicted scores for held-out pairs.
pred_test.take(5)

[Rating(user=12886, product=1580, rating=0.1920652081280525),
 Rating(user=17204, product=1959, rating=0.004231274071763552),
 Rating(user=12720, product=2388, rating=0.22710199783595508),
 Rating(user=14591, product=1209, rating=0.23160042288822114),
 Rating(user=16710, product=631, rating=0.16558957876001912)]

In [67]:
# Key each observed test rating by (user, product) for joining with predictions.
test_reorg = test.map(lambda r: ((r.user, r.product), r.rating))

In [68]:
# ((user, product), observed rating) samples from the test split.
test_reorg.take(5)

[((17850, 2802), 1.0),
 ((13047, 1598), 1.0),
 ((13047, 3124), 1.0),
 ((13047, 1480), 1.0),
 ((13047, 771), 1.0)]

In [74]:
# Key each test-pair prediction by (user, product).
# NOTE(review): this rebinds `pred_reorg`, which previously held the *training*
# predictions; any earlier cell that reads that name now sees test data.
pred_reorg = pred_test.map(lambda r: ((r.user, r.product), r.rating))

In [75]:
# ((user, product), predicted score) samples for the test pairs.
pred_reorg.take(5)

[((12886, 1580), 0.1920652081280525),
 ((17204, 1959), 0.004231274071763552),
 ((12720, 2388), 0.22710199783595508),
 ((14591, 1209), 0.23160042288822114),
 ((16710, 631), 0.16558957876001912)]

In [81]:
# Pair each observed test rating with the model's prediction for the same
# (user, product).  The predictions are keyed straight from `pred_test` rather
# than read from the reused name `pred_reorg`.  That keeps this cell correct even
# if the training-side cell that also assigns `pred_reorg` is re-run in between.
test_pred = test_reorg.join(
    pred_test.map(lambda r: ((r.user, r.product), r.rating))).distinct()

In [82]:
# ((user, product), (observed rating, predicted score)) samples from the test split.
test_pred.take(10)

[((16423, 2172), (1.0, 0.008415678900827811)),
 ((15059, 2788), (1.0, 0.5171938868794959)),
 ((17730, 1755), (1.0, 0.166695678844741)),
 ((14078, 16), (1.0, 0.006957704819621088)),
 ((17243, 1750), (1.0, 0.04488504877203936)),
 ((14159, 3220), (1.0, 0.6937675935937351)),
 ((15856, 845), (1.0, 0.5397235419614773)),
 ((16423, 1755), (1.0, 0.011635141881900193)),
 ((15426, 2789), (1.0, 0.06686050182448691)),
 ((14223, 178), (1.0, 0.06498582386896432))]

In [83]:
# Mean squared difference between observed rating and predicted score on the
# held-out pairs.
# NOTE(review): as with the training MSE, implicit-ALS scores are preferences,
# not rating estimates, so this is a weak quality measure on all-1 labels.
test_MSE = test_pred.values().map(
    lambda actual_predicted: (actual_predicted[0] - actual_predicted[1]) ** 2).mean()

In [84]:
# Root mean squared error on the held-out test pairs.
test_RMSE = sqrt(test_MSE)

In [85]:
# Test RMSE.
test_RMSE

1.1173354660596282