# ALS Music Recommendation System
In this project we make music recommendations based on listeners' preferences. The model is trained with alternating least squares (ALS), and the sample track data is taken from "unique_tracks.txt" at http://millionsongdataset.com/sites/default/files/AdditionalFiles/unique_tracks.txt . 

dataset by:  
Thierry Bertin-Mahieux, Daniel P.W. Ellis, Brian Whitman, and Paul Lamere.   
The Million Song Dataset. In Proceedings of the 12th International Society  
for Music Information Retrieval Conference (ISMIR 2011), 2011.  


## PySpark tools used in this project
- DataFrame
- RDD
- MLlib

In [1]:
# Imports
import pyspark.mllib
from pyspark.sql import *
from pyspark import *
from pyspark.rdd import *
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *
from pyspark.mllib.recommendation import *
import random


# Reuse an already-running session when there is one, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master("local").appName("music").getOrCreate()
sc = spark.sparkContext

# Tab-separated (user, song, play count) sample read by load().
sampleUsersPath = "sampleUsers.txt"
# "<SEP>"-separated track metadata sample read by getSongNames().
sampleTracksPath = "sampleTracks.txt"

In [2]:
# Load
# When count is over twenty, the count shall be lowered to twenty.
from pyspark.sql.functions import expr
def load(path):
    """Read a tab-separated listening file into a DataFrame.

    The file is parsed into the columns ``user`` (string), ``song`` (string)
    and ``count`` (integer). Any ``count`` above 20 is replaced by 20.

    Args:
        path: Location of the tab-separated file.

    Returns:
        DataFrame with the columns ``user``, ``song`` and the capped ``count``.
    """
    fields = [
        StructField("user", StringType(), True),
        StructField("song", StringType(), True),
        StructField("count", IntegerType(), True),
    ]
    reader = spark.read.options(delimiter="\t").schema(StructType(fields))
    # Cap very high play counts at 20.
    capped = expr("CASE WHEN count > 20 THEN 20 ELSE count END")
    return reader.csv(path).withColumn("count", capped)

In [3]:
# Load the sample listening data and cache it, because the next cells reuse it.
loaded = load(sampleUsersPath).persist()
# Print a preview of the loaded rows.
loaded.show()

+--------------------+------------------+-----+
|                user|              song|count|
+--------------------+------------------+-----+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4|    1|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701|    1|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A|    1|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3|    1|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0|    1|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90|    1|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD|    2|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC|    2|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E|    1|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D|    2|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE|    2|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C|    5|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114|    2|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71|    2|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57|    2|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB|    2|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036

In [4]:
# Using StringIndexer to index the songs and users
def convert(df):
    """Add numeric index columns for the ``user`` and ``song`` strings.

    ALS needs numeric ids, so each distinct user and song string is mapped
    to an index with a ``StringIndexer`` fitted on the data itself.

    Args:
        df: DataFrame containing string columns ``user`` and ``song``.

    Returns:
        The input DataFrame with two extra columns appended, in this order:
        ``user_indexed`` and ``song_indexed``.
    """
    indexed = df
    # Index users first, then songs, so the output column order stays
    # user_indexed, song_indexed.
    for column in ("user", "song"):
        indexer = StringIndexer(inputCol=column, outputCol=column + "_indexed")
        indexed = indexer.fit(indexed).transform(indexed)
    return indexed

In [5]:
# Add the numeric user/song index columns and cache the result for later cells.
converted = convert(loaded).persist()
# Print a preview including the new user_indexed / song_indexed columns.
converted.show()

+--------------------+------------------+-----+------------+------------+
|                user|              song|count|user_indexed|song_indexed|
+--------------------+------------------+-----+------------+------------+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|       162.0|       577.0|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4|    1|       162.0|      1053.0|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701|    1|       162.0|      1646.0|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A|    1|       162.0|      1945.0|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3|    1|       162.0|      2306.0|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0|    1|       162.0|      2702.0|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90|    1|       162.0|      3124.0|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD|    2|       810.0|       951.0|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC|    2|       810.0|      1728.0|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E|    1|      1151.0|      3824.0|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB

