In [1]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType, ArrayType
from pyspark.sql.functions import udf, col
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

import json
import numpy as np

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1638923320008_0004,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Lectura de datos

Se leen los dataframes de la zona trusted del datalake

In [2]:
df = sqlContext.read.load("s3://musicgenredatalake/trusted/datasets/df_train.csv", 
                                  format='csv', 
                                  header='true',
                                  inferSchema='true')
df_test = sqlContext.read.load("s3://musicgenredatalake/trusted/datasets/df_test.csv", 
                                  format='csv', 
                                  header='true',
                                  inferSchema='true')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
assembler = VectorAssembler(inputCols=[str(c) for c in df.columns if c != 'label'], outputCol='features')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df = assembler.transform(df)[['features', 'label']]
df_test = assembler.transform(df_test)[['features', 'label']]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
df.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.18909223992693...|    1|
|[0.55899497746684...|    8|
|[0.38413610137891...|    7|
+--------------------+-----+
only showing top 3 rows

In [5]:
evaluator = MulticlassClassificationEvaluator()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
df = df[['features', 'label']]
df_test = df_test[['features', 'label']]

rf = RandomForestClassifier(maxDepth=10, numTrees=500)
model = rf.fit(df)
df = model.transform(df)
df_test = model.transform(df_test)

print(evaluator.evaluate(df))
print(evaluator.evaluate(df_test))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.962754162218922
0.8113877639107623

In [25]:
rf = RandomForestClassifier()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Construcción del *ParamGrid* y entrenamiento

In [26]:
df = df[['features', 'label']]
df_test = df_test[['features', 'label']]

paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [10, 15, 20, 30, 40 , 50]) \
    .addGrid(rf.numTrees, [50, 100, 500, 1000, 10000]) \
    .build()

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
gs = crossval.fit(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Evaluación de resultados

In [27]:
df = df[['features', 'label']]
df_test = df_test[['features', 'label']]

df = gs.transform(df)
df_test = gs.transform(df_test)

print(evaluator.evaluate(df))
print(evaluator.evaluate(df_test))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.9991865466779369
0.8523901421503776

In [28]:
gs.getEstimatorParamMaps()[np.argmax(gs.avgMetrics)]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{Param(parent='RandomForestClassifier_3269e2bb68d2', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 20, Param(parent='RandomForestClassifier_3269e2bb68d2', name='numTrees', doc='Number of trees to train (>= 1).'): 50}

In [29]:
gs.avgMetrics

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.7796302291083838, 0.817405766174061, 0.8192138659077628]

# Persistencia del modelo

In [31]:
gs.save('s3://musicgenredatalake/refined/models/gs_20_50')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…