In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [2]:
# Importing data which has a header. Schema is automatically configured.
sales_data = spark.read.csv('rollingsales_nyc2019.csv', header=True, inferSchema=True)

# Let's see the data. You'll notice nulls.
sales_data.limit(5).toPandas()

Unnamed: 0,ID,BOROUGH,NEIGHBORHOOD,BUILDING CLASS CATEGORY,TAX CLASS AT PRESENT,BLOCK,LOT,EASE-MENT,BUILDING CLASS AT PRESENT,ADDRESS,...,RESIDENTIAL UNITS,COMMERCIAL UNITS,TOTAL UNITS,LAND SQUARE FEET,GROSS SQUARE FEET,YEAR BUILT,TAX CLASS AT TIME OF SALE,BUILDING CLASS AT TIME OF SALE,SALE PRICE,SALE DATE
0,1,Manhattan,ALPHABET CITY,01 ONE FAMILY DWELLINGS,1,400,19,,A4,526 EAST 5TH STREET,...,1,0,1,1883,5200,1900,1,A4,6100000,3/12/2018
1,2,Manhattan,ALPHABET CITY,02 TWO FAMILY DWELLINGS,1,404,1,,B9,166 AVENUE A,...,2,0,2,1510,4520,1900,1,B9,0,29/11/2018
2,3,Manhattan,ALPHABET CITY,02 TWO FAMILY DWELLINGS,1,404,1,,B9,166 AVENUE A,...,2,0,2,1510,4520,1900,1,B9,0,29/11/2018
3,4,Manhattan,ALPHABET CITY,03 THREE FAMILY DWELLINGS,1,377,56,,C0,263 EAST 7TH STREET,...,3,0,3,2430,3600,1899,1,C0,6300000,30/04/2019
4,5,Manhattan,ALPHABET CITY,07 RENTALS - WALKUP APARTMENTS,2,373,19,,C7,332 EAST 4TH STREET,...,28,2,30,4651,17478,1920,2,C7,14000000,9/01/2019


## Data Understanding

In [3]:
sales_data.count()

79621

