In [2]:
from pyspark.sql import SparkSession
MAX_MEMORY = '8g'
spark = SparkSession.builder.appName("taxi-fare-prediction")\
    .config('spark.driver.memory', MAX_MEMORY)\
    .config('spark.executor.memory', MAX_MEMORY)\
    .getOrCreate()

In [3]:
import os
cwd = os.getcwd()
trip_data_path = os.path.join(cwd, 'learning_spark_data', 'trips', '*.csv')
trip_data_path

file_path = f"file:///{trip_data_path.replace(os.sep, '/')}"

trip_df = spark.read.csv(file_path, inferSchema=True, header=True)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [4]:
trip_df.createOrReplaceTempView('trips')

In [5]:
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

In [6]:
data_df = spark.sql(query)
data_df.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [7]:
# 모델 만들기

In [13]:
train_df, test_df = data_df.randomSplit([0.8,0.2], seed=1)

In [15]:
from pyspark.ml.feature import VectorAssembler
vassembler = VectorAssembler(inputCols=['passenger_count', 'pickup_location_id', 'dropoff_location_id','trip_distance', 'pickup_time'], outputCol='features')
vtrain_df = vassembler.transform(data_df)
vtrain_df.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+--------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|            features|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+--------------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|[0.0,138.0,265.0,...|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|[1.0,68.0,264.0,1...|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|[1.0,239.0,262.0,...|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|[1.0,186.0,91.0,1...|
|              2|               132|                265|          9.7|          0|     Monday|   

In [16]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(    maxIter=50, labelCol='total_amount', featuresCol='features')
lr_model = lr.fit(vtrain_df) #천만건데이터 학습

vtest_df = vassembler.transform(test_df)
pred = lr_model.transform(vtest_df)

In [17]:
pred.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|            features|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+--------------------+------------------+
|              0|                 4|                107|          1.0|          9|    Tuesday|       10.55|[0.0,4.0,107.0,1....|11.879656745172685|
|              0|                 4|                234|          1.7|         18|   Saturday|        13.3|[0.0,4.0,234.0,1....|14.515770368184366|
|              0|                 4|                236|          4.1|         15|     Friday|        21.3|[0.0,4.0,236.0,4....| 21.49762126708708|
|              0|                 4|                243|         11.5|          1|     Sunday|        41.3|[0.0,

In [18]:
# 강사님 답

In [19]:
train_df, test_df = data_df.randomSplit([0.8,0.2], seed=1)

# 파이프라인 생성
- 전처리 과정을 각 스테이지로 정의해서 쌓는다
- 범주형) StringIndexer & 원핫인코딩 : 'pickup_location_id', 'dropoff_location_id', 'day_of_week'
- 수치형) StandardScaler : 'passenger_count', 'trip_distance', 'pickup_time'

In [22]:
stages = []

In [23]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_features = ['pickup_location_id', 'dropoff_location_id', 'day_of_week']
for cat in cat_features:
    cat_index = StringIndexer(inputCol=cat, outputCol=cat+'_idx').setHandleInvalid('keep')
    onehot_encode = OneHotEncoder(inputCols=[cat_index.getOutputCol()],   # _idx 컬럼들
                                 outputCols=[cat+'_onehot'])   # postfix 뒤에 붙이기
    stages += [cat_index, onehot_encode]   # 컬럼목록list
stages

[StringIndexer_0d58d2430a6f,
 OneHotEncoder_61795766de61,
 StringIndexer_28056f54160b,
 OneHotEncoder_05e794fc4d97,
 StringIndexer_79e245269036,
 OneHotEncoder_fb10663a68d0]

In [29]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
num_features = ['passenger_count', 'trip_distance', 'pickup_time']

for num in num_features:
    num_assembler = VectorAssembler(inputCols=[num], outputCol=num+'_vector')
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=num+'_scaled')
    stages += [num_assembler, num_scaler]
stages

[StringIndexer_0d58d2430a6f,
 OneHotEncoder_61795766de61,
 StringIndexer_28056f54160b,
 OneHotEncoder_05e794fc4d97,
 StringIndexer_79e245269036,
 OneHotEncoder_fb10663a68d0,
 VectorAssembler_57c69ca5b635,
 StandardScaler_4666af4e50ca,
 VectorAssembler_fe2079e48844,
 StandardScaler_a0f203788fd4,
 VectorAssembler_3f35cfde98f3,
 StandardScaler_1bbaa32a9ab4]

In [31]:
assembler_input = [cat+'_onehot' for cat in cat_features] + [num+'_scaled' for num in num_features]
assembler_input

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [32]:
assembler = VectorAssembler(inputCols=assembler_input, outputCol='feature_vector')
stages += [assembler]
stages

[StringIndexer_0d58d2430a6f,
 OneHotEncoder_61795766de61,
 StringIndexer_28056f54160b,
 OneHotEncoder_05e794fc4d97,
 StringIndexer_79e245269036,
 OneHotEncoder_fb10663a68d0,
 VectorAssembler_57c69ca5b635,
 StandardScaler_4666af4e50ca,
 VectorAssembler_fe2079e48844,
 StandardScaler_a0f203788fd4,
 VectorAssembler_3f35cfde98f3,
 StandardScaler_1bbaa32a9ab4,
 VectorAssembler_aaef482c26a6]

In [33]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
fitted_transform = pipeline.fit(train_df)
vtrain_df = fitted_transform.transform(train_df)
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul