# Time Series Forecasting Model

In this notebook, we'll demonstrate an end-to-end data science workflow for a time series forecasting model. In this scenario we will predict the total monthly sales of property in NYC based on the historic sales.

Time series forecasting is a common and important problem in business. The tools and methods in this notebook can be applied to other forecasting tasks, such as capacity planning or price forecasting.

In this tutorial, we will do the following:
1. Load and process the data
2. Understand the data using exploratory data analysis
3. Train a machine learning model using an open sourced software package called Prophet
4. Save the final machine learning model
5. Load the machine learning model for scoring and making predictions

## Prerequisites
- Have a lakehouse added to this notebook. We will be downloading data from a public blob, and storing that in the lakehouse. 

## Introduction 

We will use [NYC Property Sales data](https://www1.nyc.gov/site/finance/about/open-portal.page) range from 2003 to 2015 published by NYC Department of Finance on the [NYC Open Data Portal](https://opendata.cityofnewyork.us/). 

The dataset is a record of every building sold in New York City property market during 13-year period. Please refer to [Glossary of Terms for Property Sales Files](https://www1.nyc.gov/assets/finance/downloads/pdf/07pdf/glossary_rsf071607.pdf) for definition of columns in the spreadsheet. Two example rows are shown below:

|borouge|neighborhood|building_class_category|tax_class|block|lot|eastment|building_class_at_present|address|apartment_number|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|
|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|
|Manhattan|ALPHABET CITY|07  RENTALS - WALKUP APARTMENTS|0.0|384.0|17.0||C4|225 EAST 2ND   STREET||10009.0|10.0|0.0|10.0|2145.0|6670.0|1900.0|2.0|C4|275000.0|2007-06-19|
|Manhattan|ALPHABET CITY|07  RENTALS - WALKUP APARTMENTS|2.0|405.0|12.0||C7|508 EAST 12TH   STREET||10009.0|28.0|2.0|30.0|3872.0|15428.0|1930.0|2.0|C7|7794005.0|2007-05-21|

We will build a model that forcasts the monthly total sales based on historic data. We will use [Facebook Prophet](https://facebook.github.io/prophet/) to do the forecasting. 

Prophet is an open-source forecasting library developed by Facebook. Prophet is useful for analyzing time series data and making predictions based on historical trends and seasonal patterns. It is designed to be robust to outliers and missing values, and to be user friendly. 

Prophet uses a decomposable time series model which consist of three components: **trend**, **seasonality** and **holidays**. 
- For the **trend** part, Prophet assumes piece-wise constant rate of growth with automatic change point selection.
- For **seasonality** part, Prophet models weekly and yearly seasonality using Fourier Series. 
- We will be aggregating our data on a monthly level, and therefore will not be considering **holidays**. 

For more information about the modeling techniques used by Prophet, you can read their paper [here](https://peerj.com/preprints/3190/).

### Install Prophet

Before we get started we'll need to install Prophet.

In [1]:
%pip install prophet

StatementMeta(, , , Finished, )

LivyHttpRequestFailure: [InternalServerError] Internal server error encountered while preparing service authentication tokens. HTTP status code: 500.

## Step 1: Load the Data

#### Download dataset and Upload to Lakehouse

There are 15 csv files containig property sales records from 5 boroughs in New York since 2003 to 2015. For your convenience, these files are compressed in `nyc_property_sales.tar` and is available in a public blob storage.

In [None]:
URL = "https://synapseaisolutionsa.blob.core.windows.net/public/NYC_Property_Sales_Dataset/"
TAR_FILE_NAME = "nyc_property_sales.tar"
DATA_FOLDER = "Files/NYC_Property_Sales_Dataset"
TAR_FILE_PATH = f"/lakehouse/default/{DATA_FOLDER}/tar/"
CSV_FILE_PATH = f"/lakehouse/default/{DATA_FOLDER}/csv/"

EXPERIMENT_NAME = "aisample-timeseries"

StatementMeta(, , , Cancelled, )

**Note:** if you have not added a lakehouse to the notebook, you will be met with an error below. 

If you have added a lakehouse, then we will be downloading the data from the URL specified above, and storing it in the lakehouse. 

In [None]:
import os

if not os.path.exists("/lakehouse/default"):
    # ask user to add a lakehouse if no default lakehouse added to the notebook.
    # a new notebook will not link to any lakehouse by default.
    raise FileNotFoundError(
        "Default lakehouse not found, please add a lakehouse for the notebook."
    )
else:
    # check if the needed files are already in the lakehouse, try to download and unzip if not.
    if not os.path.exists(f"{TAR_FILE_PATH}{TAR_FILE_NAME}"):
        os.makedirs(TAR_FILE_PATH, exist_ok=True)
        os.system(f"wget {URL}{TAR_FILE_NAME} -O {TAR_FILE_PATH}{TAR_FILE_NAME}")

    os.makedirs(CSV_FILE_PATH, exist_ok=True)
    os.system(f"tar -zxvf {TAR_FILE_PATH}{TAR_FILE_NAME} -C {CSV_FILE_PATH}")

StatementMeta(, , , Cancelled, )

In [None]:
# to record the notebook running time
import time

ts = time.time()

StatementMeta(, , , Cancelled, )

In [None]:
# setup mlflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # disable mlflow autologging

StatementMeta(, , , Cancelled, )

#### Create Dataframe from Lakehouse

First lets see if the data is what we are expecting. It is recommended to manually go through a subset of your data whenever you are working on a new dataset. This practise will give you a better understanding of your data. Even looking at a few dozen rows can quickly teach you a lot about your data.

The `display` function prints the dataframe. You can also show the "Chart" views to easily visualize a subset of the data

In [None]:
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/NYC_Property_Sales_Dataset/csv")
)
display(df)

StatementMeta(, , , Cancelled, )

## Step 2: Exploratory Data Analysis

Some of the observations based on manually going through the data include:
- The sale price is sometimes $0. According to the [Glossary of Terms](https://www.nyc.gov/assets/finance/downloads/pdf/07pdf/glossary_rsf071607.pdf), this implies a transfer of ownership without a cash consideration. **We should remove these cases where `sales_price` is 0 from our dataset.**
- There are a few different building classes included in this dataset. For our problem we want to focus only on residential buildings. According to the glossary of terms, this means building classes of type "A". **We should filter our data to only residential buildings.** We have two options for this; `building_class_at_time_of_sale` or `building_class_at_present`. We'll go for the building class at time of sale as it captures what the building was sold as. 
- There are some situations where the `total_units` is 0, or the `gross_square_feet` is 0. **Lets remove all instances where either `total_units` or `gross_square_units` are 0.**
- Some other columns have missing or NULL values, like the `apartment_number`, `tax_class`, `build_class_at_present`. It seems reasonable that this missing data is due to clerical errors or because the data doesn't exist (e.g. there is no apartment number for the unit). None of these missing values are important to our analysis, so we will ignore these missing values.
- The `sale_price` is stored as a string with "$" prepended to it. If we want to do analysis on this, this information is better represented as a number. We could feasibly store this as a float, but the raw data doesn't let us retrieve any information after the decimal. **So lets cast the `sale_price` column to an integer.**

#### Type Conversion and Filtering
Lets take action on some of the tasks identified above. First we need to do some imports from pyspark.

In [None]:
# import libs
import pyspark.sql.functions as F
from pyspark.sql.types import *

StatementMeta(, , , Cancelled, )

First lets cast our sales data from a string to an integer. \
We use regular expressions to split the numeric portion of the string from the dollar sign (i.e. splitting "$" and "300,000", in the string "$300,000"), and then we cast the numeric portion to an integer.

Secondly, lets filter our data to only include situations where all the below conditions are true:
1. the sales price is greater than 0
2. the total_units is greater than 0
3. the gross_square_feet is greater than 0
4. the building class is of type A

In [None]:
df = df.withColumn(
    "sale_price", F.regexp_replace("sale_price", "[$,]", "").cast(IntegerType())
)
df = df.select("*").where(
    'sale_price > 0 and total_units > 0 and gross_square_feet > 0 and building_class_at_time_of_sale like "A%"'
)

StatementMeta(, , , Cancelled, )

#### Aggregating on a monthly basis

The sales of property is tracked on a daily level, but this is too granular for our purpose. We should aggregate our data on a monthly basis. 

First lets change our dates to only show years and months.
Note that the data still contains the year so that we can distinguish the month of December in 2005 from the month of December in 2006.

Also, lets keep just the columns we care about: `sales_price`, `total_units`, `gross_square_feet` and the `sales_date` which we'll rename to `month`.

In [None]:
monthly_sale_df = df.select(
    "sale_price",
    "total_units",
    "gross_square_feet",
    F.date_format("sale_date", "yyyy-MM").alias("month"),
)

display(monthly_sale_df)

StatementMeta(, , , Cancelled, )

Next, lets aggregate the `sale_price`, `total_units` and `gross_square_feet` by month. 

We will group the data by `month`, and sum all values within the group. 

In [None]:
summary_df = (
    monthly_sale_df.groupBy("month")
    .agg(
        F.sum("sale_price").alias("total_sales"),
        F.sum("total_units").alias("units"),
        F.sum("gross_square_feet").alias("square_feet"),
    )
    .orderBy("month")
)

display(summary_df)

StatementMeta(, , , Cancelled, )

### From pyspark dataframes to pandas dataframes
Pyspark dataframes are great when operating with large datasets. Because we have now aggregated the data, the size of the dataframe is much smaller. This means we can now operate with pandas dataframes which you may be more familiar with.

Casting a model from a pyspark dataframe to a pandas dataframe is easy as shown below.

In [None]:
import pandas as pd

df_pandas = summary_df.toPandas()
display(df_pandas)

StatementMeta(, , , Cancelled, )

### Visualization
Now, let's take a look at the trend of property trade trend at NYC. This allows us to quickly get a feel for the patterns and seasonality that might exist. 

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

f, (ax1, ax2) = plt.subplots(2, 1, figsize=(35, 10))
plt.sca(ax1)
plt.xticks(np.arange(0, 15 * 12, step=12))
plt.ticklabel_format(style="plain", axis="y")
sns.lineplot(x="month", y="total_sales", data=df_pandas)
plt.ylabel("Total Sales")
plt.xlabel("Time")
plt.title("Total Property Sales by Month")

plt.sca(ax2)
plt.xticks(np.arange(0, 15 * 12, step=12))
plt.ticklabel_format(style="plain", axis="y")
sns.lineplot(x="month", y="square_feet", data=df_pandas)
plt.ylabel("Total Square Feet")
plt.xlabel("Time")
plt.title("Total Property Square Feet Sold by Month")
plt.show()

StatementMeta(, , , Cancelled, )

Some observations we can make:
- There is a clear recurring pattern on a yearly cadence. I.e. there is **yearly seasonality**. It seems like the summer months see higher sales volumes than the winter months.
- In years with high sales the difference between high sales months and low sales months is greater in absolute terms, than in years where the sales is low. For example, in 2004, the difference between the highest month and the the lowest month is about $900,000,000 - $500,000,000 = $400,000,000. But in 2011 the difference is only $400,000,000 - $300,000,000 = $100,000,000. This becomes important later when we have to decide between **multiplicative** vs. **additive** seasonality effects. 

### Summary of steps so far
- We filtered out non-cash transfers of property, as this would impact our modeling.
- We removed untrustworthy datapoints which showed 0 square footage, or 0 units sold.
- We aggregated the data over months, rather than days.
- We noted a clear yearly seasonality in the data.
- It seems like the seasonality in the data is multiplicative rather than additive.

## Step 3: Model Training and Evaluation

#### Model fitting

We will now be doing model fitting.

The input to Prophet is always a dataframe with two columns; a time axis `ds` and a value axis `y`. The time column should be in a format like YYYY_MM, as it is for us already. The value column must be a numeric.

So to do model fitting, we just need to rename the time axis to `ds` and value axis to `y` and pass the data to Prophet.

For more information on this see [Prophet's documentation](https://facebook.github.io/prophet/docs/quick_start.html#python-api).

In [None]:
df_pandas["ds"] = pd.to_datetime(df_pandas["month"])
df_pandas["y"] = df_pandas["total_sales"]

StatementMeta(, , , Cancelled, )

Now let's fit the model.

Prophet follows the same convention as sklearn. First you create a new instance of Prophet with some parameters (such as seasonality_mode), and then you fit that instance with data.


- We will use **'multiplicative' seasonality**, instead of a constant additive factor which is what Prophet uses by default. This directly came from our analysis in the previous section. Because the amplitude of our seasonality changes, using a simple additive seasonality won't fit the data well.

- We will also turn **weekly_seasonality off**, as we do not have weekly data; we have aggregated the data by month.

- We will use **Markov Chain Monte Carlo (MCMC)**. By default Prophet will be able to provide uncertainty estimates on the trend and observation noise, but does not provide uncertainty estimates for seasonality. Using MCMC takes longer, but does allow the algorithm to provide uncertainty estimates on the seasonality in addition to the trend and observation noise uncertainty estimates. For more info, refer to [Prophets documentation](https://facebook.github.io/prophet/docs/uncertainty_intervals.html).

- Finally, the we will play with the sensitivity of automatic changepoint detection through the **changepoint_prior_scale parameter**. The Prophet algorithm automatically tries to find places in the data where the trajectories abruptly change. Its hard to know what the right value is for this, so we will try a few options and select the best performing model. More information on this parameter in [Prophet's documentation](https://facebook.github.io/prophet/docs/trend_changepoints.html).

In [None]:
from prophet import Prophet
from prophet.plot import add_changepoints_to_plot

models = []

seasonality_mode = "multiplicative"
weekly_seasonality = False
changepoint_priors = [0.01, 0.05, 0.1]
mcmc_samples = 100

for chpt_prior in changepoint_priors:
    m = Prophet(
        seasonality_mode=seasonality_mode, weekly_seasonality=weekly_seasonality, changepoint_prior_scale = chpt_prior, mcmc_samples = mcmc_samples
    )
    m.fit(df_pandas)
    models.append(m)

StatementMeta(, , , Cancelled, )

#### Visualize the model with Prophet
Prophet has built-in visualization functions. Lets use these functions to show the model fitting results. 

The black dots are data points used to train the model. The blue line is the prediction, and the light blue area shows uncertainty intervals.

We have built three models with varying levels of changepoint_prior_scale. The predictions of the three models are shown below:

In [None]:
for idx, m in enumerate(models):
    future = m.make_future_dataframe(periods=12, freq="M")
    forecast = m.predict(future)
    fig = m.plot(forecast)
    fig.suptitle(f"changepoint = {changepoint_priors[idx]}")

StatementMeta(, , , Cancelled, )

#### Visualize trend and seasonality with Prophet

One useful feature of Prophet is that you can easily visualize the underlying trends and seasonalities. Here the light blue area reflects uncertainty.

There appears to be a strong long-period oscillating trend; over the course of a few years the sales volumes rises and falls. 

In [None]:
fig2 = models[1].plot_components(forecast)

StatementMeta(, , , Cancelled, )

#### Cross Validation
Prophet has a built-in cross-validation tool. This can be useful to estimate the forecast error and helps us find the best performing model.

Normally when doing cross-validation, we randomly sample the data and put some in a training set and the rest in a validation test. However, this approach does not work for time-series data. If the model has seen January-2005, and March-2005, and we try to predict February-2005, the model can sort of "cheat" - it has seen where the data is going. In real life you are forecasting into the _future_, into unseen regions. 

The way to make this a fair test is to split the data on dates; you use data up to some date (say the first 11 years of data), and then predict on the remaining, unseen data.

Prophet makes that easy. **Below we start with 11 years of training data**, and then make monthly predictions with 1 year horizon. We compare these **predictions with the real world values** to establish how good our model is at predictions.

So our training data contains everything from 2003-2013. Then our first run is to predict Jan-2014 through Jan-2015. Then we predict Feb-2014 through Feb-2015, and so on. 

We repeat this for every one of our three models so we can compare which model performed the best.

In [None]:
from prophet.diagnostics import cross_validation
from prophet.diagnostics import performance_metrics

df_metrics = []
for m in models:
    df_cv = cross_validation(m, initial="11 Y", period="30 days", horizon="365 days")
    df_p = performance_metrics(df_cv, monthly=True)
    df_metrics.append(df_p)

StatementMeta(, , , Cancelled, )

We can see the various metrics, like MSE, for each of the models by displaying them as follows. Note the varying horizons: we predict one year in the future 12 times. 

In [None]:
display(df_metrics[0])

StatementMeta(, , , Cancelled, )

## Step 4: Register final ML Model

We should log these models so that we can remember what parameters we have tried, and we should save the models for later use. 

#### Log Model with MLFlow
Now we can save the trained models for later use. Here we use mlflow to log metrics/models.

In [None]:
# setup mlflow
from mlflow.models.signature import infer_signature

signature = infer_signature(future, forecast)

StatementMeta(, , , Cancelled, )

In [None]:
# log model, metrics and params
model_name = f"{EXPERIMENT_NAME}-prophet"

for idx, m in enumerate(models):
    with mlflow.start_run() as run:
        mlflow.prophet.log_model(
            m, model_name, registered_model_name=model_name, signature=signature
        )
        mlflow.log_params(
            {"seasonality_mode":  seasonality_mode,
             "mcmc_samples": mcmc_samples,
             "weekly_seasonality": weekly_seasonality,
             "changepoint_prior": changepoint_priors[idx]}
        )
        metrics = df_metrics[idx].mean().to_dict()
        metrics.pop("horizon")
        mlflow.log_metrics(metrics)

StatementMeta(, , , Cancelled, )

## Step 5: Save prediction results
Lastly, we'll deploy the model and save the prediction results.

#### Prediction with *Predict* Transformer

We can now load the model and use it to make predictions. You can learn more about ```PREDICT``` and how to use it within Fabric [here](https://aka.ms/fabric-predict).

In [None]:
from synapse.ml.predict import MLFlowTransformer

spark.conf.set("spark.synapse.ml.predict.enabled", "true")

model = MLFlowTransformer(
    inputCols=future.columns.values,
    outputCol="prediction",
    modelName=f"{EXPERIMENT_NAME}-prophet",
    modelVersion=1,
)

test_spark = spark.createDataFrame(data=future, schema=future.columns.to_list())

batch_predictions = model.transform(test_spark)

display(batch_predictions)

StatementMeta(, , , Waiting, )

In [None]:
# code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)

StatementMeta(, , , Waiting, )

In [None]:
print(f"Full run cost {int(time.time() - ts)} seconds.")

StatementMeta(, , , Waiting, )