In [1]:
# Display the `sc` / `spark` handles to confirm they exist. No visible cell creates
# them (the session-building code further down is commented out), so they are
# presumably injected by the kernel or launcher — TODO confirm.
sc, spark

(<pyspark.context.SparkContext at 0x104b23c10>,
 <pyspark.sql.session.SparkSession at 0x104b7ecd0>)

In [133]:
import pyspark as ps
from pyspark.sql.types import *
from pyspark.ml.recommendation import *
from pyspark.ml.evaluation import *
import numpy as np
from sklearn.metrics import pairwise_distances
from scipy.spatial.distance import cosine
import pandas as pd
import time
import ast

In [3]:
# sc = ps.SparkContext('local[128]')
# spark = ps.SQLContext(sc)

# from pyspark.sql import SparkSession

# spark = (SparkSession.builder
#     .master("spark://localhost:7077")
#     .config("spark.driver.cores", 127)
#     .config("spark.driver.memory", "1000g")
#     .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
#     .config("spark.driver.extraClassPath","")
#     .config("spark.driver.extraClassPath","")
#     .appName("artistrecommender")
#     .getOrCreate())

## Data frame loads

In [210]:
# Each raw CSV is read with no reader options, so Spark names the columns
# _c0, _c1, ... and the file's header line arrives as an ordinary data row.
# Every frame is cached and exposed to Spark SQL right after it is loaded.

# Artist <-> follower alias pairs.
artist_follower_df = spark.read.csv('../data/artist_follower.csv').persist()
artist_follower_df.createOrReplaceTempView('artist_follower')

# Artist metadata; duplicates were manually flagged with 1 after the sc scrape so they can be removed.
artist_df = spark.read.csv('../data/artist_meta.csv').persist()
artist_df.createOrReplaceTempView('artists')

# Follower profile attributes.
follower_df = spark.read.csv('../data/followers.csv').persist()
follower_df.createOrReplaceTempView('followers')

In [211]:
# Load the artist metadata CSV a second time, into pandas.
# NOTE(review): `df` is not referenced by any other cell visible in this notebook —
# confirm it is still needed.
df = pd.read_csv('../data/artist_meta.csv')

## Artist & Follower column name generation

In [212]:
# Give names to the two columns of the raw artist_follower view that are used
# downstream (_c3, _c4); the first three columns are commented out inside the SQL.
# The row whose _c2 equals the label 'artist_follower_num' (the CSV header line)
# is filtered out, and DISTINCT collapses repeated (artist, follower) pairs.
artist_follower_df = spark.sql("""
SELECT distinct
    /*
    _c0 as exception
    ,_c1 as artist_num
    ,_c2 as artist_follower_num
    ,*/
    _c3 as artist_alias
    ,_c4 as follower_alias
FROM artist_follower where _c2 != 'artist_follower_num'
""").persist()

# Replace the raw view with the cleaned frame under the same view name.
artist_follower_df.createOrReplaceTempView('artist_follower')

# Name the artist metadata columns and keep only usable rows:
#   * the header line (the row whose _c2 is 'artist_name') is dropped;
#   * _c14 is the duplicate flag: 0 means the artist is not a duplicate record;
#   * rows named 'Daft Punk' are excluded — NOTE(review): the reason is not
#     stated in this notebook, confirm it is still wanted.
artist_df = spark.sql("""
SELECT distinct
    _c0 as index
    ,_c1 as id
    ,_c2 as artist_name
    ,_c3 as sc_alias
    ,_c4 as city
    ,_c5 as country
    ,_c6 as followers_count
    ,_c7 as followings_count
    ,_c8 as last_modified
    ,_c9 as playlist_count
    ,_c10 as plan
    ,_c11 as public_favorites_count
    ,_c12 as track_count
    ,_c13 as count
FROM artists where _c2 != 'artist_name'
AND _c14 = 0
AND _c2 != 'Daft Punk'
""").persist()

# Replace the raw view with the cleaned frame under the same view name.
artist_df.createOrReplaceTempView('artists')

# Name the follower profile columns; _c0-_c2 and _c8 are left out (commented
# inside the SQL). The header line is dropped by matching the 'exception' label in _c0.
# NOTE(review): under SQL three-valued logic a row whose _c0 is NULL also fails
# `_c0 != 'exception'` and is dropped — confirm _c0 is always populated.
follower_df = spark.sql("""
SELECT distinct
    --_c0 as exception
    --,_c1 as artist_num
    --,_c2 as artist_follower_num
    _c3 as artist_alias
    ,_c4 as follower_alias
    ,_c5 as comments_count
    ,_c6 as followers_count
    ,_c7 as followings_count
    --,_c8 as last_modified
    ,_c9 as likes_count
    ,_c10 as plan
    ,_c11 as playlist_count
    ,_c12 as public_favorites_count
    ,_c13 as reposts_count
    ,_c14 as track_count
    ,_c15 as uri
    ,_c16 as username
FROM followers where _c0 != 'exception'
""").persist()

# Replace the raw view with the cleaned frame under the same view name.
follower_df.createOrReplaceTempView('followers')

