In [185]:
from pyspark.context import SparkContext
'''Random Forest'''
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.linalg import Vectors
from pyspark.sql import DataFrame

from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

In [186]:
sc = SparkContext.getOrCreate()

In [187]:
data_path = "./properties_price.csv"
data_set = spark.read.options(header="true", parserLib="univocity", inferSchema="true").csv(data_path)
print data_set.schema
headers = data_set.columns
print(headers)
print("\n")
print 'headers names and types: \n'
print data_set.dtypes

StructType(List(StructField(_c0,IntegerType,true),StructField(Parcel ID,StringType,true),StructField(Lat,DoubleType,true),StructField(Lng,DoubleType,true),StructField(Property address,StringType,true),StructField(Tax rate area,IntegerType,true),StructField(Neighborhood,IntegerType,true),StructField(Lot sq. ft.,IntegerType,true),StructField(Property class,IntegerType,true),StructField(Year built,IntegerType,true),StructField(Square feet,IntegerType,true),StructField(Rooms,IntegerType,true),StructField(Bedrooms,IntegerType,true),StructField(Bathrooms,DoubleType,true),StructField(ZIP code,IntegerType,true),StructField(Median Price(2017),IntegerType,true),StructField(Median $ per Sq. Ft.(2017),IntegerType,true),StructField(Transaction year,IntegerType,true),StructField(Transaction price,IntegerType,true)))
['_c0', 'Parcel ID', 'Lat', 'Lng', 'Property address', 'Tax rate area', 'Neighborhood', 'Lot sq. ft.', 'Property class', 'Year built', 'Square feet', 'Rooms', 'Bedrooms', 'Bathrooms', 'Z

In [188]:
data_set.take(5)

[Row(_c0=0, Parcel ID=u'429-01-002', Lat=37.3084232, Lng=-121.8951796, Property address=u'1246 Curtiss Av San Jose, CA 95125', Tax rate area=17108, Neighborhood=0, Lot sq. ft.=13200, Property class=0, Year built=1900, Square feet=1327, Rooms=8, Bedrooms=4, Bathrooms=2.0, ZIP code=95125, Median Price(2017)=1250000, Median $ per Sq. Ft.(2017)=713, Transaction year=2015, Transaction price=917184),
 Row(_c0=1, Parcel ID=u'429-01-003', Lat=37.3085298, Lng=-121.895274, Property address=u'1232 Curtiss Av San Jose, CA 95125', Tax rate area=17108, Neighborhood=0, Lot sq. ft.=12200, Property class=0, Year built=1947, Square feet=1575, Rooms=7, Bedrooms=3, Bathrooms=2.0, ZIP code=95125, Median Price(2017)=1250000, Median $ per Sq. Ft.(2017)=713, Transaction year=2011, Transaction price=442886),
 Row(_c0=2, Parcel ID=u'429-01-005', Lat=37.3088046, Lng=-121.8955309, Property address=u'1220 Curtiss Av San Jose, CA 95125', Tax rate area=17108, Neighborhood=0, Lot sq. ft.=18564, Property class=0, Year

In [189]:
type(data_set)
data_set = reduce(lambda x,y: x.drop(y),[headers[1], headers[4]],data_set)
del headers[4]
del headers[1]
print headers
print data_set.dtypes

['_c0', 'Lat', 'Lng', 'Tax rate area', 'Neighborhood', 'Lot sq. ft.', 'Property class', 'Year built', 'Square feet', 'Rooms', 'Bedrooms', 'Bathrooms', 'ZIP code', 'Median Price(2017)', 'Median $ per Sq. Ft.(2017)', 'Transaction year', 'Transaction price']
[('_c0', 'int'), ('Lat', 'double'), ('Lng', 'double'), ('Tax rate area', 'int'), ('Neighborhood', 'int'), ('Lot sq. ft.', 'int'), ('Property class', 'int'), ('Year built', 'int'), ('Square feet', 'int'), ('Rooms', 'int'), ('Bedrooms', 'int'), ('Bathrooms', 'double'), ('ZIP code', 'int'), ('Median Price(2017)', 'int'), ('Median $ per Sq. Ft.(2017)', 'int'), ('Transaction year', 'int'), ('Transaction price', 'int')]


In [190]:
used_header = headers[1:-3] + headers[-2:-1]
print used_header
print len(used_header)

['Lat', 'Lng', 'Tax rate area', 'Neighborhood', 'Lot sq. ft.', 'Property class', 'Year built', 'Square feet', 'Rooms', 'Bedrooms', 'Bathrooms', 'ZIP code', 'Median Price(2017)', 'Transaction year']
14


In [191]:
transformed_df = data_set.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[1:-3]+row[-2:-1])))
trained_data, tested_data = transformed_df.randomSplit([0.7, 0.3], seed = 100)