In [6]:
# We build Rating objects from the indexed rows and convert the DataFrame to an RDD for the ALS model
def toRating(df):
    """Convert an indexed listening DataFrame into an RDD of ``Rating``.

    Fields are read by name rather than by position, so the result does not
    depend on the order in which the columns appear in ``df``.

    Args:
        df: DataFrame with the columns ``user_indexed``, ``song_indexed``
            and ``count``.

    Returns:
        RDD of ``Rating(user_indexed, song_indexed, count)``.
    """
    return df.rdd.map(
        lambda row: Rating(row["user_indexed"], row["song_indexed"], row["count"])
    )

In [7]:
# Build the Rating RDD used for training and cache it.
rated = toRating(converted).persist()
# Look at the first ten ratings.
rated.take(10)

[Rating(user=162, product=577, rating=2.0),
 Rating(user=162, product=1053, rating=1.0),
 Rating(user=162, product=1646, rating=1.0),
 Rating(user=162, product=1945, rating=1.0),
 Rating(user=162, product=2306, rating=1.0),
 Rating(user=162, product=2702, rating=1.0),
 Rating(user=162, product=3124, rating=1.0),
 Rating(user=810, product=951, rating=2.0),
 Rating(user=810, product=1728, rating=2.0),
 Rating(user=1151, product=3824, rating=1.0)]

In [8]:
# Training the model
def trainALS(data, seed, rank=10):
    """Train an ALS matrix-factorization model on a ratings RDD.

    Args:
        data: RDD of ``Rating`` objects (user, product, rating).
        seed: Random seed passed to ALS so training can be reproduced.
        rank: Number of latent factors; defaults to 10.

    Returns:
        The model returned by ``ALS.train``.
    """
    # NOTE(review): the ratings here are play counts; ALS.trainImplicit may be
    # a better fit for that kind of data - confirm before switching.
    return ALS.train(data, rank=rank, seed=seed)

In [9]:
# We will be using a seed, so that the results can be reproduced again.
# Fix the generator state so the same seed value is drawn on every run.
random.seed(123)
# Seed handed to ALS: an integer between 0 and 10000, both ends included.
rSeed = random.randint(0, 10000)
# Train on the cached Rating RDD.
model = trainALS(rated, rSeed)

In [14]:
# Recommend r songs to a user (five in the example below)
def recommendSongs(model, user, r):
    """Return the ``r`` products that ``model`` recommends for ``user``.

    Args:
        model: Object exposing ``recommendProducts(user, num)``.
        user: Id of the user to recommend for.
        r: Number of recommendations to request.

    Returns:
        Whatever ``model.recommendProducts(user, r)`` returns.
    """
    return model.recommendProducts(user, r)

In [11]:
# 162 is the user_indexed value of the first user in the converted table above.
recommends = recommendSongs(model, 162, 5)
# Display the five recommendations (still as numeric song indices).
recommends

[Rating(user=162, product=157, rating=12.384456815717648),
 Rating(user=162, product=4074, rating=12.091128420713474),
 Rating(user=162, product=2310, rating=12.091128420713474),
 Rating(user=162, product=3986, rating=11.486571658852927),
 Rating(user=162, product=1669, rating=11.480036749170573)]

