In [None]:
import findspark
findspark.init()
import random
import pyspark

In [None]:
import os

# Spark configuration. Starts from Spark's defaults; add deployment-specific
# settings (master URL, driver/executor memory, ...) with .set(key, value).
config = pyspark.SparkConf().setAppName('playlist-recommender')

# Reuse the running session if there is one, otherwise start a new one, and
# keep a handle on its SparkContext for the RDD-based code below.
ss = pyspark.sql.SparkSession.builder.config(conf = config).getOrCreate()
sc = ss.sparkContext

# data preprocessing - one file

In [None]:
import json
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from scipy.sparse import csr_matrix

In [None]:
# Single MPD slice (playlists 332000-332999) used as the worked example.
file_path = (
  '/scratch/ISE495/2020_project_03/team-3/'
  'mpd.slice.332000-332999.json'
)

In [None]:
# Parse the slice file. The context manager closes the handle as soon as the
# JSON is loaded, and the explicit encoding keeps non-ASCII track names from
# depending on the locale's default codec.
with open(file_path, encoding='utf-8') as slice_file:
  data = json.load(slice_file)

# One row per entry of the slice's 'playlists' list.
DF = pd.DataFrame.from_dict(data['playlists'])

In [None]:
# Spark schema for one playlist record, as (column name, Spark type) pairs.
# Each track is stored as a string -> string map, so numeric track fields
# (e.g. duration_ms, pos) are strings once the data is in Spark.
_playlist_columns = [
  ('name', StringType()),
  ('collaborative', StringType()),
  ('pid', StringType()),
  ('modified_at', IntegerType()),
  ('num_tracks', IntegerType()),
  ('num_albums', IntegerType()),
  ('num_followers', IntegerType()),
  ('tracks', ArrayType(MapType(StringType(), StringType()))),
  ('num_edits', IntegerType()),
  ('duration_ms', IntegerType()),
  ('num_artists', IntegerType()),
  ('description', StringType()),
]
schema = StructType([StructField(column, spark_type)
                     for column, spark_type in _playlist_columns])

# Hand the pandas frame over to Spark under that schema.
DF2 = ss.createDataFrame(DF, schema)

In [None]:
# Turn the Spark DataFrame into an RDD of plain dicts (one dict per playlist).
def _row_to_dict(row):
  return row.asDict()

RDD = DF2.rdd.map(_row_to_dict)
RDD.take(1)

