# Predicting driving taxi pickup count by location with the Amazon SageMaker DeepAR algorithm
_**Using the Amazon SageMaker DeepAR algorithm to predict taxi pickup count by location **_



## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Data](#Data)
1. [Train](#Train)
1. [Host](#Host)
1. [Evaluate](#Evaluate)
1. [Extensions](#Extensions)

---

## Background

This article is based on https://github.com/aws/amazon-sagemaker-examples/blob/master/introduction_to_applying_machine_learning/deepar_chicago_traffic_violations/deepar_chicago_traffic_violations.ipynb .

#
This notebook demonstrates time series forecasting using the Amazon SageMaker DeepAR algorithm by analyzing [New York City Taxi and Limousine Commission (TLC) Trip Record Data](https://registry.opendata.aws/nyc-tlc-trip-records-pds/). 

The dataset contains pickup/dropoff locations and timestamp. Amazon SageMaker’s DeepAR algorithm can be used to train a model to predict future pickup count and location using the Amazon SageMaker’s [DeepAR algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html).

---

## Setup

This notebook was created and tested on an ml.m4.xlarge notebook instance.

Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data.  This should be within the same region as the notebook instance, training, and hosting.
- The IAM role arn used to give training and hosting access to your data. See the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) for how to create these.  Note, if more than one role is required for notebook instances, training, and/or hosting, please replace `sagemaker.get_execution_role()` with the appropriate full IAM role arn string(s).

In [None]:
import sagemaker

#bucket = sess.default_bucket()
bucket = "tlc-stack-artifactbucket-wwus3ytvirwq"
source = "kinesis-output/20211213/"
prefix = "sagemaker/nyctlc"
datafile = "source.csv"

sess = sagemaker.Session()
region = sess.boto_region_name
role = sagemaker.get_execution_role()


Now we import Python libraries like matplotlib, pandas and numpy

In [None]:
import boto3
import os, sys
import json, datetime

from IPython.display import Markdown, display
import matplotlib.pyplot as plt
import pandas as pd


---

## Data

The dataset contains 10 minutes time window for timestamp, location, pickup count. If we want to known the next 10 minutes dataset, we can use Amazon SageMaker’s DeepAR algorithm to train a model to predict it.

The dataset contains several columns, we use the timestamp, location(geohash), pickup cout for the forecasting.


In [None]:
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

def parse_datetime(ss):
	return [datetime.datetime.strptime(x, DATE_FORMAT) for x in ss]

def pd_read():
	# read the input file, and display sample rows/columns
	pd.set_option("display.max_columns", 500)
	pd.set_option("display.max_rows", 50)
	df = pd.read_csv(datafile, header=None, 
		names=["TIMESTAMP", "GEOHASH", "PICKUP_COUNT", "LOCATION"],
		parse_dates=["TIMESTAMP"], date_parser=parse_datetime)

	# print first 10 lines to look at part of the dataset
	print("------------------------------------------------------")
	print(df[0:10])

	print("----SORT by TIMESTAMP, GEOHASH------------------------")
	df = df.sort_values(["TIMESTAMP", "GEOHASH"])
	print(df[0:10])

	return df

df = pd_read()



We convert the timestamp from string format to date format, determine the range of datetime, and look at how many unique location we have in our dataset.

As described in [Amazon SageMaker DeepAR input/output interface](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html#deepar-inputoutput) section, we will convert the data into array, and use 0 for the pickup count when data for a location is not available. Using the Matplotlib library we display each location as a timeseries to visualize the data


In [None]:
def make_unique_geohash():
	unique_geohash = df.GEOHASH.unique()
	number_of_geohash = len(unique_geohash)

	print("------------------------------------------------------")
	print("Unique GeoHash {}".format(number_of_geohash))
	print(
		"Minimum pickup date is {}, maximum pickup date is {}".format(
			df.TIMESTAMP.min(), df.TIMESTAMP.max()
		)
	)
	return unique_geohash

unique_geohash = make_unique_geohash()


def make_pickup_list(freq):
	pickup_list = []
	idx = pd.date_range(df.TIMESTAMP.min(), df.TIMESTAMP.max(), freq=freq)
	for key in unique_geohash:
		temp_df = df[["TIMESTAMP", "PICKUP_COUNT"]][df.GEOHASH == key]
		temp_df.set_index(["TIMESTAMP"], inplace=True)
		temp_df.index = pd.DatetimeIndex(temp_df.index)
		temp_df = temp_df.reindex(idx, fill_value=0)
		pickup_list.append(temp_df["PICKUP_COUNT"])

	# print first 10 items
	print("----PICKUP LIST---------------------------------------")
	print(pickup_list[0:10])
	return pickup_list

pickup_list = make_pickup_list("10min")

# plot
def plot():
	plt.figure(figsize=(12, 6), dpi=100, facecolor="w")
	for key, geohash in enumerate(unique_geohash):
		plt.plot(pickup_list[key], label=geohash)

	plt.ylabel("PickupCount")
	plt.xlabel("DateTime")
	plt.title("New York City TLC Pickup Count")
	plt.legend(loc="upper center", bbox_to_anchor=(0.5, -0.05), shadow=False, ncol=4)
	plt.show()

plot()


We define prediction length as 144 (1 day = 6 * 24), and split the data with last 1 day of data as test data. We use rest of the data for training of the model. We can use the last 30 days of data to evaluate the accuracy of our trained model. We write the training and test data files in JSON format in the S3 bucket. 

In [None]:
# 1day = 10min * 6 * 24
prediction_length = 6 * 24

# Split the data for training and test
def make_train_list():
	training = []
	for i in pickup_list:
		training.append((i[:-prediction_length]))

	return training

pickup_list_training = make_train_list()



def format_datetime(d):
	return datetime.datetime.strftime(d, DATE_FORMAT)

def series_to_obj(ts, cat=None):
	obj = {
		"start": format_datetime(ts.index[0]), 
		"target": list(ts)
	}
	if cat:
		obj["cat"] = cat
	return obj


def series_to_jsonline(ts, cat=None):
	return json.dumps(series_to_obj(ts, cat))


encoding = "utf-8"
def save_train_data():
	file_name = "train.json"
	train_data_path = "{}/train/{}".format(prefix, file_name)

	print("------------------------------------------------------")
	print("saving %s" % file_name)
	with open(file_name, "wb") as fp:
		for ts in pickup_list_training:
			fp.write(series_to_jsonline(ts).encode(encoding))
			fp.write("\n".encode(encoding))

	print("uploading %s => %s" % (file_name, train_data_path))
	s3r.Object(bucket, train_data_path).upload_file(file_name)

def save_test_data():
	file_name = "test.json"
	test_data_path = "{}/test/{}".format(prefix, file_name)

	print("------------------------------------------------------")
	print("saving %s" % file_name)
	with open(file_name, "wb") as fp:
		for ts in pickup_list:
			fp.write(series_to_jsonline(ts).encode(encoding))
			fp.write("\n".encode(encoding))

	print("uploading %s => %s" % (file_name, test_data_path))
	s3r.Object(bucket, test_data_path).upload_file(file_name)

save_train_data()
save_test_data()


---

## Train

We use [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/) to create an [estimator](https://sagemaker.readthedocs.io/en/stable/estimators.html) object to kick off training job. The use_spot parameter indicates the use of [managed spot training](https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html). The training will run at most 30 minutes (1800 seconds). 

We use the [Automatic Model Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html) or Hyperparameter optimization for identifying the best values for the [DeepAR hyperparameters](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar_hyperparameters.html). The Automatic Model Tuning job will kick of 10 parallel jobs (set by by max_parallel_jobs) to search the best hyperparameters for this dataset. The jobs will try to minimize the root mean square error on the test dataset using predicted and actual values.

You can consider increasing the max_parallel_jobs and max_run and max_wait parameters to allow for finding better hyperparameters, and allow additional tuning of the hyperparameters.



In [None]:
from sagemaker.tuner import (
	IntegerParameter,
	CategoricalParameter,
	ContinuousParameter,
	HyperparameterTuner,
)
from sagemaker import image_uris


container = image_uris.retrieve(region=region, framework="forecasting-deepar")

deepar = sagemaker.estimator.Estimator(
	container,
	role,
	instance_count=1,
	instance_type="ml.m4.xlarge",
	use_spot_instances=True,  # use spot instances
	max_run=1800,  # max training time in seconds
	max_wait=1800,  # seconds to wait for spot instance
	output_path="s3://{}/{}".format(bucket, "estimate"),
	sagemaker_session=sess,
)
freq = "10min"
context_length = 6 * 24

deepar.set_hyperparameters(
	time_freq=freq, context_length=str(context_length), prediction_length=str(prediction_length)
)

hyperparameter_ranges = {
	"mini_batch_size": IntegerParameter(100, 400),
	"epochs": IntegerParameter(200, 400),
	"num_cells": IntegerParameter(30, 100),
	"likelihood": CategoricalParameter(["negative-binomial", "student-T"]),
	"learning_rate": ContinuousParameter(0.0001, 0.1),
}

objective_metric_name = "test:RMSE"

tuner = HyperparameterTuner(
	deepar,
	objective_metric_name,
	hyperparameter_ranges,
	max_jobs=10,
	strategy="Bayesian",
	objective_type="Minimize",
	max_parallel_jobs=10,
	early_stopping_type="Auto",
)

s3_input_train = sagemaker.inputs.TrainingInput(
	s3_data="s3://{}/{}/train/".format(bucket, prefix), content_type="json"
)
s3_input_test = sagemaker.inputs.TrainingInput(
	s3_data="s3://{}/{}/test/".format(bucket, prefix), content_type="json"
)

tuner.fit({"train": s3_input_train, "test": s3_input_test}, include_cls_metadata=False)
tuner.wait()

---

## Host

We use the [HyperParameterTuner](https://sagemaker.readthedocs.io/en/stable/tuner.html) to host the best model using a single ml.m4.xlarge instance.

In [None]:
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer

best_tuning_job_name = tuner.best_training_job()
endpoint = tuner.deploy(
	initial_instance_count=1,
	endpoint_name=best_tuning_job_name,
	instance_type="ml.m4.xlarge",
	serializer=IdentitySerializer(content_type="application/json"),
	deserializer=JSONDeserializer(),
	wait=True,
)

### Evaluate

To evaluate the model, we define a DeepARPredictor class. This class extends the [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html) class. Implementing encode and decode functions helps us make requests using `pandas.Series` objects rather than raw JSON strings.
    

In [None]:
class DeepARPredictor(sagemaker.predictor.Predictor):
	def __init__(self, *args, **kwargs):
		super().__init__(
			*args, serializer=IdentitySerializer(content_type="application/json"), **kwargs
		)

	def set_prediction_parameters(self, freq, prediction_length):
		"""Set the time frequency and prediction length parameters. This method **must** be
		called before being able to use `predict`.

		Parameters:
		freq -- string indicating the time frequency
		prediction_length -- integer, number of predicted time points

		Return value: none.
		"""
		self.freq = freq
		self.prediction_length = prediction_length

	def predict(
		self, ts, cat=None, encoding="utf-8", num_samples=100, quantiles=["0.1", "0.5", "0.9"]
	):
		"""Requests the prediction of for the time series listed in `ts`, each with the
		(optional) corresponding category listed in `cat`.

		Parameters:
		ts -- list of `pandas.Series` objects, the time series to predict
		cat -- list of integers (default: None)
		encoding -- string, encoding to use for the request (default: 'utf-8')
		num_samples -- integer, number of samples to compute at prediction time (default: 100)
		quantiles -- list of strings specifying the quantiles to compute (default: ['0.1', '0.5', '0.9'])

		Return value: list of `pandas.DataFrame` objects, each containing the predictions
		"""
		prediction_times = [x.index[-1] + x.index.freq for x in ts]
		req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
		res = super(DeepARPredictor, self).predict(req)
		return self.__decode_response(res, prediction_times, encoding)

	def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
		instances = [series_to_obj(ts[k], cat[k] if cat else None) for k in range(len(ts))]
		configuration = {
			"num_samples": num_samples,
			"output_types": ["quantiles"],
			"quantiles": quantiles,
		}
		http_request_data = {"instances": instances, "configuration": configuration}
		return json.dumps(http_request_data).encode(encoding)

	def __decode_response(self, response, prediction_times, encoding):
		response_data = json.loads(response.decode(encoding))
		list_of_df = []
		for k in range(len(prediction_times)):
			prediction_index = pd.date_range(
				start=prediction_times[k], freq=self.freq, periods=self.prediction_length
			)
			list_of_df.append(
				pd.DataFrame(
					data=response_data["predictions"][k]["quantiles"], index=prediction_index
				)
			)
		return list_of_df


predictor = DeepARPredictor(endpoint_name=endpoint.endpoint_name, sagemaker_session=sess)

Now we can use the previously created `predictor` object. We will predict only the first few time series, and compare the results with the actual data we kept in the test set.

In [None]:
predictor.set_prediction_parameters(freq, prediction_length)
list_of_df = predictor.predict(pickup_list_training[:5])
actual_data = pickup_list[:5]
for k in range(len(list_of_df)):
	plt.figure(figsize=(12, 6), dpi=75, facecolor="w")
	plt.ylabel("Pickup Count")
	plt.xlabel("Date Time")
	plt.title("NYC TLC Pickup Count:" + unique_geohash[k])
	actual_data[k][-prediction_length - context_length :].plot(label="target")
	p10 = list_of_df[k]["0.1"]
	p90 = list_of_df[k]["0.9"]
	plt.fill_between(p10.index, p10, p90, color="y", alpha=0.5, label="80% confidence interval")
	list_of_df[k]["0.5"].plot(label="prediction median")
	plt.legend()
	plt.show()

---

### Clean-up

At the end of this exercise, delete the endpoint to avoid accumulating charges in your account.

In [None]:
predictor.delete_model()
predictor.delete_endpoint()