Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# NYC Taxi Data Regression Model
This is an Azure Machine Learning Pipelines version of the two-part tutorial (Part 1, Part 2) available for Azure Machine Learning.

You can combine the two-part tutorial into one by using Azure Machine Learning Pipelines, because pipelines let you stitch together the steps of a machine learning workflow (here, data preparation and training).

In this notebook, you learn how to prepare data for regression modeling by using the open-source library pandas. You run various transformations to filter and combine two different NYC taxi datasets. Once the NYC taxi data is prepared for regression modeling, you use the AutoMLStep available with Azure Machine Learning Pipelines to define your machine learning goals and constraints and to launch the automated machine learning process. Automated machine learning iterates over many combinations of algorithms and hyperparameters until it finds the best model based on your criterion.

After you build the model, you can use it to predict the cost of a taxi trip from data features such as the pickup day and time, the number of passengers, and the pickup location.

## Prerequisite
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, if you haven't already, first go through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks. It sets you up with a working config file that contains information about your workspace, subscription ID, and so on.

## Prepare data for regression modeling
First, we prepare the data for regression modeling. We use Azure Open Datasets together with the Azure Machine Learning service to create a regression model that predicts NYC taxi fares. Run `pip install azureml-opendatasets` to get the Open Datasets package. The package contains a class for each data source (NycTlcGreen and NycTlcYellow) that makes it easy to filter by date before downloading.

### Load data
Begin by creating a dataframe to hold the taxi data. When working in a non-Spark environment, some Open Datasets classes only allow downloading one month of data at a time, to avoid a MemoryError with large datasets. To download a year of taxi data, fetch one month at a time and, before appending it to green_df_raw, randomly sample a fixed number of records (sample_size: 5,000 for green taxis and 500 for yellow taxis) from each month to avoid bloating the dataframe. Then preview the data. To keep this process short, we sample only one month of data.

Note: Open Datasets has mirroring classes for working in Spark environments where data size and memory aren't a concern.

In [None]:
from azureml.opendatasets import NycTlcGreen, NycTlcYellow
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta

start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

number_of_months = 1
sample_size = 5000

# Sample each month separately, then concatenate once at the end.
# DataFrame.append was removed in pandas 2.0, so pd.concat is used instead.
green_samples = []
for sample_month in range(number_of_months):
    temp_df_green = NycTlcGreen(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \
        .to_pandas_dataframe()
    green_samples.append(temp_df_green.sample(sample_size))
green_df_raw = pd.concat(green_samples)

In [None]:
start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

sample_size = 500

# Sample each month, then concatenate (DataFrame.append was removed in pandas 2.0).
yellow_samples = []
for sample_month in range(number_of_months):
    temp_df_yellow = NycTlcYellow(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \
        .to_pandas_dataframe()
    yellow_samples.append(temp_df_yellow.sample(sample_size))
yellow_df_raw = pd.concat(yellow_samples)
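
The two loops above move a one-month window forward with relativedelta. If you raise number_of_months, it can help to check which windows you will get. The sketch below computes them with only the standard library, using `calendar.monthrange` to find each month's last day (a swapped-in technique, not what the cells above use). `month_windows` is a hypothetical helper, not part of azureml-opendatasets.

```python
import calendar
from datetime import datetime


def month_windows(year, first_month, number_of_months):
    """Return (start, end) datetimes covering whole calendar months.

    Hypothetical helper for illustration; it mirrors the idea of fetching
    one month of taxi data per loop iteration.
    """
    windows = []
    for offset in range(number_of_months):
        # Turn a running month offset into a (year, month) pair.
        month_index = first_month - 1 + offset
        y, m = year + month_index // 12, month_index % 12 + 1
        # monthrange returns (weekday of the 1st, number of days in the month).
        last_day = calendar.monthrange(y, m)[1]
        windows.append((datetime(y, m, 1), datetime(y, m, last_day)))
    return windows


for window_start, window_end in month_windows(2016, 1, 3):
    print(window_start.date(), window_end.date())
```

Because 2016 is a leap year, the second window ends on February 29.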

### See the data

In [None]:
from IPython.display import display

display(green_df_raw.head(5))
display(yellow_df_raw.head(5))

### Download data locally and then upload to Azure Blob
This is a one-time process to save the data in the default datastore.

In [None]:
import os

dataDir = "data"
greenDir = os.path.join(dataDir, "green")
yellowDir = os.path.join(dataDir, "yellow")

# Create the local folders; exist_ok avoids an error if they already exist.
os.makedirs(greenDir, exist_ok=True)
os.makedirs(yellowDir, exist_ok=True)

# Note: despite the .parquet extension, these files are written as CSV;
# the datasets created later read them with from_delimited_files().
greenTaxiData = os.path.join(greenDir, "unprepared.parquet")
yellowTaxiData = os.path.join(yellowDir, "unprepared.parquet")

green_df_raw.to_csv(greenTaxiData, index=False)
yellow_df_raw.to_csv(yellowTaxiData, index=False)

print("Data written to local folder.")

In [None]:
from azureml.core import Workspace

ws = Workspace.from_config()
print("Workspace: " + ws.name, "Region: " + ws.location, sep = '\n')

# Default datastore
default_store = ws.get_default_datastore() 

default_store.upload_files([greenTaxiData], 
                           target_path = 'green', 
                           overwrite = True, 
                           show_progress = True)

default_store.upload_files([yellowTaxiData], 
                           target_path = 'yellow', 
                           overwrite = True, 
                           show_progress = True)

print("Upload calls completed.")

### Create and register datasets
By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they are stored in the dataset as well. You can learn more about the supported subsetting capabilities in our documentation. The data remains in its existing location, so no extra storage cost is incurred.

In [None]:
from azureml.core import Dataset
green_taxi_data = Dataset.Tabular.from_delimited_files(default_store.path('green/unprepared.parquet'))
yellow_taxi_data = Dataset.Tabular.from_delimited_files(default_store.path('yellow/unprepared.parquet'))

Register the taxi datasets with the workspace so that you can reuse them in other experiments or share them with colleagues who have access to your workspace.

In [None]:
green_taxi_data = green_taxi_data.register(ws, 'green_taxi_data', create_new_version=True)
yellow_taxi_data = yellow_taxi_data.register(ws, 'yellow_taxi_data', create_new_version=True)

### Set Up Compute
#### Create a new compute target or use an existing one

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster
amlcompute_cluster_name = "cpu-cluster"

# Verify that cluster does not exist already
try:
    aml_compute = ComputeTarget(workspace=ws, name=amlcompute_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=4)
    aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)

aml_compute.wait_for_completion(show_output=True)

### Prepare data
Now we prepare the data for regression modeling by using pandas. We run various transformations to filter and combine the two NYC taxi datasets.

We create a separate step for each transformation, because this lets us reuse steps and avoids rerunning everything whenever something changes. We keep the data preparation scripts in one subfolder and the training scripts in another.

> The best practice is to use a separate folder for each step's script and its dependent files, and to specify that folder as the step's source_directory. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Since a change to any file in the source_directory triggers a re-upload of the snapshot, this also lets the step be reused when nothing in its source_directory has changed.

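
To illustrate why a dedicated source_directory helps reuse, the sketch below fingerprints a folder by hashing every file's relative path and contents: editing a file inside the folder changes the fingerprint, while files outside it do not. This is only a conceptual model; `directory_fingerprint` is hypothetical and is not how Azure Machine Learning actually computes or compares snapshots.

```python
import hashlib
import os
import tempfile


def directory_fingerprint(root):
    """Hash relative paths and file contents under root in a stable order."""
    digest = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # sorting in place makes os.walk's order deterministic
        for name in sorted(filenames):
            path = os.path.join(dirpath, name)
            rel = os.path.relpath(path, root).replace(os.sep, "/")
            digest.update(rel.encode("utf-8"))
            with open(path, "rb") as f:
                digest.update(f.read())
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    step_dir = os.path.join(tmp, "cleanse")
    os.makedirs(step_dir)
    with open(os.path.join(step_dir, "cleanse.py"), "w") as f:
        f.write("print('v1')\n")
    before = directory_fingerprint(step_dir)

    # A file in a sibling folder does not affect this step's fingerprint.
    with open(os.path.join(tmp, "unrelated.txt"), "w") as f:
        f.write("belongs to another step\n")
    unchanged = directory_fingerprint(step_dir)

    # Editing a file inside the step folder does.
    with open(os.path.join(step_dir, "cleanse.py"), "w") as f:
        f.write("print('v2')\n")
    after = directory_fingerprint(step_dir)

print(before == unchanged, before == after)  # True False
```
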
#### Define Useful Columns
Here we define a set of "useful" columns for both the Green and Yellow taxi data.

In [None]:
display(green_df_raw.columns)
display(yellow_df_raw.columns)

# useful columns needed for the Azure Machine Learning NYC Taxi tutorial
useful_columns = str(["cost", "distance", "dropoff_datetime", "dropoff_latitude", 
                      "dropoff_longitude", "passengers", "pickup_datetime", 
                      "pickup_latitude", "pickup_longitude", "store_forward", "vendor"]).replace(",", ";")

print("Useful columns defined.")
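
useful_columns is passed to the components as a single string: the str() of a Python list with its commas replaced by semicolons. The notebook does not show how the cleanse script reads it back, so the sketch below is an assumption: it shows what the encoded value looks like and one way to decode it with `ast.literal_eval`. `decode_list_param` is hypothetical and assumes no column name contains a comma or semicolon. It uses a separate `encoded` variable so it does not overwrite useful_columns.

```python
import ast

# Shortened example list; the real useful_columns has more entries.
encoded = str(["cost", "distance", "passengers"]).replace(",", ";")
print(encoded)  # ['cost'; 'distance'; 'passengers']


def decode_list_param(value):
    """Turn a semicolon-encoded list string back into a Python list."""
    return ast.literal_eval(value.replace(";", ","))


print(decode_list_param(encoded))  # ['cost', 'distance', 'passengers']
```
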

#### Cleanse Green taxi data

In [None]:
from azureml.component import Component

cleansing_component_func = Component.from_yaml(ws, os.path.join('nyc-taxi-data-regression-model-building', 'cleanse', 'cleanse_spec.yaml'))

In [None]:
green_columns = str({ 
    "vendorID": "vendor",
    "lpepPickupDatetime": "pickup_datetime",
    "lpepDropoffDatetime": "dropoff_datetime",
    "storeAndFwdFlag": "store_forward",
    "pickupLongitude": "pickup_longitude",
    "pickupLatitude": "pickup_latitude",
    "dropoffLongitude": "dropoff_longitude",
    "dropoffLatitude": "dropoff_latitude",
    "passengerCount": "passengers",
    "fareAmount": "cost",
    "tripDistance": "distance"
}).replace(",", ";")

cleansing_step_green = cleansing_component_func(
    raw_data=green_taxi_data.as_named_input('raw_data'),
    useful_columns=useful_columns,
    columns=green_columns)

print("cleansing_step_green created.")
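
The cleanse component receives the rename mapping and the useful-column list as encoded strings. The sketch below assumes it decodes both, renames the raw columns, and keeps only the useful ones. It works on a single dict rather than a pandas DataFrame, uses shortened example inputs, and `decode_param` and `cleanse_record` are hypothetical helpers, not the component's actual code.

```python
import ast

# Shortened, hypothetical versions of the encoded parameters built above.
encoded_mapping = str({"vendorID": "vendor", "fareAmount": "cost",
                       "tripDistance": "distance"}).replace(",", ";")
encoded_keep = str(["cost", "distance", "vendor"]).replace(",", ";")


def decode_param(value):
    """Decode a semicolon-encoded list or dict literal."""
    return ast.literal_eval(value.replace(";", ","))


def cleanse_record(record, encoded_mapping, encoded_keep):
    """Rename keys using the mapping, then keep only the useful columns."""
    mapping = decode_param(encoded_mapping)
    keep = decode_param(encoded_keep)
    # Keys missing from the mapping keep their original name.
    renamed = {mapping.get(key, key): value for key, value in record.items()}
    return {col: renamed[col] for col in keep if col in renamed}


raw = {"vendorID": 2, "fareAmount": 12.5, "tripDistance": 3.1, "extra": 0.5}
print(cleanse_record(raw, encoded_mapping, encoded_keep))
# {'cost': 12.5, 'distance': 3.1, 'vendor': 2}
```
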

#### Cleanse Yellow taxi data

In [None]:
yellow_columns = str({
    "vendorID": "vendor",
    "tpepPickupDateTime": "pickup_datetime",
    "tpepDropoffDateTime": "dropoff_datetime",
    "storeAndFwdFlag": "store_forward",
    "startLon": "pickup_longitude",
    "startLat": "pickup_latitude",
    "endLon": "dropoff_longitude",
    "endLat": "dropoff_latitude",
    "passengerCount": "passengers",
    "fareAmount": "cost",
    "tripDistance": "distance"
}).replace(",", ";")

cleansing_step_yellow = cleansing_component_func(
    raw_data=yellow_taxi_data.as_named_input('raw_data'),
    useful_columns=useful_columns,
    columns=yellow_columns)

print("cleansing_step_yellow created.")

#### Merge cleansed Green and Yellow datasets
We are creating a single data source by merging the cleansed versions of Green and Yellow taxi data.

In [None]:
merging_component_func = Component.from_yaml(ws, os.path.join('nyc-taxi-data-regression-model-building', 'merge', 'merge_spec.yaml'))

In [None]:
merging_step = merging_component_func(
    cleansed_green_data=cleansing_step_green.outputs.output_cleanse,
    cleansed_yellow_data=cleansing_step_yellow.outputs.output_cleanse)

print("merging_step created.")
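
Both cleansing steps rename their columns to the same names, so merging the two datasets most likely means stacking them row-wise (an append) rather than joining on a key. The merge component's script isn't shown here, so treat this as an assumption. `merge_rows` is a hypothetical helper that works on lists of dicts and refuses to stack records whose columns differ.

```python
def merge_rows(green_rows, yellow_rows):
    """Stack two lists of records that share the same columns (row-wise append)."""
    combined = list(green_rows) + list(yellow_rows)
    columns = None
    for row in combined:
        if columns is None:
            columns = set(row)
        elif set(row) != columns:
            raise ValueError(f"Column mismatch: {sorted(columns ^ set(row))}")
    return combined


green = [{"vendor": 2, "cost": 12.5}, {"vendor": 1, "cost": 7.0}]
yellow = [{"vendor": 1, "cost": 9.0}]
merged = merge_rows(green, yellow)
print(len(merged))  # 3
```
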

#### Filter data
This step filters out trips whose coordinates fall outside the city borders. We use a TypeConverter object to change the latitude and longitude fields to decimal type.

In [None]:
filter_component_func = Component.from_yaml(ws, os.path.join('nyc-taxi-data-regression-model-building', 'filter', 'filter_spec.yaml'))

In [None]:
filter_step = filter_component_func(merged_data=merging_step.outputs.output_merge)

print("filter_step created.")
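
The filter component's script isn't shown here, so the sketch below only assumes what it does: it converts the four coordinate fields to `Decimal` (standing in for the TypeConverter mentioned above) and keeps a trip only if both pickup and dropoff fall inside a bounding box. The box limits are illustrative values roughly around New York City, not the thresholds the component actually uses, and all names are hypothetical.

```python
from decimal import Decimal, InvalidOperation

# Assumed, illustrative bounding box roughly around New York City.
LAT_MIN, LAT_MAX = Decimal("40.53"), Decimal("40.88")
LON_MIN, LON_MAX = Decimal("-74.09"), Decimal("-73.72")

COORD_FIELDS = ("pickup_latitude", "pickup_longitude",
                "dropoff_latitude", "dropoff_longitude")


def to_decimal(value):
    """Convert a raw value to Decimal; return None if missing, non-numeric or NaN."""
    try:
        number = Decimal(str(value))
    except InvalidOperation:
        return None
    # Decimal NaN cannot be ordered, so treat non-finite values as missing.
    return number if number.is_finite() else None


def inside_city(record):
    """True if both pickup and dropoff coordinates fall inside the box."""
    coords = {field: to_decimal(record.get(field)) for field in COORD_FIELDS}
    if any(value is None for value in coords.values()):
        return False
    return (LAT_MIN <= coords["pickup_latitude"] <= LAT_MAX
            and LAT_MIN <= coords["dropoff_latitude"] <= LAT_MAX
            and LON_MIN <= coords["pickup_longitude"] <= LON_MAX
            and LON_MIN <= coords["dropoff_longitude"] <= LON_MAX)


trips = [
    {"pickup_latitude": 40.75, "pickup_longitude": -73.98,
     "dropoff_latitude": 40.71, "dropoff_longitude": -74.01},
    {"pickup_latitude": 0, "pickup_longitude": 0,
     "dropoff_latitude": 40.71, "dropoff_longitude": -74.01},
]
kept = [trip for trip in trips if inside_city(trip)]
print(len(kept))  # 1
```

Zero coordinates, a common placeholder for missing GPS data, are dropped by the same range check.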

#### Normalize data
In this step, we split the pickup and dropoff datetime values into separate date and time columns, and then rename the columns to use meaningful names.

In [None]:
normalize_component_func = Component.from_yaml(ws, os.path.join('nyc-taxi-data-regression-model-building', 'normalize', 'normalize_spec.yaml'))

In [None]:
normalize_step = normalize_component_func(filtered_data=filter_step.outputs.output_filter)

print("normalize_step created.")
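
A plain-Python sketch of the normalize idea for one record: each `*_datetime` value is parsed and replaced by separate `*_date` and `*_time` strings. It assumes the timestamps are strings in `%Y-%m-%d %H:%M:%S` format (what pandas typically writes to CSV for datetime columns). `split_datetime` is a hypothetical helper; the actual normalize component may operate on whole pandas columns instead.

```python
from datetime import datetime


def split_datetime(record, field, fmt="%Y-%m-%d %H:%M:%S"):
    """Return a copy of record with `<prefix>_datetime` split into date and time."""
    if not field.endswith("_datetime"):
        raise ValueError(f"Expected a *_datetime field, got {field!r}")
    prefix = field[: -len("_datetime")]
    out = dict(record)
    stamp = datetime.strptime(out.pop(field), fmt)
    out[prefix + "_date"] = stamp.date().isoformat()
    out[prefix + "_time"] = stamp.time().isoformat()
    return out


trip = {"pickup_datetime": "2016-01-15 08:05:30",
        "dropoff_datetime": "2016-01-15 08:21:02", "cost": 12.5}
for name in ("pickup_datetime", "dropoff_datetime"):
    trip = split_datetime(trip, name)
print(trip)
```
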

#### Transform data
Transform the normalized taxi data into the final required format. This step does the following:

- Splits the pickup and dropoff dates further into day-of-week, day-of-month, and month values.
- Gets the day-of-week value by using the derive_column_by_example() function. The function takes an array of example objects that define the input data and the preferred output, and automatically determines the preferred transformation. For the pickup and dropoff time columns, it splits the time into hour, minute, and second by using the split_column_by_example() function with no example parameter.
- After generating the new features, uses the drop_columns() function to delete the original fields, because the newly generated features are preferred.
- Renames the remaining fields to use meaningful descriptions.

In [None]:
transform_component_func = Component.from_yaml(ws, os.path.join('nyc-taxi-data-regression-model-building', 'transform', 'transform_spec.yaml'))

In [None]:
transform_step = transform_component_func(normalized_data=normalize_step.outputs.output_normalize)

print("transform_step created.")
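
derive_column_by_example() infers a transformation from examples. As a simpler, swapped-in technique, the sketch below computes the same kinds of features directly with the datetime module. pickup_weekday and pickup_hour match column names used later in this notebook; the other feature names (pickup_monthday, pickup_month, pickup_minute, pickup_second) are assumptions for illustration, and `derive_features` is a hypothetical helper.

```python
from datetime import date, time

# Fixed English names so the result does not depend on the system locale.
DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday")


def derive_features(record, prefix):
    """Replace `<prefix>_date` and `<prefix>_time` with calendar and clock features."""
    out = dict(record)
    # Popping the originals mirrors dropping them once the features exist.
    d = date.fromisoformat(out.pop(prefix + "_date"))
    t = time.fromisoformat(out.pop(prefix + "_time"))
    out[prefix + "_weekday"] = DAY_NAMES[d.weekday()]  # weekday(): Monday is 0
    out[prefix + "_monthday"] = d.day
    out[prefix + "_month"] = d.month
    out[prefix + "_hour"] = t.hour
    out[prefix + "_minute"] = t.minute
    out[prefix + "_second"] = t.second
    return out


features = derive_features(
    {"pickup_date": "2016-01-15", "pickup_time": "08:05:30", "cost": 12.5},
    "pickup")
print(features)
```
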

#### Split the data into train and test sets
This step splits the data into a dataset for model training and a dataset for testing.

In [None]:
test_train_split_component_func = Component.from_yaml(ws, os.path.join('nyc-taxi-data-regression-model-building', 'train_test_split', 'train_test_split_spec.yaml'))

In [None]:
test_train_split_step = test_train_split_component_func(transformed_data=transform_step.outputs.output_transform)

print("test_train_split_step created.")
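
The train_test_split component's ratio and seeding aren't shown in this notebook. A minimal sketch of a reproducible random split, assuming an 80/20 split and a fixed seed (both hypothetical choices), could look like this. Shuffling a copy with its own `random.Random` instance keeps the split repeatable without touching global random state.

```python
import random


def train_test_split(rows, test_fraction=0.2, seed=42):
    """Shuffle a copy of rows deterministically and split off a test set."""
    if not 0 < test_fraction < 1:
        raise ValueError("test_fraction must be between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_fraction))
    return shuffled[n_test:], shuffled[:n_test]


rows = list(range(10))
train, test = train_test_split(rows)
print(len(train), len(test))  # 8 2
```

With a fixed seed, rerunning the split yields the same partition, so metrics from different runs stay comparable.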

#### Add regression steps
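TODO: replace these steps with an AutoML step. For now, this section uses the built-in Linear Regression, Train Model, Score Model, and Evaluate Model components.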

In [None]:
select_columns_func = Component.load(ws, namespace='azureml', name='Select Columns in Dataset')
linear_regression_func = Component.load(ws, namespace='azureml', name='Linear Regression')
train_model_func = Component.load(ws, namespace='azureml', name='Train Model')
score_model_func = Component.load(ws, namespace='azureml', name='Score Model')
evaluate_model_func = Component.load(ws, namespace='azureml', name='Evaluate Model')

In [None]:
select_columns_step = select_columns_func(
    dataset=test_train_split_step.outputs.output_split_train,
    select_columns='{"isFilter":true,"rules":[{"exclude":false,"ruleType":"ColumnNames","columns":["pickup_weekday","pickup_hour","distance","passengers","vendor","cost"]}]}')
linear_regression_step = linear_regression_func(
    solution_method='Ordinary Least Squares',
    l2_regularization_weight=0.001,
    include_intercept_term=True)
train_model_step = train_model_func(
    dataset=select_columns_step.outputs.results_dataset,
    label_column='{"isFilter":true,"rules":[{"exclude":false,"ruleType":"ColumnNames","columns":["cost"]}]}',
    untrained_model=linear_regression_step.outputs.untrained_model)
score_model_step = score_model_func(
    trained_model=train_model_step.outputs.trained_model,
    dataset=test_train_split_step.outputs.output_split_test,
    append_score_columns_to_output=True)
evaluate_model_step = evaluate_model_func(scored_dataset=score_model_step.outputs.scored_dataset)
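
To make the train, score, and evaluate flow concrete, here is a pure-Python sketch on made-up toy numbers. It fits a single-feature line (cost versus distance) with a small L2 penalty on the slope, scores held-out points, and computes MAE, RMSE, and the coefficient of determination, which are among the regression metrics Evaluate Model reports. It is a simplification, not the Linear Regression component: the real step uses several features, and how the component scales l2_regularization_weight is not shown here. `fit_line` and `evaluate` are hypothetical helpers.

```python
import math


def fit_line(xs, ys, l2_weight=0.001):
    """Fit y = intercept + slope * x with an L2 penalty on the slope only."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    slope = sxy / (sxx + l2_weight)  # l2_weight = 0 gives ordinary least squares
    return y_mean - slope * x_mean, slope


def evaluate(y_true, y_pred):
    """Return MAE, RMSE and the coefficient of determination (R^2)."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)
    y_mean = sum(y_true) / n
    ss_tot = sum((t - y_mean) ** 2 for t in y_true)
    return {
        "mae": sum(abs(e) for e in errors) / n,
        "rmse": math.sqrt(ss_res / n),
        "r2": 1 - ss_res / ss_tot if ss_tot else float("nan"),
    }


# Made-up toy data: fare grows roughly linearly with trip distance.
train_distance = [1.0, 2.0, 3.0, 4.0, 5.0]
train_cost = [5.0, 7.5, 9.5, 12.0, 14.0]
intercept, slope = fit_line(train_distance, train_cost)

test_distance = [1.5, 3.5]
test_cost = [6.0, 11.0]
scored = [intercept + slope * x for x in test_distance]
print(evaluate(test_cost, scored))
```
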

#### Build Pipeline

In [None]:
from azureml.component import PipelineComponent

pipeline = PipelineComponent(
    [cleansing_step_green,
     cleansing_step_yellow,
     merging_step,
     filter_step,
     normalize_step,
     transform_step,
     test_train_split_step,
     select_columns_step,
     linear_regression_step,
     train_model_step,
     score_model_step,
     evaluate_model_step],
    default_compute_target=amlcompute_cluster_name)
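
The steps are handed to PipelineComponent as a list, but what really constrains the run order is how outputs are wired to inputs (for example, merging_step consumes both cleansing outputs). Conceptually, a valid order can be derived from those data dependencies with a topological sort. The sketch below does that with Python's standard `graphlib` module (3.9+) over a hand-written dependency map that mirrors the wiring in the cells above. It illustrates the idea only and is not how azureml.component schedules steps internally.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each step maps to the steps whose outputs it consumes, mirroring the
# wiring in the cells above (the names are plain labels here).
dependencies = {
    "cleansing_step_green": set(),
    "cleansing_step_yellow": set(),
    "merging_step": {"cleansing_step_green", "cleansing_step_yellow"},
    "filter_step": {"merging_step"},
    "normalize_step": {"filter_step"},
    "transform_step": {"normalize_step"},
    "test_train_split_step": {"transform_step"},
    "select_columns_step": {"test_train_split_step"},
    "linear_regression_step": set(),
    "train_model_step": {"select_columns_step", "linear_regression_step"},
    "score_model_step": {"train_model_step", "test_train_split_step"},
    "evaluate_model_step": {"score_model_step"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Steps with no dependency between them, such as the two cleansing steps, may appear in either order and could in principle run in parallel.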

#### Submit Pipeline

In [None]:
run = pipeline.submit(experiment_name='NYCTaxi_Tutorial_Pipelines')
run.wait_for_completion()