# Importing Datasets

In [1]:
from pyspark.sql import SQLContext,Row
from pyspark.sql import functions as F
from pyspark import SparkContext

In [2]:
# Reuse the SparkContext the kernel already provides, or start one when the
# notebook runs somewhere `sc` is not predefined (it is never created in this
# notebook itself).
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
import os

# Folder that contains the extracted `ml-100k` directory. The original absolute
# path stays as the default; set RECOMMENDER_DATA_DIR to run on another machine.
dataDir = os.environ.get("RECOMMENDER_DATA_DIR",
                         "/home/satvik/Analytics/Recommender Project")


def _read_delimited(fileName, separator):
    """Return an RDD holding one list of string fields per line of an ml-100k file."""
    path = os.path.join(dataDir, "ml-100k", fileName)
    return sc.textFile(path).map(lambda line: line.split(separator))


userData = _read_delimited("u.user", "|")
movieData = _read_delimited("u.item", "|")
ratingData = _read_delimited("u.data", "\t")

In [5]:
def _to_rating_row(fields):
    """Turn one split u.data line into a typed Row (IDs/timestamp int, rating float)."""
    return Row(userID=int(fields[0]),
               movieID=int(fields[1]),
               rating=float(fields[2]),
               timestamp=int(fields[3]))


ratingRows = ratingData.map(_to_rating_row)
ratingDataDF = sqlContext.createDataFrame(ratingRows)

def _to_user_row(fields):
    """Turn one split u.user line into a Row; userID and age are parsed as ints."""
    return Row(userID=int(fields[0]),
               age=int(fields[1]),
               gender=fields[2],
               occupation=fields[3],
               zipcode=fields[4])


userRows = userData.map(_to_user_row)
userDataDF = sqlContext.createDataFrame(userRows)

# Genre flag columns, in the order they appear in u.item starting at field 5.
_GENRE_FIELDS = ["unknown", "action", "adventure", "animation", "childrens",
                 "comedy", "crime", "documentary", "drama", "fantasy",
                 "filmNoir", "horror", "musical", "mystery", "romance",
                 "sciFi", "thriller", "war", "western"]


def _to_movie_row(fields):
    """Turn one split u.item line into a Row: five descriptive fields plus int genre flags."""
    values = dict((genre, int(fields[5 + offset]))
                  for offset, genre in enumerate(_GENRE_FIELDS))
    values.update(movieID=int(fields[0]),
                  movieTitle=fields[1],
                  releaseDate=fields[2],
                  videoReleaseDate=fields[3],
                  IMDBurl=fields[4])
    return Row(**values)


movieRows = movieData.map(_to_movie_row)
movieDataDF = sqlContext.createDataFrame(movieRows)

In [6]:
# timestamp
def extract_datetime(ts, tz=None):
    """Convert a Unix timestamp (seconds since the epoch) into a datetime.

    Parameters
    ----------
    ts : int or float
        POSIX timestamp.
    tz : datetime.tzinfo, optional
        Time zone to express the result in. When omitted, the result is a
        naive datetime in the local time zone of the machine running the code
        (the original behaviour), so derived day/hour values differ between
        machines. Pass an explicit tzinfo for reproducible output.

    Returns
    -------
    datetime.datetime
        Naive when ``tz`` is None, otherwise aware in ``tz``.
    """
    import datetime
    return datetime.datetime.fromtimestamp(ts, tz)

####
def _with_datetime_parts(rating):
    """Return the rating's four fields followed by day, month, year, hour, minute, second."""
    # Parse the timestamp once per row instead of once per derived column.
    when = extract_datetime(rating.timestamp)
    # Fields are read by name so the result does not depend on Row field ordering.
    return Row(rating.movieID, rating.rating, rating.timestamp, rating.userID,
               when.day, when.month, when.year,
               when.hour, when.minute, when.second)


