In [1]:
from pyspark.sql import SparkSession

MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi_fare_prediction")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()

22/02/19 15:07:47 WARN Utils: Your hostname, robertminui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 192.168.0.21 instead (on interface en0)
22/02/19 15:07:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/19 15:07:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
trip_files = "/Users/robertmin/PycharmProjects/study/data_engineering/spark_review/data/taxi/*"
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema=True, header=True)
trips_df.printSchema()



root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



                                                                                

In [3]:
trips_df.createOrReplaceTempView("trips")

In [4]:
query = """
SELECT
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    (SELECT
        *,
        TO_DATE(t.tpep_pickup_datetime) AS pickup_date
    FROM
        trips t)
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND pickup_date >= '2021-01-01'
    AND pickup_date < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [5]:
data_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [6]:
train_df, test_df = data_df.randomSplit([0.8, 0.2])

In [7]:
toy_df = train_df.sample(False, .1, seed=261)

In [2]:
# parquet 으로 저장
data_dir = "/Users/robertmin/PycharmProjects/study/data_engineering/spark_review/data"
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")
toy_df.write.format("parquet").save(f"{data_dir}/toy/")

NameError: name 'train_df' is not defined

In [3]:
# 다시 읽어오기
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")
toy_df = spark.read.parquet(f"{data_dir}/toy/")

                                                                                

In [4]:
train_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [6]:
# One-Hot Encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_features = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week",
    "pickup_time"
]

# pipline stages
stages = []

for c in cat_features:
    # c -> c_idx
    cat_indexer = StringIndexer(inputCol=c, outputCol = c + "_idx").setHandleInvalid("keep")
    # onehot
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [8]:
# features normalized
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = [
    "passenger_count",
    "trip_distance"
]

# vector assembler
for n in num_features:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n+"_scaled")
    stages += [num_assembler, num_scaler]

In [11]:
# cat + num feature 합친 assembler_inputs 값 생성
assembler_inputs = [c + "_onehot" for c in cat_features] + [n + "_scaled" for n in num_features]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

In [12]:
stages

[StringIndexer_c380bcb677ab,
 OneHotEncoder_31c44f14466c,
 StringIndexer_733de83b7a88,
 OneHotEncoder_2016b96c1225,
 StringIndexer_d984a37200fd,
 OneHotEncoder_bff35b171c73,
 StringIndexer_01b5be058590,
 OneHotEncoder_cb97bc1ae4b1,
 VectorAssembler_345db4e6d744,
 StandardScaler_6fc406251317,
 VectorAssembler_77fcc2096727,
 StandardScaler_c4105f19f1d4,
 VectorAssembler_030fb5d9f653]

## Training

In [14]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
# add model into the stages
# Construct pipeline using the set of stages defined
# 스테이지에 모델 더하고  정의된 스테이지대로 파이프라인을 구축
transform_stage = stages
pipeline = Pipeline(stages=transform_stage)

# Fit the transformer
# 구축된 파이프라인으로 Transformer 생성
fitted_transformer = pipeline.fit(train_df)

                                                                                

In [16]:
# transformer로 학습 데이터 변형하기
transformed_train_df = fitted_transformer.transform(train_df)
# transformed_train_df = transformed_train_df.cache()
# transformer 로 테스트 데이터 변형하기
transformed_test_df = fitted_transformer.transform(test_df)

In [17]:
lr = LinearRegression(maxIter=100,
                      solver="normal",
                      labelCol="total_amount",
                      featuresCol="feature_vector")

In [18]:
model = lr.fit(transformed_train_df)

22/02/19 15:35:53 WARN Instrumentation: [a89a771e] regParam is zero, which might cause numerical instability and overfitting.
22/02/19 15:36:26 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/02/19 15:36:26 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/02/19 15:36:52 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
22/02/19 15:36:53 WARN Instrumentation: [a89a771e] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
22/02/19 15:36:54 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/02/19 15:36:54 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [19]:
predictions = model.transform(transformed_test_df).cache()

In [20]:
predictions.show()

[Stage 23:>                                                         (0 + 1) / 1]

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|pickup_time_idx|pickup_time_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------

                                                                                

In [21]:
# 성능평가
print("RMSE : ", model.summary.rootMeanSquaredError)
print("r2 : ", model.summary.r2)

RMSE :  5.83898017277841
r2 :  0.798241969844171


In [27]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          5.8|  Wednesday|        31.3|29.695237394504943|
|          3.1|     Friday|       18.95| 18.27322682893683|
|          1.1|     Friday|       14.15|13.075531084409807|
|          1.3|     Sunday|         7.3|11.821567845230419|
|          4.1|     Monday|        17.8|17.338825542924866|
|         14.2|     Monday|        64.8| 65.09130700124263|
|          5.8|   Saturday|       26.75|24.784401761710452|
|          5.5|     Sunday|        23.8|  24.2408930880204|
|          6.9|  Wednesday|       84.35| 50.88649451912192|
|          4.1|     Friday|       20.75| 23.05012761714883|
|          3.4|     Friday|        28.6| 23.64214838270908|
|          4.9|   Thursday|       27.85|26.699218757385594|
|          3.5|     Monday|       21.35|19.307052835488562|
|          3.9|  Wednesday|       21.35|

## HyperParameterTuning

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

cv_stages = stages + [lr]

In [23]:
cv_pipeline = Pipeline(stages=cv_stages)

In [24]:
param_grid = ParamGridBuilder()\
                .addGrid(lr.elasticNetParam, [0.1, 0.2, 0.3, 0.4, 0.5])\
                .addGrid(lr.regParam, [0.01, 0.02, 0.03, 0.04, 0.05])\
                .build()

In [25]:
cross_val = CrossValidator(
    estimator=cv_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=RegressionEvaluator(labelCol="total_amount"),
    numFolds=5
)

In [None]:
cv_model = cross_val.fit(toy_df)