Import Required Libraries

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import dayofweek, year, month, hour
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import RegressionMetrics

Create Spark session

In [2]:
spark = SparkSession.builder \
  .appName('Chicagotaxi') \
  .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta,com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc2') \
  .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
  .getOrCreate()

Import libraries from LBGM

In [3]:
from mmlspark.featurize import AssembleFeatures
from mmlspark.stages import UDFTransformer, DropColumns
from mmlspark.featurize import DataConversion
from mmlspark.lightgbm import LightGBMRegressor

Create a spark dataframe with subset of data from Bigquery.  Removing the filter condition from the below option will give full dataset from the database

In [4]:
df_master = spark.read.format('bigquery') \
            .option("credentialsFile", 'key.json') \
            .option('parentproject', 'zeta-treat-276509') \
            .option('project', 'zeta-treat-276509') \
            .option('table', 'bigquery-public-data:chicago_taxi_trips.taxi_trips') \
            .option("filter",
                    "EXTRACT(MONTH from trip_start_timestamp) = 3 and "
                    "EXTRACT(DAYOFWEEK from trip_start_timestamp) = 3 and "
                    "EXTRACT(YEAR from trip_start_timestamp) = 2019") \
            .load()

For Model purpose, lets choose only the below fields
1. trip_start_timestamp
2. pickup_latitude, pickup_longitude
3. dropoff_latitude, dropoff_longitude
4. compare
5. fare - This field will be our label to predict

In [5]:
df = df_master[['trip_start_timestamp','pickup_latitude','pickup_longitude','dropoff_latitude','dropoff_longitude','company','fare']]
df.schema

StructType(List(StructField(trip_start_timestamp,TimestampType,true),StructField(pickup_latitude,DoubleType,true),StructField(pickup_longitude,DoubleType,true),StructField(dropoff_latitude,DoubleType,true),StructField(dropoff_longitude,DoubleType,true),StructField(company,StringType,true),StructField(fare,DoubleType,true)))

In [None]:
df.show(5)

Drop the rows that have blank values

In [6]:
df = df.dropna()

Remove the rows that have fare less than $2.70, which is the minium taxi fare in chicago

In [7]:
df = df.filter(df.fare >= 2.70)

Convert the given timestamp to CST

In [8]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from datetime import datetime
get_week_day = udf(lambda z: z.weekday(), IntegerType())
get_year = udf(lambda z: z.year, IntegerType())
get_month = udf(lambda z: z.month, IntegerType())
get_hour = udf(lambda z: z.hour, IntegerType())

In [9]:
assembler = AssembleFeatures(columnsToFeaturize=['pickup_latitude','pickup_longitude','dropoff_latitude', 'dropoff_longitude',
                                      'Trip_Day_Of_Week', 'Trip_Year', 'Trip_Month', 'Trip_Hour','company'],
                            numberOfFeatures=9)
lgbm = LightGBMRegressor(learningRate=0.001,
                           numIterations=50,
                           featuresCol='features',
                           labelCol='fare')

In [10]:
STAGES = [UDFTransformer(inputCol='trip_start_timestamp', outputCol='Trip_Day_Of_Week', udf=get_week_day),
          UDFTransformer(inputCol='trip_start_timestamp', outputCol='Trip_Year', udf=get_year),
          UDFTransformer(inputCol='trip_start_timestamp', outputCol='Trip_Month', udf=get_month),
          UDFTransformer(inputCol='trip_start_timestamp', outputCol='Trip_Hour', udf=get_hour),
          DataConversion(cols=['pickup_latitude','pickup_longitude','dropoff_latitude','dropoff_longitude'], convertTo='double'),
          DataConversion(cols=['pickup_latitude','pickup_longitude','dropoff_latitude','dropoff_longitude'], convertTo='double'),
          DropColumns(cols=['trip_start_timestamp']), assembler]

In [11]:
(train, test) = df.randomSplit([.90, 0.10], seed=42)

In [12]:
train_pip = Pipeline(stages=STAGES)
model = train_pip.fit(train)

In [13]:
model.write().overwrite().save('data_prep')

In [14]:
from pyspark.ml import PipelineModel
data_prep = PipelineModel.load('data_prep')

In [15]:
res_train = data_prep.transform(train)

In [16]:
res_train.summary()

DataFrame[summary: string, pickup_latitude: string, pickup_longitude: string, dropoff_latitude: string, dropoff_longitude: string, company: string, fare: string, Trip_Day_Of_Week: string, Trip_Year: string, Trip_Month: string, Trip_Hour: string]

In [17]:
res_train.show(5)

