# Tutorial: Build a regression model with Open Datasets

In this tutorial, you leverage the convenience of Azure Open Datasets to create a regression model to predict NYC taxi fare prices. Easily download publicly available taxi, holiday and weather data to create a dataset that can train a regression model using sklearn.

In [0]:
from azureml.opendatasets import NycTlcGreen
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta

pd.options.mode.chained_assignment = None

## Download Data
Begin by downloading the NYC Taxi dataset from Azure Open Datasets. In non-Spark environments, Open Datasets only allows one month of data at a time with certain classes to avoid MemoryError with large datasets. To download 1 year of taxi data, we will fetch 2000 random samples from each month.

Note: Open Datasets has mirroring classes for working in Spark where data size and memory are not a concern.

In [0]:
start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

green_taxi_df = pd.concat([NycTlcGreen(start + relativedelta(months=x), end + relativedelta(months=x)) \
        .to_pandas_dataframe().sample(2000) for x in range(12)])
green_taxi_df

Now that the initial data is loaded, define a function to create various time-based features from the pickup datetime field. This will create new fields for the month number, day of month, day of week, and hour of day. From those, we calculate the sin and cosine transformations to capture the cyclical nature of the variable which will allow the model to factor in time-based seasonality. This function also adds a static feature for the country code to join the holiday data. Use the apply() function on the dataframe to interatively apply this function to each row in the dataframe.

In [0]:
def build_time_features(vector):
    pickup_datetime = vector[0]
    month_num = pickup_datetime.month
    day_of_month = pickup_datetime.day
    day_of_week = pickup_datetime.weekday()
    hour_of_day = pickup_datetime.hour
    country_code = "US"
    hr_sin = np.sin(hour_of_day*(2.*np.pi/24))
    hr_cos = np.cos(hour_of_day*(2.*np.pi/24))
    dy_sin = np.sin(day_of_week*(2.*np.pi/7))
    dy_cos = np.cos(day_of_week*(2.*np.pi/7))
    
    return pd.Series((month_num, day_of_month, day_of_week, hour_of_day, country_code, hr_sin, hr_cos, dy_sin, dy_cos))

green_taxi_df[["month_num", "day_of_month","day_of_week", "hour_of_day", "country_code", "hr_sin", "hr_cos", "dy_sin", "dy_cos"]] = green_taxi_df[["lpepPickupDatetime"]].apply(build_time_features, axis=1)
green_taxi_df

Remove some of the columns that you won't need for modeling or additional feature building. Rename the time field for pickup time, and additionally convert the time to midnight using `pandas.Series.dt.normalize`. This is done to all time features so that the datetime column can be later used as a key when joining datasets together at a daily level of granularity.

In [0]:
columns_to_remove = ["lpepDropoffDatetime", "puLocationId", "doLocationId", "extra", "mtaTax",
                     "improvementSurcharge", "tollsAmount", "ehailFee", "tripType", "rateCodeID", 
                     "storeAndFwdFlag", "paymentType", "fareAmount", "tipAmount"]

green_taxi_df.drop(columns_to_remove, axis=1, inplace=True)

green_taxi_df["datetime"] = green_taxi_df["lpepPickupDatetime"].dt.normalize()
green_taxi_df.head(5)

### Enrich with Holiday Data

Now that the taxi data is downloaded and roughly prepared, add in holiday data as additional features. Holiday-specific features will assist model accuracy, as major holidays are times where taxi demand increases dramatically and supply becomes limited. The holiday dataset is relatively small, so fetch the full set by using the `PublicHolidays` class constructor with no parameters for filtering. Preview the data to check the format.

In [0]:
from azureml.opendatasets import PublicHolidays

# call default constructor to download full dataset
holidays_df = PublicHolidays().to_pandas_dataframe()
holidays_df.head(5)