## Artist & Follower id generation

In [214]:
# Assign every distinct artist alias an integer id: row_number() numbers the
# aliases 1..N in sc_alias sort order (GROUP BY already makes them unique).
# The ids are positional, so they change whenever the set of aliases changes.
# NOTE(review): the item features loaded later are joined on artist_id, so they
# presumably must come from a model trained on this same id assignment — confirm.
custom_artist_id_df = spark.sql("""
select distinct
    row_number() over(order by sc_alias) as artist_id
    ,sc_alias as artist_alias
from artists
group by sc_alias
""").persist()

custom_artist_id_df.createOrReplaceTempView('custom_artist_id')

# Same scheme for followers: one integer id per distinct follower_alias,
# numbered 1..N in follower_alias sort order.
custom_follower_id_df = spark.sql("""
select distinct
    row_number() over(order by follower_alias) as follower_id
    ,follower_alias
from followers
group by follower_alias
""").persist()

custom_follower_id_df.createOrReplaceTempView('custom_follower_id')

# Translate each (artist_alias, follower_alias) pair into integer ids and attach
# a constant count of 1 per pair. Both joins are inner joins, so pairs whose
# artist or follower alias has no generated id are dropped.
# SORT BY orders rows within each partition only; it is not a global ORDER BY.
artist_follower_ids_df = spark.sql("""
SELECT DISTINCT
    caid.artist_id
    ,cfid.follower_id
    ,af.artist_alias
    ,af.follower_alias
    ,1 as count
FROM artist_follower af
JOIN custom_artist_id caid on af.artist_alias = caid.artist_alias
JOIN custom_follower_id cfid on af.follower_alias = cfid.follower_alias
sort by af.artist_alias
""").persist()

# NOTE: this overwrites the 'artist_follower' view that the query above reads from.
artist_follower_ids_df.createOrReplaceTempView('artist_follower')

## ALS model generation

In [150]:
# als_model = ALS(rank=2, implicitPrefs=True,
#           userCol="follower_id", itemCol="artist_id", ratingCol="count", nonnegative=True)

In [151]:
# als_model = ALS(rank=5, maxIter=5, seed=0, regParam=1, implicitPrefs=True,
#           userCol="follower_id", itemCol="artist_id", ratingCol="count", nonnegative=True)

In [152]:
# model = als_model.fit(artist_follower_ids_df)

## Load model item features

In [174]:
# Load per-item feature rows from CSV instead of training a model here (the ALS
# cells above are commented out).
# NOTE(review): presumably these are the item factors of a previously fitted ALS
# model — confirm how bestModelItems.csv was produced.
item_features = pd.read_csv('../data/bestModelItems.csv')

In [175]:
# Number of item-feature rows loaded.
len(item_features)

4807

In [176]:
# Remove the 'Unnamed: 0' column (presumably the index saved into the CSV).
# Rebinding instead of inplace=True, together with errors='ignore', makes this
# cell safe to re-run: the original in-place drop raised on a second execution
# because the column was already gone.
item_features = item_features.drop('Unnamed: 0', axis=1, errors='ignore')

In [177]:
# Convert the pandas frame to a Spark DataFrame so it can be joined in Spark SQL.
items = spark.createDataFrame(item_features)

In [178]:
# Expose the item features to Spark SQL under the view name 'item_features'.
items.createOrReplaceTempView('item_features')

## Combine item_features df into new artist_meta data df

In [225]:
# Attach artist metadata to every item-feature row via
#   item_features.id -> custom_artist_id.artist_id -> artists.sc_alias.
# Both joins are inner joins; the result is neither persisted nor registered as a view.
# SORT BY orders rows within each partition only.
# NOTE(review): `if` is used as a table alias; the recorded run shows Spark accepted it.
artist_info = spark.sql("""
    SELECT distinct
        if.id
        ,if.features
        ,a.artist_name
        ,a.sc_alias
        ,a.city
        ,a.country
        ,a.followers_count
        ,a.followings_count
        ,a.last_modified
        ,a.playlist_count
        ,a.plan
        ,a.public_favorites_count
        ,a.track_count
        ,a.count
    FROM item_features if 
    JOIN custom_artist_id caid on if.id = caid.artist_id
    JOIN artists a on caid.artist_alias = a.sc_alias
    sort by a.sc_alias
""")

In [226]:
# Collect the joined Spark result to the driver as a pandas DataFrame.
artist_meta = artist_info.toPandas()

In [228]:
# Row count after the join; compared with len(item_features) in the next cell.
len(artist_meta)

4807

In [229]:
# Should equal len(artist_meta); equal counts are consistent with the join having
# neither dropped nor duplicated item rows.
len(item_features)

4807

In [231]:
# `features` stores each vector as text; parse every entry back into a Python
# literal. ast.literal_eval only evaluates literals, never arbitrary code.
artist_meta['formatted_features'] = artist_meta['features'].map(ast.literal_eval)

## Convert features to matrix and calculate cosine similarity

