Necessary package imports

In [None]:
from pyspark.mllib.recommendation import *
import random
from operator import *

artist_data_small.txt contains each artist's unique artistId and name. We read the file and create a map with key: artistId and value: artist name.

In [None]:
# Method to split artist Id and its name.
def splitArtistName(line):
    """Parse one "<artistId>\\t<name>" line into an (int artistId, name) tuple.

    Returns None when the line does not consist of exactly two tab-separated
    fields or when the id field is not an integer.
    """
    fields = line.split("\t")
    # Anything other than exactly two columns is treated as a malformed row.
    if len(fields) != 2:
        return None
    try:
        return (int(fields[0]), fields[1])
    except ValueError:
        # Non-numeric artist id.
        return None

# Read the artist file (one "<artistId>\t<name>" record per line), parse every
# line, discard the ones splitArtistName rejected (None), and collect the
# remaining (artistId, name) pairs to the driver as a dict.
artistData = (
    sc.textFile("artist_data_small.txt")
    .map(splitArtistName)
    .filter(lambda pair: pair is not None)
    .collectAsMap()
)

AudioScrobbler has provided us with a file that maps the IDs of artists' aliases and misspelled names to the correct artist IDs. We use this information to correct the user-artist data by replacing each alias ID with the artist's unique ID.

In [None]:
# Load the artist alias file.
#   2 columns: badid, goodid
#   i.e. the id of a known incorrectly spelt artist and the correct artist id.
artistAlias = sc.textFile('artist_alias_small.txt')
# Split Artist Alias data into (badId, goodId)
def splitArtistAlias(line):
    """Parse one "<badId>\\t<goodId>" line into an (int badId, int goodId) tuple.

    Returns None when the line does not consist of exactly two tab-separated
    fields or when either field is not an integer.
    """
    columns = line.split("\t")
    # Rows with a missing or extra column are data errors; skip them.
    if len(columns) != 2:
        return None
    try:
        return (int(columns[0]), int(columns[1]))
    except ValueError:
        # One of the ids is empty or non-numeric.
        return None

# Create map badId: goodId. Lines that splitArtistAlias could not parse come
# back as None and are dropped before collecting to the driver.
artistAlias = (
    artistAlias
    .map(splitArtistAlias)
    .filter(lambda pair: pair is not None)
    .collectAsMap()
)

As mentioned above, user_artist_data_small.txt contains artist IDs that belong to aliases or misspelled names. Hence we use the artistAlias map to replace them with the correct artist IDs in the RDD.

In [None]:
# Load data about user's music listening history.
# Each line contains three features: userid, artistid, playcount
userArtistData = sc.textFile("user_artist_data_small.txt")

# Return the corrected user information.
def parseUserHistory(line):
    """Parse one "<userId> <artistId> <playCount>" line and correct the artist id.

    The artist id is looked up in the module-level ``artistAlias`` dict
    (int badId -> int goodId); when it is a known alias it is replaced by the
    corresponding good id, otherwise it is kept unchanged.

    Returns an (int userId, int artistId, int playCount) tuple, or None when
    the line does not have exactly three whitespace-separated integer fields.
    """
    try:
        # Catch error in line: wrong number of fields or non-numeric values.
        user, artist, count = line.split()
        userId, artistId, playCount = int(user), int(artist), int(count)
    except ValueError:
        return None
    # The alias map is keyed by int, so the lookup must use the converted id;
    # looking up the raw string field would never match.
    return (userId, artistAlias.get(artistId, artistId), playCount)


# Create corrected user history RDD. parseUserHistory returns None for lines it
# cannot parse; drop those so downstream steps that index into each record
# (e.g. x[0], x[2]) never see a None.
userArtistData = userArtistData.map(parseUserHistory).filter(lambda x: x is not None)

Since userArtistData would be used repeatedly, we might want to cache this to avoid re-computation of the same RDD.

In [None]:
# Mark the RDD as cached so the steps that reuse it below do not recompute it
# from the text file each time.
userArtistData.cache()

The following section creates a new RDD containing basic user listening stats.

In [None]:
# Keep only (userId, playCount) from each (userId, artistId, playCount) record.
userArtistPurge = userArtistData.map(lambda x: (x[0], x[2]))

# Per user, fold the play counts into (total play count, number of records),
# then re-key by the total so the RDD can be sorted on it:
#   (total play count, (userId, number of artist records for that user))
songCountAgg = userArtistPurge.aggregateByKey(
    (0, 0),
    lambda acc, count: (acc[0] + count, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1])
).map(lambda x: (x[1][0], (x[0], x[1][1])))

# Sort the RDD in descending order of total play count to find the most active users.
sortedCount = songCountAgg.sortByKey(False)
# Find the top 3 user information (fewer if the data has fewer than 3 users).
sortedCountTop3 = sortedCount.take(3)

# Print the top 3 user information. print(...) with a single pre-formatted
# string and floor division (//) give the same output on Python 2 and 3.
for totalPlays, (userId, artistCount) in sortedCountTop3:
    print("User %s has a total play count of %d and a mean play count of %s" % (userId, totalPlays, totalPlays // artistCount))