In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi-fare-predict").getOrCreate()
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/20 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
directory = "/home/ubuntu/working/datasource"
trip_files = "/trips/*"

trips_df = spark.read.csv(f"file:///{directory}/{trip_files}", inferSchema=True, header=True)
trips_df.printSchema()



root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



                                                                                

In [3]:
trips_df.createOrReplaceTempView("trips")

In [4]:
# 데이터 정제
query = """
SELECT
    t.passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    t.trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') as day_of_week,
    
    t.total_amount

FROM trips t

WHERE t.total_amount < 200
  AND t.total_amount > 0
  AND t.passenger_count < 5
  AND TO_DATE(t.tpep_pickup_datetime) >= '2021-01-01'
  AND TO_DATE(t.tpep_pickup_datetime) < '2021-08-01'
  AND t.trip_distance < 10
  AND t.trip_distance > 0
"""

data_df = spark.sql(query)

In [5]:
data_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



Train / Test 나누기

In [6]:
train_sdf, test_sdf = data_df.randomSplit([0.8, 0.2], seed=42)

만약에 데이터의 양이 너무 많고, 그 데이터를 오랜 시간 들여서 전처리를 다 완료 했다고 가정.
- 여러 모델을 만들거나 실험을 할 때에도 위의 전처리 작업을 그대로 매번 수행
- 추후에 다시 이 데이터를 활용한다면 시간이 많이 걸릴 듯.
- 처리가 완료된 데이터를 파일이나 데이터베이스에 저장해 놓고 나중에 불러오는게 더 빠르다.

In [7]:
# 파케이 (parquet) 형식으로 저장
save_dir = "/home/ubuntu/working/spark-examples/data/ml-data"

# Spark DataFrame의 write 메소드를 이용해 데이터를 파일 또는 데이터베이스에 저장할 수 있다.
train_sdf.write.format("parquet").save(f"{save_dir}/train/")
test_sdf.write.format("parquet").save(f"{save_dir}/test/")

AnalysisException: path file:/home/ubuntu/working/spark-examples/data/ml-data/train already exists.

## 파이프라인 정의
파이프라인 정의를 위한 stage 정의

In [8]:
# 파이프라인에 넣을 과정을 모아 놓을 리스트
stages = []

## OneHotEncoding Stage
- `pickup_location_id`
- `dropoff_location_id`
- `day_of_week`
`pickup_location_id`, `dropoff_location_id`는 숫자 형식의 데이터
- 숫자 형식의 데이터는 `OneHotEncoding` 불가능
- `StringIndexer` Transformer를 활용해 숫자형 데이터를 문자열로 취급하게끔 할 수 있다.



In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# OneHotEncoding을 수행할 컬럼
cat_features = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

for c in cat_features:
    # 1. 데이터를 문자열 형식으로 바꿔준다.
    cat_indexer = StringIndexer(inputCol=c, outputCol=c+"_idx").setHandleInvalid("keep")
    
    # 2. OneHotEnoding 수행
    onehot_encoder = OneHotEncoder(
        inputCols=[cat_indexer.getOutputCol()],
        outputCols=[c+"_onehot"]
    )
    
    stages += [cat_indexer, onehot_encoder]
    
stages

[StringIndexer_b3bdf7bf15b9,
 OneHotEncoder_ea0d7852607c,
 StringIndexer_c014ecc6fa86,
 OneHotEncoder_3ddbfd6a021f,
 StringIndexer_915d8dd92be7,
 OneHotEncoder_57945aa853f5]

## Standard Scaling Stage
- 숫자형 데이터들에 대한 표준화 수행
- `passenger_count`, `trip_distance`, `pickup_time`

In [10]:
# 각 컬럼의 데이터를 벡터화 시키고, Standard Scaling을 수행
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_features:
    # 1. 벡터화 스테이지
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"_vector")
    
    # 2. StandardScaler 스테이지
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+"_scaled")
    
    stages += [num_assembler, num_scaler]
    
stages

[StringIndexer_b3bdf7bf15b9,
 OneHotEncoder_ea0d7852607c,
 StringIndexer_c014ecc6fa86,
 OneHotEncoder_3ddbfd6a021f,
 StringIndexer_915d8dd92be7,
 OneHotEncoder_57945aa853f5,
 VectorAssembler_313751fa088c,
 StandardScaler_07d14930e4ff,
 VectorAssembler_bf68b41381be,
 StandardScaler_8c883075c871,
 VectorAssembler_e095120a623f,
 StandardScaler_191a9ed7ae19]

