# NYC Taxi Fare Prediction with RayDP and Pytorch

In [1]:
import ray
import os
import pandas as pd, numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

from pyspark.sql.functions import *

import raydp
from raydp.torch.estimator import TorchEstimator
from raydp.utils import random_split

## Initialize or connect to existed Ray cluster

In [2]:
# Firstly, You need to init or connect to a ray cluster. Note that you should set include_java to True.
# For more config info in ray, please refer the ray doc. https://docs.ray.io/en/latest/package-ref.html
# ray.init(address="auto", redis_password="123")
#ray.shutdown()
ray.init()

2020-12-16 22:11:18,018	INFO services.py:1169 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.1.5',
 'raylet_ip_address': '192.168.1.5',
 'redis_address': '192.168.1.5:6379',
 'object_store_address': '/tmp/ray/session_2020-12-16_22-11-17_484866_84597/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-12-16_22-11-17_484866_84597/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2020-12-16_22-11-17_484866_84597',
 'metrics_export_port': 65506,
 'node_id': '240c6e958529c67e9f52ec04a73e37d0c3726d61'}

In [3]:
# After initialize ray cluster, you can use the raydp api to get a spark session
app_name = "NYC Taxi Fare Prediction with RayDP"
num_executors = 1
cores_per_executor = 1
memory_per_executor = "2GB"
spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