In [232]:
# Stack the parsed per-artist vectors into one float array, one row per
# artist_meta row (assumes every vector has the same length).
items_mat = np.asarray(artist_meta['formatted_features'].tolist(), dtype=float)

In [233]:
# pairwise_distances(..., metric="cosine") returns cosine *distance*
# (1 - cosine similarity) between every pair of rows, so subtracting it from 1
# gives a square similarity matrix indexed by artist_meta row position.
cos_sim = 1-pairwise_distances(items_mat, metric="cosine")

In [234]:
# Spot-check one artist's display name by alias.
# print(...) with a single argument behaves the same on Python 2 and is valid on
# Python 3, unlike the original `print x` statement. .loc selects the rows and
# the column in one step instead of chained indexing.
print(artist_meta.loc[artist_meta['sc_alias'] == 'adam-stromstedt', 'artist_name'].values[0])

Adam Str�_mstedt


In [235]:
# Export the combined metadata + feature table (including the parsed
# formatted_features column) as JSON.
artist_meta.to_json('../data/artist_meta.json')

In [236]:
# Export the same table as an Excel workbook.
artist_meta.to_excel('../data/artist_meta.xlsx')

In [54]:
# Rebuild the float feature matrix from the parsed vectors, one row per
# artist_meta row. This repeats the computation of the earlier items_mat cell.
items_mat = np.asarray(artist_meta['formatted_features'].tolist(), dtype=float)

## Concatenate cos_sim matrix with meta data for easy lookup.

In [92]:
# Column labels for a combined lookup table: the artist_meta columns followed by
# one label per artist alias, in artist_meta row order (the same order as the
# rows/columns of cos_sim).
# `cols` is built in a single expression so the cell is idempotent; the original
# separate `cols.extend(similarity_indices)` cell appended the aliases again
# every time it was re-run.
# NOTE(review): no visible cell uses `cols` afterwards — the concatenation the
# section title describes appears to be missing.
similarity_indices = list(artist_meta['sc_alias'])
cols = list(artist_meta.columns) + similarity_indices

## Check recommendations by querying an alias

In [78]:
# Query parameters: how many neighbours to return, and for which artist.
n = 30
alias = 'thexxofficial'

# Display name of the queried artist (IndexError if the alias is unknown).
artist = artist_meta[artist_meta['sc_alias']==alias]['artist_name'].values[0]

# Row i of cos_sim corresponds to row i of artist_meta, so the alias's position
# in this list is also its row/column index in the similarity matrix.
artist_aliases = list(artist_meta['sc_alias'])
index = artist_aliases.index(alias)

# Similarity of the queried artist to every artist, itself included.
arr = cos_sim[index]

# Rank most-similar first, then remove the query by its index.
# The original sliced off the top-ranked entry, assuming it was always the query
# itself. When another artist ties with it (e.g. a proportional feature vector
# giving cosine similarity 1.0), argsort may rank that artist first; the slice
# then discarded a real neighbour and returned the query as its own recommendation.
ranked = arr.argsort()[::-1]
similar_aliases = np.asarray(artist_aliases)[ranked[ranked != index][:n]]

In [80]:
# Display the recommended aliases, most similar first.
similar_aliases

array([u'scott-fraser', u'george-maple', u'atb-music', u'djkorsakoff',
       u'aoelectro', u'bluesohorecordings', u'conjureone', u'bobina',
       u'munkgomma', u'djpaulharris', u'innellea', u'tuccillo',
       u'ashleywallbridge', u'deadcruiser', u'peggygou', u'romanrauch',
       u'starslingeruk', u'ladi6', u'chris-willis', u'combodj',
       u'sasch-music', u'jesseroseofficial', u'andyduguidofficial',
       u'round-table-knights', u'lcmdf', u'ralphirosario',
       u'gloriaestefan', u'stevee1dr', u'mircovioli', u'coreyjamesofficial'], 
      dtype='<U27')

## Follower overlap analysis

In [11]:
# Group the artist/follower pairs by follower.
# NOTE(review): `artist_follower` is not defined as a Python name in any visible
# cell (only a Spark temp view carries that name). The pandas-style calls that
# follow suggest it was a pandas DataFrame created in a cell that no longer
# exists — restore its definition so the notebook runs from a clean kernel.
artist_follower_group = artist_follower.groupby('follower_alias')

In [29]:
# Per follower, count how many of the scraped artists they follow, largest first.
# Result columns: follower_alias, count.
# Written as one chain so the cell is idempotent. The original spread this over
# five cells that each rebound or mutated `follower_count` in place, so re-running
# one of them (e.g. reset_index(inplace=True) followed by the two-name `.columns =`
# assignment) failed or corrupted the frame.
follower_count = (
    artist_follower_group.count()
    .sort_values(by='artist_alias', ascending=False)[['artist_alias']]
    .reset_index()
)
follower_count.columns = ['follower_alias', 'count']

In [46]:
# 6,392,743 unique followers
# 29,125,208 total followers
# 26,640 followers were following more than 100 of the ~5,000 artists scraped
# 504,836 followers were following more than 10 of the ~5,000 artists scraped