# Green Taxi 
This is a DBS DS interview assignment. I think it is fun to share, so that people know what the world expects a data scientist (DS) to be able to do.

### Original dataset
https://data.cityofnewyork.us/Transportation/2017-Green-Taxi-Trip-Data/5gj9-2kzx

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, isnan, when, trim

In [None]:
# Load the raw trip records from the CSV export; the first row supplies the
# column names and Spark infers each column's type from the data.
green_taxi = spark.read.csv(
    "gs://hsbc-9553155-ihubhk-dev-tom-test/public_data/2017_Green_Taxi_Trip_Data.csv",
    header="true",
    inferSchema="true",
)

## Data Preparation

In [None]:
# Expose the raw trips to Spark SQL under the view name `taxi`.
green_taxi.createOrReplaceTempView("taxi")

In [None]:
# Build the modelling frame from the raw trips registered as `taxi`.
#
# The inner query parses the two timestamp strings once (12-hour clock with an
# AM/PM marker) into epoch seconds; the outer query derives every calendar
# field and the trip duration from those two values instead of re-parsing the
# strings for each output column.
#
# Deliberate choices in the output formats:
#   * pickup_hour / dropoff_hour use 'HH' (hour of day, 0-23). The 12-hour
#     field 'hh' would map e.g. 3 AM and 3 PM to the same value 3.
#   * tripdurr divides the elapsed seconds by 3600, i.e. it is expressed in
#     hours (a divisor of 360 would yield units of six minutes).
#
# Only trips with fare_amount above 2.50 are kept.
# NOTE(review): LIMIT without ORDER BY returns an arbitrary 10000 rows --
# confirm this sampling is intentional.
sqlDF = spark.sql("""
    SELECT PULocationID, DOLocationID, passenger_count, trip_distance,
           total_amount, payment_type, trip_type, tip_amount, fare_amount,
           ROUND(CAST(tip_amount / fare_amount AS DOUBLE), 4) AS tip_percent,
           CAST(from_unixtime(pickup_ts, 'yyyy') AS INT) AS pickup_year,
           CAST(from_unixtime(pickup_ts, 'MM') AS INT) AS pickup_month,
           CAST(from_unixtime(pickup_ts, 'dd') AS INT) AS pickup_day,
           CAST(from_unixtime(pickup_ts, 'HH') AS INT) AS pickup_hour,
           CAST(from_unixtime(pickup_ts, 'mm') AS INT) AS pickup_minute,
           CAST(from_unixtime(dropoff_ts, 'yyyy') AS INT) AS dropoff_year,
           CAST(from_unixtime(dropoff_ts, 'MM') AS INT) AS dropoff_month,
           CAST(from_unixtime(dropoff_ts, 'dd') AS INT) AS dropoff_day,
           CAST(from_unixtime(dropoff_ts, 'HH') AS INT) AS dropoff_hour,
           CAST(from_unixtime(dropoff_ts, 'mm') AS INT) AS dropoff_minute,
           ROUND(CAST((dropoff_ts - pickup_ts) / 3600 AS DOUBLE), 4) AS tripdurr
    FROM (
        SELECT *,
               unix_timestamp(lpep_pickup_datetime, 'MM/dd/yyyy hh:mm:ss aa') AS pickup_ts,
               unix_timestamp(lpep_dropoff_datetime, 'MM/dd/yyyy hh:mm:ss aa') AS dropoff_ts
        FROM taxi
    ) AS parsed
    WHERE fare_amount > 2.50
    LIMIT 10000
""")

**NOTE**: could've created nested SQL

In [None]:
# Point the `taxi` view at the derived frame; this replaces the earlier
# registration of the raw data under the same name.
sqlDF.createOrReplaceTempView("taxi")

In [None]:
def to_null(c):
    """Return a column expression that keeps column ``c`` only where it is usable.

    The value is passed through unless it is NULL, NaN, or an empty string
    after trimming. The ``when`` has no ``otherwise`` branch, so those rows
    evaluate to NULL.

    :param c: name of the column to clean.
    :return: a Column expression over ``c``.
    """
    value = col(c)
    is_missing = value.isNull() | isnan(value) | (trim(value) == "")
    return when(~is_missing, value)

In [None]:
# Discard every row that has a null/NaN in any column.
sqlDF = sqlDF.na.drop()

In [None]:
# Re-register the cleaned frame so the query below sees it as `taxi`.
sqlDF.createOrReplaceTempView("taxi")
# Add avg_speed (trip_distance divided by tripdurr, rounded to 4 decimals),
# keep only pickups in months 1 and 2 of 2017 whose tip_percent is below 1,
# then drop any row that ends up with a null value.
sqlDF = spark.sql("SELECT *, \
    ROUND(CAST(trip_distance/tripdurr AS DOUBLE), 4) as avg_speed \
    FROM taxi WHERE pickup_month in (1, 2) AND pickup_year=2017 AND tip_percent<1").na.drop()

In [None]:
# Load the zone lookup table; the first row supplies the column names and
# Spark infers each column's type from the data.
zones = spark.read.csv(
    "gs://hsbc-9553155-ihubhk-dev-tom-test/public_data/nyczones.csv",
    header="true",
    inferSchema="true",
)

