<a href="https://colab.research.google.com/github/parkrye/Python/blob/main/202210_Bigdata/12_%ED%83%9D%EC%8B%9C%EB%B9%84_%EC%98%88%EC%B8%A1%ED%95%98%EA%B8%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

# Memory requested for both the driver and the executors.
# NOTE(review): getOrCreate() returns an already-running session if one exists,
# in which case these memory settings may not take effect — restart the kernel
# if they need to change.
MAX_MEMORY = "5g"
spark = (
    SparkSession.builder.appName("taxi-fare-prediction")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

In [None]:
# Local (Windows) folder holding the trip CSV files, and the glob — relative
# to that folder — selecting every file under trips/.
directory = r"C:\Users\mhso_lec\study_notebook\data"
trip_files = r"\trips\*"

In [None]:
# Build the CSV path with exactly one separator between the folder and the glob.
# trip_files already begins with a backslash, so blindly inserting another one
# yields a doubled separator ("data\\\\trips\\*"); strip the edges first.
sep = "\\"
trips_path = "file:///" + directory.rstrip(sep) + sep + trip_files.lstrip(sep)

# Read every trip CSV; the header row supplies column names and inferSchema
# asks Spark to guess column types (this costs an extra pass over the data).
trips_df = spark.read.csv(trips_path, inferSchema=True, header=True)
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
# Register the raw trips DataFrame so the SQL queries below can refer to it as `trips`.
trips_df.createOrReplaceTempView(name="trips")

운행 거리(`trip_distance`)에 따른 요금(`total_amount`)을 예측하는 회귀 모델을 생성

In [None]:
# Data for the first model: one feature (trip_distance) and the label
# (total_amount). Obvious outliers / invalid rows are filtered out, and only
# pickups from 2021-01-01 up to (but not including) 2021-08-01 are kept.
# Note: `passenger_count < 4` also drops rows whose passenger_count is NULL,
# because a comparison with NULL is never true in SQL.
# Date literals use single quotes: single-quoted text is always a string
# literal in Spark SQL, whereas double-quoted text is read as an identifier
# when ANSI double-quoted identifiers are enabled. This also matches the
# second model's query.
query = """
SELECT
    trip_distance,
    total_amount
FROM trips

WHERE total_amount < 5000
  AND total_amount > 0
  AND trip_distance > 0
  AND trip_distance < 500
  AND passenger_count < 4
  AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
  AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [None]:
# Preview the first five filtered rows.
data_df.show(n=5)

+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|          2.1|        11.8|
|          0.2|         4.3|
|         14.7|       51.95|
|         10.6|       36.35|
|         4.94|       24.36|
+-------------+------------+
only showing top 5 rows



**train / test split**

In [None]:
# 80% / 20% train/test split, seeded so reruns use the same sampling seed.
train_df, test_df = data_df.randomSplit(weights=[0.8, 0.2], seed=1)

In [None]:
# Row count of each split, displayed as a (train, test) tuple.
(train_df.count(), test_df.count())

(10499160, 2626880)

Spark ML의 `feature`는 `vector` 형태여야 하므로, 컬럼 값을 1차원 벡터로 묶어 주는 `VectorAssembler`를 사용

In [None]:
from pyspark.ml.feature import VectorAssembler


# Pack the values of the inputCols columns into a single vector and store it
# in a new column named by outputCol — here a one-element vector "features".
vassembler = VectorAssembler(outputCol="features", inputCols=["trip_distance"])
vtrain_df = vassembler.transform(dataset=train_df)

vtrain_df.show()

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 20 rows



**모델 생성 및 훈련**

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Linear regression of total_amount on the single assembled feature vector.
lr = LinearRegression(featuresCol="features", labelCol="total_amount", maxIter=50)

In [None]:
# Fit the regression on the vectorised training split.
model = lr.fit(dataset=vtrain_df)

In [None]:
# Vectorise the test data too.
# Do NOT create a new transformer for the test set — always reuse the exact
# transformer that was applied to the training set.
vtest_df = vassembler.transform(dataset=test_df)
vtest_df.show(n=5)

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 5 rows



In [None]:
# Append a `prediction` column to every test row and show a sample.
predictions = model.transform(dataset=vtest_df)
predictions.show()

+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.3|  [0.01]|9.365820268418798|
|         0.01|         3.8|  [0.01]|9.365820268418798|
|         0.01|         3.8|  [0.01]|9.365820268418798|
|         0.01|         3.8|  [0.01]|9.365820268418798|
|         0.01|         3.8|  [0.01]|9.365820268418798|
|         0.01|         3.8|  [0.01]|9.365820268

In [None]:
# RMSE on the held-out TEST set.
# model.summary holds metrics computed on the training data, so it cannot tell
# how well the model generalises; evaluate the test predictions instead.
from pyspark.ml.evaluation import RegressionEvaluator

RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse"
).evaluate(predictions)

6.229850179738432

In [None]:
# R^2 on the held-out TEST set.
# model.summary holds metrics computed on the training data, so it cannot tell
# how well the model generalises; evaluate the test predictions instead.
from pyspark.ml.evaluation import RegressionEvaluator

RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="r2"
).evaluate(predictions)

0.77055628637138

**새 데이터를 직접 만들어서 예측**

In [None]:
from pyspark.sql.types import DoubleType

# A few made-up trip distances to run through the trained model.
distance_list = [1.1, 5.5, 10.5, 3.0]
# With a non-struct schema Spark names the single column "value"; rename it to
# the column the assembler was configured with.
distance_df = spark.createDataFrame(distance_list, schema=DoubleType()).toDF("trip_distance")

distance_df.show(n=5)

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.5|
|         10.5|
|          3.0|
+-------------+



In [None]:
# Reuse the training assembler so the new rows get the same `features` column.
vdistance_df = vassembler.transform(dataset=distance_df)
vdistance_df.show()

+-------------+--------+
|trip_distance|features|
+-------------+--------+
|          1.1|   [1.1]|
|          5.5|   [5.5]|
|         10.5|  [10.5]|
|          3.0|   [3.0]|
+-------------+--------+



In [None]:
# Predicted fares for the made-up distances.
distance_predictions = model.transform(dataset=vdistance_df)
distance_predictions.show()

+-------------+--------+------------------+
|trip_distance|features|        prediction|
+-------------+--------+------------------+
|          1.1|   [1.1]|12.633147289165791|
|          5.5|   [5.5]|25.822357281172003|
|         10.5|  [10.5]| 40.81009590845179|
|          3.0|   [3.0]|18.328487967532112|
+-------------+--------+------------------+



# 두 번째 모델
- feature를 늘려서 예측

In [None]:
# Data for the second model: more features per trip.
#   - pickup/dropoff location ids and day_of_week are categorical,
#   - passenger_count, trip_distance and pickup_time (hour of pickup) are numeric,
#   - total_amount is the label.
# The row filters are the same as for the first model.
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

data_df = spark.sql(sqlQuery=query)
data_df.createOrReplaceTempView(name="data")

In [None]:
# Preview the first five rows of the richer feature set.
data_df.show(n=5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              1|               142|                 43|          2.1|          0|     Friday|        11.8|
|              1|               238|                151|          0.2|          0|     Friday|         4.3|
|              1|               132|                165|         14.7|          0|     Friday|       51.95|
|              0|               138|                132|         10.6|          0|     Friday|       36.35|
|              1|                68|                 33|         4.94|          0|     Friday|       24.36|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [None]:
# Same 80% / 20% seeded split as for the first model.
train_df, test_df = data_df.randomSplit(weights=[0.8, 0.2], seed=1)

## 파이프라인 생성
- pipeline에 넣을 과정(`stage`)을 하나씩 모아 놓기

In [None]:
# Pipeline stages, collected in execution order and passed to Pipeline later.
stages = list()

`One Hot Encode` stage
- `pickup_location_id`
- `dropoff_location_id`
- `day_of_week`

위 세 `feature`는 범주형(categorical) 데이터

In [None]:
# StringIndexer maps each distinct value of a column to a numeric index.
# OneHotEncoder then turns that index into a (sparse) one-hot vector.

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Columns to one-hot encode.
cat_features = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week",
]

for c in cat_features:
    # handleInvalid="keep" puts values not seen during fitting into an extra
    # bucket instead of raising an error.
    cat_indexer = StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    onehot_encoder = OneHotEncoder(
        inputCols=[cat_indexer.getOutputCol()],
        outputCols=[f"{c}_onehot"],
    )
    stages.extend([cat_indexer, onehot_encoder])

stages

[StringIndexer_d84f555fe9ed,
 OneHotEncoder_693a6166fec5,
 StringIndexer_0f04a5fc9fc1,
 OneHotEncoder_a56c6b6dc774,
 StringIndexer_c452e9fb7272,
 OneHotEncoder_ed5e1f315d8a]

`Numerical Data` Preprocessing stage

- `passenger_count`
- `trip_distance`
- `pickup_time`

In [None]:
# Wrap each numeric column in its own one-element vector, then rescale it with
# StandardScaler (default settings: divide by the std, no mean centring).
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = [
    "passenger_count",
    "trip_distance",
    "pickup_time",
]

for n in num_features:
    num_assembler = VectorAssembler(inputCols=[n], outputCol=f"{n}_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=f"{n}_scaled")
    stages.extend([num_assembler, num_scaler])

stages

[StringIndexer_d84f555fe9ed,
 OneHotEncoder_693a6166fec5,
 StringIndexer_0f04a5fc9fc1,
 OneHotEncoder_a56c6b6dc774,
 StringIndexer_c452e9fb7272,
 OneHotEncoder_ed5e1f315d8a,
 VectorAssembler_94dc33be9a6f,
 StandardScaler_b789e3249acf,
 VectorAssembler_9c5bb69dfe58,
 StandardScaler_cf0eebcaef70,
 VectorAssembler_ff1bfceff5a5,
 StandardScaler_2cd19904b6d0]

category, numeric 형식으로 각각 작업된 벡터 결과물들을 하나로 합쳐주기 ( `VectorAssembler` )

In [None]:
# _onehot이 붙은 컬럼과 _scaled가 붙은 컬럼만 있으면 된다.
assembler_inputs = [c + "_onehot" for c in cat_features ] + [ n + "_scaled" for n in num_features ]
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [None]:
# Concatenate every per-column vector into the single `feature_vector` column.
assembler = VectorAssembler(outputCol="feature_vector", inputCols=assembler_inputs)
stages.append(assembler)
stages

[StringIndexer_d84f555fe9ed,
 OneHotEncoder_693a6166fec5,
 StringIndexer_0f04a5fc9fc1,
 OneHotEncoder_a56c6b6dc774,
 StringIndexer_c452e9fb7272,
 OneHotEncoder_ed5e1f315d8a,
 VectorAssembler_94dc33be9a6f,
 StandardScaler_b789e3249acf,
 VectorAssembler_9c5bb69dfe58,
 StandardScaler_cf0eebcaef70,
 VectorAssembler_ff1bfceff5a5,
 StandardScaler_2cd19904b6d0,
 VectorAssembler_23dabf536232]

**파이프라인 생성**

In [None]:
from pyspark.ml import Pipeline

# Fit every stage (indexers, encoders, scalers, assembler) on the training
# split only; the fitted PipelineModel is later reused as-is on the test split.
pipeline = Pipeline(stages=stages)
fitted_transformer = pipeline.fit(dataset=train_df)

transform을 이용해 데이터 변환

In [None]:
# Apply the fitted pipeline to the training split and inspect the added columns.
vtrain_df = fitted_transformer.transform(dataset=train_df)
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [None]:
# Peek at the assembled sparse feature vectors.
vtrain_df.select(["feature_vector"]).show()

+--------------------+
|      feature_vector|
+--------------------+
|(534,[62,312,526,...|
|(534,[62,312,528,...|
|(534,[62,282,525,...|
|(534,[62,281,525,...|
|(534,[62,309,528,...|
|(534,[62,291,530,...|
|(534,[62,280,528,...|
|(534,[62,289,528,...|
|(534,[62,304,526,...|
|(534,[62,267,526,...|
|(534,[62,288,528,...|
|(534,[62,294,524,...|
|(534,[63,320,529,...|
|(534,[63,273,524,...|
|(534,[63,273,528,...|
|(534,[63,293,525,...|
|(534,[63,376,525,...|
|(534,[63,274,529,...|
|(534,[63,274,529,...|
|(534,[63,301,524,...|
+--------------------+
only showing top 20 rows



**모델 생성**
`VectorAssembler`를 이용해 `feature`들이 모여있는 `feature_vector` 컬럼을 지정

In [None]:
from pyspark.ml.regression import LinearRegression

# Linear regression on the assembled `feature_vector`, using the
# normal-equation ("normal") solver.
lr = LinearRegression(
    featuresCol="feature_vector",
    labelCol="total_amount",
    solver="normal",
    maxIter=50,
)

In [None]:
# Fit the second model on the pipeline-transformed training split.
model = lr.fit(dataset=vtrain_df)

In [None]:
# Transform the test split with the pipeline fitted on the training split.
vtest_df = fitted_transformer.transform(dataset=test_df)

In [None]:
# Predict fares for the transformed test rows.
predictions = model.transform(dataset=vtest_df)

In [None]:
# The predictions do not change and are reused by the following cells, so
# cache them to avoid recomputing the whole pipeline + model transform each
# time. Caching costs memory; what it saves is recomputation.
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [None]:
# A few inputs next to the actual and the predicted fare.
predictions.select("trip_distance", "day_of_week", "total_amount", "prediction").show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.5|     Sunday|        9.35|11.356032832741402|
|          1.7|   Saturday|       14.15|14.907556372535563|
|          0.5|  Wednesday|         7.8| 9.390609712032322|
|          0.8|  Wednesday|         5.8|10.008479842548248|
|          1.5|     Friday|        11.3|12.114028311477544|
|          3.2|     Monday|        11.8|  15.5512753323334|
|          8.0|  Wednesday|        32.8| 32.04826439376774|
|          0.8|   Saturday|         9.0| 13.54602969239609|
|          4.0|   Thursday|        21.3|24.233709526927527|
|          4.4|  Wednesday|       22.55|22.437962421237835|
|          8.3|     Friday|        28.8|31.978691163050875|
|          0.5|    Tuesday|         8.8|13.161766963577456|
|          1.5|     Friday|       11.75|15.895953352085353|
|          4.4|    Tuesday|       21.62|

In [None]:
# RMSE of the second model on the held-out TEST set.
# model.summary holds training-set metrics only, so evaluate the cached test
# predictions instead.
from pyspark.ml.evaluation import RegressionEvaluator

RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse"
).evaluate(predictions)

5.858288514111962

In [None]:
# R^2 of the second model on the held-out TEST set.
# model.summary holds training-set metrics only, so evaluate the cached test
# predictions instead.
from pyspark.ml.evaluation import RegressionEvaluator

RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="r2"
).evaluate(predictions)

0.796780338507229

In [None]:
# Shut down the Spark session now that the experiment is finished.
spark.stop()