#KK-Box's Music Recommendation System
>___Team members___ <br />
*Anushi Doshi <br />
*Mahesh kumar Badam venkata <br />
*Manideep Kannaiah <br />
*Vidushi Mishra <br />

In [1]:
#imports
from pyspark.sql import SparkSession
from pyspark.sql import Row ,functions
from pyspark.sql.types import StringType, DateType
from pyspark.sql.dataframe import DataFrame
from pyspark.ml import feature, Pipeline
from pyspark.ml.classification import LogisticRegression
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
#Setting up spark session and spark context
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
# read only cell

import os

# get the databricks runtime version
db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):    
    # if the databricks env var exists
    if db_env != None:
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

#Function defining the shape of spark dataframe
def spark_df_shape(self):
    return (self.count(), len(self.columns))
  
#Plug the function into pyspark
DataFrame.shape = spark_df_shape

In [3]:
#Importing Data 
members = spark.read.csv(get_training_filename('members.csv'),header='true',inferSchema='true')
songs = spark.read.csv(get_training_filename('songs.csv'),header='true',inferSchema='true')
songs_extra_info = spark.read.csv(get_training_filename('song_extra_info.csv'),header='true',inferSchema='true')
logs = spark.read.csv(get_training_filename('train.csv'),header='true',inferSchema='true')

##Basic Data Cleaning and Analysis

In [4]:
print("Data Dimensions:")
print("members:" , members.shape())
print("songs:" , songs.shape())
print("songs_extra_info:" , songs_extra_info.shape())
print("logs:" , logs.shape())

Data Dimensions:
members: (34403, 7)
songs: (2296833, 7)
songs_extra_info: (2296869, 3)
logs: (7377418, 6)


In [5]:
members.show(5)

+--------------------+----+---+------+--------------+----------------------+---------------+
|                msno|city| bd|gender|registered_via|registration_init_time|expiration_date|
+--------------------+----+---+------+--------------+----------------------+---------------+
|XQxgAYj3klVKjR3ox...|   1|  0|  null|             7|              20110820|       20170920|
|UizsfmJb9mV54qE9h...|   1|  0|  null|             7|              20150628|       20170622|
|D8nEhsIOBSoE6VthT...|   1|  0|  null|             4|              20160411|       20170712|
|mCuD+tZ1hERA/o5GP...|   1|  0|  null|             9|              20150906|       20150907|
|q4HRBfVSssAFS9iRf...|   1|  0|  null|             4|              20170126|       20170613|
+--------------------+----+---+------+--------------+----------------------+---------------+
only showing top 5 rows



In [6]:
songs.show(5)

+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
|             song_id|song_length|genre_ids|        artist_name|            composer|   lyricist|language|
+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
|CXoTN1eb7AI+DntdU...|     247640|      465|張信哲 (Jeff Chang)|                董貞|     何啟弘|     3.0|
|o0kFgae9QtnYgRkVP...|     197328|      444|          BLACKPINK|TEDDY|  FUTURE BO...|      TEDDY|    31.0|
|DwVvVurfpuz+XPuFv...|     231781|      465|       SUPER JUNIOR|                null|       null|    31.0|
|dKMBWoZyScdxSkihK...|     273554|      465|              S.H.E|              湯小康|     徐世珍|     3.0|
|W3bqWd3T+VeHFzHAU...|     140329|      726|           貴族精選|         Traditional|Traditional|    52.0|
+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
only showing top 5 rows



In [7]:
songs_extra_info.show(5)

+--------------------+---------------+------------+
|             song_id|           name|        isrc|
+--------------------+---------------+------------+
|LP7pLJoJFBvyuUwvu...|           我們|TWUM71200043|
|ClazTFnk6r0Bnuie4...|Let Me Love You|QMZSY1600015|
|u2ja/bZE3zhCGxvbb...|         原諒我|TWA530887303|
|92Fqsy0+p6+RHe2Eo...|        Classic|USSM11301446|
|0QFmz/+rJy1Q56C1D...|       愛投羅網|TWA471306001|
+--------------------+---------------+------------+
only showing top 5 rows



In [8]:
logs.show(5)

+--------------------+--------------------+-----------------+-------------------+---------------+------+
|                msno|             song_id|source_system_tab| source_screen_name|    source_type|target|
+--------------------+--------------------+-----------------+-------------------+---------------+------+
|FGtllVqz18RPiwJj/...|BBzumQNXUHKdEBOB7...|          explore|            Explore|online-playlist|     1|
|Xumu+NIjS6QYVxDS4...|bhp/MpSNoqoxOIB+/...|       my library|Local playlist more| local-playlist|     1|
|Xumu+NIjS6QYVxDS4...|JNWfrrC7zNN7BdMps...|       my library|Local playlist more| local-playlist|     1|
|Xumu+NIjS6QYVxDS4...|2A87tzfnJTSWqD7gI...|       my library|Local playlist more| local-playlist|     1|
|FGtllVqz18RPiwJj/...|3qm6XTZ6MOCU11x8F...|          explore|            Explore|online-playlist|     1|
+--------------------+--------------------+-----------------+-------------------+---------------+------+
only showing top 5 rows



In [9]:
#Merging songs and song_extra_info 
songs = songs.join(songs_extra_info,
                   songs.song_id == songs_extra_info.song_id,'left_outer')\
  .drop(songs_extra_info.song_id)\
  .withColumn('language',songs.language.cast(StringType()))

#Adjusting data types
members = members.withColumn('registrationDate', functions.unix_timestamp(members.registration_init_time.cast(StringType()), 'yyyyMMdd').cast('timestamp'))\
                               .withColumn('expirationDate', functions.unix_timestamp(members.expiration_date.cast(StringType()),'yyyyMMdd').cast('timestamp'))\
                               .withColumn('Age',members.bd)\
                               .drop('registration_init_time','expiration_date','bd')
                               

