Use Spark To Provide Audio Recommendations

http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz

million song dataset
http://labrosa.ee.columbia.edu/millionsong/pages/getting-dataset

In [69]:
from pyspark.mllib.recommendation import ALS

In [2]:
from pyspark.ml import Pipeline

In [24]:
from pyspark.mllib.recommendation import Rating

In [23]:
import numpy as np
import pandas as pd
import seaborn as sns

In [3]:
import glob
import os.path
import os
import tarfile
import urllib
import urlparse
import StringIO

In [4]:
# Root data folder and the sub-folder that holds the profile data set.
data_root, data_directory = '../../data/MyData', 'profiledata'
dest = os.path.join(data_root, data_directory)

In [5]:
# Derive the archive's file name from the URL path rather than hard-coding it,
# so `url` is the only thing that has to change for another archive.
url = 'http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz'
p = urlparse.urlsplit(url).path
# Last path component; shown as the cell output.
os.path.split(p)[1]

'profiledata_06-May-2005.tar.gz'

In [9]:


def getData(url, directory='', cleanup=False):
    """Download a tar file (unless it is already on disk) and unpack it.

    params
       url (string): url to download from
       directory (string): directory to save to and extract into; when empty,
           the archive name up to its first '.' is used
       cleanup (bool): Remove tar file after unpacking

    Members whose path is absolute or contains a '..' component are reported
    and skipped so that nothing is written outside `directory`.
    """
    fileName = os.path.split(
        urlparse.urlsplit(url).path)[1]

    directory = directory if directory else fileName.split('.')[0]

    if not os.path.exists(directory):
        os.makedirs(directory)

    dest = os.path.join(directory, fileName)

    # Only download when the archive is not already present.
    if not os.path.exists(dest):
        content = urllib.urlopen(url)
        try:
            with open(dest, 'wb') as fh:
                fh.write(content.read())
        finally:
            # Always release the connection, even if writing fails.
            content.close()

    if tarfile.is_tarfile(dest):
        # The context manager closes the archive handle when extraction ends.
        with tarfile.open(dest) as tar:
            for tarinfo in tar.getmembers():
                name = tarinfo.name
                # Refuse absolute paths and any '..' component (path traversal).
                if name.startswith('/') or '..' in name.split('/'):
                    print("Couldn't extract {}".format(name))
                    continue
                tar.extract(tarinfo, directory)
                print("Extracted {}".format(name))
    if cleanup:
        os.remove(dest)
        
        
        

In [10]:
# Fetch the archive into `dest`, unpack it there, then delete the tar file.
getData(url, dest, cleanup=True)

Extracted profiledata_06-May-2005
Extracted profiledata_06-May-2005/artist_data.txt
Extracted profiledata_06-May-2005/README.txt
Extracted profiledata_06-May-2005/user_artist_data.txt
Extracted profiledata_06-May-2005/artist_alias.txt


In [18]:
def fullPath(f):
    """Return the path of `f` inside the 'profiledata_06-May-2005' folder under `dest`."""
    return os.path.join(dest, 'profiledata_06-May-2005', f)


def split(l):
    """Trim spaces from both ends of line `l`, then split it on tab characters."""
    return l.strip(" ").split("\t")


def read_file(f):
    """Load data file `f` as an RDD of field lists.

    Lines of 'user_artist_data.txt' are split on single spaces; lines of any
    other file go through `split` (tab separated).
    """
    lines = sc.textFile(fullPath(f))
    parse = (lambda s: s.split(" ")) if f == 'user_artist_data.txt' else split
    return lines.map(parse)

In [19]:
# Load and cache the three data files. The `ARIST_ALIAS_RDD` spelling is kept
# because later cells refer to it under that name.
ARIST_ALIAS_RDD, ARTIST_RDD, USER_ARTIST_RDD = (
    read_file(name).cache()
    for name in ('artist_alias.txt', 'artist_data.txt', 'user_artist_data.txt')
)

In [20]:
# Peek at the first ten split records of the artist file.
ARTIST_RDD.take(10)

[[u'1134999', u'06Crazy Life'],
 [u'6821360', u'Pang Nakarin'],
 [u'10113088', u'Terfel, Bartoli- Mozart: Don'],
 [u'10151459', u'The Flaming Sidebur'],
 [u'6826647', u'Bodenstandig 3000'],
 [u'10186265', u'Jota Quest e Ivete Sangalo'],
 [u'6828986', u'Toto_XX (1977'],
 [u'10236364', u'U.S Bombs -'],
 [u'1135000', u'artist formaly know as Mat'],
 [u'10299728', u'Kassierer - Musik f\xfcr beide Ohren']]

