In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",

        "spark.executor.heartbeatInterval": "10800s",
        "spark.network.timeout": "24h",

        "spark.driver.memory": "8G",
        "spark.executor.memory": "8G",
        "spark.executor.cores": "4",

        "livy.server.session.timeout": "24h",

        "spark.dynamicAllocation.enabled": "false",
        "spark.ext.h2o.fail.on.unsupported.spark.param": "false",

        "spark.app.name": "msds694-group7"
    }
}

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
import json

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *

# Attach to the already-running Spark session (or create one if none exists)
# and keep a handle on its SparkContext for lower-level APIs.
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
21,application_1615435774710_0012,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%%info


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
21,application_1615435774710_0012,pyspark,idle,Link,Link,✔


In [4]:
# S3 prefix of the cleaned Last.fm training data with genre labels (read as Parquet).
train_path = "s3://msds694-final-group7/lastfm_train_for_genre_clean/All_with_genre/"
# Separate test-set prefix, currently disabled: the hold-out set is instead carved
# out of the training data with randomSplit.
#test_path = "s3://msds694-final-group7/lastfm_test_for_genre_clean/All_with_genre/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Load the training Parquet data and mark it for caching: every StringIndexer
# fit performed later makes its own pass over this DataFrame.
lastfm_train_genre = ss.read.parquet(train_path).cache()
# Loading of the separate test set is disabled together with test_path.
#lastfm_test_genre = ss.read.parquet(test_path).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Inspect the column names and types before building features.
lastfm_train_genre.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Track_ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- Tag_1: string (nullable = true)
 |-- Tag_2: string (nullable = true)
 |-- Tag_3: string (nullable = true)
 |-- Similar_1: string (nullable = true)
 |-- Similar_2: string (nullable = true)
 |-- Similar_3: string (nullable = true)
 |-- Similar_4: string (nullable = true)
 |-- Similar_5: string (nullable = true)
 |-- Similar_6: string (nullable = true)
 |-- Similar_7: string (nullable = true)
 |-- Similar_8: string (nullable = true)
 |-- Similar_9: string (nullable = true)
 |-- Similar_10: string (nullable = true)
 |-- genre: string (nullable = true)

In [7]:
def indexStringColumns(df, cols):
    """Replace string columns with their StringIndexer-encoded numeric values.

    For every name in ``cols`` a StringIndexer is fitted on the current
    DataFrame, the encoded values are written to a temporary
    ``<name>-num`` column, the original column is dropped, and the
    temporary column is renamed back to ``<name>``.

    df   --> DataFrame containing the columns to encode
    cols --> names of the string columns to convert to numeric indices

    Returns the resulting DataFrame; ``df`` itself is returned when
    ``cols`` is empty.
    """
    indexed = df

    for name in cols:
        encoded_name = name + "-num"
        # Fit on the frame as it stands after the previous iterations.
        model = StringIndexer(inputCol=name, outputCol=encoded_name).fit(indexed)
        # Swap the string column for its numeric encoding, keeping the original name.
        indexed = (
            model.transform(indexed)
            .drop(name)
            .withColumnRenamed(encoded_name, name)
        )

    return indexed

# Index every string column used downstream, including the "genre" label.
dfnumeric = indexStringColumns(
    lastfm_train_genre,
    [
        "Title", "Artist", "Tag_1", "Tag_2", "Tag_3",
        "Similar_1", "Similar_2", "Similar_3", "Similar_4",
        "Similar_5", "Similar_6", "Similar_7", "Similar_8",
        "Similar_9", "Similar_10", "genre",
    ],
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
def oneHotEncodeColumns(df, cols):
    """One-hot encode indexed columns, replacing each column in place.

    For every name in ``cols`` the encoded vector is written to a temporary
    ``<name>-onehot`` column, the original column is dropped, and the
    temporary column is renamed back to ``<name>``. ``dropLast=False`` keeps
    one vector slot per category.

    df   --> DataFrame whose ``cols`` already hold numeric category indices
    cols --> names of the columns to one-hot encode

    Returns the resulting DataFrame; ``df`` itself is returned when
    ``cols`` is empty.
    """
    newdf = df
    for c in cols:
        ohe = OneHotEncoder(inputCol=c, outputCol=c + "-onehot", dropLast=False)

        # Spark 3.x: OneHotEncoder is an Estimator and must be fitted to get a
        # model that can transform. Spark 2.x: it is a plain Transformer with
        # no fit(), so it is used directly.
        encoder = ohe.fit(newdf) if hasattr(ohe, "fit") else ohe

        newdf = encoder.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c + "-onehot", c)
    return newdf

# One-hot encode the indexed feature columns; "genre" is left as a numeric index.
dfhot = oneHotEncodeColumns(
    dfnumeric,
    [
        "Title", "Artist", "Tag_1", "Tag_2", "Tag_3",
        "Similar_1", "Similar_2", "Similar_3", "Similar_4",
        "Similar_5", "Similar_6", "Similar_7", "Similar_8",
        "Similar_9", "Similar_10",
    ],
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Columns combined into the single "features" vector ("genre" is excluded
# because it becomes the label).
input_cols = [
    "Title", "Artist", "Tag_1", "Tag_2", "Tag_3",
    "Similar_1", "Similar_2", "Similar_3", "Similar_4",
    "Similar_5", "Similar_6", "Similar_7", "Similar_8",
    "Similar_9", "Similar_10",
]
va = VectorAssembler(inputCols=input_cols, outputCol="features")
# lpoints - labeled data: the assembled "features" vector plus "genre" renamed to "label".
lpoints = (
    va.transform(dfhot)
    .select("features", "genre")
    .withColumnRenamed("genre", "label")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Divide the dataset into training (70%) and validation (30%) sets.
# A fixed seed keeps the split, and therefore the reported metric, repeatable
# across kernel restarts.
splits = lpoints.randomSplit([0.7, 0.3], seed=42)

# cache(): the algorithm is iterative, so the training and validation sets are
# going to be reused many times.
last_fm_genre_train = splits[0].cache()
last_fm_genre_test = splits[1].cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Fit a regularised logistic regression on the training split.
lr = LogisticRegression(
    fitIntercept=True,
    maxIter=100,
    regParam=0.3,
)
lrmodel = lr.fit(last_fm_genre_train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Apply the fitted model to the held-out split to obtain its predictions.
test_predicts = lrmodel.transform(last_fm_genre_test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out predictions with the F1 metric; the bare last expression
# is what the cell displays.
evaluator = MulticlassClassificationEvaluator(metricName="f1")
evaluator.evaluate(test_predicts)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.5931902436506449

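The model reaches a weighted F1 score of only about 0.59 on the held-out split when predicting genre from the title, artist, tags, and similar tracks. Note that this is an F1 score, not the share of genres predicted correctly. Therefore, it is not a great feature space for predicting genre.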