In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import numpy as np

conf = SparkConf()

# create the context, or reuse one that already exists (e.g. when this cell is
# re-run, or when the notebook host started one) instead of raising
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
# Raw inputs: the song feature table and the (songId, userId, Plays) play-count table.
song_features_path = 's3://17700project/song_features.parquet'
play_counts_path = 's3://17700project/play_counts.parquet'

songsdf = spark.read.parquet(song_features_path)
playcountdf = spark.read.parquet(play_counts_path)

In [None]:
# Number of columns in the raw song feature table.
num_song_columns = len(songsdf.columns)
num_song_columns

In [None]:
def song_track_str(x):
    """Unwrap a stringified bytes value by dropping its first two and last characters.

    The id columns appear to hold the text of a Python bytes literal such as
    ``"b'SO...'"`` (TODO confirm against the parquet schema), so ``x[2:-1]``
    removes the ``b'`` prefix and the closing quote.

    Returns None for a null input, so the Spark UDF maps nulls to nulls
    instead of failing the task with a TypeError.
    """
    if x is None:
        return None
    return x[2:-1]

def split(x):
    """Tokenise a stringified bytes value into lower-cased words.

    Drops the first two and last characters (the ``b'`` / ``'`` wrapper,
    presumably), lower-cases the rest, and splits on single spaces.
    Consecutive spaces therefore yield empty-string tokens.

    A null input returns an empty token list, so the UDF does not fail the
    Spark task. Downstream Word2Vec then sees an empty sentence.

    NOTE: this name shadows ``pyspark.sql.functions.split`` from the wildcard
    import at the top of the notebook.
    """
    if x is None:
        return []
    return x[2:-1].lower().split(' ')

# Expose the plain-Python cleaners to Spark with explicit return types.
song_track_str_udf = udf(song_track_str, StringType())
split_udf = udf(split, ArrayType(StringType()))

# Unwrap the id columns, tokenise the text columns, and pass the rest through.
# Column order below is the order of songsdf_filtered.
songsdf_filtered = songsdf.select(
    *[song_track_str_udf(id_col).alias(id_col) for id_col in ('song_id', 'track_id')],
    'duration', 'end_of_fade_in', 'key', 'key_confidence', 'loudness',
    'mode', 'mode_confidence', 'start_of_fade_out', 'tempo',
    'time_signature', 'time_signature_confidence',
    'artist_familiarity', 'artist_hotttnesss', 'song_hotttnesss',
    split_udf('artist_name').alias('artist_name'),
    'year',
    split_udf('release').alias('release'),
    split_udf('title').alias('title'),
)

playcountdf_filtered = playcountdf.select('songId', 'userId', 'Plays')

In [None]:
# Replace nulls in the familiarity / hotttnesss scores with 0, presumably so the
# later VectorAssembler never sees nulls in these columns.
popularity_cols = ['artist_familiarity', 'artist_hotttnesss', 'song_hotttnesss']
songsdf_filtered = songsdf_filtered.fillna(0, subset=popularity_cols)

In [None]:
from pyspark.ml.feature import Word2Vec, OneHotEncoder, StringIndexer

#year
# Map each distinct year to an index ("year_idx"), then one-hot encode that
# index into "year_enc".
stringIndexer = StringIndexer(inputCol="year", outputCol="year_idx")
model = stringIndexer.fit(songsdf_filtered)
songsdf_filtered = model.transform(songsdf_filtered)

encoder = OneHotEncoder(inputCol="year_idx", outputCol="year_enc")
# Spark >= 3.0 made OneHotEncoder an Estimator that must be fitted into a
# OneHotEncoderModel; in Spark 2.x it is a plain Transformer with no fit().
# Support both so this cell runs on either version.
encoder_model = encoder.fit(songsdf_filtered) if hasattr(encoder, "fit") else encoder
songsdf_filtered = encoder_model.transform(songsdf_filtered)

songsdf_filtered.columns

In [None]:
#release, title, artist_name
# Fit a separate Word2Vec model per tokenised text column and append its
# document vector as "<column>_enc". The loop order fixes the order in which
# the *_enc columns are appended: release, title, artist_name.
W2V_VECTOR_SIZE = 30  # embedding dimension used for every text column

for text_col in ["release", "title", "artist_name"]:
    word2Vec = Word2Vec(vectorSize=W2V_VECTOR_SIZE, minCount=0,
                        inputCol=text_col, outputCol=text_col + "_enc")
    model = word2Vec.fit(songsdf_filtered)
    songsdf_filtered = model.transform(songsdf_filtered)

songsdf_filtered.columns

In [None]:
# Keep the two ids plus every model input: the scalar track/artist attributes
# and the four encoded columns (one-hot year, Word2Vec text embeddings).
featuresdf = songsdf_filtered.select(
    'song_id', 'track_id',
    'duration', 'end_of_fade_in', 'key', 'key_confidence', 'loudness', 'mode',
    'mode_confidence', 'start_of_fade_out', 'tempo', 'time_signature',
    'time_signature_confidence',
    'artist_familiarity', 'artist_hotttnesss', 'song_hotttnesss',
    'artist_name_enc', 'year_enc', 'release_enc', 'title_enc',
)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Concatenate the scalar columns and the encoded vectors into a single
# "features" vector. List order determines slot order inside that vector.
scalar_cols = ['duration', 'end_of_fade_in', 'key', 'key_confidence', 'loudness', 'mode',
               'mode_confidence', 'start_of_fade_out', 'tempo', 'time_signature',
               'time_signature_confidence', 'artist_familiarity', 'artist_hotttnesss',
               'song_hotttnesss']
