# Classification in PySpark's MLlib 

### Music Genre classification
Try to classify songs into a set of 23 electronic genres based on a number of audio characteristics. An application like Pandora could use this technology to recommend songs to users or to create meaningful channels. Super fun!

### Dataset
*beatsdataset.csv*
Each row is an electronic music song. The dataset contains 100 songs for each of 23 electronic music genres; they were the top 100 songs of their genres in November 2016. The 71 feature columns are audio features extracted from a random two-minute sample of each audio file. These features were extracted using pyAudioAnalysis (https://github.com/tyiannak/pyAudioAnalysis).


### Data Exploration and Pre-Processing
This phase loads and explores the data, then prepares and analyzes it for use with MLlib. The steps are:
- Load the .csv file
- Check the DataFrame for negative numbers and count the classes

In [51]:
# Create PySpark Instance

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ClassProject").getOrCreate()
spark


In [5]:
# Import the data
audio =  spark.read.csv("Datasets/beatsdataset.csv", inferSchema=True, header=True)

In [6]:
# Show the first 5 rows
audio.limit(5).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,63-ChromaVector8std,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.003431,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.004461,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom
2,2,0.085308,0.128525,3.123837,0.217205,0.228652,0.789647,0.008247,0.156822,-22.472722,...,0.001529,0.004556,0.007723,0.017482,0.002901,0.022201,133.333333,0.123373,129.0,BigRoom
3,3,0.10305,0.167042,3.15083,0.233593,0.245032,0.967082,0.006571,0.168083,-21.470751,...,0.001591,0.003514,0.009477,0.023162,0.004165,0.015379,133.333333,0.158876,129.0,BigRoom
4,4,0.15173,0.148405,3.194498,0.29373,0.267231,1.353005,0.003872,0.292055,-21.371157,...,0.003945,0.004131,0.01133,0.028188,0.002639,0.019079,133.333333,0.190708,129.0,BigRoom


In [10]:
# Show the total number of rows
audio.count()

2300

In [7]:
# Print the Schema from the dataset
audio.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 1-ZCRm: double (nullable = true)
 |-- 2-Energym: double (nullable = true)
 |-- 3-EnergyEntropym: double (nullable = true)
 |-- 4-SpectralCentroidm: double (nullable = true)
 |-- 5-SpectralSpreadm: double (nullable = true)
 |-- 6-SpectralEntropym: double (nullable = true)
 |-- 7-SpectralFluxm: double (nullable = true)
 |-- 8-SpectralRolloffm: double (nullable = true)
 |-- 9-MFCCs1m: double (nullable = true)
 |-- 10-MFCCs2m: double (nullable = true)
 |-- 11-MFCCs3m: double (nullable = true)
 |-- 12-MFCCs4m: double (nullable = true)
 |-- 13-MFCCs5m: double (nullable = true)
 |-- 14-MFCCs6m: double (nullable = true)
 |-- 15-MFCCs7m: double (nullable = true)
 |-- 16-MFCCs8m: double (nullable = true)
 |-- 17-MFCCs9m: double (nullable = true)
 |-- 18-MFCCs10m: double (nullable = true)
 |-- 19-MFCCs11m: double (nullable = true)
 |-- 20-MFCCs12m: double (nullable = true)
 |-- 21-MFCCs13m: double (nullable = true)
 |-- 22-ChromaVector1m: double (null

In [9]:
# Print the number of classes from the dataset
audio.groupBy("class").count().show()

+--------------------+-----+
|               class|count|
+--------------------+-----+
|           PsyTrance|  100|
|           HardDance|  100|
|              Breaks|  100|
|  HardcoreHardTechno|  100|
|   IndieDanceNuDisco|  100|
|              Trance|  100|
|           DeepHouse|  100|
|ElectronicaDowntempo|  100|
|           ReggaeDub|  100|
|             Minimal|  100|
|         DrumAndBass|  100|
|             Dubstep|  100|
|             BigRoom|  100|
|              Techno|  100|
|               House|  100|
|         FutureHouse|  100|
|        ElectroHouse|  100|
|           GlitchHop|  100|
|           TechHouse|  100|
|              HipHop|  100|
+--------------------+-----+
only showing top 20 rows



In [15]:
# Import the ML Functions
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

In [13]:
# Separate the DataFrame into the input columns and the class column
input_columns = audio.columns[1:-1]
dependent = "class"

In [17]:
# PySpark MLlib needs the classes in numeric format, so this converts them from string to number
indexer = StringIndexer(inputCol="class", outputCol="label") # PySpark expects this naming convention
indexed = indexer.fit(audio).transform(audio)

#Show the  dataframe with new column "label" added
indexed.limit(5).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class,label
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom,0.0
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom,0.0
2,2,0.085308,0.128525,3.123837,0.217205,0.228652,0.789647,0.008247,0.156822,-22.472722,...,0.004556,0.007723,0.017482,0.002901,0.022201,133.333333,0.123373,129.0,BigRoom,0.0
3,3,0.10305,0.167042,3.15083,0.233593,0.245032,0.967082,0.006571,0.168083,-21.470751,...,0.003514,0.009477,0.023162,0.004165,0.015379,133.333333,0.158876,129.0,BigRoom,0.0
4,4,0.15173,0.148405,3.194498,0.29373,0.267231,1.353005,0.003872,0.292055,-21.371157,...,0.004131,0.01133,0.028188,0.002639,0.019079,133.333333,0.190708,129.0,BigRoom,0.0


In [22]:
# Show each new label created by the indexer for each class
indexed.groupBy("class", "label").count().orderBy("label").show()

+--------------------+-----+-----+
|               class|label|count|
+--------------------+-----+-----+
|             BigRoom|  0.0|  100|
|              Breaks|  1.0|  100|
|               Dance|  2.0|  100|
|           DeepHouse|  3.0|  100|
|         DrumAndBass|  4.0|  100|
|             Dubstep|  5.0|  100|
|        ElectroHouse|  6.0|  100|
|ElectronicaDowntempo|  7.0|  100|
|           FunkRAndB|  8.0|  100|
|         FutureHouse|  9.0|  100|
|           GlitchHop| 10.0|  100|
|           HardDance| 11.0|  100|
|  HardcoreHardTechno| 12.0|  100|
|              HipHop| 13.0|  100|
|               House| 14.0|  100|
|   IndieDanceNuDisco| 15.0|  100|
|             Minimal| 16.0|  100|
|    ProgressiveHouse| 17.0|  100|
|           PsyTrance| 18.0|  100|
|           ReggaeDub| 19.0|  100|
+--------------------+-----+-----+
only showing top 20 rows
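
The labels above follow alphabetical order because every genre has the same count. By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), and the Spark 3.x documentation states that ties are then sorted alphabetically. A minimal pure-Python sketch of that rule follows; the helper name `index_labels` and the toy data are assumptions made for illustration, and this is not Spark's own code.

```python
from collections import Counter

def index_labels(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically, mirroring the documented
    frequencyDesc behaviour (an illustration only).
    """
    counts = Counter(values)
    # Sort by descending count, then by the string itself
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

# Toy example: three genres with equal counts are indexed alphabetically
genres = ["Trance", "BigRoom", "Breaks", "Trance", "BigRoom", "Breaks"]
mapping = index_labels(genres)
print(mapping)
```

If the classes were imbalanced, the most frequent genre would receive label 0.0 instead of the alphabetically first one.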



In [38]:
# Check if there is any StringType column among the input_columns

str_col = []
num_col = []

for column in input_columns:
    # isinstance is used because the string form of a data type differs across PySpark versions
    if isinstance(indexed.schema[column].dataType, StringType):
        str_col.append(column)
    else:
        num_col.append(column)

if len(str_col) == 0:
    print("No Input Variable with String Type")
else:
    print("There are {} Variables with String Type".format(len(str_col)))

No Input Variable with String Type


In [39]:
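# Check if there is any negative number in the input columns; some models (e.g. NaiveBayes) cannot work with negative values
from pyspark.sql import functions as F

# One row holding the minimum of every input column
minimums = audio.select([F.min(c).alias(c) for c in input_columns]).collect()[0]
negative_cols = [c for c in input_columns if minimums[c] < 0]

if not negative_cols:
    print("No negative numbers in the DataFrame")
else:
    print("There are negative numbers in the DataFrame; please treat them")


There are negative numbers in the DataFrame; please treat them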


In [40]:
# We need to assemble all the input columns into a single vector to use the PySpark classifiers

# Create your vector assembler object
assembler = VectorAssembler(inputCols=input_columns,outputCol='features')

# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')

In [41]:
# show the output Dataframe
output.toPandas()

Unnamed: 0,features,label
0,"[0.136439587512, 0.0888612604609, 3.2012005559...",0.0
1,"[0.117038518483, 0.108389033282, 3.19400106287...",0.0
2,"[0.0853077737447, 0.128525418596, 3.1238373468...",0.0
3,"[0.103049917216, 0.167041735198, 3.15083006899...",0.0
4,"[0.151729948738, 0.148404713864, 3.19449794602...",0.0
...,...,...
2295,"[0.123267225459, 0.0570996688715, 3.2089518318...",22.0
2296,"[0.142275627249, 0.0929513098533, 3.1156074636...",22.0
2297,"[0.125568603337, 0.0301962736057, 3.1929216927...",22.0
2298,"[0.118206231891, 0.108736687701, 3.14955719792...",22.0


In [42]:
# Create a MinMaxScaler for the DataFrame
# Try StandardScaler

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(output)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')
final_data = final_data.withColumnRenamed('scaledFeatures','features')
final_data.toPandas()

Features scaled to range: [0.000000, 1000.000000]


Unnamed: 0,label,features
0,0.0,"[519.8182667002392, 303.38998025826874, 895.77..."
1,0.0,"[435.29546399256594, 373.99300906442693, 881.6..."
2,0.0,"[297.0571291217422, 446.79648775095126, 743.49..."
3,0.0,"[374.3526477343687, 586.0529526571695, 796.628..."
4,0.0,"[586.4323374662597, 518.6704525553854, 882.580..."
...,...,...
2295,22.0,"[462.4314826351166, 188.5553502951001, 911.030..."
2296,22.0,"[545.2435891166101, 318.17763053030626, 727.29..."
2297,22.0,"[472.4576779982298, 91.28562023748975, 879.477..."
2298,22.0,"[440.3827305682355, 375.2499601352999, 794.122..."
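
For reference, `MinMaxScaler` rescales each feature column independently. According to the Spark documentation, a value `e` becomes `(e - E_min) / (E_max - E_min) * (max - min) + min`, and a constant column is mapped to the midpoint `0.5 * (max + min)`. The sketch below applies that formula to one column in plain Python. The function name `min_max_rescale` and the toy values are assumptions. Run it in a fresh Python session, because the notebook's `from pyspark.sql.functions import *` shadows built-ins such as `min` and `max`.

```python
def min_max_rescale(column, new_min=0.0, new_max=1000.0):
    """Rescale one feature column to the range [new_min, new_max]."""
    e_min, e_max = min(column), max(column)
    if e_max == e_min:
        # Constant column: the documented result is the midpoint of the range
        return [0.5 * (new_max + new_min) for _ in column]
    scale = (new_max - new_min) / (e_max - e_min)
    return [(x - e_min) * scale + new_min for x in column]

# Toy column (made-up values, not taken from the dataset)
print(min_max_rescale([1.0, 2.0, 5.0]))
```

Because the lower bound is 0, every scaled feature is non-negative, which matters for classifiers such as `NaiveBayes`.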


In [43]:
# Create train and test datasets from final_data with a 70/30 proportion
train,test = final_data.randomSplit([0.7,0.3],seed=40)
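
Note that `randomSplit` normalizes the weights if they do not sum to 1 and assigns rows at random, so the resulting sizes are only approximately 70/30. The sketch below illustrates the idea in plain Python. It is not Spark's implementation, the helper name `random_split` is an assumption, and the same seed will not reproduce Spark's split.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing a uniform random number."""
    total = sum(weights)
    # Normalize the weights and turn them into cumulative upper bounds
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any rounding slack in the bounds
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(2300), [0.7, 0.3], seed=40)
# Close to a 70/30 share of the 2300 rows, but not exact
print(len(train_rows), len(test_rows))
```

The split is also not stratified, so the 23 genres are not guaranteed to keep exactly equal shares in the train and test sets.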

### Machine Learning Model Training and Test

This phase trains and tests the models on the prepared data. The steps are:
- Prepare MLflow to record the experiments
- Fit the algorithms that support multiclass classification
- Test the algorithms
- Present the results

In [44]:
# Read in dependencies
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import warnings

# MLflow libraries
import mlflow
import mlflow.spark  # imported as a submodule so the name `spark` keeps pointing at the SparkSession

In [46]:
# Set experiment
# This will actually automatically create one if the one you call on doesn't exist
mlflow.set_experiment(experiment_name = "Experiment-1")

# set up your client
from  mlflow.tracking import MlflowClient
client = MlflowClient()

In [47]:
# Create a run and attach it to the experiment you just created
experiment_name = "Experiment-1"

def create_run(experiment_name):
    # set_experiment creates the experiment if it does not exist yet
    mlflow.set_experiment(experiment_name = experiment_name)
    # Look the experiment up by name to get its id
    experiment = client.get_experiment_by_name(experiment_name)
    run = client.create_run(experiment.experiment_id) # returns mlflow.entities.Run
    return run
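
`create_run` returns an `mlflow.entities.Run`, but the notebook does not show anything being logged to it. One way to record a classifier's score is sketched below. The helper name `log_result` and the keys `"classifier"` and `"accuracy"` are assumptions; `log_param`, `log_metric`, and `set_terminated` are `MlflowClient` methods. The client is passed in as an argument so the helper can be exercised without a tracking server.

```python
def log_result(client, run_id, classifier_name, accuracy):
    """Record one classifier's test accuracy on an existing MLflow run.

    `client` is expected to be an mlflow.tracking.MlflowClient, or any
    object exposing the same three methods.
    """
    client.log_param(run_id, "classifier", classifier_name)
    client.log_metric(run_id, "accuracy", float(accuracy))
    # Mark the run as finished so it does not stay in the RUNNING state
    client.set_terminated(run_id)

# Possible usage inside the training function (names taken from the notebook):
# log_result(client, run.info.run_id, Mtype, accuracy)
```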

In [60]:
# Create a function that trains a classifier on the train data and evaluates it on the test data
def ClassTrainEval(classifier,features,classes,train,test):

    # log the experiment
    run = create_run(experiment_name)
    
    def FindMtype(classifier):
        # Return the classifier's class name, e.g. "LogisticRegression"
        return type(classifier).__name__
    
    Mtype = FindMtype(classifier)  # Get the classifier name
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in ("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):

            # Add parameters of your choice here:
            if Mtype == "LogisticRegression":
                paramGrid = (ParamGridBuilder()
#                              .addGrid(classifier.regParam, [0.1, 0.01])
                             .addGrid(classifier.maxIter, [10, 15, 20])
                             .build())

            # Add parameters of your choice here:
            elif Mtype == "NaiveBayes":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6])
                             .build())

            # Add parameters of your choice here:
            elif Mtype == "RandomForestClassifier":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.maxDepth, [2, 5, 10])
#                              .addGrid(classifier.maxBins, [5, 10, 20])
#                              .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())

            # Add parameters of your choice here:
            elif Mtype == "GBTClassifier":
                paramGrid = (ParamGridBuilder()
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30])
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100])
                             .addGrid(classifier.maxIter, [10, 15, 50, 100])
                             .build())

            # Add parameters of your choice here:
            elif Mtype == "LinearSVC":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.maxIter, [10, 15])
                             .addGrid(classifier.regParam, [0.1, 0.01])
                             .build())

            # Add parameters of your choice here:
            elif Mtype == "DecisionTreeClassifier":
                paramGrid = (ParamGridBuilder()
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30])
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100])
                             .build())
            
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
 
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # predictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a list
        score = [str(accuracy)] # convert to a string and wrap it in a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
    
    return result
    #Also returns the fit model important scores or p values
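
With `metricName="accuracy"`, the evaluator reports the fraction of test rows whose `prediction` equals their `label`. The function above multiplies that by 100 and keeps the first five characters of the string with `substr(0, 5)`. A plain-Python sketch of the same calculation is shown here; the function name `accuracy_percent` and the toy values are assumptions.

```python
def accuracy_percent(labels, predictions):
    """Share of predictions equal to their label, as a percentage."""
    correct = 0
    for label, prediction in zip(labels, predictions):
        if label == prediction:
            correct += 1
    return 100.0 * correct / len(labels)

# Toy labels and predictions (made up for illustration)
labels      = [0.0, 1.0, 2.0, 2.0, 1.0, 0.0, 1.0]
predictions = [0.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0]
score = accuracy_percent(labels, predictions)

# Taking the first five characters truncates instead of rounding
print(str(score)[:5])
```

Since the string is cut rather than rounded, the reported figures can be slightly lower than a value rounded to two decimals.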
    

In [61]:
# Run!
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import functions
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Comment out Naive Bayes if your data still contains negative values
classifiers = [ LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()]
              
features = final_data.select(['features']).collect()
# Count the distinct classes so that binary-only algorithms can be skipped; collect() brings the result to the driver
class_count = final_data.select(countDistinct("label")).collect()
classes = class_count[0][0]

#set up your results table
columns = ['Classifier', 'Result']
vals = [("Place Holder","N/A")]
results = spark.createDataFrame(vals, columns)

for classifier in classifiers:
    new_result = ClassTrainEval(classifier,features,classes,train,test)
    results = results.union(new_result)
results = results.where("Classifier!='Place Holder'")
results.show(100,False)

LinearSVC  could not be used because PySpark currently only accepts binary classification data for this algorithm
GBTClassifier  could not be used because PySpark currently only accepts binary classification data for this algorithm
+------------------------------+------+
|Classifier                    |Result|
+------------------------------+------+
|LogisticRegression            |40.98 |
|OneVsRest                     |44.11 |
|LinearSVC                     |N/A   |
|NaiveBayes                    |37.25 |
|RandomForestClassifier        |47.09 |
|GBTClassifier                 |N/A   |
|DecisionTreeClassifier        |34.27 |
|MultilayerPerceptronClassifier|18.47 |
+------------------------------+------+
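
To put these numbers in context: with 23 genres of 100 songs each, guessing uniformly at random would be right about 1 time in 23. The short sketch below compares each accuracy from the table with that chance level, assuming the test split stays roughly balanced. The variable names are illustrative only.

```python
# Test accuracies copied from the results table above (percent)
results = {
    "LogisticRegression": 40.98,
    "OneVsRest": 44.11,
    "NaiveBayes": 37.25,
    "RandomForestClassifier": 47.09,
    "DecisionTreeClassifier": 34.27,
    "MultilayerPerceptronClassifier": 18.47,
}

num_classes = 23
# With balanced classes, a uniform random guess is correct 1 time in 23
chance = 100.0 / num_classes

# Rank classifiers from best to worst and show the lift over chance
ranked = sorted(results.items(), key=lambda item: item[1], reverse=True)
for name, acc in ranked:
    print(f"{name:<32}{acc:6.2f}%  ({acc / chance:.1f}x chance)")
```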