Rename the `countryRegionCode` and `date` columns to match the respective field names from the taxi data, and also normalize the time so it can be used as a key. Next, join the holiday data with the taxi data by performing a left-join using the Pandas `merge()` function. This will preserve all records from `green_taxi_df`, but add in holiday data where it exists for the corresponding `datetime` and `country_code`, which in this case is always `\"US\"`. Preview the data to verify that they were merged correctly.

In [0]:
holidays_df = holidays_df.rename(columns={"countryRegionCode": "country_code"})
holidays_df["datetime"] = holidays_df["date"].dt.normalize()

holidays_df.drop(["countryOrRegion", "holidayName", "date"], axis=1, inplace=True)

taxi_holidays_df = pd.merge(green_taxi_df, holidays_df, how="left", on=["datetime", "country_code"])
taxi_holidays_df[taxi_holidays_df["normalizeHolidayName"].notnull()]

### Enrich with weather data

Now NOAA surface weather data can be appended to the taxi and holiday data. Use a similar approach to fetch the weather data by downloading one month at a time iteratively. Additionally, specify the `cols` parameter with an array of strings to filter the columns to download. This is a very large dataset containing weather surface data from all over the world, so before appending each month, filter the lat/long fields to near NYC using the `query()` function on the dataframe. This will ensure the `weather_df` doesn't get too large.

In [0]:
from azureml.opendatasets import NoaaIsdWeather
start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

weather_df = pd.concat([NoaaIsdWeather(cols=["temperature", "precipTime", "precipDepth"], start_date=start + relativedelta(months=x), end_date=end + relativedelta(months=x))\
        .to_pandas_dataframe().query("latitude>=40.53 and latitude<=40.88 and longitude>=-74.09 and longitude<=-73.72 and temperature==temperature") for x in range(12)])

In [0]:
weather_df

Again call `pandas.Series.dt.normalize` on the `datetime` field in the weather data so it matches the time key in `taxi_holidays_df`.


Next group the weather data to have daily aggregated weather values. Define a dict `aggregations` to define how to aggregate each field at a daily level. For`temperature` take the mean and for `precipTime` and `precipDepth` take the daily maximum. Use the `groupby()` function along with the aggregations to group the data. Preview the data to ensure there is one record per day.

In [0]:
weather_df["datetime"] = weather_df["datetime"].dt.normalize()

# group by datetime
aggregations = {"precipTime": "max", "temperature": "mean", "precipDepth": "max"}
weather_df_grouped = weather_df.groupby("datetime").agg(aggregations)
weather_df_grouped.head(10)