In [4]:
# For type, we can use print schema. 
# But wait! What if you want to change the format of the data? Maybe change age to an integer instead of long?
sales_data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- NEIGHBORHOOD: string (nullable = true)
 |-- BUILDING CLASS CATEGORY: string (nullable = true)
 |-- TAX CLASS AT PRESENT: string (nullable = true)
 |-- BLOCK: integer (nullable = true)
 |-- LOT: integer (nullable = true)
 |-- EASE-MENT: string (nullable = true)
 |-- BUILDING CLASS AT PRESENT: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- APARTMENT NUMBER: string (nullable = true)
 |-- ZIP CODE: integer (nullable = true)
 |-- RESIDENTIAL UNITS: integer (nullable = true)
 |-- COMMERCIAL UNITS: integer (nullable = true)
 |-- TOTAL UNITS: integer (nullable = true)
 |-- LAND SQUARE FEET: integer (nullable = true)
 |-- GROSS SQUARE FEET: string (nullable = true)
 |-- YEAR BUILT: integer (nullable = true)
 |-- TAX CLASS AT TIME OF SALE: integer (nullable = true)
 |-- BUILDING CLASS AT TIME OF SALE: string (nullable = true)
 |--  SALE PRICE : long (nullable = true)
 |-- SALE DATE: string (

In [5]:
# We can use the describe method get some general statistics on our data too. Remember to show the DataFrame!
# But what about data type?
sales_data.describe().toPandas()

Unnamed: 0,summary,ID,BOROUGH,NEIGHBORHOOD,BUILDING CLASS CATEGORY,TAX CLASS AT PRESENT,BLOCK,LOT,EASE-MENT,BUILDING CLASS AT PRESENT,...,RESIDENTIAL UNITS,COMMERCIAL UNITS,TOTAL UNITS,LAND SQUARE FEET,GROSS SQUARE FEET,YEAR BUILT,TAX CLASS AT TIME OF SALE,BUILDING CLASS AT TIME OF SALE,SALE PRICE,SALE DATE
0,count,79621.0,79621,79621,79621,79621.0,79621.0,79621.0,0.0,79621,...,44540.0,44540.0,44540.0,46389.0,62113,79621.0,79621.0,79621,79621.0,79621
1,mean,39811.0,,,,1.6196742201784031,4401.547581668153,354.493940040944,,,...,2.83926807364167,0.3271890435563538,3.166457117198024,3914.4696371984737,3760.974449381762,1807.3508998882205,1.624031348513583,,1271078.415091496,
2,stddev,22984.74722868769,,,,0.8279772307173136,3665.2890711503296,636.3318060565701,,,...,13.102490530106984,6.935774027550197,14.92577704896296,15748.16853107207,24267.49379039857,508.7754976148334,0.8025203518095368,,12225343.545780145,
3,min,1.0,Bronx,1 BEACH,01 ONE FAMILY DWELLINGS,1.0,1.0,1.0,,A0,...,0.0,0.0,0.0,0.0,########,0.0,1.0,A0,0.0,1-Apr
4,max,79621.0,Staten Island,WYCKOFF HEIGHTS,49 CONDO WAREHOUSES/FACTORY/INDUS,4.0,16350.0,9057.0,,Z9,...,1327.0,1132.0,1348.0,1845000.0,999,2019.0,4.0,Z9,2155000000.0,9/12/2018


## Data Preparation

In [6]:
#Rename Column Header
for col in sales_data.columns:
    sales_data = sales_data.withColumnRenamed(col,col.strip().lower().replace(" ","_"))
sales_data.columns

['id',
 'borough',
 'neighborhood',
 'building_class_category',
 'tax_class_at_present',
 'block',
 'lot',
 'ease-ment',
 'building_class_at_present',
 'address',
 'apartment_number',
 'zip_code',
 'residential_units',
 'commercial_units',
 'total_units',
 'land_square_feet',
 'gross_square_feet',
 'year_built',
 'tax_class_at_time_of_sale',
 'building_class_at_time_of_sale',
 'sale_price',
 'sale_date']

In [7]:
#Select relevant data
sales_data = sales_data.select([c for c in sales_data.columns if c not in ["id","ease-ment", "apartment_number", "residential_units", "commercial_units", "total_units", "land_square_feet", "address","sale_date","neighborhood","zip_code","block","lot"]])
sales_data.columns

['borough',
 'building_class_category',
 'tax_class_at_present',
 'building_class_at_present',
 'gross_square_feet',
 'year_built',
 'tax_class_at_time_of_sale',
 'building_class_at_time_of_sale',
 'sale_price']

In [8]:
#Cleaning data with abnormal values
sales_data = sales_data[sales_data.sale_price > 100]
sales_data = sales_data[sales_data.year_built > 1000]
sales_data.describe().toPandas()

Unnamed: 0,summary,borough,building_class_category,tax_class_at_present,building_class_at_present,gross_square_feet,year_built,tax_class_at_time_of_sale,building_class_at_time_of_sale,sale_price
0,count,51294,51294,51294.0,51294,37470.0,51294.0,51294.0,51294,51294.0
1,mean,,,1.59226184207755,,3496.415318921804,1951.2193044020744,1.6060552891176356,,1829846.5367099463
2,stddev,,,0.7278410983232105,,23262.21439767196,34.8796477611941,0.71153272194856,,15004439.872354891
3,min,Bronx,01 ONE FAMILY DWELLINGS,1.0,A0,0.0,1030.0,1.0,A0,125.0
4,max,Staten Island,49 CONDO WAREHOUSES/FACTORY/INDUS,4.0,Z9,999.0,2019.0,4.0,Z9,2155000000.0


In [9]:
#nConstructing new column

import pyspark.sql.functions as F

sales_data = sales_data.withColumn(
    'price_category',
    F.when(F.col('sale_price').between(0, 250000), 'Low')\
    .when(F.col('sale_price').between(250001,500000), 'Middle-Low')\
    .when(F.col('sale_price').between(500001,750000), 'Middle-High')\
    .otherwise('High')
)
sales_data.limit(5).toPandas()

Unnamed: 0,borough,building_class_category,tax_class_at_present,building_class_at_present,gross_square_feet,year_built,tax_class_at_time_of_sale,building_class_at_time_of_sale,sale_price,price_category
0,Manhattan,01 ONE FAMILY DWELLINGS,1,A4,5200,1900,1,A4,6100000,High
1,Manhattan,03 THREE FAMILY DWELLINGS,1,C0,3600,1899,1,C0,6300000,High
2,Manhattan,07 RENTALS - WALKUP APARTMENTS,2,C7,17478,1920,2,C7,14000000,High
3,Manhattan,07 RENTALS - WALKUP APARTMENTS,2A,C2,6294,1900,2,C2,872500,High
4,Manhattan,07 RENTALS - WALKUP APARTMENTS,2B,C7,14347,1920,2,C7,1550000,High


In [10]:
#Select relevant data
sales_manhattan = sales_data.filter(sales_data['borough'] == "Manhattan")
sales_bronx = sales_data.filter(sales_data['borough'] == "Bronx")
sales_queens = sales_data.filter(sales_data['borough'] == "Queens")
sales_brooklyn = sales_data.filter(sales_data['borough'] == "Brooklyn")
sales_staten_island = sales_data.filter(sales_data['borough'] == "Staten Island")

In [11]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

sales_data = unionAll(sales_manhattan, sales_bronx, sales_queens, sales_brooklyn, sales_staten_island)

## Data Transformation

In [12]:
from pyspark.ml.feature import StringIndexer

borough_indexer = StringIndexer(inputCol="borough", outputCol="boroughIndex")
building_class_category_indexer = StringIndexer(inputCol="building_class_category", outputCol="building_class_categoryIndex")
tax_class_at_present_indexer = StringIndexer(inputCol="tax_class_at_present", outputCol="tax_class_at_presentIndex")
building_class_at_present_indexer = StringIndexer(inputCol="building_class_at_present", outputCol="building_class_at_presentIndex")
building_class_at_time_of_sale_indexer = StringIndexer(inputCol="building_class_at_time_of_sale", outputCol="building_class_at_time_of_saleIndex")
price_category_indexer = StringIndexer(inputCol="price_category", outputCol="price_categoryIndex")


sales_data = borough_indexer.fit(sales_data).transform(sales_data)
sales_data = building_class_category_indexer.fit(sales_data).transform(sales_data)
sales_data = tax_class_at_present_indexer.fit(sales_data).transform(sales_data)
sales_data = building_class_at_present_indexer.fit(sales_data).transform(sales_data)
sales_data = building_class_at_time_of_sale_indexer.fit(sales_data).transform(sales_data)
sales_data = price_category_indexer.fit(sales_data).transform(sales_data)

sales_data.limit(5).toPandas()

Unnamed: 0,borough,building_class_category,tax_class_at_present,building_class_at_present,gross_square_feet,year_built,tax_class_at_time_of_sale,building_class_at_time_of_sale,sale_price,price_category,boroughIndex,building_class_categoryIndex,tax_class_at_presentIndex,building_class_at_presentIndex,building_class_at_time_of_saleIndex,price_categoryIndex
0,Manhattan,01 ONE FAMILY DWELLINGS,1,A4,5200,1900,1,A4,6100000,High,2.0,0.0,0.0,28.0,28.0,0.0
1,Manhattan,03 THREE FAMILY DWELLINGS,1,C0,3600,1899,1,C0,6300000,High,2.0,4.0,0.0,6.0,6.0,0.0
2,Manhattan,07 RENTALS - WALKUP APARTMENTS,2,C7,17478,1920,2,C7,14000000,High,2.0,6.0,1.0,26.0,26.0,0.0
3,Manhattan,07 RENTALS - WALKUP APARTMENTS,2A,C2,6294,1900,2,C2,872500,High,2.0,6.0,4.0,20.0,20.0,0.0
4,Manhattan,07 RENTALS - WALKUP APARTMENTS,2B,C7,14347,1920,2,C7,1550000,High,2.0,6.0,6.0,26.0,26.0,0.0


In [13]:
sales_data = sales_data.select([c for c in sales_data.columns if c not in ["borough","building_class_category", "tax_class_at_present", "building_class_at_present", "building_class_at_time_of_sale", "price_category"]])
sales_data.limit(5).toPandas()

Unnamed: 0,gross_square_feet,year_built,tax_class_at_time_of_sale,sale_price,boroughIndex,building_class_categoryIndex,tax_class_at_presentIndex,building_class_at_presentIndex,building_class_at_time_of_saleIndex,price_categoryIndex
0,5200,1900,1,6100000,2.0,0.0,0.0,28.0,28.0,0.0
1,3600,1899,1,6300000,2.0,4.0,0.0,6.0,6.0,0.0
2,17478,1920,2,14000000,2.0,6.0,1.0,26.0,26.0,0.0
3,6294,1900,2,872500,2.0,6.0,4.0,20.0,20.0,0.0
4,14347,1920,2,1550000,2.0,6.0,6.0,26.0,26.0,0.0


In [14]:
from pyspark.sql.types import DoubleType

sales_data = sales_data.withColumn("gross_square_feet", sales_data["gross_square_feet"].cast(DoubleType()))

sales_data = sales_data.na.fill(0)
sales_data.describe().toPandas()

Unnamed: 0,summary,gross_square_feet,year_built,tax_class_at_time_of_sale,sale_price,boroughIndex,building_class_categoryIndex,tax_class_at_presentIndex,building_class_at_presentIndex,building_class_at_time_of_saleIndex,price_categoryIndex
0,count,51294.0,51294.0,51294.0,51294.0,51294.0,51294.0,51294.0,51294.0,51294.0,51294.0
1,mean,2554.113190626584,1951.2193044020744,1.6060552891176356,1829846.5367099463,1.4013724802121106,3.009825710609428,0.8271727687448824,7.362264592349982,7.441104222716107,0.9764494872694662
2,stddev,19942.343147189076,34.87964776119363,0.7115327219485587,15004439.872354835,1.2831496693129358,4.20362495818914,1.180136408414844,12.08824222009273,12.241198854912469,1.0202132256759378
3,min,0.0,1030.0,1.0,125.0,0.0,0.0,0.0,0.0,0.0,0.0
4,max,1741458.0,2019.0,4.0,2155000000.0,4.0,43.0,9.0,154.0,154.0,3.0


In [15]:
new_sales_data = sales_data.limit(10000)

new_sales_data.describe().toPandas()

Unnamed: 0,summary,gross_square_feet,year_built,tax_class_at_time_of_sale,sale_price,boroughIndex,building_class_categoryIndex,tax_class_at_presentIndex,building_class_at_presentIndex,building_class_at_time_of_saleIndex,price_categoryIndex
0,count,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0
1,mean,4008.6602,1955.9164,2.0635,4616679.3983,2.0,3.5688,1.1863,6.0063,6.0364,0.4516
2,stddev,37154.040672136354,37.97828124105419,0.4295185660944832,31952652.71251124,0.0,4.350121712767537,0.7609534907085316,13.901740399986412,14.008372442573387,0.7766065617641775
3,min,0.0,1800.0,1.0,128.0,2.0,0.0,0.0,0.0,0.0,0.0
4,max,1300727.0,2017.0,4.0,2155000000.0,2.0,43.0,8.0,150.0,153.0,3.0


In [16]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.mllib.linalg import Matrices, Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
import pandas as pd

fi_assembler = VectorAssembler(
    inputCols=[c for c in new_sales_data.columns if c not in ["sale_price"]], 
    outputCol="features"
)

fi_data = fi_assembler.transform(new_sales_data).select("sale_price", "features")

fi_rddMap = fi_data.rdd.map(lambda x: LabeledPoint(x["sale_price"], Vectors.dense(x["features"])))

fi_chiResult = Statistics.chiSqTest(fi_rddMap)

fi_result = pd.DataFrame(
    [[v.pValue, (1 - v.pValue)] for v in fi_chiResult],
    index = [c for c in sales_data.columns if c not in ["sale_price"]],
    columns = ["pval", "importance"]
)

# Show chi-squared test results
fi_result.sort_values("importance", ascending = False)

Unnamed: 0,pval,importance
gross_square_feet,0.0,1.0
tax_class_at_time_of_sale,0.0,1.0
building_class_categoryIndex,0.0,1.0
tax_class_at_presentIndex,0.0,1.0
building_class_at_presentIndex,0.0,1.0
building_class_at_time_of_saleIndex,0.0,1.0
price_categoryIndex,0.0,1.0
year_built,1.0,0.0
boroughIndex,1.0,0.0


In [17]:
#Select relevant data
new_sales_data = new_sales_data.select([c for c in sales_data.columns if c not in ["year_built","borough"]])
new_sales_data.columns

['gross_square_feet',
 'tax_class_at_time_of_sale',
 'sale_price',
 'boroughIndex',
 'building_class_categoryIndex',
 'tax_class_at_presentIndex',
 'building_class_at_presentIndex',
 'building_class_at_time_of_saleIndex',
 'price_categoryIndex']

In [18]:
from pyspark.sql.functions import log
new_sales_data.withColumn("log2", log("sale_price")).show()

+-----------------+-------------------------+----------+------------+----------------------------+-------------------------+------------------------------+-----------------------------------+-------------------+------------------+
|gross_square_feet|tax_class_at_time_of_sale|sale_price|boroughIndex|building_class_categoryIndex|tax_class_at_presentIndex|building_class_at_presentIndex|building_class_at_time_of_saleIndex|price_categoryIndex|              log2|
+-----------------+-------------------------+----------+------------+----------------------------+-------------------------+------------------------------+-----------------------------------+-------------------+------------------+
|           5200.0|                        1|   6100000|         2.0|                         0.0|                      0.0|                          28.0|                               28.0|                0.0|15.623799329143539|
|           3600.0|                        1|   6300000|         2.0|       

## Data Mining

### Linear Regression

In [19]:
splits = fi_data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

train_df.limit(5).toPandas()

Unnamed: 0,sale_price,features
0,250,"[0.0, 1910.0, 2.0, 2.0, 5.0, 1.0, 8.0, 8.0, 3.0]"
1,250,"[0.0, 1910.0, 2.0, 2.0, 5.0, 1.0, 8.0, 8.0, 3.0]"
2,250,"[0.0, 1910.0, 2.0, 2.0, 5.0, 1.0, 8.0, 8.0, 3.0]"
3,363,"[0.0, 1926.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 3.0]"
4,1000,"[5206.0, 1880.0, 4.0, 2.0, 10.0, 2.0, 29.0, 29..."


In [20]:
# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='sale_price')

# Fit the model to the data.
lrModel = lr.fit(train_df)

# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [667.1382986083514,-10838.294710378104,4816934.00157315,0.0,-133098.9238607249,501640.5733409745,-1144652.3689519456,1047236.3223039263,-1894323.0742035545] Intercept: 14597198.812944036


In [21]:
# Let's evaluate the model against the test data.
test_results = lrModel.evaluate(test_df)

# Interesting results! This shows the difference between the predicted value and the test data.
test_results.predictions.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME: {}".format(test_results.rootMeanSquaredError))

+----------+--------------------+--------------------+
|sale_price|            features|          prediction|
+----------+--------------------+--------------------+
|       128|[21455.0,2016.0,4...| 1.688888076255973E7|
|       250|[0.0,1910.0,2.0,2...|  -3096227.722489316|
|       263|[0.0,1926.0,2.0,2...| -1957916.3692283053|
|      1000|[2250.0,1900.0,2....|  11575.919495232403|
|      1800|[13200.0,1920.0,2...|  3715027.1085120738|
|      2000|[4179.0,1920.0,1....|  -5407283.277466152|
|      7500|[0.0,1910.0,2.0,2...|  -3096227.722489316|
|      8669|[0.0,1960.0,2.0,2...|  -4657618.386691209|
|     15000|[0.0,1965.0,2.0,2...|  -2380609.862933051|
|     15000|[40.0,2005.0,4.0,...|   2162889.352010537|
|     19475|[7437.0,1920.0,2....|  2378511.9603370167|
|     19475|[7485.0,1920.0,2....|   2410534.598670218|
|     20000|[0.0,1920.0,2.0,2...|  -3204610.669593092|
|     20000|[0.0,2007.0,2.0,2...| -2835818.2407689355|
|     24500|[0.0,2007.0,2.0,2...| -2835818.2407689355|
|     3000

In [22]:
# We can also get the R2 value. 
print("R2: {}".format(test_results.r2))

R2: 0.5572102063265285


In [23]:
test_results.predictions.toPandas().to_csv("linear_regression.csv")

### Random Forest

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(fi_data)

rf_data = featureIndexer.transform(fi_data)

from pyspark.sql.functions import col

rf_data = rf_data.select(col("features"),col("sale_price").alias("label"),col("indexedFeatures"))
rf_data.show(5,True)

+--------------------+--------+--------------------+
|            features|   label|     indexedFeatures|
+--------------------+--------+--------------------+
|[5200.0,1900.0,1....| 6100000|[5200.0,1900.0,0....|
|[3600.0,1899.0,1....| 6300000|[3600.0,1899.0,0....|
|[17478.0,1920.0,2...|14000000|[17478.0,1920.0,1...|
|[6294.0,1900.0,2....|  872500|[6294.0,1900.0,1....|
|[14347.0,1920.0,2...| 1550000|[14347.0,1920.0,1...|
+--------------------+--------+--------------------+
only showing top 5 rows



In [25]:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = rf_data.randomSplit([0.7, 0.3])

In [26]:
# Import LinearRegression class
from pyspark.ml.regression import RandomForestRegressor

# Define LinearRegression algorithm
rf = RandomForestRegressor() # featuresCol="indexedFeatures",numTrees=2, maxDepth=2, seed=42

In [27]:
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(trainingData)

In [28]:
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("features","label", "prediction").show(5)

+--------------------+-------+------------------+
|            features|  label|        prediction|
+--------------------+-------+------------------+
|[0.0,1850.0,2.0,2...| 510000|  1032523.28409393|
|[0.0,1850.0,2.0,2...| 585000|  1032523.28409393|
|[0.0,1869.0,2.0,2...|2270000|2076809.4811932687|
|[0.0,1880.0,2.0,2...| 534581|1151243.4489521196|
|[0.0,1885.0,2.0,2...|2790000|2110265.6223577373|
+--------------------+-------+------------------+
only showing top 5 rows



In [29]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE: %g" % rmse)

RMSE: 2.22351e+07


In [30]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2: %g" % r2)

R2: 0.274454


In [31]:
predictions.select("features","label", "prediction").toPandas().to_csv("random_forest.csv")

### Decision Tree

In [32]:
from pyspark.ml.regression import DecisionTreeRegressor

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|  626346.806072478| 510000|[0.0,1850.0,2.0,2...|
|  626346.806072478| 585000|[0.0,1850.0,2.0,2...|
|1975459.6357499266|2270000|[0.0,1869.0,2.0,2...|
|  626346.806072478| 534581|[0.0,1880.0,2.0,2...|
|1975459.6357499266|2790000|[0.0,1885.0,2.0,2...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 2.48811e+07


In [33]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2: %g" % r2)

R2: 0.0914965


In [34]:
predictions.select("features","label", "prediction").toPandas().to_csv("decision_tree.csv")

# Iteration 2 starts here

In [35]:
low_q, high_q = fi_data.approxQuantile("sale_price", [0.20,0.80], 0)
sales_data_new = fi_data.filter(new_sales_data.sale_price.between(low_q, high_q))
sales_data_new.describe().show()

+-------+------------------+
|summary|        sale_price|
+-------+------------------+
|  count|              6029|
|   mean|1440166.2539392933|
| stddev| 717677.3491204829|
|    min|            610000|
|    max|           3375000|
+-------+------------------+



### Linear Regression

In [36]:
splits = sales_data_new.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='sale_price')

# Fit the model to the data.
lrModel = lr.fit(train_df)

# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

# Let's evaluate the model against the test data.
test_results = lrModel.evaluate(test_df)

# Interesting results! This shows the difference between the predicted value and the test data.
test_results.predictions.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME: {}".format(test_results.rootMeanSquaredError))

# We can also get the R2 value. 
print("R2: {}".format(test_results.r2))

Coefficients: [0.6746253466258367,1522.267735519173,-204597.7506589319,0.0,2360.314642150115,123481.40098558154,-20783.75420351618,28140.139312700874,-890489.427799769] Intercept: -1148213.4354138344
+----------+--------------------+-----------------+
|sale_price|            features|       prediction|
+----------+--------------------+-----------------+
|    610000|[0.0,1910.0,2.0,2...|585474.7259378843|
|    610000|[0.0,1910.0,2.0,2...|585474.7259378843|
|    610000|[0.0,1924.0,2.0,2...|606786.4742351528|
|    610000|[0.0,1929.0,2.0,2...|614397.8129127487|
|    610000|[0.0,1940.0,2.0,2...|631142.7580034593|
|    610000|[0.0,1952.0,2.0,2...|649409.9708296894|
|    610000|[0.0,1961.0,2.0,2...|663110.3804493621|
|    610000|[0.0,1963.0,2.0,2...|666154.9159204005|
|    610000|[0.0,1964.0,2.0,2...|782192.7072125217|
|    610000|[0.0,1970.0,2.0,2...|676810.7900690348|
|    610000|[0.0,2000.0,2.0,2...| 836994.345691212|
|    612500|[0.0,1974.0,2.0,2...|682899.8610111112|
|    613500|[0.0,192

In [37]:
test_results.predictions.toPandas().to_csv("linear_regression2.csv")

### Random Forest

In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(sales_data_new)

rf_data = featureIndexer.transform(sales_data_new)

from pyspark.sql.functions import col

rf_data = rf_data.select(col("features"),col("sale_price").alias("label"),col("indexedFeatures"))
rf_data.show(5,True)

+--------------------+-------+--------------------+
|            features|  label|     indexedFeatures|
+--------------------+-------+--------------------+
|[6294.0,1900.0,2....| 872500|[6294.0,1900.0,1....|
|[14347.0,1920.0,2...|1550000|[14347.0,1920.0,1...|
|[0.0,1928.0,2.0,2...| 970000|[0.0,1928.0,1.0,0...|
|[0.0,1928.0,2.0,2...| 699000|[0.0,1928.0,1.0,0...|
|[0.0,1920.0,2.0,2...| 915600|[0.0,1920.0,1.0,0...|
+--------------------+-------+--------------------+
only showing top 5 rows



In [39]:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = rf_data.randomSplit([0.7, 0.3])

In [40]:
# Import LinearRegression class
from pyspark.ml.regression import RandomForestRegressor

# Define LinearRegression algorithm
rf = RandomForestRegressor() # featuresCol="indexedFeatures",numTrees=2, maxDepth=2, seed=42

In [41]:
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(trainingData)

In [42]:
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("features","label", "prediction").show(5)

+--------------------+-------+------------------+
|            features|  label|        prediction|
+--------------------+-------+------------------+
|[0.0,1850.0,2.0,2...| 875000|1228213.2650120142|
|[0.0,1850.0,2.0,2...|1475000| 1306351.483708536|
|[0.0,1880.0,2.0,2...|3075000|1445744.8083968437|
|[0.0,1880.0,2.0,2...|1000000| 1306351.483708536|
|[0.0,1889.0,2.0,2...| 960000|1445744.8083968437|
+--------------------+-------+------------------+
only showing top 5 rows



In [43]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE: %g" % rmse)

RMSE: 518412


In [44]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2: %g" % r2)

R2: 0.482967


In [46]:
predictions.select("features","label", "prediction").toPandas().to_csv("random_forest2.csv")