In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = "6g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
        .config("spark.executor.memory", MAX_MEMORY)\
        .config("spark.driver.memory", MAX_MEMORY)\
        .getOrCreate()

22/10/06 21:33:48 WARN Utils: Your hostname, Moon-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.4 instead (on interface en0)
22/10/06 21:33:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/06 21:33:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/06 21:33:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
trip_files = "/Users/sig6774/Desktop/Data_Engineering/data-engineering-main/01-spark/data/tripdata_2021_1-7.csv"
trips_df = spark.read.csv(f"file:///{trip_files}",inferSchema=True, header=True)
# 가지고 온 csv데이터를 spark의 df로 변환

                                                                                

In [4]:
trips_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [5]:
# sql에서 사용하기 위해 tempview 생성 
trips_df.createOrReplaceTempView("trips")

In [6]:
# 이상치를 제거하고 가져옴 

query = """
SELECT 
    passenger_count, 
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime),'EEEE') as day_of_week,
    total_amount
FROM 
    trips
WHERE 
    total_amount < 5000
    AND total_amount > 0 
    AND trip_distance > 0 
    AND trip_distance < 500 
    AND passenger_count < 4 
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [7]:
data_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|            1.0|               142|                 43|          2.1|          0|     Friday|        11.8|
|            1.0|               238|                151|          0.2|          0|     Friday|         4.3|
|            1.0|               132|                165|         14.7|          0|     Friday|       51.95|
|            0.0|               138|                132|         10.6|          0|     Friday|       36.35|
|            1.0|                68|                 33|         4.94|          0|     Friday|       24.36|
|            1.0|               224|                 68|          1.6|          0|     Friday|       14.15|
|            1.0|           

In [8]:
data_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [10]:
# train, test 데이터 분리 
train_df, test_df = data_df.randomSplit([0.8,0.2], seed=410)

In [34]:
# 위의 내용을 다시 불러오기에는 시간이 오래 걸리므로 파일로 저장해놓음 
# data_dir = "/Users/sig6774/Desktop/Data_Engineering/data-engineering-main/01-spark/data/"
# train_df.write.format("parquet").save(f"{data_dir}/train/")
# test_df.write.format("parquet").save(f"{data_dir}/test/")

AnalysisException: path file:/Users/sig6774/Desktop/Data_Engineering/data-engineering-main/01-spark/data/train already exists.

In [35]:
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")
# 데이터를 쉽게 불러올 수 있음 

In [36]:
train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



### Preprocessing

In [37]:
# category data는 전처리가 필요함 
# string값을 가지고와서 그것을 one-hot encoding 진행 
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

# 파이프라인을 통해서 데이터를 통과시키도록 진행할 예정 
# 파이프라인은 여러가지 stage로 되어 있으므로 그 stage를 담는 배열 선언 
stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol=c+"_idx").setHandleInvalid("keep")
    # invalid한 값도 어떻게 처리할 지 정의 
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c+"_onehot"])
    stages += [cat_indexer, onehot_encoder]
    # 인덱서와 인코더를 파이프라인의 stages에 넣어줌 

In [38]:
stages

[StringIndexer_67ee8bc542be,
 OneHotEncoder_5de1bd64c8cf,
 StringIndexer_aac3c40678dd,
 OneHotEncoder_81230022837d,
 StringIndexer_0468179c12fd,
 OneHotEncoder_c33c169ea76a]

In [39]:
# numeric은 vector assembler로 진행 
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n + "_scaled")
    stages += [num_assembler, num_scaler]

In [40]:
stages

[StringIndexer_67ee8bc542be,
 OneHotEncoder_5de1bd64c8cf,
 StringIndexer_aac3c40678dd,
 OneHotEncoder_81230022837d,
 StringIndexer_0468179c12fd,
 OneHotEncoder_c33c169ea76a,
 VectorAssembler_1005e359cf39,
 StandardScaler_ee25009722b0,
 VectorAssembler_d4bb05a8b57c,
 StandardScaler_c8c047add8c1,
 VectorAssembler_261159e30a6c,
 StandardScaler_20000c72f25d]

In [41]:
# categorical과 numerical 데이터를 하나로 합치면 학습이 가능
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats] 
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [42]:
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]
# 마지막에 assembler 넣어줘야함 
stages

[StringIndexer_67ee8bc542be,
 OneHotEncoder_5de1bd64c8cf,
 StringIndexer_aac3c40678dd,
 OneHotEncoder_81230022837d,
 StringIndexer_0468179c12fd,
 OneHotEncoder_c33c169ea76a,
 VectorAssembler_1005e359cf39,
 StandardScaler_ee25009722b0,
 VectorAssembler_d4bb05a8b57c,
 StandardScaler_c8c047add8c1,
 VectorAssembler_261159e30a6c,
 StandardScaler_20000c72f25d,
 VectorAssembler_64b671e3f798]

### Pipeline

In [43]:

from pyspark.ml import Pipeline
transform_stages = stages 
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)
# 정의한 파이프라인에 데이터를 넣어줘서 일련의 과정을 진행 
# pipeline 이거 하나의 함수같네 

                                                                                

In [44]:
vtrain_df = fitted_transformer.transform(train_df)
# transform을 통해 학습이 가능한 상태로 변환 

In [45]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
        maxIter=5,
        solver="normal",
        labelCol="total_amount",
        featuresCol="feature_vector"
)

In [46]:
model = lr.fit(vtrain_df)

22/10/06 22:01:39 WARN Instrumentation: [e65b71d2] regParam is zero, which might cause numerical instability and overfitting.


[Stage 46:>                                                         (0 + 7) / 7]

22/10/06 22:01:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/10/06 22:01:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

22/10/06 22:01:54 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
22/10/06 22:01:54 WARN Instrumentation: [e65b71d2] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
22/10/06 22:01:54 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/10/06 22:01:54 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


                                                                                

In [47]:
vtest_df = fitted_transformer.transform(test_df)
# transform을 통해 테스트 가능한 상태로 변환 

In [48]:
pred = model.transform(vtest_df)
pred.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------

In [49]:
pred.cache()
# 캐싱으로 조금 더 쉽게 쓸 수 있도록 진행 

DataFrame[passenger_count: double, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [50]:
pred.select(["trip_distance", "day_of_week", "total_amount","prediction" ]).show()
# 원하는 컬럼만 보면서 예측 결과 확이 ㄴ가능 

[Stage 51:>                                                         (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.2|  Wednesday|       74.75|10.929842589879033|
|          5.3|   Thursday|        22.8|23.410668178489793|
|          0.3|  Wednesday|         7.3|12.459712222315154|
|          1.9|     Sunday|        12.8|13.630621874882666|
|          1.9|  Wednesday|       14.15|15.017208062658916|
|          1.7|   Saturday|       14.15|14.880340915703279|
|          5.0|   Saturday|        26.3|20.454820350080986|
|          4.9|   Thursday|       24.35| 21.02798377810661|
|          3.3|     Monday|        18.3|16.094085346996486|
|          4.7|     Monday|        26.0| 22.52476745896286|
|          3.7|    Tuesday|        22.3|20.740580902809256|
|          1.3|     Friday|        19.3|16.200784676243167|
|          2.7|     Monday|       16.45|17.176876010784447|
|          4.4|    Tuesday|       21.62|

                                                                                

In [52]:
model.summary.rootMeanSquaredError

5.888596638783108

In [53]:
model.summary.r2

0.794118428724808