# 라이브러리 및 데이터 로드

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark
!pip install -q findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import os
import pandas as pd
import numpy as np

# pyspark for objecy, sql
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

# pyspark for ML
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler

import seaborn as sns
import matplotlib.pyplot as plt

# Setting for visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid',
       rc={'figure.figsize': (18, 4)})
rcParams['figure.figsize'] = 18, 4

# 노트북 재실행을 대비하기 위해 랜덤 시드 설정해놓기
rnd_seed = 23
np.random.seed = rnd_seed
np.random.set_state = rnd_seed

+ 데이터의 header인 column name이 존재하지 않기 때문에 직접 스키마를 정의

In [4]:
## 데이터 스미카 직접 명시해주기 ##
data_path = '/content/drive/MyDrive/Colab Notebooks/cal_housing.data'
# data의 칼럼명을 스키마로 정의해주기
schema_string = 'long lat medage totrooms totbdrms pop houshlds medinc medhv'

fields = [StructField(field, FloatType(), True)for field in schema_string.split()]
schema = StructType(fields)

## 데이터 로드를 위한 SparkSession 만들어주기 ##
spark = SparkSession.builder.master('local[2]')\
        .appName('Linear-Regression-California-Housing')\
        .getOrCreate()

# 데이터파일 로드
# cache 메소드를 이용해서 메모리에 keep해놓기
housing_df = spark.read.csv(path=data_path, schema=schema).cache()

# 상위 5개 행 미리 보기 -> 하나의 Row가 namedTuple 형태로 되어 있음..!
housing_df.take(5)

[Row(long=-122.2300033569336, lat=37.880001068115234, medage=41.0, totrooms=880.0, totbdrms=129.0, pop=322.0, houshlds=126.0, medinc=8.325200080871582, medhv=452600.0),
 Row(long=-122.22000122070312, lat=37.86000061035156, medage=21.0, totrooms=7099.0, totbdrms=1106.0, pop=2401.0, houshlds=1138.0, medinc=8.301400184631348, medhv=358500.0),
 Row(long=-122.23999786376953, lat=37.849998474121094, medage=52.0, totrooms=1467.0, totbdrms=190.0, pop=496.0, houshlds=177.0, medinc=7.257400035858154, medhv=352100.0),
 Row(long=-122.25, lat=37.849998474121094, medage=52.0, totrooms=1274.0, totbdrms=235.0, pop=558.0, houshlds=219.0, medinc=5.643099784851074, medhv=341300.0),
 Row(long=-122.25, lat=37.849998474121094, medage=52.0, totrooms=1627.0, totbdrms=280.0, pop=565.0, houshlds=259.0, medinc=3.8461999893188477, medhv=342200.0)]

+ `take()`: Python named tuple 형태인 DataFrame의 Row 형식으로 데이터 미리보기

# 데이터 탐색

In [5]:
# 그룹핑해서 집계해보기
result_df = housing_df.groupBy('medage').count()
# 값 정렬 기준 설정해주고 내림차순으로 정렬
result_df.sort('medage', ascending=False).show(5)

# 수치형 변수들 기술통계량 살펴보기
housing_df.describe().show(10)

+------+-----+
|medage|count|
+------+-----+
|  52.0| 1273|
|  51.0|   48|
|  50.0|  136|
|  49.0|  134|
|  48.0|  177|
+------+-----+
only showing top 5 rows

+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|               long|              lat|            medage|          totrooms|         totbdrms|               pop|         houshlds|            medinc|             medhv|
+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              20640|            20640|             20640|             20640|            20640|             20640|            20640|             20640|             20640|
|   mean|-119.56970444871473|35.63186143109965|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465|49

+ `df.sort('columnA', ascending = True)`: `columnA` 값을 기준으로 오름차순 정렬
+ `df.describe()`: 수치형 변수들의 기술통계랑을 보여주는 메소드

In [6]:
 # 반환되는 통계량 값들을 소수점 제거하거나 하는 등 커스터마이징해서 출력
(housing_df.describe()).select('summary',
                              F.round('medage', 4).alias('medage'),
                              F.round('totrooms', 4).alias('totrooms'),
                              F.round('totbdrms', 4).alias('totbdrms'),
                              F.round('pop', 4).alias('pop'),
                              F.round('houshlds', 4).alias('houshlds'),
                              F.round('medinc', 4).alias('medinc'),
                              F.round('medhv', 4).alias('medhv')).show(10)

