In [1]:
from pyspark.sql import SparkSession

In [2]:
# Local Spark session for this notebook; the same memory budget (MAX_MEMORY) is
# requested for both the driver and the executors.
MAX_MEMORY = "5g"
spark = (
    SparkSession.builder
    .appName("trip_count_by_zone_sql")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

In [3]:
import os

# Root of the local NYC taxi dataset (must be an absolute path). Set the
# NYC_TAXI_DATA_DIR environment variable to run the notebook on another machine;
# without it the original author's path is used unchanged.
DATA_ROOT = os.environ.get(
    "NYC_TAXI_DATA_DIR", "/Users/keon/fastcampus/data-engineering/01-spark/data"
)
# Glob over every monthly trip file, plus the zone lookup table.
trip_files = f"{DATA_ROOT}/trips/*"
zone_file = f"{DATA_ROOT}/taxi+_zone_lookup.csv"

In [4]:
# Load the raw trip and zone-lookup CSVs from the local filesystem, using the header
# row for column names and letting Spark infer column types.
# The paths are absolute ("/Users/..."), so the scheme prefix is "file://".
# The previous "file:///" prefix produced a four-slash URI ("file:////Users/...").
# That presumably only resolved because Hadoop collapses repeated slashes.
trips_df = spark.read.csv(f"file://{trip_files}", inferSchema=True, header=True)
zones_df = spark.read.csv(f"file://{zone_file}", inferSchema=True, header=True)

In [5]:
# Inspect the inferred schemas. Note that the pickup/dropoff timestamps come back as
# strings, not timestamps.
for frame in (trips_df, zones_df):
    frame.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [6]:
# Register both DataFrames as temp views so the SQL cells below can refer to them by name.
for view_name, frame in (("trips", trips_df), ("zones", zones_df)):
    frame.createOrReplaceTempView(view_name)

In [7]:
# Build the denormalized trip table. Every trip is joined twice to the zone lookup
# (once for the pickup zone, once for the dropoff zone), and these features are derived:
#   - pickup_date / dropoff_date: calendar date of each timestamp string.
#   - pickup_time / dropoff_time: HOUR() of each timestamp, an integer hour rather than
#     a time of day.
#   - day_of_week: weekday name of the pickup date (DATE_FORMAT pattern 'EEEE').
#   - duration_min: (dropoff - pickup) in minutes, rounded to 2 decimals.
# The LEFT JOINs keep trips whose location IDs have no match in the lookup; their
# zone/borough columns are NULL.
query = """
    SELECT 
        t.VendorID as vendor_id,
        TO_DATE(t.tpep_pickup_datetime) as pickup_date,
        TO_DATE(t.tpep_dropoff_datetime) as dropoff_date,
        HOUR(t.tpep_pickup_datetime) as pickup_time,
        HOUR(t.tpep_dropoff_datetime) as dropoff_time,
        DATE_FORMAT(TO_DATE(t.tpep_pickup_datetime), 'EEEE') AS day_of_week,
        t.trip_distance,
        t.RatecodeID as rate_code,
        t.store_and_fwd_flag,
        t.fare_amount,
        t.tip_amount,
        t.tolls_amount,
        t.total_amount,
        t.payment_type,
        t.passenger_count,
        t.PULocationID as pickup_location_id,
        pz.Zone as pickup_zone,
        pz.Borough as pickup_borough,
        t.DOLocationID as dropoff_location_id,
        dz.Zone as dropoff_zone,
        dz.Borough as dropoff_borough,
        ROUND((
            UNIX_TIMESTAMP(t.tpep_dropoff_datetime) - 
            UNIX_TIMESTAMP(t.tpep_pickup_datetime)) / 60,
            2) as duration_min
    FROM 
        trips t
    LEFT JOIN
        zones pz
        ON
            t.PULocationID = pz.LocationID
    LEFT JOIN
        zones dz
        ON
            t.DOLocationID = dz.LocationID
"""
combo_df = spark.sql(query)
# Register the joined table so the cleaning query below can select FROM combo.
combo_df.createOrReplaceTempView("combo")

In [8]:
# Spot-check the derived duration against the trip distance.
combo_df.select("trip_distance", "duration_min").show()

+-------------+------------+
|trip_distance|duration_min|
+-------------+------------+
|          2.1|        6.03|
|          0.2|        0.98|
|         14.7|        27.6|
|         10.6|       15.22|
|         4.94|       16.53|
|          1.6|        8.02|
|          4.1|        17.0|
|          5.7|       18.08|
|          9.1|       20.95|
|          2.7|       13.57|
|         6.11|       22.25|
|         1.21|        7.15|
|          7.4|        22.2|
|          1.7|        7.77|
|         0.81|        2.22|
|         1.01|        4.12|
|         0.73|        4.98|
|         1.17|        4.95|
|         0.78|        3.62|
|         1.66|        8.57|
+-------------+------------+
only showing top 20 rows



# Data Cleaning

In [9]:
# Select the model inputs plus the label (duration_min) from `combo`, dropping rows
# with implausible values:
#   - total_amount not in (0, 10000),
#   - trip_distance not in (0, 1000),
#   - passenger_count >= 100,
#   - pickups outside [2021-01-01, 2021-08-01).
# Rows where any filtered column is NULL are dropped too, because a comparison with
# NULL is never true in WHERE.
# NOTE(review): duration_min itself is not filtered, so zero or negative durations
# (dropoff at or before pickup) can still reach the label. Consider adding a bound;
# TODO confirm against the data.
query = """
SELECT
    pickup_time,
    day_of_week,
    trip_distance,
    passenger_count,
    pickup_location_id,
    pickup_borough,
    dropoff_location_id,
    dropoff_borough,
    duration_min
FROM
    combo c
WHERE 
    c.total_amount < 10000
    AND c.total_amount > 0
    AND c.trip_distance < 1000
    AND c.passenger_count < 100
    AND c.pickup_date >= '2021-01-01'
    AND c.pickup_date < '2021-08-01'
    AND c.trip_distance > 0
"""
model_df = spark.sql(query)
# Exposed as a view for ad-hoc SQL; the ML cells below use the DataFrame directly.
model_df.createOrReplaceTempView("model_df")

# ML

In [10]:
# Seeded 80/20 train/test split. toy_df is a 10% sample (without replacement) of the
# training split, used for the cheaper hyperparameter search.
train_df, test_df = model_df.randomSplit([0.8, 0.2], seed=2019)
toy_df = train_df.sample(withReplacement=False, fraction=0.1, seed=261)
for label, frame in (("Train set count: ", train_df), ("Test set count:", test_df)):
    print(label, frame.count())

Train set count:  11133918
Test set count: 2784353


In [11]:
# Persist the splits as Parquet. Parquet is columnar, so it compresses well, needs
# less disk I/O, and allows a suitable encoding per column.
# mode("overwrite") makes this cell idempotent: the default save mode
# ("errorifexists") fails on a re-run because the output directories already exist.
# Run it only in notebook order, while train/test/toy still come from model_df. After
# the reload cell below they read from these same paths, and Spark refuses to
# overwrite a path it is reading from.
data_dir = "/Users/keon/fastcampus/data-engineering/01-spark/data"
train_df.write.mode("overwrite").parquet(f"{data_dir}/train/")
test_df.write.mode("overwrite").parquet(f"{data_dir}/test/")
toy_df.write.mode("overwrite").parquet(f"{data_dir}/toy/")

In [12]:
# Reload the splits from the Parquet files written above, so downstream cells start
# from the materialized data.
train_df, test_df, toy_df = (
    spark.read.parquet(f"{data_dir}/{split}/") for split in ("train", "test", "toy")
)

In [13]:
# One-Hot Encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# Categorical features: each one is string-indexed and then one-hot encoded.
cat_feats = [
    "pickup_borough",
    "dropoff_borough",
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week",
    "pickup_time",
]

# Pipeline stages, in order. This cell adds one (indexer, encoder) pair per
# categorical feature; the next cell appends the scaling and assembly stages.
stages = []

for feat in cat_feats:
    # Map each distinct value of `feat` to an index. handleInvalid="keep" assigns
    # unseen/invalid values their own extra index instead of failing at transform time.
    indexer = StringIndexer(inputCol=feat, outputCol=f"{feat}_idx", handleInvalid="keep")
    # One-hot encode that index into a vector column.
    encoder = OneHotEncoder(inputCols=[indexer.getOutputCol()], outputCols=[f"{feat}_onehot"])
    stages.extend([indexer, encoder])

In [14]:
# Feature Normalization

from pyspark.ml.feature import VectorAssembler, StandardScaler

# Numerical features: each is wrapped into a one-element vector and then standardized.
num_feats = [
    "passenger_count",
    "trip_distance",
]

for feat in num_feats:
    # StandardScaler only accepts vector input, hence the single-column assembler.
    vectorizer = VectorAssembler(inputCols=[feat], outputCol=f"{feat}_vector")
    scaler = StandardScaler(inputCol=vectorizer.getOutputCol(), outputCol=f"{feat}_scaled")
    stages.extend([vectorizer, scaler])

# Final stage: concatenate every one-hot and scaled column into `feature_vector`.
assembler_inputs = [f"{c}_onehot" for c in cat_feats] + [f"{n}_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages.append(assembler)

# Hyperparameter Tuning

In [15]:
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [16]:
# Linear regression on the assembled features, predicting duration_min.
model = LinearRegression(
    featuresCol="feature_vector",
    labelCol="duration_min",
    solver="normal",
    maxIter=50,
)

# The cross-validation pipeline is the feature stages followed by the estimator.
cv_stages = [*stages, model]

In [17]:
# Chain feature engineering and regression into a single estimator for CrossValidator.
cv_pipeline = Pipeline().setStages(cv_stages)

In [22]:
# 4 x 4 = 16 combinations of elastic-net mixing (alpha) and regularization strength.
parameter_grid = (
    ParamGridBuilder()
    .addGrid(model.elasticNetParam, [0.1, 0.2, 0.3, 0.4])
    .addGrid(model.regParam, [0.01, 0.02, 0.03, 0.04])
    .build()
)


In [23]:
# 5-fold cross-validation over the grid, scored by RegressionEvaluator (RMSE by default).
rmse_evaluator = RegressionEvaluator(labelCol="duration_min")
cross_val = CrossValidator(
    estimator=cv_pipeline,
    estimatorParamMaps=parameter_grid,
    evaluator=rmse_evaluator,
    numFolds=5,
)


In [None]:
# Run the grid search on the 10% toy sample to keep cross-validation affordable.
cv_model = cross_val.fit(dataset=toy_df)

In [None]:
# Inspect the winning hyperparameters. bestModel is a PipelineModel whose last stage is
# the fitted LinearRegressionModel. Read its params through the public getters rather
# than the private `_java_obj` handle.
best_lr = cv_model.bestModel.stages[-1]
alpha = best_lr.getElasticNetParam()
reg_param = best_lr.getRegParam()
print("alpha", alpha)
print("reg_param", reg_param)

# Train

In [None]:
# Feature engineering only: the same indexing / one-hot / scaling / assembly stages
# used in cross-validation, WITHOUT the regression estimator. The fitted transformer can
# then be reused on the train split, the test split and ad-hoc rows.
# Copy the list so that later changes to transform_stages cannot mutate `stages`.
transform_stages = list(stages)

# Construct the feature pipeline from those stages.
transform_pipeline = Pipeline(stages=transform_stages)

# Fit it (indexer vocabularies, scaler statistics) on the training split only.
fitted_transformer = transform_pipeline.fit(train_df)

In [None]:
# Apply the fitted feature pipeline to the training split and keep only label + features.
# The result is cached because the model fit below reads it.
transformed_train_df = (
    fitted_transformer.transform(train_df)
    .select("duration_min", "feature_vector")
    .cache()
)

In [None]:
# Best hyperparameters found by cross-validation on toy_df. The last stage of the best
# PipelineModel is the fitted LinearRegressionModel; read its params via the public
# getters instead of the private `_java_obj` handle.
best_lr = cv_model.bestModel.stages[-1]
alpha = best_lr.getElasticNetParam()
reg_param = best_lr.getRegParam()

# Final model, trained on the full (already transformed) training split.
# The tuned values are now passed in. Previously they were computed but commented out,
# so the hyperparameter search had no effect on the final model.
model = LinearRegression(maxIter=100,
                         solver="normal",
                         labelCol="duration_min",
                         featuresCol="feature_vector",
                         elasticNetParam=alpha,
                         regParam=reg_param,
                         )

# Fit the model against the transformed training data.
fitted_model = model.fit(transformed_train_df)

In [None]:
# Featurize the held-out split with the transformer fitted on train, then score it.
# Both the feature frame and the prediction frame are cached.
test_features_df = (
    fitted_transformer.transform(test_df)
    .select("duration_min", "feature_vector")
    .cache()
)
transformed_test_df = fitted_model.transform(test_features_df).cache()

In [None]:
# Root-mean-squared error of the predictions on the held-out test split.
evaluator = RegressionEvaluator(labelCol="duration_min", metricName="rmse")
rmse = evaluator.evaluate(transformed_test_df)
print("RMSE of Prediction on test set:", rmse)

In [None]:
# First 20 rows (Spark's default) of the scored test set.
transformed_test_df.show(n=20)

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the predicted durations.
transformed_test_df.describe("prediction").show()


In [None]:
from pyspark.sql.types import DoubleType


# Predict the duration of one hand-built trip. The raw row must pass through the
# *fitted* feature pipeline (index -> one-hot -> scale -> assemble) before the
# *fitted* regression model can score it:
#   - the bare `assembler` expects *_onehot / *_scaled columns that a raw row lacks;
#   - `model` is the unfitted LinearRegression estimator and has no transform();
#   - the name previously passed to createDataFrame (`distance_list`) was never defined.
data = [0, "Friday", 0.01, 1, 186, "Manhattan", 186, "Manhattan", 0.35]
schema = ['pickup_time', 'day_of_week', 'trip_distance', 'passenger_count',
          'pickup_location_id', 'pickup_borough', 'dropoff_location_id',
          'dropoff_borough', 'duration_min']
data_df = spark.createDataFrame([data], schema)  # a single row
vdata_df = fitted_transformer.transform(data_df)
fitted_model.transform(vdata_df).select("trip_distance", "duration_min", "prediction").show()

In [None]:
# Peek at one raw test row for comparison with the hand-built example.
test_df.show(n=1)