In [1]:
%pylab inline
import rlcompleter, readline
readline.parse_and_bind('tab: complete')
import findspark
import os
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)

Populating the interactive namespace from numpy and matplotlib


In [35]:
import itertools
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics

In [29]:
def parseRatings(l):
    """
    Parse the Ratings
    """
    return int(l[0]), int(l[1]), float(l[2])




def findMovieTitles(ids, movies):
    """
    Find movie title from movie id
    """
    titles = [movies.filter(lambda x: x[0] == i).map(lambda x: x[1]).collect() for i in ids]
    return titles




def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))





def findBestModel(train, validation, ranks, lambdas, iteration):
    """
    Choose Regularization and number of latent factors 
    """
    bestModel = None
    bestLambda = -1.0
    bestRank = 0
    bestRmse = float("inf")
    
    for rank, lmbda in itertools.product(ranks, lambdas):
        model = ALS.train(train, rank, iteration, lmbda)
        rmse = computeRmse(model, validation, validation.count())
        
        print("model trained with lambda = ", lmbda, "rank = ", rank, "has the rmse = ", rmse)
        
        if rmse < bestRmse:
            bestRmse = rmse
            bestModel = model
            bestLambda = lmbda
            bestRank = rank
            
            
    return bestModel, bestRmse





def evaluateOnTest(model, test):
    """
    Evaluate the model on testing set
    """
    rmse = computeRmse(model, test, test.count())
    print("the RMSE for best trained model on test set is", rmse)
    return rmse





def recommendation_engine(model, user_data, movieList):
    """
    Build a recommendation engine 
    """
    userid = user_data.first()[0]
    watchedMovies = user_data.map(lambda x: x[1])
    recommendList = movieList.subtract(watchedMovies) 
    predictions = model.predictAll(recommendList.map(lambda x : (userid, x))).collect()
    recommendations = findMovieTitles(\
                                    [sorted(predictions, key=lambda x: x[2], reverse=True)[i].product for i in range(10)],\
                                    movies)
    
    print ("Movies recommended for user:", userid)
    for each in recommendations:
        print(each)
    
    return predictions


def subtractGlobalBias(train):
    """
    Remove global bias miu
    """
    
    miu = train.map(lambda x: x[2]).mean()
    return train.map(lambda x: (x[0], x[1], x[2] - miu)), miu



def subtractUserBias(train):
    """
    Remove user specific bias
    """
    bu = train.map(lambda x: (x[0], x[2])).groupByKey()\
                .map(lambda x:(x[0], list(x[1])))\
                .mapValues(lambda x: mean(x))
    
    subtract = train.map(lambda x: (x[0], (x[1], x[2]))).join(bu).map(lambda x: (x[0], x[1][0][0], x[1][0][1] - x[1][1]))
    return subtract, bu


def subtractItemBias(train):
    """
    Remove item specific bias
    """
    bi = train.map(lambda x: (x[1], x[2])).groupByKey()\
                .map(lambda x:(x[0], list(x[1])))\
                .mapValues(lambda x: mean(x))
            
    subtract = train.map(lambda x: (x[1], (x[0], x[2]))).join(bi).map(lambda x: (x[1][0][0], x[0], x[1][0][1] - x[1][1]))
    return subtract, bi

def subtractBias(data, bias, types):
    """
    Remove bias on given dataset
    """
    

In [4]:
movies = sc.textFile('../data/ml-10M100K/movies.dat')\
                    .map(lambda x: x.split('::'))\
                    .map(lambda x: [int(x[0]), str(x[1]), str(x[2])])
train = sc.textFile('../data/sorted_ratings/train.txt')\
                    .map(lambda x:x.strip('[').strip(']').split(','))\
                    .map(lambda x: parseRatings(x))
validation = sc.textFile('../data/sorted_ratings/validation.txt')\
                    .map(lambda x:x.strip('[').strip(']').split(','))\
                    .map(lambda x: parseRatings(x))
test = sc.textFile('../data/sorted_ratings/test.txt')\
                    .map(lambda x:x.strip('[').strip(']').split(','))\
                    .map(lambda x: parseRatings(x))

In [30]:
globalbias_removed = subtractGlobalBias(train)
userbias_removed = subtractUserBias(train)
itembias_removed = subtractItemBias(train)

In [37]:
globalbias_removed_model = ALS.train(globalbias_removed, 12, 10, 0.1)
userbias_removed_model = ALS.train(userbias_removed, 12, 10, 0.1)
itembias_removed_model = ALS.train(itembias_removed, 12, 10, 0.1)

In [39]:
print("the RMSE for model on  global bias removed dataset is", \
      computeRmse(globalbias_removed_model, validation, validation.count()))
print("the RMSE for model on  user bias removed dataset is", \
      computeRmse(userbias_removed_model, validation, validation.count()))
print("the RMSE for model on  item bias removed dataset is", \
      computeRmse(itembias_removed_model, validation, validation.count()))

the RMSE for model on  global bias removed dataset is 1.23978714257
the RMSE for model on  user bias removed dataset is 1.22889309455
the RMSE for model on  item bias removed dataset is 1.24629341324


In [42]:
unichr("abcd")

NameError: name 'unichr' is not defined

In [38]:
print (1)
print (2)

1
2
