### Hyper Parameter
The purpose of hyperparameter tuning is to maximize the generalizability of the model. That is, to ensure it performs well not just on the training data but also on unseen data (test data). By appropriately adjusting hyperparameters, one can prevent overfitting and improve the predictive performance of the model.

In [9]:
from pyspark.sql import SparkSession

In [10]:
#Create a Spark instance and pre-allocate Max memory to prevent errors.
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi-duration-prediction-2")\
            .config("spark.executor.memory", MAX_MEMORY)\
            .config("spark.driver.memory", MAX_MEMORY).getOrCreate()

In [11]:
# 데이터를 읽기 전에 데이터 타입을 변환
data_dir = "/Users/kyungminpark/Desktop/Fall2023/CS4641/data/"

In [12]:
# 스키마를 사용하여 Parquet 파일을 읽음
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")
toy_df = train_df.sample(False, 0.1, seed=1)

In [13]:
toy_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [14]:
## Categorical Feature PreProcessing Steps
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_feats = [ ##categorical features
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

## 파이프라인 stage을 통해 데이터를 통과 (파이프라인은 여러 스테이지로 구성되어 있고, 각 스테이지별로 실행됨)
stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [15]:
stages

[StringIndexer_c04b93a7d72d,
 OneHotEncoder_90b8024776a9,
 StringIndexer_f9c864b47fe7,
 OneHotEncoder_1f24680b8a7e,
 StringIndexer_b28979efdfdd,
 OneHotEncoder_32bfe22f3b77]

In [16]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "pickup_time",
    "passenger_count",
    "trip_distance"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vecotr")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

In [17]:
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

## Hyperparameter Tuning

In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(
    maxIter=30,
    solver="normal",
    labelCol='total_amount',
    featuresCol='feature_vector'
)

cv_stages = stages + [lr] ##Preprocessing + linear Regression

In [19]:
cv_pipeline = Pipeline(stages=cv_stages)

In [20]:
param_grid = ParamGridBuilder()\
                .addGrid(lr.elasticNetParam, [0.1, 0.2, 0.3, 0.4, 0.5])\
                .addGrid(lr.regParam, [0.01, 0.02, 0.03, 0.04, 0.05])\
                .build()

cross_val = CrossValidator(estimator=cv_pipeline,
                           estimatorParamMaps=param_grid,
                           evaluator=RegressionEvaluator(labelCol="total_amount"),
                           numFolds=5)

In [21]:
cv_model = cross_val.fit(toy_df) ##Train toy_def

23/11/07 08:54:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/07 08:54:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [22]:
alpha = cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam()
reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam()

## 이걸 실행하면 alpha - 0.1 , reg_param - 0.05 인걸 알게됨.
## 이 숫자들을 가지고 다시 Training Section 적용해서 성능 비교.

## Training

In [23]:
transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

vtrain_df = fitted_transformer.transform(train_df)


lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector",
    elasticNetParam=alpha, ## Newly added from the above
    regParam=reg_param,    ## Newly added from the above
)

In [24]:
vtrain_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- pickup_time_vecotr: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- passenger_count_vecotr: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vecotr: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- feature_vector: vector (nullab

In [25]:
model = lr.fit(vtrain_df)
vtest_df = fitted_transformer.transform(test_df)

predictions = model.transform(vtest_df)
predictions.cache()

                                                                                

DataFrame[passenger_count: double, pickup_location_id: string, dropoff_location_id: string, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, pickup_time_vecotr: vector, pickup_time_scaled: vector, passenger_count_vecotr: vector, passenger_count_scaled: vector, trip_distance_vecotr: vector, trip_distance_scaled: vector, feature_vector: vector, prediction: double]

In [26]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

[Stage 3048:>                                                       (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.3|    Tuesday|        12.6|14.690715913607383|
|          1.5|   Saturday|        14.0|  18.0011250791562|
|          1.9|   Thursday|       21.35| 20.18423144836835|
|          2.3|   Thursday|       29.85| 23.50635369736571|
|         16.7|     Friday|       90.55| 86.99375670121539|
|          0.9|    Tuesday|        17.0|15.940835772262776|
|          1.9|     Sunday|        22.4|19.563971842794913|
|          2.6|  Wednesday|        27.9|24.373894082368693|
|          3.3|    Tuesday|        47.0|27.207305688184444|
|          3.3|     Monday|        29.7| 26.57452001959966|
|          2.3|  Wednesday|        33.2|23.319352620882395|
|          0.6|  Wednesday|        15.4| 16.02458878822991|
|          0.8|  Wednesday|       14.25| 17.17093393251385|
|          0.9|   Thursday|       14.25|

                                                                                

In [27]:
model.summary.rootMeanSquaredError

6.672639084886589

In [28]:
model.summary.r2

0.9105094622761973

# 모델 저장 및 재사용

In [30]:
model_dir = "/Users/kyungminpark/Desktop/Fall2023/CS4641/data/model"
model.save(model_dir)

In [31]:
from pyspark.ml.regression import LinearRegressionModel

lr_model = LinearRegressionModel().load(model_dir)
predictions = lr_model.transform(vtest_df)

predictions.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+------------------+--------------------+----------------------+----------------------+--------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|pickup_time_vecotr|  pickup_time_scaled|passenger_count_vecotr|passenger_count_scaled|trip_distance_vecotr|trip_distance_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------