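Rather than installing PySpark on a local machine and running a notebook there, we do the exercises on Google Colab, which comes with many machine-learning libraries preinstalled and provides good hardware.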

To do this, install the pyspark and Py4J packages. Py4J lets a Python program access objects living in a Java virtual machine. We use a local standalone Spark.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=616571a42afade348032d962d0636a072ff3e295e28d5f332dfaa607d6171fa0
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Taipei Housing Price Prediction") \
    .getOrCreate()

In [3]:
spark

# Building a Taipei Housing Price Prediction Model




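Dataset description

The task is to build a regression model that predicts housing prices (more precisely, the price per pyeong, a unit of floor area) from housing transaction records collected in the Sindan area of Taipei, Taiwan. The training data provides six features plus a label holding the price per pyeong. Note once more that the label is the price per pyeong, not the final sale price of the house.

Each column is described below. Every field except X4 is a real-valued (double) type.

* X1: transaction date as a real number. The fractional part encodes the month. For example, 2013.250 means March 2013 (0.250 = 3/12)
* X2: house age (years)
* X3: distance to the nearest subway station (meters)
* X4: number of convenience stores within walking distance of the house
* X5: latitude of the house
* X6: longitude of the house
* Y: house price per pyeong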
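
The X1 encoding described above can be decoded with a few lines of plain Python. This is a minimal sketch, assuming the month is the fractional part times 12, as in the 2013.250 example. The function name `decode_transaction_date` is hypothetical, and treating a zero fraction as December of the previous year is an assumption that the column description does not state.

```python
def decode_transaction_date(x1):
    """Convert a fractional-year value such as 2013.250 into (year, month)."""
    year = int(x1)
    # The fractional part is month / 12, so multiply back and round to undo
    # the three-decimal truncation (e.g. 0.917 * 12 = 11.004 -> 11).
    month = round((x1 - year) * 12)
    if month == 0:
        # Assumption: a zero fraction means 12/12 of the previous year.
        return year - 1, 12
    return year, month


print(decode_transaction_date(2013.250))  # (2013, 3)
print(decode_transaction_date(2012.917))  # (2012, 11)
```

The rounding step matters because values such as 2012.917 are stored with only three decimals, so the product with 12 is close to, but not exactly, an integer.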



In [4]:
!wget https://grepp-reco-test.s3.ap-northeast-2.amazonaws.com/Taipei_sindan_housing.csv

--2022-03-04 05:18:08--  https://grepp-reco-test.s3.ap-northeast-2.amazonaws.com/Taipei_sindan_housing.csv
Resolving grepp-reco-test.s3.ap-northeast-2.amazonaws.com (grepp-reco-test.s3.ap-northeast-2.amazonaws.com)... 52.219.146.50
Connecting to grepp-reco-test.s3.ap-northeast-2.amazonaws.com (grepp-reco-test.s3.ap-northeast-2.amazonaws.com)|52.219.146.50|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20014 (20K) [text/csv]
Saving to: ‘Taipei_sindan_housing.csv’


2022-03-04 05:18:09 (109 KB/s) - ‘Taipei_sindan_housing.csv’ saved [20014/20014]



In [5]:
!ls -tl

total 24
drwxr-xr-x 1 root root  4096 Feb 18 14:33 sample_data
-rw-r--r-- 1 root root 20014 Jul 17  2021 Taipei_sindan_housing.csv


In [117]:
data = spark.read.csv('./Taipei_sindan_housing.csv', header=True, inferSchema=True)

### Inspecting the data

In [118]:
data.printSchema()

root
 |-- X1: double (nullable = true)
 |-- X2: double (nullable = true)
 |-- X3: double (nullable = true)
 |-- X4: integer (nullable = true)
 |-- X5: double (nullable = true)
 |-- X6: double (nullable = true)
 |-- Y: double (nullable = true)



In [119]:
data.show()

+--------+----+--------+---+--------+---------+----+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|
+--------+----+--------+---+--------+---------+----+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|
|2013.417|20.3|287.6025|  6|24.98042|121.54228|46.7|
|  2013.5|31.7|5512.038|  1|24.95095|121.48458|18.8|
|2013.417|17.9| 1783.18|  3|24.96731|121.51486|22.1|
|2013.083|34.8|405.2134|  1|24.97349|121.53372|41.4|
|2013.333| 6.3|90.45606|  9|24.97433| 121.5431|58.1|
|2012.917|13.0|492.2313|  5|24.96515|121.53737|39.3|
|2012.667|20.4|2469.645|  4|24.96108|121.51046|23.8|
|  2013.5|13.2|1164.838|  4|24.99156|121.53406|34.3|
|2013.583|35.7|579.2083|  2| 24.9824|121.54619

In [120]:
data.describe().show()

+-------+------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+
|summary|                X1|                X2|                X3|                X4|                  X5|                  X6|                 Y|
+-------+------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+
|  count|               414|               414|               414|               414|                 414|                 414|               414|
|   mean|2013.1489710144933| 17.71256038647343|1083.8856889130436| 4.094202898550725|  24.969030072463745|  121.53336108695667| 37.98019323671498|
| stddev|0.2819672402629999|11.392484533242524| 1262.109595407851|2.9455618056636177|0.012410196590450208|0.015347183004592374|13.606487697735316|
|    min|          2012.667|               0.0|          23.38284|                 0|            24.93207|           1

In [121]:
data.count()

414

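The total row count (414) matches the count reported for every column in the summary above, so there are no missing values.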

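### Data preprocessing and normalization

Standardize column X3 (StandardScaler) <br>
Apply min-max scaling to columns X1, X2, and X4 (MinMaxScaler) <br>
Latitude and longitude (X5, X6) are excluded from training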
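
To make the two scalings concrete, here is a small worked example in plain Python using the first data row (X1 = 2012.917, X3 = 84.87882) and the summary statistics printed earlier. It is a sketch of the arithmetic only, not of Spark's implementation. It relies on the PySpark defaults: `MinMaxScaler` rescales to the range [0, 1], and `StandardScaler` uses `withStd=True, withMean=False`, so it divides by the standard deviation without subtracting the mean. The maximum of X1 (2013.583) is an assumption read off the sample rows, because the max row of the summary is cut off.

```python
def min_max_scale(x, col_min, col_max):
    """Rescale x to [0, 1] given the column's minimum and maximum."""
    return (x - col_min) / (col_max - col_min)


def std_scale(x, col_std):
    """Divide by the standard deviation without centering on the mean."""
    return x / col_std


# X1 of the first row; the minimum comes from describe(), the maximum is assumed
x1_scaled = min_max_scale(2012.917, 2012.667, 2013.583)
# X3 of the first row; the standard deviation comes from describe()
x3_scaled = std_scale(84.87882, 1262.109595407851)

print(x1_scaled)  # about 0.2729
print(x3_scaled)  # about 0.0673
```

Both numbers agree with the first row of `new_mmcols` and `new_scols` in the output of the scaling cell. They also show that the standardized X3 is not zero-centered under the default settings.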

In [122]:
from pyspark.ml.feature import StandardScaler, MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Scalers operate on vector columns, so pack each group of columns into a vector first
assembler1 = VectorAssembler(inputCols = ['X1', 'X2', 'X4'], outputCol = 'mmCols')
assembler2 = VectorAssembler(inputCols = ['X3'], outputCol = 'sCols')

data_2 = assembler1.transform(data)
data_2 = assembler2.transform(data_2)

# Min-max scale X1, X2, X4 to the [0, 1] range
mmscaler = MinMaxScaler(inputCol = 'mmCols', outputCol = 'new_mmcols')
mmscaler_model = mmscaler.fit(data_2)
final_data = mmscaler_model.transform(data_2)

# Scale X3 by its standard deviation (defaults: withStd=True, withMean=False)
sscaler = StandardScaler(inputCol = 'sCols', outputCol = 'new_scols')
sscaler_model = sscaler.fit(final_data)
final_data = sscaler_model.transform(final_data)

final_data.show()

+--------+----+--------+---+--------+---------+----+--------------------+----------+--------------------+--------------------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|              mmCols|     sCols|          new_mmcols|           new_scols|
+--------+----+--------+---+--------+---------+----+--------------------+----------+--------------------+--------------------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|[2012.917,32.0,10.0]|[84.87882]|[0.27292576419208...|[0.0672515448015205]|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2| [2012.917,19.5,9.0]|[306.5947]|[0.27292576419208...|[0.2429224063548331]|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3| [2013.583,13.3,5.0]|[561.9845]|[0.99999999999999...|[0.44527393028685...|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|   [2013.5,13.3,5.0]|[561.9845]|[0.90938864628813...|[0.44527393028685...|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|  [2012.833,5.0,5.0]|[390.5684]|[0.18122270742373...|[0.309

In [129]:
# Combine the two scaled vectors into the single 'features' column the models expect
assembler = VectorAssembler(inputCols = ['new_mmcols', 'new_scols'], outputCol = 'features')
final_data_2 = assembler.transform(final_data)

final_data_2.show()

+--------+----+--------+---+--------+---------+----+--------------------+----------+--------------------+--------------------+--------------------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|              mmCols|     sCols|          new_mmcols|           new_scols|            features|
+--------+----+--------+---+--------+---------+----+--------------------+----------+--------------------+--------------------+--------------------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|[2012.917,32.0,10.0]|[84.87882]|[0.27292576419208...|[0.0672515448015205]|[0.27292576419208...|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2| [2012.917,19.5,9.0]|[306.5947]|[0.27292576419208...|[0.2429224063548331]|[0.27292576419208...|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3| [2013.583,13.3,5.0]|[561.9845]|[0.99999999999999...|[0.44527393028685...|[0.99999999999999...|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|   [2013.5,13.3,5.0]|[561.9845]|[0.90938864628813...|[0.4452

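## Model training and prediction
1. Split the data into training, validation, and test sets
2. Train four models on the training data
3. Generate predictions for the validation data with each model
4. Fit one more linear regression model on the four sets of predictions (an ensemble technique)
5. Run the test data through the four trained models, then apply the linear regression to the four predictions to obtain the final predictions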

In [711]:
# Fix the seed so the splits are reproducible
train_valid, test = final_data_2.randomSplit([0.8, 0.2], seed = 51)
train, valid = train_valid.randomSplit([0.7, 0.3], seed = 51)
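
The two-stage split above implies the following overall proportions. This short calculation is only a sketch of the arithmetic. `randomSplit` assigns rows at random, so the actual set sizes are approximate and will not match these expected values exactly.

```python
n_rows = 414            # total number of rows in the dataset
outer = (0.8, 0.2)      # (train + valid) vs. test
inner = (0.7, 0.3)      # train vs. valid, inside the 80% part

fractions = {
    "train": outer[0] * inner[0],
    "valid": outer[0] * inner[1],
    "test": outer[1],
}

for name, frac in fractions.items():
    print(f"{name}: {frac:.0%} of the data, about {n_rows * frac:.0f} rows expected")
```

Roughly 56% of the rows end up in the training set, 24% in the validation set, and 20% in the test set.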

In [712]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \
                                  GBTRegressor, RandomForestRegressor

algo1 = LinearRegression(featuresCol="features", labelCol="Y")
model_1 = algo1.fit(train)

algo2 = DecisionTreeRegressor(featuresCol="features", labelCol="Y")
model_2 = algo2.fit(train)

algo3 = GBTRegressor(featuresCol="features", labelCol="Y")
model_3 = algo3.fit(train)

algo4 = RandomForestRegressor(featuresCol="features", labelCol="Y")
model_4 = algo4.fit(train)

def get_prediction(d):
    prediction = []

    prediction.append(model_1.transform(d))
    prediction.append(model_2.transform(d))
    prediction.append(model_3.transform(d))
    prediction.append(model_4.transform(d))

    return prediction

In [713]:
from pyspark.ml.evaluation import RegressionEvaluator

model_name = ['LinearRegression', 'DecisionTreeRegressor', 'GBTRegressor',
              'RandomForestRegressor']

prediction = get_prediction(valid)

def show_models_prediction(prediction):
    for i in range(len(model_name)):
        prediction[i].select(['Y', 'prediction']).show(10)

        evaluator = RegressionEvaluator(labelCol = "Y", 
                                        predictionCol = "prediction", 
                                        metricName = "rmse")
        e = evaluator.evaluate(prediction[i])

        print(f"{model_name[i]} rmse: ", e)

show_models_prediction(prediction)

+----+------------------+
|   Y|        prediction|
+----+------------------+
|47.7|48.758403263625866|
|32.1| 30.17184454449717|
|37.2|40.596153434306295|
|40.3| 34.21673147548425|
|13.2| 8.176127474179317|
|52.2| 40.15255394860861|
|45.7|47.556668313633445|
|40.5| 40.91291582839258|
|21.8|22.549427123634224|
|37.4| 35.29582406874458|
+----+------------------+
only showing top 10 rows

LinearRegression rmse:  8.323657211601303
+----+------------------+
|   Y|        prediction|
+----+------------------+
|47.7| 44.00000000000001|
|32.1|26.279166666666658|
|37.2| 43.08333333333334|
|40.3|  37.2404761904762|
|13.2|14.900000000000002|
|52.2| 44.00000000000001|
|45.7| 44.00000000000001|
|40.5| 41.90714285714285|
|21.8|            32.225|
|37.4|  37.2404761904762|
+----+------------------+
only showing top 10 rows

DecisionTreeRegressor rmse:  8.463182192046819
+----+------------------+
|   Y|        prediction|
+----+------------------+
|47.7| 48.46176980163925|
|32.1|25.942735720474374|
|

The four models produce the results shown above.<br>
Collect the four sets of predictions into a single table, then<br>
fit one more linear regression against the target value.

In [714]:
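from pyspark.sql.functions import monotonically_increasing_id


def merge_prediction(prediction):
    # Join the four prediction columns into one table keyed by a generated row id.
    # Caution: monotonically_increasing_id() is unique but not consecutive, so this
    # join relies on every DataFrame having the same row order and partitioning.
    pred = prediction[0].select('Y')
    pred = pred.withColumn('idx', monotonically_increasing_id())

    for i in range(4):
        # Use a local variable so the caller's list is not modified
        indexed = prediction[i].withColumn('idx', monotonically_increasing_id())
        pred = pred.join(indexed.select(['prediction', 'idx']), ['idx'], how = 'inner')
        pred = pred.withColumnRenamed('prediction', f'prediction_{i}')
        
    pred.show()

    return pred

pred = merge_prediction(prediction)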

+---+----+------------------+------------------+------------------+------------------+
|idx|   Y|      prediction_0|      prediction_1|      prediction_2|      prediction_3|
+---+----+------------------+------------------+------------------+------------------+
|  0|47.7|48.758403263625866| 44.00000000000001| 48.46176980163925|51.118564674256746|
|  1|32.1| 30.17184454449717|26.279166666666658|25.942735720474374|27.960624339901337|
|  2|37.2|40.596153434306295| 43.08333333333334| 49.44905810024216|47.283161110562936|
|  3|40.3| 34.21673147548425|  37.2404761904762| 38.15916438409222|  37.6426822889097|
|  4|13.2| 8.176127474179317|14.900000000000002|15.701931861358585|16.924179160021268|
|  5|52.2| 40.15255394860861| 44.00000000000001| 38.59498879322209| 50.58137520656457|
|  6|45.7|47.556668313633445| 44.00000000000001|  42.5561955925344| 54.50662822380018|
|  7|40.5| 40.91291582839258| 41.90714285714285| 40.79497911354784| 40.56638026870767|
|  8|21.8|22.549427123634224|            32
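
Joining on `monotonically_increasing_id()` works only while every DataFrame has the same row order and partitioning, because the generated IDs are unique but not consecutive. One possible alternative, sketched below, avoids the join by applying the fitted models one after another to the same DataFrame and renaming each new `prediction` column. The helper name `stack_predictions` is hypothetical and not part of the original notebook. It assumes `df` behaves like a PySpark DataFrame and that each model's `transform` adds only a `prediction` column.

```python
def stack_predictions(df, models):
    """Append one prediction column per model without any join.

    Assumes each model.transform(df) returns df plus a 'prediction' column,
    as the fitted pyspark.ml regression models in this notebook do.
    """
    for i, model in enumerate(models):
        # Rename right away so the next model can add its own 'prediction' column
        df = model.transform(df).withColumnRenamed('prediction', f'prediction_{i}')
    return df


# Hypothetical usage with the models fitted earlier in the notebook:
# pred = stack_predictions(valid, [model_1, model_2, model_3, model_4])
# pred.select('Y', 'prediction_0', 'prediction_1', 'prediction_2', 'prediction_3').show()
```

Because every prediction is computed from the same rows, the columns stay aligned by construction and no `idx` column is needed.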

In [715]:
def get_feature_vec(pred):
    vector_assembler = VectorAssembler(inputCols = [f'prediction_{i}' for i in range(4)], outputCol = 'predictions')
    final_preds = vector_assembler.transform(pred)
    final_preds.show()

    return final_preds

final_preds = get_feature_vec(pred)

+---+----+------------------+------------------+------------------+------------------+--------------------+
|idx|   Y|      prediction_0|      prediction_1|      prediction_2|      prediction_3|         predictions|
+---+----+------------------+------------------+------------------+------------------+--------------------+
|  0|47.7|48.758403263625866| 44.00000000000001| 48.46176980163925|51.118564674256746|[48.7584032636258...|
|  1|32.1| 30.17184454449717|26.279166666666658|25.942735720474374|27.960624339901337|[30.1718445444971...|
|  2|37.2|40.596153434306295| 43.08333333333334| 49.44905810024216|47.283161110562936|[40.5961534343062...|
|  3|40.3| 34.21673147548425|  37.2404761904762| 38.15916438409222|  37.6426822889097|[34.2167314754842...|
|  4|13.2| 8.176127474179317|14.900000000000002|15.701931861358585|16.924179160021268|[8.17612747417931...|
|  5|52.2| 40.15255394860861| 44.00000000000001| 38.59498879322209| 50.58137520656457|[40.1525539486086...|
|  6|45.7|47.556668313633445

In [716]:
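# Meta-model: linear regression on the four base-model predictions
algo = LinearRegression(featuresCol="predictions", labelCol="Y")
final_model = algo.fit(final_preds)
# Predictions on the same validation rows the meta-model was fit on
final_prediction = final_model.transform(final_preds)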

In [717]:
evaluator = RegressionEvaluator(labelCol="Y", predictionCol="prediction", metricName="rmse")
print('rmse: ', evaluator.evaluate(final_prediction))

rmse:  7.031390290129388
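
For reference, the RMSE that `RegressionEvaluator` reports with `metricName = "rmse"` is the square root of the mean squared difference between label and prediction. The sketch below computes it in plain Python for the first five (Y, prediction) pairs of the LinearRegression validation output shown earlier. Because it uses only five rows, the value differs from the RMSE reported over the full validation set.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# First five (Y, prediction) pairs from the LinearRegression validation output
y_true = [47.7, 32.1, 37.2, 40.3, 13.2]
y_pred = [48.758403263625866, 30.17184454449717, 40.596153434306295,
          34.21673147548425, 8.176127474179317]

print(rmse(y_true, y_pred))
```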


In [718]:
final_prediction.select(['Y', 'prediction']).show()

+----+------------------+
|   Y|        prediction|
+----+------------------+
|47.7| 49.55728725076309|
|32.1| 29.13051698658748|
|37.2| 44.86380024483536|
|40.3| 36.92943776211691|
|13.2| 16.62911530593404|
|52.2| 48.52629370655377|
|45.7| 52.56474155091754|
|40.5|40.535160287100275|
|21.8|24.944137034640505|
|37.4| 35.41715192453525|
|33.4|28.867948832148514|
|53.7| 49.09763361786445|
|43.1|48.480495955131936|
|15.0|15.341645373302994|
|41.1|37.289870140526965|
|28.5| 37.81449458078135|
|30.6|26.725727739512664|
|20.5|18.470668208581657|
|27.7|26.439884742828006|
|29.5| 29.02120426745954|
+----+------------------+
only showing top 20 rows



---

Measuring performance on the test data

In [719]:
test_prediction = get_prediction(test)
show_models_prediction(test_prediction)

+----+------------------+
|   Y|        prediction|
+----+------------------+
|53.5|49.664405775834894|
|46.1| 40.63572604307098|
|23.8| 25.89078239257703|
|47.1| 42.95324819428266|
|40.3|35.747104120751935|
|43.2| 47.56813700381717|
|47.0|44.585879958299294|
|21.5| 27.70982163929161|
|35.7| 27.46739847618266|
|45.3|38.878618288190744|
+----+------------------+
only showing top 10 rows

LinearRegression rmse:  7.071245565932397
+----+------------------+
|   Y|        prediction|
+----+------------------+
|53.5| 44.00000000000001|
|46.1| 43.08333333333334|
|23.8|23.529999999999998|
|47.1| 43.08333333333334|
|40.3|  37.2404761904762|
|43.2| 51.62499999999999|
|47.0| 44.00000000000001|
|21.5|             27.98|
|35.7|  37.2404761904762|
|45.3|             117.5|
+----+------------------+
only showing top 10 rows

DecisionTreeRegressor rmse:  10.303108701589725
+----+------------------+
|   Y|        prediction|
+----+------------------+
|53.5| 49.02265044509651|
|46.1| 46.02380910944079|


In [720]:
evaluator = RegressionEvaluator(labelCol = "Y", 
                                predictionCol = "prediction", 
                                metricName = "rmse")

In [721]:
test_pred = merge_prediction(test_prediction)
test_final_pred = get_feature_vec(test_pred)
test_final_prediction = final_model.transform(test_final_pred)

print('rmse: ', evaluator.evaluate(test_final_prediction))

+---+----+------------------+------------------+------------------+------------------+
|idx|   Y|      prediction_0|      prediction_1|      prediction_2|      prediction_3|
+---+----+------------------+------------------+------------------+------------------+
|  0|53.5|49.664405775834894| 44.00000000000001| 49.02265044509651| 51.14449885300071|
|  1|46.1| 40.63572604307098| 43.08333333333334| 46.02380910944079|45.281161580933365|
|  2|23.8| 25.89078239257703|23.529999999999998|23.579590014667517|27.001703788648314|
|  3|47.1| 42.95324819428266| 43.08333333333334| 44.78541974458075| 47.91961179350069|
|  4|40.3|35.747104120751935|  37.2404761904762| 36.67299824087113| 37.43273358464203|
|  5|43.2| 47.56813700381717| 51.62499999999999| 50.52773089510646| 46.71825053118236|
|  6|47.0|44.585879958299294| 44.00000000000001| 42.43146019176621| 49.33956571523204|
|  7|21.5| 27.70982163929161|             27.98|25.275962608795034| 23.86665346020996|
|  8|35.7| 27.46739847618266|  37.240476190

In [722]:
test_final_prediction.select(['Y', 'prediction']).show()

+----+------------------+
|   Y|        prediction|
+----+------------------+
|53.5| 49.69169921223975|
|46.1|43.676475381213464|
|23.8|27.617582274968015|
|47.1| 46.28936663221773|
|40.3|37.212482621763726|
|43.2| 46.26461570899828|
|47.0| 48.01085026232618|
|21.5| 25.68465854761256|
|35.7|34.595974958910794|
|45.3| 54.54038958335904|
|20.9|26.882949604485255|
|40.9| 43.17435918720354|
|25.6|29.091478531120053|
|45.5| 44.52972627634187|
|56.3|50.597044824073926|
|48.0|28.331101000776677|
|34.4| 42.19530659666002|
|53.0| 44.44070788570184|
|42.5|37.983405899971316|
|16.1|22.828945271281864|
+----+------------------+
only showing top 20 rows

