In [None]:
import os
import sys
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import lightgbm as lgb
import joblib
from azureml.core import Workspace, Datastore, Dataset, Experiment, Environment
from azureml.core.model import Model, InferenceConfig
from azureml.core.webservice import AksWebservice, Webservice
from azureml.core.compute import AksCompute
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta

In [None]:
# Folder locations, all relative to the notebook's working directory.
CODE_PATH = '../code'
DATA_PATH = '../sample_data'
DOWNLOAD_PATH = '../download'
AML_UTIL_PATH = '../ml_service'

# Put the project folders on sys.path so the modules below can be imported.
for _module_dir in (CODE_PATH, AML_UTIL_PATH):
    sys.path.append(os.path.join(os.getcwd(), _module_dir))

import utils
import consts
import train
import aml_utils as amlutils

In [None]:
# Re-import the project modules so that edits made to the external .py files
# are picked up without restarting the kernel. Only needed when running in a
# Jupyter notebook; harmless otherwise.
import importlib
importlib.reload(utils)
importlib.reload(consts)
importlib.reload(train)
importlib.reload(amlutils)

In [None]:
# Workspace is loaded from the local config; the experiment name comes from consts.
ws = Workspace.from_config()
experiment = Experiment(workspace=ws, name=consts.experiment_name)

# Names of the training and inference compute targets are read from the environment.
compute_target_name, inference_target_name = (
    os.environ['AML_COMPUTE'],
    os.environ['AML_INFERENCE_COMPUTE'],
)

## Read raw data and prepare for training

Read the raw data from the `sample_data` folder, or fetch sample data from Azure Open Datasets as shown below.

In [None]:
from azureml.opendatasets import NycTlcGreen

# Sampling window: January 2015 of the NYC green-taxi open dataset.
start = datetime.strptime("1/1/2015","%m/%d/%Y")
end = datetime.strptime("1/31/2015","%m/%d/%Y")

