In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

In [3]:
# Header for covtype.data, which is read below with header=False, so the
# column names must be supplied here, in file order:
#   10 named measurement columns, Wilderness_Area_0..3, Soil_Type_0..39,
#   and finally the Cover_Type label (55 names in total).
measurement_cols = [
    "Elevation",
    "Aspect",
    "Slope",
    "Horizontal_Distance_To_Hydrology",
    "Vertical_Distance_To_Hydrology",
    "Horizontal_Distance_To_Roadways",
    "Hillshade_9am",
    "Hillshade_Noon",
    "Hillshade_3pm",
    "Horizontal_Distance_To_Fire_Points",
]
wilderness_cols = ["Wilderness_Area_{}".format(k) for k in range(4)]
soil_cols = ["Soil_Type_{}".format(k) for k in range(40)]
colNames = measurement_cols + wilderness_cols + soil_cols + ["Cover_Type"]

In [6]:
# Obtain the notebook's Spark session (getOrCreate reuses an existing one if
# present) and keep a handle to its SparkContext.
spark = (
    SparkSession.builder
    .appName('test')
    .getOrCreate()
)
sc = spark.sparkContext

In [7]:
# Explicit schema: every column is a nullable integer except Cover_Type,
# which is read as a double for use as the classifier's label column.
schema = StructType([
    StructField(name, DoubleType() if name == "Cover_Type" else IntegerType(), True)
    for name in colNames
])

# The file has no header row, so names and types come entirely from `schema`.
data = spark.read.csv("covtype.data", header=False, schema=schema)
data.count()  # 581012

581012

In [8]:
# Print each column's name, type and nullability to confirm the explicit schema was applied.
data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |-- Soil_Type_5: integer (nullable = true)
 |-- Soil_Type

In [9]:
from pyspark.ml.feature import VectorAssembler

# Hold out ~10% of the rows for evaluation. Without an explicit seed PySpark
# draws a fresh random seed on every run, so the split -- and every metric
# computed from it -- would change on each Restart & Run All.
SPLIT_SEED = 42
(trainData, testData) = data.randomSplit([0.9, 0.1], seed=SPLIT_SEED)

# Every column except the label is used as a feature.
inputCols = trainData.drop('Cover_Type').columns
# Assemble the feature columns into a single vector column for the ML estimator.
assembler = VectorAssembler(inputCols=inputCols, outputCol="featureVector")
assembledTrainData = assembler.transform(trainData)
assembledTestData = assembler.transform(testData)
assembledTrainData.select('featureVector').show(truncate=False)

+-----------------------------------------------------------------------------------------------------+
|featureVector                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1863.0,37.0,17.0,120.0,18.0,90.0,217.0,202.0,115.0,769.0,1.0,1.0])  |
|(54,[0,1,2,5,6,7,8,9,13,18],[1874.0,18.0,14.0,90.0,208.0,209.0,135.0,793.0,1.0,1.0])                 |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1879.0,28.0,19.0,30.0,12.0,95.0,209.0,196.0,117.0,778.0,1.0,1.0])   |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1888.0,33.0,22.0,150.0,46.0,108.0,209.0,185.0,103.0,735.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,14],[1889.0,28.0,22.0,150.0,23.0,120.0,205.0,185.0,108.0,759.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1889.0,353.0,30.0,95.0,39.0,67.0,153.0,172.0,146.0,600.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1896.0,337.0,12.0,30.0,6.0,175

In [14]:
from pyspark.ml.classification import LogisticRegression

# Multinomial logistic regression predicting Cover_Type from the assembled
# feature vector.
# NOTE(review): Spark presumably infers the class count as (max label + 1); if
# Cover_Type holds 1-7, class index 0 is never observed -- the probability
# vectors printed for the test set do have 8 entries. Confirm this is intended.
logi = LogisticRegression(
    featuresCol="featureVector",
    labelCol="Cover_Type",
    predictionCol="prediction",
    family="multinomial",
)
# Fit on the training split only.
model = logi.fit(assembledTrainData)

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compares the "prediction" column with the true Cover_Type label. No
# metricName is set here, so the evaluator's default applies unless a metric
# is chosen when evaluating.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="Cover_Type",
)

In [17]:
# Score the held-out rows with the fitted model and show a sample.
predictions = model.transform(assembledTestData)
predictions.select(["Cover_Type", "prediction", "probability"]).show(truncate=False)

# Choose the metric per call via a param-map override rather than
# setMetricName(), which would permanently mutate the evaluator defined in an
# earlier cell and make its behaviour depend on which cell ran last.
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("accuracy:", accuracy)
print("f1:", f1)

+----------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                                                                                              |
+----------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|6.0       |3.0       |[2.7239474008919554E-8,1.2779427903792646E-6,1.0949190521973466E-4,0.6178725835313205,0.07300426346790252,2.5228923310017436E-4,0.30875978216076505,2.845194275593344E-7]|
|6.0       |3.0       |[3.1984598148524444E-8,3.5948758140132483E-6,1.3997784708562368E-4,0.6086822244650321,0.01709551244601913,1.621833102508345E-4,0.3739151740204584,1.3010507418664877E-6] |
|6.0       |3.0       |[1.8811