In [1]:
import os
os.environ['SPARK_CLASSPATH'] = "/home/ubuntu/postgresql-9.3-1103-jdbc41.jar"

In [2]:
import findspark

In [3]:
findspark.init()

In [4]:
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row

In [5]:
sc = SparkContext(appName = "BuildProductRecommendations")

In [6]:
sqlContext = SQLContext(sc)

In [7]:
properties = {
    "user": "clientdemo",
    "password": "clientdemo4"
}

In [8]:
url="jdbc:postgresql://postgresdemo.c3yxphqgag3s.ap-southeast-1.rds.amazonaws.com:5432/postgresdemo"

In [9]:
df = sqlContext.read.jdbc(url=url, table="trans", properties=properties)

In [10]:
df.take(5)

[Row(userid=u'3893506', quantity=0, product_id=u'com.gopaktor.subscription.v1.premium.1m', purchase_date=datetime.datetime(2016, 9, 7, 13, 59, 45), platform=u'ios', revenue=31.28, renewal=0, insider=0, receipt_id=u'1984955'),
 Row(userid=u'3793254', quantity=0, product_id=u'com.gopaktor.subscription.v1.premium.1m', purchase_date=datetime.datetime(2016, 9, 25, 3, 6, 11), platform=u'ios', revenue=31.28, renewal=0, insider=0, receipt_id=u'2170195'),
 Row(userid=u'6148794', quantity=0, product_id=u'com.gopaktor.subscription.v1.premium.1m', purchase_date=datetime.datetime(2016, 10, 1, 2, 27, 37), platform=u'ios', revenue=31.28, renewal=0, insider=0, receipt_id=u'2259995'),
 Row(userid=u'6149220', quantity=0, product_id=u'com.gopaktor.subscription.v4.premium.1m', purchase_date=datetime.datetime(2016, 10, 1, 6, 39, 16), platform=u'android', revenue=31.28, renewal=0, insider=0, receipt_id=u'1980761'),
 Row(userid=u'6149432', quantity=0, product_id=u'com.gopaktor.subscription.v4.premium.1m', pu

In [11]:
df.count()

4428

In [12]:
type(df)

pyspark.sql.dataframe.DataFrame

In [13]:
df2 = df.select('userid', 'product_id')

In [14]:
df2.take(5)

[Row(userid=u'3893506', product_id=u'com.gopaktor.subscription.v1.premium.1m'),
 Row(userid=u'3793254', product_id=u'com.gopaktor.subscription.v1.premium.1m'),
 Row(userid=u'6148794', product_id=u'com.gopaktor.subscription.v1.premium.1m'),
 Row(userid=u'6149220', product_id=u'com.gopaktor.subscription.v4.premium.1m'),
 Row(userid=u'6149432', product_id=u'com.gopaktor.subscription.v4.premium.1m')]

In [15]:
from pyspark.sql.functions import lit

In [16]:
new_df = df2.withColumn("action", lit(1))

In [17]:
new_df.take(5)

[Row(userid=u'3893506', product_id=u'com.gopaktor.subscription.v1.premium.1m', action=1),
 Row(userid=u'3793254', product_id=u'com.gopaktor.subscription.v1.premium.1m', action=1),
 Row(userid=u'6148794', product_id=u'com.gopaktor.subscription.v1.premium.1m', action=1),
 Row(userid=u'6149220', product_id=u'com.gopaktor.subscription.v4.premium.1m', action=1),
 Row(userid=u'6149432', product_id=u'com.gopaktor.subscription.v4.premium.1m', action=1)]

In [18]:
data = new_df.collect()

In [19]:
import pandas as pd

In [20]:
data_df = pd.DataFrame(data, columns=new_df.columns)

In [21]:
data_df.head()

Unnamed: 0,userid,product_id,action
0,3893506,com.gopaktor.subscription.v1.premium.1m,1
1,3793254,com.gopaktor.subscription.v1.premium.1m,1
2,6148794,com.gopaktor.subscription.v1.premium.1m,1
3,6149220,com.gopaktor.subscription.v4.premium.1m,1
4,6149432,com.gopaktor.subscription.v4.premium.1m,1


In [22]:
import yaml
with open('/home/ubuntu/MappingBuffer.yml', 'r') as f:
    doc = yaml.load(f)

In [23]:
doc['column_map']['TRANSACTION_MASTER']

{'cust_id': 'userid',
 'product_id': 'product_id',
 'quantity': 'quantity',
 'renewal': 'renewal',
 'revenue': 'revenue',
 'timestamp': 'purchase_date'}

In [24]:
data_df.rename(columns={doc['column_map']['TRANSACTION_MASTER']['cust_id']:'cust_id'}, inplace = True)
data_df.rename(columns={doc['column_map']['TRANSACTION_MASTER']['product_id']:'product_id'}, inplace = True)

In [25]:
data_df.head()

Unnamed: 0,cust_id,product_id,action
0,3893506,com.gopaktor.subscription.v1.premium.1m,1
1,3793254,com.gopaktor.subscription.v1.premium.1m,1
2,6148794,com.gopaktor.subscription.v1.premium.1m,1
3,6149220,com.gopaktor.subscription.v4.premium.1m,1
4,6149432,com.gopaktor.subscription.v4.premium.1m,1


In [26]:
from sklearn import preprocessing

In [27]:
user_id_le = preprocessing.LabelEncoder()
product_id_le = preprocessing.LabelEncoder()

In [28]:
user_id_le.fit(data_df.cust_id)
product_id_le.fit(data_df.product_id)

LabelEncoder()

In [29]:
print('Number of unique users: ', str(len(user_id_le.classes_)))
print('Number of unique products: ', str(len(product_id_le.classes_)))

('Number of unique users: ', '3087')
('Number of unique products: ', '18')


In [30]:
n_prod_views_df = data_df

In [31]:
n_prod_views_df.cust_id = user_id_le.transform(data_df.cust_id)
n_prod_views_df.product_id = product_id_le.transform(data_df.product_id)

In [32]:
n_prod_views_df.head()

Unnamed: 0,cust_id,product_id,action
0,7,5,1
1,6,5,1
2,10,5,1
3,15,17,1
4,21,17,1


In [33]:
from pyspark.mllib.recommendation import ALS

In [35]:
n_prod_views_rdd = sqlContext.createDataFrame(n_prod_views_df).rdd

In [36]:
n_prod_views_rdd.take(5)

[Row(cust_id=7, product_id=5, action=1),
 Row(cust_id=6, product_id=5, action=1),
 Row(cust_id=10, product_id=5, action=1),
 Row(cust_id=15, product_id=17, action=1),
 Row(cust_id=21, product_id=17, action=1)]

In [37]:
training_rdd, test_rdd = n_prod_views_rdd.randomSplit([8,2], 1345)

In [38]:
training_rdd.distinct().count()

2770

In [39]:
test_for_predict_rdd = test_rdd.map(lambda x : (x[0], x[1]))

In [40]:
print('Training RDD\n', training_rdd.take(5))
print('\nTest for Prediction RDD\n', test_for_predict_rdd.take(5))

('Training RDD\n', [Row(cust_id=7, product_id=5, action=1), Row(cust_id=6, product_id=5, action=1), Row(cust_id=10, product_id=5, action=1), Row(cust_id=15, product_id=17, action=1), Row(cust_id=21, product_id=17, action=1)])
('\nTest for Prediction RDD\n', [(24, 17), (28, 17), (33, 5), (48, 5), (16, 0)])


In [41]:
import math

In [42]:
seed = 49247
iterations = 10
lambdas = [0.01, 0.1]
ranks = [16]
alphas = [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 40.0, 80.0]
errors = [0 for x in range(len(alphas) * len(ranks) * len(lambdas))]
err_index = 0

In [43]:
for lambda_ in  lambdas:
    for rank in ranks:
        for alpha in alphas:
            model = ALS.trainImplicit(training_rdd, rank, seed=seed, iterations=iterations,
                                           lambda_ = lambda_, alpha = alpha)
            predictions = model.predictAll(test_for_predict_rdd).map(lambda r : ((r[0], r[1]), r[2]))
            views_and_preds = test_rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
            error = math.sqrt(views_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            errors[err_index] = error
            err_index += 1
            print('For rank {0} at alpha: {1} and lambda: {2}, the RMSE is{3}'.format(rank, alpha, lambda_, error))

For rank 16 at alpha: 1.0 and lambda: 0.01, the RMSE is0.403419620793
For rank 16 at alpha: 2.0 and lambda: 0.01, the RMSE is0.365176361581
For rank 16 at alpha: 4.0 and lambda: 0.01, the RMSE is0.350554686797
For rank 16 at alpha: 8.0 and lambda: 0.01, the RMSE is0.346095007943
For rank 16 at alpha: 16.0 and lambda: 0.01, the RMSE is0.345035459889
For rank 16 at alpha: 32.0 and lambda: 0.01, the RMSE is0.344983758187
For rank 16 at alpha: 40.0 and lambda: 0.01, the RMSE is0.345041422707
For rank 16 at alpha: 80.0 and lambda: 0.01, the RMSE is0.344614552869
For rank 16 at alpha: 1.0 and lambda: 0.1, the RMSE is0.388718469072
For rank 16 at alpha: 2.0 and lambda: 0.1, the RMSE is0.357938951823
For rank 16 at alpha: 4.0 and lambda: 0.1, the RMSE is0.345908522072
For rank 16 at alpha: 8.0 and lambda: 0.1, the RMSE is0.341590815522
For rank 16 at alpha: 16.0 and lambda: 0.1, the RMSE is0.340094486971
For rank 16 at alpha: 32.0 and lambda: 0.1, the RMSE is0.340223443462
For rank 16 at alpha

In [44]:
model.recommendProducts(6,10)

[Rating(user=6, product=5, rating=1.0056181013281436),
 Rating(user=6, product=0, rating=0.05448520007537963),
 Rating(user=6, product=9, rating=0.007419491600110503),
 Rating(user=6, product=13, rating=0.002863852998247221),
 Rating(user=6, product=15, rating=0.0018991022316005302),
 Rating(user=6, product=16, rating=3.472375761393032e-05),
 Rating(user=6, product=12, rating=-0.0003348634012038665),
 Rating(user=6, product=10, rating=-0.0003936586648657092),
 Rating(user=6, product=17, rating=-0.0005144505540696648),
 Rating(user=6, product=4, rating=-0.0006029846803468492)]

In [45]:
all_user = model.recommendProductsForUsers(20)

In [46]:
all_user.count()

2663

In [None]:
all_user.take(1)

In [None]:
rec_all = all_user.collect()

In [None]:
len(rec_all)

In [None]:
rec_all[0]

In [None]:
rec = []

In [None]:
for x in range(len(rec_all)):
    for y in range(17):
            v = rec_all[x][1][y][0:3]
            d = list(v)
            rec.append(d)

In [None]:
rec[2]

In [None]:
rec_df = pd.DataFrame(rec, columns=['cust_id', 'rec_product', 'rating'])

In [None]:
rec_df.head()

In [None]:
rec_df.shape

In [None]:
model1 = ALS.trainImplicit(n_prod_views_rdd, rank=16, seed=seed, iterations=iterations,alpha=0.01)

In [None]:
my_rec = model1.recommendProductsForUsers(20).collect()

In [None]:
len(my_rec)

In [None]:
rec2 = []

In [None]:
for x in range(len(rec_all)):
    for y in range(17):
            v = rec_all[x][1][y][0:3]
            d = list(v)
            rec2.append(d)

In [None]:
rec_df1 = pd.DataFrame(rec2, columns=['cust_id', 'rec_product', 'rating'])

In [None]:
rec_df1.head()

In [None]:
rec_df1.shape

In [None]:
uniq_cust = pd.read_csv('/home/ubuntu/client_demo/uniq_cust_cat')

In [None]:
uniq_prod = pd.read_csv('/home/ubuntu/client_demo/uniq_prod_cat')

In [None]:
uniq_cust.head()

In [None]:
uniq_prod.head()

In [None]:
users_map = uniq_cust.set_index('code')['cust_id'].to_dict()
product_map = uniq_prod.set_index('code')['product_id'].to_dict()

In [None]:
product_map[5]

In [None]:
rec_df1.head()

In [None]:
product_map.keys()

In [None]:
product_map[rec_df1.rec_product[2]]

In [None]:
rec_df1.rec_product[2]