https://www.kaggle.com/onestar/kernel-xgboost-stacking
https://www.kaggle.com/satishgunjal/ensemble-learning-bagging-boosting-stacking

import os
import pandas as pd
import numpy as np
import warnings

from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import RobustScaler
from sklearn.linear_model import LinearRegression, Lasso, ElasticNet
from sklearn.kernel_ridge import KernelRidge

from sklearn.ensemble import BaggingRegressor

from sklearn.ensemble import GradientBoostingRegressor
import xgboost as xgb
import lightgbm as lgb
from sklearn.ensemble import StackingRegressor

from sklearn.model_selection import cross_val_score
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import mean_squared_error

# Global settings

warnings.filterwarnings("ignore") # To ignore warnings
n_jobs = -1 # This parameter conrols the parallel processing. -1 means using all processors.
random_state = 42 # This parameter controls the randomness of the data. Using some int value to get same results everytime this code is run.

In [1]:
import pyspark
import sys

import pyspark.sql.functions as fn

from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [2]:
# local mode
spark = SparkSession \
        .builder \
        .appName("LocalMode") \
        .getOrCreate()

In [3]:
spark.stop()

In [2]:
spark.sparkContext.appName

'PySparkShell'

In [3]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

In [6]:
sys.version

'3.8.5 (default, Jul 28 2020, 12:59:40) \n[GCC 9.3.0]'

In [7]:
spark.version

'3.0.1'

# Load Data

In [4]:
%%time
# Local Mode讀取本機檔案
df = spark.read.csv('HouseVarCoFinal.csv', header=True, inferSchema=True)
df

CPU times: user 0 ns, sys: 2.69 ms, total: 2.69 ms
Wall time: 3.87 s


DataFrame[Address: string, Area: string, St: string, 交易年月日: int, year: int, 交易標的: string, 交易筆棟數: string, 建物型態: string, 建物現況格局.廳: int, 建物現況格局.房: int, 建物現況格局.衛: int, 建物現況格局.隔間: string, 有無管理組織: string, 總價元: double, 總坪數: double, 單價元坪: double, 車位數: int, floor: int, EightCount: int, ParkCount: int, FuneralCount: int, GasCount: int, CrimeCount: int, PoliceCount: int, busCount: int, subwayCount: int, govCount: int, clinicCount: int, hospitalCount: int, pharmacyCount: int, fireareaCount: int, firewayCount: int, martCount: int, mallCount: int, cinemaCount: int, 土地面積: double, 總人口數: int, 男性人數: int, 女性人數: int, 人口密度: int, 每戶人數: double, 每戶成年人數: double, 所得收入總計: int, 可支配所得: int, 消費支出: int, 儲蓄: int, 所得總額: int, Lontitude: double, Latitude: double]

In [246]:
#AttributeError
df.shape

AttributeError: 'DataFrame' object has no attribute 'shape'

In [5]:
df.columns

['Address',
 'Area',
 'St',
 '交易年月日',
 'year',
 '交易標的',
 '交易筆棟數',
 '建物型態',
 '建物現況格局.廳',
 '建物現況格局.房',
 '建物現況格局.衛',
 '建物現況格局.隔間',
 '有無管理組織',
 '總價元',
 '總坪數',
 '單價元坪',
 '車位數',
 'floor',
 'EightCount',
 'ParkCount',
 'FuneralCount',
 'GasCount',
 'CrimeCount',
 'PoliceCount',
 'busCount',
 'subwayCount',
 'govCount',
 'clinicCount',
 'hospitalCount',
 'pharmacyCount',
 'fireareaCount',
 'firewayCount',
 'martCount',
 'mallCount',
 'cinemaCount',
 '土地面積',
 '總人口數',
 '男性人數',
 '女性人數',
 '人口密度',
 '每戶人數',
 '每戶成年人數',
 '所得收入總計',
 '可支配所得',
 '消費支出',
 '儲蓄',
 '所得總額',
 'Lontitude',
 'Latitude']

In [247]:
#AttributeError
df.info()

AttributeError: 'DataFrame' object has no attribute 'info'

In [6]:
%%time
# Local Mode
df.describe().show()

+-------+-----------------------------+------+------+------------------+------------------+--------------------+---------------+--------+------------------+------------------+------------------+-----------------+------------+--------------------+-----------------+------------------+------------------+----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|summary|                      Address|  Area|    St|        交易年月日|              year|       

