In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi-duration-prediction-2")\
            .config("spark.executor.memory", MAX_MEMORY)\
            .config("spark.driver.memory", MAX_MEMORY).getOrCreate()

In [3]:
trip_files = "/Users/woals/data-engineering/01-spark/data/trips/*"

In [4]:
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema=True, header=True)

In [5]:
trips_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [6]:
trips_df.createOrReplaceTempView("trips")

In [7]:
query = """
SELECT
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    (SELECT
        *,
        TO_DATE(t.tpep_pickup_datetime) AS pickup_date
    FROM
        trips t)
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND pickup_date >= '2021-01-01'
    AND pickup_date < '2021-12-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [8]:
data_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [9]:
data_df.select(["passenger_count", "trip_distance", "pickup_time", "day_of_week"]).describe().show()

+-------+------------------+------------------+------------------+-----------+
|summary|   passenger_count|     trip_distance|       pickup_time|day_of_week|
+-------+------------------+------------------+------------------+-----------+
|  count|          24405049|          24405049|          24405049|   24405049|
|   mean| 1.217503066680997|3.0594232791747196|14.199043853589476|       null|
| stddev|0.5495874147034774| 4.080585476765442| 5.373144900705152|       null|
|    min|               0.0|              0.01|                 0|     Friday|
|    max|               3.0|             482.1|                23|  Wednesday|
+-------+------------------+------------------+------------------+-----------+



In [10]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)
print(train_df.count())
print(test_df.count())

19522070
4882979


In [11]:
toy_df = train_df.sample(False, .1, seed=261)

In [12]:
data_dir = "/Users/woals/data-engineering/01-spark/data"

In [13]:
# 컬럼 기반 포멧 parquet로 저장.. 압축률이 좋고 disk io가 적다 컬럼별로 적절한 인코딩이 가능

train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")
toy_df.write.format("parquet").save(f"{data_dir}/toy/")

In [14]:
# 다시 읽어오기
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")
toy_df = spark.read.parquet(f"{data_dir}/toy/")

In [15]:
train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [16]:

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# 카테고리 피쳐들
cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week",
    "pickup_time",
]

# 파이프라인 스테이지
stages = []

# 카테고리 피쳐 프리프로세싱
for c in cat_feats:
    # c -> c_idx
    cat_indexer = StringIndexer(inputCol = c, outputCol = c + "_idx").setHandleInvalid("keep")
    # one hot encode 
    onehot_encoder = OneHotEncoder(inputCols = [cat_indexer.getOutputCol()], outputCols = [c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [17]:
# Feature Normalization
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Numerical features
num_feats = [
    "passenger_count",
    "trip_distance"
]

# vector assembler
for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol = n + "_vector")
    num_scaler = StandardScaler(inputCol = num_assembler.getOutputCol(), outputCol = n + "_scaled")
    stages += [num_assembler, num_scaler]

In [18]:
# Categorical + Numeric featurees
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

In [19]:
from pyspark.ml import Pipeline
# add model into the stages
transform_stages = stages

# Construct pipeline using the set of stages defined
pipeline =  Pipeline(stages=transform_stages)

# Fit the transformer
fitted_transformer = pipeline.fit(train_df)

In [20]:
# Transform the train data
transformed_train_df = fitted_transformer.transform(train_df)
# transformed_train_df = transformed_train_df.cache()

In [23]:
transformed_train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- pickup_time_idx: double (nullable = false)
 |-- pickup_time_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- feature_vector: vector (nullab

In [21]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter = 100, 
                         solver = "normal", 
                         labelCol = "total_amount",
                         featuresCol = "feature_vector",
                        )

In [22]:
model = lr.fit(transformed_train_df)

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
transformed_test_df = fitted_transformer.transform(test_df)

In [26]:
predictions = model.transform(transformed_test_df).cache()

In [27]:
predictions.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|pickup_time_idx|pickup_time_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------

In [28]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          4.1|   Saturday|        20.3|20.473947216522323|
|          0.7|     Friday|        10.3|12.777826308250683|
|          0.7|     Sunday|       11.75| 12.54739073620915|
|          2.8|    Tuesday|        19.3|18.458986070834662|
|          2.9|  Wednesday|       22.85|18.031107529923403|
|          3.0|     Sunday|        14.8| 17.19795473960268|
|          1.1|     Friday|        12.8|15.421434340888897|
|          4.0|   Saturday|       20.75|20.197069775102435|
|          1.0|  Wednesday|         7.3|11.264536031985918|
|          4.4|     Friday|        21.3| 19.97794167743662|
|          3.1|   Thursday|        13.8|18.500146808893543|
|          2.0|     Friday|        11.8|14.470407903409757|
|         15.1|   Thursday|        74.7| 69.28711523709701|
|         35.7|   Thursday|       168.6|

In [29]:
from pyspark.sql.types import DoubleType
distance_list = [1.1, 5.5, 10.5, 30.0]
distances_df = spark.createDataFrame(distance_list, DoubleType()).toDF("trip_distance")

In [30]:
distances_df.show()

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.5|
|         10.5|
|         30.0|
+-------------+



In [31]:
vdistances_df = vassembler.transform(distances_df)

NameError: name 'vassembler' is not defined

In [None]:
vdistances_df.show()

In [None]:
model.transform(vdistances_df).show()

# 성능 평가

In [None]:
model.summary

In [None]:
print("RMSE: ", model.summary.rootMeanSquaredError)

In [None]:
print("R2: ", model.summary.r2)
# R2:  0.018565176935511962

In [None]:
train_df.describe().show()