In [12]:
# Above we can see five recommendations, but they are not readable because the songs are shown as numeric indices.
# Now we have to map them back to song titles and artists.
def getSongNames(converted, ar, path):
    """Translate recommended song indices into readable titles and artists.

    Args:
        converted: DataFrame containing at least the columns ``song`` and
            ``song_indexed``.
        ar: List of ``(user, product, rating)`` recommendations, where
            ``product`` is a ``song_indexed`` value.
        path: Path to the ``<SEP>``-delimited track file with the columns
            track_id, song_id, artist, title.

    Returns:
        A list of ``[title, artist]`` lists without duplicates, ordered from
        the highest-rated recommendation to the lowest. Recommendations with
        no matching ``song_id`` in the track file are left out.
    """
    if not ar:
        # createDataFrame cannot infer column types from an empty list.
        return []

    # Turn the recommendations into a DataFrame so they can be joined.
    recs = spark.createDataFrame(data=ar, schema=["user", "product", "rating"])

    # Read the file that maps song ids to artist and title.
    track_schema = StructType([
        StructField("track_id", StringType(), True),
        StructField("song_id", StringType(), True),
        StructField("artist", StringType(), True),
        StructField("title", StringType(), True),
    ])
    tracks = spark.read.options(delimiter="<SEP>").schema(track_schema).csv(path)

    # Only the index -> song id mapping is needed; joining against every
    # listening row would multiply the matches for no benefit.
    song_ids = converted.select("song", "song_indexed").distinct()

    matched = recs.join(song_ids, recs["product"] == song_ids["song_indexed"])
    named = matched.join(tracks, matched["song"] == tracks["song_id"])

    # Carry the rating along so the ranking survives de-duplication.
    rows = (
        named.select("artist", "title", "rating")
        .distinct()
        .orderBy("rating", ascending=False)
        .collect()
    )

    # Keep the first (best-rated) occurrence of each title/artist pair.
    seen = set()
    names = []
    for row in rows:
        pair = (row["title"], row["artist"])
        if pair not in seen:
            seen.add(pair)
            names.append([row["title"], row["artist"]])
    return names

In [13]:
# Resolve the recommended song indices against the sample track file.
songNames = getSongNames(converted, recommends, sampleTracksPath)
# Display the [title, artist] pairs.
songNames

[['Limbo (Remastered LP Version)', 'Rush'],
 ['Cordeiro De Nana', 'João Gilberto / Gilberto Gil / Caetano Veloso'],
 ['Whataya Want From Me', 'Adam Lambert'],
 ['Awakenings', 'Symphony X'],
 ['Inferno (unleash The Fire)', 'Symphony X']]

In [15]:
# Now we combine everything above into one function and get the same results.
def recommend(path, userId, tracksPath, seed, numberOfRecommendation):
    """Run the whole pipeline: load, index, train and name recommendations.

    Args:
        path: Tab-separated (user, song, count) file to train on.
        userId: Original string id of the user to recommend for.
        tracksPath: ``<SEP>``-delimited track file used to resolve names.
        seed: Random seed for ALS training.
        numberOfRecommendation: How many songs to recommend.

    Returns:
        The list of ``[title, artist]`` lists produced by ``getSongNames``.

    Raises:
        ValueError: If ``userId`` does not occur in the file at ``path``.
    """
    # Cache the indexed data: it feeds the user lookup, the training RDD and
    # the final join, and would otherwise be recomputed for each of them.
    data = convert(load(path)).persist()
    try:
        # Validate the user before spending time on training.
        userRow = data.filter(data["user"] == userId).first()
        if userRow is None:
            raise ValueError(
                "user {!r} not found in {!r}".format(userId, path)
            )
        user = int(userRow["user_indexed"])

        model = trainALS(toRating(data), seed)
        recommendations = recommendSongs(model, user, numberOfRecommendation)
        return getSongNames(data, recommendations, tracksPath)
    finally:
        data.unpersist()

In [20]:
# using the original dataset
# Full listening file, in the same tab-separated format as the sample.
users = "users.txt"
# Full track metadata file from the Million Song Dataset.
tracks = "unique_tracks.txt"
# Same user as the first rows of the sample, identified by the original string id.
recom = recommend(users, "b80344d063b5ccb3212f76538f3d9e43d87dca9e" ,tracks, rSeed, 5)
# Display the recommended [title, artist] pairs.
recom

[['Gunn Clapp', 'O.G.C.'],
 ['221', 'keller williams'],
 ['Behind Blue Eyes', 'Limp Bizkit'],
 ['VÁNDORMADÁR', 'Crystal'],
 ['Velouria', 'Pixies']]