In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, countDistinct
spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option","some-value").getOrCreate()
df = spark.read.format("csv").option("header", "false").load('/Users/Joyce/Desktop/Project AMZ/ratings_Electronics.csv')
df = df.select(col("_c0").alias("userId"), col("_c1").alias("itemId"), col("_c2").alias("rating"), col("_c3").alias("timestamp"))

In [2]:
'''numRatings = rating_only_data.count()
numUsers = rating_only_data.map(lambda r: r[0]).distinct().count()
numProducts = rating_only_data.map(lambda r: r[1]).distinct().count()

print("Got %d ratings from %d users on %d products." % (numRatings, numUsers, numProducts))'''

'numRatings = rating_only_data.count()\nnumUsers = rating_only_data.map(lambda r: r[0]).distinct().count()\nnumProducts = rating_only_data.map(lambda r: r[1]).distinct().count()\n\nprint("Got %d ratings from %d users on %d products." % (numRatings, numUsers, numProducts))'

In [3]:
spark

In [4]:
def calculate_sparsity(itemlimit, userlimit, df):
    product = df.groupBy("itemId").count()
    product_filter = product.filter(product['count'] > itemlimit)
    Data = df.join(product_filter, ['itemId'], 'leftsemi')
    
    user = Data.groupBy("userId").count()
    user_filter = user.filter(user['count'] > userlimit)
    DF = Data.join(user_filter, ['userId'], 'leftsemi')
    
    available = DF.count()
    product_total = DF.select("itemId").distinct().count()
    user_total = DF.select("userId").distinct().count()
    
    print("available rating: " + str(available))
    print("distinct product: " + str(product_total))
    print("distinct user: " + str(user_total))
    
    result = (float(available)/(float(product_total) * float(user_total)))*100
    print(result)
    
    return DF

In [5]:
df=calculate_sparsity(2, 2, df)

available rating: 3374805
distinct product: 208371
distinct user: 615996
0.002629259887833436


In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
stringIndexer = StringIndexer(inputCol="itemId", outputCol="ProductIndex")
user = stringIndexer.fit(df)
indexed = user.transform(df)
indexed_distinct=indexed.select("userId").distinct()
from pyspark.sql.types import *
User_id = sqlContext.createDataFrame(indexed_distinct.rdd.map(lambda x: x[0]).zipWithIndex(), \
StructType([StructField("userId", StringType(), True),StructField("User_ID", IntegerType(), True)]))
join=indexed.join(User_id,indexed.userId==User_id.userId)

In [12]:
from pyspark.sql.functions import udf, col, regexp_replace
def inte(f):
    return int(f)
inte_udf = udf(inte)
rating_data = join.withColumn('Product_ID', inte_udf(col("ProductIndex")))

from pyspark.sql.types import FloatType, IntegerType
rating_data = rating_data.withColumn("Product_ID", rating_data["Product_ID"].cast(IntegerType()))
rating_data = rating_data.withColumn("rating", rating_data["rating"].cast(FloatType()))
rating_cleaned = rating_data.select('User_ID', 'Product_ID', 'rating')
rating_final=rating_cleaned.rdd

In [16]:
train_rdd, valid_rdd, test_rdd = rating_final.randomSplit(weights=[6, 2, 2], seed=0)
train_df = train_rdd.toDF().cache()
valid_df = valid_rdd.toDF().cache()
test_df = test_rdd.toDF().cache()

In [17]:
train_df.select("User_ID").distinct().count()

595383

In [18]:
#train_df.write.json('/Users/Joyce/Desktop/train_df.json')
#valid_df.write.json('/Users/Joyce/Desktop/valid_df.json')
#test_df.write.json('/Users/Joyce/Desktop/test_df.json')

# Implement ALS to training set