# Go through `.rdd`: calling `map` directly on a DataFrame only works on Spark 1.x.
newdataDF = ratingDataDF.rdd.map(_with_datetime_parts).toDF()
# The unnamed Row above yields columns _1.._10; give them their real names.
# Note that `date` holds the day of the month.
ratingDataDF = newdataDF.selectExpr("_1 as movieID","_2 as rating","_3 as timestamp","_4 as userID",
                                        "_5 as date","_6 as month","_7 as year",
                                        "_8 as hour","_9 as minute","_10 as second")

In [7]:
# Preview the ratings together with the date/time columns derived from `timestamp`.
ratingDataDF.show(20)

+-------+------+---------+------+----+-----+----+----+------+------+
|movieID|rating|timestamp|userID|date|month|year|hour|minute|second|
+-------+------+---------+------+----+-----+----+----+------+------+
|    242|   3.0|881250949|   196|   4|   12|1997|  21|    25|    49|
|    302|   3.0|891717742|   186|   5|    4|1998|   0|    52|    22|
|    377|   1.0|878887116|    22|   7|   11|1997|  12|    48|    36|
|     51|   2.0|880606923|   244|  27|   11|1997|  10|    32|     3|
|    346|   1.0|886397596|   166|   2|    2|1998|  11|     3|    16|
|    474|   4.0|884182806|   298|   7|    1|1998|  19|    50|     6|
|    265|   2.0|881171488|   115|   3|   12|1997|  23|    21|    28|
|    465|   5.0|891628467|   253|   4|    4|1998|   0|     4|    27|
|    451|   3.0|886324817|   305|   1|    2|1998|  14|    50|    17|
|     86|   3.0|883603013|     6|   1|    1|1998|   2|    46|    53|
|    257|   2.0|879372434|    62|  13|   11|1997|   3|    37|    14|
|   1014|   5.0|879781125|   286| 

# Merging Datasets

In [8]:
# Attach each rater's profile to the rating; only the ratings-side userID column is kept.
sameUser = ratingDataDF.userID == userDataDF.userID
data = ratingDataDF.join(userDataDF, sameUser, 'inner').drop(userDataDF.userID)

In [9]:
# Preview the ratings after the user attributes (age, gender, occupation, zipcode) were joined in.
data.show()

+-------+------+---------+------+----+-----+----+----+------+------+---+------+----------+-------+
|movieID|rating|timestamp|userID|date|month|year|hour|minute|second|age|gender|occupation|zipcode|
+-------+------+---------+------+----+-----+----+----+------+------+---+------+----------+-------+
|    886|   2.0|881547877|    31|   8|   12|1997|   7|    54|    37| 24|     M|    artist|  10003|
|    484|   5.0|881548030|    31|   8|   12|1997|   7|    57|    10| 24|     M|    artist|  10003|
|    682|   2.0|881547834|    31|   8|   12|1997|   7|    53|    54| 24|     M|    artist|  10003|
|    302|   4.0|881547719|    31|   8|   12|1997|   7|    51|    59| 24|     M|    artist|  10003|
|    135|   4.0|881548030|    31|   8|   12|1997|   7|    57|    10| 24|     M|    artist|  10003|
|    705|   5.0|881548110|    31|   8|   12|1997|   7|    58|    30| 24|     M|    artist|  10003|
|    504|   5.0|881548110|    31|   8|   12|1997|   7|    58|    30| 24|     M|    artist|  10003|
|    498| 

In [10]:
# Attach the movie metadata; only the movieID column already present in `data` is kept.
sameMovie = data.movieID == movieDataDF.movieID
data = data.join(movieDataDF, sameMovie, "inner").drop(movieDataDF.movieID)

In [11]:
# Row count after both inner joins (the recorded output is 100000).
data.count()

100000

In [12]:
# List the column names and inferred types of the merged DataFrame.
data.printSchema()