encoded_cols = ['artist_name_enc', 'year_enc', 'release_enc', 'title_enc']

assembler = VectorAssembler(inputCols=scalar_cols + encoded_cols, outputCol='features')
assembled = assembler.transform(featuresdf)
featuresdf = assembled.select('song_id', 'track_id', 'features')

In [None]:
# Peek at one (song_id, track_id, features) row.
featuresdf.take(1)

In [None]:
from pyspark.ml.feature import PCA

# Project the assembled vector onto 50 principal components and keep the
# result under the name "features" so downstream cells are unaffected.
# NOTE(review): the inputs are not standardised first, so large-magnitude
# columns (e.g. duration, tempo) likely dominate the components. Consider
# a StandardScaler before PCA.
pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(featuresdf)
projected = model.transform(featuresdf)
featuresdf = projected.select("song_id", "track_id", projected["pcaFeatures"].alias("features"))

In [None]:
seed = 180000
# 80/20 random split of the play-count rows with a fixed seed.
# NOTE(review): playcountdf_filtered is built earlier but appears unused; this
# splits the full playcountdf.
split_80_df, split_20_df = playcountdf.randomSplit([0.8, 0.2], seed=seed)

# Cache both splits: they are counted and shown here and reused later.
training_df = split_80_df.cache()
test_df = split_20_df.cache()

n_train, n_test = training_df.count(), test_df.count()
print('Training: {0}, test: {1}\n'.format(n_train, n_test))
for split_df in (training_df, test_df):
    split_df.show(3)

In [None]:
def dot_product(x, y):
    """Unnormalised similarity: the inner product of two feature vectors.

    Accepts whatever ``np.dot`` accepts and returns its result unchanged.
    """
    similarity = np.dot(x, y)
    return similarity

In [None]:
# Attach each training play row to its song's feature vector (inner join on
# song id). Cached because predict() filters this frame once per call.
training_df_1 = (
    training_df
    .join(featuresdf, on=training_df.songId == featuresdf.song_id)
    .cache()
)

In [None]:
def predict(songId, userId, similarity_metric):
    """Predict the play count of ``songId`` for ``userId`` from the user's history.

    The score is a similarity-weighted average over the user's rows in
    ``training_df_1``::

        sum(sim_i * Plays_i) / sum(sim_i),   sim_i = similarity_metric(target, features_i)

    where ``target`` is the song's vector in ``featuresdf``.

    Args:
        songId: song to score; matched against ``featuresdf.song_id``.
        userId: user to score for; matched against ``training_df_1.userId``.
        similarity_metric: callable ``(vec_a, vec_b) -> number`` (e.g. ``dot_product``).

    Returns:
        The predicted play count as a float. Returns ``float('nan')`` when no
        prediction is possible: the song has no feature row, the user has no
        training rows, or the similarities sum to zero.
    """
    song_row = featuresdf.filter(featuresdf.song_id == songId).select('features').first()
    if song_row is None:
        return float('nan')
    song_features = song_row.features

    # Build the predicate from training_df_1's own column so it is resolved
    # against the DataFrame actually being filtered.
    user_history = (training_df_1
                    .filter(training_df_1.userId == userId)
                    .select('features', 'Plays'))

    def _weighted(row):
        # (similarity * plays, similarity) for one history row.
        sim = float(similarity_metric(song_features, row.features))
        return (sim * row.Plays, sim)

    # A single Spark job accumulates both sums. fold() returns the zero value
    # for an empty history instead of raising like reduce() does.
    numerator, denominator = user_history.rdd.map(_weighted).fold(
        (0.0, 0.0), lambda a, b: (a[0] + b[0], a[1] + b[1]))

    if denominator == 0:
        return float('nan')
    return numerator / denominator

In [None]:
# Take 100 (songId, userId, Plays) triples from the test split to score on the driver.
# NOTE(review): predict() runs Spark queries itself, so it cannot be wrapped in a
# UDF. That is why scoring happens in a driver-side loop over this sample.
N_EVAL = 100
test_df_list = test_df.select('songId', 'userId', 'Plays').rdd.map(tuple).take(N_EVAL)

In [None]:
import time

# Time one prediction for the first sampled held-out (songId, userId, Plays) triple.
# test_sample was previously undefined here, which raised NameError.
test_sample = test_df_list[0]
s = time.time()
sample_pred = predict(test_sample[0], test_sample[1], dot_product)
e = time.time()

In [None]:
# Elapsed wall-clock seconds between the most recent s (start) and e (end)
# timestamps. Start is subtracted from end so the value is positive.
e - s

In [None]:
# Score every sampled test triple. s/e bracket the whole batch for timing.
s = time.time()
pred = [predict(song_id, user_id, dot_product) for song_id, user_id, _plays in test_df_list]
e = time.time()

In [None]:
# Predicted play counts, one per entry of test_df_list and in the same order.
pred