In [1]:
import math
import re
import numpy as np

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [2]:
sc = SparkContext()

In [4]:
#Select the desired columns
data = sc.textFile('./lastfm-dataset-360k-small/merged-subset.csv').map(lambda x : [x.split(',')[i] for i in [1,2,4]])#
header = data.take(1)

In [11]:
# data.collect()

In [5]:
#Remove header
data2 = data.filter(lambda line: line!=header)
print ("length of uncleaned data -",len(data2.collect()))
# data2.map(lambda x : len(x[1])).collect()
data2 = data2.filter(lambda x : len(x[1]) == 36) #Clean data - remove artists without artistId

# Remove unclean rows
def isNumber(inputString):
    """ This return True if the string is pure number, False otherwise """
    return bool(re.search(r'\D', inputString))

data2 = data2.filter(lambda x: not isNumber(x[2])) # Remove faulty rows
data2 = data2.map(lambda x: [x[0], x[1], int(x[2])]) #Change plays into integer

#Filter out values with more than 500 plays (for this sake of simplicity)
print ("length of cleaned data -",len(data2.collect()))
data2 = data2.filter(lambda x : x[2] <= 500)
# print (data2.take(2))
print ("length of filtered data -",len(data2.collect()))

length of uncleaned data - 10001
length of cleaned data - 9858
length of filtered data - 8929


In [6]:
#Convert strings into integers
users = data2.map(lambda x: x[0]).distinct().zipWithIndex()
artists = data2.map(lambda x: x[1]).distinct().zipWithIndex()
# int_user = users.map(lambda u: (u[1], u[0]))
# int_artist = artists.map(lambda i: (i[1], i[0]))
# users.collect()
# artists.collect()

In [66]:
# data2 = data2.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
# data2.collect()

In [7]:
# Substitutes the ObjectIDs in the ratings RDD with the corresponding int values
data2 = data2.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
data2 = data2.map(lambda r: (r[1], (r[0], r[2]))).join(artists).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))

In [8]:
# data2.filter(lambda x: x[0] == 12).collect()
plays = data2.map(lambda x: x[2]).collect()
# data2.collect()

In [9]:
summation = 0
for i in plays:
    summation += i**2
print (np.mean(plays)**2)
summation / len(plays)

14359.3343839


27803.982528838616

In [10]:
# Use 'Rating' function to get the values in the right format
data2 = data2.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
len(data2.collect())

8929

In [11]:
# Use randomsplit to split the data into train, validation and testing sets

training_RDD, validation_RDD, test_RDD = data2.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [None]:
# training_RDD.collect()

In [23]:
# Train ALS

# Parameters
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4,5,6,7]
errors = [0, 0, 0,0,0]
tolerance = 0.02
# alpha = 0.01

#other variables initialized
min_error = float('inf')
best_rank = -1
best_iteration = -1
err = 0

# Train - Validation loop

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 183.16428011432353
For rank 5 the RMSE is 184.37582031488535
For rank 6 the RMSE is 148.69144604802995
For rank 7 the RMSE is 187.80308878145786
The best model was trained with rank 6


In [53]:
# Final Model
rank = 6

model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print ('For rank %s the RMSE is %s' % (rank, error))

For rank 6 the RMSE is 148.69144604802995


In [77]:
data2.take(10)

[Rating(user=12, product=11, rating=168.0),
 Rating(user=18, product=23, rating=178.0),
 Rating(user=26, product=29, rating=17.0),
 Rating(user=5571, product=29, rating=38.0),
 Rating(user=31, product=31, rating=59.0),
 Rating(user=37, product=42, rating=26.0),
 Rating(user=4039, product=42, rating=66.0),
 Rating(user=4371, product=42, rating=483.0),
 Rating(user=50, product=55, rating=228.0),
 Rating(user=6089, product=55, rating=98.0)]

In [78]:
model.predict(12,11)

167.9981782153419

In [79]:
model.predict(18,23)

177.9982862633466

In [80]:
model.predict(26,29)

Py4JJavaError: An error occurred while calling o1987.predict.
: java.util.NoSuchElementException: next on empty iterator
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
	at scala.collection.IterableLike$class.head(IterableLike.scala:107)
	at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
	at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
	at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:81)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [43]:
# print(len(data2.collect()))
# values = data2.map(lambda x: (x[0], x[1]))
# print(len(values.collect()))
# predictions = model.predictAll(values)
print(len(predictions.collect()))
print(len(validation_for_predict_RDD.collect()))
print(len(validation_RDD.collect()))

5401
1763
1763


In [46]:
pred = model.predictAll(validation_for_predict_RDD)
pred2 = pred.map(lambda x : ((x[0], x[1]), x[2]))

In [50]:
print(pred.take(20))
print ('')
print (pred2.take(20))

[Rating(user=6472, product=586, rating=203.56013428189112), Rating(user=6680, product=2639, rating=-22.290913780496503), Rating(user=976, product=2950, rating=-13.088973987739678), Rating(user=1353, product=2633, rating=-24.163823255056045), Rating(user=3125, product=812, rating=1.5132890885141563), Rating(user=5945, product=3247, rating=-17.957026871846278), Rating(user=4794, product=2792, rating=74.65524647233833), Rating(user=514, product=429, rating=63.993144708107934), Rating(user=2459, product=1873, rating=0.2837004828375107), Rating(user=6607, product=350, rating=25.622733513223086), Rating(user=6835, product=2488, rating=-0.5775383876601863)]

[((6472, 586), 203.56013428189112), ((6680, 2639), -22.290913780496503), ((976, 2950), -13.088973987739678), ((1353, 2633), -24.163823255056045), ((3125, 812), 1.5132890885141563), ((5945, 3247), -17.957026871846278), ((4794, 2792), 74.65524647233833), ((514, 429), 63.993144708107934), ((2459, 1873), 0.2837004828375107), ((6607, 350), 25.

In [49]:
len(pred.collect())

11

In [51]:
len(rates_and_preds.collect())

11

In [40]:
user_product_pairs = sc.parallelize([(196, 242), (196, 243), (196, 244)])
user_product_pairs.collect()
p = model.predictAll(user_product_pairs)

In [41]:
p.collect()

[Rating(user=196, product=244, rating=50.056446614482695),
 Rating(user=196, product=242, rating=-2.5788212949425366)]

MapPartitionsRDD[3922] at mapPartitions at PythonMLLibAPI.scala:1339

In [19]:
predictions.filter(lambda x : x[0] == 26).collect()

[]

In [20]:
validation_RDD.filter(lambda x : x[0] == 26).collect()

[Rating(user=26, product=29, rating=17.0)]

In [18]:
validation_RDD.take(5)

[Rating(user=26, product=29, rating=17.0),
 Rating(user=5571, product=29, rating=38.0),
 Rating(user=53, product=39, rating=14.0),
 Rating(user=34, product=39, rating=262.0),
 Rating(user=6031, product=39, rating=203.0)]