In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = '25g'

spark = SparkSession.builder.appName('taxi-fare-predication') \
    .config("spark.excutor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

22/12/26 02:15:21 WARN Utils: Your hostname, PSui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.7 instead (on interface en0)
22/12/26 02:15:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/26 02:15:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/26 02:15:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
target_directory = '../../../../data'
trip_files = f"{target_directory}/trips/*"

In [4]:
trips_df = spark.read.csv(f"{trip_files}", inferSchema=True, header=True)

                                                                                

In [5]:
trips_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [6]:
trips_df.createOrReplaceTempView('trips')

In [7]:
query = """
SELECT
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000      -- 최대 $5000 달러 미만
    AND total_amount > 0     -- 총 금액 0원 초과
    AND trip_distance > 0    -- 0마일 초과
    AND trip_distance < 500  -- 500마일 미만
    AND passenger_count < 4  -- 탑승자 4명 미만
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [8]:
data_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|            1.0|               142|                 43|          2.1|          0|     Friday|        11.8|
|            1.0|               238|                151|          0.2|          0|     Friday|         4.3|
|            1.0|               132|                165|         14.7|          0|     Friday|       51.95|
|            0.0|               138|                132|         10.6|          0|     Friday|       36.35|
|            1.0|                68|                 33|         4.94|          0|     Friday|       24.36|
|            1.0|               224|                 68|          1.6|          0|     Friday|       14.15|
|            1.0|           

In [9]:
data_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



# train, test 데이터로 나누기

In [10]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)

In [11]:
data_directory = '../../../../data'
train_df.write.format('parquet').save(f'{data_directory}/train')
test_df.write.format('parquet').save(f'{data_directory}/test')

                                                                                

In [12]:
train_df = spark.read.parquet(f'{data_directory}/train/')
test_df = spark.read.parquet(f'{data_directory}/test/')

In [14]:
train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



# Preprocessing // 전처리

## numeric, category
numeric: passenger_count와 같은 숫자
categorical: 문자 의미

In [15]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_feats = [
    'pickup_location_id',
    'dropoff_location_id',
    'day_of_week',
]

stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol=c + '_idx').setHandleInvalid('keep')
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + '_onehot'])
    stages += (cat_indexer, onehot_encoder)

In [16]:
stages

[StringIndexer_280331087400,
 OneHotEncoder_37dbf697eeab,
 StringIndexer_b44b10c2cd17,
 OneHotEncoder_dcef24c37690,
 StringIndexer_a42aca9fe793,
 OneHotEncoder_71e046f7042f]

- StringIndexer + OneHotEncoder 하나씩 세트가 됨.

In [17]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# 숫자이고, 감소 또는 증가의 의미가 있는 값들
num_feats = [
    'passenger_count',
    'trip_distance',
    'pickup_time'
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n + '_vector')
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n + "_scaled")
    stages += [num_assembler, num_scaler]

In [18]:
stages

[StringIndexer_280331087400,
 OneHotEncoder_37dbf697eeab,
 StringIndexer_b44b10c2cd17,
 OneHotEncoder_dcef24c37690,
 StringIndexer_a42aca9fe793,
 OneHotEncoder_71e046f7042f,
 VectorAssembler_099193f94301,
 StandardScaler_600a9d7bdfb9,
 VectorAssembler_bfc7478b1640,
 StandardScaler_a29f71ac0fdd,
 VectorAssembler_18bfb36767b0,
 StandardScaler_a7481a30d91b]

## 두 개를 합치는 작업은 VectorAssembler로 가능하다.

In [25]:
# onehot, scaled
assembler_inputs = [c + '_onehot' for c in cat_feats] + [c + '_scaled' for c in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='feature_vector')
stages += [assembler]

In [26]:
from pyspark.ml import Pipeline

transform_stages = stages

pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

                                                                                

In [27]:
v_train_df = fitted_transformer.transform(train_df)


In [28]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    solver='normal',
    labelCol='total_amount',
    featuresCol='feature_vector'
)

# lr을 training 한 model 생성
model = lr.fit(v_train_df)


22/12/26 02:45:26 WARN Instrumentation: [e275727b] regParam is zero, which might cause numerical instability and overfitting.


[Stage 44:>                                                       (0 + 10) / 12]

22/12/26 02:45:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/12/26 02:45:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/12/26 02:45:30 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


                                                                                

22/12/26 02:45:33 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
22/12/26 02:45:33 WARN Instrumentation: [e275727b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
22/12/26 02:45:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/26 02:45:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


                                                                                

In [29]:
v_test_df = fitted_transformer.transform(test_df)

In [33]:
predictions = model.transform(v_test_df)

In [34]:
predictions.cache()

DataFrame[passenger_count: double, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [35]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

[Stage 48:>                                                         (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.7|   Saturday|       12.35|12.505664808592833|
|          3.1|    Tuesday|        18.0|17.818332958232702|
|          2.1|   Saturday|       15.35|16.768456651353908|
|          1.7|   Saturday|        13.3| 14.31597446743558|
|          4.1|     Friday|        21.3|20.954315125289412|
|          1.4|     Friday|         8.3|12.025221374451174|
|          7.3|    Tuesday|        29.3|28.044553395152988|
|          0.7|  Wednesday|         5.8|  9.70430693523349|
|          5.0|  Wednesday|        24.3|21.171501180856723|
|          6.7|   Saturday|        29.8| 37.50280312520359|
|         16.8|     Friday|       82.37| 71.32483145666382|
|         29.3|     Monday|        80.8|103.55970559455761|
|          4.1|     Friday|        20.8| 22.25851095601274|
|          0.1|  Wednesday|        55.3|

                                                                                

In [36]:
model.summary.rootMeanSquaredError

5.848484065033833

In [37]:
model.summary.r2

0.7969874815768934

In [38]:
spark.stop()