In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
                    .config("spark.executor.memory", MAX_MEMORY)\
                    .config("spark.driver.memory", MAX_MEMORY)\
                    .getOrCreate()

22/08/01 00:33:48 WARN Utils: Your hostname, singyeongdeog-ui-Macmini.local resolves to a loopback address: 127.0.0.1; using 222.98.22.103 instead (on interface en0)
22/08/01 00:33:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/01 00:33:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data_dir = "/Users/singyeongdeog/Documents/github_code/data-engineering/01-spark/data/data/"

In [4]:
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

                                                                                

In [5]:
toy_df = train_df.sample(False, 0.1, seed=1)

In [6]:
toy_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [17]:
train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [7]:
# Wednesday -> 3 -> [0,0,0,1,0,0] one-hot-encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [8]:
stages

[StringIndexer_98bdaf45145a,
 OneHotEncoder_08972455ca00,
 StringIndexer_498659704e2a,
 OneHotEncoder_8120ab409c8d,
 StringIndexer_c46ef6e95060,
 OneHotEncoder_cd00b84f3c08]

In [9]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

In [10]:
stages

[StringIndexer_98bdaf45145a,
 OneHotEncoder_08972455ca00,
 StringIndexer_498659704e2a,
 OneHotEncoder_8120ab409c8d,
 StringIndexer_c46ef6e95060,
 OneHotEncoder_cd00b84f3c08,
 VectorAssembler_0a510e48c8d6,
 StandardScaler_e70e5dbb45b9,
 VectorAssembler_4b435420eb82,
 StandardScaler_72911f0ea10b,
 VectorAssembler_9d35a702a410,
 StandardScaler_ab88656aac21]

In [11]:
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
print(assembler_inputs)

['pickup_location_id_onehot', 'dropoff_location_id_onehot', 'day_of_week_onehot', 'passenger_count_scaled', 'trip_distance_scaled', 'pickup_time_scaled']


In [12]:
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")

In [13]:
stages += [assembler]
print(stages)

[StringIndexer_98bdaf45145a, OneHotEncoder_08972455ca00, StringIndexer_498659704e2a, OneHotEncoder_8120ab409c8d, StringIndexer_c46ef6e95060, OneHotEncoder_cd00b84f3c08, VectorAssembler_0a510e48c8d6, StandardScaler_e70e5dbb45b9, VectorAssembler_4b435420eb82, StandardScaler_72911f0ea10b, VectorAssembler_9d35a702a410, StandardScaler_ab88656aac21, VectorAssembler_fa0c91eaa2d7]


## Hyperparameter Tuning

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(
    maxIter=30,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

cv_stages = stages + [lr]

In [15]:
cv_pipeline = Pipeline(stages = cv_stages)

In [16]:
param_grid = ParamGridBuilder() \
                .addGrid(lr.elasticNetParam, [0.1,0.2,0.3,0.4,0.5])\
                .addGrid(lr.regParam, [0.01, 0.02, 0.03, 0.04, 0.05])\
                .build()

In [17]:
cross_val = CrossValidator(estimator = cv_pipeline,
                          estimatorParamMaps = param_grid,
                          evaluator = RegressionEvaluator(labelCol="total_amount"),
                          numFolds=5)

In [18]:
cv_model = cross_val.fit(toy_df)

[Stage 20:>                                                         (0 + 8) / 9]

22/08/01 00:40:36 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/08/01 00:40:36 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

22/08/01 00:40:38 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/08/01 00:40:38 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


                                                                                

In [24]:
alpha = cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam()

In [25]:
reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam()

## Training

In [26]:
transform_stages = stages
pipeline = Pipeline(stages = transform_stages)
fitted_transformer = pipeline.fit(train_df)

                                                                                

In [27]:
vtrain_df = fitted_transformer.transform(train_df)

In [30]:
lr = LinearRegression(
    maxIter = 5,
    solver = "normal",
    labelCol = "total_amount",
    featuresCol = "feature_vector",
    elasticNetParam=alpha,
    regParam=reg_param
)

In [31]:
model = lr.fit(vtrain_df)

                                                                                

In [32]:
vtest_df = fitted_transformer.transform(test_df)

In [33]:
predictions = model.transform(vtest_df)

In [34]:
predictions.cache()

DataFrame[passenger_count: double, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [35]:
predictions.select(["trip_distance","day_of_week","total_amount","prediction"]).show()

[Stage 3068:>                                                       (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.7|   Saturday|       12.35|12.796814635178729|
|          1.5|     Friday|        11.8|14.539577778945041|
|          2.9|     Sunday|        15.8|16.528791281387818|
|          2.1|   Saturday|       15.35|16.714789390300968|
|          1.7|   Saturday|        13.3| 14.48086011218359|
|          0.4|   Thursday|         4.8| 9.432882486209522|
|          1.4|     Friday|         8.3|11.708234468448047|
|          2.2|    Tuesday|        13.3|12.259181629401228|
|          3.8|    Tuesday|       27.25|17.405601071185306|
|          1.7|    Tuesday|        11.8|12.351997948184742|
|          4.5|  Wednesday|       27.65|18.950831747448834|
|         13.4|     Monday|       66.35| 61.06223741876362|
|         16.2|     Monday|       82.37| 67.51155225025437|
|          7.2|  Wednesday|       32.75|

                                                                                

In [36]:
model.summary.rootMeanSquaredError

5.714217632611252

In [37]:
model.summary.r2 # 정확도

0.8038821465514306

In [38]:
# 저장
model_dir = "/Users/singyeongdeog/Documents/github_code/data-engineering/01-spark/data/model/"
model.save(model_dir)

In [39]:
# 불러오기 
from pyspark.ml.regression import LinearRegressionModel
lr_model = LinearRegressionModel().load(model_dir)

In [40]:
predictions = lr_model.transform(vtest_df)

In [41]:
predictions.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------