In [1]:
from pyspark.mllib.recommendation import *
import random
from operator import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import os
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [2]:

os.environ["JAVA_HOME"] = "/home/hduser/java"
os.environ["PYSPARK_PYTHON"] = "python3.6"

sc = SparkContext("local","sqlexample1")

sql = SQLContext(sc)
    

In [3]:
# # Method to split artist Id and its name.
def splitArtistName(line):
    try:
        id, name = line.split("\t")
        return (int(id), name)
    except ValueError:
        return None

# Load text file where each line contains artist Id and its name.
artistData = sc.textFile("./artist_data_small.txt")
#print(artistData.collect())
# Split artist id: name and store in a map. 
artistData = artistData.map(splitArtistName).filter(lambda x: x!=None).collectAsMap()
print(artistData)



In [4]:
'''
Load artist correct id and its aliases
    2 columns: badid, goodid
    known incorrectly spelt artists and the correct artist id. 
'''
artistAlias = sc.textFile("./artist_alias_small.txt")
# Split Artist Alias data into (badId, goodId)
def splitArtistAlias(line):
    try:
        # Catches error in data
        badId, goodId = line.split("\t")
        return (int(badId), int(goodId))
    except ValueError:
        return None

# Create map badId: goodId

artistAlias = artistAlias.map(splitArtistAlias).filter(lambda x: x!=None).collectAsMap()

In [6]:
'''
Load data about user's music listening history
Each line contains three features: userid, artistid, playcount
'''
userArtistData = sc.textFile("./user_artist_data_small.txt")

# Return the corrected user information.
def parseUserHistory(line):
    try:
        # Catch error in line
        user, artist, count = line.split()
        # Return the corrected user information.
        if artist in artistAlias:
            return (int(user), artistAlias[artist], int(count))
        else:
            return (int(user), int(artist), int(count))
    except ValueError:
        return None


# Create corrected user history RDD.
userArtistData = userArtistData.map(parseUserHistory)
userArtistData = userArtistData.map(lambda p: Row(userId=int(p[0]), artistId=int(p[1]),
                                     count=float(p[2])))

print(userArtistData.collect())

[Row(artistId=1000010, count=238.0, userId=1059637), Row(artistId=1000049, count=1.0, userId=1059637), Row(artistId=1000056, count=1.0, userId=1059637), Row(artistId=1000062, count=11.0, userId=1059637), Row(artistId=1000094, count=1.0, userId=1059637), Row(artistId=1000112, count=423.0, userId=1059637), Row(artistId=1000113, count=5.0, userId=1059637), Row(artistId=1000114, count=2.0, userId=1059637), Row(artistId=1000123, count=2.0, userId=1059637), Row(artistId=1000130, count=19129.0, userId=1059637), Row(artistId=1000139, count=4.0, userId=1059637), Row(artistId=1000241, count=188.0, userId=1059637), Row(artistId=1000263, count=180.0, userId=1059637), Row(artistId=1000289, count=2.0, userId=1059637), Row(artistId=1000305, count=1.0, userId=1059637), Row(artistId=1000320, count=21.0, userId=1059637), Row(artistId=1000340, count=1.0, userId=1059637), Row(artistId=1000427, count=20.0, userId=1059637), Row(artistId=1000428, count=12.0, userId=1059637), Row(artistId=1000433, count=10.0,

In [7]:
df = sql.createDataFrame(userArtistData)
df.show()

+--------+-------+-------+
|artistId|  count| userId|
+--------+-------+-------+
| 1000010|  238.0|1059637|
| 1000049|    1.0|1059637|
| 1000056|    1.0|1059637|
| 1000062|   11.0|1059637|
| 1000094|    1.0|1059637|
| 1000112|  423.0|1059637|
| 1000113|    5.0|1059637|
| 1000114|    2.0|1059637|
| 1000123|    2.0|1059637|
| 1000130|19129.0|1059637|
| 1000139|    4.0|1059637|
| 1000241|  188.0|1059637|
| 1000263|  180.0|1059637|
| 1000289|    2.0|1059637|
| 1000305|    1.0|1059637|
| 1000320|   21.0|1059637|
| 1000340|    1.0|1059637|
| 1000427|   20.0|1059637|
| 1000428|   12.0|1059637|
| 1000433|   10.0|1059637|
+--------+-------+-------+
only showing top 20 rows



In [28]:
(training, test) = df.randomSplit([0.8,0.2])
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="artistId", ratingCol="count",
          coldStartStrategy="drop")
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="count",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each artist
artistRecs = model.recommendForAllItems(10)


# Generate top 10 movie recommendations for a specified set of users
users = df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of artist
artist = df.select(als.getItemCol()).distinct().limit(3)
artistSubSetRecs = model.recommendForItemSubset(movies, 10)

users.show()
artist.show()

Root-mean-square error = 947.5029296571208
+-------+
| userId|
+-------+
|2010008|
|2020513|
|1055449|
+-------+

+--------+
|artistId|
+--------+
| 1001530|
| 1002734|
| 1191501|
+--------+

