In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark import SQLContext
from itertools import islice
from pyspark.sql.functions import col

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, isnan, when, trim

import time

In [None]:
%%info

In [None]:
filename = "s3://aws-emr-resources-507786327009-us-east-1/taxidata.csv"

# Read the raw trip CSV through EMR's S3 Select data source: the first row holds
# the column names, and Spark infers each column's type from the data.
green_taxi = (
    spark.read.format("s3selectCSV")
    .options(header="true", inferSchema="true")
    .load(filename)
)

In [None]:
# Sanity check: the reader should have returned a pyspark.sql.DataFrame.
green_taxi.__class__

In [None]:
# Shape of the raw data as (rows, columns); count() launches a full Spark job.
n_rows = green_taxi.count()
n_cols = len(green_taxi.columns)
print((n_rows, n_cols))

# Pre-process the data with Spark SQL

In [None]:
# Expose the raw trips to Spark SQL under the view name "taxi".
green_taxi.createOrReplaceTempView(name="taxi")

In [None]:
# Derive model features from the raw trips with Spark SQL.
#  * Each datetime string is parsed once (in the CTE) rather than once per derived field.
#  * Hours come from hour(), i.e. 0-23. Formatting with 'hh' yields the 12-hour clock,
#    which mapped e.g. 1 AM and 1 PM to the same hour value.
#  * The AM/PM marker uses a single 'a': Spark 3's default (non-legacy) datetime parser
#    rejects 'aa', while 'a' works under both the new and the legacy parser.
#  * tripdurr is the trip duration in hours (seconds / 3600); a divisor of 360 would
#    inflate it tenfold.
# Note: Spark evaluates lazily, so the timing below covers building the query plan only,
# not running it.
TAXI_TS_FORMAT = "MM/dd/yyyy hh:mm:ss a"

start_time = time.time()
sqlDF = spark.sql(f"""
    WITH parsed AS (
        SELECT *,
               to_timestamp(lpep_pickup_datetime,  '{TAXI_TS_FORMAT}') AS pickup_ts,
               to_timestamp(lpep_dropoff_datetime, '{TAXI_TS_FORMAT}') AS dropoff_ts
        FROM taxi
        WHERE fare_amount > 2.50
    )
    SELECT PULocationID, DOLocationID, passenger_count, trip_distance,
           total_amount, payment_type, trip_type, tip_amount, fare_amount,
           ROUND(CAST(tip_amount / fare_amount AS DOUBLE), 4) AS tip_percent,
           year(pickup_ts)        AS pickup_year,
           month(pickup_ts)       AS pickup_month,
           dayofmonth(pickup_ts)  AS pickup_day,
           hour(pickup_ts)        AS pickup_hour,
           minute(pickup_ts)      AS pickup_minute,
           year(dropoff_ts)       AS dropoff_year,
           month(dropoff_ts)      AS dropoff_month,
           dayofmonth(dropoff_ts) AS dropoff_day,
           hour(dropoff_ts)       AS dropoff_hour,
           minute(dropoff_ts)     AS dropoff_minute,
           ROUND(CAST((unix_timestamp(dropoff_ts) - unix_timestamp(pickup_ts)) / 3600 AS DOUBLE), 4) AS tripdurr
    FROM parsed
""")

# Re-point the "taxi" view at the engineered frame for the SQL cells that follow.
sqlDF.createOrReplaceTempView("taxi")
print(f'Total time: {(time.time() - start_time):.3f} seconds.')

In [None]:
def to_null(c):
    """Return column `c` with null, NaN and blank/whitespace-only values mapped to null.

    Args:
        c: name of the column to clean.

    Returns:
        A Column expression; every other value passes through unchanged.

    Not called by any visible cell of this notebook.
    """
    column = col(c)
    is_missing = column.isNull() | isnan(column) | (trim(column) == "")
    # `when` without `otherwise` produces null wherever the condition is false.
    return when(~is_missing, column)

# Keep only fully populated rows: any null/NaN in any column drops the row.
sqlDF = sqlDF.dropna()

In [None]:
# Add average speed (trip_distance / tripdurr) and keep 2017 January-February trips
# whose tip is below 100% of the fare.
# NULLIF turns zero-length trips into a null speed instead of a division by zero
# (which raises when spark.sql.ansi.enabled=true). Those rows are then removed by the
# trailing na.drop(), together with any other row containing a null.
# NOTE(review): trips whose dropoff precedes pickup give a negative speed and are kept.
sqlDF.createOrReplaceTempView("taxi")
sqlDF = spark.sql("""
    SELECT *,
           ROUND(CAST(trip_distance / NULLIF(tripdurr, 0) AS DOUBLE), 4) AS avg_speed
    FROM taxi
    WHERE pickup_month IN (1, 2) AND pickup_year = 2017 AND tip_percent < 1
""").na.drop()

In [None]:
# Number of trips left after cleaning and the month filter (triggers a Spark job).
filtered_row_count = sqlDF.count()
filtered_row_count

