# Batch scoring of transactions in the analytical store

In this notebook, you will retrieve transaction data from the Cosmos DB analytical store and use your trained machine learning model to predict whether each transaction is suspicious. You will then aggregate the scored data to determine the share of transactions from each `ipCountryCode` that is suspicious, and save the result to the `suspicious_transactions` Cosmos DB container for later reporting.

## Load ML model

In the last exercise, you saved your trained model to your Azure ML workspace. To perform batch scoring, the first thing you need to do is load the model from your Azure ML workspace. In the cell below, you define some helper functions for retrieving your Azure ML workspace and loading the model stored there.


In [None]:
# Import the required libraries
import numpy
import os
import pandas as pd
import pickle
import azureml
from azureml.core import Workspace
from azureml.core.model import Model
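# sklearn.externals.joblib was removed in scikit-learn 0.23; prefer the standalone package
try:
    import joblib
except ImportError:
    from sklearn.externals import joblib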

def getOrCreateWorkspace(subscription_id, resource_group, workspace_name, workspace_region):
    # By using the exist_ok param, if the workspace already exists, we get a reference to it instead of an error
    ws = Workspace.create(
        name = workspace_name,
        subscription_id = subscription_id,
        resource_group = resource_group, 
        location = workspace_region,
        exist_ok = True)
    return ws

def loadModelFromAML(ws, model_name="batch-score"):
    # Download the model folder from AML to the current working directory
    model_file_path = Model.get_model_path(model_name, _workspace=ws)
    print('Loading model from:', model_file_path)
    model = joblib.load(model_file_path)
    return model

Execute the cell below to load your model. When prompted in the output, navigate to <https://microsoft.com/devicelogin> and enter the code shown there to authenticate.

**Enter the same values** you copied from the **Prepare batch scoring model** Azure ML notebook.

In [None]:
#Provide the Subscription ID of your existing Azure subscription
subscription_id = "" #"YOUR_SUBSCRIPTION_ID"

#Provide the Resource Group and Workspace names (an existing workspace is reused rather than recreated)
resource_group = "" #"YOUR_RESOURCE_GROUP"
workspace_name = "" #"YOUR_AML_WORKSPACE_NAME"
workspace_region = "" # eastus, westcentralus, southeastasia, australiaeast, westeurope

#Get an AML Workspace
ws =  getOrCreateWorkspace(subscription_id, resource_group, 
                   workspace_name, workspace_region)

model = loadModelFromAML(ws)

## Save ML model to file system


In [None]:
model_name="batch-score"

# Save the model for future use
filename = model_name + '.pkl'
# Use a context manager so the file handle is closed after writing
with open(filename, 'wb') as f:
    pickle.dump(model, f)

In [None]:
# Load the model back from the file system
with open(filename, 'rb') as f:
    anomaly_model = pickle.load(f)
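
The save and load steps above amount to a pickle round trip. As a minimal sketch, assuming a plain dictionary as a stand-in for the trained model and a temporary directory in place of the notebook's working directory (both are illustrative and not part of the lab), the same pattern looks like this:

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for the trained model; any picklable object works.
stand_in_model = {"name": "batch-score", "threshold": 0.5}

with tempfile.TemporaryDirectory() as tmp_dir:
    filename = os.path.join(tmp_dir, "batch-score.pkl")

    # Serialize the object; the with-block closes the file handle on exit.
    with open(filename, "wb") as f:
        pickle.dump(stand_in_model, f)

    # Deserialize it again from the same file.
    with open(filename, "rb") as f:
        restored_model = pickle.load(f)

print(restored_model == stand_in_model)
```

Unpickling can execute arbitrary code, so only load pickle files that come from a source you trust.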

## Batch score transactions

With the model now loaded, the next step is to create a DataFrame containing the transactions loaded from the Azure Cosmos DB analytical store and score each of those records using the model. As you did in the previous exercise, you will need to transform the data in the `transactions` table for use by your model. Encode the transformations into custom transformers for use in a pipeline as follows:


In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class NumericCleaner(BaseEstimator, TransformerMixin):
    """Replace missing numeric values with sentinel values."""
    def __init__(self):
        pass  # no hyperparameters to store
    def fit(self, X, y=None):
        print("NumericCleaner.fit called")
        return self
    def transform(self, X):
        print("NumericCleaner.transform called")
        X = X.copy()  # avoid modifying the caller's DataFrame
        X["localHour"] = X["localHour"].fillna(-99)
        X["accountAge"] = X["accountAge"].fillna(-1)
        X["numPaymentRejects1dPerUser"] = X["numPaymentRejects1dPerUser"].fillna(-1)
        # Treat a localHour of -1 as missing as well
        X.loc[X["localHour"] == -1, "localHour"] = -99
        return X

class CategoricalCleaner(BaseEstimator, TransformerMixin):
    """Fill missing categorical values and convert isUserRegistered to 0/1."""
    def __init__(self):
        pass  # no hyperparameters to store
    def fit(self, X, y=None):
        print("CategoricalCleaner.fit called")
        return self
    def transform(self, X):
        print("CategoricalCleaner.transform called")
        X = X.fillna(value={"cardType": "U", "cvvVerifyResult": "N"})
        # 1 when the value is the string "TRUE", 0 otherwise (including missing values)
        X["isUserRegistered"] = (X["isUserRegistered"] == "TRUE").astype(int)
        return X
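
To see what these cleaning rules do, the following sketch applies the same `fillna` and sentinel logic directly to a small, made-up DataFrame, without the transformer classes. The sample values are hypothetical; only the column names and fill values come from the transformers above.

```python
import numpy as np
import pandas as pd

# Hypothetical sample rows; only the column names mirror the transactions data.
sample = pd.DataFrame({
    "localHour": [13.0, np.nan, -1.0],
    "accountAge": [120.0, np.nan, 5.0],
    "numPaymentRejects1dPerUser": [0.0, 2.0, np.nan],
    "cardType": ["VISA", None, "MC"],
    "cvvVerifyResult": ["M", None, "N"],
    "isUserRegistered": ["TRUE", "FALSE", None],
})

cleaned = sample.copy()

# Numeric rules: missing values become sentinels, and -1 hours count as missing.
cleaned["localHour"] = cleaned["localHour"].fillna(-99)
cleaned["accountAge"] = cleaned["accountAge"].fillna(-1)
cleaned["numPaymentRejects1dPerUser"] = cleaned["numPaymentRejects1dPerUser"].fillna(-1)
cleaned.loc[cleaned["localHour"] == -1, "localHour"] = -99

# Categorical rules: default codes for missing values, then a 0/1 flag.
cleaned = cleaned.fillna(value={"cardType": "U", "cvvVerifyResult": "N"})
cleaned["isUserRegistered"] = (cleaned["isUserRegistered"] == "TRUE").astype(int)

print(cleaned)
```

Sentinel values such as `-99` and `-1` keep the rows in the data set while letting the model distinguish "unknown" from real measurements.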

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder

numeric_features=["transactionAmountUSD", "localHour", 
                  "transactionIPaddress", "digitalItemCount", "physicalItemCount", "accountAge",
                  "paymentInstrumentAgeInAccount", "numPaymentRejects1dPerUser"
                 ]

categorical_features=["transactionCurrencyCode", "browserLanguage", "paymentInstrumentType", "cardType", "cvvVerifyResult",
                      "isUserRegistered"
                     ]                           

numeric_transformer = Pipeline(steps=[
    ('cleaner', NumericCleaner())
])
                               
categorical_transformer = Pipeline(steps=[
    ('cleaner', CategoricalCleaner()),
    ('encoder', OrdinalEncoder())])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

Now, load the batch `transactions` data into a Spark DataFrame, convert it to a Pandas DataFrame, and then pass that data through the transformation pipeline.


In [None]:
# Load transactions from the Cosmos DB analytical store into a Spark DataFrame
#transactions = spark.sql("SELECT * FROM transactions")
transactions = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "WoodgroveCosmosDb")\
    .option("spark.cosmos.container", "transactions")\
    .load()

# Remove unwanted system columns while keeping the original column order
unwanted = {'_attachments','_etag','_rid','_self','_ts','collectionType','id','ttl'}
cols = [c for c in transactions.columns if c not in unwanted]

transactions = transactions.select(cols)

# Get a Pandas DataFrame from the Spark DataFrame
pandas_df = transactions.toPandas()

# Transform the batch data
preprocessed_transactions = preprocessor.fit_transform(pandas_df)
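
One detail worth keeping in mind: `fit_transform` re-fits every step of the preprocessor, including the `OrdinalEncoder`, on the batch itself. With its default settings, `OrdinalEncoder` derives its categories from the data it is fitted on, so the integer assigned to a given category depends on which values are present. The plain-Python stand-in below mimics that behaviour to show how the codes can shift between two data sets. The `fit_ordinal_codes` helper and the sample values are hypothetical, and this is not scikit-learn's implementation.

```python
def fit_ordinal_codes(values):
    """Map each distinct value to an integer by sorted order (stand-in for fitting)."""
    return {category: code for code, category in enumerate(sorted(set(values)))}

# Hypothetical card types seen at training time versus in a later batch.
training_values = ["AMEX", "MC", "VISA", "VISA"]
batch_values = ["MC", "VISA", "VISA"]

training_codes = fit_ordinal_codes(training_values)
batch_codes = fit_ordinal_codes(batch_values)

print(training_codes)  # {'AMEX': 0, 'MC': 1, 'VISA': 2}
print(batch_codes)     # {'MC': 0, 'VISA': 1}
```

If the preprocessor fitted during training is available, calling `transform` on it instead of re-fitting is one way to keep the codes consistent with what the model saw.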

## Score the batch data

With the batch data transformed, you are now ready to use your ML model to predict whether each transaction in the data set is suspicious. Execute the following cell to retrieve the predictions from your model.


In [None]:
transactions_preds = model.predict(preprocessed_transactions)

Execute the cell below to view the predictions. Notice that the output is in the form of an `array`. Each item in the array is associated with a record in the `transactions` batch data, based on the order of the records.


In [None]:
transactions_preds

To add the prediction results to your transaction data, you can use the `tolist()` method on the array. This assigns the predictions, in order, to the rows of your Pandas DataFrame. In this case, you will add the predictions as a new column named `isSuspicious`.


In [None]:
pandas_df["isSuspicious"] = transactions_preds.tolist()
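
The assignment relies on position, not on index labels. A small sketch with made-up values (the index labels, amounts, and predictions are hypothetical) shows that a list assigned to a new column is matched to rows in order, whatever the DataFrame's index looks like:

```python
import numpy as np
import pandas as pd

# Hypothetical frame with a non-default index to show that labels are ignored.
frame = pd.DataFrame({"transactionAmountUSD": [25.0, 310.5, 12.99]}, index=[10, 20, 30])

# Stand-in for the array returned by model.predict(...).
preds = np.array([0, 1, 0])

# A plain list is assigned row by row, in order.
frame["isSuspicious"] = preds.tolist()

print(frame)
```

This is why the rows of `pandas_df` should not be reordered or filtered between the call to `predict` and this assignment.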

You can now take a quick look to see the count of suspicious versus not suspicious records in the data set.


In [None]:
pandas_df['isSuspicious'].value_counts()

To enable writing the scored transaction data out to a Cosmos DB container, convert the Pandas DataFrame back to a Spark DataFrame.

> You may ignore any Arrow optimization warnings if you see them.


In [None]:
scored_transactions = spark.createDataFrame(pandas_df)

scored_transactions.createOrReplaceTempView("scored_transactions_view")

In [None]:
%%sql

SELECT * FROM scored_transactions_view LIMIT 100

## Write data to Azure Cosmos DB using HTAP

### Aggregate data by `ipCountryCode`

The final action you will perform in this notebook is to do a simple aggregation of the data based on the `isSuspicious` value and the `ipCountryCode`.

We set the `id` value to `ipCountryCode` so we can perform upserts on the aggregate data. We also set the `collectionType` field to `SuspiciousAgg` to differentiate between suspicious transaction data and aggregate data in the `suspicious_transactions` container.


In [None]:
suspicious_agg = spark.sql("""
    SELECT ipCountryCode AS id,
           ipCountryCode,
           COUNT(CASE WHEN isSuspicious = 1 THEN 0 END) AS SuspiciousTransactionCount,
           COUNT(*) AS TotalTransactionCount,
           COUNT(CASE WHEN isSuspicious = 1 THEN 0 END) / COUNT(*) AS PercentSuspicious,
           'SuspiciousAgg' AS collectionType
    FROM scored_transactions_view
    GROUP BY ipCountryCode
    ORDER BY ipCountryCode
""")
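
For readers less familiar with conditional counts in SQL, the same aggregation can be sketched in plain Python. The rows below are invented for illustration; only the field names follow the query. `COUNT(CASE WHEN isSuspicious = 1 THEN 0 END)` counts only the rows where the `CASE` expression is non-null, that is, the suspicious ones. `PercentSuspicious` is stored as a fraction between 0 and 1.

```python
from collections import defaultdict

# Hypothetical scored rows; only the field names come from the query.
rows = [
    {"ipCountryCode": "US", "isSuspicious": 0},
    {"ipCountryCode": "US", "isSuspicious": 1},
    {"ipCountryCode": "GB", "isSuspicious": 0},
    {"ipCountryCode": "US", "isSuspicious": 0},
    {"ipCountryCode": "GB", "isSuspicious": 1},
]

# Per-country tallies: [suspicious count, total count].
tallies = defaultdict(lambda: [0, 0])
for row in rows:
    tally = tallies[row["ipCountryCode"]]
    tally[0] += 1 if row["isSuspicious"] == 1 else 0
    tally[1] += 1

# Build one aggregate document per country, ordered by country code.
aggregates = [
    {
        "id": country,
        "ipCountryCode": country,
        "SuspiciousTransactionCount": suspicious,
        "TotalTransactionCount": total,
        "PercentSuspicious": suspicious / total,
        "collectionType": "SuspiciousAgg",
    }
    for country, (suspicious, total) in sorted(tallies.items())
]

for doc in aggregates:
    print(doc)
```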

In [None]:
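# Import only what is needed; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import desc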

suspicious_agg.orderBy(desc('PercentSuspicious')).show()

Now, write the DataFrame to the Azure Cosmos DB `suspicious_transactions` OLTP container.

In [None]:
suspicious_agg.write\
            .format("cosmos.oltp")\
            .option("spark.synapse.linkedService", "WoodgroveCosmosDb")\
            .option("spark.cosmos.container", "suspicious_transactions")\
            .option("spark.cosmos.write.upsertEnabled", "true")\
            .mode('append')\
            .save()
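
Because each aggregate document uses its `ipCountryCode` as `id`, re-running the write replaces the existing document for a country rather than adding a second one. The following is only a conceptual model of that behaviour, using a dictionary keyed by `id` as a stand-in for the container. The `upsert` helper and the values are hypothetical, this is not the Cosmos DB API, and partition keys are ignored here.

```python
def upsert(container, document):
    """Insert the document, or replace the one that already has the same id."""
    container[document["id"]] = document

# Hypothetical stand-in for the suspicious_transactions container.
container = {}

# First run writes one aggregate document per country.
upsert(container, {"id": "US", "PercentSuspicious": 0.25, "collectionType": "SuspiciousAgg"})
upsert(container, {"id": "GB", "PercentSuspicious": 0.50, "collectionType": "SuspiciousAgg"})

# A later run with fresh numbers overwrites the US document in place.
upsert(container, {"id": "US", "PercentSuspicious": 0.30, "collectionType": "SuspiciousAgg"})

print(len(container))
print(container["US"]["PercentSuspicious"])
```

With upserts disabled, a second write for an `id` that already exists would typically fail with a conflict instead of replacing the document.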

In [None]:
suspicious_agg.show()