In [44]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi-fare-prediction") \
    .getOrCreate()

In [45]:
trips_path = '../../data/trips/*'
trips_df = spark.read.options(inferSchema=True, header=True).format('parquet').load(trips_path)

                                                                                

In [46]:
trips_df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)


In [47]:
trips_df.createOrReplaceTempView("trips")

In [48]:
data_df = spark.sql("""
    SELECT 
        trip_distance,
        total_amount
    FROM
        trips
    WHERE
        total_amount < 5000
        AND total_amount > 0
        AND trip_distance > 0
        AND trip_distance < 500
        AND passenger_count < 5
        AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
        AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
""")

In [49]:
data_df.createOrReplaceTempView("data")

In [50]:
data_df.show()

+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|         16.5|       70.07|
|         1.13|       11.16|
|         2.68|       18.59|
|         12.4|        43.8|
|          9.7|        32.3|
|          9.3|       43.67|
|         9.58|        46.1|
|         16.2|        45.3|
|         3.58|        19.3|
|         0.91|        14.8|
|         2.57|        12.8|
|          0.4|         5.3|
|         3.26|        17.3|
|        13.41|       47.25|
|         18.3|       61.42|
|         1.53|       14.16|
|          2.0|        11.8|
|         16.6|       54.96|
|         15.5|       56.25|
|          1.3|        16.8|
+-------------+------------+


                                                                                

In [51]:
data_df.describe().show()



+-------+-----------------+------------------+
|summary|    trip_distance|      total_amount|
+-------+-----------------+------------------+
|  count|         13326362|          13326362|
|   mean|2.887408208631757|17.990672623941556|
| stddev|3.833595726187914|13.011619004927265|
|    min|             0.01|              0.01|
|    max|            475.5|            4973.3|
+-------+-----------------+------------------+


                                                                                

In [52]:
## ML Split Train, Test

train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=6)

In [53]:
print(trips_df.count(), test_df.count())



15000936 2665097


                                                                                

In [54]:
# 벡터 어셈블러로 트레인이 가능한 상태로 변환한다.
from pyspark.ml.feature import VectorAssembler

vassembler = VectorAssembler(inputCols=["trip_distance"], outputCol="features")

In [55]:
vtrain_df = vassembler.transform(train_df)
vtrain_df.show()

[Stage 49:>                 (0 + 2) / 2][Stage 69:>                 (0 + 1) / 1]

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|        3.05|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+


                                                                                

In [57]:
# 리그레션 모델 생성
from pyspark.ml.regression import LinearRegression

# 베이스 라인 코드
lr = LinearRegression(
    maxIter=50,
    labelCol="total_amount",
    featuresCol="features"
)

model = lr.fit(vtrain_df)

24/03/25 18:01:53 WARN Instrumentation: [1a513624] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [58]:
vtest_df = vassembler.transform(test_df)

In [59]:
# 예측
prediction = model.transform(vtest_df)

In [60]:
prediction.show()

[Stage 78:>                                                         (0 + 1) / 1]

+-------------+------------+--------+----------------+
|trip_distance|total_amount|features|      prediction|
+-------------+------------+--------+----------------+
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         

                                                                                

In [61]:
model.summary.rootMeanSquaredError

6.29603647659225

In [62]:
model.summary.r2

0.7667360682747258

In [25]:
from pyspark.sql.types import DoubleType

distance_list = [1.1, 5.5, 10.5, 30.0]
distance_df = spark.createDataFrame(distance_list, DoubleType()).toDF("trip_distance")

distance_df.show()

                                                                                

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.5|
|         10.5|
|         30.0|
+-------------+


In [26]:
vdistance_df = vassembler.transform(distance_df)

In [27]:
model.transform(vdistance_df).show()

+-------------+--------+------------------+
|trip_distance|features|        prediction|
+-------------+--------+------------------+
|          1.1|   [1.1]|12.661914396267541|
|          5.5|   [5.5]|25.781156878698354|
|         10.5|  [10.5]| 40.68938697236974|
|         30.0|  [30.0]| 98.83148433768814|
+-------------+--------+------------------+


In [64]:
# 예측에 사용할 컬럼 추가, 전처리
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

data_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)


In [65]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=6)

In [67]:
data_dir = '../../data/result/taxi/'

train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")

                                                                                

In [68]:
# read Data
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

In [69]:
# 스트링 값을 숫자값으로 바꾸어 원핫인코딩을 진행한다.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# 카테고리 피쳐 설정
cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

# 스테이지를 담는 배열
stages = []


for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]


# 숫자형 전처리: 벡터 어셈블러, 스탠다드 스칼라
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vecotr")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]


# 두가지를 프리프로세싱을 했는데 합치는 과정을 진행한다.
# 벡터 어셈블러를 이용하여 가능하다.
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]


# 스테이지로 파이프 라인 생성
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)


# 적용
vtrain_df = fitted_transformer.transform(train_df)

                                                                                

In [70]:
stages

[StringIndexer_793927ac7def,
 OneHotEncoder_d69d3a10ce19,
 StringIndexer_85f5d974c9af,
 OneHotEncoder_f6d6b811fa69,
 StringIndexer_d27148ee81e1,
 OneHotEncoder_5b30e76edd0a,
 VectorAssembler_c139ccbbafef,
 StandardScaler_91d8c868d8c8,
 VectorAssembler_d7e57e838d8f,
 StandardScaler_dc7285ffa500,
 VectorAssembler_8acc1453c431,
 StandardScaler_e0e08579b39c,
 VectorAssembler_ccd0afec9306]

In [71]:
# 모델링

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

vtrain_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vecotr: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vecotr: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vecotr: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nullable =

In [73]:
model = lr.fit(vtrain_df)

vtest_df = fitted_transformer.transform(test_df)

predictions = model.transform(vtest_df)

# 캐싱
predictions.cache()

24/03/25 18:11:53 WARN Instrumentation: [6a439c81] regParam is zero, which might cause numerical instability and overfitting.
24/03/25 18:12:14 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Connecting to /192.168.219.114:53065 failed in the last 4750 ms, fail this connection directly
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:210)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:130)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:206)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/ja

KeyboardInterrupt: 

In [75]:
model.save("../../data/result/model")

                                                                                

In [76]:
from pyspark.ml.regression import LinearRegressionModel


lr_model = LinearRegressionModel().load('../../data/result/model')


predictions = lr_model.transform(vtest_df)


predictions.show()

[Stage 104:>                (0 + 2) / 2][Stage 112:>                (0 + 1) / 1]

+-------------+------------+--------+----------------+
|trip_distance|total_amount|features|      prediction|
+-------------+------------+--------+----------------+
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         0.01|         3.3|  [0.01]|9.41192023584718|
|         

                                                                                