In [327]:
import os

from pyspark.sql import SparkSession

In [328]:
# Start (or reuse) a local Spark session running on two worker threads.
# The application name describes this notebook's actual task (mushroom
# classification) instead of the label left over from another notebook.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Mushroom-Classification")
    .getOrCreate()
)

In [329]:
# Absolute path of the dataset, resolved against the current working directory.
path_mush = os.path.abspath('mushrooms.csv')

In [330]:
# Load the CSV: the first row supplies the column names and Spark infers
# each column's type from the data.
df_mush = spark.read.csv(
    path_mush,
    header=True,
    inferSchema=True,
)

In [331]:
# Peek at the first five raw rows.
df_mush.show(n=5)

+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+
|class|cap-shape|cap-surface|cap-color|bruises|odor|gill-attachment|gill-spacing|gill-size|gill-color|stalk-shape|stalk-root|stalk-surface-above-ring|stalk-surface-below-ring|stalk-color-above-ring|stalk-color-below-ring|veil-type|veil-color|ring-number|ring-type|spore-print-color|population|habitat|
+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+
|    p|        x|          s|        n|      t|   p|              f|           c|        n|   

In [332]:
from pyspark.ml.feature import StringIndexer

# Every column of the raw frame (including the `class` label) is treated as
# categorical and replaced by an integer index column named "<column>_encoded".
categorical_columns = df_mush.columns
encoded_names = [column + "_encoded" for column in categorical_columns]

for column, encoded_name in zip(categorical_columns, encoded_names):
    # Fit an indexer for this column, then append its index column.
    stringIndexer = StringIndexer(inputCol=column, outputCol=encoded_name).fit(df_mush)
    indexed = stringIndexer.transform(df_mush)

    # Cast the appended index column to integer.
    df_mush = indexed.withColumn(encoded_name, indexed[encoded_name].cast('int'))

# Keep only the encoded columns, in the original column order.
df_mush = df_mush.select(encoded_names)

In [333]:
# Peek at the first five rows of the encoded frame.
df_mush.show(n=5)

+-------------+-----------------+-------------------+-----------------+---------------+------------+-----------------------+--------------------+-----------------+------------------+-------------------+------------------+--------------------------------+--------------------------------+------------------------------+------------------------------+-----------------+------------------+-------------------+-----------------+-------------------------+------------------+---------------+
|class_encoded|cap-shape_encoded|cap-surface_encoded|cap-color_encoded|bruises_encoded|odor_encoded|gill-attachment_encoded|gill-spacing_encoded|gill-size_encoded|gill-color_encoded|stalk-shape_encoded|stalk-root_encoded|stalk-surface-above-ring_encoded|stalk-surface-below-ring_encoded|stalk-color-above-ring_encoded|stalk-color-below-ring_encoded|veil-type_encoded|veil-color_encoded|ring-number_encoded|ring-type_encoded|spore-print-color_encoded|population_encoded|habitat_encoded|
+-------------+-------------

In [334]:
# Feature column names: every encoded column except the label.
df_col = list(df_mush.columns)
del df_col[df_col.index('class_encoded')]

In [335]:
from pyspark.ml.feature import VectorAssembler

# Pack all feature columns into a single vector column named "features".
featureAssembler = VectorAssembler(
    outputCol="features",
    inputCols=df_col,
)
output = featureAssembler.transform(df_mush)

In [336]:
# Keep only the assembled feature vector and the label column.
output = output.select(["features", "class_encoded"])

In [353]:
# 80/20 train/test split with a fixed seed.
train, test = output.randomSplit(weights=[0.8, 0.2], seed=42)

In [338]:
# Number of rows in the training split (the recorded run displayed 6471).
train.count()

6471

In [339]:
# from pyspark.ml.classification import DecisionTreeClassifier

# dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'class_encoded', maxDepth = 3)
# dtModel = dt.fit(train)

In [340]:
# predictions = dtModel.transform(test)

In [341]:
# from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# # Create both evaluators
# evaluatorMulti = MulticlassClassificationEvaluator(labelCol="class_encoded", predictionCol="prediction")
# evaluator = BinaryClassificationEvaluator(labelCol="class_encoded", rawPredictionCol="prediction", metricName='areaUnderROC')

# # Make predictions
# predictionAndTarget = dtModel.transform(test).select("class_encoded", "prediction")

# # Get metrics
# acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
# f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
# weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
# weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
# auc = evaluator.evaluate(predictionAndTarget)

In [342]:
# print('Accuracy         : ' + str(acc))
# print('F1 score         : ' + str(f1))
# print('Precision        : ' + str(weightedPrecision))
# print('Recall           : ' + str(weightedRecall))
# print('Area Under ROC   : ' + str(auc))

In [343]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator


def train_test_model(model, train, test):
    """Fit ``model`` on ``train`` and print classification metrics for ``test``.

    Parameters
    ----------
    model
        Unfitted estimator exposing ``fit(train)``; the fitted result must
        expose ``transform(test)`` and produce a ``prediction`` column.
    train
        DataFrame passed to ``model.fit``.
    test
        DataFrame scored by the fitted model; must contain the
        ``class_encoded`` label column.

    Returns
    -------
    None
        Accuracy, F1, weighted precision, weighted recall and area under ROC
        are printed, not returned.
    """
    model_train = model.fit(train)

    # Create both evaluators
    evaluatorMulti = MulticlassClassificationEvaluator(labelCol="class_encoded", predictionCol="prediction")
    # NOTE(review): the AUC below is computed from the hard "prediction" column,
    # not from per-class scores -- confirm this is intended.
    evaluator = BinaryClassificationEvaluator(labelCol="class_encoded", rawPredictionCol="prediction", metricName='areaUnderROC')

    # Make predictions with the model fitted above. The previous code used a
    # notebook-level `dtModel` here, so the `model` argument was ignored.
    predictionAndTarget = model_train.transform(test).select("class_encoded", "prediction")

    # Get metrics
    multi_metrics = {
        metric: evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: metric})
        for metric in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
    }
    auc = evaluator.evaluate(predictionAndTarget)

    print('Accuracy         : ' + str(multi_metrics["accuracy"]))
    print('F1 score         : ' + str(multi_metrics["f1"]))
    print('Precision        : ' + str(multi_metrics["weightedPrecision"]))
    print('Recall           : ' + str(multi_metrics["weightedRecall"]))
    print('Area Under ROC   : ' + str(auc))

In [347]:
from pyspark.ml.classification import DecisionTreeClassifier

# Depth-3 decision tree trained on the assembled feature vector, with the
# encoded class column as the label.
dt_mod = DecisionTreeClassifier(
    labelCol='class_encoded',
    featuresCol='features',
    maxDepth=3,
)

# Fit on the training split and print the evaluation metrics for the test split.
train_test_model(dt_mod, train, test)

Accuracy         : 0.9832946635730858
F1 score         : 0.9832917154116472
Precision        : 0.9833257285904247
Recall           : 0.9832946635730859
Area Under ROC   : 0.9831149229774084