+---------------+----------------+----------------+-----------------+--------------------+-----+----------------+---------+----------+---------+--------------------+
|pickup_latitude|pickup_longitude|dropoff_latitude|dropoff_longitude|             company| fare|Trip_Day_Of_Week|Trip_Year|Trip_Month|Trip_Hour|            features|
+---------------+----------------+----------------+-----------------+--------------------+-----+----------------+---------+----------+---------+--------------------+
|    41.77887686|   -87.594925439|     41.77887686|    -87.594925439|Taxi Affiliation ...| 4.75|               0|     2019|         3|       19|(17,[1,2,3,4,5,6,...|
|   41.874005383|    -87.66351755|    42.009622881|    -87.670166857|Taxi Affiliation ...| 29.5|               0|     2019|         3|       19|(17,[1,2,3,4,5,6,...|
|   41.878865584|   -87.625192142|    41.835117986|    -87.618677767|Choice Taxi Assoc...| 13.5|               0|     2019|         3|       19|(17,[1,2,3,4,5,6,...|
|   

In [18]:
res_train.schema

StructType(List(StructField(pickup_latitude,DoubleType,true),StructField(pickup_longitude,DoubleType,true),StructField(dropoff_latitude,DoubleType,true),StructField(dropoff_longitude,DoubleType,true),StructField(company,StringType,true),StructField(fare,DoubleType,true),StructField(Trip_Day_Of_Week,IntegerType,true),StructField(Trip_Year,IntegerType,true),StructField(Trip_Month,IntegerType,true),StructField(Trip_Hour,IntegerType,true),StructField(features,VectorUDT,true)))

In [None]:
STAGES1 = [assembler, lgbm]
train_pip = Pipeline(stages=STAGES1)

In [None]:
model1 = train_pip.fit(res_train)

In [None]:
lgbm = LightGBMRegressor(labelCol='fare')

In [None]:
mod = lgbm.fit(res_train)

In [None]:
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
class Preprocessdataframe(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    def __init__(self):
        super(Preprocessdataframe, self).__init__()

    def _transform(self, df):

        df = df.withColumn('trip_start_timestamp_dt',F.to_timestamp(F.unix_timestamp('trip_start_timestamp', 'yyy-MM-dd HH:mm:ss Z').cast('timestamp')))
        df = df.withColumn('trip_start_timestamp_cst', F.from_utc_timestamp('trip_start_timestamp_dt', 'CST'))
        df = df.withColumn('Trip_Day_Of_Week', dayofweek(df.trip_start_timestamp))
        df = df.withColumn('Trip_Year', year(df.trip_start_timestamp))
        df = df.withColumn('Trip_Month', month(df.trip_start_timestamp))
        df = df.withColumn('Trip_Hour', hour(df.trip_start_timestamp_cst))
        return df

Split the data as test and train set

In [None]:
(train, test) = df.randomSplit([.90, 0.10], seed=42)

In [None]:
print((df.count(), len(df.columns)))

In [None]:
df.show(5)

Create an assembler and lgbm model

In [None]:
#preprocess = Preprocessdataframe()
assembler = AssembleFeatures(columnsToFeaturize=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude',
                                      'Trip_Day_Of_Week', 'Trip_Year', 'Trip_Month', 'Trip_Hour','company'],
                            numberOfFeatures=9)
lgbm = LightGBMRegressor(learningRate=0.001,
                           numIterations=50,
                           featuresCol='features',
                           labelCol='fare')
STAGES = [preprocess, assembler, lgbm]

Train the model on train data set

In [None]:
train_pip = Pipeline(stages=STAGES)
model = train_pip.fit(train)

Predict the fare for test set

In [None]:
results = model.transform(test)

In [None]:
results.show(5)

Calculate Root Mean Square Error of the precited results

In [None]:
valuesandpreds = results.rdd.map(lambda p: (float(p.prediction), p.fare))
metric = RegressionMetrics(valuesandpreds)
print('RMSE for Light GBM is ', metric.rootMeanSquaredError)

In [None]:
model.write().overwrite().save('linear_model')

In [None]:
preprocess = Preprocessdataframe()
STAGES1 = [preprocess]
train_preprocess_pip = Pipeline(stages=STAGES1)
train_preprocess_model = train_preprocess_pip.fit(train)

In [None]:
train_preprocess_model.write().overwrite().save('linear_model_preprocess')

In [None]:
from pyspark.ml import PipelineModel
prep_model = PipelineModel.load('linear_model_preprocess')
train_preprocessed = prep_model.transform(train)

In [None]:
STAGES2 = [assembler, lgbm]
train_model_pip = Pipeline(stages=STAGES1)
train_model = train_model_pip.fit(train_preprocessed)

In [None]:
train_preprocess_model.write().overwrite().save('linear_model_model')