In [188]:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating
from pyspark.ml.evaluation import RegressionEvaluator
import math


# Step 1: Process rating data

## 1.1 Read data

In [189]:
df = pd.read_csv("data/rating.csv")[['userId','tmdbId','rating']]
df.head()

Unnamed: 0,userId,tmdbId,rating
0,1,862,4.0
1,5,862,4.0
2,7,862,4.5
3,15,862,2.5
4,17,862,4.5


## 1.2 Transform into spark format

In [190]:
"""Create SparkContext"""
### You can only have one sparkContext at the same tim
sc = pyspark.SparkContext.getOrCreate()

sqlContext = SQLContext(sc)

In [191]:
"""Create Spark dataframe"""
df_spark = sqlContext.createDataFrame(df)
df_spark.printSchema()

root
 |-- userId: long (nullable = true)
 |-- tmdbId: long (nullable = true)
 |-- rating: double (nullable = true)



In [192]:
df_spark.head(5)

[Row(userId=1, tmdbId=862, rating=4.0),
 Row(userId=5, tmdbId=862, rating=4.0),
 Row(userId=7, tmdbId=862, rating=4.5),
 Row(userId=15, tmdbId=862, rating=2.5),
 Row(userId=17, tmdbId=862, rating=4.5)]

train,test = df_spark.randomSplit(weights=[0.8,0.2])

train.count()

test.count()

#Need to cache the data to speed up training
train.cache()
test.cache()

# Step 2: Train Spark ALS model

## 2.1 Model training

In [193]:
"""Train ALS model """
rank = 5
numIterations = 10
model = ALS.train(df_spark, rank, numIterations)

## 2.2 Model evaluation

In [194]:
### process true value
true_rating = df_spark[['rating']].rdd.map(lambda x:x[0]).collect()

In [195]:
true_rating[:10]

[4.0, 4.0, 4.5, 2.5, 4.5, 3.5, 4.0, 3.5, 3.0, 5.0]

In [198]:
rdd_x = df_spark[['UserId',"tmdbId"]].rdd
pred_rating = model.predictAll(rdd_x).map(lambda x:x[2]).collect()

In [199]:
MSE = np.mean([(x-y)**2 for x,y, in zip(true_rating,pred_rating)])
RMSE = math.sqrt(MSE)

In [200]:
from math import sqrt

def computeRmse(model, df):
    """
    Compute RMSE (Root mean Squared Error).
    """
    true_rating = df[['rating']].rdd.map(lambda x:x[0]).collect()    
    
    rdd_x = df[['UserId',"tmdbId"]].rdd
    pred_rating = model.predictAll(rdd_x).map(lambda x:x[2]).collect()

    MSE = np.mean([(x-y)**2 for x,y, in zip(true_rating,pred_rating)])
    RMSE = math.sqrt(MSE)
    print("Root-mean-square error = " + str(RMSE)) 
    return RMSE

In [201]:
computeRmse(model,df_spark)

Root-mean-square error = 1.3297348142675336


1.3297348142675336

# Step 3: Utilized ALS model to make recommendation

In [None]:
"""Recommendation based on Users & Products"""
display('features for product one', model.productFeatures().first())
display('features for user one',model.userFeatures().first())

# For Product X, Find N Users to Sell To
productForUser = model.recommendUsers(242,10)
print("Top 10 moive recommendations for product 242: \n")
display(productForUser)

In [None]:
# For User Y Find N Products to Promote
userForProduct = model.recommendProducts(196,10)
print("Top 10 users to recommend for user 196")
display(userForProduct)

In [None]:
# Predict ratings
ratingPredict= model.predict(196,242)
print("prediction of rate for User 196 --> stock 242")
display(ratingPredict)

In [None]:
productFeatures = model.productFeatures()
productFeatures.collect()

# Step 4: Modulize

In [None]:
class SP_ALS:
    def __init__(self):
        pass
    def train(self,df,rank=10,numIterations=20):
        assert len(df.columns) == 3
        """Create SparkContext"""
        ### You can only have one sparkContext at the same time
        sc = pyspark.SparkContext.getOrCreate()
        sqlContext = SQLContext(sc)
        """Create Spark dataframe"""
        df_spark = sqlContext.createDataFrame(df)
        
        """Train ALS model """
        self.model = ALS.train(df_spark, rank, numIterations)
        self.rmse = self.computeRmse(df_spark)
        
    def computeRmse(self,df):
        """
        Compute RMSE (Root mean Squared Error).
        """
        true_rating = df[['rating']].rdd.map(lambda x:x[0]).collect()    

        rdd_x = df[['UserId',"tmdbId"]].rdd
        pred_rating = model.predictAll(rdd_x).map(lambda x:x[2]).collect()

        MSE = np.mean([(x-y)**2 for x,y, in zip(true_rating,pred_rating)])
        RMSE = math.sqrt(MSE)
        return RMSE    
    
    def recommend_to_user(self,userId =10,num_products=10):
        return self.model.recommendProducts(userId,num_products)
    def recommend_to_products(self,productId =10,num_users=10):
        return self.model.recommendUsers(productId,num_users)
    def predict_score(self,userId,productId):
        return self.model.predict(userId,productId)
    def get_product_features(self):
        productFeatures = self.model.productFeatures()
        return productFeatures.collect()

In [None]:
myALS = SP_ALS()

In [None]:
myALS.train(df=df,rank=10,numIterations=20)

In [None]:
myALS.rmse

In [186]:
myALS.get_product_features()

[(8,
  array('d', [-0.5754498839378357, 1.729912519454956, 1.7887969017028809, 1.4359464645385742, 0.19993239641189575, -0.03108469396829605, -0.4101007282733917, -0.031022824347019196, 1.4906493425369263, 0.39472028613090515])),
 (16,
  array('d', [-0.2624242305755615, 1.8636385202407837, 0.9585332870483398, 1.8470232486724854, 0.04963565617799759, -2.106616735458374, -0.8387255072593689, -0.8639708161354065, 0.4975408613681793, 1.3435550928115845])),
 (24,
  array('d', [-0.3323332667350769, 1.0146340131759644, 0.7422187328338623, 1.2253473997116089, 0.17775224149227142, -1.600788950920105, -1.688912272453308, 0.6676797866821289, 0.7452763319015503, 1.444183349609375])),
 (32,
  array('d', [-0.23060138523578644, 1.6140249967575073, 0.4064754247665405, 0.957678496837616, 0.46637842059135437, -2.4529459476470947, -1.139078974723816, -1.0483516454696655, 1.0793168544769287, 1.2402673959732056])),
 (40,
  array('d', [0.41627031564712524, 0.9401078224182129, -0.3306868374347687, 1.11968231