In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DoubleType
from pyspark.ml.linalg import Vectors

In [4]:
spark = SparkSession.builder.appName("DecisionTreeClassifier").getOrCreate()

In [5]:
df = spark.read.csv('nyc-rolling-sales.csv',inferSchema=True,header='true')

In [6]:
colu = df.columns
print(colu)

['_c0', 'BOROUGH', 'NEIGHBORHOOD', 'BUILDING CLASS CATEGORY', 'TAX CLASS AT PRESENT', 'BLOCK', 'LOT', 'EASE-MENT', 'BUILDING CLASS AT PRESENT', 'ADDRESS', 'APARTMENT NUMBER', 'ZIP CODE', 'RESIDENTIAL UNITS', 'COMMERCIAL UNITS', 'TOTAL UNITS', 'LAND SQUARE FEET', 'GROSS SQUARE FEET', 'YEAR BUILT', 'TAX CLASS AT TIME OF SALE', 'BUILDING CLASS AT TIME OF SALE', 'SALE PRICE', 'SALE DATE']


In [None]:
colu.index('GROSS SQUARE FEET')

In [16]:
df.summary().toPandas()

Unnamed: 0,summary,_c0,BOROUGH,NEIGHBORHOOD,BUILDING CLASS CATEGORY,TAX CLASS AT PRESENT,BLOCK,LOT,EASE-MENT,BUILDING CLASS AT PRESENT,...,ZIP CODE,RESIDENTIAL UNITS,COMMERCIAL UNITS,TOTAL UNITS,LAND SQUARE FEET,GROSS SQUARE FEET,YEAR BUILT,TAX CLASS AT TIME OF SALE,BUILDING CLASS AT TIME OF SALE,SALE PRICE
0,count,84548.0,84548.0,84548,84548,84548.0,84548.0,84548.0,84548.0,84548,...,84548.0,84548.0,84548.0,84548.0,84548.0,84548.0,84548.0,84548.0,84548,84548
1,mean,10344.359878412262,2.9987581019066094,,,1.651910272669626,4237.218976202867,376.22401476084593,,,...,10731.99161423097,2.0252637554998345,0.1935586885556133,2.249183895538629,2717.7926266736054,2724.4452973458865,1789.322976297488,1.657484505842835,,1276456.4977638705
2,stddev,7151.77943634439,1.289790049229646,,,0.8448206674575622,3568.263406631048,658.1368139996075,,,...,1290.8791471461095,16.721037014467946,8.713183368246815,18.97258443202677,34909.49649365539,28810.800805431947,537.3449934336502,0.8193412115061967,,1.1405255345987292E7
3,min,4.0,1.0,AIRPORT LA GUARDIA,01 ONE FAMILY DWELLINGS,,1.0,1.0,,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,A0,-
4,25%,4231.0,2.0,,,1.0,1322.0,22.0,,,...,10305.0,0.0,0.0,1.0,0.0,0.0,1920.0,1.0,,225000.0
5,50%,8943.0,3.0,,,1.0,3311.0,50.0,,,...,11209.0,1.0,0.0,1.0,1770.0,1076.0,1940.0,2.0,,530000.0
6,75%,15985.0,4.0,,,2.0,6280.0,1001.0,,,...,11357.0,2.0,0.0,2.0,2656.0,2080.0,1965.0,2.0,,950000.0
7,max,26739.0,5.0,WYCKOFF HEIGHTS,49 CONDO WAREHOUSES/FACTORY/INDUS,4.0,16322.0,9106.0,,Z9,...,11694.0,1844.0,2261.0,2261.0,9996.0,9992.0,2017.0,4.0,Z9,9999999


In [None]:
df.select('BOROUGH').collect()

In [None]:
df.createOrReplaceTempView('nyc')

In [13]:
from pyspark.sql import functions as F

In [None]:
"UPDATE nyc SET `LAND SQUARE FEET`=0 WHERE `LAND SQUARE FEET` like'%-%'"

In [14]:
def hapusStrip(x):
    return df.withColumn(x,F.when(F.col(x).like('%-%'),0).otherwise(F.col(x)))

In [15]:
df = hapusStrip('LAND SQUARE FEET')
df = hapusStrip('GROSS SQUARE FEET')

In [17]:
data=df.rdd.map(lambda x:(Vectors.dense(x[colu.index('BLOCK')],
                                        x[colu.index('LOT')],
                                        x[colu.index('LAND SQUARE FEET')],
                                        x[colu.index('GROSS SQUARE FEET')]),
                          x[colu.index('TAX CLASS AT PRESENT')])).toDF(
    ["features", "label"])

In [18]:
data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[392.0,6.0,1633.0...|   2A|
|[399.0,26.0,4616....|    2|
|[399.0,39.0,2212....|    2|
|[402.0,21.0,2272....|   2B|
|[404.0,55.0,2369....|   2A|
|[405.0,16.0,2581....|    2|
|[406.0,32.0,1750....|   2B|
|[407.0,18.0,5163....|    2|
|[379.0,34.0,1534....|    2|
|[387.0,153.0,4489...|    2|
|[394.0,44.0,4295....|    2|
|[400.0,21.0,3717....|   2B|
|[373.0,40.0,0.0,0.0]|    2|
|[373.0,40.0,0.0,0.0]|    2|
|[373.0,40.0,0.0,0.0]|    2|
|[373.0,40.0,0.0,0.0]|    2|
|[373.0,40.0,0.0,0.0]|    2|
|[373.0,46.0,0.0,0.0]|    2|
|[373.0,49.0,0.0,0.0]|    2|
|[373.0,49.0,0.0,0.0]|    2|
+--------------------+-----+
only showing top 20 rows



In [19]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel",handleInvalid='skip').fit(data)

In [20]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [21]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [22]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

In [23]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [24]:
model = pipeline.fit(trainingData)

In [25]:
predictions = model.transform(testData)

In [26]:
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|[7.0,1002.0,0.0,0.0]|
|       1.0|         1.0|[15.0,1179.0,0.0,...|
|       1.0|         1.0|[15.0,1231.0,0.0,...|
|       1.0|         1.0|[15.0,1285.0,0.0,...|
|       1.0|         1.0|[15.0,1310.0,0.0,...|
+----------+------------+--------------------+
only showing top 5 rows



In [27]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.157029 


In [None]:
df.select(['BOROUGH']).collect()

In [None]:
treeModel = model.stages[2]

In [None]:
print(treeModel)