In [7]:
%%time
df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- St: string (nullable = true)
 |-- 交易年月日: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- 交易標的: string (nullable = true)
 |-- 交易筆棟數: string (nullable = true)
 |-- 建物型態: string (nullable = true)
 |-- 建物現況格局.廳: integer (nullable = true)
 |-- 建物現況格局.房: integer (nullable = true)
 |-- 建物現況格局.衛: integer (nullable = true)
 |-- 建物現況格局.隔間: string (nullable = true)
 |-- 有無管理組織: string (nullable = true)
 |-- 總價元: double (nullable = true)
 |-- 總坪數: double (nullable = true)
 |-- 單價元坪: double (nullable = true)
 |-- 車位數: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- EightCount: integer (nullable = true)
 |-- ParkCount: integer (nullable = true)
 |-- FuneralCount: integer (nullable = true)
 |-- GasCount: integer (nullable = true)
 |-- CrimeCount: integer (nullable = true)
 |-- PoliceCount: integer (nullable = true)
 |-- busCount: integer (nullable = true)
 |-- subwayCount: int

In [111]:
# pd.set_option('display.max_columns', 100) 
dfDrop = df.drop(["EightCount", "FuneralCount", "PoliceCount",
                  "總坪數", "總價元","建物現況格局.隔間", "有無管理組織",
                  "交易筆棟數", "交易標的","交易年月日",
                  "subwayCount", "busCount", "CrimeCount",
                  "clinicCount", "pharmacyCount", "fireareaCount",
                  "mallCount", "cinemaCount", "總人口數",
                  "男性人數", "女性人數", "土地面積",
                  "每戶成年人數",
                  "所得收入總計", "可支配所得", "消費支出",
                  "人口密度", "儲蓄"])#, axis=1)
# dfDrop.head()

dfDrop2 = df.drop(["Area","Address","St","year","建物型態",
                   "EightCount", "FuneralCount", "PoliceCount",
                  "總坪數", "總價元","建物現況格局.隔間", "有無管理組織",
                  "交易筆棟數", "交易標的","交易年月日",
                  "subwayCount", "busCount", "CrimeCount",
                  "clinicCount", "pharmacyCount", "fireareaCount",
                  "mallCount", "cinemaCount", "總人口數",
                  "男性人數", "女性人數", "土地面積",
                  "每戶成年人數",
                  "所得收入總計", "可支配所得", "消費支出",
                  "人口密度", "儲蓄"])#, axis=1)
dfDrop2

TypeError: col should be a string or a Column

In [8]:
df

DataFrame[Address: string, Area: string, St: string, 交易年月日: int, year: int, 交易標的: string, 交易筆棟數: string, 建物型態: string, 建物現況格局.廳: int, 建物現況格局.房: int, 建物現況格局.衛: int, 建物現況格局.隔間: string, 有無管理組織: string, 總價元: double, 總坪數: double, 單價元坪: double, 車位數: int, floor: int, EightCount: int, ParkCount: int, FuneralCount: int, GasCount: int, CrimeCount: int, PoliceCount: int, busCount: int, subwayCount: int, govCount: int, clinicCount: int, hospitalCount: int, pharmacyCount: int, fireareaCount: int, firewayCount: int, martCount: int, mallCount: int, cinemaCount: int, 土地面積: double, 總人口數: int, 男性人數: int, 女性人數: int, 人口密度: int, 每戶人數: double, 每戶成年人數: double, 所得收入總計: int, 可支配所得: int, 消費支出: int, 儲蓄: int, 所得總額: int, Lontitude: double, Latitude: double]

In [9]:
# AnalysisException
df.select("建物現況格局.隔間").show()

