PySpark을 로컬머신에 설치하고 노트북을 사용하기 보다는 머신러닝 관련 다양한 라이브러리가 이미 설치되었고 좋은 하드웨어를 제공해주는 Google Colab을 통해 실습을 진행한다.

이를 위해 pyspark과 Py4J 패키지를 설치한다. Py4J 패키지는 파이썬 프로그램이 자바가상머신상의 오브젝트들을 접근할 수 있게 해준다. Local Standalone Spark을 사용한다.

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Taipei Housing Price Prediction") \
    .getOrCreate()

# 타이베이 주택 가격 예측 모델 만들기




데이터셋 설명

이번 문제는 대만 타이베이 시의 신단 지역에서 수집된 주택 거래 관련 정보를 바탕으로 주택 가격(정확히는 주택의 평당 가격)을 예측하는 Regression 모델을 만들어보는 것이다. 총 6개의 피쳐와 주택의 평당 가격에 해당하는 레이블 정보가 훈련 데이터로 제공된다. 레이블의 경우에는 주택의 최종 가격이 아니라 평당 가격이란 점을 다시 한번 강조한다.

각 컬럼에 대한 설명은 아래와 같으며 모든 필드는 X4를 제외하고는 실수 타입이다.

* X1: 주택 거래 날짜를 실수로 제공한다. 소수점 부분은 달을 나타낸다. 예를 들어 2013.250이라면 2013년 3월임을 나타낸다 (0.250 = 3/12)
* X2: 주택 나이 (년수)
* X3: 가장 가까운 지하철역까지의 거리 (미터)
* X4: 주택 근방 걸어갈 수 있는 거리내 편의점 수
* X5: 주택 위치의 위도 (latitude)
* X6: 주택 위치의 경도 (longitude)
* Y: 주택 평당 가격



In [3]:
spark

In [4]:
!wget https://grepp-reco-test.s3.ap-northeast-2.amazonaws.com/Taipei_sindan_housing.csv

--2021-07-25 16:26:01--  https://grepp-reco-test.s3.ap-northeast-2.amazonaws.com/Taipei_sindan_housing.csv
Resolving grepp-reco-test.s3.ap-northeast-2.amazonaws.com (grepp-reco-test.s3.ap-northeast-2.amazonaws.com)... 52.219.146.70
Connecting to grepp-reco-test.s3.ap-northeast-2.amazonaws.com (grepp-reco-test.s3.ap-northeast-2.amazonaws.com)|52.219.146.70|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20014 (20K) [text/csv]
Saving to: ‘Taipei_sindan_housing.csv.1’


2021-07-25 16:26:02 (104 KB/s) - ‘Taipei_sindan_housing.csv.1’ saved [20014/20014]



In [5]:
!ls -tl

total 44
-rw-r--r-- 1 root root 20014 Jul 17 17:35 Taipei_sindan_housing.csv
-rw-r--r-- 1 root root 20014 Jul 17 17:35 Taipei_sindan_housing.csv.1
drwxr-xr-x 1 root root  4096 Jul 16 13:20 sample_data


In [6]:
data = spark.read.csv('./Taipei_sindan_housing.csv', header=True, inferSchema=True)

In [7]:
data.printSchema()

root
 |-- X1: double (nullable = true)
 |-- X2: double (nullable = true)
 |-- X3: double (nullable = true)
 |-- X4: integer (nullable = true)
 |-- X5: double (nullable = true)
 |-- X6: double (nullable = true)
 |-- Y: double (nullable = true)



In [8]:
data.show()

+--------+----+--------+---+--------+---------+----+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|
+--------+----+--------+---+--------+---------+----+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|
|2013.417|20.3|287.6025|  6|24.98042|121.54228|46.7|
|  2013.5|31.7|5512.038|  1|24.95095|121.48458|18.8|
|2013.417|17.9| 1783.18|  3|24.96731|121.51486|22.1|
|2013.083|34.8|405.2134|  1|24.97349|121.53372|41.4|
|2013.333| 6.3|90.45606|  9|24.97433| 121.5431|58.1|
|2012.917|13.0|492.2313|  5|24.96515|121.53737|39.3|
|2012.667|20.4|2469.645|  4|24.96108|121.51046|23.8|
|  2013.5|13.2|1164.838|  4|24.99156|121.53406|34.3|
|2013.583|35.7|579.2083|  2| 24.9824|121.54619