In [21]:
# Peek at the first ten split records of the user/artist file.
USER_ARTIST_RDD.take(10)

[[u'1000002', u'1', u'55'],
 [u'1000002', u'1000006', u'33'],
 [u'1000002', u'1000007', u'8'],
 [u'1000002', u'1000009', u'144'],
 [u'1000002', u'1000010', u'314'],
 [u'1000002', u'1000013', u'8'],
 [u'1000002', u'1000014', u'42'],
 [u'1000002', u'1000017', u'69'],
 [u'1000002', u'1000024', u'329'],
 [u'1000002', u'1000025', u'1']]

In [22]:
# Peek at the first ten split records of the artist alias file.
ARIST_ALIAS_RDD.take(10)

[[u'1092764', u'1000311'],
 [u'1095122', u'1000557'],
 [u'6708070', u'1007267'],
 [u'10088054', u'1042317'],
 [u'1195917', u'1042317'],
 [u'1112006', u'1000557'],
 [u'1187350', u'1294511'],
 [u'1116694', u'1327092'],
 [u'6793225', u'1042317'],
 [u'1079959', u'1000557']]

### Further Cleaning
Convert `USER_ARTIST_RDD` into an RDD of `Rating` objects. The user and product ids must be integers; the main limitation is that each id has to fit in a Java `int` (at most 2147483647), because MLlib's `Rating` stores ids as 32-bit integers.

In [26]:
# Sanity check: print every record that did not split into exactly three fields.
print(
    USER_ARTIST_RDD
    .filter(lambda fields: 3 != len(fields))
    .collect()
)

[]


In [27]:
# Turn every record's string fields into ints and wrap them in a Rating.
ratingsRDD = (
    USER_ARTIST_RDD
    .map(lambda fields: Rating(*map(int, fields)))
    .cache()
)

In [28]:
# Peek at the first ten Rating objects.
ratingsRDD.take(10)

[Rating(user=1000002, product=1, rating=55.0),
 Rating(user=1000002, product=1000006, rating=33.0),
 Rating(user=1000002, product=1000007, rating=8.0),
 Rating(user=1000002, product=1000009, rating=144.0),
 Rating(user=1000002, product=1000010, rating=314.0),
 Rating(user=1000002, product=1000013, rating=8.0),
 Rating(user=1000002, product=1000014, rating=42.0),
 Rating(user=1000002, product=1000017, rating=69.0),
 Rating(user=1000002, product=1000024, rating=329.0),
 Rating(user=1000002, product=1000025, rating=1.0)]

In [43]:
# take a look at the artist rdd (first 100 id/name records)
ARTIST_RDD.take(100)

