In [1]:
# spark session 생성 
from pyspark.sql import SparkSession

In [2]:
# 인스턴스 생성 (Out of memory 방지를 위해 MAX_MEMORY 설정)
MAX_MEMORY="5g"
spark = SparkSession.builder.appName("taxi-fare-prediciton")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()

22/05/20 20:55:20 WARN Utils: Your hostname, gim-yelin-ui-iMac.local resolves to a loopback address: 127.0.0.1; using 192.168.219.101 instead (on interface en1)
22/05/20 20:55:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 20:55:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# 파일 불러오기
trip_files = "/Users/yello-ow/taxi/data/trips/*"
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema=True, header=True)

                                                                                

In [4]:
# 스키마 확인
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
trips_df.createOrReplaceTempView("trips")

In [6]:
# modeling을 위한 column 추가 (baseline: trip_distance)
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [7]:
data_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              1|               142|                 43|          2.1|          0|     Friday|        11.8|
|              1|               238|                151|          0.2|          0|     Friday|         4.3|
|              1|               132|                165|         14.7|          0|     Friday|       51.95|
|              0|               138|                132|         10.6|          0|     Friday|       36.35|
|              1|                68|                 33|         4.94|          0|     Friday|       24.36|
|              1|               224|                 68|          1.6|          0|     Friday|       14.15|
|              1|           

In [8]:
data_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [9]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)

In [10]:
data_dir = "/Users/yello-ow/taxi/spark/data/"

In [11]:
# train/ test set split한 데이터를 parquet형식으로 저장 
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")

# parguet: 압축률이 좋고, 디스크 io가 적음, column 기반 format 

                                                                                

In [12]:
# 저장한 데이터 불러오기
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

In [13]:
train_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [14]:
# stringindexer -> onehotencoder 
# ex) Wednesday -> 3 -> [0,0,0,1,0,0,0]
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# categorical feature 설정 
cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

# stage를 담는 배열 생성 
stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

stages

[StringIndexer_ec80aef8c43f,
 OneHotEncoder_79c8fb43cf66,
 StringIndexer_db067017db06,
 OneHotEncoder_e240a5da5bd5,
 StringIndexer_5880731475e8,
 OneHotEncoder_3c2f909922fa]

In [15]:
# normalization (vectorassembler-> standardscaler)
from pyspark.ml.feature import VectorAssembler, StandardScaler

# numerical feature 설정
num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vecotr")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

stages

[StringIndexer_ec80aef8c43f,
 OneHotEncoder_79c8fb43cf66,
 StringIndexer_db067017db06,
 OneHotEncoder_e240a5da5bd5,
 StringIndexer_5880731475e8,
 OneHotEncoder_3c2f909922fa,
 VectorAssembler_436ee7dd2ce7,
 StandardScaler_bde68939aebd,
 VectorAssembler_501e4a83e54c,
 StandardScaler_34058d181122,
 VectorAssembler_c5a2411c7751,
 StandardScaler_853aac7ea29b]

In [16]:
# vectorassembler를 이용해 두가지 전처리를 합쳐서 하나의 vector로 만들기
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]
stages

[StringIndexer_ec80aef8c43f,
 OneHotEncoder_79c8fb43cf66,
 StringIndexer_db067017db06,
 OneHotEncoder_e240a5da5bd5,
 StringIndexer_5880731475e8,
 OneHotEncoder_3c2f909922fa,
 VectorAssembler_436ee7dd2ce7,
 StandardScaler_bde68939aebd,
 VectorAssembler_501e4a83e54c,
 StandardScaler_34058d181122,
 VectorAssembler_c5a2411c7751,
 StandardScaler_853aac7ea29b,
 VectorAssembler_30161f67962b]

In [17]:
# stage를 이용해 pipeline 생성 
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

                                                                                

In [18]:
# pipeline을 이용해 train이 가능한 상태로 변환 
vtrain_df = fitted_transformer.transform(train_df)

In [19]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

In [20]:
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vecotr: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vecotr: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vecotr: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [21]:
model = lr.fit(vtrain_df)

22/05/20 20:56:04 WARN Instrumentation: [3182f085] regParam is zero, which might cause numerical instability and overfitting.
22/05/20 20:56:07 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/20 20:56:07 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/20 20:56:09 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
22/05/20 20:56:10 WARN Instrumentation: [3182f085] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
22/05/20 20:56:10 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/05/20 20:56:10 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [22]:
vtest_df = fitted_transformer.transform(test_df)

In [23]:
predictions = model.transform(vtest_df)

In [24]:
# 나중에 쓰기 쉽게 만들기 위해 캐싱 
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vecotr: vector, passenger_count_scaled: vector, trip_distance_vecotr: vector, trip_distance_scaled: vector, pickup_time_vecotr: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [25]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

[Stage 29:>                                                         (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.8|  Wednesday|         5.8| 7.008936497794653|
|          4.8|  Wednesday|        28.5|24.087978945597452|
|          2.1|    Tuesday|        15.8|15.568942143209462|
|          3.5|    Tuesday|       22.45|19.424828674257125|
|          6.1|     Monday|        26.3| 28.58548172926217|
|          2.1|     Sunday|        14.8|13.500578166873234|
|          0.4|    Tuesday|         5.8|  7.35100425944948|
|          1.0|     Friday|         9.8| 8.946860981757407|
|          1.3|   Saturday|         8.3| 8.883954691750226|
|          2.2|    Tuesday|       15.95|13.059175786795581|
|          5.2|     Monday|       26.15| 24.16501990739097|
|         12.4|   Saturday|        51.6| 49.13133294403598|
|          2.0|    Tuesday|       14.75| 13.45665448812783|
|          4.1|     Monday|        22.8|

                                                                                

In [26]:
# 모델 평가 
model.summary.rootMeanSquaredError

3.775113709164103

In [27]:
model.summary.r2

0.8962085031462615

In [28]:
spark.stop()