In [19]:
def calculate_sparsity(itemlimit, userlimit, df):
    product = df.groupBy("Product_ID").count()
    product_filter = product.filter(product['count'] >= itemlimit)
    Data = df.join(product_filter, ['Product_ID'], 'leftsemi')
    
    user = Data.groupBy("User_ID").count()
    user_filter = user.filter(user['count'] > userlimit)
    DF = Data.join(user_filter, ['User_ID'], 'leftsemi')
    
    available = DF.count()
    product_total = DF.select("Product_ID").distinct().count()
    user_total = DF.select("User_ID").distinct().count()
    
    print("available rating: " + str(available))
    print("distinct product: " + str(product_total))
    print("distinct user: " + str(user_total))
    
    result = (float(available)/(float(product_total) * float(user_total)))*100
    print(result)
    
    return DF

In [20]:
als_df=calculate_sparsity(0, 15, train_df)

available rating: 177919
distinct product: 58087
distinct user: 6768
0.04525671548111109


In [21]:
als_df.cache()
als_rdd=als_df.rdd.cache()

In [22]:
training_RDD, validation_RDD = als_rdd.randomSplit([3, 1], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))

In [23]:
from pyspark.mllib.recommendation import ALS
import itertools
import math

seed = 5
iterations = [5, 10, 15, 20]
regularization_parameters = [0.1 , 0.01]
ranks = [5, 10, 15, 20, 25, 30, 35, 40]
errors = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0]
err = 0
from datetime import datetime
start=datetime.now()

tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
best_regularization_parameter = -1
for itern, rank, regul in itertools.product(iterations, ranks, regularization_parameters):
    start=datetime.now()
    model = ALS.train(training_RDD, rank, seed = seed, iterations = itern,
                      lambda_= regul, nonnegative = True)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), min(r[2],5)))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s iteration %s regularization_parameter %s the RMSE is %s' % (rank, itern, regul, error))
    print(datetime.now()-start)
    if error < min_error:
        min_error = error
        best_rank = rank
        best_iteration = itern
        best_regularization_parameter = regul
        best_model = model

print('The best model was trained with rank %s, itern %s, regul %s with RMSE equal to %s ' % (best_rank, best_iteration, best_regularization_parameter, min_error))

For rank 35 iteration 5 regularization_parameter 0.1 the RMSE is 1.3868475136805007
0:01:56.623503
For rank 35 iteration 10 regularization_parameter 0.1 the RMSE is 1.3085217570812475
0:01:20.700311
For rank 35 iteration 15 regularization_parameter 0.1 the RMSE is 1.2898892931093384
0:01:44.276078
For rank 35 iteration 20 regularization_parameter 0.1 the RMSE is 1.2824038700152223
0:02:09.544204
The best model was trained with rank 35, itern 20, regul 0.1 with RMSE equal to 1.2824038700152223 