[[u'1134999', u'06Crazy Life'],
 [u'6821360', u'Pang Nakarin'],
 [u'10113088', u'Terfel, Bartoli- Mozart: Don'],
 [u'10151459', u'The Flaming Sidebur'],
 [u'6826647', u'Bodenstandig 3000'],
 [u'10186265', u'Jota Quest e Ivete Sangalo'],
 [u'6828986', u'Toto_XX (1977'],
 [u'10236364', u'U.S Bombs -'],
 [u'1135000', u'artist formaly know as Mat'],
 [u'10299728', u'Kassierer - Musik f\xfcr beide Ohren'],
 [u'10299744', u'Rahzel, RZA'],
 [u'6864258', u'Jon Richardson'],
 [u'6878791', u'Young Fresh Fellowslows & the Minus 5'],
 [u'10299751', u'Ki-ya-Kiss'],
 [u'6909716', u'Underminded - The Task Of Modern Educator'],
 [u'10435121', u'Kox-Box'],
 [u'6918061', u'alexisonfire [wo!]'],
 [u'1135001', u'dj salinger'],
 [u'6940391', u"The B52's - Channel Z"],
 [u'10475396', u'44 Hoes'],
 [u'10584537', u'orchestral mandeuvres in dark'],
 [u'10584538', u'Josh Groban (Featuring Angie Stone)'],
 [u'6945632', u'Savage Garden - Truley, Madly, Deeply'],
 [u'10584546', u'Nislije'],
 [u'10584550', u'ONEYA 

In [52]:
def parseAliasEntry(entry):
    """Turn one split line of the alias file into an (int, int) pair.

    Returns None when the line does not have exactly two non-empty fields or
    when a field is not an integer, mirroring how `fixArtistRDDEntry` treats
    bad artist records.
    """
    if len(entry) != 2 or not all(entry):
        return None
    try:
        # Build a real tuple: a lazy `map` object cannot serve as a key/value pair.
        return int(entry[0]), int(entry[1])
    except ValueError:
        return None


# Dictionary {first id: second id} built from the well-formed alias records.
aliasMapping = (ARIST_ALIAS_RDD
                .map(parseAliasEntry)
                .filter(lambda pair: pair is not None)
                .collectAsMap()
)

In [61]:
def fixArtistRDDEntry(entry):
    """Convert a split artist record into an (int id, name) tuple.

    Returns None when the record does not have exactly two non-empty fields
    or when the first field cannot be parsed as an integer.
    """
    # Reject records with the wrong field count or an empty field.
    if len(entry) != 2 or not all(entry):
        return None
    try:
        return int(entry[0]), entry[1]
    except ValueError:
        return None

In [62]:
# Keep only the artist records that parse into an (int id, name) pair.
artistRDD = (
    ARTIST_RDD
    .map(fixArtistRDDEntry)
    .filter(lambda parsed: parsed is not None)
)

In [54]:
# Spot-check one entry of the alias mapping.
aliasMapping[6803336]

1000010

In [63]:
# Look up the name(s) stored under the same id in the cleaned artist RDD.
artistRDD.lookup(6803336)

[u'Aerosmith (unplugged)']

The earlier ratings RDD may not suffice because the same product can appear under more than one id. Convert every product id to the id that `aliasMapping` assigns to it; ids without an entry are left unchanged.

In [64]:
# Broadcast the alias mapping so RDD lambdas can read it through `.value`.
# The `aliasBrodcast` spelling is kept because the next cell uses that name.
aliasBrodcast = sc.broadcast(aliasMapping)

In [67]:
# Build Ratings from the raw records, replacing each artist id by its entry in
# the broadcast alias mapping (ids without an entry are used as they are).
userArtistRDD = (
    USER_ARTIST_RDD
    .map(lambda t: Rating(int(t[0]),
                          aliasBrodcast.value.get(int(t[1]), int(t[1])),
                          int(t[2])))
    .cache()
)

In [68]:
# Peek at the first 100 alias-resolved Rating objects.
userArtistRDD.take(100)

[Rating(user=1000002, product=1, rating=55.0),
 Rating(user=1000002, product=1000006, rating=33.0),
 Rating(user=1000002, product=1000007, rating=8.0),
 Rating(user=1000002, product=1000009, rating=144.0),
 Rating(user=1000002, product=1000010, rating=314.0),
 Rating(user=1000002, product=1000013, rating=8.0),
 Rating(user=1000002, product=1000014, rating=42.0),
 Rating(user=1000002, product=1000017, rating=69.0),
 Rating(user=1000002, product=1000024, rating=329.0),
 Rating(user=1000002, product=1000025, rating=1.0),
 Rating(user=1000002, product=1000028, rating=17.0),
 Rating(user=1000002, product=1000031, rating=47.0),
 Rating(user=1000002, product=1000033, rating=15.0),
 Rating(user=1000002, product=1000042, rating=1.0),
 Rating(user=1000002, product=1000045, rating=1.0),
 Rating(user=1000002, product=1000054, rating=2.0),
 Rating(user=1000002, product=1000055, rating=25.0),
 Rating(user=1000002, product=1000056, rating=4.0),
 Rating(user=1000002, product=1000059, rating=2.0),
 Rat

### Build the ALS Model

In [71]:
# Fit an implicit-feedback ALS model on the alias-resolved play counts.
# NOTE(review): the positional 10 is presumably the rank (number of latent
# factors) -- confirm against ALS.trainImplicit in the installed Spark version.
model = ALS.trainImplicit(userArtistRDD, 10, iterations=5, lambda_=.01, alpha=0.01)

### See if you can get some predictions
For user 1000002

In [75]:
# User whose listening history and predicted score are inspected below.
userID = 1000002

In [76]:
# Collect, as ints, the artist ids of every raw record belonging to `userID`.
artistsForUser = (
    USER_ARTIST_RDD
    .filter(lambda fields: userID == int(fields[0]))
    .map(lambda fields: int(fields[1]))
    .collect()
)

In [79]:
# De-duplicate the ids and allow fast membership tests in the filter below.
artistsForUser = set(artistsForUser)

In [81]:
# Names of the artists whose id is among the ids collected for the user.
artistRDD.filter(lambda t: t[0] in artistsForUser).map(lambda t: t[1]).collect()

[u'Kerrang',
 u'YMC',
 u'George Duke',
 u'Firebird',
 u'Caf\xe9 Del Mar',
 u'Mallrats',
 u'Benny Goodman Orchestra',
 u'Brian Hughes',
 u'Armand Van Helden',
 u'Brant Bjork and The Operators',
 u'Echo & the Bunnymen',
 u'Joshua Redman',
 u'Elvis Costello',
 u'Enigma',
 u'Eric Clapton',
 u'Eurythmics',
 u'The Buddy Rich Big Band',
 u'Alien Ant Farm',
 u'Duke Ellington and Johnny Hodg',
 u'Jeno Jando',
 u'The Horace Silver Quintet',
 u'Pimps',
 u'Benny Goodman & Harry James',
 u'Steve Cole',
 u'Oleander-',
 u'Yellowjackets',
 u'Skid Row',
 u'Sublime',
 u'Nelly Furtado',
 u'The Stranglers',
 u'Elastica',
 u'Eiffel 65',
 u'Louis Armstrong',
 u'Eddie Henderson',
 u'Alice Cooper',
 u'Annie Lennox',
 u'The Jimi Hendrix Experience',
 u'Hothouse Flowers',
 u'Hole',
 u'The Hollies',
 u'Roxy Music',
 u'Meanwhile, Back In Communist Russia...',
 u'Mr. Bungle',
 u'Bill Evans',
 u'Count Basie',
 u'ZZ Top',
 u'Chuck Mangione',
 u'Bob James',
 u'Earl Klugh',
 u'Larry Carlton',
 u'Brian Culbertson',
 u'

In [84]:
# Recommending is only available in Spark versions greater than 1.4.0, so
# score a single (user, product) pair with predict() instead.
recommendations = model.predict(userID, 1000055)

In [85]:
# Show the single predicted score computed above.
recommendations

0.4970475819364981

## Since recommendations are only available in Spark versions greater than 1.4.0, I'll just do a quick summary of the data instead.

In [89]:
# who is the most played artist.  
# do this by summing up all the played instances of this particular artist.
artistPlayRDD = (userArtistRDD
                 .map(lambda r: (r.product, r.rating))
                 .reduceByKey(lambda a1, a2: a1+a2)
                 .cache()
                 )

In [None]:
# Ten (artist id, total plays) pairs with the highest totals; the negated key
# makes takeOrdered return the largest values first.
top10PlayedArtists = artistPlayRDD.takeOrdered(10, key=lambda pair: -pair[1])

In [94]:
# Same top ten, but joined with the artist names first, so each record is
# (artist id, (total plays, name)).
top10PlayedArtists = (
    artistPlayRDD
    .join(artistRDD)
    .takeOrdered(10, key=lambda joined: -joined[1][0])
)

In [99]:
# Each joined record is (artist id, (plays, name)); print the name and the play count.
for artist, plays in [(t[1][1], t[1][0]) for t in top10PlayedArtists]:
    print("Artist: {:20}Plays:{:10}".format(artist, plays))

Artist: Radiohead           Plays: 2502596.0
Artist: The Beatles         Plays: 2259825.0
Artist: Green Day           Plays: 1931143.0
Artist: Metallica           Plays: 1543430.0
Artist: System of a Down    Plays: 1426254.0
Artist: Pink Floyd          Plays: 1399665.0
Artist: Nine Inch Nails     Plays: 1361977.0
Artist: Modest Mouse        Plays: 1328969.0
Artist: Bright Eyes         Plays: 1234773.0
Artist: Nirvana             Plays: 1203348.0


## Which User played the most tracks?

In [None]:
# Total the plays per user, then take the ten users with the highest totals.
userPlayRDD = (
    userArtistRDD
    .map(lambda rating: (rating.user, rating.rating))
    .reduceByKey(lambda total, plays: total + plays)
    .cache()
)
top10PlayingUsers = userPlayRDD.takeOrdered(10, key=lambda pair: -pair[1])

In [102]:
# Print one row per top user: left-aligned user id followed by the play total.
for user, plays in top10PlayingUsers:
    print("User: {:<20}Plays: {:10}".format(user, plays))
    

User: 1059637             Plays:   674412.0
User: 2064012             Plays:   548427.0
User: 2069337             Plays:   393515.0
User: 2023977             Plays:   285978.0
User: 1046559             Plays:   183972.0
User: 1052461             Plays:   175822.0
User: 1070932             Plays:   168977.0
User: 1031009             Plays:   167273.0
User: 2020513             Plays:   165642.0
User: 2062243             Plays:   151504.0
