In [1]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import os

In [2]:
# Every filesystem location below is anchored at the notebook's working directory.
BASE_PATH = os.getcwd()
DATA_PATH = os.path.join(BASE_PATH, 'data')
TRAINING_DATASET_PATH = os.path.join(DATA_PATH, 'training')
MODEL_PATH = os.path.join(BASE_PATH, 'model')

# Reuse an already-running Spark session if there is one; otherwise start one named "Training".
spark = (
    SparkSession.builder
    .appName("Training")
    .getOrCreate()
)

In [3]:
def get_pipeline(data: pyspark.sql.DataFrame,
                 num_trees_grid=(5, 10, 15, 20),
                 max_categories: int = 4):
    """Build the random-forest ML pipeline and its cross-validation param grid.

    The label and feature indexers are fitted here on all of ``data``, so the
    returned pipeline carries them as already-fitted stages.

    Args:
        data: DataFrame with a ``label`` column and a ``features`` vector column.
        num_trees_grid: Candidate ``numTrees`` values explored by the grid
            search. Defaults to (5, 10, 15, 20).
        max_categories: Passed to ``VectorIndexer``; features with more than
            this many distinct values are treated as continuous. Defaults to 4.

    Returns:
        A ``(pipeline, param_maps)`` tuple.
        - ``pipeline`` is a ``Pipeline`` with the stages label indexer,
          feature indexer, random forest and label converter.
        - ``param_maps`` is the list produced by ``ParamGridBuilder.build()``.
    """
    # Index labels, adding metadata to the label column.
    # Fit on the whole dataset so every label value is included in the index.
    labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

    # Automatically identify categorical features and index them.
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures",
                                   maxCategories=max_categories).fit(data)

    # numTrees=10 is only the estimator's own setting; each grid entry below
    # overrides it during cross-validation.
    rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

    # Convert indexed predictions back to the original label values.
    labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                   labels=labelIndexer.labels)

    # Materialize as a list so any iterable of tree counts is accepted.
    paramMap = ParamGridBuilder() \
        .addGrid(rf.numTrees, list(num_trees_grid)) \
        .build()

    return (Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter]), paramMap)

In [4]:
def get_model(pipeline: pyspark.ml.Pipeline,
              paramGrid, evaluator,
              n_folds: int,
              dataset: pyspark.sql.DataFrame,
              model_path):
    """Cross-validate a pipeline over a param grid and persist the best model.

    Args:
        pipeline: Estimator to tune.
        paramGrid: List of param maps to evaluate.
        evaluator: Evaluator whose metric scores each candidate.
        n_folds: Number of cross-validation folds.
        dataset: DataFrame used for cross-validation.
        model_path: Destination for the best model; existing contents are overwritten.

    Returns:
        ``(best_model, evaluation)``.
        - ``best_model`` is the best fitted model found by cross-validation.
        - ``evaluation`` is a list of ``(average_metric, param_map)`` tuples,
          one per entry of ``paramGrid``, in grid order.
    """
    cross_validator = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=n_folds,
    )
    fitted = cross_validator.fit(dataset)
    best_model = fitted.bestModel
    # Pair each candidate's fold-averaged score with the param map that produced it.
    evaluation = [(metric, params) for metric, params in zip(fitted.avgMetrics, paramGrid)]
    best_model.write().overwrite().save(model_path)
    return best_model, evaluation

In [13]:
def training(training_data_path: str, model_save_path: str, num_folds: int = 2):
    """Train a random-forest pipeline with cross-validation and persist it.

    Args:
        training_data_path: Location of the Parquet training dataset.
        model_save_path: Where the best pipeline model is written; existing
            contents are overwritten.
        num_folds: Number of cross-validation folds (default 2).

    Returns:
        The ``(model, evaluations)`` pair returned by ``get_model``.
        - ``model`` is the best fitted pipeline model.
        - ``evaluations`` is a list of ``(average_accuracy, param_map)`` pairs.
    """
    # Read from the caller-supplied path rather than a module-level constant.
    training_df = spark.read.parquet(training_data_path)

    # Log the input schema and one sample row.
    training_df.printSchema()
    print(training_df.head(1))

    pipeline, paramMap = get_pipeline(training_df)
    # Score against "indexedLabel", the label column the random forest is
    # trained on inside the pipeline.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

    # Persist to the caller-supplied location.
    model, evaluations = get_model(pipeline=pipeline,
                                   paramGrid=paramMap,
                                   evaluator=evaluator,
                                   n_folds=num_folds,
                                   dataset=training_df,
                                   model_path=model_save_path)
    return model, evaluations

In [14]:
# Train, persist and report: summarize the best model, then every
# cross-validated candidate with its average score.
model, evaluations = training(training_data_path=TRAINING_DATASET_PATH,
                              model_save_path=MODEL_PATH)

# Index 2 of the fitted pipeline's stages is the random forest
# (stages: label indexer, feature indexer, forest, label converter).
rf = model.stages[2]
print('Best Model: \n'
      f'Random Forest model with {rf.getNumTrees} trees.\n'
      f'Saved to {MODEL_PATH}\n\n'
      'Cross Validated models and average results:')

for evaluation, params in evaluations:
    # Look the tree count up by parameter name rather than by position, so the
    # report stays correct if more parameters are added to the grid.
    num_trees = {param.name: value for param, value in params.items()}['numTrees']
    print(f'Model: Random Forest model with {num_trees} trees\t\tEvaluation: {evaluation}')


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

[Row(label=0.0, features=SparseVector(692, {98: 70.0, 99: 255.0, 100: 165.0, 101: 114.0, 126: 122.0, 127: 253.0, 128: 253.0, 129: 253.0, 130: 120.0, 154: 165.0, 155: 253.0, 156: 253.0, 157: 253.0, 158: 234.0, 159: 52.0, 182: 99.0, 183: 253.0, 184: 253.0, 185: 253.0, 186: 253.0, 187: 228.0, 188: 26.0, 208: 60.0, 209: 168.0, 210: 238.0, 211: 202.0, 212: 174.0, 213: 253.0, 214: 253.0, 215: 253.0, 216: 127.0, 234: 91.0, 235: 81.0, 236: 1.0, 237: 215.0, 238: 128.0, 239: 28.0, 240: 12.0, 241: 181.0, 242: 253.0, 243: 253.0, 244: 175.0, 245: 3.0, 261: 18.0, 262: 204.0, 263: 253.0, 264: 77.0, 269: 7.0, 270: 253.0, 271: 253.0, 272: 253.0, 273: 54.0, 288: 54.0, 289: 248.0, 290: 253.0, 291: 253.0, 292: 143.0, 297: 1.0, 298: 127.0, 299: 253.0, 300: 253.0, 301: 188.0, 316: 104.0, 317: 253.0, 318: 253.0, 319: 253.0, 320: 20.0, 326: 81.0, 327: 249.0, 328: 253.0, 329: 191.0, 344: 192.0, 345: 253.0, 346: 253.0, 347: 218.0