[{'name': 'cg',
  'collaborative': 'false',
  'pid': '332000',
  'modified_at': 1508284800,
  'num_tracks': 116,
  'num_albums': 84,
  'num_followers': 1,
  'tracks': [{'duration_ms': '194893',
    'artist_uri': 'spotify:artist:1dID9zgn0OV0Y8ud7Mh2tS',
    'artist_name': 'Dustin Lynch',
    'pos': '0',
    'album_name': 'Current Mood',
    'track_uri': 'spotify:track:7pxhKtuTwofDIdgHx2DcVK',
    'album_uri': 'spotify:album:23cuZhPWDfX1uKD4qwuv7t',
    'track_name': "Seein' Red"},
   {'duration_ms': '199746',
    'artist_uri': 'spotify:artist:1n2pb9Tsfe4SwAjmUac6YT',
    'artist_name': 'Jake Owen',
    'pos': '1',
    'album_name': 'American Love',
    'track_uri': 'spotify:track:0O1x2tRm8ZpfDbcpOWZp7z',
    'album_uri': 'spotify:album:5gsWgFeHRxRkIXGXWPiOIW',
    'track_name': 'American Country Love Song'},
   {'duration_ms': '202346',
    'artist_uri': 'spotify:artist:4MoAOfV4ROWofLG3a3hhBN',
    'artist_name': 'Jon Pardi',
    'pos': '2',
    'album_name': 'California Sunrise',
    '

In [None]:
# Distinct (track_uri, track_name, artist_name) triples over all playlists.
# Cached because the cells below read this RDD again.
_track_entries = RDD.flatMap(lambda playlist: playlist['tracks'])
_track_triples = _track_entries.map(
  lambda entry: (entry['track_uri'], entry['track_name'], entry['artist_name']))
tracks = _track_triples.distinct().cache()
tracks.take(5)

[('spotify:track:4Fz1WWr5o0OrlIcZxcyZtK', 'On The Way Home', 'John Mayer'),
 ('spotify:track:4DJhi02qpssGdY5WPxzwhI', 'Tell Mama', 'The Civil Wars'),
 ('spotify:track:5pRvd7BtQZ42S9zegc0nOa',
  'Atlas - From “The Hunger Games: Catching Fire”/Soundtrack',
  'Coldplay'),
 ('spotify:track:48Jhybk2ZpPrnFb3oXdh6Z', "Don't Matter", 'Kings of Leon'),
 ('spotify:track:30SjdIdTMhBSe33nFnBFkC', 'Temple', 'Kings of Leon')]

In [None]:
# Pull the distinct tracks to the driver once and derive every lookup from
# that single list. Running separate Spark jobs for each structure only gives
# matching indices while the cached partitions survive; deriving them locally
# guarantees that position i means the same track in all three.
_tracks_local = tracks.collect()

# tracks_uri[track_id] = track_uri
tracks_uri = [entry[0] for entry in _tracks_local]
# track_uri_id[track_uri] = track_id
track_uri_id = {uri: track_idx for track_idx, uri in enumerate(tracks_uri)}
# track_id_name[track_id] = (track_name, artist_name)
track_id_name = {track_idx: entry[1:] for track_idx, entry in enumerate(_tracks_local)}

# item to item recommendation system

In [None]:
# Playlist x track indicator matrix: row = playlist, column = position in
# tracks_uri, value = 1 if the track is in the playlist and 0 otherwise.
# Each playlist's URIs go into a set first, so the membership test inside the
# loop over all tracks is O(1) instead of a scan of the playlist.
_playlist_uri_sets = RDD.map(lambda playlist: {entry['track_uri'] for entry in playlist['tracks']})
L = _playlist_uri_sets.map(lambda present: [int(uri in present) for uri in tracks_uri]).collect()
M = csr_matrix(L, dtype='uint8')
M.toarray()

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]], dtype=uint8)

In [None]:
# (number of playlists, number of distinct tracks)
M.shape

(1000, 36600)

In [None]:
# Intersection matrix: row = track, column = track,
# I[i, j] = number of playlists containing both track i and track j.
#
# M is stored as uint8, and a uint8 sparse product silently wraps once a count
# passes 255. No entry of I can exceed the largest per-track playlist count
# (the diagonal), so promote M to the smallest unsigned dtype that holds that
# count before multiplying. When every count fits in uint8 the dtype, and
# therefore the result, is exactly what it was before.
max_count = int(M.sum(axis=0, dtype='int64').max()) if M.shape[1] else 0
_M_counts = M.astype(np.min_scalar_type(max_count))
I = _M_counts.T * _M_counts
I.toarray()

array([[2, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 3, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 1, 1, 1],
       [0, 0, 0, ..., 1, 1, 1],
       [0, 0, 0, ..., 1, 1, 1]], dtype=uint8)

In [None]:
# Column vector: track_freq[i] = number of playlists containing track i.
#
# The counts are accumulated in int64 (a uint8 accumulator wraps above 255) and
# then stored in the smallest unsigned dtype that holds the largest count, so
# the result is still uint8 whenever the data allows it. reshape(-1, 1) takes
# the length from M instead of hard-coding the track count of one input file.
_track_counts = np.sum(M, axis=0, dtype='int64').reshape(-1, 1)
max_count = int(_track_counts.max()) if _track_counts.size else 0
track_freq = _track_counts.astype(np.min_scalar_type(max_count))
track_freq

matrix([[2],
        [1],
        [3],
        ...,
        [1],
        [1],
        [1]], dtype=uint8)

In [None]:
# Union matrix: row = track, column = track,
# U[i, j] = number of playlists containing track i or track j
#         = freq[i] + freq[j] - I[i, j].
#
# The pairwise sum can reach twice the largest frequency, so widen the
# frequency vector to a dtype that holds that value before adding; otherwise
# the addition is carried out in track_freq's own (narrow) integer type and
# can wrap. When twice the largest frequency still fits, the dtype is kept.
if track_freq.size:
  _pair_dtype = np.min_scalar_type(2 * int(track_freq.max()))
else:
  _pair_dtype = track_freq.dtype
_freq_wide = track_freq.astype(_pair_dtype)
U = (_freq_wide + _freq_wide.T) - I.toarray()
U

matrix([[2, 2, 4, ..., 3, 3, 3],
        [2, 1, 3, ..., 2, 2, 2],
        [4, 3, 3, ..., 4, 4, 4],
        ...,
        [3, 2, 4, ..., 1, 1, 1],
        [3, 2, 4, ..., 1, 1, 1],
        [3, 2, 4, ..., 1, 1, 1]], dtype=uint8)

In [None]:
# create Similarity Matrix: row=track_uri, column=track_uri, value=similarity coefficient between track i and track j
# The coefficient is intersection / union of the two tracks' playlist sets
# (I holds the intersection counts, U the union counts), i.e. the Jaccard index.
# dtype='float16' makes the division produce half-precision values directly.
# NOTE(review): presumably chosen to limit the memory of the dense
# track x track result - confirm before widening it.
S = np.divide(I.toarray(), U, dtype='float16')
S

matrix([[1.    , 0.5   , 0.25  , ..., 0.    , 0.    , 0.    ],
        [0.5   , 1.    , 0.3333, ..., 0.    , 0.    , 0.    ],
        [0.25  , 0.3333, 1.    , ..., 0.    , 0.    , 0.    ],
        ...,
        [0.    , 0.    , 0.    , ..., 1.    , 1.    , 1.    ],
        [0.    , 0.    , 0.    , ..., 1.    , 1.    , 1.    ],
        [0.    , 0.    , 0.    , ..., 1.    , 1.    , 1.    ]],
       dtype=float16)

recommend 10 songs to playlist 332000

In [None]:
# Track ids (positions in the track index) of the first playlist in the RDD,
# in playlist order; a track that occurs twice in the playlist appears twice.
_first_playlist = RDD.take(1)[0]
track_id = [track_uri_id[entry['track_uri']] for entry in _first_playlist['tracks']]

[28458,
 27461,
 11932,
 24748,
 18949,
 4592,
 14720,
 14715,
 472,
 26430,
 29441,
 692,
 4925,
 34754,
 13882,
 29443,
 19194,
 29439,
 35444,
 35445,
 35446,
 35447,
 693,
 30027,
 12224,
 694,
 31482,
 31483,
 34665,
 18243,
 11060,
 695,
 470,
 8255,
 13883,
 14722,
 26820,
 33730,
 22934,
 27459,
 4590,
 24751,
 18240,
 4922,
 30712,
 696,
 12223,
 23944,
 24752,
 4928,
 13884,
 33883,
 8818,
 697,
 19195,
 19196,
 25757,
 19197,
 31484,
 24753,
 12379,
 13885,
 35448,
 13886,
 30022,
 8819,
 8820,
 14721,
 33556,
 13887,
 8821,
 698,
 35449,
 4998,
 31483,
 19198,
 18905,
 30712,
 19195,
 19199,
 13888,
 19200,
 24211,
 4898,
 33560,
 30291,
 29444,
 8822,
 29570,
 7387,
 4595,
 26468,
 5182,
 5183,
 5184,
 24754,
 19201,
 28459,
 28460,
 22441,
 31485,
 14729,
 15116,
 5185,
 24238,
 11854,
 5186,
 4161,
 5187,
 15495,
 26429,
 12504,
 8252,
 8823,
 28461,
 31486]

In [None]:
# Similarity rows for playlist 332000: one row per track in the playlist,
# one column per track in the whole track index.
S2 = S[track_id, :]
S2

matrix([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], dtype=float16)

In [None]:
# Per-track score: for every track in the index, the sum of its similarity to
# each track of the playlist (column sums of S2).
# S2 is float16; summing in float16 rounds the running total to about three
# significant digits, which collapses close scores into ties. Accumulate in
# float32 instead - S2 only has one row per playlist track, so this is cheap.
S2_sum = np.sum(S2, axis=0, dtype='float32')
S2_sum

matrix([[0., 0., 0., ..., 0., 0., 0.]], dtype=float16)

In [None]:
# Rank all tracks by their summed similarity to the playlist and keep the ten
# best. Tracks that are already in the playlist are excluded first: each one
# has similarity 1 with itself, so they would otherwise fill the top of the
# ranking and the "recommendations" would be songs the playlist already has.
_scores = np.array(S2_sum, dtype='float32').reshape(-1)   # copy; S2_sum is left untouched
_scores[track_id] = -np.inf
_ranked = (-_scores).argsort()
# Drop the excluded entries so they cannot be returned even when fewer than
# ten candidate tracks remain.
max_idx = _ranked[np.isfinite(_scores[_ranked])][:10]
rec_tracks = [track_id_name[i] for i in max_idx]
rec_tracks

[('Last Minute Late Night', 'Kane Brown'),
 ('My Kind Of Woman', 'Justin Moore'),
 ('A Girl Like You', 'Easton Corbin'),
 ("Good Lookin' Girl", 'Luke Bryan'),
 ('Even The Stars Fall 4 U', 'Keith Urban'),
 ('My Kind Of Crazy', 'Brantley Gilbert'),
 ('Eat Sleep Love You Repeat', 'Rodney Atkins'),
 ("Can't Take Her Anywhere", 'Dylan Scott'),
 ('Making My Way to You', 'Cole Swindell'),
 ('Give It All We Got Tonight', 'George Strait')]

## ALS model 1: Rating is 1 if track in playlist or 0 otherwise.

In [None]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [None]:
# Build Rating(pid, track_id, rating) for every (playlist, track) pair:
# rating is 1 if the track is in the playlist and 0 otherwise.
def _playlist_ratings(playlist):
  """Return one Rating per track in tracks_uri for a single playlist dict."""
  pid = int(playlist['pid'])
  # A set makes each membership test O(1); scanning the playlist's track list
  # once per track in the index was the dominant cost of this step.
  present = {entry['track_uri'] for entry in playlist['tracks']}
  return [Rating(pid, track_uri_id[uri], int(uri in present)) for uri in tracks_uri]

ratings1 = RDD.flatMap(_playlist_ratings).cache()
ratings1.take(5)

[Rating(user=332000, product=2179, rating=0.0),
 Rating(user=332000, product=2180, rating=0.0),
 Rating(user=332000, product=997, rating=0.0),
 Rating(user=332000, product=1880, rating=0.0),
 Rating(user=332000, product=1189, rating=0.0)]

In [None]:
# 70% / 30% train/test split. The fixed seed makes the split repeatable, so
# the evaluation below can be reproduced on a re-run.
train1, test1 = ratings1.randomSplit([0.7,0.3], seed=42)

In [None]:
# Implicit-feedback ALS. The fixed seed pins the random initialisation of the
# factors so that repeated runs train the same model.
model1 = ALS.trainImplicit(train1, rank=5, iterations=10, lambda_=0.01, seed=42)

recommend 10 songs to playlist 332000

In [None]:
# Ask model1 for its ten highest-scoring tracks for playlist 332000.
target_pid = 332000
recommend1 = model1.recommendProducts(target_pid, 10)
recommend1

[Rating(user=332000, product=25729, rating=0.05167864091169781),
 Rating(user=332000, product=15016, rating=0.050658823451894236),
 Rating(user=332000, product=25725, rating=0.049409866133737046),
 Rating(user=332000, product=18613, rating=0.04935635473453281),
 Rating(user=332000, product=273, rating=0.048191349144067995),
 Rating(user=332000, product=4058, rating=0.04499326171001025),
 Rating(user=332000, product=33153, rating=0.04292305486479599),
 Rating(user=332000, product=18615, rating=0.04290387510261779),
 Rating(user=332000, product=11146, rating=0.04259696142293747),
 Rating(user=332000, product=33110, rating=0.041621301069179745)]

In [None]:
# Translate the recommended track ids back to (track_name, artist_name).
rec_songs1 = []
for rec in recommend1:
  rec_songs1.append(track_id_name[rec.product])
rec_songs1

[('Esta De Parranda El Jefe', 'Fidel Rueda'),
 ('Kill Em With Kindness', 'Selena Gomez'),
 ('Tú Ya Eres Cosa Del Pasado', 'Fidel Rueda'),
 ('Brown Eyed Girl', 'Van Morrison'),
 ('Umbrella', 'Jinsang'),
 ('Californication', 'Red Hot Chili Peppers'),
 ('XO', 'Beyoncé'),
 ('Under The Bridge', 'Red Hot Chili Peppers'),
 ('How Deep Is Your Love', 'William Singe'),
 ('Lean Wit It, Rock Wit It - feat. Peanut & Charlay', 'Dem Franchize Boyz')]

evaluate model1 by MSE

In [None]:
# Pair each held-out (pid, track_id) with its (predicted, actual) rating.
_test1_pairs = test1.map(lambda r: (r[0], r[1]))
_predicted1 = model1.predictAll(_test1_pairs).map(lambda r: ((r[0], r[1]), r[2]))
_actual1 = test1.map(lambda r: ((r[0], r[1]), r[2]))
pred_rate1 = _predicted1.join(_actual1).cache()
pred_rate1.take(5)

[((332370, 9200), (0.0, 0.0)),
 ((332590, 9200), (0.0, 0.0)),
 ((332570, 9200), (0.0, 0.0)),
 ((332414, 9200), (0.0, 0.0)),
 ((332034, 9200), (0.0, 0.0))]

In [None]:
# Mean of the squared (predicted - actual) differences over the test pairs.
_squared_err1 = pred_rate1.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
MSE1 = _squared_err1.mean()
MSE1

0.0018031987351745118

## data preprocessing - all files

In [None]:
# The 20 MPD slice files used for the full run. Every file name carries the
# first and last playlist id of its slice, and the last is always first + 999,
# so the paths are generated from the list of first ids.
_slice_dir = '/scratch/ISE495/2020_project_03/team-3'
_slice_starts = [332000, 557000, 199000, 998000, 770000,
                 880000, 431000, 132000, 198000, 501000,
                 139000, 377000, 884000, 191000, 875000,
                 650000, 391000, 618000, 906000, 851000]
file_paths = ['{}/mpd.slice.{}-{}.json'.format(_slice_dir, start, start + 999)
              for start in _slice_starts]

In [None]:
# Load every slice into its own frame and concatenate once at the end.
# Concatenating inside the loop re-copies all rows gathered so far on every
# iteration, and a bare open() leaves each file handle unclosed.
_slice_frames = []
for file in file_paths:
  with open(file, encoding='utf-8') as slice_file:
    all_data = json.load(slice_file)
  _slice_frames.append(pd.DataFrame.from_dict(all_data['playlists']))

# pd.concat rejects an empty list, so fall back to an empty frame when there
# are no input files.
allDF = pd.concat(_slice_frames) if _slice_frames else pd.DataFrame()

In [None]:
# Spark schema for the combined playlist frame.
def _string_field(column):
  return StructField(column, StringType())

def _int_field(column):
  return StructField(column, IntegerType())

all_schema = StructType([
  _string_field('name'),
  _string_field('collaborative'),
  _string_field('pid'),
  _int_field('modified_at'),
  _int_field('num_tracks'),
  _int_field('num_albums'),
  _int_field('num_followers'),
  # every track is kept as a string -> string map
  StructField('tracks', ArrayType(MapType(StringType(), StringType()))),
  _int_field('num_edits'),
  _int_field('duration_ms'),
  _int_field('num_artists'),
  _string_field('description'),
])

# Hand the combined pandas frame over to Spark under that schema.
allDF2 = ss.createDataFrame(allDF, all_schema)

In [None]:
# RDD of plain dicts, one per playlist, over all loaded slices.
def _as_plain_dict(row):
  return row.asDict()

allRDD = allDF2.rdd.map(_as_plain_dict)

## ALS model2: Rating is count of artist in playlist

In [None]:
# Number every distinct artist URI. The RDD is cached because it is the single
# source of the artist ids; without the cache each action would recompute the
# shuffle behind distinct() and could hand out different ids.
artist_idx = allRDD.flatMap(lambda x: x['tracks']).map(lambda x: x['artist_uri']).distinct().zipWithIndex().cache()

# uri_id_artist[artist_uri] = artist_id
uri_id_artist = artist_idx.collectAsMap()
# id_uri_artist[artist_id] = artist_uri, built by inverting the map above so
# the two directions can never disagree.
id_uri_artist = {artist_id: uri for uri, artist_id in uri_id_artist.items()}

In [None]:
# Build Rating(pid, artist_id, n) where n is the number of the playlist's
# tracks that belong to the artist; one Rating per (playlist, artist) pair.
def _artist_count_ratings(playlist):
  """Return one Rating per distinct artist of a single playlist dict."""
  # Imported here so the function carries its own dependency to the executors.
  from collections import Counter
  pid = int(playlist['pid'])
  # One pass over the tracks; the previous version re-counted the whole
  # playlist for every track and then removed the duplicates with a set.
  artist_counts = Counter(entry['artist_uri'] for entry in playlist['tracks'])
  return [Rating(pid, uri_id_artist[uri], count) for uri, count in artist_counts.items()]

ratings2 = allRDD.flatMap(_artist_count_ratings).cache()
ratings2.take(5)

[Rating(user=332000, product=10315, rating=2.0),
 Rating(user=332000, product=5086, rating=1.0),
 Rating(user=332000, product=31075, rating=4.0),
 Rating(user=332000, product=56, rating=1.0),
 Rating(user=332000, product=46678, rating=2.0)]

Model with the best settings found: a 60% / 40% train/test split and `lambda_=0.0001`.


In [None]:
# 60% / 40% train/test split. The fixed seed makes the split repeatable, so
# the evaluation below can be reproduced on a re-run.
train2, test2 = ratings2.randomSplit([0.6,0.4], seed=42)

In [None]:
# Implicit-feedback ALS on the artist counts. The fixed seed pins the random
# initialisation of the factors so that repeated runs train the same model.
model2 = ALS.trainImplicit(train2, rank=5, iterations=10, lambda_=0.0001, seed=42)

recommend 5 artists to playlist 332000

In [None]:
# Ask model2 for its five highest-scoring artists for playlist 332000.
recommend2 = model2.recommendProducts(user=332000, num=5)
recommend2

[Rating(user=332000, product=120, rating=0.4851587145718874),
 Rating(user=332000, product=20986, rating=0.48199824277287573),
 Rating(user=332000, product=10316, rating=0.47097203558309086),
 Rating(user=332000, product=10338, rating=0.4546340560107405),
 Rating(user=332000, product=25906, rating=0.4462439830356786)]

In [None]:
# Translate the recommended artist ids back to Spotify artist URIs.
recommend2_uri = list(map(lambda rec: id_uri_artist[rec.product], recommend2))
recommend2_uri

['spotify:artist:2Q0MyH5YMI5HPQjFjlq5g3',
 'spotify:artist:4hfcSstwnyuBoek1dQwLkG',
 'spotify:artist:2cnMpRsOVqtPMfq7YiFE6K',
 'spotify:artist:21mKp7DqtSNHhCAU2ugvUw',
 'spotify:artist:1IueXOQyABrMOprrzwQJWN']

Recommend the top 2 tracks of each recommended artist, using `sp.artist_top_tracks`.

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

In [None]:
# Spotify Web API credentials are read from the environment so that no secret
# is stored in (or shared along with) the notebook. Export SPOTIPY_CLIENT_ID
# and SPOTIPY_CLIENT_SECRET before starting the kernel; a missing variable
# raises KeyError naming it.
# NOTE(review): the credentials that used to be hard-coded in this cell have
# been exposed and should be rotated in the Spotify developer dashboard.
cid = os.environ['SPOTIPY_CLIENT_ID']
secret = os.environ['SPOTIPY_CLIENT_SECRET']

auth = SpotifyClientCredentials(client_id=cid, client_secret=secret)
sp = spotipy.Spotify(auth_manager=auth)

In [None]:
# Helper that fetches an artist's top tracks in (song, performers) format.
def top_track(artist_uri):
  """Return the artist's top tracks as a list of (song, performers) tuples.

  artist_uri -- Spotify artist URI, passed to sp.artist_top_tracks.

  Each tuple holds the track name and the list of names of all artists
  credited on that track, in the order the API response lists them.
  """
  response = sp.artist_top_tracks(artist_uri)
  return [(entry['name'], [credited['name'] for credited in entry['artists']])
          for entry in response['tracks']]

In [None]:
# For every recommended artist keep the first two of their top tracks.
rec_songs2 = []
for artist_uri in recommend2_uri:
  rec_songs2.append(top_track(artist_uri)[:2])
rec_songs2

[[('Goodnight', ['Nick Murphy']),
  ('Basic Needs (feat. Nick Murphy)', ['Heathered Pearls', 'Nick Murphy'])],
 [('Brothers On The Slide', ['Cymande']), ('Dove', ['Cymande'])],
 [('Jump - 2015 Remaster', ['Van Halen']),
  ('Panama - 2015 Remaster', ['Van Halen'])],
 [('HEART ATTACK (feat. lau.ra)',
   ['BRONSON', 'ODESZA', 'Golden Features', 'lau.ra']),
  ('Say My Name (feat. Zyra)', ['ODESZA', 'Zyra'])],
 [('Lasting Lover', ['Sigala', 'James Arthur']),
  ('Heaven On My Mind (with Sigala)', ['Becky Hill', 'Sigala'])]]

evaluate model2 by MSE

In [None]:
# Pair each held-out (pid, artist_id) with its (predicted, actual) rating.
_test2_pairs = test2.map(lambda r: (r[0], r[1]))
_predicted2 = model2.predictAll(_test2_pairs).map(lambda r: ((r[0], r[1]), r[2]))
_actual2 = test2.map(lambda r: ((r[0], r[1]), r[2]))
pred_rate2 = _predicted2.join(_actual2).cache()
pred_rate2.take(5)

[((880541, 5153), (0.004501868692411483, 3.0)),
 ((880541, 42209), (0.004010503314397784, 1.0)),
 ((191721, 26785), (0.0011392404071107315, 1.0)),
 ((875211, 31171), (0.029850198197061403, 1.0)),
 ((875211, 17135), (0.0034516384320349763, 2.0))]

In [None]:
# Mean of the squared (predicted - actual) differences over the test pairs.
_squared_err2 = pred_rate2.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
MSE2 = _squared_err2.mean()
MSE2

10.6021608348214

In [None]:
# Shut Spark down now that the analysis is finished.
# NOTE(review): stopping the session presumably also stops its SparkContext,
# which would make the sc.stop() call redundant (though harmless) - confirm
# against the PySpark documentation.
ss.stop()
sc.stop()