In [1]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from sklearn import preprocessing

from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.tree import RandomForest
from pyspark.ml.feature import HashingTF , Tokenizer
from pyspark.ml import Pipeline

import findspark
findspark.init('/opt/spark-3.2.1-bin-hadoop3.2/')



In [2]:
# yarn mode
spark = SparkSession\
        .builder\
        .master("yarn")\
        .config('spark.executor.instances','18')\
        .config('spark.executor.memory','8G')\
        .appName("iv")\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-31 19:37:45,492 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
spark.conf.set("spark.sql.execution.arrow.pyscpark.enabled", True)

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

ps.set_option("compute.default_index_type", "distributed")

---

# pandas dataframe to spark-dataframe

In [32]:
df = spark.read.csv('1_taipei_purged.csv', header = True, inferSchema = True)
df.printSchema()



root
 |-- address: string (nullable = true)
 |-- style: string (nullable = true)
 |-- percent: double (nullable = true)
 |-- district: string (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- size: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: integer (nullable = true)
 |-- living: integer (nullable = true)
 |-- bath: integer (nullable = true)
 |-- avg: double (nullable = true)



                                                                                

In [33]:
print((df.count(), len(df.columns)))

(486774, 17)


In [34]:
df = df.drop('district', 'avg', 'style', 'address')

In [35]:
df.printSchema()

root
 |-- percent: double (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- size: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: integer (nullable = true)
 |-- living: integer (nullable = true)
 |-- bath: integer (nullable = true)



In [36]:
# show
numeric_features = [ t[0] for t in df.dtypes ]
df.select(numeric_features).describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
percent,486774,63.30324996815774,19.80554113947344,0.0,100.0
parking_price,486774,43.53548874837193,106.11649419483994,0.0,19720.0
date,486774,201697.78076479022,277.6166137983045,201301,202112
floor,486774,6.901668946985665,5.1604540850357195,1,45
age,486774,15.458083217263042,14.662437326941834,0.0,88.0
latitude,486774,25.04942000155616,0.0825757280634249,0.0,25.29294802135113
longitude,486774,121.49138015480173,0.2570793573066798,0.0,121.92797110266169
management,486774,0.7030531622477783,0.4569133858683895,0,1
size,486774,38.83292838154863,27.898046153454107,0.02,10738.54


In [38]:
numeric_features

['percent',
 'parking_price',
 'date',
 'floor',
 'age',
 'latitude',
 'longitude',
 'management',
 'size',
 'total_price',
 'room',
 'living',
 'bath']

In [39]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=numeric_features, outputCol="features")
df = assembler.transform(df)
df.show()

+-------+-------------+------+-----+----+------------------+------------------+----------+-----+-----------+----+------+----+--------------------+
|percent|parking_price|  date|floor| age|          latitude|         longitude|management| size|total_price|room|living|bath|            features|
+-------+-------------+------+-----+----+------------------+------------------+----------+-----+-----------+----+------+----+--------------------+
|   63.8|          0.0|202112|    2|34.0|25.055331674864203| 121.5376145905065|         1|10.52|      876.0|   1|     1|   1|[63.8,0.0,202112....|
|   48.9|        180.0|202112|    5| 6.0| 25.05682409251319| 121.5267584666694|         1|46.39|     3188.0|   3|     2|   1|[48.9,180.0,20211...|
|   83.3|        500.0|202112|    7|33.0|25.055016099844707|121.53750419269846|         1|82.75|     4052.6|   3|     0|   0|[83.3,500.0,20211...|
|   80.2|        300.0|202112|    7|33.0|25.055016099844707|121.53750419269846|         1| 85.5|     4811.7|   4|     

2022-05-31 19:29:14,895 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [40]:
from pyspark.ml.feature import StringIndexer

label_stringIdx = StringIndexer(inputCol = 'total_price', outputCol = 'label')
df = label_stringIdx.fit(df).transform(df)
df.show()

                                                                                

+-------+-------------+------+-----+----+------------------+------------------+----------+-----+-----------+----+------+----+--------------------+-------+
|percent|parking_price|  date|floor| age|          latitude|         longitude|management| size|total_price|room|living|bath|            features|  label|
+-------+-------------+------+-----+----+------------------+------------------+----------+-----+-----------+----+------+----+--------------------+-------+
|   63.8|          0.0|202112|    2|34.0|25.055331674864203| 121.5376145905065|         1|10.52|      876.0|   1|     1|   1|[63.8,0.0,202112....|  711.0|
|   48.9|        180.0|202112|    5| 6.0| 25.05682409251319| 121.5267584666694|         1|46.39|     3188.0|   3|     2|   1|[48.9,180.0,20211...|  782.0|
|   83.3|        500.0|202112|    7|33.0|25.055016099844707|121.53750419269846|         1|82.75|     4052.6|   3|     0|   0|[83.3,500.0,20211...|11211.0|
|   80.2|        300.0|202112|    7|33.0|25.055016099844707|121.537504

# Random Forest

In [6]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [17]:
df = spark.read.csv('1_taipei_purged.csv', header = True, inferSchema = True)

                                                                                

In [18]:
df = df.drop('district','avg','style','address')
df.printSchema()

root
 |-- percent: double (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- size: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: integer (nullable = true)
 |-- living: integer (nullable = true)
 |-- bath: integer (nullable = true)



In [19]:
# 移除total_price特徵
feature_list = []
for col in df.columns:
    if col == 'total_price':
        continue
    else:
        feature_list.append(col)

# make X from feature_list which is aka features in spark
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

In [20]:
# make y & X
rf = RandomForestRegressor(labelCol="total_price", featuresCol="features")

In [21]:
pipeline = Pipeline(stages=[assembler, rf])

In [32]:
# 答案
rfevaluator = RegressionEvaluator(labelCol="total_price", predictionCol="prediction")

# 超參數
rfparamGrid = (ParamGridBuilder()             
               .addGrid(rf.maxDepth, [5, 7, 9])             
               .addGrid(rf.maxBins, [5, 7, 9])             
               .addGrid(rf.numTrees, [75, 100, 125])
               .build())

# Kfold
rfcv = CrossValidator(estimator = pipeline,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

In [33]:
(trainingData, testData) = df.randomSplit([0.8, 0.2])

In [34]:
# train and get best parameter
rfcvModel = rfcv.fit(trainingData)

2022-05-31 19:55:47,854 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1488.1 KiB
2022-05-31 19:56:10,154 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1023.4 KiB
2022-05-31 19:56:15,559 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1956.3 KiB
2022-05-31 19:56:43,076 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1256.3 KiB
2022-05-31 19:56:49,537 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 2.4 MiB
2022-05-31 19:57:15,802 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1476.1 KiB
2022-05-31 19:57:38,138 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1022.2 KiB
2022-05-31 19:57:43,388 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1948.4 KiB
2022-05-31 19:58:11,065 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1255.3 KiB
2022-05-31 19:58:17,760 WARN scheduler.DAGScheduler: Broad

In [35]:
# pred
rfpredictions = rfcvModel.transform(testData)

print('r2:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "r2"}))
print('mae:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "mae"}))
print('mse:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "mse"}))

                                                                                

r2: 0.6549547648962749


                                                                                

mae: 331.5699460661897




mse: 579240.2577312682


                                                                                

In [36]:
# best parameters
print(f'bestPipeline = {rfcvModel.bestModel}')
print(f'bestModel = {bestPipeline.stages[1]}')
print(f'importances = {bestModel.featureImportances}')

bestPipeline = PipelineModel_33dd5c1000ba
bestModel = RandomForestRegressionModel: uid=RandomForestRegressor_7615a5d19c5c, numTrees=100, numFeatures=12
importances = (12,[0,1,2,3,4,5,6,7,8,9,10,11],[0.034863124863503375,0.12422743996525529,0.004315426561074794,0.009174426191053655,0.019899210932685193,0.021520635281870083,0.11689061607872192,0.0037649750392898416,0.4703085758020689,0.04267420216997969,0.0420169223206343,0.11034444479386274])


---