In [1]:
import pyspark
import sys

import pyspark.sql.functions as fn

from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [157]:
spark.stop()

In [2]:
spark.sparkContext.appName

'PySparkShell'

In [2]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

In [4]:
sys.version

'3.8.5 (default, Jul 28 2020, 12:59:40) \n[GCC 9.3.0]'

In [5]:
spark.version

'3.0.1'

# Load Data

In [3]:
%%time
# Standalone Mode讀取HDFS檔案
df = spark.read.csv('hdfs://bdse120.example.org/dataset/HouseVarCoFinal.csv', header=True, inferSchema=True)
df

CPU times: user 0 ns, sys: 3.75 ms, total: 3.75 ms
Wall time: 8.85 s


DataFrame[Address: string, Area: string, St: string, 交易年月日: int, year: int, 交易標的: string, 交易筆棟數: string, 建物型態: string, 建物現況格局.廳: int, 建物現況格局.房: int, 建物現況格局.衛: int, 建物現況格局.隔間: string, 有無管理組織: string, 總價元: double, 總坪數: double, 單價元坪: double, 車位數: int, floor: int, EightCount: int, ParkCount: int, FuneralCount: int, GasCount: int, CrimeCount: int, PoliceCount: int, busCount: int, subwayCount: int, govCount: int, clinicCount: int, hospitalCount: int, pharmacyCount: int, fireareaCount: int, firewayCount: int, martCount: int, mallCount: int, cinemaCount: int, 土地面積: double, 總人口數: int, 男性人數: int, 女性人數: int, 人口密度: int, 每戶人數: double, 每戶成年人數: double, 所得收入總計: int, 可支配所得: int, 消費支出: int, 儲蓄: int, 所得總額: int, Lontitude: double, Latitude: double]

In [11]:
%%time
# Standalone Mode
df.describe().show()

+-------+-----------------------------+------+------+-----------------+------------------+--------------------+---------------+--------+------------------+------------------+------------------+-----------------+------------+--------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+
|summary|                      Address|  Area|    St|       交易年月日|              year|      

In [12]:
%%time
df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- St: string (nullable = true)
 |-- 交易年月日: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- 交易標的: string (nullable = true)
 |-- 交易筆棟數: string (nullable = true)
 |-- 建物型態: string (nullable = true)
 |-- 建物現況格局.廳: integer (nullable = true)
 |-- 建物現況格局.房: integer (nullable = true)
 |-- 建物現況格局.衛: integer (nullable = true)
 |-- 建物現況格局.隔間: string (nullable = true)
 |-- 有無管理組織: string (nullable = true)
 |-- 總價元: double (nullable = true)
 |-- 總坪數: double (nullable = true)
 |-- 單價元坪: double (nullable = true)
 |-- 車位數: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- EightCount: integer (nullable = true)
 |-- ParkCount: integer (nullable = true)
 |-- FuneralCount: integer (nullable = true)
 |-- GasCount: integer (nullable = true)
 |-- CrimeCount: integer (nullable = true)
 |-- PoliceCount: integer (nullable = true)
 |-- busCount: integer (nullable = true)
 |-- subwayCount: int

In [13]:
# 句點會造成AnalysisException，所以要修改欄位名稱
# table.columns，如果欄位名稱是中文也會導致AnalysisException
dfDrop = df
dfDrop = dfDrop.withColumnRenamed("建物現況格局.廳","廳數")
dfDrop = dfDrop.withColumnRenamed("建物現況格局.房","房數")
dfDrop = dfDrop.withColumnRenamed("建物現況格局.衛","衛數")
dfDrop = dfDrop.withColumnRenamed("建物現況格局.隔間","隔間數")
dfDrop = dfDrop.withColumnRenamed("單價元坪","unitPrice")

dfDrop

DataFrame[Address: string, Area: string, St: string, 交易年月日: int, year: int, 交易標的: string, 交易筆棟數: string, 建物型態: string, 廳數: int, 房數: int, 衛數: int, 隔間數: string, 有無管理組織: string, 總價元: double, 總坪數: double, unitPrice: double, 車位數: int, floor: int, EightCount: int, ParkCount: int, FuneralCount: int, GasCount: int, CrimeCount: int, PoliceCount: int, busCount: int, subwayCount: int, govCount: int, clinicCount: int, hospitalCount: int, pharmacyCount: int, fireareaCount: int, firewayCount: int, martCount: int, mallCount: int, cinemaCount: int, 土地面積: double, 總人口數: int, 男性人數: int, 女性人數: int, 人口密度: int, 每戶人數: double, 每戶成年人數: double, 所得收入總計: int, 可支配所得: int, 消費支出: int, 儲蓄: int, 所得總額: int, Lontitude: double, Latitude: double]

