In [1]:
# usual imports
import os
import sys
import time
import glob
import datetime
import sqlite3
import pandas as pd
import numpy as np # get it at: http://numpy.scipy.org/
# path to the Million Song Dataset subset (uncompressed)
# CHANGE IT TO YOUR LOCAL CONFIGURATION
# msd_subset_path='./data/MSD/MillionSongSubset'
# msd_subset_data_path=os.path.join(msd_subset_path,'data')
# msd_subset_addf_path=os.path.join(msd_subset_path,'AdditionalFiles')
# assert os.path.isdir(msd_subset_path),'wrong path' # sanity check
# # path to the Million Song Dataset code
# # CHANGE IT TO YOUR LOCAL CONFIGURATION
# msd_code_path='./data/MSD/MSongsDB'
# assert os.path.isdir(msd_code_path),'wrong path' # sanity check
# # we add some paths to python so we can import MSD code
# # Ubuntu: you can change the environment variable PYTHONPATH
# # in your .bashrc file so you do not have to type these lines
# sys.path.append(os.path.join(msd_code_path,'PythonSrc') )

taste_profile_data_path="./data/train_triplets.txt"
# filtered_taste_profile_data_path="./data/FilteredEchoNestTasteProfileSubset.csv"

# assert os.path.isfile(taste_profile_data_path)

# imports specific to the MSD
# import hdf5_getters as GETTERS

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row


In [30]:
def reduce_taste_subset(path='./data/FullEchoNestTasteProfileSubset.txt', 
                        to_path='./data/TeenyTinyEchoNestTasteProfileSubset.csv', downsample=0.001):
    data = pd.read_csv(path, sep="\t", header=None)
    data.columns = ['user', 'song', 'play_count']
    data.astype({'user': np.str, 'song': np.str, 'play_count': np.int32})
    data.sample(frac=downsample).to_csv(to_path, index=False)
# the following function simply gives us a nice string for
# a time lag in seconds
def strtimedelta(starttime,stoptime):
    return str(datetime.timedelta(seconds=stoptime-starttime))

# Get full data set

In [31]:
data = pd.read_csv(taste_profile_data_path, sep="\t", header=None)
data.columns = ['user', 'song', 'play_count']
data = data.astype({'user': np.str, 'song': np.str, 'play_count': np.int32})

In [93]:
data.shape

(48373586, 3)

# Subset dataset by the tracks that we have in MSD

In [277]:
# Get the song ids which are also in the database. 
song_ids = []

# for song_id in data['song'].values:
# let's redo all this work in SQLite in a few seconds
t1 = time.time()
# connect to database to get the metadata from MSD
conn = sqlite3.connect(os.path.join(msd_subset_addf_path,
                                    'subset_track_metadata.db'))

q = "SELECT DISTINCT song_id FROM songs"
#     q += " WHERE song_id='" + song_id + "'"
res = conn.execute(q)
res = res.fetchall()
conn.close()
t2 = time.time()
song_ids_dict = {r[0]: 1 for r in res}



In [278]:
a = time.time()
print "SOAPDEY12A81C210A9" in song_ids_dict
print(time.time() - a)

False
2.98820495605


In [279]:
b = time.time()
print "SOAPDEY12A81C210A9" in song_ids_dict.values()
print(time.time() - b)

False
0.0225830078125


In [280]:
small_data = data.iloc[:1000000]
# print(small_data.shape)
not_included = {}
a = time.time()
def foo(x):
    if x not in song_ids_dict:
        if x not in not_included:
            not_included[x] = 1
        return False
    return True
    
filtered_data = data[data["song"].map(foo)]

# filtered_data = data[data["song"].map(lambda x: x in song_ids_dict)]
print(time.time() - a)

20.8473460674


# Store and filter by 100,000 most popular songs

In [33]:
song_popularity = data.drop(["user"], axis=1)
song_popularity = song_popularity.groupby(["song"]).sum()

In [34]:
song_popularity.size

384546

In [50]:
import pickle
song_popularity_subset = song_popularity.sort_values(["play_count"], ascending=False).iloc[:50000]
with open("./data/subset_popularity.pkl", "wb") as f:
    pickle.dump(song_popularity_subset, f)

In [51]:
song_popularity_subset.index

Index([u'SOBONKR12A58A7A7E0', u'SOAUWYT12A81C206F1', u'SOSXLTC12AF72A7F54',
       u'SOFRQTD12A81C233C0', u'SOEGIYH12A6D4FC0E3', u'SOAXGDH12A8C13F8A1',
       u'SONYKOW12AB01849C9', u'SOPUCYA12A8C13A694', u'SOUFTBI12AB0183F65',
       u'SOVDSJC12A58A7A271',
       ...
       u'SOHGECL12AB01883C2', u'SOLQSUI12A6D4F6943', u'SOXICPA12AB018DCB5',
       u'SORQIYM12AB0189D4D', u'SOTRVOS12A67AE1370', u'SOMULFL12A6D4F6D57',
       u'SOHSFGD12AB0189677', u'SOHMCUG12A8C1376FD', u'SOZKLHQ12AF72A76DB',
       u'SOFMFJH12AB017E625'],
      dtype='object', name=u'song', length=50000)

In [52]:
song_popularity_subset_dict = {s: 1 for s in song_popularity_subset.index}

In [53]:
data_subset = data[data["song"].map(lambda x: x in song_popularity_subset_dict)]

In [92]:
data.shape

(48373586, 3)

In [55]:
len(data_subset)

40318527

### Stored filtered Taste data

In [281]:
filtered_data.to_csv(filtered_taste_profile_data_path, index=False)

# Train the WMF model