In [None]:
"""sqlDF = sqlDF.join(zones, sqlDF.PULocationID == zones.LocationID, how='left')
sqlDF = sqlDF.drop("Zone").drop("service_zone").drop('LocationID')"""

# Use Spark ML transformers

Reference: [Spark ML feature transformers](https://spark.apache.org/docs/latest/ml-features.html)

In [None]:
# Encode the pickup borough as a numeric index for the tree model.
# "Borough" only exists when the zone-lookup join has been run. That join is disabled in
# this notebook, so skip the encoding instead of failing on a missing column.
# NOTE(review): the indexer is fitted on both months (train + test) before the split.
if "Borough" in sqlDF.columns:
    indexerBorough = StringIndexer(inputCol="Borough", outputCol="PU_boroughIndex")
    sqlDF = indexerBorough.fit(sqlDF).transform(sqlDF)
    sqlDF = sqlDF.drop("Borough")

In [None]:
# StringIndexer emits doubles; store the borough index as an integer.
# Guarded because the column is only produced when a "Borough" column was available.
if "PU_boroughIndex" in sqlDF.columns:
    sqlDF = sqlDF.withColumn("PU_boroughIndex", col("PU_boroughIndex").cast("int"))


In [None]:
from pyspark.sql.types import IntegerType

In [None]:
def check_airport_id(id):
    """Return 1 if `id` equals 1 or 2, otherwise 0.

    Written to be wrapped as a Spark UDF over RateCodeID. That registration is
    commented out, so the function is currently unused.
    NOTE(review): in the TLC data dictionary RatecodeID 1 is the standard rate and
    2 / 3 are JFK / Newark; verify which codes should count as "airport".
    """
    return 1 if id in (1, 2) else 0
    
#check_airport_id_udf = udf(check_airport_id, IntegerType())
#sqlDF = sqlDF.withColumn("is_airport", check_airport_id_udf(sqlDF['RateCodeID']))

sqlDF.createOrReplaceTempView("taxi")

# Time-based split: January 2017 trains the model, February 2017 tests it.
# The calendar year/month columns are removed from both splits; pickup year and month
# are fixed within each split, so they would only encode the split itself.
calendar_cols = ["pickup_year", "pickup_month", "dropoff_year", "dropoff_month"]
split_query = "SELECT * FROM taxi WHERE pickup_month in ({}) and pickup_year=2017"

train = spark.sql(split_query.format(1)).drop(*calendar_cols)
test = spark.sql(split_query.format(2)).drop(*calendar_cols)

In [None]:
# Persist both splits. Each output path becomes a directory of part files.
# mode("overwrite") keeps the cell re-runnable: the default "errorifexists" mode fails
# once the paths exist. The CSV copies get a header row so column names are not lost.
train.write.mode("overwrite").csv('nyc_taxi_train.csv', header=True)
test.write.mode("overwrite").csv('nyc_taxi_test.csv', header=True)

# Save in Parquet files (the schema is stored alongside the data)

train.write.mode("overwrite").parquet('nyc_taxi_train.parquet')
test.write.mode("overwrite").parquet('nyc_taxi_test.parquet')

In [None]:
# Predict tip_amount with gradient-boosted trees inside a cross-validated pipeline.
LABEL_COL = "tip_amount"
SEED = 42

# Columns that must not be used as features: the label itself and anything derived
# from it. tip_percent is computed as tip_amount / fare_amount in the SQL step. Per the
# TLC data dictionary, total_amount includes card tips (confirm for this extract).
# Keeping either would leak the label into the features.
LEAKY_COLS = {LABEL_COL, "tip_percent", "total_amount"}
featuresCols = [c for c in train.columns if c not in LEAKY_COLS]

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# Features with at most 4 distinct values are treated as categorical.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

gbt = GBTRegressor(labelCol=LABEL_COL, seed=SEED)

# NOTE(review): this grid holds a single parameter combination, so CrossValidator
# trains one model per fold only to pick that one candidate. Widen the grid or drop CV.
paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [2]).addGrid(gbt.maxIter, [2]).build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(),
                                predictionCol=gbt.getPredictionCol())

# Fixed seed so the fold assignment is reproducible across runs.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid,
                    seed=SEED)

pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [None]:
# Fit assembler, indexer and the cross-validated GBT on the January training split.
# This is the expensive step of the notebook.
pipelineModel = pipeline.fit(dataset=train)

In [None]:
# Score the held-out February split and report its RMSE on tip_amount.
predictions = pipelineModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"RMSE on our test set: {rmse}")

In [None]:
# Install pandas into the running Spark session (presumably EMR notebook-scoped libraries;
# confirm against the cluster setup).
# NOTE(review): unpinned, and placed at the end of the notebook although no visible cell
# uses pandas. If it is needed, pin a version and move this to the top of the notebook.
spark.sparkContext.install_pypi_package("pandas")