## Feature Assemble Stage
- 컬럼 명 뒤에 `_onehot`이 붙거나 `_scaled`가 붙은 컬럼만 Feature Vector로 만들기

In [11]:
assembler_inputs = [ c +"_onehot" for c in cat_features ] + [ n + "_scaled" for n in num_features]
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [14]:
feature_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

stages.append(feature_assembler)
stages

[StringIndexer_b3bdf7bf15b9,
 OneHotEncoder_ea0d7852607c,
 StringIndexer_c014ecc6fa86,
 OneHotEncoder_3ddbfd6a021f,
 StringIndexer_915d8dd92be7,
 OneHotEncoder_57945aa853f5,
 VectorAssembler_313751fa088c,
 StandardScaler_07d14930e4ff,
 VectorAssembler_bf68b41381be,
 StandardScaler_8c883075c871,
 VectorAssembler_e095120a623f,
 StandardScaler_191a9ed7ae19,
 VectorAssembler_c77edbb511e9]

## Pipeline 구성
순서대로 구성된 stage를 한꺼번에 수행할 파이프라인 생성

In [15]:
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)

## 데이터를 파이프라인에 통과시키기

In [16]:
# transformer의 fit : 변환을 하기 위한 수 또는 방법을 구하는 과정
fitted_transformer = pipeline.fit(train_sdf)

Exception ignored in: <function JavaWrapper.__del__ at 0x7f8261413940>          
Traceback (most recent call last):
  File "/home/ubuntu/miniconda3/envs/spark-env/lib/python3.8/site-packages/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'
                                                                                

In [17]:
# transform : 데이터를 변환
vec_train_sdf = fitted_transformer.transform(train_sdf)
vec_train_sdf.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- features: vector (nullable 

In [18]:
vec_train_sdf.select("features", "total_amount").show(5)

[Stage 20:>                                                         (0 + 1) / 1]

+--------------------+------------+
|            features|total_amount|
+--------------------+------------+
|(532,[62,311,528,...|         6.8|
|(532,[62,311,526,...|        15.3|
|(532,[62,280,523,...|         8.3|
|(532,[62,308,522,...|        15.8|
|(532,[62,291,524,...|        19.2|
+--------------------+------------+
only showing top 5 rows



                                                                                

## 모델 생성 및 훈련

In [19]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter = 50,
    solver = 'normal', # 최적화 방식
    labelCol = 'total_amount',
    featuresCol = 'features'
)

In [20]:
lr_model = lr.fit(vec_train_sdf)

23/11/20 14:25:17 WARN Instrumentation: [dd10f711] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 14:25:47 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/20 14:25:47 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/11/20 14:27:40 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/11/20 14:27:40 WARN Instrumentation: [dd10f711] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/20 14:27:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/20 14:27:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

## 예측

In [21]:
# 파이프라인을 이용해 test_sdf 변환
vec_test_sdf = fitted_transformer.transform(test_sdf)

In [22]:
# vec_test_sdf로 예측
predictions = lr_model.transform(vec_test_sdf)

In [23]:
predictions.show()

[Stage 25:>                                                         (0 + 1) / 1]

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|            features|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------

                                                                                

In [24]:
# 예측한 결과를 따로 확인할 때는 일반적으로 조회만 일어나기 때문에 캐시 처리는 해 주는 것이 좋다.
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, features: vector, prediction: double]

In [25]:
predictions.select("trip_distance", "day_of_week", "total_amount", "prediction").show()

[Stage 26:>                                                         (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          2.8|   Saturday|        19.3|17.906918373303213|
|          1.0|   Thursday|       12.25|11.758641785745093|
|          0.9|   Thursday|       11.15|11.687663923241496|
|          0.8|   Saturday|         9.8|11.308533187011903|
|          2.6|    Tuesday|       15.41| 16.97472258970197|
|          1.5|    Tuesday|        12.8|13.548759324683143|
|          4.1|  Wednesday|        16.3|19.121522380743787|
|          1.8|     Sunday|         8.8| 11.86784157404618|
|          6.0|     Friday|        25.0|28.049947107583073|
|          3.1|   Saturday|        12.3|13.899130630614604|
|          6.2|     Monday|        0.31|29.192941653695726|
|          4.6|     Friday|        17.8| 38.75091497037384|
|          2.5|   Saturday|       17.75|15.900680118656872|
|          6.3|     Monday|        24.3|

                                                                                

In [26]:
lr_model.summary.rootMeanSquaredError

3.2664868984555686

In [27]:
lr_model.summary.r2

0.7947353376551619

In [28]:
spark.stop()