## Lab No 6: Prediction with Decision Trees

In [72]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [49]:
# Build (or reuse) a local Spark session that uses every available core.
# The grid search further down previously died with
# `java.lang.OutOfMemoryError: Java heap space` on the default heap size.
# In local mode the tasks run inside the driver JVM, so the driver heap is the
# one to raise.
# NOTE: spark.driver.memory only takes effect when it is set before the JVM
# starts, so restart the kernel before re-running this cell.
# '4g' is a starting point -- adjust it to the memory available on the machine.
spark=(
    SparkSession.builder
    .appName('DT')
    .master('local[*]')
    .config('spark.driver.memory','4g')
    .getOrCreate()
)

In [50]:
# Load the raw data file. It has no header row (header=False is required, or the
# first record would be consumed as column names); column types are inferred.
df = spark.read.csv(
    '../covtype.data',
    inferSchema=True,
    header=False,
)

In [51]:
# Show the inferred schema; without a header the columns are auto-named _c0, _c1, ...
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [52]:
# Column names in file order: ten named measurements, four Wilderness_Area_*
# columns, forty Soil_Type_* columns, and finally the Cover_Type label.
colNames = [
    'Elevation',
    'Aspect',
    'Slope',
    'Horizontal_Distance_To_Hydrology',
    'Vertical_Distance_To_Hydrology',
    'Horizontal_Distance_To_Roadways',
    'Hillshade_9am',
    'Hillshade_Noon',
    'Hillshade_3pm',
    'Horizontal_Distance_To_Fire_Points',
]
colNames += [f'Wilderness_Area_{n}' for n in range(1, 5)]
colNames += [f'Soil_Type_{n}' for n in range(1, 41)]
colNames.append('Cover_Type')

In [53]:
# Replace the auto-generated _cN column names with the descriptive ones.
df = df.toDF(*colNames)

In [54]:
# Sanity check: 10 measurements + 4 wilderness + 40 soil + 1 label = 55 names.
print(len(colNames))

55


In [55]:
# Re-type the label column from the inferred integer to double, in place
# (same column name), before it is handed to the classifier as labelCol.
df = df.withColumn(
    'Cover_Type',
    F.col('Cover_Type').cast(T.DoubleType()),
)

In [56]:
# Feature columns: every name except the trailing 'Cover_Type' label.
input_cols = colNames[:-1]


In [57]:
# Packs all feature columns into the single vector column 'featureVector',
# which is the column the classifier reads as featuresCol.
assembler = VectorAssembler(
    outputCol='featureVector',
    inputCols=input_cols,
)

In [58]:
# Decision tree estimator: reads the assembled 'featureVector', learns the
# 'Cover_Type' label and writes its output to 'prediction'.
# Hyper-parameters are left at their defaults here; they are varied by the
# parameter grid.
classifier = DecisionTreeClassifier(
    featuresCol='featureVector',
    labelCol='Cover_Type',
    predictionCol='prediction',
)

In [59]:
# Two-stage pipeline: assemble the feature vector, then fit the decision tree.
pipeline = Pipeline(stages=[assembler, classifier])

In [85]:
# Hyper-parameter search space for the decision tree.
# Five parameters with two candidate values each -> 2**5 = 32 combinations,
# every one of which is fitted once per cross-validation fold.
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.impurity, ['gini', 'entropy'])
    .addGrid(classifier.maxDepth, [10, 20])
    .addGrid(classifier.maxBins, [32, 128])
    .addGrid(classifier.minInfoGain, [0.01, 0.1])
    .addGrid(classifier.minInstancesPerNode, [1, 10])
    .build()
)

In [86]:
# Scores the 'prediction' column against the 'Cover_Type' label using accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='Cover_Type',
    predictionCol='prediction',
)


In [87]:
# 5-fold cross-validation of the whole pipeline over every entry of paramGrid,
# ranked by the accuracy evaluator.
# A fixed seed pins the fold assignment so the selected model is reproducible
# across kernel restarts (without it the folds are drawn differently each run).
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [88]:
# Hold out roughly 10% of the rows for the final evaluation; the rest is used for
# cross-validated training.
# The seed makes the split repeatable: without it every run trains and tests on
# different rows, so reported accuracies cannot be compared between runs.
train, test = df.randomSplit([0.9, 0.1], seed=42)

In [88]:
# Run the cross-validated grid search on the training split (expensive: every
# grid entry is fitted on every fold).
# The result is bound to `cvModel` because the evaluation cells below read
# `cvModel`; binding only `crossModel` made them fail with NameError on a fresh
# kernel.
cvModel = crossval.fit(train)
# Keep the previous name as an alias so any code using `crossModel` still works.
crossModel = cvModel

25/09/10 16:19:34 WARN DAGScheduler: Broadcasting large task binary with size 1114.5 KiB
25/09/10 16:19:34 WARN DAGScheduler: Broadcasting large task binary with size 1277.4 KiB
25/09/10 16:19:34 WARN DAGScheduler: Broadcasting large task binary with size 1442.4 KiB
25/09/10 16:19:34 WARN DAGScheduler: Broadcasting large task binary with size 1591.5 KiB
25/09/10 16:19:35 WARN DAGScheduler: Broadcasting large task binary with size 1727.9 KiB
25/09/10 16:19:35 WARN DAGScheduler: Broadcasting large task binary with size 1222.5 KiB
25/09/10 16:19:37 WARN DAGScheduler: Broadcasting large task binary with size 1023.2 KiB
25/09/10 16:19:37 WARN DAGScheduler: Broadcasting large task binary with size 1072.8 KiB




25/09/10 16:19:42 ERROR Executor: Exception in task 2.0 in stage 356.0 (TID 5579)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2121)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:88)
	at org.apache.spark.serializer.DeserializationStream.re

ConnectionRefusedError: [Errno 111] Connection refused

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# Score the held-out split with the cross-validated model, then report the
# accuracy computed by the evaluator.
predictions = cvModel.transform(test)
accuracy = evaluator.evaluate(predictions)
print("Accuracy Output:", accuracy)


In [None]:
# Keep a handle on the best model found by cross-validation so that it (and
# the parameters it was fitted with) can be inspected.
bestModel = cvModel.bestModel

In [None]:
# Confusion matrix: one row per true Cover_Type, one column per predicted class,
# each cell holding the number of test rows; missing combinations become 0.
# The pivot values are the class labels 1..7 (the covtype data has seven cover
# types). The previous range(1, 9) also pivoted on a non-existent class 8,
# which added a spurious all-zero column and made the matrix non-square.
# The values are passed as an explicit list so the column order is fixed.
cover_type_labels = list(range(1, 8))
conf_matrix = (
    predictions
    .groupBy('Cover_Type')
    .pivot('prediction', cover_type_labels)
    .count()
    .na.fill(0.0)
    .orderBy('Cover_Type')
)

In [None]:
# Print the confusion matrix (rows: true Cover_Type, columns: predicted class).
conf_matrix.show()