In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import col

In [2]:
# Importing data. Only bigdata1 is used; further tables can be loaded the same way:
#   spotifyDataList2 = spark.sql("SELECT * FROM bigdata2")
#   spotifyDataList3 = spark.sql("SELECT * FROM bigdata3")
spotifyDataList1 = spark.table("bigdata1")
spotifyDataList = spotifyDataList1

# Keep only the columns the recommender needs and tag every (playlist, track)
# row with an implicit rating of 1.
all_data = (
    spotifyDataList
    .select("trackid", "artist_name", "track_name", "pid")
    .withColumn("rating", lit(1))
)

In [3]:
# Preview the first rows of the prepared data (default: 20 rows, long values truncated).
all_data.show(n=20, truncate=True)

In [4]:
# Add a `songid` column that uniquely identifies each track with a numeric index,
# as required by ALS. Takes about ~1 minute.
# With stringOrderType="frequencyDesc" the most frequent trackid gets index 0, so the
# hand-picked songids used for the custom playlists below are only valid for this
# exact dataset and indexer fit.
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="trackid", outputCol="songid", handleInvalid="error", stringOrderType="frequencyDesc")

# Drop any `songid` left from a previous run of this cell (a no-op on the first run):
# fit/transform refuse to overwrite an existing output column.
indexerInput = all_data.drop("songid")
# Not named `model`, so it is not confused with the ALS model trained later.
songIndexerModel = stringIndexer.fit(indexerInput)
all_data = songIndexerModel.transform(indexerInput)

In [5]:
# Exploring the data
all_data.filter(all_data.trackid == 'spotify:track:5cCAZS9VhLGEDV4NCfieeg').show()

In [6]:
# Keep just the (playlist, song, rating) triples the recommender is trained on.
ratingColumns = ["pid", "songid", "rating"]
ratings = all_data.select(*ratingColumns)

In [7]:
# Adding the playlist that we want to continue: 'Soft Rap', stored under pid -1
# (presumably chosen so it cannot collide with a real playlist id — TODO confirm pids are >= 0).
# Songs in the playlist, as originally listed:
#   Khalid - Location
#   Post Malone - Go Flex
#   Post Malone - Too Young
#   Khalid - Saved
#   Post Malone - Congratulations
#   Khalid - Saved   <- NOTE(review): listed twice; 11 names but only 10 songids below — confirm mapping
#   Post Malone - Yours Truly, Austin Post
#   6LACK - PRBLMS
#   Post Malone - I Fall Apart
#   Khalid - Coaster
#   Khalid - Reasons
# The ids are StringIndexer songids, valid only for this dataset/indexer fit.
softRapSongIds = [9, 133, 1228, 4, 4876, 308, 666, 1648, 5437, 681]
softRap = spark.createDataFrame(
    [[-1, songId, 1] for songId in softRapSongIds],
    ['pid', 'songid', 'rating'],
)
withMyRatings = softRap.union(ratings)

In [8]:
# Songs in the 'Run the World' playlist (pid -2), with their songids:
# Beyonce - Run the World (Girls) 710.0
# Rihanna - S&M 1054.0
# Britney Spears - Work B**ch 2153.0
# will.i.am - Scream & Shout 1786.0
# Britney Spears - I Wanna Go 6870.0
runTheWorldSongIds = [710, 1054, 2153, 1786, 6870]
runTheWorld = spark.createDataFrame(
    [[-2, songId, 1] for songId in runTheWorldSongIds],
    ['pid', 'songid', 'rating'],
)
# Built directly from its inputs (same row order: Run the World, Soft Rap, dataset) so
# re-running this cell does not append the playlist to withMyRatings a second time.
withMyRatings = runTheWorld.union(softRap).union(ratings)

# Splitting training data
We use MLlib's ALS (alternating least squares) to train a `MatrixFactorizationModel`; in Python, ALS takes an RDD of `Rating(user, product, rating)` tuples as input. Because every rating is 1 (a song either is or is not in a playlist), we use the implicit-feedback variant, `ALS.trainImplicit`. ALS has training parameters such as the rank of the factor matrices and the regularization constant. We keep the training and test sets in memory by calling `cache()` because they are visited multiple times.