In [9]:
cols = data.columns[:]
X, y = data.select(cols[:-1]), data.select(cols[-1])

# 코드 작성

## 1. Feature Vector와 기본 Linear Regression

In [10]:
from pyspark.ml.feature import VectorAssembler
import copy


def make_feature_vector(data=None, feature_columns=None):
    _schema = copy.deepcopy(data.schema)
    df_copy = data.rdd.zipWithIndex().toDF(_schema)
    df_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
    return df_assembler.transform(df_copy)


feature_columns = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data_FV = make_feature_vector(data=data, feature_columns=feature_columns)

In [11]:
feature_columns

['X1', 'X2', 'X3', 'X4', 'X5', 'X6']

In [12]:
data_FV = assembler.transform(data)

In [13]:
data_FV.show()

+--------+----+--------+---+--------+---------+----+--------------------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|            features|
+--------+----+--------+---+--------+---------+----+--------------------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|[2012.917,32.0,84...|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|[2012.917,19.5,30...|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|[2013.583,13.3,56...|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|[2013.5,13.3,561....|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|[2012.833,5.0,390...|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|[2012.667,7.1,217...|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|[2012.667,34.5,62...|
|2013.417|20.3|287.6025|  6|24.98042|121.54228|46.7|[2013.417,20.3,28...|
|  2013.5|31.7|5512.038|  1|24.95095|121.48458|18.8|[2013.5,31.7,5512...|
|2013.417|17.9| 1783.18|  3|24.96731|121.51486|22.1|[2013.417,17.9,17...|
|2013.083|34.8|405.2134|  1|24.97349|1

### 1-1. train / test 나누기

In [14]:
train, test = data_FV.randomSplit([0.7, 0.3])

### 1-2. 기본적인 LinearRegression

In [15]:
from pyspark.ml.regression import LinearRegression


LR = LinearRegression(featuresCol="features", labelCol="Y")
model = LR.fit(train)

In [16]:
evaluation = model.evaluate(test)

In [17]:
predictions = model.transform(test)

In [18]:
predictions.show()

+--------+----+--------+---+--------+---------+----+--------------------+------------------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|            features|        prediction|
+--------+----+--------+---+--------+---------+----+--------------------+------------------+
|2012.667| 0.0|185.4296|  0| 24.9711| 121.5317|37.9|[2012.667,0.0,185...|43.262434927813956|
|2012.667| 5.7|90.45606|  9|24.97433| 121.5431|53.5|[2012.667,5.7,90....|50.175696151694865|
|2012.667|12.4|1712.632|  2|24.96412| 121.5167|31.3|[2012.667,12.4,17...| 32.39629033613528|
|2012.667|12.6|383.2805|  7|24.96735|121.54464|42.5|[2012.667,12.6,38...|42.937979159118186|
|2012.667|15.5|815.9314|  4|24.97886|121.53464|37.4|[2012.667,15.5,81...| 40.32598880853948|
|2012.667|15.6|289.3248|  5|24.98203|121.54348|46.1|[2012.667,15.6,28...|  44.2082499918306|
|2012.667|29.4|4510.359|  1|24.94925|121.49542|13.2|[2012.667,29.4,45...| 8.958724282269031|
|2012.667|32.7|392.4459|  6|24.96398| 121.5425|30.5|[2012.667,32.7,39.

In [19]:
evaluation.rootMeanSquaredError

8.85366889841316

## 2. 성능향상을 위한 Feature Scaling


- X1: 주택 거래 날짜를 실수로 제공한다. 소수점 부분은 달을 나타낸다. 예를 들어 2013.250이라면 2013년 3월임을 나타낸다 (0.250 = 3/12)
- X2: 주택 나이 (년수)
- X3: 가장 가까운 지하철역까지의 거리 (미터)
- X4: 주택 근방 걸어갈 수 있는 거리내 편의점 수
- X5: 주택 위치의 위도 (latitude)
- X6: 주택 위치의 경도 (longitude)
- Y: 주택 평당 가격

### 2-0. 결측치 파악

In [20]:
from pyspark.sql.functions import isnan, isnull, when, count

# isnull
data.select([count(when(isnull(col), col)).alias(col) for col in data.columns]).show()

+---+---+---+---+---+---+---+
| X1| X2| X3| X4| X5| X6|  Y|
+---+---+---+---+---+---+---+
|  0|  0|  0|  0|  0|  0|  0|
+---+---+---+---+---+---+---+



In [21]:
# isnan()
data.select([count(when(isnan(col), col)).alias(col) for col in data.columns]).show()

+---+---+---+---+---+---+---+
| X1| X2| X3| X4| X5| X6|  Y|
+---+---+---+---+---+---+---+
|  0|  0|  0|  0|  0|  0|  0|
+---+---+---+---+---+---+---+



- 결측치 없음 (isnull(), isnan())

### 2-1. X1(주택거래 년, 월)은 카테고리컬하게 변환
- 어느 특정 기간에 프리미엄이 있을지 모름

In [22]:
from pyspark.ml.feature import StringIndexer


X1_indexer = StringIndexer(inputCol='X1', outputCol='X1_indexed')
X1_indexer_model = X1_indexer.fit(X)
data_transformed = X1_indexer_model.transform(data)

In [23]:
data_transformed.show()

+--------+----+--------+---+--------+---------+----+----------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|X1_indexed|
+--------+----+--------+---+--------+---------+----+----------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|       3.0|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|       3.0|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|      11.0|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|       1.0|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|       5.0|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|       6.0|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|       6.0|
|2013.417|20.3|287.6025|  6|24.98042|121.54228|46.7|       0.0|
|  2013.5|31.7|5512.038|  1|24.95095|121.48458|18.8|       1.0|
|2013.417|17.9| 1783.18|  3|24.96731|121.51486|22.1|       0.0|
|2013.083|34.8|405.2134|  1|24.97349|121.53372|41.4|       2.0|
|2013.333| 6.3|90.45606|  9|24.97433| 121.5431|58.1|       7.0|
|2012.917|13.0|492.2313|  5|24.96515|121

### 2-2. X2(주택 나이)는 log로 변환
- 오래될 수록 감가상각 폭이 크다고 생각.
    - 로그로 변환 후, 크기 조정하면 될듯

In [24]:
from pyspark.sql.functions import col, log


# X_transformed = X_transformed.withColumn("X2_indexed", log(col("X2")))
data_transformed = data_transformed.withColumn("X2_indexed", log(col("X2") + 1))

In [25]:
data_transformed.show()

+--------+----+--------+---+--------+---------+----+----------+------------------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|X1_indexed|        X2_indexed|
+--------+----+--------+---+--------+---------+----+----------+------------------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|       3.0|3.4965075614664802|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|       3.0|3.0204248861443626|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|      11.0|2.6602595372658615|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|       1.0|2.6602595372658615|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|       5.0| 1.791759469228055|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|       6.0|2.0918640616783932|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|       6.0|  3.56953269648137|
|2013.417|20.3|287.6025|  6|24.98042|121.54228|46.7|       0.0|3.0587070727153796|
|  2013.5|31.7|5512.038|  1|24.95095|121.48458|18.8|       1.0| 3.487375077903208|
|201

- X2 값이 0인 경우, log값이 null이 되는 문제가 있음.
    - X1 값에 1을 더한 후, log를 취하는 방법으로 해결

### 2-3. X3(지하철역까지 거리)도 log로 변환
- 거리이기 때문에 로그로 변환하면 좋을 듯
    - 지하철역이 너무 멀어서 일정 거리 이상은 똑같이 지하철역이 의미가 없는 경우는 없을 것.
        - 지하철 설치를 그렇게 하지는 않았겠지...

In [26]:
from pyspark.sql.functions import col


data_transformed = data_transformed.withColumn("X3_indexed", log(col("X3") + 1))

In [27]:
data_transformed.show()

+--------+----+--------+---+--------+---------+----+----------+------------------+------------------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|X1_indexed|        X2_indexed|        X3_indexed|
+--------+----+--------+---+--------+---------+----+----------+------------------+------------------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|       3.0|3.4965075614664802| 4.452937232813715|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|       3.0|3.0204248861443626| 5.728783007313997|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|      11.0|2.6602595372658615| 6.333252096677222|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|       1.0|2.6602595372658615| 6.333252096677222|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|       5.0| 1.791759469228055|  5.97016021281406|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|       6.0|2.0918640616783932| 7.685257394645503|
|2012.667|34.5|623.4731|  7|24.97933|121.53642|40.3|       6.0|  3.56953269648137|

### 2-4. X4(근처 편의점 수)
- double 형으로만 변환해두자

In [28]:
data_transformed

DataFrame[X1: double, X2: double, X3: double, X4: int, X5: double, X6: double, Y: double, X1_indexed: double, X2_indexed: double, X3_indexed: double]

In [29]:
from pyspark.sql.types import DoubleType


data_transformed = data_transformed.withColumn("X4_indexed", data_transformed["X4"].cast(DoubleType()))

In [30]:
data_transformed.show()

+--------+----+--------+---+--------+---------+----+----------+------------------+------------------+----------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|X1_indexed|        X2_indexed|        X3_indexed|X4_indexed|
+--------+----+--------+---+--------+---------+----+----------+------------------+------------------+----------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|       3.0|3.4965075614664802| 4.452937232813715|      10.0|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|       3.0|3.0204248861443626| 5.728783007313997|       9.0|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|      11.0|2.6602595372658615| 6.333252096677222|       5.0|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|       1.0|2.6602595372658615| 6.333252096677222|       5.0|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|       5.0| 1.791759469228055|  5.97016021281406|       5.0|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|       6.0|2.0918640616783932| 7.68525739464

### 2-5. X5(위도), X6(경도)
- 가장 비싼 집으로 부터의 거리 차이를 로그로?
- 어떻게 가공해야 의미가 있을까
- 의미가 있게 가공하기 힘들어 일단 그냥 삭제

### 2-6. 훈련 및 평가

In [31]:
data_transformed.show()

+--------+----+--------+---+--------+---------+----+----------+------------------+------------------+----------+
|      X1|  X2|      X3| X4|      X5|       X6|   Y|X1_indexed|        X2_indexed|        X3_indexed|X4_indexed|
+--------+----+--------+---+--------+---------+----+----------+------------------+------------------+----------+
|2012.917|32.0|84.87882| 10|24.98298|121.54024|37.9|       3.0|3.4965075614664802| 4.452937232813715|      10.0|
|2012.917|19.5|306.5947|  9|24.98034|121.53951|42.2|       3.0|3.0204248861443626| 5.728783007313997|       9.0|
|2013.583|13.3|561.9845|  5|24.98746|121.54391|47.3|      11.0|2.6602595372658615| 6.333252096677222|       5.0|
|  2013.5|13.3|561.9845|  5|24.98746|121.54391|54.8|       1.0|2.6602595372658615| 6.333252096677222|       5.0|
|2012.833| 5.0|390.5684|  5|24.97937|121.54245|43.1|       5.0| 1.791759469228055|  5.97016021281406|       5.0|
|2012.667| 7.1| 2175.03|  3|24.96305|121.51254|32.1|       6.0|2.0918640616783932| 7.68525739464

In [32]:
from pyspark.ml.feature import VectorAssembler

feature_columns = data_transformed.columns[-4:]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [33]:
data_transformed_2 = assembler.transform(data_transformed)

In [34]:
data_transformed_3 = data_transformed_2.select('features', 'Y')

In [35]:
data_transformed_3.show()

+--------------------+----+
|            features|   Y|
+--------------------+----+
|[3.0,3.4965075614...|37.9|
|[3.0,3.0204248861...|42.2|
|[11.0,2.660259537...|47.3|
|[1.0,2.6602595372...|54.8|
|[5.0,1.7917594692...|43.1|
|[6.0,2.0918640616...|32.1|
|[6.0,3.5695326964...|40.3|
|[0.0,3.0587070727...|46.7|
|[1.0,3.4873750779...|18.8|
|[0.0,2.9391619220...|22.1|
|[2.0,3.5779478934...|41.4|
|[7.0,1.9878743481...|58.1|
|[3.0,2.6390573296...|39.3|
|[6.0,3.0633909220...|23.8|
|[1.0,2.6532419646...|34.3|
|[11.0,3.602776755...|50.5|
|[4.0,0.0,5.683572...|70.1|
|[9.0,2.9285235238...|37.4|
|[0.0,2.8848007128...|42.3|
|[6.0,0.9162907318...|47.7|
+--------------------+----+
only showing top 20 rows



In [36]:
train_transformed, test_transformed = data_transformed_3.randomSplit([0.7, 0.3])

In [37]:
LR_transformed = LinearRegression(featuresCol="features", labelCol="Y")
model_transformed = LR.fit(train_transformed)

In [38]:
evaluation_transformed = model_transformed.evaluate(test_transformed)

In [39]:
predictions_transformed = model_transformed.transform(test_transformed)

In [40]:
predictions_transformed.show()

+--------------------+----+------------------+
|            features|   Y|        prediction|
+--------------------+----+------------------+
|[0.0,0.0,5.683572...|63.3|53.645723362995774|
|[0.0,1.2809338454...|31.1| 34.96924277669019|
|[0.0,1.5040773967...|36.7| 39.53851007544946|
|[0.0,1.7047480922...|29.3| 30.72131416730827|
|[0.0,2.0014800002...|59.5| 57.59698280187941|
|[0.0,2.3702437414...|49.3|42.313737484532034|
|[0.0,2.3887627892...|23.1| 29.00834140166183|
|[0.0,2.5572273113...|46.6| 23.36629938475761|
|[0.0,2.5802168295...|30.1|30.233122701346637|
|[0.0,2.6461747973...|45.9| 39.25961371596318|
|[0.0,2.7536607123...|30.5| 28.60219723299874|
|[0.0,2.8564702062...|51.0| 43.45725643363785|
|[0.0,2.9014215940...|40.1| 41.09051196645667|
|[0.0,2.9391619220...|22.1|  28.3768882789391|
|[0.0,2.9549102790...|43.1| 38.99279417119572|
|[0.0,3.0445224377...|29.4| 28.70526369437969|
|[0.0,3.1223649244...|42.0| 41.78527929957667|
|[0.0,3.1223649244...|44.2|  37.6617395679364|
|[0.0,3.44680

In [45]:
print(evaluation.rootMeanSquaredError, evaluation_transformed.rootMeanSquaredError)

8.85366889841316 8.478302595701324


In [48]:
print(evaluation.meanAbsoluteError, evaluation_transformed.meanAbsoluteError)

6.590959584442128 5.926922734910455


In [49]:
print(evaluation.r2, evaluation_transformed.r2)

0.5767593813096183 0.5470782518439314


## 3. 생각

- RMSE 값이 Feature Engeeniering 이전이 8.85366889841316, 이후가 8.478302595701324 로 별 차이가 없었다.
    - MAE, r2도 소폭 나아지긴 했지만, 유의미한 향상은 아님.
        - feature 들에 대한 잘못된 가정이 있었을 것. EDA부터하며 데이터를 봐야했는데, 잘못했을 것.
        - scale을 맞추면 더 나았을 듯.