In [None]:
#08_02_Regression.ipynb

In [3]:
from pyspark.sql import SparkSession
# Single memory ceiling applied to both the driver and the executors.
MAX_MEMORY = '8g'

# Build the session for this notebook, or pick up the one that already exists.
spark = (
    SparkSession.builder
    .appName("taxi-fare-prediciton_2nd")
    .config('spark.driver.memory', MAX_MEMORY)
    .config('spark.executor.memory', MAX_MEMORY)
    .getOrCreate()
)

In [4]:
import os
# The trip CSVs are expected under ./learning_spark_data/trips, relative to the
# directory the kernel was started in.
cwd = os.getcwd()
trip_data_path = os.path.join(cwd, 'learning_spark_data', 'trips', '*.csv')

# Turn the OS-specific path into a file URI by swapping the native separator
# for forward slashes.
file_path = "file:///" + "/".join(trip_data_path.split(os.sep))
# Load every CSV matched by the glob: the first row supplies the column names
# and Spark infers the column types.
trip_df = (
    spark.read
    .option("inferSchema", True)
    .option("header", True)
    .csv(file_path)
)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [2]:
# spark.stop()  # uncomment to shut the Spark session down when finished

In [5]:
# Expose the raw trips DataFrame to Spark SQL under the name `trips`.
trip_df.createOrReplaceTempView(name='trips')

In [6]:
# SQL that builds the modelling dataset from the `trips` view.
#
# Selected columns:
#   - passenger_count, trip_distance, total_amount are taken as-is
#   - PULocationID / DOLocationID are renamed to pickup_location_id /
#     dropoff_location_id
#   - pickup_time is the hour of tpep_pickup_datetime
#   - day_of_week is the pickup date formatted with the 'EEEE' pattern
#
# Row filters (all bounds exclusive unless noted):
#   - 0 < total_amount < 5000
#   - 0 < trip_distance < 500
#   - passenger_count < 4
#   - pickup date from 2021-01-01 (inclusive) up to 2021-08-01 (exclusive)
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""


In [7]:
# Run the feature query and preview the first five rows.
data_df = spark.sql(sqlQuery=query)
data_df.show(n=5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [8]:
# 80/20 train/test split; the seed is fixed so the split is repeatable.
train_df, test_df = data_df.randomSplit(weights=[0.8, 0.2], seed=1)

# 파이프라인 생성
- 전처리 과정을 각 스테이지로 정의해서 `stages` 리스트에 순서대로 쌓는다.
- [범주형] StringIndexer + OneHotEncoder: 'pickup_location_id', 'dropoff_location_id', 'day_of_week'
- [수치형] StandardScaler: 'passenger_count', 'trip_distance', 'pickup_time'

In [10]:
# Ordered collection of preprocessing stages; starts out empty.
stages = list()

In [12]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# Categorical columns. Each one gets two stages: a StringIndexer that writes
# `<col>_idx`, then a OneHotEncoder that reads `<col>_idx` and writes
# `<col>_onehot`.
cat_features = ['pickup_location_id', 'dropoff_location_id', 'day_of_week']

# Rebuild the stage list from scratch on every execution of this cell.
# Extending a list left over from an earlier run would register each
# indexer/encoder pair twice, so the same output columns would appear more
# than once in `stages`.
stages = []
for cat in cat_features:
    # handleInvalid='keep' retains rows whose label was not seen during fit
    # instead of raising (see the StringIndexer documentation).
    cat_index = StringIndexer(inputCol=cat, outputCol=cat + '_idx', handleInvalid='keep')
    onehot_encode = OneHotEncoder(
        inputCols=[cat_index.getOutputCol()],  # the `<col>_idx` column from the indexer
        outputCols=[cat + '_onehot'],
    )
    stages += [cat_index, onehot_encode]
stages

[StringIndexer_7b022c005e72,
 OneHotEncoder_4b6524673046,
 StringIndexer_84d5e108de22,
 OneHotEncoder_9f6aa9695f65,
 StringIndexer_e29455f5f2d3,
 OneHotEncoder_61ad2dfc7458]