# Azure Machine Learning - Dataset Drift Detection Demo

Sample notebook showcasing how to programmatically configure an Azure ML dataset monitor to identify when data has drifted materially. We identify changes in the distributions of training & scoring data, and can use these signals to kick off downstream ML model retraining activities to ensure continued model performance. In our example, we train a model for predicting taxi fare using the New York Taxi Cab Green Dataset (from Azure ML's Open Datasets), and store batched results in an Azure ML-linked datastore. We then configure our dataset monitor to run weekly to identify drift, and can use a detection to initiate downstream retraining. 

In terms of using this approach in an MLOps workflow, upon retraining a new model, dataset monitors can be updated so the baseline is inclusive of all training data, and the target dataset contains all inputs/outputs captured moving forward.

### Import required packages

In [None]:
from azureml.core import Workspace, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.opendatasets import NycTlcGreen
from datetime import datetime
import os
import json
import pandas as pd
import mlflow
from mlflow import set_tracking_uri

from sklearn.compose import ColumnTransformer  
from sklearn.pipeline import Pipeline  
from sklearn.impute import SimpleImputer  
from sklearn.preprocessing import StandardScaler, OneHotEncoder  
from sklearn.ensemble import RandomForestRegressor  
from sklearn.model_selection import train_test_split 

### Establish connection to Azure ML workspace

In [None]:
# Load the Azure ML workspace from the saved workspace configuration
ws = Workspace.from_config()

# Point MLflow at the workspace so experiments, runs and models are tracked there
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

# Default datastore of the workspace; later cells register datasets and upload CSVs to it
datastore = ws.get_default_datastore()

### Create compute cluster for drift analysis

In [None]:
from azureml.core.compute_target import ComputeTargetException

# Choose a name for the CPU cluster
cpu_cluster_name = "cpu-cluster"

try:
    # Reuse the cluster when one with this name already exists in the workspace,
    # so re-running this cell does not issue another provisioning request
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print(f"Found existing compute target '{cpu_cluster_name}'; reusing it.")
except ComputeTargetException:
    # Define the configuration for the CPU cluster: scales between 0 and 4 nodes
    # and releases idle nodes after 2400 seconds
    cpu_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2",
                                                       min_nodes=0,
                                                       max_nodes=4,
                                                       idle_seconds_before_scaledown=2400)

    # Create the CPU cluster
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, cpu_config)

# Block until the cluster reaches a terminal provisioning state, streaming progress
cpu_cluster.wait_for_completion(show_output=True)

### Retrieve New York City Green Taxi Dataset from Azure ML Open Datasets

Select a subset of datapoints and register raw dataset

In [None]:
# Take the first 10,000,000 records of the NYC Green Taxi open dataset
nyc_green_taxi = NycTlcGreen.get_tabular_dataset()
sample_dataset = nyc_green_taxi.take(10_000_000)

# Materialize the sample as a pandas DataFrame for local processing
df = sample_dataset.to_pandas_dataframe()

# Register the untouched sample in the workspace, stored on the default datastore
registered_dataset = Dataset.Tabular.register_pandas_dataframe(
    name='taxi_data_raw',
    description='Sample Data from the NYC Green Taxis Dataset',
    dataframe=df,
    target=datastore,
)

### Downselect to target columns and register separate training & evaluation subsets

In [None]:
# Columns kept for modelling: trip attributes plus the label ('fareAmount')
selected_columns = [
    'vendorID',
    'lpepPickupDatetime',
    'lpepDropoffDatetime',
    'passengerCount',
    'tripDistance',
    'pickupLongitude',
    'pickupLatitude',
    'dropoffLongitude',
    'dropoffLatitude',
    'rateCodeID',
    'storeAndFwdFlag',
    'paymentType',
    'fareAmount',
]

# Restrict to the selected columns and discard rows with any missing value
df = df[selected_columns].dropna()