[2m[33m(pid=raylet)[0m SLF4J: Class path contains multiple SLF4J bindings.
[2m[33m(pid=raylet)[0m SLF4J: Found binding in [jar:file:/Users/dr6jl/Documents/ray-raydp/ray/python/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[2m[33m(pid=raylet)[0m SLF4J: Found binding in [jar:file:/Users/dr6jl/anaconda3/lib/python3.8/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[2m[33m(pid=raylet)[0m SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[2m[33m(pid=raylet)[0m SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]


## Distributed data preprocessing with pyspark

In [4]:
# Then you can code as you are using spark
# The dataset can be downloaded from https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data
# Here we just use a subset of the training data
train = spark.read.format("csv").option("header", "true") \
        .option("inferSchema", "true") \
        .load("../../data1/new-york-city-taxi-fare-prediction/train.csv")

# Set spark timezone for processing datetime
spark.conf.set("spark.sql.session.timeZone", "UTC")

In [5]:
# Clean up the outlier
def clean_up(data):
    
    data = data.filter(col('pickup_longitude')<=-72) \
            .filter(col('pickup_longitude')>=-76) \
            .filter(col('dropoff_longitude')<=-72) \
            .filter(col('dropoff_longitude')>=-76) \
            .filter(col('pickup_latitude')<=42) \
            .filter(col('pickup_latitude')>=38) \
            .filter(col('dropoff_latitude')<=42) \
            .filter(col('dropoff_latitude')>=38) \
            .filter(col('passenger_count')<=6) \
            .filter(col('passenger_count')>=1) \
            .filter(col('fare_amount') > 0) \
            .filter(col('fare_amount') < 250) \
            .filter(col('dropoff_longitude') != col('pickup_longitude')) \
            .filter(col('dropoff_latitude') != col('pickup_latitude')) 
    
    return data

In [6]:
# Add time related features
def add_time_features(data):
    data = data.withColumn("day", dayofmonth(col("pickup_datetime")))
    data = data.withColumn("hour_of_day", hour(col("pickup_datetime")))
    data = data.withColumn("day_of_week", dayofweek(col("pickup_datetime"))-2)
    data = data.withColumn("week_of_year", weekofyear(col("pickup_datetime")))
    data = data.withColumn("month_of_year", month(col("pickup_datetime")))
    data = data.withColumn("quarter_of_year", quarter(col("pickup_datetime")))
    data = data.withColumn("year", year(col("pickup_datetime")))
    @udf("int")
    def night(hour, weekday):
        if ((hour <= 20) and (hour >= 16) and (weekday < 5)):
            return int(1)
        else:
            return int(0)
    @udf("int")
    def late_night(hour):
        if ((hour <= 6) and (hour >= 20)):
            return int(1)
        else:
            return int(0)
    data = data.withColumn("night", night("hour_of_day", "day_of_week"))
    data = data.withColumn("late_night", late_night("hour_of_day"))
    return data

In [7]:
# Add distance related features
def add_distance_features(data):
    @udf("float")
    def manhattan(lat1, lon1, lat2, lon2):
        return float(np.abs(lat2 - lat1) + np.abs(lon2 - lon1))
    # Location of NYC downtown
    ny = (-74.0063889, 40.7141667)
    # Location of the three airport in NYC
    jfk = (-73.7822222222, 40.6441666667)
    ewr = (-74.175, 40.69)
    lgr = (-73.87, 40.77)
    # Features about the distance between pickup/dropoff and airport
    data = data.withColumn("abs_diff_longitude", abs(col("dropoff_longitude")-col("pickup_longitude"))) \
            .withColumn("abs_diff_latitude", abs(col("dropoff_latitude") - col("pickup_latitude")))
    data = data.withColumn("manhattan", col("abs_diff_latitude")+col("abs_diff_longitude"))
    data = data.withColumn("pickup_distance_jfk", manhattan("pickup_longitude", "pickup_latitude", lit(jfk[0]), lit(jfk[1])))
    data = data.withColumn("dropoff_distance_jfk", manhattan("dropoff_longitude", "dropoff_latitude", lit(jfk[0]), lit(jfk[1])))
    data = data.withColumn("pickup_distance_ewr", manhattan("pickup_longitude", "pickup_latitude", lit(ewr[0]), lit(ewr[1])))
    data = data.withColumn("dropoff_distance_ewr", manhattan("dropoff_longitude", "dropoff_latitude", lit(ewr[0]), lit(ewr[1])))
    data = data.withColumn("pickup_distance_lgr", manhattan("pickup_longitude", "pickup_latitude", lit(lgr[0]), lit(lgr[1])))
    data = data.withColumn("dropoff_distance_lgr", manhattan("dropoff_longitude", "dropoff_latitude", lit(lgr[0]), lit(lgr[1])))
    data = data.withColumn("pickup_distance_downtown", manhattan("pickup_longitude", "pickup_latitude", lit(ny[0]), lit(ny[1])))
    data = data.withColumn("dropoff_distance_downtown", manhattan("dropoff_longitude", "dropoff_latitude", lit(ny[0]), lit(ny[1])))
    return data

In [8]:
# Drop unused features
def drop_col(data):
    
    data = data.drop("pickup_datetime") \
            .drop("pickup_longitude") \
            .drop("pickup_latitude") \
            .drop("dropoff_longitude") \
            .drop("dropoff_latitude") \
            .drop("passenger_count") \
            .drop("key")
    
    return data

In [9]:

train_data = clean_up(train)

train_data = add_time_features(train_data)

train_data = add_distance_features(train_data)

train_data = drop_col(train_data)

In [10]:
columns=len(train_data.dtypes)
rows=train_data.count()
print ("Rows:%d, Columns:%d"%(rows,columns))

Rows:53478118, Columns:21


In [10]:
train_data.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- month_of_year: integer (nullable = true)
 |-- quarter_of_year: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- night: integer (nullable = true)
 |-- late_night: integer (nullable = true)
 |-- abs_diff_longitude: double (nullable = true)
 |-- abs_diff_latitude: double (nullable = true)
 |-- manhattan: double (nullable = true)
 |-- pickup_distance_jfk: float (nullable = true)
 |-- dropoff_distance_jfk: float (nullable = true)
 |-- pickup_distance_ewr: float (nullable = true)
 |-- dropoff_distance_ewr: float (nullable = true)
 |-- pickup_distance_lgr: float (nullable = true)
 |-- dropoff_distance_lgr: float (nullable = true)
 |-- pickup_distance_downtown: float (nullable = true)
 |-- dropoff_distance_downtown: float (nullable = true)



In [11]:
train_data = train_data.withColumn('index', monotonically_increasing_id())
train_data = train_data.sort('index').limit(1000)

## Distributed model training and evaluation

In [12]:
# Split data into train_dataset and test_dataset
train_df, test_df = random_split(train_data, [0.9, 0.1])
features = [field.name for field in list(train_df.schema) if field.name != "fare_amount"]

In [13]:
# Define the model, loss function and optimizer
class NYC_Model(nn.Module):
    def __init__(self, cols):
        super(NYC_Model, self).__init__()
        self.fc1 = nn.Linear(cols, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 16)
        self.fc5 = nn.Linear(16, 1)
        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(64)
        self.bn4 = nn.BatchNorm1d(16)
    def forward(self, *x):
        x = torch.cat(x, dim=1)
        x = F.relu(self.fc1(x))
        x = self.bn1(x)
        x = F.relu(self.fc2(x))
        x = self.bn2(x)
        x = F.relu(self.fc3(x))
        x = self.bn3(x)
        x = F.relu(self.fc4(x))
        x = self.bn4(x)
        x = self.fc5(x)
        return x.squeeze(1)
nyc_model = NYC_Model(len(features))
criterion = nn.SmoothL1Loss()
optimizer = torch.optim.Adam(nyc_model.parameters(), lr=0.001)

In [14]:
nyc_model

NYC_Model(
  (fc1): Linear(in_features=21, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=64, bias=True)
  (fc4): Linear(in_features=64, out_features=16, bias=True)
  (fc5): Linear(in_features=16, out_features=1, bias=True)
  (bn1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn4): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)

In [15]:
criterion

SmoothL1Loss()

In [16]:
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    eps: 1e-08
    lr: 0.001
    weight_decay: 0
)

In [15]:
nodeinfo = ray.nodes()[0]['NodeManagerAddress']

# set 'node' resource
@ray.remote
def set_resource(resource_name, resource_capacity):
    #by default, on actor's local node
    ray.experimental.set_resource(resource_name,resource_capacity)
ray.get(set_resource.remote(nodeinfo, 1))
print(nodeinfo)

192.168.1.5


In [19]:
# Create a distributed estimator based on the raydp api
#estimator = TorchEstimator(num_workers=4, model=nyc_model, optimizer=optimizer, loss=criterion,
#                            feature_columns=features, label_column="fare_amount", 
#                           batch_size=256, num_epochs=10,placements={nodeinfo:1})

In [17]:
estimator = TorchEstimator(num_workers=1, model=nyc_model, optimizer=optimizer, loss=criterion,
                            feature_columns=features, label_column="fare_amount", 
                           batch_size=256, num_epochs=1)

In [None]:
# Train the model
# estimator.fit_on_spark(train_df, test_df)

In [18]:
%time estimator.fit_on_spark(train_df)

Epoch-0: {'num_samples': 906, 'epoch': 1.0, 'batch_count': 4.0, 'train_loss': 10.984497032418156, 'last_train_loss': 28.41286277770996}
CPU times: user 17.4 s, sys: 7.43 s, total: 24.8 s
Wall time: 19min 16s


[2m[36m(pid=84876)[0m   return F.smooth_l1_loss(input, target, reduction=self.reduction, beta=self.beta)
[2m[36m(pid=84876)[0m   return F.smooth_l1_loss(input, target, reduction=self.reduction, beta=self.beta)
2020-12-16 22:38:20,241	ERROR worker.py:977 -- Possible unhandled error from worker: [36mray::ParallelIteratorWorker.par_iter_next_batch()[39m (pid=84874, ip=192.168.1.5)
  File "python/ray/_raylet.pyx", line 464, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 419, in ray._raylet.execute_task.function_executor
  File "/Users/dr6jl/Documents/ray-raydp/ray/python/ray/util/iter.py", line 1158, in par_iter_next_batch
    batch.append(self.par_iter_next())
  File "/Users/dr6jl/Documents/ray-raydp/ray/python/ray/util/iter.py", line 1152, in par_iter_next
    return next(self.local_it)
StopIteration


In [None]:
# shutdown raydp and ray
estimator.shutdown()
raydp.stop_spark()
ray.shutdown()