# Using Spark ML and IBM Business Automation Insights to estimate process duration

Artificial intelligence can be combined with business process management in more than one way. For example, artificial intelligence can help transform unstructured data into data that a process can work with, through techniques such as visual or text recognition. Assistants and bots can help provide a better user experience. Several IBM Watson services serve these purposes, but a business process also captures a lot of business data of its own.
This notebook shows how to make better use of this data and apply machine learning techniques to predict the duration of BPMN processes, based on the data that the process captures.

## The scenario
This notebook uses data that is stored by the BPMN process called 'Hiring Sample'. This process is a sample that is delivered with the Business Automation Workflow product. It represents the successive tasks to hire a new employee, from the submission of a new open position, through the approval of the general manager, to the selection of the candidate. The purpose of this scenario is to reuse the data that is stored by this BPMN process, apply machine learning algorithms to predict the duration of a process, and possibly notify the submitter if the process is going to last long.

The purpose of this notebook is not to reproduce a real scenario but rather to show how such a scenario can be worked out by using Business Automation Insights.


## Overview of the solution

This notebook assumes that IBM Business Automation Workflow is used to run the business process and that this process management instance is connected to IBM Business Automation Insights, so that all the operational data of the process is stored in an HDFS data lake.


## Learning goals

The learning goals of this notebook are:

- Understand 'time series' and 'completed summary' data from IBM Business Automation Insights
- Explore the format of the data and learn how to read it
- Create an Apache® Spark machine-learning pipeline to estimate process duration from existing data and business data
- Train and evaluate the model
- Use the model to estimate the duration of unfinished processes by using the activity summary data of the process application



## Prerequisites

Make sure that IBM Business Automation Workflow and IBM Business Automation Insights are installed.


## Creating some data to train the system

To try the notebook, create several instances of the Hiring Sample process and run some of them to completion: the completed instances are used to train the model that estimates the process duration. Leave other instances unfinished, with at least one activity completed, so that the model can estimate their duration.

## The format of the Business Automation Insights data
This exercise uses both the BPMN 'time series' events and the BPMN 'summaries' events. Time series events are the raw events that are captured as activities and processes are executed. The 'summaries' events are stored in HDFS when a process or activity completes and contain information such as the duration of the process or activity. All data is stored in JSON format. The paths are defined as follows:

 * Process time series:

    [path_to_your_hdfs]/ibm-bai/bpmn-timeseries/[process Application Id]/[process Application Version Id]/process/[process Id]/[date]


 * Activity time series:

    [path_to_your_hdfs]/ibm-bai/bpmn-timeseries/[process Application Id]/[process Application Version Id]/activity/[process Id]/[activity Id]/[date]


 * Process summaries:

    [path_to_your_hdfs]/ibm-bai/bpmn-summaries-completed/[process Application Id]/[process Application Version Id]/process/[process Id]/[date]


 * Activity summaries:

    [path_to_your_hdfs]/ibm-bai/bpmn-summaries-completed/[process Application Id]/[process Application Version Id]/activity/[process Id]/[activity Id]/[date]
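Because the four layouts differ only in the top folder and in the number of wildcard levels, a small helper can build the glob patterns that `spark.read.json()` accepts. The sketch below is illustrative: `bai_path` is a hypothetical helper, not part of Business Automation Insights, and the HDFS root in the usage line is a made-up example.

```python
def bai_path(hdfs_root, app_id, version_id, kind, level):
    """Build a glob pattern for BAI events (hypothetical helper).

    kind:  'timeseries' or 'summaries' (completed summaries)
    level: 'process' or 'activity'
    """
    folders = {"timeseries": "bpmn-timeseries", "summaries": "bpmn-summaries-completed"}
    # process events:  .../process/[process Id]/[date]
    # activity events: .../activity/[process Id]/[activity Id]/[date]
    wildcards = {"process": "*/*", "activity": "*/*/*"}
    if kind not in folders or level not in wildcards:
        raise ValueError("kind must be 'timeseries' or 'summaries', level 'process' or 'activity'")
    return "/".join([hdfs_root.rstrip("/"), "ibm-bai", folders[kind],
                     app_id, version_id, level, wildcards[level]])


print(bai_path("hdfs://namenode:8020/data", "appId", "versionId", "summaries", "process"))
# hdfs://namenode:8020/data/ibm-bai/bpmn-summaries-completed/appId/versionId/process/*/*
```

The wildcards match every process (and activity) identifier and every date folder, which is how the cells below read all the events of one process application version at once.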

## Using Spark SQL to read Business Automation Insights data
Business Automation Insights stores data in HDFS. As indicated above, the events that come from the Business Automation Workflow instance are stored in JSON files, which are read through Spark SQL.

This first query reads the data of all completed instances of the 'Hiring Sample' process. Here the identifier of the process application is hard-coded because the identifier of the Hiring Sample is the same across Business Automation Workflow installations. If you need to adapt the code to your own process application, find its process application identifier and process application version identifier. The Business Automation Insights documentation page at https://www.ibm.com/support/knowledgecenter/SSYHZ8_18.0.0/com.ibm.dba.bai/topics/tsk_bai_retrieve_process_info_from_id.html explains how to find those identifiers by using the REST API. In the code below, make sure you <strong>change the HDFS URL</strong>.


In [None]:
from pyspark.sql import SparkSession

processAppId = '9ab0d0c6-d92c-4355-9ed5-d8a05acdc4b0'
processAppVersionId = '2064.64cc52a5-fdc6-4cab-8e36-3ded94921f56'

# Change this to the URL of your HDFS data lake
hdfsroot = 'hdfs://path to your hdfs'

spark = SparkSession.builder.getOrCreate()
# Let the HDFS client reach the data nodes by host name rather than by internal IP address
spark.conf.set("dfs.client.use.datanode.hostname", "true")

# Read the summaries of all completed instances of this process application version
processSummaries = spark.read.json(hdfsroot + "/ibm-bai/bpmn-summaries-completed/" + processAppId + "/" + processAppVersionId + "/process/*/*")
processSummaries.createOrReplaceTempView("processSummaries")

print('The schema of the completed processes')
processSummaries.printSchema()
print('The data contains ' + str(processSummaries.count()) + ' events')


You should get a schema for events that looks like:
    <pre>
root
 |-- bpmCellName: string (nullable = true)
 |-- bpmSystemId: string (nullable = true)
 |-- completedTime: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- aEmpRequisition121381434563922: struct (nullable = true)
 |    |    |-- @ids: struct (nullable = true)
 |    |    |    |-- trackingGroupId: string (nullable = true)
 |    |    |    |-- trackingGroupVersionId: string (nullable = true)
 |    |    |-- Department.string: string (nullable = true)
 |    |    |-- EmploymentStatus.string: string (nullable = true)
 |    |    |-- GMApproval.string: string (nullable = true)
 |    |    |-- HiringManager.string: string (nullable = true)
 |    |    |-- Location.string: string (nullable = true)
 |-- deletedTime: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- id: string (nullable = true)
 |-- lastBusinessDataUpdateActivity: string (nullable = true)
 |-- lastBusinessDataUpdateEvent: string (nullable = true)
 |-- lastBusinessDataUpdatePerformer: string (nullable = true)
 |-- lastBusinessDataUpdateTime: string (nullable = true)
 |-- name: string (nullable = true)
 |-- processApplicationId: string (nullable = true)
 |-- processApplicationName: string (nullable = true)
 |-- processApplicationSnapshotName: string (nullable = true)
 |-- processApplicationVersionId: string (nullable = true)
 |-- processId: string (nullable = true)
 |-- processSnapshotName: string (nullable = true)
 |-- processVersionId: string (nullable = true)
 |-- startTime: string (nullable = true)
 |-- state: string (nullable = true)
 |-- terminatedTime: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- type: string (nullable = true)
 |-- version: string (nullable = true)
    </pre>

In the process summary event, you are interested in the overall process duration, which is stored in the 'duration' column. You also use the 'startTime' of the process and the business data that is attached to the Business Automation Workflow process. The process stores some business data related to hiring, such as the hiring department, the type of employment, the hiring location, and the manager. This data, the process start time, and the duration are used to build a machine-learning model that can predict the duration of processes. Real cases would involve more features, but this simplified example is sufficient for the demonstration.

The hiring process has the 'autotracking' flag on, which means that all activity events and process events embed the business data. The business data is tracked through a tracking group whose autogenerated name is aEmpRequisition121381434563922. The event structure shows that all the business data is stored under the data.aEmpRequisition121381434563922 object.
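To make the nesting concrete, the following plain-Python sketch does for one parsed JSON event what the SQL query below does for the whole data set: it keeps `duration` and `startTime` and lifts every field of the tracking group to the top level. The function name and the event values are hypothetical; only the field names come from the schema above.

```python
# Name of the autogenerated tracking group in the Hiring Sample process
TRACKING_GROUP = "aEmpRequisition121381434563922"


def flatten_summary(event, group=TRACKING_GROUP):
    """Return duration, startTime, and the tracked business data of one summary event,
    mimicking SELECT duration, startTime, data.<group>.* on a single record."""
    row = {"duration": event.get("duration"), "startTime": event.get("startTime")}
    # Events without the tracking group simply contribute no business data
    business_data = (event.get("data") or {}).get(group) or {}
    row.update(business_data)
    return row


# Hypothetical event shaped like the schema above (values are made up)
sample_event = {
    "id": "example-id",
    "duration": 86400000,
    "startTime": "2018-06-01T09:00:00.000Z",
    "data": {
        TRACKING_GROUP: {
            "Department.string": "Sales",
            "EmploymentStatus.string": "Full-time",
            "Location.string": "Paris",
        }
    },
}

print(flatten_summary(sample_event))
```

Note that the flattened keys keep their `.string` type suffix, which is why the columns are renamed later in the notebook.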

In [None]:
processsummaries = spark.sql("SELECT duration, startTime, data.aEmpRequisition121381434563922.* FROM processSummaries")

processsummaries.show()
processsummaries.printSchema()

The resulting schema for the processSummary data set is: 
    <pre>
root
 |-- duration: long (nullable = true)
 |-- startTime: string (nullable = true)
 |-- @ids: struct (nullable = true)
 |    |-- trackingGroupId: string (nullable = true)
 |    |-- trackingGroupVersionId: string (nullable = true)
 |-- Department.string: string (nullable = true)
 |-- EmploymentStatus.string: string (nullable = true)
 |-- GMApproval.string: string (nullable = true)
 |-- HiringManager.string: string (nullable = true)
 |-- Location.string: string (nullable = true)
    </pre>

## Create an Apache® Spark machine learning model

Watson Machine Learning supports a growing number of IBM and open-source machine-learning and deep-learning packages. This example uses Spark ML, in particular its linear regression algorithm. You are now going to learn how to prepare the data, create an Apache® Spark machine-learning pipeline, and train the model.

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

### Adaptation of data

The following code renames the columns to remove their type suffix (for example, `Department.string` becomes `Department`).
Then, one StringIndexer is created for each string-typed column, which represents a category, to transform its values into numeric indexes. The VectorAssembler then creates a new 'features' column that contains the features from which you build the model.
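As a rough illustration of what a fitted StringIndexer does, the plain-Python sketch below assigns index 0.0 to the most frequent label, 1.0 to the next one, and so on (Spark's default `stringOrderType` is `frequencyDesc`, with equal frequencies ordered alphabetically). It also mimics `handleInvalid="skip"` by dropping rows whose label was not seen at fit time. The function names and department values are hypothetical; this is a sketch of the behavior, not Spark's implementation.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_skip(rows, column, mapping, output_column):
    """Add the indexed column and drop rows whose label is unknown (like handleInvalid='skip')."""
    result = []
    for row in rows:
        label = row.get(column)
        if label in mapping:
            result.append({**row, output_column: mapping[label]})
    return result


mapping = fit_string_indexer(["Sales", "HR", "Sales", "IT", "HR", "Sales"])
print(mapping)  # {'Sales': 0.0, 'HR': 1.0, 'IT': 2.0}
rows = [{"Department": "HR"}, {"Department": "Legal"}]
print(transform_skip(rows, "Department", mapping, "IndexedDepartment"))
# [{'Department': 'HR', 'IndexedDepartment': 1.0}]
```

The "skip" behavior matters later in the notebook: an unfinished process whose department never appears in the training data is silently left out of the predictions instead of making the transformation fail.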

A SQLTransformer query extracts three numeric columns (year, month, and dayOfMonth) from the startTime column.
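The date features can be pictured with the plain-Python sketch below, assuming that `startTime` is an ISO-8601 string such as `2018-06-01T09:00:00.000Z`. It reads the date part as written; Spark's `year`, `month`, and `dayofmonth` functions may give a different day for timestamps close to midnight, depending on how the string is converted and on the session time zone. `date_features` is a hypothetical name.

```python
from datetime import datetime


def date_features(start_time):
    """Return (year, month, dayOfMonth) for an ISO-8601 startTime string."""
    # Only the leading YYYY-MM-DD part is parsed; the time of day is ignored
    day = datetime.strptime(start_time[:10], "%Y-%m-%d")
    return day.year, day.month, day.day


print(date_features("2018-06-01T09:00:00.000Z"))  # (2018, 6, 1)
```

Treating the year, month, and day as plain numbers lets the linear model capture trends over time and seasonal effects, at the cost of assuming a linear relationship for each of them.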

In [None]:
processsummaries = processsummaries.withColumnRenamed("Department.string", "Department")
processsummaries = processsummaries.withColumnRenamed("EmploymentStatus.string", "EmploymentStatus")
processsummaries = processsummaries.withColumnRenamed("HiringManager.string", "HiringManager")
processsummaries = processsummaries.withColumnRenamed("Location.string", "Location")

processsummaries.show()

# Map each categorical column to numeric indexes. handleInvalid="skip" drops rows whose value was not
# seen when the indexer was fitted, instead of failing when the model is applied to new data.
departmentIndexer = StringIndexer(inputCol='Department', outputCol="IndexedDepartment").setHandleInvalid("skip").fit(processsummaries)
employmentStatusIndexer = StringIndexer(inputCol='EmploymentStatus', outputCol="IndexedEmploymentStatus").setHandleInvalid("skip").fit(processsummaries)
hiringManagerIndexer = StringIndexer(inputCol='HiringManager', outputCol="IndexedHiringManager").setHandleInvalid("skip").fit(processsummaries)
locationIndexer = StringIndexer(inputCol='Location', outputCol="IndexedLocation").setHandleInvalid("skip").fit(processsummaries)

from pyspark.ml.feature import SQLTransformer

dateTransformer = SQLTransformer(
    statement="SELECT *, year(startTime) AS year, dayOfMonth(startTime) as dayOfMonth, month(startTime) as month FROM __THIS__")


features = ["IndexedDepartment", "IndexedEmploymentStatus", "IndexedHiringManager", "IndexedLocation", "year", "month", "dayOfMonth"]

assembler = VectorAssembler(inputCols=features, outputCol="features")

### Creating the model
The model is built through the Linear Regression algorithm.

In [None]:
lr = LinearRegression(labelCol="duration", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8)
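For reference, the Spark ML documentation describes the objective of `LinearRegression` as elastic-net regularized least squares. With $x_i$ the feature vector and $y_i$ the duration of the $i$-th of $n$ training processes, $w$ the coefficients, $b$ the intercept, $\lambda$ = `regParam`, and $\alpha$ = `elasticNetParam`:

$$
\min_{w,\,b}\;\frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2
+ \lambda\left[\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right]
$$

With $\lambda = 0.3$ and $\alpha = 0.8$, the L1 penalty weight is $\lambda\alpha = 0.24$ and the L2 penalty weight is $\lambda(1-\alpha)/2 = 0.03$, so the penalty is mostly L1, which tends to push the coefficients of unhelpful features to zero. `maxIter=10` caps the number of optimizer iterations. By default (`standardization=True`), Spark standardizes the features before fitting, so the penalty acts on standardized coefficients.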

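In the cell below, the data is split into training data and test data, the prediction model is trained on the training data, and it is then applied to the test data. Finally, the error of the model is displayed as the root mean square error (RMSE), together with the minimum, maximum, and average durations for comparison.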
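The RMSE that `RegressionEvaluator` reports by default is the square root of the mean squared difference between the labels and the predictions. The plain-Python sketch below computes it on made-up durations (not real results). Because the RMSE is expressed in the same unit as the label, milliseconds here, the notebook can format it with the same `toHMS` function as the durations.

```python
import math


def rmse(labels, predictions):
    """Root mean square error between two equal-length sequences of numbers."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# Hypothetical durations in milliseconds, for illustration only
actual = [3_600_000, 7_200_000, 5_400_000]
predicted = [4_000_000, 6_800_000, 5_400_000]
print(rmse(actual, predicted))
```

Comparing the RMSE with the minimum, maximum, and average durations gives a quick sense of whether the error is small relative to typical process lengths.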

In [None]:

# Keep 80% of the completed processes for training and 20% for testing; the seed makes the split reproducible
train_data, test_data = processsummaries.randomSplit([0.8, 0.2], seed=24)

pipeline = Pipeline(stages=[departmentIndexer, employmentStatusIndexer, 
                            hiringManagerIndexer, locationIndexer, dateTransformer, assembler, lr ])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = RegressionEvaluator(labelCol="duration", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

def toHMS(duration):
    """Format a duration in milliseconds as hours, minutes, and seconds."""
    seconds = int(duration // 1000)
    h = seconds // 3600
    m = (seconds % 3600) // 60
    s = seconds % 60
    return str(h) + "H" + str(m) + "M" + str(s) + "s"

print("Root Mean Square Error = " + toHMS(rmse))

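# Import the functions module under an alias so that min and max do not shadow the Python built-ins
from pyspark.sql import functions as F

# Compare the RMSE with the range and the mean of the observed durations
stats = processsummaries.agg(F.min("duration"), F.max("duration"), F.avg("duration")).head()

min_duration = stats[0]
max_duration = stats[1]
avg_duration = stats[2]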


print("Min duration = " + toHMS(min_duration))
print("Max duration = " + toHMS(max_duration))
print("Avg duration = " + toHMS(avg_duration))

## Evaluation of the duration of noncompleted processes

The sample then uses the model to estimate the duration of noncompleted processes. First, the list of noncompleted processes is computed from the Business Automation Insights process time series events. A Spark SQL left outer join between PROCESS_STARTED and PROCESS_COMPLETED events finds the process instances that are started but not completed.

The time series events for the process application are located on this path: [path_to_your_hdfs]/ibm-bai/bpmn-timeseries/[process Application Id]/[process Application Version Id]/process/[process Id]/[date]

The "type" property of the events is used to distinguish between PROCESS_STARTED and PROCESS_COMPLETED events. A left outer join between the two sets of events, keeping only the started instances without a matching completion, returns the list of processes that are not completed.
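Logically, the join computes a set difference: started instances minus completed instances. The plain-Python sketch below shows this on hypothetical events (`unfinished_instances` is an illustrative name). In Spark SQL, the same result can also be written with a `LEFT ANTI JOIN`, for example `SELECT s.processInstanceId FROM started s LEFT ANTI JOIN completed c ON s.processInstanceId = c.processInstanceId`.

```python
def unfinished_instances(events):
    """Return the ids of process instances that have a PROCESS_STARTED event
    but no PROCESS_COMPLETED event (a set difference, like a left anti join)."""
    started = {e["processInstanceId"] for e in events if e["type"] == "PROCESS_STARTED"}
    completed = {e["processInstanceId"] for e in events if e["type"] == "PROCESS_COMPLETED"}
    return started - completed


# Hypothetical time series events for three process instances
events = [
    {"processInstanceId": "p1", "type": "PROCESS_STARTED"},
    {"processInstanceId": "p1", "type": "PROCESS_COMPLETED"},
    {"processInstanceId": "p2", "type": "PROCESS_STARTED"},
    {"processInstanceId": "p3", "type": "PROCESS_STARTED"},
]
print(sorted(unfinished_instances(events)))  # ['p2', 'p3']
```

Note that the total number of process instances is the number of distinct started instances (three here), not the sum of started and completed events, which would count completed instances twice.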

In [None]:
processTimeseries = spark.read.json(hdfsroot+"/ibm-bai/bpmn-timeseries/"+processAppId+"/"+processAppVersionId+"/process/*/*")

processTimeseries.createOrReplaceTempView("processTimeseries")

completedProcesses = spark.sql("SELECT * FROM processTimeseries WHERE type = 'PROCESS_COMPLETED'")
completedProcesses.createOrReplaceTempView("completed")

startedProcesses = spark.sql("SELECT * FROM processTimeseries WHERE type = 'PROCESS_STARTED'")
startedProcesses.createOrReplaceTempView("started")
# Each instance has a PROCESS_STARTED event; counting distinct ids avoids counting completed instances twice
totalProcessesCount = startedProcesses.select("processInstanceId").distinct().count()

# This left outer join selects the processes that are in the started list but not in the completed list.
unfinishedProcesses = spark.sql("SELECT DISTINCT s.processInstanceId, s.timeStamp AS startTime FROM started s LEFT OUTER JOIN completed c ON s.processInstanceId = c.processInstanceId WHERE c.processInstanceId IS NULL")
unfinishedProcesses.createOrReplaceTempView("unfinishedProcesses")

print("There are " + str(unfinishedProcesses.count()) + " non-completed processes out of " + str(totalProcessesCount) + " process instances")
unfinishedProcesses.printSchema()

### Searching for activities and tracked data of uncompleted processes

Now that the list of uncompleted process instances is computed, the next step is to estimate their duration. For this estimate, you need the business data that is associated with each process instance. Some processes might be started with no activity completed yet, so some process instances might have no business data available. In the Hiring Sample case, the position must have been published for the duration of the process to be estimated. Because autotracking is on, the Business Automation Insights summary events of the completed activities carry the business data, so these events are used to retrieve it.

The code below joins the completed activity events with the list of unfinished process instances, to keep only the activity events that belong to an uncompleted process instance.

In [None]:
activities = spark.read.json(hdfsroot + "/ibm-bai/bpmn-summaries-completed/" + processAppId + "/" + processAppVersionId + "/activity/*/*/*")
activities.createOrReplaceTempView("activities")

# Keep only the completed activities that belong to an unfinished process instance
activitiesOfUnfinishedProcesses = spark.sql("SELECT p.startTime, a.name, a.timeStamp, a.data.aEmpRequisition121381434563922.*, p.processInstanceId FROM activities a INNER JOIN unfinishedProcesses p ON a.processInstanceId = p.processInstanceId")
activitiesOfUnfinishedProcesses.show()

activitiesOfUnfinishedProcesses.createOrReplaceTempView("activitiesOfUnfinishedProcesses")

You now want to find the last activity of each unfinished process. This is done by partitioning the events by process instance identifier, ordering each partition by descending timestamp, and keeping the first event of each partition. The result is the most recent activity event that was stored for each uncompleted process.
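The plain-Python sketch below keeps, for each process instance, the activity event with the latest timestamp, which is what the window query computes. It assumes that timestamps are ISO-8601 strings in the same format and time zone, so that comparing the strings compares the instants. The function name, activity names, and values are hypothetical.

```python
def last_activity_per_instance(activity_events):
    """Keep, for each processInstanceId, the activity event with the latest timestamp."""
    latest = {}
    for event in activity_events:
        pid = event["processInstanceId"]
        # ISO-8601 strings in the same format and time zone sort chronologically
        if pid not in latest or event["timestamp"] > latest[pid]["timestamp"]:
            latest[pid] = event
    return latest


# Hypothetical activity summaries; activity names are placeholders
activity_events = [
    {"processInstanceId": "p2", "name": "Activity A", "timestamp": "2018-06-01T10:00:00.000Z"},
    {"processInstanceId": "p2", "name": "Activity B", "timestamp": "2018-06-01T11:00:00.000Z"},
    {"processInstanceId": "p3", "name": "Activity A", "timestamp": "2018-06-02T08:30:00.000Z"},
]
for pid, event in sorted(last_activity_per_instance(activity_events).items()):
    print(pid, event["name"])
# p2 Activity B
# p3 Activity A
```

Keeping exactly one event per instance matters: if two events share the same latest timestamp, a ranking that allows ties could return both, and the model would then produce two predictions for the same process.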

In [None]:
# Number the activity events of each process instance from the most recent one, and keep exactly one per instance
unfinishedProcesses = spark.sql("SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY processInstanceId ORDER BY timeStamp DESC) AS rn FROM activitiesOfUnfinishedProcesses) vo WHERE rn = 1")
unfinishedProcesses.show()
unfinishedProcesses.createOrReplaceTempView("unfinishedProcesses")

Before the model is used to predict the duration of the uncompleted processes, the business data columns are renamed in the same way as for the training data:

In [None]:
unfinishedProcesses = unfinishedProcesses.withColumnRenamed("Department.string", "Department")
unfinishedProcesses = unfinishedProcesses.withColumnRenamed("HiringManager.string", "HiringManager")
unfinishedProcesses = unfinishedProcesses.withColumnRenamed("Location.string", "Location")
unfinishedProcesses = unfinishedProcesses.withColumnRenamed("EmploymentStatus.string", "EmploymentStatus")

In [None]:
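# Each prediction is the estimated total duration of the process instance, in milliseconds
model.transform(unfinishedProcesses).select("processInstanceId", "startTime", "prediction").show()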
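The model predicts the total duration of each instance, in milliseconds like the training label. One possible follow-up, sketched below as an assumption rather than as part of the original sample, is to subtract the time already elapsed since `startTime` to get an estimated remaining time, and to apply a threshold to decide whether to notify the submitter, as suggested in the scenario. The rows could be brought to the driver with `model.transform(unfinishedProcesses).select("processInstanceId", "startTime", "prediction").collect()`. The function names and the seven-day threshold are hypothetical.

```python
from datetime import datetime, timezone


def remaining_ms(predicted_total_ms, start_time, now):
    """Estimated remaining time in ms: predicted total duration minus time already elapsed.
    A negative value means the instance has already run longer than predicted."""
    # fromisoformat() before Python 3.11 does not accept a trailing 'Z'
    started = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
    elapsed_ms = (now - started).total_seconds() * 1000
    return predicted_total_ms - elapsed_ms


def should_notify(predicted_total_ms, threshold_ms):
    """Hypothetical rule: warn the submitter when the predicted duration exceeds a threshold."""
    return predicted_total_ms > threshold_ms


now = datetime(2018, 6, 2, 9, 0, tzinfo=timezone.utc)
print(remaining_ms(172_800_000, "2018-06-01T09:00:00.000Z", now))  # 86400000.0
print(should_notify(172_800_000, threshold_ms=7 * 24 * 3600 * 1000))  # False
```

The threshold is a business decision; comparing the predicted total duration, rather than the remaining time, keeps the rule independent of when the check runs.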

## Conclusion
You have seen the various types of events that Business Automation Insights stores for activities and processes, and how you can use them to predict the process duration, based on the business data that a process tracks.

Author: Emmanuel Tissandier is a Senior Technical Staff Member and architect in the Digital Business Automation team in the IBM France Lab.