# House price predict

### Import pyspark module

In [None]:
import numpy as np
import pandas as pd
import pyspark
import sys

In [1]:
# import SparkSession, 有 pip3 install pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, DoubleType, IntegerType

In [2]:
import pyspark.pandas as ps

In [65]:
# spark ml module
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler

In [None]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

### Set spark session

In [4]:
# Local mode
spark = SparkSession\
        .builder\
        .appName("price_predict")\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/17 12:01:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# standalone mode
spark = SparkSession\
        .builder\
        .master("spark://bdse187.example.com:7077")\
        .config('spark.cores.max','99')\
        .config('spark.executor.memory','1G')\
        .appName("team1gogogogo")\
        .getOrCreate()

In [5]:
# Check spark app name
spark.sparkContext.appName

'price_predict'

In [6]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
ps.set_option("compute.default_index_type", "distributed")

In [7]:
# Spark version
spark.version

'3.3.0'

In [8]:
spark

## Data processing

### Load data

#### Local mode

In [9]:
df = spark.read.csv('file:///home/dtsurfer07/00_final_project_tutorial/dataset/all_combined_AB.csv', inferSchema=True, header=True)
df.createOrReplaceTempView("dfTable")



22/07/17 12:03:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [None]:
df.show(5)

#### standalone mode

In [None]:
df = spark.read.csv('hdfs://bdse197.example.com/tmp/all_combined_AB.csv', inferSchema=True, header=True)
df.createOrReplaceTempView("dfTable")

In [None]:
df.show(5)

### Inspect data

In [None]:
df.columns

In [None]:
df.count(),len(df.columns)

In [None]:
# print dataframe schema
df.printSchema()

In [None]:
df.describe().show()

In [None]:
# info about dataframe
df.summary().show()

### Select features

In [10]:
# select columns
# df = df.filter(coalesce('鄉鎮市區', '交易標的', '建物移轉總面積平方公尺','主建物面積', '建物現況格局-房', '車位總價元', '主要建材', '總價元').isNotNull())
features_df = df.select('城市代碼', '鄉鎮市區', '交易標的', '建物移轉總面積平方公尺', '主建物面積', '建物現況格局-房', '車位總價元', '總價元') # without '主要建材'
features_df.count(), len(features_df.columns)

                                                                                

(3678001, 8)

In [None]:
# print dataframe schema
features_df.printSchema()

In [11]:
features_df = features_df.filter(~col('交易標的').isin(['車位', '土地'])) # select target without garage & land
features_df = features_df.filter(~(col('主建物面積') == 0)) # remove NaN value
features_df.count(), len(features_df.columns)

                                                                                

(2398819, 8)

In [12]:
# reset price unit
features_df = features_df.withColumn("總價元", df.總價元/10000)
features_df = features_df.withColumn("車位總價元", df.車位總價元/10000)

In [13]:
# set datatype
features_df = features_df.withColumn("建物移轉總面積平方公尺", features_df["建物移轉總面積平方公尺"].cast(DoubleType()))
features_df = features_df.withColumn("主建物面積", features_df["主建物面積"].cast(DoubleType()))
features_df = features_df.withColumn("建物現況格局-房", features_df["建物現況格局-房"].cast(IntegerType()))
features_df = features_df.withColumn("車位總價元", features_df["車位總價元"].cast(IntegerType()))
features_df = features_df.withColumn("總價元", features_df["總價元"].cast(IntegerType()))

In [14]:
# print dataframe schema
features_df.printSchema()

root
 |-- 城市代碼: string (nullable = true)
 |-- 鄉鎮市區: string (nullable = true)
 |-- 交易標的: string (nullable = true)
 |-- 建物移轉總面積平方公尺: double (nullable = true)
 |-- 主建物面積: double (nullable = true)
 |-- 建物現況格局-房: integer (nullable = true)
 |-- 車位總價元: integer (nullable = true)
 |-- 總價元: integer (nullable = true)



In [None]:
features_df.show(10)

In [15]:
# drop NaN columns
features_df = features_df.dropna()
features_df.count()

                                                                                

2398819

In [16]:
# check nan values
features_df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in features_df.columns]
   ).show()