In [14]:
dfDrop2 = dfDrop.select("廳數","房數","衛數","unitPrice","floor","ParkCount","GasCount","govCount","hospitalCount","firewayCount","martCount","每戶人數","所得總額")
dfDrop2.show()

+----+----+----+----------------+-----+---------+--------+--------+-------------+------------+---------+--------+--------+
|廳數|房數|衛數|       unitPrice|floor|ParkCount|GasCount|govCount|hospitalCount|firewayCount|martCount|每戶人數|所得總額|
+----+----+----+----------------+-----+---------+--------+--------+-------------+------------+---------+--------+--------+
|   2|   5|   3|246580.260178484|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   2|   2|   2|713658.438145298|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   0|   0|   0|262043.416225031|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   1|   1|   1|743902.890773758|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   0|   0|   0| 1343112.5819135|    5|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   1|   1|   1|709845.7857740

In [15]:
yDf = dfDrop2.select("unitPrice")
xDf = dfDrop2.select("廳數","房數","衛數","floor","ParkCount","GasCount","govCount","hospitalCount","firewayCount","martCount","每戶人數","所得總額")

# Train and Test Data

In [16]:
dfDrop2.columns
xDf.columns

['廳數',
 '房數',
 '衛數',
 'floor',
 'ParkCount',
 'GasCount',
 'govCount',
 'hospitalCount',
 'firewayCount',
 'martCount',
 '每戶人數',
 '所得總額']

In [17]:
# vectorize all numerical columns into a single feature column
feature_cols = xDf.columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
dfDrop2 = assembler.transform(dfDrop2)

In [18]:
# validate the presence of dense vectors 
dfDrop2.printSchema()

root
 |-- 廳數: integer (nullable = true)
 |-- 房數: integer (nullable = true)
 |-- 衛數: integer (nullable = true)
 |-- unitPrice: double (nullable = true)
 |-- floor: integer (nullable = true)
 |-- ParkCount: integer (nullable = true)
 |-- GasCount: integer (nullable = true)
 |-- govCount: integer (nullable = true)
 |-- hospitalCount: integer (nullable = true)
 |-- firewayCount: integer (nullable = true)
 |-- martCount: integer (nullable = true)
 |-- 每戶人數: double (nullable = true)
 |-- 所得總額: integer (nullable = true)
 |-- features: vector (nullable = true)



In [19]:
# view the details of dense vector
dfDrop2.select('features').show(5,False)

