In [1]:
rawUserArtistData = sc.textFile("user_artist_data.txt")

In [2]:
# Create version of data with only 100k lines
"""
%%bash
. ~/.bashrc
head -100000 user_artist_data.txt > user_artist_data_100k.txt
"""

In [3]:
# rawUserArtistData = sc.textFile("user_artist_data_100k.txt")

In [2]:
userStats = rawUserArtistData.map(lambda l: float(l.split(' ')[0])).stats()

In [3]:
userStats

(count: 24296858, mean: 1947573.26535, stdev: 496000.544975, max: 2443548.0, min: 90.0)

In [4]:
artistStats = rawUserArtistData.map(lambda l: float(l.split(' ')[1])).stats()

In [5]:
artistStats

(count: 24296858, mean: 1718704.09376, stdev: 2539389.04017, max: 10794401.0, min: 1.0)

In [2]:
rawArtistData = sc.textFile("artist_data.txt")
rawArtistData.take(10)

[u'1134999\t06Crazy Life',
 u'6821360\tPang Nakarin',
 u'10113088\tTerfel, Bartoli- Mozart: Don',
 u'10151459\tThe Flaming Sidebur',
 u'6826647\tBodenstandig 3000',
 u'10186265\tJota Quest e Ivete Sangalo',
 u'6828986\tToto_XX (1977',
 u'10236364\tU.S Bombs -',
 u'1135000\tartist formaly know as Mat',
 u'10299728\tKassierer - Musik f\xfcr beide Ohren']

In [12]:
def getArtistByID(line):
    try:
        (artistID, name) = line.split('\t',1)
        artistID = int(artistID)
    except:
        return []
    if not name:
        return []
    else:
        return [(artistID, name.strip())]

artistByID = rawArtistData.flatMap(getArtistByID)

In [8]:
artistByID.take(1000)

[(1134999, u'06Crazy Life'),
 (6821360, u'Pang Nakarin'),
 (10113088, u'Terfel, Bartoli- Mozart: Don'),
 (10151459, u'The Flaming Sidebur'),
 (6826647, u'Bodenstandig 3000'),
 (10186265, u'Jota Quest e Ivete Sangalo'),
 (6828986, u'Toto_XX (1977'),
 (10236364, u'U.S Bombs -'),
 (1135000, u'artist formaly know as Mat'),
 (10299728, u'Kassierer - Musik f\xfcr beide Ohren'),
 (10299744, u'Rahzel, RZA'),
 (6864258, u'Jon Richardson'),
 (6878791, u'Young Fresh Fellowslows & the Minus 5'),
 (10299751, u'Ki-ya-Kiss'),
 (6909716, u'Underminded - The Task Of Modern Educator'),
 (10435121, u'Kox-Box'),
 (6918061, u'alexisonfire [wo!]'),
 (1135001, u'dj salinger'),
 (6940391, u"The B52's - Channel Z"),
 (10475396, u'44 Hoes'),
 (10584537, u'orchestral mandeuvres in dark'),
 (10584538, u'Josh Groban (Featuring Angie Stone)'),
 (6945632, u'Savage Garden - Truley, Madly, Deeply'),
 (10584546, u'Nislije'),
 (10584550, u'ONEYA BASSIVITYMIXTAPE'),
 (10584556, u'Grant Green / Osunlade'),
 (10584564, u'J

In [3]:
rawArtistAlias = sc.textFile("artist_alias.txt")

In [4]:
def tokenCheck(line):
    (id1, id2) = line.split('\t',1)
    if not id1:
        return []
    else:
        return [(int(id1), int(id2))]
    
artistAlias = rawArtistAlias.flatMap(tokenCheck).collectAsMap()

In [5]:
from pyspark.mllib.recommendation import *

bArtistAlias = sc.broadcast(artistAlias)

def createRating(line):
    (userID, artistID, count) = line.split(' ')
    finalArtistID = bArtistAlias.value.get(artistID)
    if not finalArtistID:
        finalArtistID = artistID
    return Rating(int(userID), int(finalArtistID), int(count))
    
trainData = rawUserArtistData.map(createRating).cache()

In [6]:
model = ALS.trainImplicit(trainData, rank=10, iterations=5, lambda_=0.01, alpha=1.0)

In [7]:
model.userFeatures().first()

(90,
 array('d', [0.1727316826581955, -0.3227613866329193, 0.060724887996912, -0.11930500715970993, 1.0908442735671997, -0.12767654657363892, 0.22621117532253265, 0.29617464542388916, -0.46041327714920044, 0.76097571849823]))

In [8]:
def userFilter(line):
    (user, artist, playcount) = line
    return (int(user) == 2093760)
rawArtistsForUser = rawUserArtistData.map(lambda l: l.split(' ')).filter(userFilter)

In [9]:
def artistToInt(line):
    (user, artist, playcount) = line
    return int(artist)
existingProducts = rawArtistsForUser.map(artistToInt).collect()

In [10]:
rawArtistsForUser.collect()

[[u'2093760', u'1180', u'1'],
 [u'2093760', u'1255340', u'3'],
 [u'2093760', u'378', u'1'],
 [u'2093760', u'813', u'2'],
 [u'2093760', u'942', u'7']]

In [13]:
def filterForArtistID(line):
    (id_, name) = line
    return id_ in existingProducts
print artistByID.filter(filterForArtistID).values().collect()

[u'David Gray', u'Blackalicious', u'Jurassic 5', u'The Saw Doctors', u'Xzibit']


In [14]:
# function recommendProducts copied from 
# https://spark.apache.org/docs/1.5.1/api/python/_modules/pyspark/mllib/recommendation.html 
def recommendProducts(self, user, num):
        """
        Recommends the top "num" number of products for a given user and returns a list
        of Rating objects sorted by the predicted rating in descending order.
        """
        return list(self.call("recommendProducts", user, num))
recommendations = recommendProducts(model, 2093760, 10)
recommendedProductIDs = map(lambda (userID, productID, rating): productID, recommendations)

In [15]:
recommendations

[Rating(user=2093760, product=2814, rating=0.03411656716084715),
 Rating(user=2093760, product=1300642, rating=0.03364962621973422),
 Rating(user=2093760, product=1001819, rating=0.03294394181974044),
 Rating(user=2093760, product=1037970, rating=0.0328912159842336),
 Rating(user=2093760, product=4605, rating=0.03279534547229804),
 Rating(user=2093760, product=1007614, rating=0.03275856915857159),
 Rating(user=2093760, product=1003249, rating=0.03247719302532777),
 Rating(user=2093760, product=829, rating=0.03234635158967435),
 Rating(user=2093760, product=1811, rating=0.031172909976395713),
 Rating(user=2093760, product=1004028, rating=0.031169562057750157)]

In [16]:
recommendedProductIDs

[2814, 1300642, 1001819, 1037970, 4605, 1007614, 1003249, 829, 1811, 1004028]

In [20]:
def filterForRecommendedIDs(line):
    (artistID, name) = line
    return artistID in recommendedProductIDs
recommendedProducts = artistByID.filter(filterForRecommendedIDs).values().collect()
for p in recommendedProducts: print p

Notorious B.I.G.
50 Cent
Snoop Dogg
Nas
Jay-Z
Kanye West
Dr. Dre
Ludacris
2Pac
The Game