root
 |-- movieID: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userID: long (nullable = true)
 |-- date: long (nullable = true)
 |-- month: long (nullable = true)
 |-- year: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- minute: long (nullable = true)
 |-- second: long (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- IMDBurl: string (nullable = true)
 |-- action: long (nullable = true)
 |-- adventure: long (nullable = true)
 |-- animation: long (nullable = true)
 |-- childrens: long (nullable = true)
 |-- comedy: long (nullable = true)
 |-- crime: long (nullable = true)
 |-- documentary: long (nullable = true)
 |-- drama: long (nullable = true)
 |-- fantasy: long (nullable = true)
 |-- filmNoir: long (nullable = true)
 |-- horror: long (nullable = true)
 |-- movieTitle: string (nullable 

In [13]:
# Count the rows that have a null in ANY column. Nothing is removed here; the
# count only tells us whether a clean-up step is needed.
# The condition is built from `data.columns` so no column can be forgotten or
# listed twice, which is easy to do with a hand-written list.
anyNull = None
for columnName in data.columns:
    columnIsNull = data[columnName].isNull()
    anyNull = columnIsNull if anyNull is None else (anyNull | columnIsNull)

nullDF = data.filter(anyNull)
nullDF.count()

0

In [14]:
# Discard the two descriptive movie columns: videoReleaseDate and IMDBurl.
data = data.drop('videoReleaseDate').drop('IMDBurl')

In [15]:
# One-hot encode gender and occupation: one 0/1 indicator column per distinct value.
gen_categories = data.select('gender').distinct().rdd.flatMap(lambda x: x).collect()
occ_categories = data.select('occupation').distinct().rdd.flatMap(lambda x: x).collect()

exprs = [F.when(F.col("gender") == category, 1).otherwise(0).alias(category)
         for category in gen_categories]

exprss = [F.when(F.col("occupation") == category, 1).otherwise(0).alias(category)
         for category in occ_categories]

# Stand-alone projections of the indicator columns, kept under their original names.
gennewdata = data.select('gender',*exprs)
occnewdata = data.select('occupation',*exprss)

# Add the indicators with a row-wise select. Joining `data` to these projections
# on 'gender' / 'occupation' would pair every row with every other row sharing
# that value, multiplying the row count instead of just adding columns.
data1 = data.select('*', *exprs).drop('gender')
data2 = data1.select('*', *exprss).drop('occupation')

In [16]:
# Column names after gender/occupation were replaced by one indicator column per category.
data2.columns

['movieID',
 'rating',
 'timestamp',
 'userID',
 'date',
 'month',
 'year',
 'hour',
 'minute',
 'second',
 'age',
 'zipcode',
 'action',
 'adventure',
 'animation',
 'childrens',
 'comedy',
 'crime',
 'documentary',
 'drama',
 'fantasy',
 'filmNoir',
 'horror',
 'movieTitle',
 'musical',
 'mystery',
 'releaseDate',
 'romance',
 'sciFi',
 'thriller',
 'unknown',
 'war',
 'western',
 'F',
 'M',
 'administrator',
 'salesman',
 'programmer',
 'doctor',
 'scientist',
 'lawyer',
 'homemaker',
 'retired',
 'engineer',
 'none',
 'educator',
 'writer',
 'other',
 'executive',
 'entertainment',
 'artist',
 'marketing',
 'technician',
 'student',
 'healthcare',
 'librarian']

# ALS (Alternating Least Squares) Collaborative Filtering

In [17]:
# Work on the underlying RDD of Rows. Note this is `data` (gender and occupation
# still plain strings), not the indicator-encoded `data2` built earlier.
RDD_data = data.rdd

### Splitting data

In [18]:
# 60/20/20 train/validation/test split with a fixed seed for repeatability.
# The seed is a plain int: the `0L` long literal is a syntax error on Python 3.
train, validation, test = RDD_data.randomSplit([6, 2, 2], seed=0)

In [19]:
# Peek at a few rows of the validation split.
validation.take(3)

[Row(movieID=31, rating=5.0, timestamp=884131157, userID=435, date=7, month=1, year=1998, hour=5, minute=29, second=17, age=24, gender=u'M', occupation=u'engineer', zipcode=u'60007', action=0, adventure=0, animation=0, childrens=0, comedy=0, crime=0, documentary=0, drama=1, fantasy=0, filmNoir=0, horror=0, movieTitle=u'Crimson Tide (1995)', musical=0, mystery=0, releaseDate=u'01-Jan-1995', romance=0, sciFi=0, thriller=1, unknown=0, war=1, western=0),
 Row(movieID=31, rating=3.0, timestamp=890687473, userID=41, date=24, month=3, year=1998, hour=2, minute=41, second=13, age=33, gender=u'M', occupation=u'engineer', zipcode=u'80525', action=0, adventure=0, animation=0, childrens=0, comedy=0, crime=0, documentary=0, drama=1, fantasy=0, filmNoir=0, horror=0, movieTitle=u'Crimson Tide (1995)', musical=0, mystery=0, releaseDate=u'01-Jan-1995', romance=0, sciFi=0, thriller=1, unknown=0, war=1, western=0),
 Row(movieID=31, rating=4.0, timestamp=875807058, userID=851, date=2, month=10, year=1997,

In [20]:
def _movie_user_pair(row):
    """Key a row by (movieID, userID), the pair order used for prediction input."""
    return (row.movieID, row.userID)


# Training triples are (movieID, userID, rating).
# NOTE(review): mllib ALS is documented to read triples as (user, product, rating),
# so movies occupy the "user" slot here. Confirm before relying on slot meaning.
train_RDD = train.map(lambda row: (row.movieID, row.userID, row.rating))
validation_for_predict_RDD = validation.map(_movie_user_pair)
test_for_predict_RDD = test.map(_movie_user_pair)

In [21]:
# Confirm the prediction input holds (movieID, userID) pairs.
validation_for_predict_RDD.take(3)

[(31, 435), (31, 41), (31, 851)]

In [22]:
from pyspark.mllib.recommendation import ALS
import math

# Rank search: train one ALS model per candidate rank on the training split and
# keep the rank with the lowest RMSE on the validation split.
seed = 5  # plain int; the `5L` long literal is a syntax error on Python 3
iterations = 2
regularization_parameter = 0.1
ranks = [18, 15, 12]
errors = []  # validation RMSE per entry of `ranks`, in the same order
tolerance = 0.02  # not used by any cell shown here

min_error = float('inf')
best_rank = -1
best_iteration = -1  # not used by any cell shown here

# The ground truth does not depend on the rank, so build it once outside the loop.
# Keys are (movieID, userID), matching the pairs passed to predictAll.
validation_truth = validation.map(lambda r: ((int(r[0]), int(r[3])), float(r[1])))

for rank in ranks:
    print("Iteration starts...")
    model = ALS.train(train_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # Pair each observed rating with its prediction, then take the root mean squared error.
    rates_and_preds = validation_truth.join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    # Appending keeps `errors` aligned with `ranks` however many ranks are tried.
    errors.append(error)
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

Iteration starts...
For rank 18 the RMSE is 0.98497204399
Iteration starts...
For rank 15 the RMSE is 0.997378110304
Iteration starts...
For rank 12 the RMSE is 0.98847546646
The best model was trained with rank 18


In [23]:
# NOTE: `predictions` here is whatever the rank-search loop assigned last, i.e.
# the final rank tried (12 in the recorded run), not necessarily the best rank.
predictions.take(3)

[((1100, 201), 2.6758243061720797),
 ((1100, 405), 1.652081205197201),
 ((100, 1), 3.766693784394797)]

In [24]:
# Each element is ((movieID, userID), (actual rating, predicted rating)).
rates_and_preds.take(3)

[((492, 474), (4.0, 4.103014935235372)),
 ((414, 488), (2.0, 3.2609309990147572)),
 ((255, 839), (3.0, 3.083994191313232))]

In [25]:
# Retrain with the rank selected on the validation split and report the RMSE
# on the held-out test split.
model = ALS.train(train_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# Keys are (movieID, userID) on both sides of the join.
rates_and_preds = test.map(lambda r: ((int(r[0]), int(r[3])), float(r[1]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

# Parenthesised form prints the same text on Python 2 and also parses on Python 3.
print('For testing data the RMSE is %s' % error)

For testing data the RMSE is 0.984570186144


In [27]:
# Peek at the (movieID, userID, rating) triples used for training.
train_RDD.take(5)

[(31, 233, 3.0), (31, 234, 4.0), (31, 442, 3.0), (31, 249, 4.0), (31, 56, 4.0)]