In [1]:
from pyspark.sql import SparkSession

MAX_MEMORY = '8g'

# One Spark session for the whole notebook; driver and executor memory both come
# from MAX_MEMORY. If a session already exists in this kernel, getOrCreate()
# returns it, and JVM-level settings such as driver memory will not take effect.
spark = (
    SparkSession.builder
    .appName('taxi-fare-prediction_2nd')
    .config('spark.driver.memory', MAX_MEMORY)
    .config('spark.executor.memory', MAX_MEMORY)
    .getOrCreate()
)

In [2]:
import os

# Glob over every trip CSV under ./learning_spark_data/trips and turn it into a
# file:/// URI; OS separators are swapped for '/' so the URI is valid on Windows.
cwd = os.getcwd()
trip_data_path = os.path.join(cwd, 'learning_spark_data', 'trips', '*.csv')
file_path = 'file:///' + trip_data_path.replace(os.sep, '/')

# inferSchema makes Spark derive column types from the data (extra pass over the files).
trip_df = spark.read.csv(file_path, header=True, inferSchema=True)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [3]:
# Expose the raw CSV frame to Spark SQL under the name `trips`.
trip_df.createOrReplaceTempView(name='trips')

In [4]:
query = """
SELECT 
    passenger_count,
    PULocationID AS pickup_location_id,
    DOLocationID AS dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) AS pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""


In [5]:
# Run the extraction query. The result rebinds trip_df and is exposed to SQL as
# the `data` view; the raw CSV frame stays reachable through the `trips` view.
trip_df = spark.sql(sqlQuery=query)
trip_df.createOrReplaceTempView(name='data')

In [6]:
# Peek at a few rows of the prepared `data` view.
# This uses its own variable name on purpose. Reusing `query` would overwrite
# the extraction SQL from the earlier cell. Re-running that cell afterwards would
# then rebuild trip_df (and the `data` view) from this 5-row LIMIT query.
preview_query = '''
SELECT * 
FROM DATA
LIMIT 5
'''

spark.sql(preview_query).show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+



In [7]:
# Train/test split (80/20) with a fixed seed for a repeatable split.
train_data, test_data = trip_df.randomSplit([0.8, 0.2], seed=1)
# Fitting the pipeline's estimator stages (StringIndexers, StandardScalers) and
# later the regression each trigger jobs over train_data. Caching it avoids
# re-reading and re-filtering the source CSVs on every one of those passes.
train_data = train_data.cache()

# 파이프라인 생성
- 전처리 과정을 각 스테이지로 정의해서 순서대로 쌓는다.
- [범주형] StringIndexer + OneHotEncoder : 'pickup_location_id', 'dropoff_location_id', 'day_of_week'
- [수치형] VectorAssembler + StandardScaler : 'passenger_count', 'trip_distance', 'pickup_time'

In [8]:
# Ordered list of pipeline stages; the following cells append to it in place,
# so re-run this cell before re-running any of them.
stages = list()

In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Categorical features: map each value to an index, then one-hot encode the index.
# handleInvalid='keep' puts values unseen at fit time (e.g. a location id present
# only in the test split) into an extra index bucket instead of raising an error.
cat_features = ['pickup_location_id', 'dropoff_location_id', 'day_of_week']
for cat in cat_features:
    indexer = StringIndexer(inputCol=cat, outputCol=f'{cat}_idx', handleInvalid='keep')
    encoder = OneHotEncoder(inputCols=[indexer.getOutputCol()],
                            outputCols=[f'{cat}_onehot'])
    stages.extend([indexer, encoder])
stages

[StringIndexer_1d841b1c3348,
 OneHotEncoder_f2fc04206797,
 StringIndexer_a19fae2c13dc,
 OneHotEncoder_a7fc6d014518,
 StringIndexer_71fef2e295e2,
 OneHotEncoder_8d0057f5447a]

In [10]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Numeric features: StandardScaler operates on vector columns, so each scalar
# column is first wrapped into a 1-element vector and then scaled (defaults:
# withStd=True, withMean=False, i.e. divided by std but not centred).
num_features = ['passenger_count', 'trip_distance', 'pickup_time']

for num in num_features:
    vectorizer = VectorAssembler(inputCols=[num], outputCol=f'{num}_vector')
    scaler = StandardScaler(inputCol=vectorizer.getOutputCol(),
                            outputCol=f'{num}_scaled')
    stages.extend([vectorizer, scaler])
stages

[StringIndexer_1d841b1c3348,
 OneHotEncoder_f2fc04206797,
 StringIndexer_a19fae2c13dc,
 OneHotEncoder_a7fc6d014518,
 StringIndexer_71fef2e295e2,
 OneHotEncoder_8d0057f5447a,
 VectorAssembler_031b9ebd593b,
 StandardScaler_dbef57a69110,
 VectorAssembler_a6ff3f6e0581,
 StandardScaler_10d716ae61a4,
 VectorAssembler_f93e50d02e23,
 StandardScaler_19a869957674]

In [11]:
# Inputs of the final VectorAssembler: one-hot vectors first, then scaled numerics.
assembler_input = [f'{c}_onehot' for c in cat_features]
assembler_input += [f'{n}_scaled' for n in num_features]
assembler_input

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

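선형회귀 모델은 입력 값이 모두 수치형(벡터)이어야 한다.
- data 뷰(trip_df) = feature(6) + label(1: total_amount)
- cat_feature 3개 -> StringIndexer + OneHotEncoder
- num_feature 3개 -> VectorAssembler + StandardScaler
- 각 피처마다 2단계에 걸쳐 전처리
- 총 6개 피처 × 2단계 = 12개 스테이지, 여기에 모든 피처를 하나로 합치는 VectorAssembler 1개를 더해 총 13개 스테이지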

In [12]:
# Last stage: concatenate every per-feature vector into a single 'feature_vector'.
assembler = VectorAssembler(outputCol='feature_vector', inputCols=assembler_input)
stages.append(assembler)
stages

[StringIndexer_1d841b1c3348,
 OneHotEncoder_f2fc04206797,
 StringIndexer_a19fae2c13dc,
 OneHotEncoder_a7fc6d014518,
 StringIndexer_71fef2e295e2,
 OneHotEncoder_8d0057f5447a,
 VectorAssembler_031b9ebd593b,
 StandardScaler_dbef57a69110,
 VectorAssembler_a6ff3f6e0581,
 StandardScaler_10d716ae61a4,
 VectorAssembler_f93e50d02e23,
 StandardScaler_19a869957674,
 VectorAssembler_b9c430d737d0]

In [13]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)
# Fit every stage (indexer vocabularies, scaler statistics) on the training split
# only; the resulting PipelineModel is reused later to transform the test split.
fitted_transform = pipeline.fit(dataset=train_data)
# Vectorised training data: original columns plus the *_idx/_onehot/_vector/_scaled
# columns and the assembled feature_vector.
vector_df = fitted_transform.transform(dataset=train_data)
vector_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [14]:
# Peek at the assembled (sparse) feature vectors.
vector_df.select('feature_vector').show(n=2)

+--------------------+
|      feature_vector|
+--------------------+
|(533,[62,311,527,...|
|(533,[62,280,526,...|
+--------------------+
only showing top 2 rows



In [15]:
from pyspark.ml.regression import LinearRegression

# Linear regression of total_amount on the assembled feature vector
# (no regularisation is configured, so regParam keeps its default).
lr = LinearRegression(
    featuresCol='feature_vector',
    labelCol='total_amount',
    maxIter=50,
)
model = lr.fit(dataset=vector_df)

In [16]:
# Transform the test split with the pipeline fitted on train_data (no refitting).
ytest_df = fitted_transform.transform(dataset=test_data)
ytest_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [17]:
# Score the vectorised test split; the model adds a 'prediction' column.
pred = model.transform(dataset=ytest_df)

In [18]:
# Side-by-side preview of actual vs predicted fares.
pred.select('total_amount', 'prediction').show(5)

# A 5-row preview is not an evaluation: also report aggregate test-set metrics.
# model.evaluate re-scores ytest_df using the model's feature/label columns.
test_summary = model.evaluate(ytest_df)
print(f"RMSE: {test_summary.rootMeanSquaredError:.4f}")
print(f"R2:   {test_summary.r2:.4f}")

+------------+------------------+
|total_amount|        prediction|
+------------+------------------+
|       10.55|12.695522792729275|
|        13.3|14.450558014776915|
|        21.3|21.108271361254218|
|        41.3| 40.87993984204378|
|       14.15|13.906932982613997|
+------------+------------------+
only showing top 5 rows



In [19]:
# Stop the Spark session (and its underlying SparkContext) at the end of the notebook.
spark.stop()