# Predict Music Recommendation
* Resources
  * From https://www.kaggle.com/c/kkbox-music-recommendation-challenge/data
* Group Members
  * Shagun Garg
  * Ashish Devrani
  * Jasdev Singh Sachdeva
  * Sanchit Bogra

## Objective
* In this task, we predict the chance that a user listens to a song again after the first observable listening event within a time window. If one or more recurring listening events are triggered within a month of the user's very first observable listening event, the target is marked 1 in the training set, and 0 otherwise. The same rule applies to the test set.

* KKBOX provides a training data set consisting of the first observable listening event for each unique user-song pair within a specific time period. Using public data to improve prediction accuracy is encouraged.

* Use Logistic Regression
  * Train on a portion of the dataset
  * Test the trained model against the remainder of the dataset
    * Accuracy can be determined because the dataset is labeled (i.e., this uses supervised learning)
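
The labelling rule described in the objective can be sketched in plain Python. This is only an illustration: the function name `label_pair`, the 30-day approximation of "a month", and the sample timestamps are assumptions and are not part of the KKBOX data or this notebook's pipeline.

```python
from datetime import datetime, timedelta

# Assumption: "a month" is approximated as 30 days.
WINDOW = timedelta(days=30)

def label_pair(listen_times):
    """Return 1 if a later listen falls within WINDOW of the first one, else 0.

    listen_times: datetimes of all observed listening events for one
    user-song pair (hypothetical input, in any order).
    """
    events = sorted(listen_times)
    if not events:
        raise ValueError("need at least one listening event")
    first = events[0]
    # Any event strictly after the first and inside the window counts as a repeat.
    return int(any(first < t <= first + WINDOW for t in events[1:]))

first_listen = datetime(2017, 1, 1)
repeat_label = label_pair([first_listen, first_listen + timedelta(days=10)])
no_repeat_label = label_pair([first_listen, first_listen + timedelta(days=45)])
print("repeat: {}, no repeat: {}".format(repeat_label, no_repeat_label))
```

A listen 10 days after the first one yields target 1, while a listen 45 days later falls outside the window and yields target 0.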

## Download Data
* Downloading from OneDrive cloud

In [4]:
%sh
wget --no-check-certificate 'https://onedrive.live.com/download?cid=19C57AC336968345&resid=19C57AC336968345%2113473&authkey=AJKZ--BuqqEJrT0' -O song_recommendations.csv

In [5]:
%sh
ls

Loading file into a dataframe

In [7]:
songs_df = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load("file:/databricks/driver/song_recommendations.csv")
songs_df.show(3)

## Data Explored and Explained

In [9]:
songs_df.printSchema()

## Data Cleaning

Removing rows that have a null value in any of the msno, song_id, source_system_tab, source_screen_name, or source_type columns

In [12]:
songs_df = songs_df.filter(songs_df.source_screen_name.isNotNull() & songs_df.source_system_tab.isNotNull() & songs_df.msno.isNotNull() & songs_df.song_id.isNotNull() & songs_df.source_type.isNotNull() ) 

In [13]:
songs_df.count()

In [14]:
# Distinct values of the target label
songs_df.select('target').distinct().show()

In [15]:
# Number of rows for each target value
songs_df.select('target').groupBy('target').count().show()
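
The per-target counts matter when reading the accuracy figures later on: a model that always predicts the more common target already reaches an accuracy equal to that target's share of the rows. A minimal sketch of that baseline follows; the function name `majority_baseline` and the sample labels are hypothetical and do not come from the KKBOX data.

```python
from collections import Counter

def majority_baseline(labels):
    """Return (most common label, accuracy of always predicting it)."""
    counts = Counter(labels)
    label, count = counts.most_common(1)[0]
    return label, count / float(len(labels))

# Hypothetical labels, for illustration only.
sample_targets = [1, 1, 0, 1, 0, 1, 1, 0, 1, 1]
baseline_label, baseline_accuracy = majority_baseline(sample_targets)
print("always predict {}: accuracy {}".format(baseline_label, baseline_accuracy))
```

With seven 1s out of ten labels, always predicting 1 is correct 70% of the time, so a trained model should be compared against that figure rather than against 50%.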

## Data Transformation

In [17]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [18]:
# Convert the target column to an MLlib label, which must be a double
def labelForResults(s):
     if s == 0:
         return 0.0
     elif s == 1:
         return 1.0
     else:
         return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = songs_df.select(label(songs_df.target).alias('label'), songs_df.msno, songs_df.song_id, songs_df.source_system_tab, songs_df.source_type, songs_df.source_screen_name).where('label >= 0')
labeledData.take(1)

In [19]:
# Split into training and testing data
songs_train, songs_test = labeledData.randomSplit([0.8, 0.2], seed=12345)
display(songs_train)
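
A seeded random split assigns rows to the two sets by chance, so the 80/20 proportions are approximate rather than exact, while the same seed reproduces the same assignment. The sketch below illustrates that idea in plain Python. It is an assumption-laden stand-in and not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=12345)
print("train: {}, test: {}".format(len(train), len(test)))
```

Every row lands in exactly one split, and rerunning with the same seed gives the same split, which is what makes the evaluation repeatable.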

## Data Modeling

In [21]:
# Configure the stages of the ML pipeline:
stringIndexer_msno = StringIndexer(inputCol="msno", outputCol="MSNO_IX")
stringIndexer_sonid = StringIndexer(inputCol="song_id", outputCol="SONG_ID_IX")
stringIndexer_ssn = StringIndexer(inputCol="source_screen_name", outputCol="SOURCE_SCREEN_NAME_IX")
stringIndexer_st = StringIndexer(inputCol="source_type", outputCol="SOURCE_TYPE_IX")
# Skip rows whose values were not seen while fitting (e.g. users or songs that only occur in the test set).
# The indexers are fitted later by pipeline.fit, so no separate fit/transform is needed here.
stringIndexer_msno.setHandleInvalid("skip")
stringIndexer_sonid.setHandleInvalid("skip")
stringIndexer_ssn.setHandleInvalid("skip")
stringIndexer_st.setHandleInvalid("skip")
vectorAssembler_features = VectorAssembler(inputCols=["MSNO_IX", "SONG_ID_IX", "SOURCE_SCREEN_NAME_IX", "SOURCE_TYPE_IX"], outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[stringIndexer_msno, stringIndexer_sonid, stringIndexer_ssn, stringIndexer_st, vectorAssembler_features, lr])
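
To make the indexing stages more concrete, here is a plain-Python sketch of what a string indexer with "skip" handling roughly does: it numbers the distinct strings seen during fitting, most frequent first, and drops values it has never seen. The helper names and sample values are hypothetical, and the alphabetical tie-breaking is an assumption of this sketch rather than a statement about every Spark version.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # Assumption: ties in frequency are broken alphabetically.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

def transform_skip_unseen(mapping, values):
    """Index values, dropping any that were not seen during fitting."""
    return [mapping[v] for v in values if v in mapping]

# Hypothetical tab values, for illustration only.
train_tabs = ["my library", "discover", "my library", "search", "discover", "my library"]
test_tabs = ["discover", "radio", "my library"]

tab_index = fit_string_indexer(train_tabs)
indexed_test = transform_skip_unseen(tab_index, test_tabs)
print(tab_index)
print(indexed_test)
```

Here "radio" never appears in the training values, so it is dropped from the indexed test values. The resulting indices are frequency ranks rather than meaningful quantities, which is worth keeping in mind when they are fed directly into a linear model.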

In [22]:
model = pipeline.fit(songs_train)
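
Once fitted, a logistic regression model scores a row by passing a weighted sum of its features through the logistic function and comparing the result with a threshold. The sketch below shows that calculation in plain Python. The coefficients, the feature vector, and the 0.5 threshold are assumptions for illustration and are not values taken from the fitted model.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)

def predict(features, weights, intercept, threshold=0.5):
    """Return (probability of label 1, predicted label)."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    probability = sigmoid(z)
    return probability, (1.0 if probability > threshold else 0.0)

# Hypothetical coefficients and feature vector, for illustration only.
example_weights = [0.4, -0.2, 0.1, 0.3]
example_features = [1.0, 2.0, 0.0, 1.0]
probability, prediction = predict(example_features, example_weights, intercept=-0.5)
print("probability: {:.4f}, prediction: {}".format(probability, prediction))
```

The weighted sum here is -0.2, which maps to a probability just above 0.45, so the predicted label is 0.0.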

## Prediction

In [24]:
predictionsDf = model.transform(songs_test)
predictionsDf.createOrReplaceTempView('Predictions')
predictionsDf.show(3)

## Model Evaluation

In [26]:
numSuccesses = predictionsDf.where("(label = 0 AND prediction = 0) OR  (label = 1 AND prediction = 1)").count()
numData = predictionsDf.count()

print("There were {} song and person listening combinations and there were {} successful predictions".format(numData, numSuccesses))
print("This is a {}% success rate".format(float(numSuccesses) / float(numData) * 100))

In [27]:
truePositive = int(predictionsDf.where("(label = 1 AND prediction = 1)").count())
trueNegative = int(predictionsDf.where("(label = 0 AND prediction = 0)").count())
falsePositive = int(predictionsDf.where("(label = 0 AND prediction = 1)").count())
falseNegative = int(predictionsDf.where("(label = 1 AND prediction = 0)").count())

print([['TP', truePositive], ['TN', trueNegative], ['FP', falsePositive], ['FN', falseNegative]])
resultDF = sqlContext.createDataFrame([['TP', truePositive], ['TN', trueNegative], ['FP', falsePositive], ['FN', falseNegative]], ['metric', 'value'])
display(resultDF)
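
The four confusion-matrix counts are enough to derive the usual summary metrics. The following sketch shows the standard formulas; the function name `classification_metrics` and the example counts are hypothetical and are not results from this notebook.

```python
def classification_metrics(tp, tn, fp, fn):
    """Derive accuracy, precision, recall, and F1 from confusion-matrix counts."""
    total = float(tp + tn + fp + fn)
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / float(tp + fp) if (tp + fp) else 0.0
    recall = tp / float(tp + fn) if (tp + fn) else 0.0
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

# Hypothetical counts, for illustration only.
metrics = classification_metrics(tp=60, tn=20, fp=15, fn=5)
for name in ("accuracy", "precision", "recall", "f1"):
    print("{}: {:.3f}".format(name, metrics[name]))
```

Precision and recall separate the two kinds of error (false positives and false negatives) that a single accuracy figure hides.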

In [28]:
resultDF.createOrReplaceTempView("LRresult")

In [29]:
%r
library(SparkR)
sparkdf <- sql("FROM LRresult SELECT *")
rdf <- collect(sparkdf)
print( rdf)
vals <- (t(rdf[2]))
labels <- (t(rdf[1]))
# Simple Pie Chart
pie(vals,labels)

Visualisation based on Source System tab and success rate
* For each source system tab, we count the correct predictions (label = prediction). These are counts rather than rates, so tabs with more rows produce larger values.

In [31]:
disc = int(predictionsDf.where("source_system_tab = 'discover' and label = prediction").count())
exp = int(predictionsDf.where("source_system_tab = 'explore' and label = prediction").count())
lis = int(predictionsDf.where("source_system_tab = 'listen with' and label = prediction").count())
mylib = int(predictionsDf.where("source_system_tab = 'my library' and label = prediction").count())
noti = int(predictionsDf.where("source_system_tab = 'notification' and label = prediction").count())
search = int(predictionsDf.where("source_system_tab = 'search' and label = prediction").count())
radio = int(predictionsDf.where("source_system_tab = 'radio' and label = prediction").count())
# Named 'settings' rather than 'set' to avoid shadowing the Python builtin
settings = int(predictionsDf.where("source_system_tab = 'settings' and label = prediction").count())

print([['Discovery', disc], ['Explore', exp], ['Listen', lis], ['My Library', mylib],['Notifications', noti],['Search',search],['Radio', radio],['Settings',settings]])
result1DF = sqlContext.createDataFrame([['Discovery', disc], ['Explore', exp], ['Listen', lis], ['My Library', mylib],['Notifications', noti],['Search',search],['Radio', radio],['Settings',settings]], ['metric', 'value'])
display(result1DF)
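
The cell above reports the number of correct predictions for each tab. To compare tabs of different sizes, each count can be divided by the tab's total number of rows. The sketch below does this in plain Python on hypothetical (tab, label, prediction) tuples; the helper name `success_rate_by_group` and the sample rows are assumptions and are not output from the model.

```python
from collections import defaultdict

def success_rate_by_group(rows):
    """rows: iterable of (group, label, prediction) tuples.

    Returns a dict mapping each group to correct / total.
    """
    correct = defaultdict(int)
    total = defaultdict(int)
    for group, label, prediction in rows:
        total[group] += 1
        if label == prediction:
            correct[group] += 1
    return {group: correct[group] / float(total[group]) for group in total}

# Hypothetical rows, for illustration only.
sample_rows = [
    ("my library", 1.0, 1.0),
    ("my library", 0.0, 1.0),
    ("my library", 1.0, 1.0),
    ("my library", 1.0, 1.0),
    ("search", 0.0, 0.0),
    ("search", 1.0, 0.0),
]
rates = success_rate_by_group(sample_rows)
for tab in sorted(rates):
    print("{}: {:.2f}".format(tab, rates[tab]))
```

In this sample "my library" has three times as many correct predictions as "search", but its rate is 0.75 against 0.50, a much smaller gap than the raw counts suggest.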

In [32]:
result1DF.createOrReplaceTempView("LRresult")

In [33]:
%r
library(SparkR)
sparkdf <- sql("FROM LRresult SELECT *")
rdf <- collect(sparkdf)
print( rdf)
vals <- (t(rdf[2]))
labels <- (t(rdf[1]))
# Simple Pie Chart
pie(vals,labels)

Visualisation based on Source Screen Name and success rate
* For each source screen name, we count the correct predictions (label = prediction).

In [35]:
albm = int(predictionsDf.where("source_screen_name = 'Album more' and label = prediction").count())
artm = int(predictionsDf.where("source_screen_name = 'Artist more' and label = prediction").count())
con = int(predictionsDf.where("source_screen_name = 'Concert' and label = prediction").count())
discc = int(predictionsDf.where("source_screen_name = 'Discover Chart' and label = prediction").count())
discf = int(predictionsDf.where("source_screen_name = 'Discover Feature' and label = prediction").count())
discg = int(predictionsDf.where("source_screen_name = 'Discover Genre' and label = prediction").count())
discn = int(predictionsDf.where("source_screen_name = 'Discover New' and label = prediction").count())
ex = int(predictionsDf.where("source_screen_name = 'Explore' and label = prediction").count())
locp = int(predictionsDf.where("source_screen_name = 'Local playlist more' and label = prediction").count())
mylib = int(predictionsDf.where("source_screen_name = 'My library' and label = prediction").count())
mylibs = int(predictionsDf.where("source_screen_name = 'My library_Search' and label = prediction").count())
opm = int(predictionsDf.where("source_screen_name = 'Online playlist more' and label = prediction").count())
otpm = int(predictionsDf.where("source_screen_name = 'Others profile more' and label = prediction").count())
p = int(predictionsDf.where("source_screen_name = 'Payment' and label = prediction").count())
s = int(predictionsDf.where("source_screen_name = 'Search' and label = prediction").count())
sh = int(predictionsDf.where("source_screen_name = 'Search Home' and label = prediction").count())
st = int(predictionsDf.where("source_screen_name = 'Search Trends' and label = prediction").count())
spm = int(predictionsDf.where("source_screen_name = 'Self profile more' and label = prediction").count())

print([['Album More', albm], ['Artist More', artm], ['Concert', con], ['Discover Chart', discc],['Discover Feature', discf],['Discover Genre',discg],['Discover New', discn],['Explore',ex],['Local playlist more', locp], ['My library', mylib], ['My library_Search', mylibs], ['Online playlist more', opm],['Others profile more', otpm],['Payment',p],['Search', s],['Search Home', sh], ['Search Trends', st], ['Self profile more', spm]])
result1DF = sqlContext.createDataFrame([['Album More', albm], ['Artist More', artm], ['Concert', con], ['Discover Chart', discc],['Discover Feature', discf],['Discover Genre',discg],['Discover New', discn],['Explore',ex],['Local playlist more', locp], ['My library', mylib], ['My library_Search', mylibs], ['Online playlist more', opm],['Others profile more', otpm],['Payment',p],['Search', s],['Search Home', sh], ['Search Trends', st], ['Self profile more', spm]], ['metric', 'value'])
display(result1DF)