#Displaying the data
songs.show(5)
members.show(5)

+--------------------+-----------+---------+--------------+--------+--------+--------+--------------------+------------+
|             song_id|song_length|genre_ids|   artist_name|composer|lyricist|language|                name|        isrc|
+--------------------+-----------+---------+--------------+--------+--------+--------+--------------------+------------+
|++ThhAa6iO3bqSeLL...|     218592|      139|    Urban Hits|    null|    null|    52.0|          Area Codes|GBPS81514834|
|++XsB7MZl/8xl/d0/...|     452092|      947|  C'est La Vie| Honey B|    null|    -1.0|Le repos de la te...|TWC010601271|
|++bWxg1ih9h8vGuet...|     164675|     1138|  Quincy Jones|    null|    null|    -1.0| Boogie Stop Shuffle|QMDA61439475|
|++jROLqFoo1J68Q+l...|     186595|      465|Amanda Jenssen|    null|    null|    52.0|         Dry My Soul|SEBGA1200022|
|+/A0DjeA6RcVEUbk5...|     177120|     2022|       Karen O|    null|    null|    52.0|               Beast|QMKBG1400017|
+--------------------+----------

##Data Visualization

In [10]:
#display(songs.select('language').orderBy('language').groupBy('language').agg(functions.count('language')).dropna())
#display(songs.select('genre_ids').groupBy('genre_ids').agg(functions.count('genre_ids')))

#Model Building

___DataSet Preparation___

In [11]:
#combining data
logs = logs.join(members,members.msno == logs.msno, 'inner').drop(members.msno)
logs =logs.join(songs, logs.song_id == songs.song_id, 'inner').drop(songs.song_id)

#String Indexer for msno and Song_id
userIdStringIndexer = feature.StringIndexer().setInputCol('msno').setOutputCol('userId')
songIdStringIndexer = feature.StringIndexer().setInputCol('song_id').setOutputCol('songId')

#Data Cleaning PipeLines for StringIndexers
dcPipeline = Pipeline(stages = [userIdStringIndexer,songIdStringIndexer])

#Fit the pipeline and tranforming logsDF
logsDf = dcPipeline.fit(logs).transform(logs).drop('msno','song_id')

In [12]:
#logsDF = logsDf.drop('msno','song_id')
logsDf.show()

+-----------------+--------------------+-------------------+------+----+------+--------------+-------------------+-------------------+---+-----------+-----------+-------------------+---------------+---------------+--------+--------------------+------------+-------+--------+
|source_system_tab|  source_screen_name|        source_type|target|city|gender|registered_via|   registrationDate|     expirationDate|Age|song_length|  genre_ids|        artist_name|       composer|       lyricist|language|                name|        isrc| userId|  songId|
+-----------------+--------------------+-------------------+------+----+------+--------------+-------------------+-------------------+---+-----------+-----------+-------------------+---------------+---------------+--------+--------------------+------------+-------+--------+
|       my library| Local playlist more|      local-library|     0|   1|  null|             7|2016-01-08 00:00:00|2016-12-23 00:00:00|  0|     452092|        947|       C'est 

In [13]:
logs = logsDf.select().toPandas()

Py4JJavaError: An error occurred while calling o186.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 45.0 failed 1 times, most recent failure: Lost task 7.0 in stage 45.0 (TID 1068, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.lang.AbstractStringBuilder.<init>(Unknown Source)
	at java.lang.StringBuilder.<init>(Unknown Source)
	at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(Unknown Source)
	at java.io.ObjectInputStream$BlockDataInputStream.readUTF(Unknown Source)
	at java.io.ObjectInputStream.readString(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readArray(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readArray(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.lang.AbstractStringBuilder.<init>(Unknown Source)
	at java.lang.StringBuilder.<init>(Unknown Source)
	at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(Unknown Source)
	at java.io.ObjectInputStream$BlockDataInputStream.readUTF(Unknown Source)
	at java.io.ObjectInputStream.readString(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readArray(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readArray(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)


## Visualization

In [35]:
source_system_tab = logsDf.select('source_system_tab').groupby('source_system_tab').count()
target = logsDf.select('target').groupby('target').toPandas()
target.show()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:59728)
Traceback (most recent call last):
  File "C:\Users\vidus\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\vidus\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:59728)

##Logistic Regression

In [None]:
encodeCol = ['source_system_tab','source_screen_name','source_type','city','gender','registered_via','language','genre_ids']

#String Indexers for all categorical variables
pipeline_stages=[]
for attr in encodeCol:
  si = feature.StringIndexer().setInputCol(attr).setOutputCol(attr+'_vec').setHandleInvalid('skip')
  pipeline_stages.append(si)

#One hot encoding of above String indexers
ohe = feature.OneHotEncoderEstimator().setInputCols([attr+'_vec' for attr in encodeCol]).setOutputCols([attr+'_indexed' for attr in encodeCol])

#feature assembler
features_indexed = [attr+'_indexed' for attr in encodeCol]
features_indexed.extend(['Age','song_length'])
feature_assembler = feature.VectorAssembler().setInputCols(features_indexed).setOutputCol('features')

#Appending one hot encoder and feature assembler pipeline stages
pipeline_stages.append(ohe)
pipeline_stages.append(feature_assembler)


In [None]:
Logit = LogisticRegression().setFeaturesCol('features').setLabelCol('target')

In [None]:
pipeline_stages.append(Logit)
Encode_pipeline = Pipeline().setStages(pipeline_stages)

In [None]:
fit = Encode_pipeline.fit(logsDf)

In [None]:
fit.stages[-1].summary.areaUnderROC