+--------+--------+--------+----------------------+----------+---------------+----------+------+
|城市代碼|鄉鎮市區|交易標的|建物移轉總面積平方公尺|主建物面積|建物現況格局-房|車位總價元|總價元|
+--------+--------+--------+----------------------+----------+---------------+----------+------+
|       0|       0|       0|                     0|         0|              0|         0|     0|
+--------+--------+--------+----------------------+----------+---------------+----------+------+



                                                                                

In [None]:
features_df.describe().show()

In [None]:
# info about dataframe
features_df.summary().show()

### Select City to build model

In [17]:
# ABDEFH

def split_city(df, city_code):
    df = df.filter(col('城市代碼') == city_code)
    return df

In [45]:
city_df = split_city(features_df, "E")
city_df.count()

                                                                                

308365

### Feature engineering

In [19]:
def dummies_encoding(df, cols_list):
    for i in cols_list:
        categ = df.select(i).distinct().rdd.flatMap(lambda x:x).collect()
        exprs = [fn.when(fn.col(i) == cat,1).otherwise(0)\
                .alias(str(cat)) for cat in categ]
        df = df.select(exprs + df.columns)
    return df    

In [46]:
cols_list = ['鄉鎮市區', '交易標的'] # without '主要建材'
city_df = dummies_encoding(city_df, cols_list)

                                                                                

In [47]:
# delete a column
new_df = city_df.drop('鄉鎮市區', '交易標的', '城市代碼')

In [48]:
new_df.show(5)

+---------------+--------------------+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+--------+------+----------------------+----------+---------------+----------+------+
|房地(土地+建物)|房地(土地+建物)+車位|建物|燕巢區|梓官區|彌陀區|杉林區|路竹區|大社區|桃源區|大寮區|苓雅區|前鎮區|小港區|鳳山區|永安區|岡山區|林園區|甲仙區|仁武區|鼓山區|橋頭區|內門區|茄萣區|鹽埕區|美濃區|楠梓區|新興區|鳥松區|旗津區|六龜區|三民區|左營區|大樹區|前金區|湖內區|阿蓮區|旗山區|那瑪夏區|田寮區|建物移轉總面積平方公尺|主建物面積|建物現況格局-房|車位總價元|總價元|
+---------------+--------------------+----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+--------+------+----------------------+----------+---------------+----------+------+
|              0|                   1|  

In [49]:
feature_cols = new_df.columns[:-1]
assembler = VectorAssembler(inputCols = feature_cols, outputCol = 'features')
new_df = assembler.transform(new_df)

In [50]:
# validate the presence of dense vectors 
new_df.printSchema()