In [None]:
'''For rank 5 iteration 5 regularization_parameter 0.1 the RMSE is 1.3096884294501052
0:02:05.212487
For rank 10 iteration 5 regularization_parameter 0.1 the RMSE is 1.3465036993923258
0:00:55.466566
For rank 15 iteration 5 regularization_parameter 0.1 the RMSE is 1.361442008063377
0:00:55.399511
For rank 20 iteration 5 regularization_parameter 0.1 the RMSE is 1.3722740534637663
0:00:56.634378
For rank 25 iteration 5 regularization_parameter 0.1 the RMSE is 1.3785868887655046
0:00:56.704825
For rank 5 iteration 10 regularization_parameter 0.1 the RMSE is 1.310342540917748
0:01:15.713039
For rank 10 iteration 10 regularization_parameter 0.1 the RMSE is 1.3167519254909124
0:01:17.357811
For rank 15 iteration 10 regularization_parameter 0.1 the RMSE is 1.3131183738490348
0:01:19.284458
For rank 20 iteration 10 regularization_parameter 0.1 the RMSE is 1.3096347371106494
0:01:25.839524
For rank 25 iteration 10 regularization_parameter 0.1 the RMSE is 1.3092144823858045
0:01:24.640798
For rank 5 iteration 15 regularization_parameter 0.1 the RMSE is 1.3126894219290142
0:01:36.648593
For rank 10 iteration 15 regularization_parameter 0.1 the RMSE is 1.3080668872068546
0:01:38.416166
For rank 15 iteration 15 regularization_parameter 0.1 the RMSE is 1.3003247869571188
0:01:40.696274
For rank 20 iteration 15 regularization_parameter 0.1 the RMSE is 1.295483679420156
0:01:43.676583
For rank 25 iteration 15 regularization_parameter 0.1 the RMSE is 1.292340227165509
0:01:45.611078
For rank 5 iteration 20 regularization_parameter 0.1 the RMSE is 1.3130607552783002
0:02:00.048818
For rank 10 iteration 20 regularization_parameter 0.1 the RMSE is 1.3036324928789071
0:02:03.849470
For rank 15 iteration 20 regularization_parameter 0.1 the RMSE is 1.294961095080365
0:02:06.990866
For rank 20 iteration 20 regularization_parameter 0.1 the RMSE is 1.290141191964345
0:02:11.550885
For rank 25 iteration 20 regularization_parameter 0.1 the RMSE is 1.2855870840830876
0:02:11.532576
For rank 30 iteration 20 regularization_parameter 0.1 the RMSE is 1.2826146580152509
0:02:29.480548
For rank 35 iteration 20 regularization_parameter 0.1 the RMSE is 1.2824038700152223
0:02:32.978795
For rank 40 iteration 20 regularization_parameter 0.1 the RMSE is 1.283351682960428
0:02:37.436889
The best model was trained with rank 35, itern 20, regul 0.1 with RMSE equal to 1.2824038700152223
'''


In [62]:
from pyspark.mllib.recommendation import MatrixFactorizationModel, Rating
sc.parallelize(best_model.recommendProducts(2214,10)).map(lambda x: x[1]).collect()
#predictions

[134143, 103553, 139116, 2172, 5399, 17131, 4240, 10257, 24696, 15690]

# Validation of ALS for Frequent Users

In [41]:
train_frequent_users = als_df.select("User_id").distinct()
als_valid_df = valid_df.join(train_frequent_users, ['User_id'], 'leftsemi')

In [44]:
als_valid_rdd = als_valid_df.rdd.map(lambda x: (x[0], x[1]))

In [45]:
valid_pred = best_model.predictAll(als_valid_rdd).map(lambda r: ((r[0], r[1]), min(r[2],5)))
valid_als_distinct_user = als_valid_df.select('User_ID').distinct()
train_als_distinct_user = training_RDD.toDF().select('User_ID').distinct()
train_als_distinct_user_list = [x["User_ID"] for x in train_als_distinct_user.rdd.collect()]

In [67]:
valid_als_distinct_user_list = [x["User_ID"] for x in valid_als_distinct_user.rdd.collect()]

In [58]:
valid_als_distinct_user_joined = list(set(train_als_distinct_user_list).intersection(valid_als_distinct_user_list))
len(valid_als_distinct_user_joined)

6733

In [51]:
from pyspark.sql.functions import col, udf, array, collect_list, regexp_replace
als_valid_groupby = als_valid_df.groupBy("User_ID").agg(collect_list("Product_ID"))
als_valid_groupby = als_valid_groupby.cache()

In [65]:
number_bought = 0
for i in valid_als_distinct_user_joined:
    recommend_list = sc.parallelize(best_model.recommendProducts(i,10)).map(lambda x: x[1]).collect()
    bought_list = als_valid_groupby.where(als_valid_groupby['User_ID'] == i).select('collect_list(Product_ID)').collect()[0][0]
    number_bought += len(set(bought_list).intersection(recommend_list))
    print(number_bought)

0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2


In [66]:
number_bought

18

In [69]:
percentage_brought = 100*number_bought/(len(valid_als_distinct_user_joined))
print(str(percentage_brought)+'%')

0.2673399673251151%
