In [1]:
import pyspark
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

In [2]:
os.environ['PYSPARK_PYTHON']=sys.executable
os.environ['PYSPARK_DRIVER_PYTHON']=sys.executable
spark=SparkSession.builder.getOrCreate()

In [3]:
path=r"C:\Users\LENOVO\Desktop\SUB 6TH SEM\BDA\LAB\LAB6\covertype.csv"
data_without_header=spark.read.csv(path,inferSchema=True,header=False)
colnames = ["Elevation", "Aspect", "Slope","Horizontal_Distance_To_Hydrology","Vertical_Distance_To_Hydrology", "Horizontal_Distance_To_Roadways","Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm","Horizontal_Distance_To_Fire_Points"] +[f"Wilderness_Area_{i}" for i in range(4)] +[f"Soil_Type_{i}" for i in range(40)] +["Cover_Type"]
data=data_without_header.toDF(*colnames).withColumn("label",col("Cover_Type").cast(DoubleType()))

In [4]:
data.head()

Row(Elevation=0, Aspect=1, Slope=2, Horizontal_Distance_To_Hydrology=3, Vertical_Distance_To_Hydrology=4, Horizontal_Distance_To_Roadways=5, Hillshade_9am=6, Hillshade_Noon=7, Hillshade_3pm=8, Horizontal_Distance_To_Fire_Points=9, Wilderness_Area_0=10, Wilderness_Area_1=11, Wilderness_Area_2=12, Wilderness_Area_3=13, Soil_Type_0=14, Soil_Type_1=15, Soil_Type_2=16, Soil_Type_3=17, Soil_Type_4=18, Soil_Type_5=19, Soil_Type_6=20, Soil_Type_7=21, Soil_Type_8=22, Soil_Type_9=23, Soil_Type_10=24, Soil_Type_11=25, Soil_Type_12=26, Soil_Type_13=27, Soil_Type_14=28, Soil_Type_15=29, Soil_Type_16=30, Soil_Type_17=31, Soil_Type_18=32, Soil_Type_19=33, Soil_Type_20=34, Soil_Type_21=35, Soil_Type_22=36, Soil_Type_23=37, Soil_Type_24=38, Soil_Type_25=39, Soil_Type_26=40, Soil_Type_27=41, Soil_Type_28=42, Soil_Type_29=43, Soil_Type_30=44, Soil_Type_31=45, Soil_Type_32=46, Soil_Type_33=47, Soil_Type_34=48, Soil_Type_35=49, Soil_Type_36=50, Soil_Type_37=51, Soil_Type_38=52, Soil_Type_39=53, Cover_Type=

In [5]:
(train,test)=data.randomSplit([0.8,0.1])

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

assembler=VectorAssembler().setInputCols(train.columns[:-1]).setOutputCol('featureVector')
classifier=DecisionTreeClassifier().setFeaturesCol('featureVector').setPredictionCol('prediction')
pipeline=Pipeline().setStages([assembler,classifier])
model=pipeline.fit(train)
tree=model.stages[1]
print(tree.toDebugString)



DecisionTreeClassificationModel: uid=DecisionTreeClassifier_898436c72629, depth=5, numNodes=13, numClasses=55, numFeatures=55
  If (feature 54 <= 1.5)
   Predict: 1.0
  Else (feature 54 > 1.5)
   If (feature 54 <= 2.5)
    Predict: 2.0
   Else (feature 54 > 2.5)
    If (feature 54 <= 3.5)
     Predict: 3.0
    Else (feature 54 > 3.5)
     If (feature 54 <= 6.5)
      If (feature 54 <= 5.5)
       Predict: 5.0
      Else (feature 54 > 5.5)
       Predict: 6.0
     Else (feature 54 > 6.5)
      If (feature 0 <= 2252.5)
       Predict: 54.0
      Else (feature 0 > 2252.5)
       Predict: 7.0



In [7]:
import pandas as pd
df=pd.DataFrame(tree.featureImportances.toArray(),columns=['Importance'],index=colnames).sort_values(by=['Importance'],ascending=False)
df

Unnamed: 0,Importance
Cover_Type,0.999994
Elevation,6e-06
Slope,0.0
Soil_Type_15,0.0
Soil_Type_17,0.0
Soil_Type_18,0.0
Soil_Type_19,0.0
Soil_Type_20,0.0
Soil_Type_21,0.0
Soil_Type_22,0.0


In [8]:
prediction=model.transform(test)
prediction.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |-- Soil_Type_5: integer (nullable = true)
 |-- Soil_Type

In [9]:
prediction.select('Cover_Type','probability','prediction').show()

+----------+--------------------+----------+
|Cover_Type|         probability|prediction|
+----------+--------------------+----------+
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         6|[0.0,0.0,0.0,0.0,...|       6.0|
|         6|[0.0,0.0,0.0,0.0,...|       6.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         6|[0.0,0.0,0.0,0.0,...|       6.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         4|[0.0,0.0,0.0,0.0,...|       5.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         6|[0.0,0.0,0.0,0.0,...|       6.0|
|         6|[0.0,0.0,0.0,0.0,...|       6.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         4|[0.0,0.0,0.0,0.0,...|       5.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         4|[0.0,0.0,0.0,0.0,...|       5.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         3|[0.0,0.0,0.0,1.0,...|       3.0|
|         

In [10]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracy=MulticlassClassificationEvaluator().setPredictionCol('prediction').setLabelCol('label').setMetricName('accuracy')
f1=MulticlassClassificationEvaluator().setPredictionCol('prediction').setLabelCol('label').setMetricName('f1')
acc=accuracy.evaluate(prediction)
f1=f1.evaluate(prediction)
print(acc,f1)

0.9952389785444877 0.993153398349374


In [11]:
#Confusion Matrix

conf_matrix=prediction.groupBy('Cover_Type').pivot('prediction',range(1,8)).count().na.fill(0.0).orderBy('Cover_Type')
conf_matrix.show()

+----------+-----+-----+----+---+----+----+----+
|Cover_Type|    1|    2|   3|  4|   5|   6|   7|
+----------+-----+-----+----+---+----+----+----+
|         1|23809|    0|   0|  0|   0|   0|   0|
|         2|    0|31108|   0|  0|   0|   0|   0|
|         3|    0|    0|4060|  0|   0|   0|   0|
|         4|    0|    0|   0|  0| 308|   0|   0|
|         5|    0|    0|   0|  0|1089|   0|   0|
|         6|    0|    0|   0|  0|   0|1959|   0|
|         7|    0|    0|   0|  0|   0|   0|2359|
+----------+-----+-----+----+---+----+----+----+



In [12]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

def class_probabilities(data):
    total = data.count()
    return (
        data.groupBy("Cover_Type")
            .count()
            .orderBy("Cover_Type")
            .select(col("count").cast(DoubleType()))
            .withColumn("count_proportion", col("count") / total)
            .select("count_proportion")
            .collect()
    )

train_prior_probabilities = class_probabilities(train)
test_prior_probabilities = class_probabilities(test)

train_prior_probabilities


[Row(count_proportion=0.36417461230513576),
 Row(count_proportion=0.48844226750413017),
 Row(count_proportion=0.061384293879195306),
 Row(count_proportion=0.004723805539577124),
 Row(count_proportion=0.016276696086349383),
 Row(count_proportion=0.029841900678066552),
 Row(count_proportion=0.03515448722790667),
 Row(count_proportion=1.9367796390230107e-06)]

In [13]:
train_prior_probabilities = [p[0] for p in train_prior_probabilities]
test_prior_probabilities = [p[0] for p in test_prior_probabilities]
sum([train_p * cv_p for train_p, cv_p in zip(train_prior_probabilities,
test_prior_probabilities)])

0.37523782860298305

In [14]:
#tuning

assembler=VectorAssembler().setInputCols(data.columns[:-1]).setOutputCol('featureVector')
classifier=DecisionTreeClassifier().setFeaturesCol('featureVector').setPredictionCol('Prediction')
pipeline=Pipeline().setStages([assembler,classifier])

from pyspark.ml.tuning import ParamGridBuilder
paramGrid=ParamGridBuilder().addGrid(classifier.impurity,['gini','entropy']).addGrid(classifier.maxDepth,[1,20]).\
addGrid(classifier.minInfoGain,[0.0,0.5]).\
addGrid(classifier.maxBins,[40,300]).build()

accuracy=MulticlassClassificationEvaluator().setLabelCol('Cover_Type').setPredictionCol('Prediction').setMetricName('accuracy')

In [21]:
from pyspark.ml.tuning import TrainValidationSplit
validator=TrainValidationSplit(estimator=pipeline,evaluator=accuracy,estimatorParamMaps=paramGrid,trainRatio=0.9)
validator_model=validator.fit(train)

In [22]:
from pprint import pprint
best_model = validator_model.bestModel
pprint(best_model.stages[1].extractParamMap())

{Param(parent='DecisionTreeClassifier_41f1cf868350', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
 Param(parent='DecisionTreeClassifier_41f1cf868350', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='DecisionTreeClassifier_41f1cf868350', name='maxMemoryInMB', doc='Maximum memory in MB allocated to histogram aggregation. If too small, then 1 node will be split per iteration, and its aggregates may exceed this size.'): 256,
 Param(parent='DecisionTreeClassifier_41f1cf868350', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory i

In [24]:
validator_model = validator.fit(train)
metrics = validator_model.validationMetrics
params = validator_model.getEstimatorParamMaps()
metrics_and_params = list(zip(metrics, params))
metrics_and_params.sort(key=lambda x: x[0], reverse=True)
metrics_and_params

[(1.0,
  {Param(parent='DecisionTreeClassifier_41f1cf868350', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
   Param(parent='DecisionTreeClassifier_41f1cf868350', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 20,
   Param(parent='DecisionTreeClassifier_41f1cf868350', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
   Param(parent='DecisionTreeClassifier_41f1cf868350', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 40}),
 (1.0,
  {Param(parent='DecisionTreeClassifier_41f1cf868350', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
   Param(parent

In [25]:
metrics.sort(reverse=True)
print(metrics[0])


1.0


In [28]:
accuracy.evaluate(best_model.transform(test))

1.0

In [30]:
#0.0.4 Categorical Features Revisited
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler

def unencode_one_hot(data):
    wilderness_cols = ['Wilderness_Area_' + str(i) for i in range(4)]
    wilderness_assembler = VectorAssembler(). \
        setInputCols(wilderness_cols). \
        setOutputCol("wilderness")
    
    unhot_udf = udf(lambda v: v.toArray().tolist().index(1), IntegerType())
    
    with_wilderness = wilderness_assembler.transform(data). \
        drop(*wilderness_cols). \
        withColumn("wilderness", unhot_udf(col("wilderness")))
    
    soil_cols = ['Soil_Type_' + str(i) for i in range(40)]
    soil_assembler = VectorAssembler(). \
        setInputCols(soil_cols). \
        setOutputCol("soil")
    
    with_soil = soil_assembler. \
        transform(with_wilderness). \
        drop(*soil_cols). \
        withColumn("soil", unhot_udf(col("soil")))
    
    return with_soil
unenc_train_data = unencode_one_hot(train)
unenc_train_data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Cover_Type: integer (nullable = true)
 |-- label: double (nullable = true)
 |-- wilderness: integer (nullable = true)
 |-- soil: integer (nullable = true)



In [32]:
from pyspark.ml.feature import VectorIndexer
cols = unenc_train_data.columns
input_cols = [c for c in cols if c!='Cover_Type']
assembler = VectorAssembler().setInputCols(input_cols).setOutputCol("featureVector")
indexer = VectorIndexer().setMaxCategories(40).setInputCol("featureVector").setOutputCol("indexedVector")
classifier = DecisionTreeClassifier().setLabelCol("Cover_Type").setFeaturesCol("indexedVector").setPredictionCol("prediction")
pipeline = Pipeline().setStages([assembler, indexer, classifier])


In [33]:
#Random Forrest
from pyspark.ml.classification import RandomForestClassifier

classifier = RandomForestClassifier(
    seed=1234,
    labelCol="Cover_Type",
    featuresCol="indexedVector",
    predictionCol="prediction"
)


In [34]:
unenc_train_data.columns


['Elevation',
 'Aspect',
 'Slope',
 'Horizontal_Distance_To_Hydrology',
 'Vertical_Distance_To_Hydrology',
 'Horizontal_Distance_To_Roadways',
 'Hillshade_9am',
 'Hillshade_Noon',
 'Hillshade_3pm',
 'Horizontal_Distance_To_Fire_Points',
 'Cover_Type',
 'label',
 'wilderness',
 'soil']

In [None]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define input columns
cols = unenc_train_data.columns
input_cols = [c for c in cols if c != 'Cover_Type']

# Vector assembler
assembler = VectorAssembler().setInputCols(input_cols).setOutputCol("featureVector")

# Vector indexer
indexer = VectorIndexer().setMaxCategories(40).setInputCol("featureVector").setOutputCol("indexedVector")

# Classifier
classifier = RandomForestClassifier(seed=1234, labelCol="Cover_Type", featuresCol="indexedVector", predictionCol="prediction")

# Pipeline
pipeline = Pipeline().setStages([assembler, indexer, classifier])

# Parameter grid
paramGrid = ParamGridBuilder(). \
    addGrid(classifier.impurity, ["gini", "entropy"]). \
    addGrid(classifier.maxDepth, [1, 20]). \
    addGrid(classifier.maxBins, [40, 300]). \
    addGrid(classifier.minInfoGain, [0.0, 0.05]). \
    build()

# Multiclass classification evaluator
multiclassEval = MulticlassClassificationEvaluator(). \
    setLabelCol("Cover_Type"). \
    setPredictionCol("prediction"). \
    setMetricName("accuracy")

# Validator
validator = TrainValidationSplit(seed=1234,
                                 estimator=pipeline,
                                 evaluator=multiclassEval,
                                 estimatorParamMaps=paramGrid,
                                 trainRatio=0.9)

# Fit the model
validator_model = validator.fit(unenc_train_data)

# Get the best model
best_model = validator_model.bestModel


In [None]:
forest_model = best_model.stages[2]
feature_importance_list = list(zip(input_cols,
forest_model.featureImportances.toArray()))
feature_importance_list.sort(key=lambda x: x[1], reverse=True)
pprint(feature_importance_list)

In [None]:
unenc_test_data = unencode_one_hot(test_data)
best_model.transform(unenc_test_data.drop("Cover_Type")).\
select("prediction").show(1)
