In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('yarn').appName('myapp').getOrCreate()

In [4]:
sc = spark.sparkContext

from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType

colNames = ["Elevation", "Aspect", "Slope", "Horizontal_Distance_To_Hydrology", "Vertical_Distance_To_Hydrology", "Horizontal_Distance_To_Roadways", "Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm", "Horizontal_Distance_To_Fire_Points"] 

for i in range(4): 
    colNames += ["Wilderness_Area_"+str(i),]
for i in range(40): 
    colNames += ["Soil_Type_"+str(i),] 
colNames += ["Cover_Type",]

In [5]:
schema = StructType()
for name in colNames:
    if name == 'Cover_Type':
        schema.add(StructField(name,DoubleType(),True))
    else:
        schema.add(StructField(name,IntegerType(),True))
        
        
data = spark.read.csv('s3://ssds2/covtype.data',header=False,schema=schema)
data.count()

581012

In [6]:
data.head()

Row(Elevation=2596, Aspect=51, Slope=3, Horizontal_Distance_To_Hydrology=258, Vertical_Distance_To_Hydrology=0, Horizontal_Distance_To_Roadways=510, Hillshade_9am=221, Hillshade_Noon=232, Hillshade_3pm=148, Horizontal_Distance_To_Fire_Points=6279, Wilderness_Area_0=1, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=0, Soil_Type_0=0, Soil_Type_1=0, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=1, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=5.0)

In [7]:
data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |-- Soil_Type_5: integer (nullable = true)
 |-- Soil_Type

In [8]:
(trainData,testData) = data.randomSplit([0.9,0.1])
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col,udf
inputCols = trainData.drop('Cover_Type').columns

In [9]:
assembler = VectorAssembler(inputCols = inputCols,outputCol='featureVector')
assembledTrainData = assembler.transform(trainData)
assembledTestData = assembler.transform(testData)

assembledTrainData.select('featureVector').show(truncate=False)

+----------------------------------------------------------------------------------------------------+
|featureVector                                                                                       |
+----------------------------------------------------------------------------------------------------+
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1859.0,18.0,12.0,67.0,11.0,90.0,211.0,215.0,139.0,792.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1860.0,18.0,13.0,95.0,15.0,90.0,210.0,213.0,138.0,780.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1863.0,37.0,17.0,120.0,18.0,90.0,217.0,202.0,115.0,769.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1866.0,23.0,14.0,85.0,16.0,108.0,212.0,210.0,133.0,819.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1867.0,20.0,15.0,108.0,19.0,120.0,208.0,206.0,132.0,808.0,1.0,1.0])|
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1868.0,27.0,16.0,67.0,17.0,95.0,212.0,204.0,125.0,859.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1871.0,22.0,22.0,60.0,12.0,85.0,200.0,1

In [10]:
assembledTrainData.take(3)