+-----------------------------------------------------------+
|features                                                   |
+-----------------------------------------------------------+
|[2.0,5.0,3.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[2.0,2.0,2.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[0.0,0.0,0.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[1.0,1.0,1.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[0.0,0.0,0.0,5.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
+-----------------------------------------------------------+
only showing top 5 rows



In [20]:
# only select the features and label column
data = dfDrop2.select(['features', 'unitPrice'])
data.show(10)

+--------------------+----------------+
|            features|       unitPrice|
+--------------------+----------------+
|[2.0,5.0,3.0,7.0,...|246580.260178484|
|[2.0,2.0,2.0,7.0,...|713658.438145298|
|[0.0,0.0,0.0,7.0,...|262043.416225031|
|[1.0,1.0,1.0,7.0,...|743902.890773758|
|[0.0,0.0,0.0,5.0,...| 1343112.5819135|
|[1.0,1.0,1.0,7.0,...|709845.785774059|
|[2.0,4.0,2.0,11.0...|849122.942206655|
|[2.0,4.0,2.0,11.0...|190912.323582579|
|[2.0,3.0,2.0,7.0,...|636806.518723994|
|[0.0,0.0,0.0,7.0,...|1843904.60526316|
+--------------------+----------------+
only showing top 10 rows



In [21]:
# size of model df
data.count(), len(data.columns)

(121820, 2)

In [22]:
# use Logistic Regression to train on the training set
train, test = data.randomSplit([0.80, 0.20], seed=40)

In [23]:
train.count(), len(train.columns)

(97718, 2)

In [24]:
test.count(), len(test.columns)

(24102, 2)

# Regression Model

## Linear Regression

In [25]:
# Build Linear Regression model 
from pyspark.ml.regression import LinearRegression
lin_Reg=LinearRegression(labelCol='unitPrice')

In [26]:
%%time
# Standalone Mode
lr_model=lin_Reg.fit(train)
lr_model

CPU times: user 10.1 ms, sys: 5.55 ms, total: 15.6 ms
Wall time: 3.67 s


LinearRegressionModel: uid=LinearRegression_774379a6c53f, numFeatures=12

In [27]:
lr_model.intercept

622074.8009364132

In [28]:
lr_model.coefficients

DenseVector([-19958.9919, -27961.301, 26508.1375, 5413.3379, 5259.2734, -4193.8159, -1443.7211, 3050.033, -71.1805, 972.7294, -400019.0525, 0.6897])

In [29]:
training_predictions=lr_model.evaluate(train)
training_predictions

<pyspark.ml.regression.LinearRegressionSummary at 0x7f79e3d519a0>

In [30]:
print('MSE:\t',training_predictions.meanSquaredError)
print('RMSE:\t',training_predictions.meanSquaredError ** 0.5)
print('R2:\t',training_predictions.r2)

MSE:	 512690035661.02606
RMSE:	 716023.767525231
R2:	 0.03168337623295303


In [31]:
data.select('features')

DataFrame[features: vector]

In [35]:
lr_predictions = lr_model.transform(test)

In [36]:
lr_predictions.select("prediction", "unitPrice", "features").show()
lr_predictions.select("prediction", "unitPrice", "features").count()

+-----------------+----------------+--------------------+
|       prediction|       unitPrice|            features|
+-----------------+----------------+--------------------+
|650484.2144113148|2002797.98937629|[0.0,0.0,0.0,2.0,...|
|655897.5523528673|        451791.3|[0.0,0.0,0.0,3.0,...|
|655897.5523528673|806123.148659851|[0.0,0.0,0.0,3.0,...|
|655897.5523528673|888002.197802198|[0.0,0.0,0.0,3.0,...|
|655897.5523528673|970350.475519549|[0.0,0.0,0.0,3.0,...|
|655897.5523528673|1118634.94856524|[0.0,0.0,0.0,3.0,...|
|661310.8902944196|54913.4551495017|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|367514.174541412|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|400012.200903161|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|570666.584463625|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|570877.462838822|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|655910.714285714|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|662759.047136341|[0.0,0.0,0.0,4.0,...|
|661310.8902944196|711210.668631424|[0.0,0.0,0.0,4.0,...|
|661310.890294

24102

# Predict

In [37]:
# yPred = stack.predict(xDf)
yPred = lr_model.transform(data)
yPred.show()
print(type(yPred))
yPred

+--------------------+----------------+-----------------+
|            features|       unitPrice|       prediction|
+--------------------+----------------+-----------------+
|[2.0,5.0,3.0,7.0,...|246580.260178484|577350.8278474931|
|[2.0,2.0,2.0,7.0,...|713658.438145298|634726.5933615589|
|[0.0,0.0,0.0,7.0,...|262043.416225031|677550.9041190771|
|[1.0,1.0,1.0,7.0,...|743902.890773758| 656138.748740318|
|[0.0,0.0,0.0,5.0,...| 1343112.5819135|666724.2282359721|
|[1.0,1.0,1.0,7.0,...|709845.785774059| 656138.748740318|
|[2.0,4.0,2.0,11.0...|849122.942206655|600457.3431009385|
|[2.0,4.0,2.0,11.0...|190912.323582579|600457.3431009385|
|[2.0,3.0,2.0,7.0,...|636806.518723994|606765.2923481439|
|[0.0,0.0,0.0,7.0,...|1843904.60526316|677550.9041190771|
|[2.0,4.0,2.0,11.0...|782834.297812279|600457.3431009385|
|[2.0,3.0,1.0,7.0,...|700241.231527094|580257.1548219644|
|[0.0,0.0,0.0,7.0,...|490958.910891089|677550.9041190771|
|[1.0,1.0,1.0,7.0,...|437336.138084633| 656138.748740318|
|[2.0,2.0,2.0,

DataFrame[features: vector, unitPrice: double, prediction: double]

In [38]:
dfNew = dfDrop.withColumnRenamed('unitPrice','unitPriceOrigin')

In [39]:
from pyspark.sql import SQLContext

In [40]:
result = df
SQLContext.registerDataFrameAsTable(dfNew,dfNew,"X")
SQLContext.registerDataFrameAsTable(yPred, yPred, "y")
result = spark.sql("select * from X join y on X.unitPriceOrigin=y.unitPrice")
result.show()

+-------------------------+------+------+----------+----+--------------------+---------------+--------+----+----+----+------+------------+-----------+---------+----------------+------+-----+----------+---------+------------+--------+----------+-----------+--------+-----------+--------+-----------+-------------+-------------+-------------+------------+---------+---------+-----------+--------+--------+--------+--------+--------+--------+------------+------------+----------+--------+------+--------+-----------+----------+--------------------+----------------+------------------+
|                  Address|  Area|    St|交易年月日|year|            交易標的|     交易筆棟數|建物型態|廳數|房數|衛數|隔間數|有無管理組織|     總價元|   總坪數| unitPriceOrigin|車位數|floor|EightCount|ParkCount|FuneralCount|GasCount|CrimeCount|PoliceCount|busCount|subwayCount|govCount|clinicCount|hospitalCount|pharmacyCount|fireareaCount|firewayCount|martCount|mallCount|cinemaCount|土地面積|總人口數|男性人數|女性人數|人口密度|每戶人數|每戶成年人數|所得收入總計|可支配所得|消費支出|  儲蓄|所得總額|  Lontitud

In [43]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

def updown(unit,pred):
    return round(unit-pred, 2)
    
#convert to a UDF Function by passing in the function and return type of function
udfud = F.udf(updown, DoubleType())

result = result.withColumn("漲跌", udfud("unitPrice","prediction"))

In [44]:
updown = result.groupby(['Area']).mean()
updown.printSchema()

root
 |-- Area: string (nullable = true)
 |-- avg(交易年月日): double (nullable = true)
 |-- avg(year): double (nullable = true)
 |-- avg(廳數): double (nullable = true)
 |-- avg(房數): double (nullable = true)
 |-- avg(衛數): double (nullable = true)
 |-- avg(總價元): double (nullable = true)
 |-- avg(總坪數): double (nullable = true)
 |-- avg(unitPriceOrigin): double (nullable = true)
 |-- avg(車位數): double (nullable = true)
 |-- avg(floor): double (nullable = true)
 |-- avg(EightCount): double (nullable = true)
 |-- avg(ParkCount): double (nullable = true)
 |-- avg(FuneralCount): double (nullable = true)
 |-- avg(GasCount): double (nullable = true)
 |-- avg(CrimeCount): double (nullable = true)
 |-- avg(PoliceCount): double (nullable = true)
 |-- avg(busCount): double (nullable = true)
 |-- avg(subwayCount): double (nullable = true)
 |-- avg(govCount): double (nullable = true)
 |-- avg(clinicCount): double (nullable = true)
 |-- avg(hospitalCount): double (nullable = true)
 |-- avg(pharmacyCount): do

In [45]:
updown = updown.select('Area', 'avg(unitPrice)','avg(prediction)','avg(漲跌)')
# updown.show()
updown

DataFrame[Area: string, avg(unitPrice): double, avg(prediction): double, avg(漲跌): double]

In [46]:
updown.printSchema()

root
 |-- Area: string (nullable = true)
 |-- avg(unitPrice): double (nullable = true)
 |-- avg(prediction): double (nullable = true)
 |-- avg(漲跌): double (nullable = true)



In [48]:
%%time
# Standalone Mode
updown.orderBy('avg(漲跌)').show()

+------+------------------+------------------+-------------------+
|  Area|    avg(unitPrice)|   avg(prediction)|          avg(漲跌)|
+------+------------------+------------------+-------------------+
|文山區| 470894.2001388704|  536434.855348447| -65540.65520864162|
|南港區| 526077.3259428595| 553319.4645386441|-27242.138618055582|
|松山區| 688876.4291906786| 716081.6069487213|-27205.177789736503|
|北投區| 472527.7286182643| 494101.4705440427| -21573.74191439992|
|萬華區| 470096.7236820338|479295.68741015287| -9198.963718944091|
|內湖區|523306.69566241343| 527113.3103269787|-3806.6146562483023|
|中山區| 636868.5271113187|  639622.115295151| -2753.588183292158|
|士林區| 564129.9154914799|  565590.933153442|-1461.0177016582136|
|中正區| 727578.6470990789| 723769.3232256584|  3809.323894919107|
|信義區| 686091.2646814411| 677952.2462552729|  8139.018386486895|
|大安區|  816875.841215338| 806399.4869502825| 10476.354311393823|
|大同區|  552593.240185469| 524063.5813984793| 28529.658851199136|
+------+------------------+------

12

In [None]:
spark.stop()