# Collect one 1000-row sample per month and concatenate once at the end.
# DataFrame.append was deprecated in pandas 1.4 and removed in pandas 2.0,
# so the frames are gathered in a list and joined with pd.concat instead.
monthly_samples = []
for sample_month in range(1):
    temp_df_green = NycTlcGreen(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \
        .to_pandas_dataframe()
    monthly_samples.append(temp_df_green.sample(1000))
dfjan = pd.concat(monthly_samples)

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(os.path.join(DATA_PATH, 'raw'), exist_ok=True)
dfjan.to_csv(os.path.join(DATA_PATH, 'raw/201501.csv'))
dfjan.head(5)

In [None]:
# Read the raw data folder through the project helper and preview the first rows.
raw_data_folder = os.path.join(DATA_PATH, 'raw')
dfraw = utils.read_raw_data(raw_data_folder)
dfraw.head(5)

In [None]:
# train_data_folder is where the prepared training CSV is written later on.
train_data_folder = os.path.join(DATA_PATH, 'train')
# Turn the raw frame into the training frame via the project helper.
dftrain = utils.process_raw_data(dfraw)
dftrain.head(5)

In [None]:
# Summary statistics of the 'duration' column of the training frame.
dftrain['duration'].describe()

In [None]:
# Create AML datastore if not already exists
datastore_name = os.environ['AML_DATASTORE']
try:
    datastore = Datastore.get(ws, datastore_name=datastore_name)
    print('datastore {} exists'.format(datastore_name))
except Exception:
    print('create datastore {}'.format(datastore_name))
    container_name = os.environ["BLOB_CONTAINER"]
    account_name = os.environ["BLOB_ACCOUNTNAME"]
    account_key = os.environ["BLOB_ACCOUNT_KEY"]

    datastore = Datastore.register_azure_blob_container(
        workspace=ws, 
        datastore_name=datastore_name, 
        container_name=container_name, 
        account_name=account_name,
        account_key=account_key)

### Upload prepared data so that it can be accessed when training remotely

In [None]:
# Write the prepared frame to a local CSV, then push that file to the
# 'train' path of the datastore (replacing any previous upload).
train_file_name = '201501.csv'
utils.write_train_data(dftrain, train_data_folder, train_file_name)
datastore.upload_files(
    files=[os.path.join(train_data_folder, train_file_name)],
    target_path='train',
    overwrite=True)

## Train locally, only use Azure ML for logging and uploading model file

In [None]:
# Start an interactive run; Azure ML is used here only to record metrics.
local_run = experiment.start_logging()

x_train, x_test, y_train, y_test = train.split_data(dftrain)
model, rmse, mape = train.train_model(x_train, x_test, y_train, y_test)

# Log both evaluation metrics to the run, then echo them in the notebook.
for metric_name, metric_value in (('rmse', rmse), ('mape', mape)):
    local_run.log(metric_name, metric_value)
print("rmse:{0}, mape:{1}".format(rmse, mape))

In [None]:
# Put the predictions next to the held-out targets and plot both series
# ordered by index.
y_predict = model.predict(x_test)
actual_vs_predicted = y_test.to_frame().assign(predicted=y_predict)
actual_vs_predicted.sort_index().plot(figsize=(20, 5), rot=45)

In [None]:
# Serialize the trained model under ./outputs and attach the file to the run.
output_dir = 'outputs'
os.makedirs(output_dir, exist_ok=True)
model_file = os.path.join(output_dir, consts.model_name)
print('writing model file to {}'.format(model_file))
joblib.dump(model, model_file)
local_run.upload_file(name=consts.model_name, path_or_stream=model_file)

In [None]:
# Mark the interactive run started with experiment.start_logging() as completed.
local_run.complete()

## Configure training environment and dataset to make training repeatable locally and remotely
* Package dependencies for training are defined in conda dependency file train_env.yml.
* Training data needs to be stored in blob storage so that both local and remote compute can access it. The dataset doesn't have to be registered in order to be mounted on or downloaded to the compute resource.
* Compared to the above way of training locally, the following approach unifies local and remote training with the same code. However, it doesn't seem to work on a Windows local machine. Use a Notebook VM instead for local training.

In [None]:
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# --- switches -------------------------------------------------------------
# Reuse an already-registered environment of the configured name when present.
use_existing_env = False
# For a newly created environment: train inside Docker (True) or rely on the
# local python/conda setup (False).
enable_docker = False
# Register a newly created environment with the workspace.
register_new_env = False

# --- look up an existing environment, if requested -------------------------
found_existing_env = False
if use_existing_env:
    try:
        training_env = Environment.get(ws, name=consts.train_environment_name)
    except Exception:
        print('didnot find existing env {}'.format(consts.train_environment_name))
    else:
        found_existing_env = True
        print('found existing env {}'.format(training_env.name))

# --- otherwise build one from the conda specification -----------------------
if not found_existing_env:
    print('create new env {}'.format(consts.train_environment_name))
    conda_file = os.path.join(CODE_PATH, 'train_env.yml')
    training_env = Environment.from_conda_specification(
        name=consts.train_environment_name,
        file_path=conda_file)
    if enable_docker:
        training_env.docker.enabled = True
        training_env.docker.base_image = DEFAULT_CPU_IMAGE
    # Outside Docker the environment's python dependencies are user managed.
    training_env.python.user_managed_dependencies = not enable_docker
    if register_new_env:
        training_env.register(ws)
elif training_env.docker.enabled != enable_docker:
    print('existing env has different docker settings than what you specified!')

If you train with ScriptRun, you need to create a training dataset so that it can be
* downloaded for local training without Docker
* mounted for local training with Docker or remote training

The dataset can be in-memory; it doesn't have to be registered. `TabularDataset` has a `to_pandas_dataframe()` method, but it doesn't provide the same flexibility as pandas `read_csv()`.


In [None]:
# File dataset over the 'train' path of the datastore; it is only created
# in memory here and is not registered.
file_train_dataset = Dataset.File.from_files(path=(datastore, 'train'))

If you train with Estimator, you can directly mount or download from datastore as shown below

In [None]:
# True: run the training script on this machine; False: on the AML compute target.
train_local = True

# The data is downloaded only for local, non-Docker training; in every other
# case it is mounted.
mount_data = (not train_local) or enable_docker

compute_target = 'local' if train_local else ws.compute_targets[compute_target_name]

train_data_ref = datastore.path('train')
if mount_data:
    data_folder = train_data_ref.as_mount()
else:
    data_folder = train_data_ref.as_download(DOWNLOAD_PATH)

In [None]:
from azureml.train.estimator import Estimator

# Everything the estimator needs: the script, its data folder argument,
# where it runs and in which environment.
estimator_settings = dict(
    source_directory=CODE_PATH,
    entry_script='train.py',
    script_params={'--data_folder': data_folder},
    compute_target=compute_target,
    environment_definition=training_env,
)
est = Estimator(**estimator_settings)

# Submit and block until the run finishes, streaming its output.
run = experiment.submit(config=est)
run.wait_for_completion(show_output=True)

In [None]:
# Show the metrics recorded for the submitted training run.
run.get_metrics()

## Register the model
This is where we need to register a TabularDataset with the model in order to do data drift detection later. Create a TabularDataset that points to the data used for training and has the same schema as the scoring input (which doesn't have a dataframe index column!).

In [None]:
# Tabular dataset over the delimited files under the datastore's 'train' path.
tabular_train_dataset=Dataset.Tabular.from_delimited_files(path=[(datastore, 'train')])

In [None]:
# Register the model file produced by the run and link the training dataset
# to it.
registered_model_path = os.path.join('outputs', consts.model_name)
training_datasets = [(Dataset.Scenario.TRAINING, tabular_train_dataset)]

model = run.register_model(
    model_name=consts.model_name,
    model_path=registered_model_path,
    description='Lightgbm model for predicting taxi trip duration',
    datasets=training_datasets)

### download the model, make some predictions, and plot feature importance

In [None]:
# Download the latest registered version of the model before loading it.
# Without this step the load below only works if the file happens to be
# left over in DOWNLOAD_PATH from an earlier session.
registered_model = Model(ws, consts.model_name)
registered_model.download(target_dir=DOWNLOAD_PATH, exist_ok=True)

# The model was registered from a single file named consts.model_name, so
# that is the file expected inside the download folder.
gbm = joblib.load(os.path.join(DOWNLOAD_PATH, consts.model_name))

In [None]:
# input is an array of datapoints, each has an array of features
input_sample = np.array([[1,1,1.00,-73.957909,40.670761,-73.952194,40.662312,8.15,1,17,5,1]]) 
# output is an array of predictions
output_sample = gbm.predict(input_sample)
output_sample

In [None]:
# Plot the feature importance of the loaded model with LightGBM's helper.
lgb.plot_importance(gbm)

## Deploy the model as a web service

In [None]:
# Inference environment built from the conda specification in the code folder.
inference_env_file = os.path.join(CODE_PATH, 'inference_env.yml')
inference_env = Environment.from_conda_specification(
    name=consts.inference_environment_name,
    file_path=inference_env_file)

# score.py inside CODE_PATH is the scoring entry point of the web service.
inference_config = InferenceConfig(
    source_directory=CODE_PATH,
    entry_script='score.py',
    environment=inference_env)

In [None]:
from azureml.exceptions import WebserviceException

aks_target = AksCompute(ws, inference_target_name)
deployment_config = AksWebservice.deploy_configuration(
    cpu_cores = 1, memory_gb = 2, collect_model_data=True, enable_app_insights=True)

# Only a failed lookup means "service does not exist yet". Errors raised by
# the update itself must surface instead of silently triggering a fresh
# deployment under the same name (which a bare `except:` used to do).
try:
    service = Webservice(ws, consts.service_name)
except WebserviceException:
    print('deploy a new service {}'.format(consts.service_name))
    service = Model.deploy(ws, consts.service_name, [model], inference_config, deployment_config, aks_target)
else:
    print("Service {} exists, update it".format(consts.service_name))
    service.update(models=[model], inference_config=inference_config)

# Both deploy and update run asynchronously; wait for the operation to
# finish before reading the state or calling the endpoint.
service.wait_for_deployment(show_output = True)
print(service.state)
print(service.get_logs())

print(service.scoring_uri)

### Test against the deployed service

In [None]:
import requests
import json

headers = {'Content-Type': 'application/json'}

# Key-based and token-based auth both send the credential as a bearer value.
if service.auth_enabled:
    headers['Authorization'] = 'Bearer '+service.get_keys()[0]
elif service.token_auth_enabled:
    headers['Authorization'] = 'Bearer '+service.get_token()[0]

# Show the headers without the credential: notebook outputs are saved with
# the file and would otherwise leak the service key/token.
print({name: ('Bearer <redacted>' if name == 'Authorization' else value)
       for name, value in headers.items()})

test_sample = json.dumps({'data': [
    [1,1,1.00,-73.957909,40.670761,-73.952194,40.662312,8.15,1,17,5,1]
]})
#test_sample = json.dumps({'data': score_df.values.tolist()})

# A timeout keeps the cell from hanging forever if the endpoint is unreachable.
response = requests.post(service.scoring_uri, data=test_sample, headers=headers, timeout=60)
print(response.status_code)
print(response.json())

## Data drift
* Create a DataDriftDetector
* Run inference on a dataset different from the one that was used for training
* Run detection and see the distance between the training and inference datasets

In [None]:
# Address that receives drift alerts, taken from the environment.
alert_emails = [os.environ['ALERT_EMAIL']]

#model = Model(ws, consts.model_name)
# Features to monitor: every training column except the 'duration' target.
train_features_df = tabular_train_dataset.to_pandas_dataframe().drop(columns=['duration'])
feature_list = train_features_df.columns.tolist()

monitor = amlutils.create_data_drift_detector_for_model(
    ws,
    model,
    consts.service_name,
    compute_target_name,
    feature_list,
    alert_emails,
    0.1)

monitor

### Training was done on Jan 2015 data; run inference on July 2015 data to see if there's data drift

In [None]:
from azureml.opendatasets import NycTlcGreen

# Sampling window: July 2015 of the NYC green-taxi open dataset.
start = datetime.strptime("7/1/2015","%m/%d/%Y")
end = datetime.strptime("7/31/2015","%m/%d/%Y")

# Collect one 1000-row sample per month and concatenate once at the end.
# DataFrame.append was deprecated in pandas 1.4 and removed in pandas 2.0,
# so the frames are gathered in a list and joined with pd.concat instead.
monthly_samples = []
for sample_month in range(1):
    temp_df_green = NycTlcGreen(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \
        .to_pandas_dataframe()
    monthly_samples.append(temp_df_green.sample(1000))
dfjuly = pd.concat(monthly_samples)

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(os.path.join(DATA_PATH, 'score'), exist_ok=True)
dfjuly.to_csv(os.path.join(DATA_PATH, 'score/201507.csv'))
dfjuly.head(5)

### Raw inference data must be processed the same way as training data was processed

In [None]:
# Run the scoring data through the same helpers that prepared the training data.
score_data_folder = os.path.join(DATA_PATH, 'score')
dfjuly = utils.read_raw_data(score_data_folder)
df_score = utils.process_raw_data(dfjuly)

# The service is sent features only, so the 'duration' target is removed.
x_df_score = df_score.drop(columns=['duration'])
x_df_score.head(5)

### Inference on the new dataset

In [None]:
import json

# Look the deployed service up by name and send it every scoring row at once.
service = Webservice(ws, consts.service_name)

payload = {'data': x_df_score.values.tolist()}
data = json.dumps(payload)
data_encoded = data.encode('utf8')

prediction = service.run(input_data=data_encoded)
print(prediction)

> **Note:** Wait 10 minutes or so for the inference data to be collected into Azure blob storage. Check the _modeldata_ container of the storage account associated with your Azure ML workspace to make sure the inference data has been collected.

### Run data drift detection

In [None]:
# Start a drift-detection run targeted at the current UTC timestamp for the
# service named consts.service_name, limited to the monitored features.
now = datetime.utcnow()
drift_run = monitor.run(now, [consts.service_name], feature_list=feature_list, compute_target=compute_target_name)

In [None]:
# takes a while for the run to complete and data to show up.
child_run = list(drift_run.get_children())[0]
child_run.get_metrics()