# Derive calendar features from the pickup timestamp (added in this order)
df = df.assign(**{
    part: getattr(df['lpepPickupDatetime'].dt, part)
    for part in ('year', 'month', 'day', 'hour', 'minute')
})

# Pickups in months 1-3 form the training set; later months form the evaluation set.
# NOTE(review): the split looks at the calendar month only and ignores the year -
# confirm the sampled data does not span several years.
training_df = df[df['month'] <= 3]
evaluation_df = df[df['month'] > 3]

# Register both subsets in the workspace, stored on the default datastore
Dataset.Tabular.register_pandas_dataframe(
    name='taxi_training_data',
    description='Training Data from the NYC Green Taxis Dataset',
    dataframe=training_df,
    target=datastore,
)

Dataset.Tabular.register_pandas_dataframe(
    name='taxi_evaluation_data',
    description='Evaluation Data from the NYC Green Taxis Dataset',
    dataframe=evaluation_df,
    target=datastore,
)

### Train and register new MLflow model

In [None]:
# Load the registered training subset back from the workspace
taxi_dataset = Dataset.get_by_name(ws, 'taxi_training_data')
df = taxi_dataset.to_pandas_dataframe()

# Have MLflow log the fitted model together with an input example and a signature
mlflow.autolog(log_input_examples=True, log_model_signatures=True)

experiment_name = 'Taxi_Fare_Prediction_Experiment'
run_name = 'Random_Forest_Regressor_Trial'

mlflow.set_experiment(experiment_name)

# Datetime columns are identified by name. They are removed from the model inputs,
# but the first one is kept aside in `dates` because the scoring cell needs it.
datetime_cols = [x for x in df.columns.values if 'datetime' in x.lower()]
if not datetime_cols:
    # Fail loudly here instead of continuing with a stale `training_df` / missing `dates`
    raise ValueError("Expected at least one column containing 'datetime' in 'taxi_training_data'")

# Order the frame chronologically by the first datetime column
df.sort_values(by=datetime_cols[0], ascending=True, inplace=True)

# Cap the sample size at the number of available rows: DataFrame.sample raises
# ValueError when asked for more rows than exist (sampling without replacement)
training_df = df.sample(min(100000, len(df)))
dates = training_df[datetime_cols[0]]
training_df = training_df.drop(columns=datetime_cols)

run_id = None

with mlflow.start_run(run_name=run_name) as run:

    # Select column types; the label is excluded from the numeric inputs
    numeric_features = training_df.drop(columns=['fareAmount']).select_dtypes(include=['int64', 'float64', 'int32']).columns
    categorical_features = training_df.select_dtypes(include=['object']).columns

    # Define preprocessing for numeric columns (impute with the median, then scale)
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    # Define preprocessing for categorical features (impute a 'missing' level, then one-hot encode)
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    # Create preprocessing and training pipeline
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                               ('regressor', RandomForestRegressor())
                               ])

    # Separate the inputs from the label
    X = training_df.drop('fareAmount', axis=1)
    y = training_df['fareAmount']

    # Split the data into train and test datasets (80% / 20%)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Train model
    pipeline.fit(X_train, y_train)

    # Keep the run id so the logged model can be registered after the run closes
    run_id = run.info.run_id

# Register the model artifact logged under 'model' in the run above
model_uri = f'runs:/{run_id}/model'
model_name = 'taxi-fare-prediction-model'
registered_model = mlflow.register_model(model_uri, model_name)

### Score the sampled training data and upload the scored dataset to the Azure ML-linked datastore

In [None]:
# Score the sampled training rows with the fitted pipeline
y_pred = pipeline.predict(training_df)

# Attach the predictions and the timestamps set aside before the datetime columns were dropped
training_df = training_df.assign(Predicted_Fare=y_pred, date=dates)

# The latest timestamp in this batch names the output file
latest_date = max(training_df['date'])
print(latest_date)
max_date = latest_date.strftime("%Y-%m-%d")

