## Capstone Project: Near real-time monitoring of a manufacturing production line
**Keywords:** <font color='green'>SQL, AWS (S3 bucket, SageMaker XGBoost, Lambda function, API Gateway), Power BI (streaming dashboard, API)</font>

**Background:** The objective of this capstone project was to build and deploy a model that monitors, in near real time, a customer-critical attribute of a product manufactured at one of my company's facilities. For a variety of reasons, such as limited resources and testing capabilities, this attribute can only be measured every 12 hours, while finished products are manufactured at a rate of 400-600 per minute. A failure to meet the attribute therefore means putting 12 hours of production on hold (and often scrapping it). During each 12-hour period, however, several quality and production checks are performed at every stage of the manufacturing process. The model uses the data from these intermediate checks to predict the customer-critical attribute during the intervals when it is not measured directly. If a failure is predicted, a notification is sent to the appropriate personnel so they can take immediate action.
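To put the 12-hour hold in numbers: the production exposed to one undetected failure is the line rate multiplied by the length of the measurement interval, 12 h × 60 min/h = 720 minutes of output. The small helper below (a hypothetical name, used only for this calculation) evaluates both ends of the stated rate range:

```python
def units_at_risk(rate_per_min, hours=12):
    """Units produced between two measurements of the critical attribute."""
    return rate_per_min * hours * 60


for rate in (400, 600):
    print(f"{rate}/min over 12 h -> {units_at_risk(rate):,} units")
# 400/min over 12 h -> 288,000 units
# 600/min over 12 h -> 432,000 units
```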
#### Project structure:
- **Part I: ETL** The data used for training, validation and testing is hosted on two SQL Servers: one hosts the product quality data and the other hosts the machine state data. The first step is to extract the data from the SQL Servers, transform it and load it into an AWS S3 bucket.
- **Part II:**
  - **II.1 Build, train and deploy the model** With the data in S3, I used SageMaker to train and deploy an XGBoost model. Deploying the model creates an endpoint that can be called for predictions.
  - **II.2 Lambda function & API Gateway** I created a Lambda function and an API Gateway REST API to access the model for predictions. The API lets me send the data for prediction as a POST request and get a low-latency response. API Gateway and Lambda are billed per request, so this layer only adds cost when a prediction is requested (the SageMaker endpoint itself is billed for as long as it is deployed).
- **Part III: Predict in near real time and stream to a Power BI dashboard** With the model deployed and the API in service, I scheduled a task on one of our on-premises servers that sends the latest intermediate check to the model and gets a prediction of the customer-critical attribute. The prediction is then passed to the Power BI dashboard (also as a POST request), where a streaming dashboard visualizes it in real time.
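For Part I, the notebook does not show how the quality and machine-state tables are combined. One common choice, shown here purely as an assumption, is an as-of join that attaches to each quality check the most recent machine-state record logged at or before it (`pandas.merge_asof` performs the same operation on DataFrames). The second helper writes the result in the CSV layout SageMaker's built-in XGBoost expects for training: target in the first column and no header row, which matches `test.csv`, where the measured value sits in column 0. The schema (`ts`, `axial_load`, `wall_thickness`, `line_speed`) and the toy values are hypothetical; the resulting file could then be loaded into the bucket with boto3's `upload_file`.

```python
from bisect import bisect_right
from datetime import datetime


def asof_join(quality_rows, machine_rows):
    """Attach to each quality check the latest machine-state record at or before it.

    Both inputs are lists of dicts with a 'ts' datetime key (hypothetical schema);
    machine_rows must be sorted by 'ts'. Checks with no earlier machine state are dropped.
    """
    machine_ts = [row["ts"] for row in machine_rows]
    joined = []
    for check in quality_rows:
        i = bisect_right(machine_ts, check["ts"]) - 1
        if i < 0:
            continue  # no machine state recorded yet for this check
        state = {k: v for k, v in machine_rows[i].items() if k != "ts"}
        joined.append({**check, **state})
    return joined


def to_training_csv(rows, target, features):
    """CSV for SageMaker built-in XGBoost: target first, no header row."""
    lines = []
    for row in rows:
        values = [row[target]] + [row[name] for name in features]
        lines.append(",".join(str(v) for v in values))
    return "".join(line + "\n" for line in lines)


# Toy, hypothetical records
machine = [
    {"ts": datetime(2021, 3, 1, 8, 0), "line_speed": 500},
    {"ts": datetime(2021, 3, 1, 8, 30), "line_speed": 520},
]
quality = [
    {"ts": datetime(2021, 3, 1, 7, 50), "axial_load": 470.0, "wall_thickness": 0.0232},
    {"ts": datetime(2021, 3, 1, 8, 10), "axial_load": 480.0, "wall_thickness": 0.0229},
    {"ts": datetime(2021, 3, 1, 8, 40), "axial_load": 455.0, "wall_thickness": 0.0226},
]
rows = asof_join(quality, machine)
print(to_training_csv(rows, "axial_load", ["wall_thickness", "line_speed"]), end="")
# 480.0,0.0229,500
# 455.0,0.0226,520
```

The 07:50 check is dropped because no machine state precedes it; whether to drop or back-fill such rows is a design decision for the real data.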

### <font color='brown'>Part III:</font> Near real-time prediction and data streaming to Power BI
* Python script that gets a prediction from an XGBoost model deployed in AWS and streams it to a Power BI dashboard for near real-time monitoring of a manufacturing line. The XGBoost model endpoint is accessed through an AWS REST API (API Gateway > Lambda function > XGBoost model endpoint).
* This Python code will run as a scheduled task on an on-premises server. The frequency will be set based on the type of product being monitored.

In [4]:
```python
# Importing required libraries
import time
import requests  # Both the AWS API and the Power BI API are called with POST requests
import pandas as pd
from datetime import datetime, timezone  # For timestamping the predictions
```

In [15]:
```python
# Loading data for simulation
df = pd.read_csv('./300x407/test.csv').drop('Unnamed: 0', axis=1)
df.shape
```

(1049, 1513)

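**Note:** In production, the code below will not run in a `while` loop; each scheduled run makes a single prediction. The loop only simulates the schedule for this demo by drawing random rows from the test set.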

In [16]:
```python
# AWS REST API (API Gateway > Lambda function > XGBoost endpoint) and Power BI streaming API
url_aws = 'https://llsb7c4pb4.execute-api.us-east-2.amazonaws.com/axial-xbg-stage/predictaxialload'
url_powerbi = 'https://api.powerbi.com/beta/575d120d-78ae-41fc-b5a0-4072a93abdb2/datasets/a15fae22-31e6-4d7f-9447-46a0789d33b8/rows?tenant=575d120d-78ae-41fc-b5a0-4072a93abdb2&UPN=Kesman.Valdes%40crowncork.com&key=SDRA0x0Yg12BPCWiC6fVQL%2FWLe%2BJeNLcpMmRefs%2BwntRe%2BkXXaw6dewmOBTJKSH4e6aJDp6gG6ZUQBwaT%2B8OwA%3D%3D'

LOWER_SPEC = 450  # Predicted axial load below this value counts as defective
checkNo = 0
defective = 0
last_actual = None

while True:
    # Call the AWS REST API with a POST request. The payload is a single observation, which the
    # Lambda function passes to the XGBoost endpoint; the response is the predicted value.
    # In test.csv the first column is the measured axial load and the other 1512 are features.
    sample = df.sample(1).values[0]
    actual = sample[0]
    data_aws = ','.join(str(i) for i in sample[1:])
    predicted = float(requests.post(url_aws, json={'data': data_aws}, timeout=10).text)
    date_time = datetime.now(timezone.utc).isoformat()

    # Count every predicted out-of-specification product
    if predicted < LOWER_SPEC:
        defective += 1

    # Stream the actual, predicted and timestamp values to the Power BI dashboard.
    # The actual value is measured only every 12 hours, so the demo passes a new actual value
    # on every fourth check, as will be the case once the solution is deployed;
    # the other checks carry the last measured value.
    if checkNo % 4 == 0:
        last_actual = actual
        data_powerbi = {'actual': actual,        # Axial load value measured every 12 hrs
                        'predicted': predicted,  # Value predicted by the AWS-deployed model
                        'datetime': date_time,   # Timestamp
                        'defective': defective}  # Count of predicted out-of-specification products
    else:
        data_powerbi = {'last_actual': last_actual,
                        'predicted': predicted,
                        'datetime': date_time,
                        'defective': defective}
    # Power BI streaming datasets take rows as a JSON array
    requests.post(url_powerbi, json=[data_powerbi], timeout=10)

    checkNo += 1
    time.sleep(0.2)
```

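Since the production version runs as a scheduled task rather than a loop, each run reduces to one prediction and one Power BI row. The sketch below is one way to structure that, under these assumptions: the caller supplies the latest intermediate check as `features` (reading it from SQL Server is not shown); `run_once`, `parse_prediction` and the `out_of_spec` column are hypothetical names; and the HTTP function is injectable so the logic can be tested without network access. Because each run is a separate process, the demo's running `defective` count would have to be persisted between runs or aggregated in Power BI, so this sketch pushes a per-row 0/1 flag instead.

```python
import json
from datetime import datetime, timezone

LOWER_SPEC = 450  # same threshold as the demo loop


def parse_prediction(text):
    """Convert the API response body to a float, surfacing Lambda/SageMaker errors."""
    try:
        return float(text)
    except ValueError:
        # Errors come back as a JSON body with an "errorMessage" field (see cell 14)
        try:
            message = json.loads(text).get("errorMessage", text)
        except (ValueError, AttributeError):
            message = text
        raise RuntimeError(f"Prediction API error: {message}") from None


def run_once(features, url_aws, url_powerbi, actual=None, post=None):
    """One scheduled check: predict from the latest features, push one row to Power BI.

    `post` defaults to requests.post; pass a stand-in to test without network access.
    """
    if post is None:
        import requests
        post = requests.post
    payload = ",".join(str(x) for x in features)
    response = post(url_aws, json={"data": payload}, timeout=10)
    predicted = parse_prediction(response.text)
    row = {
        "predicted": predicted,
        "datetime": datetime.now(timezone.utc).isoformat(),
        "out_of_spec": int(predicted < LOWER_SPEC),  # hypothetical 0/1 column
    }
    if actual is not None:  # a measured value exists only every 12 hours
        row["actual"] = actual
    post(url_powerbi, json=[row], timeout=10)  # rows are sent as a JSON array
    return row
```

A caller could then notify the appropriate personnel whenever `row["out_of_spec"]` is 1.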

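**Note:** The two cells below were executed earlier in the session (`In [13]` and `In [14]`, i.e. before `In [15]` and `In [16]`), so their output does not come from the loop above. They show the endpoint rejecting a payload of 1513 values, while the model was trained on 1512 features; the loop above sends only `sample[1:]`, the 1512 feature columns of `test.csv`.

In [13]:
```python
data_aws
```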

'577.0,2.0,288.0,5330011.0,2.87907,4.44243,4.43967,4.43556,0.00688,0.09424,0.09217,0.09121,0.09254,0.00303,2.94138,2.9373299999999998,2.93677,2.9384900000000003,0.00461,397.0,253.0,23.0,288.0,136.0,0.0232,0.0027,0.0252,0.0225,0.0201,0.0181,0.0206,0.0216,0.0199,0.0219,0.0226,0.0248,0.0231,0.0225,0.0252,0.0228,0.0226,0.025,0.0229,0.0224,0.0249,0.0228,0.0224,0.0244,0.0227,0.0226,0.0241,0.0228,0.0227,0.0239,0.023,0.0226,0.0236,0.023,0.0225,0.0233,0.0227,0.0225,0.0231,0.0228,0.0226,0.023,0.0229,0.023,0.0228,0.0233,0.023,0.0226,0.0234,0.0239,0.0215,0.0243,0.0239,0.0196,0.0244,0.0221,0.0179,0.0224,0.0196,0.0211,0.0235,0.0235,0.0235,0.0234,0.0232,0.0232,0.0232,0.0231,0.0228,0.0228,0.0228,0.023,0.023,0.0232,0.0226,0.0208,0.0025,0.002,0.0022,0.0027,0.0024,0.0025,0.002,0.0015,0.0012,0.001,0.0008,0.0006,0.0004,0.0005,0.0008,0.0028,0.0048,0.0045,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0

In [14]:
```python
requests.post(url_aws, json={'data': data_aws}).text
```

'{"errorMessage": "An error occurred (ModelError) when calling the InvokeEndpoint operation: Received client error (400) from model with message \\"Unable to evaluate payload provided: Feature size of csv inference data 1513 is not consistent with feature size of trained model 1512.\\". See https://us-east-2.console.aws.amazon.com/cloudwatch/home?region=us-east-2#logEventViewer:group=/aws/sagemaker/Endpoints/xgboost-axial-load-v2 in account 157248718313 for more information.", "errorType": "ModelError", "stackTrace": [["/var/task/lambda_function.py", 20, "lambda_handler", "Body=payload)"], ["/var/runtime/botocore/client.py", 316, "_api_call", "return self._make_api_call(operation_name, kwargs)"], ["/var/runtime/botocore/client.py", 626, "_make_api_call", "raise error_class(parsed_response, operation_name)"]]}'
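A client-side check would catch this mismatch before the request leaves the on-premises server. A minimal sketch, assuming the `test.csv` layout used above (measured value in column 0, features after it) and the 1512-feature count reported in the error message; `build_payload` is a hypothetical helper:

```python
EXPECTED_FEATURES = 1512  # feature count of the trained model, per the error message


def build_payload(row, n_features=EXPECTED_FEATURES):
    """Split one test.csv row into (actual, CSV payload) after checking the feature count."""
    actual, features = row[0], row[1:]
    if len(features) != n_features:
        raise ValueError(
            f"expected {n_features} features, got {len(features)}; "
            "check that the target and index columns were dropped"
        )
    return actual, ",".join(str(x) for x in features)
```

In the loop, `actual, data_aws = build_payload(df.sample(1).values[0])` could replace the two lines that slice the sample.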