# Near real-time scoring of streaming transactions using the Cosmos DB Change Feed and Spark Structured Streaming

In this notebook, you will load the batch scoring model from your Azure Machine Learning workspace and use the Cosmos DB Linked Service to connect to the [Azure Cosmos DB Change Feed](https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed). Using the Cosmos DB Change Feed and [Spark Structured Streaming](https://docs.azuredatabricks.net/spark/latest/structured-streaming/index.html), you will read the data being ingested into the Cosmos DB `transactions` container and use your trained machine learning model to predict whether each transaction is suspicious, in near-real-time, as it streams in. You will then select all of the transactions where `isSuspicious` is true and write those out to a `suspicious_transactions` container in Azure Cosmos DB.


## Start the TransactionGenerator

With the streaming query now created, it is time to restart the transaction generator. In Exercise 1 of the hands-on lab step-by-step guide, you used the `TransactionGenerator` console app on your LabVM to compare data ingestion between Event Hubs and Cosmos DB. You will now make a configuration change in the console app's `appSettings.js` file to send transactions only to Cosmos DB. These transactions allow you to read from the Cosmos DB Change Feed as new transactions are inserted into the Cosmos DB collection.

1. Return to your LabVM and the TransactionGenerator console app.
2. Open the `appSettings.js` file and locate the `ONLY_WRITE_TO_COSMOS_DB` setting.
3. Change the value of `ONLY_WRITE_TO_COSMOS_DB` to `true`.
4. Save `appSettings.js`.
5. Start the console app by selecting the green **Run** button in the Visual Studio toolbar.
6. Once the console app is running, return to this notebook and continue on with the next cell.


## Load ML model

In the last exercise, you saved your trained model to your Azure ML workspace. To perform batch scoring, the first thing you need to do is load the model from your Azure ML workspace. In the cell below, you define some helper functions for retrieving your Azure ML workspace and loading the model stored there.


In [None]:
# Import the required libraries
import numpy
import os
import pandas as pd
import pickle
import azureml
from azureml.core import Workspace
from azureml.core.model import Model
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23

def getOrCreateWorkspace(subscription_id, resource_group, workspace_name, workspace_region):
    # By using the exist_ok param, if the workspace already exists we get a reference to the existing workspace instead of an error
    ws = Workspace.create(
        name = workspace_name,
        subscription_id = subscription_id,
        resource_group = resource_group, 
        location = workspace_region,
        exist_ok = True)
    return ws

def loadModelFromAML(ws, model_name="batch-score"):
    # Download the model folder from AML to the current working directory
    model_file_path = Model.get_model_path(model_name, _workspace=ws)
    print('Loading model from:', model_file_path)
    model = joblib.load(model_file_path)
    return model

Execute the cell below to load your model. You will need to respond to the prompt in the output, navigating to <https://microsoft.com/devicelogin> and then entering the code specified below to authenticate.

**Enter the same values** you copied from the **Prepare batch scoring model** Azure ML notebook.


In [None]:
#Provide the Subscription ID of your existing Azure subscription
subscription_id = "" #"YOUR_SUBSCRIPTION_ID"

#Provide values for the Resource Group and Workspace that will be created
resource_group = "" #"YOUR_RESOURCE_GROUP"
workspace_name = "" #"YOUR_AML_WORKSPACE_NAME"
workspace_region = "" # eastus, westcentralus, southeastasia, australiaeast, westeurope

#Get an AML Workspace
ws =  getOrCreateWorkspace(subscription_id, resource_group, 
                   workspace_name, workspace_region)

model = loadModelFromAML(ws)

## Save ML model to file system

In the next task, you will be creating a Databricks Job to run batch scoring on a schedule. To facilitate the use of your model by this scheduled job, it is easiest to save a copy of the model into a shared folder in DBFS within your Databricks workspace. Databricks spins up a new cluster every time a scheduled job runs, so this will prevent your job from needing to authenticate against your AML workspace each time that happens.


In [None]:
model_name="batch-score"

# Save the model for future use
filename = model_name + '.pkl'
# Use a context manager so the file handle is closed after writing
with open(filename, 'wb') as f:
    pickle.dump(model, f)

In [None]:
# Load the model, closing the file handle once it has been read
with open(filename, 'rb') as f:
    anomaly_model = pickle.load(f)

## Batch score transactions

With the model now loaded, the next step is to create a DataFrame containing the transactions loaded from the Azure Cosmos DB analytical store, and score each of those records using the model. As you did in the previous exercise, you will need to transform the data in the `transactions` table for use by your model. Encode the transformations into custom transformers for use in a pipeline as follows:


In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class NumericCleaner(BaseEstimator, TransformerMixin):
    """Replace missing numeric values with sentinel values."""
    def fit(self, X, y=None):
        print("NumericCleaner.fit called")
        return self
    def transform(self, X):
        print("NumericCleaner.transform called")
        X = X.copy()  # avoid mutating the caller's DataFrame
        X["localHour"] = X["localHour"].fillna(-99)
        X["accountAge"] = X["accountAge"].fillna(-1)
        X["numPaymentRejects1dPerUser"] = X["numPaymentRejects1dPerUser"].fillna(-1)
        # Treat a localHour of -1 as missing as well
        X.loc[X["localHour"] == -1, "localHour"] = -99
        return X

class CategoricalCleaner(BaseEstimator, TransformerMixin):
    """Fill missing categorical values and convert isUserRegistered to 0/1."""
    def fit(self, X, y=None):
        print("CategoricalCleaner.fit called")
        return self
    def transform(self, X):
        print("CategoricalCleaner.transform called")
        X = X.fillna(value={"cardType": "U", "cvvVerifyResult": "N"})
        # 1 when the value is the string "TRUE", otherwise 0
        X["isUserRegistered"] = (X["isUserRegistered"] == "TRUE").astype(int)
        return X

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder

numeric_features=["transactionAmountUSD", "localHour", 
                  "transactionIPaddress", "digitalItemCount", "physicalItemCount", "accountAge",
                  "paymentInstrumentAgeInAccount", "numPaymentRejects1dPerUser"
                 ]

categorical_features=["transactionCurrencyCode", "browserLanguage", "paymentInstrumentType", "cardType", "cvvVerifyResult",
                      "isUserRegistered"
                     ]                           

numeric_transformer = Pipeline(steps=[
    ('cleaner', NumericCleaner())
])
                               
categorical_transformer = Pipeline(steps=[
    ('cleaner', CategoricalCleaner()),
    ('encoder', OrdinalEncoder())])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

Now, load the batch `transaction` data into a Spark DataFrame, convert that to a Pandas DataFrame, and then pass that data through the transformation pipeline.


## Stream transactions from Cosmos DB Change Feed into Azure Databricks

Change feed support in Azure Cosmos DB works by listening to an Azure Cosmos DB container for any changes. It then outputs the sorted list of documents that were changed in the order in which they were modified. The changes are persisted, can be processed asynchronously and incrementally, and the output can be distributed across one or more consumers for parallel processing.
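
To make the idea of ordered, incremental reads concrete, here is a minimal in-memory sketch. It is not the Cosmos DB API: `ToyChangeFeed`, `read_changes`, and the integer continuation value are hypothetical names used only for illustration, and the real service handles partitioning and persistence for you.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ToyChangeFeed:
    """Hypothetical in-memory stand-in for a container's change feed."""
    _log: List[Dict] = field(default_factory=list)

    def upsert(self, doc: Dict) -> None:
        # Every insert or update is appended in modification order.
        self._log.append(dict(doc))

    def read_changes(self, continuation: int = 0) -> Tuple[List[Dict], int]:
        # Return everything written after the checkpoint, plus a new checkpoint.
        changes = self._log[continuation:]
        return changes, len(self._log)


feed = ToyChangeFeed()
feed.upsert({"id": "t1", "transactionAmountUSD": 12.5})
feed.upsert({"id": "t2", "transactionAmountUSD": 990.0})

# The first read starts from the beginning and sees both documents.
first_batch, checkpoint = feed.read_changes()

# New data arrives while the consumer is busy.
feed.upsert({"id": "t3", "transactionAmountUSD": 4.2})

# Resuming from the saved checkpoint returns only the new change.
second_batch, checkpoint = feed.read_changes(checkpoint)

print([d["id"] for d in first_batch])   # ['t1', 't2']
print([d["id"] for d in second_batch])  # ['t3']
```

The saved checkpoint here plays a role similar to the `spark.cosmos.changeFeed.checkpointLocation` option used later in this notebook: it records how far the consumer has read so that processing can resume incrementally.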

In the cell below, a configuration object is created containing the required information for connecting to the Cosmos DB Change Feed. 

That configuration is then used in conjunction with the `readStream` command for Structured Streaming to stream records from the Cosmos DB Change Feed into a DataFrame.

> **NOTE**: The `format` property used below is different when reading streaming data from Cosmos DB than when reading batch or static data. In the case of streaming data, you use `cosmos.oltp`.
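
As a rough sketch of what such a configuration object can look like, the option keys and values below are copied from the `readStream` cell later in this notebook. The helper name `build_change_feed_config` and its parameters are hypothetical and exist only to keep the options in one place.

```python
from typing import Dict


def build_change_feed_config(
    linked_service: str,
    container: str,
    checkpoint_location: str,
    query_name: str,
    from_beginning: bool = True,
) -> Dict[str, str]:
    """Collect the change feed reader options into one dictionary (hypothetical helper)."""
    return {
        "spark.synapse.linkedService": linked_service,
        "spark.cosmos.container": container,
        "spark.cosmos.changeFeed.readEnabled": "true",
        "spark.cosmos.changeFeed.startFromTheBeginning": str(from_beginning).lower(),
        "spark.cosmos.changeFeed.checkpointLocation": checkpoint_location,
        "spark.cosmos.changeFeed.queryName": query_name,
    }


change_feed_config = build_change_feed_config(
    linked_service="WoodgroveCosmosDb",
    container="transactions",
    checkpoint_location="/localReadCheckpointFolder",
    query_name="streamQuery",
)

# In a Spark session the dictionary can be passed in one call, for example:
#   spark.readStream.format("cosmos.oltp").options(**change_feed_config).load()
for key, value in change_feed_config.items():
    print(f"{key} = {value}")
```

Keeping the options in a dictionary makes it easier to reuse the same connection settings across cells, at the cost of hiding the individual `option` calls.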


### Create scoring and saving functions

To leverage the model for scoring microbatches, you need to define a function with the scoring logic that will write the scored results to your desired destination. 

In this case, you will write the scored results out to a different Azure Cosmos DB container: **`suspicious_transactions`**. This is not the `transactions` container whose change feed you are reading. Only the transactions that have been predicted as suspicious are stored in the **`suspicious_transactions`** container, which meets our criteria for globally serving the data to customers.

Run the following cells that will create the helper functions for scoring and saving.


In [None]:
from pyspark.sql.functions import col

# Cosmos DB system properties and other columns the model does not use
COLUMNS_TO_DROP = {'_attachments', '_etag', '_rid', '_self', '_ts', 'collectionType', 'id', 'ttl'}

def foreach_batch_scorer(df, epoch_id):
    # Skip empty micro-batches; there is nothing to score or write
    if len(df.head(1)) == 0:
        return
    # Score the micro-batch and write the suspicious transactions
    scored_df = score_batch(df)
    write_scored_results(scored_df)

def score_batch(df):
    # Remove unneeded columns, preserving the original column order
    cols = [c for c in df.columns if c not in COLUMNS_TO_DROP]
    changes_clean = df.select(cols)
    # Get a Pandas DataFrame from the Spark DataFrame
    pandas_df = changes_clean.toPandas()
    # Transform the batch data
    preprocessed_transactions = preprocessor.fit_transform(pandas_df)
    transactions_preds = model.predict(preprocessed_transactions)
    pandas_df["isSuspicious"] = transactions_preds
    pandas_df["collectionType"] = 'SuspiciousTransactions'
    scored_transactions = spark.createDataFrame(pandas_df)
    # Keep only the transactions predicted as suspicious
    scored_transactions = scored_transactions.filter(col("isSuspicious") == True)
    return scored_transactions

def write_scored_results(scored_df):
    (scored_df
        .write
        .format("cosmos.oltp")
        .option("spark.synapse.linkedService", "WoodgroveCosmosDb")
        .option("spark.cosmos.container", "suspicious_transactions")
        .option("spark.cosmos.write.upsertEnabled", "true")
        .mode('append')
        .save()
    )
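
One thing to keep in mind with `score_batch`: it calls `preprocessor.fit_transform` on every micro-batch, so the `OrdinalEncoder` learns its category-to-integer mapping from whatever values happen to be in that batch. The pure-Python sketch below mimics the encoder's default behavior (sorted distinct values mapped to consecutive integers) to show how the same currency code can receive different integers in different batches. The helper `fit_ordinal_codes` is hypothetical and only stands in for the real encoder.

```python
from typing import Dict, List


def fit_ordinal_codes(values: List[str]) -> Dict[str, int]:
    """Map each distinct value to an integer in sorted order (hypothetical stand-in)."""
    return {category: code for code, category in enumerate(sorted(set(values)))}


batch_1 = ["USD", "EUR", "USD"]
batch_2 = ["USD", "AUD", "EUR"]

codes_1 = fit_ordinal_codes(batch_1)
codes_2 = fit_ordinal_codes(batch_2)

print(codes_1)
print(codes_2)
print("USD code changed between batches:", codes_1["USD"] != codes_2["USD"])
```

If the codes need to match those seen during training, one option is to persist the fitted preprocessor alongside the model and call only `transform` when scoring. Whether that applies here depends on how the model was trained in the earlier notebook.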

## Read streaming data from the change feed

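Let's quickly examine the components of the command in the cell below:

  - `readStream`: This creates a streaming reader rather than a one-time batch read.
  - `format("cosmos.oltp")`: This tells the reader to connect to the Cosmos DB transactional store, which is the format used for streaming data.
  - `option("spark.synapse.linkedService", ...)`: The name of the Cosmos DB Linked Service that supplies the connection details.
  - `option("spark.cosmos.container", ...)`: The container whose changes you want to read, in this case `transactions`.
  - `option("spark.cosmos.changeFeed.readEnabled", "true")`: This reads from the container's change feed.
  - `option("spark.cosmos.changeFeed.startFromTheBeginning", "true")`: This starts reading from the beginning of the change feed rather than only picking up new changes.
  - `option("spark.cosmos.changeFeed.checkpointLocation", ...)`: By specifying a checkpoint location, you ensure that an interrupted read can continue where it left off.
  - `option("spark.cosmos.changeFeed.queryName", ...)`: A name that identifies this streaming query.
  - `load()`: This returns the streaming DataFrame.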


In [None]:
dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "WoodgroveCosmosDb")\
    .option("spark.cosmos.container", "transactions")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

DataFrames have an `isStreaming` property, which you can use to verify the `dfStream` DataFrame created above is a streaming DataFrame.


In [None]:
dfStream.isStreaming

Run the following cell to start the streaming query. Notice the use of `foreachBatch` as the sink (the destination for the data), which passes each micro-batch to the `foreach_batch_scorer` function you defined earlier.

In [None]:
streaming_query = dfStream.writeStream.foreachBatch(foreach_batch_scorer).start() 

## Stop the streaming query

Allow the TransactionGenerator to run until completion, and then execute the cell below to stop the streaming query.

In [None]:
streaming_query.stop()
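
If you expect to re-run the stop cell, a small guard can make it safe to call more than once. The sketch below relies only on the `isActive` property and `stop()` method that a Structured Streaming query object exposes. The helper `stop_if_active` and the `FakeQuery` stand-in used to exercise it outside Spark are hypothetical.

```python
def stop_if_active(query) -> bool:
    """Stop a streaming query if it is still running; return True when stop() was called."""
    if query is not None and query.isActive:
        query.stop()
        return True
    return False


class FakeQuery:
    """Hypothetical stand-in so the helper can be exercised without a Spark session."""

    def __init__(self) -> None:
        self.isActive = True

    def stop(self) -> None:
        self.isActive = False


fake = FakeQuery()
first_call = stop_if_active(fake)   # the query was active, so it is stopped
second_call = stop_if_active(fake)  # already stopped, so nothing happens
print(first_call, second_call)
```

In this notebook you would call `stop_if_active(streaming_query)` in place of the bare `stop()` call.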