In [47]:
import pandas as pd
import numpy as np
import pyspark
import os
import sys
from pyspark import SparkContext
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession

In [48]:
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_4').getOrCreate()

### Reading Data and Display Statistics

In [49]:
df = spark.read.option("inferSchema", True).option("header", False).csv("./covtype.data")
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [50]:
df.describe().show()



+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------

                                                                                

In [51]:
cols = df.columns

print('Number of Distinct Values in each column')
for col in cols:
    col_distinct = df.select(col).distinct().count()
    print(f'{col} : {col_distinct}')

Number of Distinct Values in each column
_c0 : 1978
_c1 : 361
_c2 : 67
_c3 : 551
_c4 : 700
_c5 : 5785
_c6 : 207
_c7 : 185
_c8 : 255
_c9 : 5827
_c10 : 2
_c11 : 2
_c12 : 2
_c13 : 2
_c14 : 2
_c15 : 2
_c16 : 2
_c17 : 2
_c18 : 2
_c19 : 2
_c20 : 2
_c21 : 2
_c22 : 2
_c23 : 2
_c24 : 2
_c25 : 2
_c26 : 2
_c27 : 2
_c28 : 2
_c29 : 2
_c30 : 2
_c31 : 2
_c32 : 2
_c33 : 2
_c34 : 2
_c35 : 2
_c36 : 2
_c37 : 2
_c38 : 2
_c39 : 2
_c40 : 2
_c41 : 2
_c42 : 2
_c43 : 2
_c44 : 2
_c45 : 2
_c46 : 2
_c47 : 2
_c48 : 2
_c49 : 2
_c50 : 2
_c51 : 2
_c52 : 2
_c53 : 2
_c54 : 7


### Handling Missing values and Encoding Categorical Data

In [52]:
# Finding missing data in each column
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols]).show()

+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|_c45|_c46|_c47|_c48|_c49|_c50|_c51|_c52|_c53|_c54|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   

None of the columns have missing data so there is no need for imputing the data

In [53]:
from pyspark.sql.types import StringType, LongType, DoubleType, ArrayType, IntegerType

# Finding the categorical columns in the dataset
cat_cols_count=0
cat_cols=[]
for field in df.schema.fields:
    if isinstance(field.dataType, StringType) or isinstance(field.dataType, LongType):
        cat_cols_count+=1
        cat_cols.append(field.name)

print(f'Number of Categorical Columns : {cat_cols_count}')
print(cat_cols)

Number of Categorical Columns : 0
[]


None of the columns are categorical columns so there is no need for Encoding the columns

### Training a Decision Tree on the dataset

In [54]:
# Scaling the data
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

input_cols=cols[:-1]
target_cols=cols[-1]

# Assembling columns into a single vector column
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
assembled_df = assembler.transform(df)

minmax = MinMaxScaler(inputCol="features", outputCol="scaled_features")
minmax_model = minmax.fit(assembled_df)
scaled_df = minmax_model.transform(assembled_df)

scaled_df.show()



+----+---+---+---+---+----+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------------------+--------------------+
| _c0|_c1|_c2|_c3|_c4| _c5|_c6|_c7|_c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|_c45|_c46|_c47|_c48|_c49|_c50|_c51|_c52|_c53|_c54|            features|     scaled_features|
+----+---+---+---+---+----+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------------------+--------------------+
|2596| 51|  3|258|  0| 510|221|232|148|6279|   1|   0|   0|   0|

                                                                                

In [55]:
(train_data, test_data) = scaled_df.randomSplit([0.9, 0.1])

In [58]:
%%time
from pyspark.ml.classification import DecisionTreeClassifier

tree_model = DecisionTreeClassifier(seed=1234, labelCol='_c54', featuresCol='scaled_features', predictionCol='preds')

model = tree_model.fit(train_data)

                                                                                

CPU times: user 34.2 ms, sys: 4.9 ms, total: 39.1 ms
Wall time: 5.24 s


### Evaluating the model

In [60]:
train_preds = model.transform(train_data)
train_preds.select('_c54', 'preds', 'probability').show() 

+----+-----+--------------------+
|_c54|preds|         probability|
+----+-----+--------------------+
|   6|  3.0|[0.0,3.1657591490...|
|   6|  4.0|[0.0,0.0,0.037788...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   3|  3.0|[0.0,0.0,0.017918...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   3|  3.0|[0.0,0.0,0.017918...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   3|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
|   6|  3.0|[0.0,3.1657591490...|
+----+-----+--------------------+
only showing top 20 rows



In [62]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='_c54', predictionCol='preds')

train_accuracy = evaluator.setMetricName('accuracy').evaluate(train_preds)
train_f1 = evaluator.setMetricName('f1').evaluate(train_preds)
train_recall = evaluator.setMetricName('recall').evaluate(train_preds)

                                                                                

IllegalArgumentException: MulticlassClassificationEvaluator_ad5cb44b59d1 parameter metricName given invalid value recall.