# Write the scored batch to a local CSV, then upload it to the datastore folder
# that the dataset monitor's target dataset is built from
filename = f'./scored-taxi-data-{max_date}.csv'
training_df.to_csv(filename, index=False)
datastore.upload_files(files=[filename], target_path="scored-taxi-data/", overwrite=True)

### Score the sampled evaluation data and upload the scored data to the Azure ML-linked datastore

In [None]:
# Identify datetime columns on the evaluation frame itself (by name)
datetime_cols = [x for x in evaluation_df.columns.values if 'datetime' in x.lower()]
if not datetime_cols:
    # Without a datetime column there is no 'date' value for the scored output
    raise ValueError("Expected at least one column containing 'datetime' in evaluation_df")

# Order chronologically; the non-inplace form avoids mutating a slice of the original frame
evaluation_df = evaluation_df.sort_values(by=datetime_cols[0], ascending=True)

# Cap the sample size at the number of available rows: DataFrame.sample raises
# ValueError when asked for more rows than exist (sampling without replacement)
evaluation_df = evaluation_df.sample(min(250000, len(evaluation_df)))

# Keep the timestamps aside, then drop the datetime columns so the scored file
# has the same columns as the scored training file
dates = evaluation_df[datetime_cols[0]]
evaluation_df = evaluation_df.drop(columns=datetime_cols)

# Score the sampled evaluation rows with the fitted pipeline
preds = pipeline.predict(evaluation_df)

evaluation_df['Predicted_Fare'] = preds
evaluation_df['date'] = dates

# The latest timestamp in this batch names the output file
print(max(evaluation_df['date']))
max_date = max(evaluation_df['date']).strftime("%Y-%m-%d")

# Save the scored batch as CSV and upload it next to the scored training data
filename = './scored-taxi-data-' + max_date + '.csv'
evaluation_df.to_csv(filename, index=False)
datastore.upload_files(files=[filename], target_path="scored-taxi-data/", overwrite=True)

### Create dataset from CSV files in AML datastore

In [None]:
# Wildcard path matching every scored CSV uploaded to the datastore folder
csv_paths = [(datastore, 'scored-taxi-data/*.csv')]

# Build a tabular dataset over the CSVs, mark the 'date' column as its timestamp,
# and register it (as a new version) as the target dataset
dset = (
    Dataset.Tabular.from_delimited_files(path=csv_paths)
    .with_timestamp_columns('date')
    .register(ws, 'scored_taxi_data', create_new_version=True)
)

### Configure and run Azure ML dataset monitor

In [None]:
from azureml.core import Workspace, Dataset
from azureml.datadrift import DataDriftDetector
from datetime import datetime

# get the target dataset
target = Dataset.get_by_name(ws, 'scored_taxi_data')

# set the baseline dataset: everything with a timestamp before 2014-04-01
baseline = target.time_before(datetime(2014, 4, 1))

# set up feature list
# Names must match the columns of the scored CSVs, which keep the camelCase
# names of the source data (e.g. 'passengerCount', not 'passenger_count')
drift_features = ['passengerCount', 'tripDistance', 'pickupLongitude', 'pickupLatitude',
                  'dropoffLongitude', 'dropoffLatitude', 'Predicted_Fare']

# set up data drift detector (weekly schedule, all features to start with)
monitor = DataDriftDetector.create_from_datasets(ws, 'taxi-data-monitor', baseline, target,
                                                      compute_target='cpu-cluster',
                                                      frequency='Week',
                                                      feature_list=None,
                                                      drift_threshold=.6,
                                                      latency=0)

# compute drift metrics for the historical window 2014-04-01 to 2014-10-01
backfill1 = monitor.backfill(datetime(2014, 4, 1), datetime(2014, 10, 1))

# update data drift detector to monitor only the selected features
monitor = monitor.update(feature_list=drift_features)

# turn on the recurring schedule
monitor.enable_schedule()

# trigger an ad hoc run for the given target date
monitor.run(datetime(2014, 12, 31))