[Row(Elevation=1863, Aspect=37, Slope=17, Horizontal_Distance_To_Hydrology=120, Vertical_Distance_To_Hydrology=18, Horizontal_Distance_To_Roadways=90, Hillshade_9am=217, Hillshade_Noon=202, Hillshade_3pm=115, Horizontal_Distance_To_Fire_Points=769, Wilderness_Area_0=0, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=1, Soil_Type_0=0, Soil_Type_1=1, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=0, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=6.0, featureVector=SparseV

In [28]:
def fitTransformEvalution(machinemodel,train=None,test=None):
    if train:
        model = machinemodel.fit(train)
    else:
        model = machinemodel.fit(assembledTrainData)
        
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    evaluator = MulticlassClassificationEvaluator( labelCol="Cover_Type", predictionCol="prediction")

    if test:
        predicitions = model.transform(test)
    else:
        predicitions = model.transform(assembledTestData)
    predicitions.select(["Cover_Type", "prediction", "probability"]).show(truncate=False)

    print(evaluator.setMetricName('accuracy').evaluate(predicitions))
    print(evaluator.setMetricName('f1').evaluate(predicitions))

In [11]:
from pyspark.ml.classification import LinearSVC,LogisticRegression

logi = LogisticRegression(labelCol='Cover_Type',
                         featuresCol = 'featureVector',
                          family='multinomial',
                          predictionCol='prediction')

In [12]:
model = logi.fit(assembledTrainData)

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator( labelCol="Cover_Type", predictionCol="prediction")


predicitions = model.transform(assembledTestData)
predicitions.select(["Cover_Type", "prediction", "probability"]).show(truncate=False)

print(evaluator.setMetricName('accuracy').evaluate(predicitions))
print(evaluator.setMetricName('f1').evaluate(predicitions))

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'MulticlassClassificationEvaluator' object has no attribute '_java_obj'


+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                                                                                               |
+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|6.0       |3.0       |[2.312385918511953E-8,1.278553589076978E-6,9.792677317647752E-5,0.5514789127365558,0.04122003281064906,1.158773357981844E-4,0.40708561165901996,3.37007352168252E-7]      |
|6.0       |3.0       |[1.4759735090718072E-8,1.4139301378889199E-6,1.0717364759249224E-4,0.6560843909881598,0.006036619355674224,8.005205147888889E-5,0.33769021408343414,1.2118378753350913E-7]|
|3.0       |3.0       |[4

In [16]:
predicitions.head()

Row(Elevation=1861, Aspect=35, Slope=14, Horizontal_Distance_To_Hydrology=60, Vertical_Distance_To_Hydrology=11, Horizontal_Distance_To_Roadways=85, Hillshade_9am=218, Hillshade_Noon=209, Hillshade_3pm=124, Horizontal_Distance_To_Fire_Points=832, Wilderness_Area_0=0, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=1, Soil_Type_0=0, Soil_Type_1=1, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=0, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=3.0, featureVector=SparseVec

In [19]:
predicitions.select('Cover_Type').distinct().show()

+----------+
|Cover_Type|
+----------+
|       7.0|
|       1.0|
|       4.0|
|       3.0|
|       2.0|
|       6.0|
|       5.0|
+----------+



In [25]:
from pyspark.ml.classification import DecisionTreeClassifier

classifier = DecisionTreeClassifier(
labelCol="Cover_Type",
featuresCol='featureVector',
    predictionCol = 'prediction'
)

fitTransformEvalution(classifier)

+----------+----------+------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                     |
+----------+----------+------------------------------------------------------------------------------------------------+
|6.0       |3.0       |[0.0,0.0,0.03363532229511611,0.6307403936269915,0.051268700753236834,0.0,0.2843555833246555,0.0]|
|6.0       |3.0       |[0.0,0.0,0.03363532229511611,0.6307403936269915,0.051268700753236834,0.0,0.2843555833246555,0.0]|
|3.0       |3.0       |[0.0,0.0,0.03363532229511611,0.6307403936269915,0.051268700753236834,0.0,0.2843555833246555,0.0]|
|6.0       |3.0       |[0.0,0.0,0.03363532229511611,0.6307403936269915,0.051268700753236834,0.0,0.2843555833246555,0.0]|
|3.0       |3.0       |[0.0,0.0,0.03363532229511611,0.6307403936269915,0.051268700753236834,0.0,0.2843555833246555,0.0]|
|3.0       |4.0       |[0.0,0.0,

In [26]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier( labelCol="Cover_Type", featuresCol="featureVector", predictionCol="prediction")
fitTransformEvalution(rf)

+----------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                                                                           |
+----------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|6.0       |3.0       |[0.0,0.11979625818465763,0.24385047307090285,0.3881308548637986,0.042928366742550954,0.008843450192483827,0.18783822916121004,0.008612367784396157]   |
|6.0       |3.0       |[0.0,0.11090466087396426,0.2183191422504446,0.42165712368813013,0.041544514485190816,0.008233957714058823,0.19134561218052354,0.007994988807687732]   |
|3.0       |3.0       |[0.0,0.03814404309732002,0.1403430777774414,0.53469209720545,0.033633483766862614,0.004292691435866996

In [27]:
trainDataBinary = assembledTrainData.where("Cover_Type = 1 or Cover_Type = 2") 
testDataBinary = assembledTestData.filter(assembledTestData.Cover_Type <= 2)

In [29]:
trainDataBinary.groupBy('Cover_Type').count().show()
testDataBinary.groupBy('Cover_Type').count().show()

+----------+------+
|Cover_Type| count|
+----------+------+
|       1.0|190476|
|       2.0|254917|
+----------+------+

+----------+-----+
|Cover_Type|count|
+----------+-----+
|       1.0|21451|
|       2.0|28083|
+----------+-----+



In [30]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def convertTo01(label):
    if label==2.0:
        res =0
    else:
        res = 1
    return float(res)

indexer_udf = udf(convertTo01,DoubleType())
testDataBinary = testDataBinary.withColumn('label',indexer_udf(testDataBinary['Cover_Type']))
trainDataBinary = trainDataBinary.withColumn('label',indexer_udf(testDataBinary['Cover_Type']))

trainDataBinary.groupBy('label').count().show()
testDataBinary.groupBy('label').count().show()


+-----+------+
|label| count|
+-----+------+
|  0.0|254917|
|  1.0|190476|
+-----+------+

+-----+-----+
|label|count|
+-----+-----+
|  0.0|28083|
|  1.0|21451|
+-----+-----+



In [37]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import random

random.randint(1,100)

rate = 0.01
while rate < 1:
    itera = int(random.randint(1,100))
    rate = rate*1.5
    print('penalty rate : ', rate, 'max iter : ',itera)
    svm = LinearSVC(labelCol="label", featuresCol="featureVector", maxIter=itera, regParam=rate)
    
#     from pyspark.ml.evaluation import BinaryClassificationEvaluator
    model = svm.fit(trainDataBinary)
    predictions = model.transform(testDataBinary)
    evaluator = BinaryClassificationEvaluator(labelCol="label",rawPredictionCol="prediction")
    print(evaluator.evaluate(predictions))

penalty rate :  0.015 max iter :  27
0.6529709130084504
penalty rate :  0.0225 max iter :  13
0.558081854574569
penalty rate :  0.03375 max iter :  51
0.6608732268527191
penalty rate :  0.050625 max iter :  28
0.653180351477583
penalty rate :  0.0759375 max iter :  57
0.6606417700995233
penalty rate :  0.11390625000000001 max iter :  59
0.662568701125736
penalty rate :  0.17085937500000004 max iter :  15
0.6190407024066344
penalty rate :  0.25628906250000005 max iter :  54
0.6526082952427726
penalty rate :  0.3844335937500001 max iter :  35
0.6535741656856918
penalty rate :  0.5766503906250001 max iter :  60
0.6330325948474895
penalty rate :  0.8649755859375001 max iter :  85
0.6158156745126442
penalty rate :  1.2974633789062502 max iter :  26
0.5226431923140094


0.5