+-------+-------+---------+--------+---------+--------+-------+-----------+
|summary| medage| totrooms|totbdrms|      pop|houshlds| medinc|      medhv|
+-------+-------+---------+--------+---------+--------+-------+-----------+
|  count|20640.0|  20640.0| 20640.0|  20640.0| 20640.0|20640.0|    20640.0|
|   mean|28.6395|2635.7631| 537.898|1425.4767|499.5397| 3.8707|206855.8169|
| stddev|12.5856|2181.6153|421.2479|1132.4621|382.3298| 1.8998|115395.6159|
|    min|    1.0|      2.0|     1.0|      3.0|     1.0| 0.4999|    14999.0|
|    max|   52.0|  39320.0|  6445.0|  35682.0|  6082.0|15.0001|   500001.0|
+-------+-------+---------+--------+---------+--------+-------+-----------+



+ `F`: `pyspark.sql.functions`를 alias

# Feature Engineering

In [7]:
# 파생변수 생성하기
housing_df = (housing_df.withColumn('rmsperhh',
                                   F.round(col('totrooms')/col('houshlds'), 2))\
                        .withColumn('popperhh',
                                   F.round(col('pop')/col('houshlds'), 2))\
                        .withColumn('bdrmsperrm',
                                   F.round(col('totbdrms')/col('totrooms'), 2)))
housing_df.show(5)

+-------+-----+------+--------+--------+------+--------+------+--------+--------+--------+----------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|rmsperhh|popperhh|bdrmsperrm|
+-------+-----+------+--------+--------+------+--------+------+--------+--------+--------+----------+
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|452600.0|    6.98|    2.56|      0.15|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|    6.24|    2.11|      0.16|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|    8.29|     2.8|      0.13|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|    5.82|    2.55|      0.18|
|-122.25|37.85|  52.0|  1627.0|   280.0| 565.0|   259.0|3.8462|342200.0|    6.28|    2.18|      0.17|
+-------+-----+------+--------+--------+------+--------+------+--------+--------+--------+----------+
only showing top 5 rows



+ Feature Extraction: 기존 변수에서 파생 변수 생성
+ `withColumn`: DataFrame에 새로운 column 추가

In [8]:
# 사용하지 않을 변수 제외하고 필요한 변수들만 select
housing_df = housing_df.select('medhv',
                              'totbdrms',
                              'pop',
                              'houshlds',
                              'medinc',
                              'rmsperhh',
                              'popperhh',
                              'bdrmsperrm')

featureCols = ['totbdrms', 'pop', 'houshlds', 'medinc', 'rmsperhh', 'popperhh', 'bdrmsperrm']

# VectorAssembler로 feature vector로 변환
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
assembled_df = assembler.transform(housing_df)
assembled_df.show(10, truncate=True)

