## This notebook was for testing Random Forests through Spark MLlib

In [3]:
import operator
import sys
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import numpy as np
import matplotlib.pyplot as plt
from scipy import linalg
import random
from sklearn.manifold import TSNE
import time

from pyspark.ml.regression import RandomForestRegressor

In [4]:
sc.stop()

In [5]:
# Spark settings
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName('project').getOrCreate()
df = spark.read.csv('gs://eecse6893/Project/Cleaned_Sales_Data.csv',inferSchema=True,header = True)
df = df.drop('_c0')
df = df.drop('block')

In [6]:
df.show(2)

+-------+-----------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+-------------------+
|BOROUGH|BUILDING CLASS CATEGORY|ZIP CODE|RESIDENTIAL_UNITS|COMMERCIAL UNITS|TOTAL UNITS|LAND_SQUARE_FEET|GROSS_SQUARE_FEET|YEAR BUILT|TAX CLASS AT TIME OF SALE|BUILDING CLASS AT TIME OF SALE|SALE_PRICE|          SALE DATE|
+-------+-----------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+-------------------+
|      2|   01  ONE FAMILY HO...| 10457.0|                1|               0|          1|            1768|             1188|      1901|                        1|                            A5|    345752|2009-03-16 00:00:00|
|      2|   02  TWO FAMILY HO...| 10457.0|                2|               0|          2|            235

In [7]:
df.printSchema()

root
 |-- BOROUGH: integer (nullable = true)
 |-- BUILDING CLASS CATEGORY: string (nullable = true)
 |-- ZIP CODE: double (nullable = true)
 |-- RESIDENTIAL_UNITS: integer (nullable = true)
 |-- COMMERCIAL UNITS: integer (nullable = true)
 |-- TOTAL UNITS: integer (nullable = true)
 |-- LAND_SQUARE_FEET: integer (nullable = true)
 |-- GROSS_SQUARE_FEET: integer (nullable = true)
 |-- YEAR BUILT: integer (nullable = true)
 |-- TAX CLASS AT TIME OF SALE: integer (nullable = true)
 |-- BUILDING CLASS AT TIME OF SALE: string (nullable = true)
 |-- SALE_PRICE: integer (nullable = true)
 |-- SALE DATE: timestamp (nullable = true)



In [8]:
indexer1 = StringIndexer(inputCol="BUILDING CLASS CATEGORY", outputCol="BUILDING CLASS CATEGORY cat")
indexer2 = StringIndexer(inputCol="BUILDING CLASS AT TIME OF SALE", outputCol="BUILDING CLASS AT TIME OF SALE cat")
indexer3 = StringIndexer(inputCol="SALE_PRICE", outputCol="label")

encoder = OneHotEncoderEstimator(inputCols=["BOROUGH", "BUILDING CLASS CATEGORY cat","ZIP CODE", "TAX CLASS AT TIME OF SALE","BUILDING CLASS AT TIME OF SALE cat"],
                        outputCols=["BOROUGH_vec", "BUILDING CLASS CATEGORY cat_vec","ZIP CODE_vec", "TAX CLASS AT TIME OF SALE_vec","BUILDING CLASS AT TIME OF SALE_vec"] )

assembler = VectorAssembler(inputCols=["BOROUGH_vec","BUILDING CLASS CATEGORY cat_vec","ZIP CODE_vec","RESIDENTIAL_UNITS","COMMERCIAL UNITS","TOTAL UNITS", 
                                       "LAND_SQUARE_FEET","GROSS_SQUARE_FEET", "YEAR BUILT","TAX CLASS AT TIME OF SALE_vec", 
                                       "BUILDING CLASS AT TIME OF SALE_vec"],outputCol="features")
# pipeline = Pipeline(stages=[indexer1, indexer2,indexer3,indexer4,indexer5,indexer6,indexer7,indexer8, encoder,assembler])
pipeline = Pipeline(stages=[indexer1, indexer2,indexer3,encoder,assembler])

In [9]:

pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
train,test = df.randomSplit([0.9,0.1], seed = 100) 



## Train model and get training time

In [None]:
start_time = time.time()
rfreg = RandomForestRegressor(featuresCol = 'features', labelCol = 'label',maxDepth=30)
rfmodel = rfreg.fit(train)
print(time.time()-start_time)

23500.56433916092