In [10]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Convert each (pid, songid, rating) row into an MLlib Rating(user, product, rating).
ratingsR = withMyRatings.rdd.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))

# The hand-made playlists (pids -1 and -2) are the users we ask recommendations for, so
# all of their songs must stay in the training set. Splitting them too would hide ~30% of
# their songs from the model, and if every song of one playlist landed in `test`,
# recommendProducts for that pid would fail. Only the dataset playlists are split.
# Assumes dataset pids are non-negative — TODO confirm.
seedRatings = ratingsR.filter(lambda r: r.user < 0)
backgroundRatings = ratingsR.filter(lambda r: r.user >= 0)
trainBackground, test = backgroundRatings.randomSplit([0.7, 0.3], 7856)
train = trainBackground.union(seedRatings)
train.cache()
test.cache()

In [11]:
# Setting up the parameters for ALS
rank = 5  # number of latent factors
numIterations = 10  # number of ALS iterations
# Fixed seed for the random initial factors; without it (seed=None) every run produces a
# different factorization and therefore different recommendations.
alsSeed = 7856

# Train on implicit feedback (every rating is 1). lambda_ and alpha stay at MLlib's defaults.
model = ALS.trainImplicit(train, rank, numIterations, seed=alsSeed)

In [12]:
# Recommendations for 'Soft Rap' playlist (pid -1).
# recommendProducts ranks every song, including ones already in the playlist, so fetch
# extra candidates and drop songs the playlist already contains.
numRecommendations = 10
softRapSongs = {int(row.songid) for row in softRap.select('songid').collect()}
result1 = [
    rec for rec in model.recommendProducts(-1, numRecommendations + len(softRapSongs))
    if rec.product not in softRapSongs
][:numRecommendations]

In [13]:
# Putting results in a DataFrame.
# Rating(user, product, rating) tuples hold the playlist id (int), the songid (int) and the
# ALS score (float), so numeric types are used instead of storing the user id as a string.
resultSchema = StructType([
    StructField("user", IntegerType(), True),
    StructField("product", IntegerType(), True),
    StructField("rating", DoubleType(), True),
])

# all_data has one row per playlist occurrence of a song, so the join yields duplicates;
# keep one row per product. all_data's constant `rating` column is dropped so `rating`
# unambiguously means the ALS score, which is used to list results best-first (the join
# and de-duplication do not preserve recommendProducts' ranking).
recsDf = spark.createDataFrame(result1, resultSchema)
songCatalog = all_data.drop('rating')
resultDf = (
    recsDf
    .join(songCatalog, recsDf.product == songCatalog.songid)
    .drop_duplicates(['product'])
    .orderBy(col('rating').desc())
)
resultDf.select('songid', 'artist_name', 'track_name', 'rating').show()

In [14]:
# Recommendations for 'Run The World' playlist (pid -2).
# recommendProducts ranks every song, including ones already in the playlist, so fetch
# extra candidates and drop songs the playlist already contains.
numRecommendations = 10
runTheWorldSongs = {int(row.songid) for row in runTheWorld.select('songid').collect()}
result2 = [
    rec for rec in model.recommendProducts(-2, numRecommendations + len(runTheWorldSongs))
    if rec.product not in runTheWorldSongs
][:numRecommendations]

In [15]:
# Look up artist/track names for the 'Run The World' recommendations and list them best-first.
# all_data's constant `rating` column is dropped so `rating` unambiguously means the ALS
# score; one row is kept per product because all_data repeats each song per playlist.
recsDf2 = spark.createDataFrame(result2, resultSchema)
songCatalog2 = all_data.drop('rating')
resultDf2 = (
    recsDf2
    .join(songCatalog2, recsDf2.product == songCatalog2.songid)
    .drop_duplicates(['product'])
    .orderBy(col('rating').desc())
)
resultDf2.select('songid', 'artist_name', 'track_name', 'rating').show()