+--------+--------+------+--------+------+--------+--------+----------+--------------------+
|   medhv|totbdrms|   pop|houshlds|medinc|rmsperhh|popperhh|bdrmsperrm|            features|
+--------+--------+------+--------+------+--------+--------+----------+--------------------+
|452600.0|   129.0| 322.0|   126.0|8.3252|    6.98|    2.56|      0.15|[129.0,322.0,126....|
|358500.0|  1106.0|2401.0|  1138.0|8.3014|    6.24|    2.11|      0.16|[1106.0,2401.0,11...|
|352100.0|   190.0| 496.0|   177.0|7.2574|    8.29|     2.8|      0.13|[190.0,496.0,177....|
|341300.0|   235.0| 558.0|   219.0|5.6431|    5.82|    2.55|      0.18|[235.0,558.0,219....|
|342200.0|   280.0| 565.0|   259.0|3.8462|    6.28|    2.18|      0.17|[280.0,565.0,259....|
|269700.0|   213.0| 413.0|   193.0|4.0368|    4.76|    2.14|      0.23|[213.0,413.0,193....|
|299200.0|   489.0|1094.0|   514.0|3.6591|    4.93|    2.13|      0.19|[489.0,1094.0,514...|
|241400.0|   687.0|1157.0|   647.0|  3.12|     4.8|    1.79|      0.22

+ Feature Selection: 필요한 변수들만을 선정하여 DataFrame에 새로 할당
+ `VectorAssembler()`: Scaling을 적용시키기 전 필요한 Features를 Vector로 변환

In [9]:
# 위에서 만든 Feature vector인 'features' 넣기
standardScaler = StandardScaler(inputCol='features',
                               outputCol='features_scaled')
# fit, transform
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)

# scaling된 피처들만 추출해보기
scaled_df.select('features', 'features_scaled').show(10,
                                                    truncate=True)

+--------------------+--------------------+
|            features|     features_scaled|
+--------------------+--------------------+
|[129.0,322.0,126....|[0.30623297630686...|
|[1106.0,2401.0,11...|[2.62553233949916...|
|[190.0,496.0,177....|[0.45104081781631...|
|[235.0,558.0,219....|[0.55786627466754...|
|[280.0,565.0,259....|[0.66469173151877...|
|[213.0,413.0,193....|[0.50564049576249...|
|[489.0,1094.0,514...|[1.16083663111672...|
|[687.0,1157.0,647...|[1.63086864126214...|
|[665.0,1206.0,595...|[1.57864286235709...|
|[707.0,1551.0,714...|[1.67834662208491...|
+--------------------+--------------------+
only showing top 10 rows



+ `standardScaler()`: 표준화 함수

# 데이터 분할 및 회귀 모델링

In [10]:
# 데이터 분할
train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=rnd_seed)

# Elastic Net 모델 정의
lr = LinearRegression(featuresCol='features_scaled',
                     labelCol='medhv',
                     predictionCol='predmedhv',
                     maxIter=10,
                     regParam=0.3,
                     elasticNetParam=0.8,
                     standardization=False)
# 모델 학습
linearModel = lr.fit(train_data)

+ `randomSplit([0.8, 0.2], seed = seed)`: 데이터를 학습 8, 테스트 2의 비율로 split

# 테스트 데이터에 대한 예측

In [11]:
# transoform 사용하면 모델 정의할 때 설정한 "예측 값 변수"를 새로 만들어 생성한 데이터프레임 반환
predictions = linearModel.transform(test_data)

print(type(predictions)) # 예측 결과값이 PySpark의 데이터프레임 형태로 반환됨!

<class 'pyspark.sql.dataframe.DataFrame'>


+ `transform(test_dataframe)`: 테스트 데이터를 통한 모델 평가 함수

In [12]:
# predictions 데이터프레임에서 y값과 예측값만 추출해 비교
pred_labels = predictions.select('predmedhv', 'medhv')
pred_labels.show()

+------------------+-------+
|         predmedhv|  medhv|
+------------------+-------+
|  65820.1760042079|14999.0|
|221374.23211624063|22500.0|
|40972.555421607016|22500.0|
|111770.06878754956|26900.0|
|176224.24591040626|34400.0|
| 80414.73100645794|36700.0|
|145622.16563780987|37500.0|
| 78208.77719512756|39400.0|
|136096.33496158186|39800.0|
|108725.69135250003|40900.0|
|  76631.2558769846|41700.0|
| 75815.91817725557|42500.0|
|101987.67841479855|42500.0|
| 80076.98310851539|43000.0|
|  76251.1868141838|43600.0|
|108256.88381314202|44000.0|
| 93202.79491312048|44000.0|
| 92720.30368070453|44400.0|
| 66433.07339323603|44500.0|
|141201.75137895046|44600.0|
+------------------+-------+
only showing top 20 rows



In [13]:
print(f"RMSE:{linearModel.summary.rootMeanSquaredError}")
print(f"MAE: {linearModel.summary.meanAbsoluteError}")
print(f"R2 score: {linearModel.summary.r2}")

RMSE:78116.25647989755
MAE: 56925.44456037992
R2 score: 0.5435224404301477


In [14]:
# RMSE
evaluator = RegressionEvaluator(predictionCol='predmedhv',
                               labelCol='medhv',
                               metricName='rmse')
print(f"RMSE: {evaluator.evaluate(pred_labels)}")

# MAE                            
evaluator = RegressionEvaluator(predictionCol='predmedhv',
                               labelCol='medhv',
                               metricName='mae')
print(f"MAE: {evaluator.evaluate(pred_labels)}")

# R2 Score
evaluator = RegressionEvaluator(predictionCol='predmedhv',
                               labelCol='medhv',
                               metricName='r2')
print(f"R2 score: {evaluator.evaluate(pred_labels)}")

RMSE: 75954.86368997341
MAE: 55516.49548175519
R2 score: 0.5595706743501385


In [15]:
# 예측값인 pred_labels를 RDD 자료구조로 변환하고 넣어야 함
metrics = RegressionMetrics(pred_labels.rdd)

print("RMSE:", metrics.rootMeanSquaredError)
print("MAE:", metrics.meanAbsoluteError)
print("R2 score:", metrics.r2)



RMSE: 75954.86368997341
MAE: 55516.49548175519
R2 score: 0.5595706743501385