In [None]:
# Attach the zone lookup row for each trip's pickup location (a left join, so
# trips without a matching zone are kept), then remove the lookup columns
# that are not used afterwards.
sqlDF = (
    sqlDF
    .join(zones, sqlDF["PULocationID"] == zones["LocationID"], "left")
    .drop("Zone", "service_zone", "LocationID")
)

In [None]:
# Encode the textual Borough column as a numeric index (PU_boroughIndex) and
# discard the original string column once it has been encoded.
indexerBorough = StringIndexer(outputCol="PU_boroughIndex", inputCol="Borough")
sqlDF = indexerBorough.fit(sqlDF).transform(sqlDF).drop("Borough")

In [None]:
# Replace PU_boroughIndex with the same values cast to an integer type.
sqlDF = sqlDF.withColumn("PU_boroughIndex", col("PU_boroughIndex").cast("int"))

In [None]:
# Re-register `taxi` so the view reflects the joined and indexed frame.
sqlDF.createOrReplaceTempView("taxi")

In [None]:
# Random split with weights 0.7 / 0.3 into training and test sets; a fixed
# seed is passed for the random sampling.
train, test = sqlDF.randomSplit([0.7, 0.3], seed=12345)
# ref: https://spark.apache.org/docs/latest/ml-tuning.html

In [None]:
# train = spark.sql("SELECT * FROM taxi WHERE pickup_month in (1) and pickup_year=2017")
# train = train.drop("pickup_year").drop("pickup_month").drop("dropoff_year").drop("dropoff_month")

In [None]:
# test = spark.sql("SELECT * FROM taxi WHERE pickup_month in (2) and pickup_year=2017")
# test = test.drop("pickup_year").drop("pickup_month").drop("dropoff_year").drop("dropoff_month")

## SparkML pipeline  
Spark built-in feature vectorization for ML scoring

In [None]:
# Columns that must not be fed to the model as features:
#   * tip_amount   - the label being predicted.
#   * tip_percent  - derived earlier in this notebook as tip_amount / fare_amount;
#                    combined with fare_amount (a feature) it reveals the label.
#   * total_amount - NOTE(review): presumably includes the tip itself, which
#                    would also leak the label; confirm against the data dictionary.
nonFeatureCols = {"tip_amount", "tip_percent", "total_amount"}
featuresCols = [c for c in train.columns if c not in nonFeatureCols]
# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them: features with at most
# maxCategories distinct values are treated as categorical.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [None]:
# Gradient-boosted tree regressor with tip_amount as the label column; all
# other parameters are left at their defaults.
gbt = GBTRegressor(labelCol="tip_amount")

## Hyperparameter tuning - Grid search

In [None]:
# Grid over tree depth (5, 10) and number of boosting iterations (10, 20).
# Further ideas noted by the author: deeper trees (10 or higher) and more
# trees in the ensemble (>100).
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10])
    .addGrid(gbt.maxIter, [10, 20])
    .build()
)

# Score candidates by RMSE, reading the label/prediction column names from
# the regressor so the two stay in sync.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=gbt.getLabelCol(),
    predictionCol=gbt.getPredictionCol(),
)

# Cross-validated search over the grid above, wrapping the regressor.
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator)

In [None]:
# Chain the stages in order: vector assembly, vector indexing, then the
# cross-validator.
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

## Async ML scoring, because it takes a really long time

In [None]:
%%time
# Fit every pipeline stage on the training split; the %%time cell magic
# reports how long the cell took.
pipelineModel = pipeline.fit(train)

In [None]:
%%time
# Apply the fitted pipeline to the test split.
predictions = pipelineModel.transform(test)

In [None]:
%%time
# Compute the evaluator's metric over the test-set predictions.
rmse = evaluator.evaluate(predictions)

## ML evaluation

In [None]:
# Report the error measured on the test split.
print("RMSE on our test set: {}".format(rmse))
# result was 0.388 (value recorded by the author from a previous run)

In [None]:
# Peek at the first row of the predictions frame.
predictions.head()

## Let's wrap things up

In [None]:
# Persist the fitted pipeline under "sparkmodel". Going through the writer
# with overwrite() lets this cell be re-run: a plain save() raises when the
# target path already exists.
pipelineModel.write().overwrite().save("sparkmodel")

## Let's do CI/CD for fun

In [None]:
def score(data, model_path="sparkmodel"):
    """Score a DataFrame with a persisted pipeline model.

    The model is loaded from storage on every call and then applied to
    ``data``.

    :param data: DataFrame to score; it is passed unchanged to the loaded
        model's ``transform``.
    :param model_path: location of the saved ``PipelineModel``. Defaults to
        ``"sparkmodel"``, the path this notebook saves its model to.
    :return: the DataFrame returned by the model's ``transform``.
    """
    model = PipelineModel.load(model_path)
    return model.transform(data)

In [None]:
# Score the test split through the reloaded model.
prediction = score(test)

In [None]:
# Show the first scored row.
prediction.head()