root
 |-- 房地(土地+建物): integer (nullable = false)
 |-- 房地(土地+建物)+車位: integer (nullable = false)
 |-- 建物: integer (nullable = false)
 |-- 燕巢區: integer (nullable = false)
 |-- 梓官區: integer (nullable = false)
 |-- 彌陀區: integer (nullable = false)
 |-- 杉林區: integer (nullable = false)
 |-- 路竹區: integer (nullable = false)
 |-- 大社區: integer (nullable = false)
 |-- 桃源區: integer (nullable = false)
 |-- 大寮區: integer (nullable = false)
 |-- 苓雅區: integer (nullable = false)
 |-- 前鎮區: integer (nullable = false)
 |-- 小港區: integer (nullable = false)
 |-- 鳳山區: integer (nullable = false)
 |-- 永安區: integer (nullable = false)
 |-- 岡山區: integer (nullable = false)
 |-- 林園區: integer (nullable = false)
 |-- 甲仙區: integer (nullable = false)
 |-- 仁武區: integer (nullable = false)
 |-- 鼓山區: integer (nullable = false)
 |-- 橋頭區: integer (nullable = false)
 |-- 內門區: integer (nullable = false)
 |-- 茄萣區: integer (nullable = false)
 |-- 鹽埕區: integer (nullable = false)
 |-- 美濃區: integer (nullable = false)
 |-- 楠梓區: integer (

In [51]:
# view the details of dense vector
new_df.select('features').show(5,False)

+-----------------------------------------------+
|features                                       |
+-----------------------------------------------+
|(44,[1,27,40,41,43],[1.0,1.0,96.69,45.28,98.0])|
|(44,[1,27,40,41,43],[1.0,1.0,96.35,45.28,98.0])|
|(44,[1,27,40,41,43],[1.0,1.0,96.69,45.28,98.0])|
|(44,[0,31,40,41,42],[1.0,1.0,277.0,277.0,8.0]) |
|(44,[0,14,40,41,42],[1.0,1.0,47.77,47.77,2.0]) |
+-----------------------------------------------+
only showing top 5 rows



In [52]:
# only select the features and label column
model_df = new_df.select(['features', '總價元'])

In [53]:
model_df = model_df.withColumnRenamed('總價元', 'price')

In [None]:
# Reading for machine learning
model_df.show(10,False)

In [None]:
# size of model df
model_df.count(), len(model_df.columns)

### Split Data - Train & Test sets

In [82]:
# use Logistic Regression to train on the training set
train_df, test_df = model_df.randomSplit([0.8, 0.2])

### Build Linear Regression with lasso

In [None]:
import sys

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = model_df.textFileStream(sys.argv[1]).map(parse).cache()
testData = model_df.textFileStream(sys.argv[2]).map(parse)

### Build SMVModel 

In [None]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")

### Build Linear Regression Model 

In [84]:
reg = 0.05

In [85]:
# Build Linear Regression model 
lin_Reg=LinearRegression(labelCol='price', regParam=reg)

# fit the linear regression model on training data set 
lr = lin_Reg.fit(train_df)

lr_model = lr.evaluate(train_df)

# prediction = lr.transform(test_df)

# print(prediction.show(10))
print(lr_model.meanSquaredError)
print(lr_model.rootMeanSquaredError)
print(lr_model.r2)

[Stage 148:>                                                        (0 + 1) / 1]

+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(44,[0,3,40,41],[...|  200| 134.1771808275414|
|(44,[0,3,40,41,42...|  360|335.11187290840104|
|(44,[0,3,40,41,42...|  195|341.07451596558815|
|(44,[0,3,40,41,42...|  303| 658.8070764897707|
|(44,[0,3,40,41,42...|  350| 660.0530019047053|
|(44,[0,3,40,41,42...|  265|364.12413614187864|
|(44,[0,3,40,41,42...|  115|476.65026030542566|
|(44,[0,3,40,41,42...|  260|462.80651548361215|
|(44,[0,3,40,41,42...|  636| 383.5249747458607|
|(44,[0,3,40,41,42...|  145|496.69370765322066|
+--------------------+-----+------------------+
only showing top 10 rows

None
3238436.6073515746
1799.565671864068
0.4051226421330256


                                                                                

In [71]:
# Build Linear Regression model 
lin_Reg=LinearRegression(labelCol='price', regParam=reg)

In [72]:
# fit the linear regression model on training data set 
lr_model=lin_Reg.fit(train_df)

                                                                                

In [None]:
lr_model.intercept

In [None]:
lr_model.coefficients

In [None]:
training_predictions=lr_model.evaluate(train_df)

In [None]:
training_predictions.meanSquaredError

In [None]:
training_predictions.rootMeanSquaredError

In [None]:
training_predictions.r2

### Evaluate Model

In [56]:
test_results = lr_model.evaluate(test_df)
test_results.residuals.show(10,False)

print(test_results.meanSquaredError)
print(test_results.rootMeanSquaredError)
print(test_results.r2)

[Stage 81:>                                                         (0 + 1) / 1]

+------------------+
|residuals         |
+------------------+
|68.20114685042199 |
|-45.37147891778909|
|-40.63308305285068|
|197.63666621244184|
|145.90280146375116|
|291.8375136295352 |
|289.00131170572394|
|-67.52838874698642|
|155.66043347701014|
|250.4678316797486 |
+------------------+
only showing top 10 rows

1795478.8895991864
1339.9548087899034
0.6114986163598133


                                                                                

In [None]:
# make predictions on test data 
test_results = lr_model.evaluate(test_df)

In [None]:
# view the residual errors based on predictions 
test_results.residuals.show(10,False)

In [None]:
# coefficient of determination value for model
test_results.r2

In [None]:
# RMSE
test_results.rootMeanSquaredError

In [None]:
# MSE
test_results.meanSquaredError