AnalysisException: cannot resolve '`建物現況格局.隔間`' given input columns: [Address, Area, CrimeCount, EightCount, FuneralCount, GasCount, Latitude, Lontitude, ParkCount, PoliceCount, St, busCount, cinemaCount, clinicCount, fireareaCount, firewayCount, floor, govCount, hospitalCount, mallCount, martCount, pharmacyCount, subwayCount, year, 交易年月日, 交易標的, 交易筆棟數, 人口密度, 儲蓄, 可支配所得, 單價元坪, 土地面積, 女性人數, 建物型態, 建物現況格局.廳, 建物現況格局.房, 建物現況格局.衛, 建物現況格局.隔間, 所得收入總計, 所得總額, 有無管理組織, 每戶人數, 每戶成年人數, 消費支出, 男性人數, 總人口數, 總價元, 總坪數, 車位數];;
'Project ['建物現況格局.隔間]
+- Relation[Address#16,Area#17,St#18,交易年月日#19,year#20,交易標的#21,交易筆棟數#22,建物型態#23,建物現況格局.廳#24,建物現況格局.房#25,建物現況格局.衛#26,建物現況格局.隔間#27,有無管理組織#28,總價元#29,總坪數#30,單價元坪#31,車位數#32,floor#33,EightCount#34,ParkCount#35,FuneralCount#36,GasCount#37,CrimeCount#38,PoliceCount#39,... 25 more fields] csv


In [10]:
# 句點會造成AnalysisException，所以要修改欄位名稱
# table.columns，如果欄位名稱是中文也會導致AnalysisException
dfDrop = df
dfDrop = dfDrop.withColumnRenamed("建物現況格局.廳","廳數")
dfDrop = dfDrop.withColumnRenamed("建物現況格局.房","房數")
dfDrop = dfDrop.withColumnRenamed("建物現況格局.衛","衛數")
dfDrop = dfDrop.withColumnRenamed("建物現況格局.隔間","隔間數")
dfDrop = dfDrop.withColumnRenamed("單價元坪","unitPrice")

In [11]:
dfDrop

DataFrame[Address: string, Area: string, St: string, 交易年月日: int, year: int, 交易標的: string, 交易筆棟數: string, 建物型態: string, 廳數: int, 房數: int, 衛數: int, 隔間數: string, 有無管理組織: string, 總價元: double, 總坪數: double, unitPrice: double, 車位數: int, floor: int, EightCount: int, ParkCount: int, FuneralCount: int, GasCount: int, CrimeCount: int, PoliceCount: int, busCount: int, subwayCount: int, govCount: int, clinicCount: int, hospitalCount: int, pharmacyCount: int, fireareaCount: int, firewayCount: int, martCount: int, mallCount: int, cinemaCount: int, 土地面積: double, 總人口數: int, 男性人數: int, 女性人數: int, 人口密度: int, 每戶人數: double, 每戶成年人數: double, 所得收入總計: int, 可支配所得: int, 消費支出: int, 儲蓄: int, 所得總額: int, Lontitude: double, Latitude: double]

In [12]:
df.select('Lontitude','Latitude').show()
# NameError
# df.select(col('Lontitude')).show()

+-----------+----------+
|  Lontitude|  Latitude|
+-----------+----------+
|121.5315679|25.0507038|
|121.5315679|25.0507038|
|121.5315679|25.0507038|
|121.5315679|25.0507038|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5316034|25.0533504|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
|121.5289176|25.0551651|
+-----------+----------+
only showing top 20 rows



In [15]:
# AnalysisException
# df2 = spark.sql("select Lontitude from df")

sqlContext.registerDataFrameAsTable(df, "table1")
df2 = spark.sql("select Lontitude,Latitude from table1")
# df2 = spark.sql("select 建物現況格局.廳,建物現況格局.房,建物現況格局.衛,單價元坪,車位數,floor,ParkCount,GasCount,govCount,hospitalCount,firewayCount,martCount,每戶人數,所得總額 from df")
df2.show()

NameError: name 'sqlContext' is not defined

In [13]:
# ["建物現況格局.廳","建物現況格局.房","建物現況格局.衛","單價元坪","車位數","floor","ParkCount","GasCount","govCount","hospitalCount","firewayCount","martCount","每戶人數","所得總額"]
dfDrop2 = dfDrop.select("廳數","房數","衛數","unitPrice","floor","ParkCount","GasCount","govCount","hospitalCount","firewayCount","martCount","每戶人數","所得總額")
dfDrop2.show()

+----+----+----+----------------+-----+---------+--------+--------+-------------+------------+---------+--------+--------+
|廳數|房數|衛數|       unitPrice|floor|ParkCount|GasCount|govCount|hospitalCount|firewayCount|martCount|每戶人數|所得總額|
+----+----+----+----------------+-----+---------+--------+--------+-------------+------------+---------+--------+--------+
|   2|   5|   3|246580.260178484|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   2|   2|   2|713658.438145298|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   0|   0|   0|262043.416225031|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   1|   1|   1|743902.890773758|    7|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   0|   0|   0| 1343112.5819135|    5|        8|       7|      33|            4|          18|       11|    2.89| 1720988|
|   1|   1|   1|709845.7857740

In [14]:
# AttributeError
from dfply import *
yDf = dfDrop2 >> select(X.單價元坪)
xDf = dfDrop2.drop(['單價元坪'], axis = 1)
xDf.head()

AttributeError: 'DataFrame' object has no attribute 'copy'

In [15]:
yDf = dfDrop2.select("unitPrice")
xDf = dfDrop2.select("廳數","房數","衛數","floor","ParkCount","GasCount","govCount","hospitalCount","firewayCount","martCount","每戶人數","所得總額")

# Train and Test Data

In [16]:
# NameError
X_train, X_test, y_train, y_test = train_test_split(xDf, yDf, test_size= 0.2, random_state = random_state)
print(f'Training set: X_train shape= {X_train.shape}, y_train shape= {y_train.shape}')
print(f'Holdout set: X_test shape= {X_test.shape}, y_test shape= {y_test.shape}')

NameError: name 'train_test_split' is not defined

In [17]:
dfDrop2.columns[:-1]

['廳數',
 '房數',
 '衛數',
 'unitPrice',
 'floor',
 'ParkCount',
 'GasCount',
 'govCount',
 'hospitalCount',
 'firewayCount',
 'martCount',
 '每戶人數']

In [18]:
dfDrop2.columns
xDf.columns

['廳數',
 '房數',
 '衛數',
 'floor',
 'ParkCount',
 'GasCount',
 'govCount',
 'hospitalCount',
 'firewayCount',
 'martCount',
 '每戶人數',
 '所得總額']

In [19]:
# vectorize all numerical columns into a single feature column
feature_cols = xDf.columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
dfDrop2 = assembler.transform(dfDrop2)

# convert text labels into indices
data = dfDrop2.select(['features', '單價元坪'])
label_indexer = StringIndexer(inputCol='單價元坪', outputCol='label').fit(data)
data = label_indexer.transform(data)

In [22]:
# validate the presence of dense vectors 
dfDrop2.printSchema()

root
 |-- 廳數: integer (nullable = true)
 |-- 房數: integer (nullable = true)
 |-- 衛數: integer (nullable = true)
 |-- unitPrice: double (nullable = true)
 |-- floor: integer (nullable = true)
 |-- ParkCount: integer (nullable = true)
 |-- GasCount: integer (nullable = true)
 |-- govCount: integer (nullable = true)
 |-- hospitalCount: integer (nullable = true)
 |-- firewayCount: integer (nullable = true)
 |-- martCount: integer (nullable = true)
 |-- 每戶人數: double (nullable = true)
 |-- 所得總額: integer (nullable = true)
 |-- features: vector (nullable = true)



In [23]:
# view the details of dense vector
dfDrop2.select('features').show(5,False)

+-----------------------------------------------------------+
|features                                                   |
+-----------------------------------------------------------+
|[2.0,5.0,3.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[2.0,2.0,2.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[0.0,0.0,0.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[1.0,1.0,1.0,7.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
|[0.0,0.0,0.0,5.0,8.0,7.0,33.0,4.0,18.0,11.0,2.89,1720988.0]|
+-----------------------------------------------------------+
only showing top 5 rows



In [24]:
# only select the features and label column
data = dfDrop2.select(['features', 'unitPrice'])
data.show(10)

+--------------------+----------------+
|            features|       unitPrice|
+--------------------+----------------+
|[2.0,5.0,3.0,7.0,...|246580.260178484|
|[2.0,2.0,2.0,7.0,...|713658.438145298|
|[0.0,0.0,0.0,7.0,...|262043.416225031|
|[1.0,1.0,1.0,7.0,...|743902.890773758|
|[0.0,0.0,0.0,5.0,...| 1343112.5819135|
|[1.0,1.0,1.0,7.0,...|709845.785774059|
|[2.0,4.0,2.0,11.0...|849122.942206655|
|[2.0,4.0,2.0,11.0...|190912.323582579|
|[2.0,3.0,2.0,7.0,...|636806.518723994|
|[0.0,0.0,0.0,7.0,...|1843904.60526316|
+--------------------+----------------+
only showing top 10 rows



In [25]:
# size of model df
data.count(), len(data.columns)

(121820, 2)

In [26]:
# use Logistic Regression to train on the training set
train, test = data.randomSplit([0.80, 0.20], seed=40)

In [20]:
train.count(), len(train.columns)

NameError: name 'train' is not defined

In [21]:
test.count(), len(test.columns)

NameError: name 'test' is not defined

# Regression Model

models_scores = [] # To store model scores

def rmse(model):
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    
    return mean_squared_error(y_test, y_pred, squared= False) # squared= False > returns Root Mean Square Error    

## Linear Regression

linear_regression = make_pipeline(LinearRegression())
score = rmse(linear_regression)

models_scores.append(['LinearRegression', score])
print(f'LinearRegression Score= {score}')

In [29]:
# Build Linear Regression model 
from pyspark.ml.regression import LinearRegression
lin_Reg=LinearRegression(labelCol='unitPrice')

In [30]:
%%time
# Local Mode 
lr_model=lin_Reg.fit(train)
lr_model

CPU times: user 20 µs, sys: 8.42 ms, total: 8.44 ms
Wall time: 3.07 s


LinearRegressionModel: uid=LinearRegression_d89d116965bc, numFeatures=12

In [31]:
lr_model.intercept

624414.5378985564

In [32]:
lr_model.coefficients

DenseVector([-17688.0852, -28887.8065, 27288.1996, 5439.9844, 5116.7124, -4057.657, -1530.4889, 2652.8833, 28.4652, 579.8327, -400740.8756, 0.6911])

In [33]:
training_predictions=lr_model.evaluate(train)
training_predictions

<pyspark.ml.regression.LinearRegressionSummary at 0x7f98e5ac37c0>

In [34]:
print('MSE:\t',training_predictions.meanSquaredError)
print('RMSE:\t',training_predictions.meanSquaredError ** 0.5)
print('R2:\t',training_predictions.r2)

MSE:	 513629190518.81384
RMSE:	 716679.2800959253
R2:	 0.03170599571459565


In [35]:
data

DataFrame[features: vector, unitPrice: double]

In [36]:
data.select('features')

DataFrame[features: vector]

In [37]:
# Py4JJavaError
lr_model.predict(data)

Py4JJavaError: An error occurred while calling o165.predict.
: java.lang.ClassCastException: class org.apache.spark.sql.Dataset cannot be cast to class org.apache.spark.ml.linalg.Vector (org.apache.spark.sql.Dataset and org.apache.spark.ml.linalg.Vector are in unnamed module of loader 'app')
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:631)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [38]:
lr_predictions = lr_model.transform(test)

In [39]:
test.show()
test.count()

+--------------------+------------------+
|            features|         unitPrice|
+--------------------+------------------+
|[0.0,0.0,0.0,0.0,...|  621318.992307692|
|[0.0,0.0,0.0,0.0,...|          826447.5|
|[0.0,0.0,0.0,0.0,...|  1845231.87272727|
|[0.0,0.0,0.0,0.0,...|  1890255.12763354|
|[0.0,0.0,0.0,0.0,...|  2003509.09090909|
|[0.0,0.0,0.0,0.0,...|  3138093.67718639|
|[0.0,0.0,0.0,0.0,...|  1233039.17762504|
|[0.0,0.0,0.0,0.0,...|         4671225.0|
|[0.0,0.0,0.0,1.0,...|  1289.66622222222|
|[0.0,0.0,0.0,1.0,...|  4048094.80770616|
|[0.0,0.0,0.0,1.0,...|  4247479.18326693|
|[0.0,0.0,0.0,1.0,...|  2310873.73189401|
|[0.0,0.0,0.0,1.0,...|  3392120.06651885|
|[0.0,0.0,0.0,1.0,...|1.61050667481989E7|
|[0.0,0.0,0.0,2.0,...|  1800113.59223301|
|[0.0,0.0,0.0,2.0,...|  263147.462686567|
|[0.0,0.0,0.0,2.0,...|  633293.103448276|
|[0.0,0.0,0.0,2.0,...|  748763.306908267|
|[0.0,0.0,0.0,2.0,...|  948738.438072471|
|[0.0,0.0,0.0,2.0,...|  282235.164802475|
+--------------------+------------

24495

In [40]:
lr_predictions.select("prediction", "unitPrice", "features").show()
lr_predictions.select("prediction", "unitPrice", "features").count()

+------------------+------------------+--------------------+
|        prediction|         unitPrice|            features|
+------------------+------------------+--------------------+
|478902.89242850046|  621318.992307692|[0.0,0.0,0.0,0.0,...|
|478902.89242850046|          826447.5|[0.0,0.0,0.0,0.0,...|
|478902.89242850046|  1845231.87272727|[0.0,0.0,0.0,0.0,...|
|478902.89242850046|  1890255.12763354|[0.0,0.0,0.0,0.0,...|
|478902.89242850046|  2003509.09090909|[0.0,0.0,0.0,0.0,...|
|478902.89242850046|  3138093.67718639|[0.0,0.0,0.0,0.0,...|
| 580648.2882496977|  1233039.17762504|[0.0,0.0,0.0,0.0,...|
| 580648.2882496977|         4671225.0|[0.0,0.0,0.0,0.0,...|
|  752175.547722879|  1289.66622222222|[0.0,0.0,0.0,1.0,...|
|  484342.876838517|  4048094.80770616|[0.0,0.0,0.0,1.0,...|
|  484342.876838517|  4247479.18326693|[0.0,0.0,0.0,1.0,...|
| 640619.6161194312|  2310873.73189401|[0.0,0.0,0.0,1.0,...|
| 640619.6161194312|  3392120.06651885|[0.0,0.0,0.0,1.0,...|
| 640619.6161194312|1.61

24495

# Boosting 

## GradientBoostingRegressor ¶

gradient_boosting_regressor= GradientBoostingRegressor(n_estimators=3000, learning_rate=0.05,
                                   max_depth=4, max_features='sqrt',
                                   min_samples_leaf=15, min_samples_split=10, 
                                   loss='huber', random_state = random_state)

score = rmse(gradient_boosting_regressor)
models_scores.append(['GradientBoostingRegressor', score])
print(f'GradientBoostingRegressor Score= {score}')

import pyspark.sql.functions as F
from pyspark.sql.types import *

def somefunc(value):
    if   value < 3: 
        return 'low'
    else:
        return 'high'
    
#convert to a UDF Function by passing in the function and return type of function
udfsomefunc = F.udf(somefunc, StringType())

df_hl = df.withColumn("漲跌", udfsomefunc("廳數"))
df_hl.show()

## XGBRegressor 

xgb_regressor= xgb.XGBRegressor(colsample_bytree=0.4603, gamma=0.0468, 
                             learning_rate=0.05, max_depth=3, 
                             min_child_weight=1.7817, n_estimators=2200,
                             reg_alpha=0.4640, reg_lambda=0.8571,
                             subsample=0.5213,verbosity=0, nthread = -1, random_state = random_state)
score = rmse(xgb_regressor)
models_scores.append(['XGBRegressor', score])
print(f'XGBRegressor Score= {score}')

## Stacking

In [None]:
estimators = [ ('linear_regression', linear_regression), ('gradient_boosting_regressor', gradient_boosting_regressor),
              ('xgb_regressor', xgb_regressor) ]

stack = StackingRegressor(estimators=estimators, final_estimator= lasso, cv= 5, n_jobs= n_jobs, passthrough = True)

stack.fit(X_train, y_train)

pred = stack.predict(X_test)

rmse_val = mean_squared_error(y_test, pred, squared= False) # squared= False > returns Root Mean Square Error    
models_scores.append(['Stacking', rmse_val])
print(f'rmse= {rmse_val}')

# Predict

In [41]:
data.show()

+--------------------+----------------+
|            features|       unitPrice|
+--------------------+----------------+
|[2.0,5.0,3.0,7.0,...|246580.260178484|
|[2.0,2.0,2.0,7.0,...|713658.438145298|
|[0.0,0.0,0.0,7.0,...|262043.416225031|
|[1.0,1.0,1.0,7.0,...|743902.890773758|
|[0.0,0.0,0.0,5.0,...| 1343112.5819135|
|[1.0,1.0,1.0,7.0,...|709845.785774059|
|[2.0,4.0,2.0,11.0...|849122.942206655|
|[2.0,4.0,2.0,11.0...|190912.323582579|
|[2.0,3.0,2.0,7.0,...|636806.518723994|
|[0.0,0.0,0.0,7.0,...|1843904.60526316|
|[2.0,4.0,2.0,11.0...|782834.297812279|
|[2.0,3.0,1.0,7.0,...|700241.231527094|
|[0.0,0.0,0.0,7.0,...|490958.910891089|
|[1.0,1.0,1.0,7.0,...|437336.138084633|
|[2.0,2.0,2.0,5.0,...|1599057.38940204|
|[1.0,1.0,1.0,7.0,...|483610.646002317|
|[1.0,1.0,1.0,7.0,...|479778.371681416|
|[2.0,4.0,3.0,7.0,...| 573314.55026455|
|[1.0,1.0,1.0,7.0,...|557896.440625959|
|[1.0,1.0,1.0,7.0,...|407746.557437408|
+--------------------+----------------+
only showing top 20 rows



In [42]:
# yPred = stack.predict(xDf)
yPred = lr_model.transform(data)
yPred.show()
type(yPred)

+--------------------+----------------+-----------------+
|            features|       unitPrice|       prediction|
+--------------------+----------------+-----------------+
|[2.0,5.0,3.0,7.0,...|246580.260178484|575308.9187140829|
|[2.0,2.0,2.0,7.0,...|713658.438145298|634684.1384578639|
|[0.0,0.0,0.0,7.0,...|262043.416225031|673259.5225795308|
|[1.0,1.0,1.0,7.0,...|743902.890773758|653971.8305186973|
|[0.0,0.0,0.0,5.0,...| 1343112.5819135|662379.5537594975|
|[1.0,1.0,1.0,7.0,...|709845.785774059|653971.8305186973|
|[2.0,4.0,2.0,11.0...|849122.942206655| 598668.463186538|
|[2.0,4.0,2.0,11.0...|190912.323582579| 598668.463186538|
|[2.0,3.0,2.0,7.0,...|636806.518723994|605796.3320021677|
|[0.0,0.0,0.0,7.0,...|1843904.60526316|673259.5225795308|
|[2.0,4.0,2.0,11.0...|782834.297812279| 598668.463186538|
|[2.0,3.0,1.0,7.0,...|700241.231527094|578508.1323788603|
|[0.0,0.0,0.0,7.0,...|490958.910891089|673259.5225795308|
|[1.0,1.0,1.0,7.0,...|437336.138084633|653971.8305186973|
|[2.0,2.0,2.0,

pyspark.sql.dataframe.DataFrame

StackPred = pd.DataFrame(yPred, columns = ['pred'])
StackPred

import pyspark.sql.functions as F
from pyspark.sql.types import *

def somefunc(value):
    if   value < 3: 
        return 'low'
    else:
        return 'high'
    
#convert to a UDF Function by passing in the function and return type of function
udfsomefunc = F.udf(somefunc, StringType())

df_hl = df.withColumn("漲跌", udfsomefunc("廳數"))
df_hl.show()

In [240]:
# TypeError and PicklingError
import pyspark.sql.functions as F
from pyspark.sql.types import *

def somefunc(value):
    return lr_model.transform(value)
    
#convert to a UDF Function by passing in the function and return type of function
udfsomefunc = F.udf(somefunc)

result = df.withColumn("Pred", udfsomefunc("features"))
result.show()

Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/serializers.py", line 468, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 1097, in dumps
    cp.dump(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 357, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.8/pickle.py", line 485, in dump
    self.save(obj)
  File "/usr/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/usr/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 496, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 730, in save_function_tuple
    save(state)
  File "/usr/lib/pyth

PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

In [44]:
dfNew = dfDrop.withColumnRenamed('unitPrice','unitPriceOrigin')

In [46]:
from pyspark.sql import SQLContext

In [54]:
# result = pd.concat([df, StackPred], axis=1).drop(["Unnamed: 0"], axis=1)
result = df
# result = df.union(yPred)
# result = df.withColumn("Pred", udfsomefunc("單價元坪"))

# SQLContext.registerDataFrameAsTable(dfNew, "X")
SQLContext.registerDataFrameAsTable(dfNew,dfNew,"X")
# SQLContext.registerDataFrameAsTable(yPred, "y")
SQLContext.registerDataFrameAsTable(yPred, yPred, "y")

result = spark.sql("select * from X join y on X.unitPriceOrigin=y.unitPrice")
result.show()

+-------------------------+------+------+----------+----+--------------------+---------------+--------+----+----+----+------+------------+-----------+---------+----------------+------+-----+----------+---------+------------+--------+----------+-----------+--------+-----------+--------+-----------+-------------+-------------+-------------+------------+---------+---------+-----------+--------+--------+--------+--------+--------+--------+------------+------------+----------+--------+------+--------+-----------+----------+--------------------+----------------+------------------+
|                  Address|  Area|    St|交易年月日|year|            交易標的|     交易筆棟數|建物型態|廳數|房數|衛數|隔間數|有無管理組織|     總價元|   總坪數| unitPriceOrigin|車位數|floor|EightCount|ParkCount|FuneralCount|GasCount|CrimeCount|PoliceCount|busCount|subwayCount|govCount|clinicCount|hospitalCount|pharmacyCount|fireareaCount|firewayCount|martCount|mallCount|cinemaCount|土地面積|總人口數|男性人數|女性人數|人口密度|每戶人數|每戶成年人數|所得收入總計|可支配所得|消費支出|  儲蓄|所得總額|  Lontitud

In [292]:
# TypeError
# result["漲跌"] = round(result["單價元坪"]-result["pred"], 2)
result["漲跌"] = round(result["unitPrice"]-result["prediction"], 2)

TypeError: type Column doesn't define __round__ method

In [55]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

def updown(unit,pred):
    return round(unit-pred, 2)
    
#convert to a UDF Function by passing in the function and return type of function
udfud = F.udf(updown, DoubleType())

result = result.withColumn("漲跌", udfud("unitPrice","prediction"))
result.show()

Py4JJavaError: An error occurred while calling o331.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 55, bdse125.example.org, executor driver): java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 27 more


In [56]:
result.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- St: string (nullable = true)
 |-- 交易年月日: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- 交易標的: string (nullable = true)
 |-- 交易筆棟數: string (nullable = true)
 |-- 建物型態: string (nullable = true)
 |-- 廳數: integer (nullable = true)
 |-- 房數: integer (nullable = true)
 |-- 衛數: integer (nullable = true)
 |-- 隔間數: string (nullable = true)
 |-- 有無管理組織: string (nullable = true)
 |-- 總價元: double (nullable = true)
 |-- 總坪數: double (nullable = true)
 |-- unitPriceOrigin: double (nullable = true)
 |-- 車位數: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- EightCount: integer (nullable = true)
 |-- ParkCount: integer (nullable = true)
 |-- FuneralCount: integer (nullable = true)
 |-- GasCount: integer (nullable = true)
 |-- CrimeCount: integer (nullable = true)
 |-- PoliceCount: integer (nullable = true)
 |-- busCount: integer (nullable = true)
 |-- subwayCount: integer (nullabl

In [57]:
result.head(2)

Py4JJavaError: An error occurred while calling o336.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 58, bdse125.example.org, executor driver): java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3450)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3447)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 27 more


In [50]:
# TypeError
updown = result.groupby(['Area'], as_index=True).mean()[['pred','漲跌']].reset_index()
updown.sort_values(by=['漲跌'], inplace=True, ascending=False)

TypeError: groupBy() got an unexpected keyword argument 'as_index'

In [58]:
updown = result.groupby(['Area']).mean()
updown.printSchema()

root
 |-- Area: string (nullable = true)
 |-- avg(交易年月日): double (nullable = true)
 |-- avg(year): double (nullable = true)
 |-- avg(廳數): double (nullable = true)
 |-- avg(房數): double (nullable = true)
 |-- avg(衛數): double (nullable = true)
 |-- avg(總價元): double (nullable = true)
 |-- avg(總坪數): double (nullable = true)
 |-- avg(unitPriceOrigin): double (nullable = true)
 |-- avg(車位數): double (nullable = true)
 |-- avg(floor): double (nullable = true)
 |-- avg(EightCount): double (nullable = true)
 |-- avg(ParkCount): double (nullable = true)
 |-- avg(FuneralCount): double (nullable = true)
 |-- avg(GasCount): double (nullable = true)
 |-- avg(CrimeCount): double (nullable = true)
 |-- avg(PoliceCount): double (nullable = true)
 |-- avg(busCount): double (nullable = true)
 |-- avg(subwayCount): double (nullable = true)
 |-- avg(govCount): double (nullable = true)
 |-- avg(clinicCount): double (nullable = true)
 |-- avg(hospitalCount): double (nullable = true)
 |-- avg(pharmacyCount): do

In [59]:
updown = updown.select('Area', 'avg(unitPrice)','avg(prediction)','avg(漲跌)')
updown.show()

Py4JJavaError: An error occurred while calling o354.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 42.0 failed 1 times, most recent failure: Lost task 1.0 in stage 42.0 (TID 62, bdse125.example.org, executor driver): java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 29 more


In [60]:
# 沒有作用
updown.sort('avg(漲跌)')
updown.show()

Py4JJavaError: An error occurred while calling o354.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 45.0 failed 1 times, most recent failure: Lost task 1.0 in stage 45.0 (TID 66, bdse125.example.org, executor driver): java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:209)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:132)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:70)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 29 more


In [328]:
updown.orderBy('avg(漲跌)').show()

+------+------------------+-----------------+-------------------+
|  Area|    avg(unitPrice)|  avg(prediction)|          avg(漲跌)|
+------+------------------+-----------------+-------------------+
|文山區|470894.20013887255|537407.0058019717| -66512.80569517576|
|南港區| 526077.3259428595|555030.1649174213|-28952.838981944456|
|松山區| 688876.4291906742|717199.8266238418|-28323.397467711307|
|北投區|472527.72861826356|493762.2551277943|-21234.526502513792|
|萬華區|470096.72368203645|480465.0293231422|-10368.305678571442|
|內湖區| 523306.6956624151|528032.3343385347| -4725.638680766488|
|士林區| 564129.9154914792|566750.4101115301| -2620.494688226875|
|中山區| 636868.5271113252|638104.9373317077|-1236.4102416614692|
|中正區| 727578.6470990784|728087.6818519549|  -509.034730946867|
|大安區| 816875.8412153312|807816.6971096632|  9059.144127488886|
|信義區| 686091.2646814411|676752.0335638351|   9339.23110389612|
|大同區|  552593.240185469|525934.3036345702| 26658.936528421895|
+------+------------------+-----------------+---

In [None]:
spark.stop()