In [192]:
trained_data.take(10)

[LabeledPoint(917184.0, [37.3084232,-121.8951796,17108.0,0.0,13200.0,0.0,1900.0,1327.0,8.0,4.0,2.0,95125.0,1250000.0,2015.0]),
 LabeledPoint(841500.0, [37.3089809,-121.8956939,17108.0,0.0,5040.0,0.0,1905.0,1044.0,5.0,3.0,1.0,95125.0,1250000.0,2016.0]),
 LabeledPoint(375370.0, [37.3095539,-121.896306,17108.0,0.0,3300.0,0.0,1926.0,960.0,5.0,2.0,1.0,95125.0,1250000.0,2010.0]),
 LabeledPoint(363802.0, [37.3081991,-121.895019,17108.0,0.0,8500.0,0.0,1989.0,2581.0,10.0,4.0,2.5,95125.0,1250000.0,1989.0]),
 LabeledPoint(840727.0, [37.3083563,-121.8948274,17108.0,0.0,8500.0,0.0,1988.0,2371.0,7.0,3.0,2.5,95125.0,1250000.0,2005.0]),
 LabeledPoint(129342.0, [37.30863,-121.8936671,17108.0,0.0,6300.0,0.0,2006.0,2122.0,6.0,3.0,3.0,95125.0,1250000.0,2011.0]),
 LabeledPoint(85780.0, [37.308529,-121.89352,17108.0,0.0,5000.0,0.0,1922.0,848.0,6.0,2.0,1.0,95125.0,1250000.0,1985.0]),
 LabeledPoint(115436.0, [37.3084411,-121.8934057,17108.0,0.0,5000.0,0.0,1939.0,1093.0,5.0,2.0,1.0,95125.0,1250000.0,2013.0]),


In [196]:
labeled_price = headers[15]
rf = RandomForest.trainRegressor(trained_data, categoricalFeaturesInfo={},
                                    numTrees=10, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=4, maxBins=32)
predictions = rf.predict(tested_data.map(lambda x: x.features))
labelsAndPredictions = tested_data.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(tested_data.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(rf.toDebugString())

Test Mean Squared Error = 44586302602.6
Learned regression forest model:
TreeEnsembleModel regressor with 10 trees

  Tree 0:
    If (feature 13 <= 2002.0)
     If (feature 6 <= 1997.0)
      If (feature 13 <= 1997.0)
       If (feature 7 <= 1846.0)
        Predict: 77350.80647482014
       Else (feature 7 > 1846.0)
        Predict: 109331.77027027027
      Else (feature 13 > 1997.0)
       If (feature 12 <= 1170000.0)
        Predict: 155578.9058927001
       Else (feature 12 > 1170000.0)
        Predict: 238879.85239852397
     Else (feature 6 > 1997.0)
      If (feature 12 <= 815000.0)
       If (feature 7 <= 1037.0)
        Predict: 97278.2
       Else (feature 7 > 1037.0)
        Predict: 198437.0
      Else (feature 12 > 815000.0)
       If (feature 10 <= 4.0)
        Predict: 276231.6911764706
       Else (feature 10 > 4.0)
        Predict: 891022.0
    Else (feature 13 > 2002.0)
     If (feature 7 <= 2359.0)
      If (feature 12 <= 989000.0)
       If (feature 12 <= 670000.0)
 

In [202]:
# Save and load model
rf.save(sc, "myrfModel_v2")
sameModel = RandomForestModel.load(sc, "myrfModel_v2")

In [204]:
#prediction = rf.predict([60.0,37.307741,-121.89593,17108.0,0.0,8400.0,0.0,1938.0,1320.0,6.0,2.0,1.0,95125.0,713.0,753543.2])
prediction = sameModel.predict([37.307741,-121.89593,17108.0,0.0,13200.0,0.0,1900.0,1327.0,8.0,4.0,2.0,95125.0,12500.0, 713.0,2015])
print prediction

155741.726087
