In [1]:
import pyspark
import pandas as pd
import numpy as np
from pyspark import SparkContext, since
from pyspark.rdd import RDD
import pyspark.sql

from pyspark.mllib.recommendation import ALS
import math

In [2]:
import os
from pyspark import SQLContext
from pyspark import SparkContext 
from pyspark.sql import SQLContext 
sc = pyspark.SparkContext('local')
sqlContext = SQLContext(sc)

In [3]:
ratings = sc.textFile("/Users/drakaris/Downloads/Electronics_Video_Games.csv").map(lambda line: line.split(","))
ratings = ratings.toDF(['userID','itemID','rating'])
ratings.show()


+------+------+------+
|userID|itemID|rating|
+------+------+------+
| 28831|     2|   5.0|
| 35247|    19|   1.0|
| 40339|    19|   3.0|
|  9887|    19|   1.0|
| 15802|    21|   5.0|
| 53574|    21|   2.0|
| 38732|    21|   5.0|
| 44967|    21|   5.0|
| 64000|    21|   5.0|
| 80521|    21|   5.0|
|  7996|    22|   5.0|
| 26947|    22|   1.0|
| 73519|    22|   5.0|
| 72526|    22|   1.0|
| 26252|    22|   4.0|
| 55982|    22|   3.0|
| 10348|    22|   2.0|
| 28705|    22|   2.0|
| 25076|    22|   4.0|
| 46811|    22|   5.0|
+------+------+------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
ratings_filename="/Users/drakaris/Downloads/Electronics_Video_Games.csv"
ratings_df_schema = StructType([StructField('userID', IntegerType()),StructField('movieID', IntegerType()),StructField('rating', DoubleType())])


In [5]:
ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(ratings_df_schema).load(ratings_filename)
#ratings_df = raw_ratings_df.drop('Timestamp')
#ratings_df.cache()


In [6]:
ratings_df.show()
print(ratings_df)

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
| 35247|     19|   1.0|
| 40339|     19|   3.0|
|  9887|     19|   1.0|
| 15802|     21|   5.0|
| 53574|     21|   2.0|
| 38732|     21|   5.0|
| 44967|     21|   5.0|
| 64000|     21|   5.0|
| 80521|     21|   5.0|
|  7996|     22|   5.0|
| 26947|     22|   1.0|
| 73519|     22|   5.0|
| 72526|     22|   1.0|
| 26252|     22|   4.0|
| 55982|     22|   3.0|
| 10348|     22|   2.0|
| 28705|     22|   2.0|
| 25076|     22|   4.0|
| 46811|     22|   5.0|
| 27683|     22|   4.0|
+------+-------+------+
only showing top 20 rows

DataFrame[userID: int, movieID: int, rating: double]


In [7]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


#als=ALS(userCol="userID",itemCol="movieID", ratingCol="rating", coldStartStrategy="drop",nonnegative=True)

In [74]:
from pyspark.mllib.recommendation\
    import ALS,MatrixFactorizationModel, Rating
    
ratings = ratings_df.rdd.map(lambda x: Rating(int(x[0]),\
    int(x[1]), float(x[2])))    

train, test = ratings.randomSplit([0.8,0.2])

train.count() #70,005
test.count() #29,995

#Need to cache the data to speed up training
train.cache()
test.cache()

#Setting up the parameters for ALS
rank = 25 # Latent Factors to be made
numIterations = 25# Times to repeat process
regParam = 0.35
#Create the model on the training data
model = ALS.train(train, rank, numIterations, regParam)

In [75]:
#display(train)

In [76]:
model.recommendUsers(242,10)

[Rating(user=52519, product=242, rating=4.650234298211485),
 Rating(user=38896, product=242, rating=3.2448196209320255),
 Rating(user=76890, product=242, rating=3.072546772374353),
 Rating(user=12696, product=242, rating=2.9823640814484524),
 Rating(user=70205, product=242, rating=2.9823640814484524),
 Rating(user=18793, product=242, rating=2.978626192256457),
 Rating(user=52725, product=242, rating=2.9113174981089878),
 Rating(user=8839, product=242, rating=2.9113174981089878),
 Rating(user=55479, product=242, rating=2.884237519188793),
 Rating(user=9662, product=242, rating=2.7481616744071085)]

In [77]:
# For User Y Find N Products to Promote
model.recommendProducts(196,10)



[Rating(user=196, product=67, rating=3.6938825940752356),
 Rating(user=196, product=5665, rating=2.9900092053319094),
 Rating(user=196, product=4581, rating=2.6101358684975153),
 Rating(user=196, product=1332, rating=2.5682584930131767),
 Rating(user=196, product=913, rating=2.415789865946774),
 Rating(user=196, product=5729, rating=2.3812850027861714),
 Rating(user=196, product=1828, rating=2.2995355074893835),
 Rating(user=196, product=5713, rating=2.2445142326657948),
 Rating(user=196, product=7105, rating=2.242506947793156),
 Rating(user=196, product=3778, rating=2.2037059013382465)]

In [78]:
#Predict Single Product for Single User
model.predict(196, 242)



-0.657286139974113

In [79]:
# Predict Multi Users and Multi Products
# Pre-Processing
pred_input = train.map(lambda x:(x[0],x[1]))   



In [80]:
# Lots of Predictions
#Returns Ratings(user, item, prediction)
pred = model.predictAll(pred_input) 

In [81]:
true_reorg = train.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred.map(lambda x:((x[0],x[1]), x[2]))

#Do the actual join
true_pred = true_reorg.join(pred_reorg)

#Need to be able to square root the Mean-Squared Error
from math import sqrt

MSE = true_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = sqrt(MSE)#Results in 0.7629908117414474

In [82]:
print(RMSE)

0.396909609300436


In [83]:
#Test Set Evaluation
#More dense, but nothing we haven't done before
test_input = test.map(lambda x:(x[0],x[1])) 
pred_test = model.predictAll(test_input)
test_reorg = test.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred_test.map(lambda x:((x[0],x[1]), x[2]))
test_pred = test_reorg.join(pred_reorg)
test_MSE = test_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
test_RMSE = sqrt(test_MSE)#1.0145549956596238

In [84]:
print(test_RMSE)

2.20186591317568