In [9]:
def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    """
    grid search function to select the best model based on RMSE of
    validation data
    Parameters
    ----------
    train_data: spark DF with columns ['user', 'song', 'play_count']
    
    validation_data: spark DF with columns ['user', 'song', 'play_count']
    
    maxIter: int, max number of learning iterations
    
    regParams: list of float, one dimension of hyper-param tuning grid
    
    ranks: list of float, one dimension of hyper-param tuning grid
    
    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in regParams:
            # get ALS model
            # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
            als = ALS(userCol="userId", itemCol="songId", ratingCol="Plays", implicitPrefs=True, alpha=40).setMaxIter(maxIter).setRank(rank).setRegParam(reg).setColdStartStrategy("drop")
            # train ALS model
            model = als.fit(train_data)
            # evaluate the model by computing the RMSE on the validation data
            predictions = model.transform(validation_data)  # give Nan to data that hasn't been seen before. cold_start drops those
            evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="Plays",
                                            predictionCol="prediction")
            rmse = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [57]:
filtered_data = data_subset #pd.read_csv(filtered_taste_profile_data_path)

In [58]:
user_mapping = {}
last_user = 0

def user_map(user):
    global user_mapping, last_user
    if user not in user_mapping:
        user_mapping[user] = last_user
        last_user += 1

a = filtered_data["user"].map(lambda x: user_map(x))

In [59]:
song_mapping = {}
last_song = 0

def song_map(song):
    global song_mapping, last_song
    if song not in song_mapping:
        song_mapping[song] = last_song
        last_song += 1

a = filtered_data["song"].map(lambda x: song_map(x))

In [60]:
prep_data = filtered_data.copy()
prep_data["user_id"] = prep_data["user"].map(lambda x: user_mapping[x])
prep_data["song_id"] = prep_data["song"].map(lambda x: song_mapping[x])

In [83]:
prep_data = prep_data.astype({'user': np.str, 'song': np.str, 'play_count': np.int32})

AttributeError: 'DataFrame' object has no attribute 'dytpes'

In [91]:
filtered_data.shape

(40318527, 3)

In [87]:
# with open("prep_cf_data.pkl", "wb") as f:
#     pickle.dump(prep_data, f)
prep_data.to_csv("prep_cf_data.txt", header=None, index=None, sep='\t', mode='a')


In [25]:
d = pd.read_csv("prep_cf_data.txt", sep="\t", header=None)


In [30]:
d.columns =["user", "song", "Plays", "userId", "songId"]

In [31]:
d.head()

Unnamed: 0,user,song,Plays,userId,songId
0,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOAKIMP12A8C130995,1,0,0
1,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOAPDEY12A81C210A9,1,0,1
2,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOBBMDR12A8C13253B,2,0,2
3,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOBFNSP12AF72A0E22,1,0,3
4,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOBFOVM12A58A7D494,1,0,4


In [47]:
user_mapping = {}
song_mapping = {}

In [59]:
user_df = d[["user", "userId"]]
user_mapping = user_df.set_index('user').to_dict()["userId"]


In [60]:
song_df = d[["song", "songId"]]
song_mapping = song_df.set_index('song').to_dict()["songId"]


In [62]:
song_mapping

{'SOZJAWB12A6D4FC287': 22149,
 'SOCCXFA12AB0184578': 41056,
 'SOXGDOO12A6D4F9748': 33229,
 'SOWAVQV12A58A7BBEF': 805,
 'SOGSBDE12AC3DF81C1': 39417,
 'SOTGNTL12A6D4F8866': 14706,
 'SOTJEEP12A8C13234D': 38511,
 'SOPURHM12A58A7F379': 46027,
 'SOOKPGM12AB018C0EB': 27538,
 'SOKXFXD12A8AE46783': 41401,
 'SOBROKQ12A8C130DBD': 8842,
 'SOZYBLA12AB018C2A2': 29392,
 'SOQJAAQ12AB018FD48': 27889,
 'SORRVTP12A8C13FFAF': 12604,
 'SOOFKFU12A6D4FC27B': 37939,
 'SOMZXKE12A6D4F9E22': 24262,
 'SOSJKNF12A6D4FC814': 44744,
 'SOIOBZD12A81C22DFF': 39786,
 'SOKDOZY12AF72A0F25': 8889,
 'SOCYMMR12A8C1360BC': 41332,
 'SOPWWEY12AB018A75C': 10737,
 'SOYXCPK12AC9097E8A': 37582,
 'SOYOROC12AF72A48AE': 2309,
 'SOBFQSF12A8C13ED0C': 19318,
 'SOBCGTF12A6D4F801C': 895,
 'SODWGNH12A8C145306': 22902,
 'SOZWCTF12A6D4F79C7': 16553,
 'SOVALQH12AB0189114': 2937,
 'SOCFMXL12A8151BD37': 18149,
 'SONGFNE12A58A81A71': 42980,
 'SOTNWDU12A8C14093A': 31771,
 'SODJHJL12AB018DDB9': 14035,
 'SODFOLL12A8C13E5B2': 5525,
 'SOLWLSF12A58A788B

# fancy load data

In [2]:
# import libraries
import os
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark import SparkContext

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# list directories
triplets_filename = 'prep_cf_data.txt'

# base_dir2 = '/FileStore/tables/qgz865tr1472080731816/'
# songs2tracks_filename = base_dir2 + 'taste_profile_song_to_tracks.txt'

# base_dir3 = '/FileStore/tables/9204dwdp1472230135186/'
# metadata_filename = base_dir3 + 'track_metadata.csv'

if os.path.sep != '/':
  # Handle Windows.
  triplets_filename = triplets_filename.replace('/', os.path.sep)
#   songs2tracks_filename = songs2tracks_filename.replace('/', os.path.sep)
#   metadata_filename = metadata_filename.replace('/', os.path.sep)

In [4]:
plays_df_schema = StructType(
  [StructField('user', StringType()),
   StructField('song', StringType()),
   StructField('Plays', IntegerType()), 
   StructField('userId', IntegerType()),
   StructField('songId', IntegerType())]
)

# load in data
raw_plays_df_with_int_ids = sqlContext.read.format('com.databricks.spark.csv') \
                              .options(delimiter = '\t', header=False,inferSchema=False) \
                              .schema(plays_df_schema) \
                              .load(triplets_filename)

# cache
raw_plays_df_with_int_ids.cache()
raw_plays_df_with_int_ids.show(10)

+--------------------+------------------+-----+------+------+
|                user|              song|Plays|userId|songId|
+--------------------+------------------+-----+------+------+
|b80344d063b5ccb32...|SOAKIMP12A8C130995|    1|     0|     0|
|b80344d063b5ccb32...|SOAPDEY12A81C210A9|    1|     0|     1|
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|     0|     2|
|b80344d063b5ccb32...|SOBFNSP12AF72A0E22|    1|     0|     3|
|b80344d063b5ccb32...|SOBFOVM12A58A7D494|    1|     0|     4|
|b80344d063b5ccb32...|SOBSUJE12A6D4F8CF5|    2|     0|     5|
|b80344d063b5ccb32...|SOBXALG12A8C13C108|    1|     0|     6|
|b80344d063b5ccb32...|SOBXHDL12A81C204C0|    1|     0|     7|
|b80344d063b5ccb32...|SOBYHAJ12A6701BF1D|    1|     0|     8|
|b80344d063b5ccb32...|SOCNMUH12A6D4F6E6D|    1|     0|     9|
+--------------------+------------------+-----+------+------+
only showing top 10 rows



+--------------------+
|                user|
+--------------------+
|b80344d063b5ccb32...|
+--------------------+
only showing top 1 row



In [5]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 1800009193L
(split_60_df, split_a_20_df, split_b_20_df) = raw_plays_df_with_int_ids.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

Training: 24187619, validation: 8070090, test: 8060818

+--------------------+------------------+-----+------+------+
|                user|              song|Plays|userId|songId|
+--------------------+------------------+-----+------+------+
|00003a4459f33b929...|SOCEQVU12A6D4F78B3|    3| 14730| 40841|
|00003a4459f33b929...|SOJJRVI12A6D4FBE49|    1| 14730| 22887|
|00003a4459f33b929...|SOMZHIH12A8AE45D00|    3| 14730|  7085|
+--------------------+------------------+-----+------+------+
only showing top 3 rows

+--------------------+------------------+-----+------+------+
|                user|              song|Plays|userId|songId|
+--------------------+------------------+-----+------+------+
|00003a4459f33b929...|SOKJWZB12A6D4F9487|    4| 14730| 24008|
|00003a4459f33b929...|SOWIEUN12A81C1F953|    6| 14730| 22352|
|00005c6177188f12f...|SOGMLKG12AB018CF9B|    1| 21719|  1861|
+--------------------+------------------+-----+------+------+
only showing top 3 rows

+--------------------+----

In [23]:
# spark_data = spark.createDataFrame(prep_data)
# (training, val) = spark_data.randomSplit([0.8, 0.2])

In [24]:
training.show()

+----+----+----------+
|user|song|play_count|
+----+----+----------+
|   0|   0|         1|
|   0|   1|         1|
|   0|   4|         1|
|   0|   5|         1|
|   0|   6|         2|
|   0|   7|         1|
|   0|   8|         1|
|   0|  10|         1|
|   0|  11|         1|
|   0|  13|         5|
|   0|  15|         1|
|   0|  16|         1|
|   0|  17|         1|
|   0|  18|         1|
|   0|  19|         1|
|   0|  22|         1|
|   0|  25|         1|
|   0|  26|         1|
|   0|  27|         1|
|   0|  29|         1|
+----+----+----------+
only showing top 20 rows



In [6]:
sc.setCheckpointDir("checkpoint/")


In [84]:
training_df.select("userId").distinct().orderBy("userId", ascending=1).show()

+------+
|userId|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows



In [10]:
model = tune_ALS(training_df, validation_df, 10, [0.01], [50])

50 latent factors and regularization = 0.01: validation RMSE is 6.95318182636

The best model has 50 latent factors and regularization = 0.01


In [44]:
model.load("CF.model")

ALS_488ca98e85e316fcdd3e

In [30]:
predictions = model.transform(val)

In [31]:
predictions

DataFrame[user: bigint, song: bigint, play_count: bigint, prediction: float]

In [17]:
user_factors = model.userFactors.toPandas()

In [18]:
user_factors.head()

Unnamed: 0,id,features
0,0,"[-0.642754912376, -1.44765949249, 0.7702466845..."
1,10,"[-0.00871864613146, -0.640369713306, -0.732731..."
2,20,"[-0.65563249588, -1.33558595181, 0.43775263428..."
3,30,"[-1.22424197197, -1.55556643009, -0.5380949378..."
4,40,"[-0.114687010646, 0.00771158747375, -0.3957910..."


In [20]:
song_factors = model.itemFactors.toPandas()

In [21]:
song_factors.head()

Unnamed: 0,id,features
0,0,"[-0.0240161754191, -0.0768702998757, 0.0561221..."
1,10,"[0.00995913147926, -0.168614029884, 0.03676532..."
2,20,"[-0.00825371220708, -0.0511649250984, -0.02617..."
3,30,"[-0.0324394889176, -0.0138186281547, 0.0562140..."
4,40,"[0.017810029909, -0.0875535160303, -0.00549469..."


In [86]:
np.sort(song_factors["id"].values)

array([    0,     1,     2, ..., 49997, 49998, 49999], dtype=int32)

In [63]:
user_mapping_inverted = dict(map(reversed, user_mapping.items()))

In [64]:
song_mapping_inverted = dict(map(reversed, song_mapping.items()))

In [94]:
user_factors["user"] = user_factors["id"].map(lambda x: user_mapping_inverted[x])
song_factors["song"] = song_factors["id"].map(lambda x: song_mapping_inverted[x])

In [95]:
final_user_factors = user_factors.drop(columns=["id"])
final_song_factors = song_factors.drop(columns=["id"])

In [96]:
final_song_factors.head()

Unnamed: 0,features,song
0,"[-0.0240161754191, -0.0768702998757, 0.0561221...",SOAKIMP12A8C130995
1,"[0.00995913147926, -0.168614029884, 0.03676532...",SODACBL12A8C13C273
2,"[-0.00825371220708, -0.0511649250984, -0.02617...",SOHQWYZ12A6D4FA701
3,"[-0.0324394889176, -0.0138186281547, 0.0562140...",SOLGNOE12A8C139CA9
4,"[0.017810029909, -0.0875535160303, -0.00549469...",SONSAEZ12A8C138D7A


In [97]:
final_user_factors.head()

Unnamed: 0,features,user
0,"[-0.642754912376, -1.44765949249, 0.7702466845...",b80344d063b5ccb3212f76538f3d9e43d87dca9e
1,"[-0.00871864613146, -0.640369713306, -0.732731...",17aa9f6dbdf753831da8f38c71b66b64373de613
2,"[-0.65563249588, -1.33558595181, 0.43775263428...",0afaa5d9d04bf85af720fe8cc566a41ca3e41c97
3,"[-1.22424197197, -1.55556643009, -0.5380949378...",a58de017cbeda1763ea002fe027ed41b4ed53109
4,"[-0.114687010646, 0.00771158747375, -0.3957910...",ff4322e94814d3c7895d07e6f94139b092862611


In [98]:
final_song_factors.shape

(50000, 2)

In [99]:
final_user_factors.shape

(1017790, 2)

In [100]:
final_song_factors.to_csv("./data/song_factors.csv", index=False)
# final_user_factors.to_csv("./data/user_factors.csv", index=False)

In [101]:
dict_user_factors = final_user_factors.set_index("user").to_dict()["features"]

In [102]:
dict_song_factors = final_song_factors.set_index("song").to_dict()["features"]

In [103]:
import pickle

In [104]:
with open("./data/user_factors.pkl", "w") as f:
    pickle.dump(dict_user_factors, f)

In [105]:
with open("./data/song_factors.pkl", "w") as f:
    pickle.dump(dict_song_factors, f)

In [274]:
with open("./data/user_factors.pkl", "r") as f:
    a = pickle.load(f)

# EXTRA


Running the cell to get all valid_zone_ids showed that all songs in data are valid songs in the MSD

In [130]:
song_ids

[u'SOVLGJY12A8C13FBED',
 u'SOGDQZK12A8C13F37C',
 u'SODMVJR12A6D4F985D',
 u'SOIWBDR12A8C13A4AC',
 u'SOHCCIA12AC907577F',
 u'SOBOAQC12A8C13E3E9',
 u'SOKVLHX12AB0187B39',
 u'SOMMSMW12A8C13FCCC',
 u'SODPNJR12A6D4FA52D',
 u'SOFFLLP12AB018ED52',
 u'SOZSPTD12A8C143BFD',
 u'SOHWWOM12AB0186627',
 u'SOZVLXO12AF72ABCDB',
 u'SOVBVJW12AAF3B5F77',
 u'SOINHKC12AB01836E0',
 u'SODKUVJ12AB0187A4A',
 u'SOMJNMD12AB0187F93',
 u'SOZNBMX12AB0182AD1',
 u'SOIRIWN12AB0183AE7',
 u'SOESNFW12A6D4F995F',
 u'SOGNFKZ12A8C145224',
 u'SOSJVGP12AB0185B95',
 u'SOHHGUG12A6D4F4E77',
 u'SOBIPBF12AB0184B0C',
 u'SONUTGA12AB0187F44',
 u'SOIGIUZ12A8C1337B1',
 u'SORRQLD12A8C139E8D',
 u'SOHHKOU12A81C224B2',
 u'SOWUAYB12A6D4FA1D4',
 u'SOYTQGO12A8C138C1B',
 u'SOOOBCE12A8C134019',
 u'SOROLCY12AB0182652',
 u'SOWBEBK12AB018306A',
 u'SOLIAYU12A6D4F9923',
 u'SOOBRTK12A8C13D6A9',
 u'SONROBV12AB0188BC7',
 u'SOLWBGS12AB0187EAD',
 u'SOOCWAO12AB01852C1',
 u'SOJDJRK12AB018565D',
 u'SOYNXWG12AC468F51A',
 u'SOQYKBE12AC46867EE',
 u'SOHWKQE12A8C1