Note: The examples in this tutorial merge data using Pandas functions and custom aggregations, but the Open Datasets SDK has classes designed to easily merge and enrich data sets. See the [notebook](https://github.com/Azure/OpenDatasetsNotebooks/blob/master/tutorials/data-join/04-nyc-taxi-join-weather-in-pandas.ipynb) for code examples of these design patterns.

### Cleanse data

Merge the existing taxi and holiday data with the new weather data. This time `datetime` is the only key, and again perform a left-join of the data. Run the `describe()` function on the new dataframe to see summary statistics for each field.

In [0]:
taxi_holidays_weather_df = pd.merge(taxi_holidays_df, weather_df_grouped, how="left", on=["datetime"])
taxi_holidays_weather_df.describe()

From the summary statistics, you see that there are several fields that have outliers or values that will reduce model accuracy. First filter the lat/long fields to be within the same bounds you used for filtering weather data. The `tripDistance` field has some bad data, because the minimum value is negative. The `passengerCount` field has bad data as well, with the max value being 210 passengers. Lastly, the `totalAmount` field has negative values, which don't make sense in the context of our model.

Filter out these anomolies using query functions, and then remove the last few columns unnecesary for training.

Note: since a random sample of 2000 was taken for each month of the taxi data, the statistics may vary each time this is ran.

In [0]:
final_df = taxi_holidays_weather_df.query("pickupLatitude>=40.53 and pickupLatitude<=40.88 and \
                                           pickupLongitude>=-74.09 and pickupLongitude<=-73.72 and \
                                           tripDistance>0 and tripDistance<75 and \
                                           passengerCount>0 and passengerCount<100 and \
                                           totalAmount>0")

Call `describe()` again on the data to ensure cleansing worked as expected. The final data is prepared and cleansed, consisting of taxi, holiday, and weather data, and is ready to use for machine learning model training.

In [0]:
final_df.describe()

## Train a model

The data is ready to train a machine learning model.

In [0]:
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import RidgeCV
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_squared_error

### Training Function

Define a function that can be used to create a model pipeline that can be trained and then used for scoring. This pipeline has 2 steps: preprocessing and model training.

<b>Preprocessing Stages:</b>
The preprocessing step of the pipeline also has 2 stages, one for numerical features and one for categorical features.
For the numerical features, let's fill in any blanks with 0's. While the training data may not have any nulls in the these fields, future data that is scored may and this step will take care of those for us. Optionally, a scaler transformation could be added in this step as well. Similarly for the categorical variables, let's have the null values filled with "MISSING". Additionally to the categorical variables, these will need to be one hot encoded, so we will include that step in our pipeline.

<b>Model Training Stage:</b>
An input parameter will determine which type of model of train. Let's test out a linear regression and random forest model to start. 

The two steps are put together into the pipeline which is what the function is returning.

In [0]:
def createClassModel(algo_name, catg, nums):
  numeric_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))])
  
  categorical_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='constant', fill_value="MISSING")), ('onehot', OneHotEncoder(handle_unknown='ignore'))])
  
  preprocesser = ColumnTransformer(transformers=[('num', numeric_transformer, nums), ('cat', categorical_transformer, catg)])
  
  if algo_name == 'linear_regression':
    model=Ridge(alpha=100)
  elif algo_name == 'random_forest':
    model = RandomForestRegressor()
  else:
    pass
  ModelPipeline = Pipeline(steps=[('preprocessor', preprocesser), ("model", model)])
  return ModelPipeline

Let's define the arguments that will be passed to the function. `catg_cols` is a list of the categorical variables that will be transformed in our processing step. `num_cols` is a list of the numerical variables that will be transformed in our processing step. Let's define the target column as `label` so it can be used in future steps as well.

In [0]:
catg_cols = ["vendorID", "month_num", "day_of_month", "normalizeHolidayName", "isPaidTimeOff"]
num_cols = ["passengerCount", "tripDistance", "precipTime", "temperature", "precipDepth", "hr_sin", "hr_cos", "dy_sin", "dy_cos"]
label = ["totalAmount"]

The training is ready to begin, but first, let's make sure that the categorical variables are strings in our dataframe to ensure no errors in our pipeline. 

Next, the data is split into training and test sets by using the `train_test_split()` function in the `scikit-learn` library. The `test_size` parameter determines the percentage of data to allocate to testing. The `random_state` parameter sets a seed to the random number generator, so that your train-test splits are deterministic.

The training will happen in the for loop so that both algorithms can be tested. The createClassModel funtion is called to retreive the pipeline that can then be trained using the training dataset. 

Once trained, the test dataset is then ran through the model to test the model's performance. Using various functions from sklearn.metrics, the R2 score, MAPE, and RMSE can be used to measure model performance.

In [0]:
# make sure categorical columns are strings
final_df[catg_cols] = final_df[catg_cols].astype("str")

# split data
X_train, X_test, y_train, y_test = train_test_split(final_df.drop(label, axis=1), final_df[label], test_size=0.2, random_state=222)

# test 2 algorithms
for algorithmname in ["linear_regression", 'random_forest']:
    fitPipeline = createClassModel(algorithmname, catg_cols, num_cols) # get pipeline
    fitPipeline.fit(X_train, y_train.values.ravel())                   # fit pipeine

    y_pred = fitPipeline.predict(X_test)                               # score with fitted pipeline

    # Evaluate
    r2 = r2_score(y_test, y_pred)
    mape = mean_absolute_percentage_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))

    print(algorithmname)
    print("R2:", r2)
    print("MAPE:", mape)
    print("RMSE:", rmse)
    print()