In [None]:
# default_exp core

# Lambdas

## Lambda to Stream

In [None]:
import csv
import json
import os
import time
from urllib.parse import unquote_plus

import boto3

# Name of the stream that lambda_handler passes to put_to_stream for every row.
# Read from the environment at import time; a missing STREAM_NAME raises KeyError.
STREAM_NAME=os.environ['STREAM_NAME']


def lambda_handler(event, context):
    """Forward the rows of uploaded S3 CSV objects to the configured stream.

    For every entry in ``event['Records']`` the referenced S3 object is
    downloaded, decoded as UTF-8 and parsed as CSV whose first row holds the
    column names.  Each following row becomes a dict keyed by column name
    (the ``target`` column is dropped) and is handed to ``put_to_stream``
    together with ``STREAM_NAME``.

    Args:
        event: Event dict with a ``Records`` list; each record must provide
            ``['s3']['bucket']['name']`` and ``['s3']['object']['key']``.
        context: Lambda context object; it is only printed.

    Returns:
        dict: A static response with ``statusCode`` 200.

    Raises:
        IndexError: If a non-empty data row has fewer cells than the header
            has columns.
    """
    print("Incoming Event: ", event)
    print("bucket ", context)

    s3_resource = boto3.resource('s3')

    payloads = []
    # 'Records' is a list, so every notification has to be handled in turn;
    # indexing the list itself with 's3' raises TypeError.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 notifications (' ' becomes '+').
        key = unquote_plus(record['s3']['object']['key'])

        s3_object = s3_resource.Object(bucket, key)
        data = s3_object.get()['Body'].read().decode('utf-8').splitlines()

        # reference: https://stackoverflow.com/questions/56849240/how-to-read-csv-file-from-s3-bucket-in-aws-lambda
        lines = csv.reader(data)
        headers = next(lines, None)
        if headers is None:
            # Empty object: there is no header row and nothing to forward.
            continue

        for line in lines:
            if not line:
                # csv.reader yields [] for blank lines; they carry no data.
                continue
            payloads.append({
                name: line[position]
                for position, name in enumerate(headers)
                if name != "target"
            })

    for payload in payloads:
        put_to_stream(payload, time.time(), STREAM_NAME)

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
    
    
# Kinesis client shared by all put_to_stream calls; created on first use.
_kinesis_client = None


def _get_kinesis_client():
    """Return the module-wide Kinesis client, creating it on first use."""
    global _kinesis_client
    if _kinesis_client is None:
        _kinesis_client = boto3.client('kinesis')
    return _kinesis_client


def put_to_stream(payload,timestamp,STREAM_NAME):
    """Send one payload to a Kinesis stream as a JSON record.

    Args:
        payload: JSON-serialisable object to send.
        timestamp: Accepted for interface compatibility; not used.
        STREAM_NAME: Name of the target stream.

    Returns:
        bool: True when the response reports HTTP status 200, else False.
    """
    data = json.dumps(payload)
    print('Sending a new payload: ', payload)
    # NOTE(review): the partition key is constant, so every record is routed
    # by the same key - confirm that this is intended.
    response = _get_kinesis_client().put_record(StreamName=STREAM_NAME,
                                                Data=data,
                                                PartitionKey='shard1')

    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        print("ERROR: Kinesis put_record failed: \n{}".format(json.dumps(response)))
        return False

    return True



def get_cloudwatch_logs_url(start_time, end_time, predict_lambda_name):
    """Build a CloudWatch console URL for the newest log stream of a Lambda.

    Polls the log group ``/aws/lambda/<predict_lambda_name>`` every 10 seconds
    until its most recently active stream reports a ``lastIngestionTime`` at
    or after ``start_time``, then returns a console URL for that stream with
    the filter pattern ``Prediction`` and a time window around the interval.

    Args:
        start_time: Start of the interval in seconds (it is multiplied by
            1000 before being compared with ``lastIngestionTime``).
        end_time: End of the interval, in the same unit as ``start_time``.
        predict_lambda_name: Name of the Lambda function whose logs to open.

    Returns:
        str: The CloudWatch console URL.
    """
    log_group_name = '/aws/lambda/' + predict_lambda_name
    start_ms = int(start_time * 1000)
    end_ms = int(end_time * 1000)

    # Wait for the latest log stream of the Lambda to catch up with the
    # requested interval.
    cw_client = boto3.client('logs')
    while True:
        streams = cw_client.describe_log_streams(logGroupName=log_group_name,
                                                 orderBy='LastEventTime',
                                                 descending=True)['logStreams']
        # An empty list means no stream exists yet; keep waiting instead of
        # failing on streams[0].
        if streams and streams[0]['lastIngestionTime'] >= start_ms:
            newest_stream = streams[0]
            break
        print('waiting for updated log stream...')
        time.sleep(10)

    latest_stream = str(newest_stream['logStreamName']).replace('/', '$252F').replace('[$LATEST]', '$255B$2524LATEST$255D')

    # produce a valid URL to get to that log stream
    region = boto3.session.Session().region_name
    log_group_escaped = log_group_name.replace('/', '$252F')
    cw_url = f'https://console.aws.amazon.com/cloudwatch/home?region={region}#logsV2:log-groups/log-group/{log_group_escaped}'
    # Widen the window slightly (10 s before, 40 s after) around the interval.
    time_filter = f'$26start$3D{start_ms - 10000}$26end$3D{end_ms + 40000}'
    full_cw_url = f'{cw_url}/log-events/{latest_stream}$3FfilterPattern$3DPrediction+{time_filter}'
    print('Updated log stream is ready.')
    return full_cw_url


## Lambda to Endpoint

In [None]:
import os
import json
import base64
import logging
import time
from datetime import datetime


# Read environment variables
# All four are required: a missing variable raises KeyError at import time.
# Values are kept as the raw strings from the environment (no conversion).
# ENDPOINT_NAME is the endpoint passed to sagemaker_runtime.invoke_endpoint.
ENDPOINT_NAME = os.environ['ENDPOINT_NAME']
# NOTE(review): FEATURE_GROUP and FRAUD_THRESHOLD are not referenced by the
# code visible in this cell - confirm whether they are still needed.
FEATURE_GROUP = os.environ['FEATURE_GROUP_NAME']
FRAUD_THRESHOLD = os.environ['FRAUD_THRESHOLD']
# LOG_LEVEL is the level requested for the root logger configured below.
LOG_LEVEL = os.environ['LOG_LEVEL']

# Configure the root logger from LOG_LEVEL, falling back to INFO.
logger = logging.getLogger()
try:
    # setLevel validates the value itself and raises for unknown level names,
    # so no private logging helper is needed.
    logger.setLevel(LOG_LEVEL)
except (TypeError, ValueError):
    # Unknown level name: use INFO instead of failing at import time.
    logger.setLevel(logging.INFO)
else:
    if logger.level == logging.NOTSET:
        # NOTSET (0) is treated as "no usable level requested".
        logger.setLevel(logging.INFO)

logging.info(f'Setting Logger Level to {logging.getLevelName(logger.level)}')

import boto3

print(f'boto3 version: {boto3.__version__}')
# Create session via Boto3
session = boto3.session.Session()

# Best effort: a failure to create the Feature Store runtime client is logged
# and the module keeps loading.
try:
    featurestore_runtime = boto3.Session().client(service_name='sagemaker-featurestore-runtime')
except Exception:
    # Catch Exception rather than everything so SystemExit/KeyboardInterrupt
    # still propagate; logging.exception also records the traceback.
    logging.exception('Failed to instantiate featurestore-runtime client with install.sh script!')

# Allocate SageMaker runtime
sagemaker_runtime = boto3.client('runtime.sagemaker')

logging.info(f'Lambda will call SageMaker ENDPOINT name: {ENDPOINT_NAME}')


def lambda_handler(event, context):
    """Run inference for every record of an incoming event.

    Each entry of ``event['Records']`` has its ``kinesis.data`` payload
    decoded, turned into a CSV feature string and sent to the inference
    endpoint.  A result entry is produced for every record whose prediction
    is not ``None``.

    Returns:
        list: One dict per predicted record with the keys ``eventId``,
        ``sequenceNumber``, ``prediction`` and ``statusCode``.
    """
    logging.debug('Received event: {}'.format(json.dumps(event, indent=2)))

    incoming = event['Records']
    logging.debug('Event contains {} records'.format(len(incoming)))

    results = []
    for record in incoming:
        # Every record carries its own identifiers.
        event_id, event_source_arn = record['eventID'], record['eventSourceARN']
        logging.debug(f'eventID: {event_id}, eventSourceARN: {event_source_arn}')

        kinesis_part = record['kinesis']
        decoded = decode_payload(kinesis_part['data'])
        prediction = invoke_endpoint(assemble_features(decoded))

        if prediction is None:
            # Nothing to report for this record.
            continue

        results.append({'eventId': event_id,
                        'sequenceNumber': kinesis_part['sequenceNumber'],
                        'prediction': prediction,
                        'statusCode': 200})

    return results
                       
def assemble_features(event_payload, features=()):
    """Build the CSV-format inference string from selected payload fields.

    Args:
        event_payload: Mapping of feature name to value.
        features: Names of the fields to include, in output order.  Defaults
            to no fields, which yields an empty string.

    Returns:
        str: The selected values converted with ``str`` and joined by commas.

    Raises:
        KeyError: If a requested feature is missing from ``event_payload``.
    """
    # NOTE(review): with the default empty selection the endpoint receives an
    # empty request body - callers should pass the model's feature names.
    inference_features = [str(event_payload[feature]) for feature in features]

    logging.debug(f'Inference features: {inference_features}')

    # assemble features into CSV-format string
    return ','.join(inference_features)


def invoke_endpoint(request_body):
    """Send a CSV request body to ``ENDPOINT_NAME`` and return the parsed reply.

    The response body is read, decoded as UTF-8 and parsed as JSON.
    """
    logging.debug('Passing Request Body (CSV-format): {}'.format(request_body))

    endpoint_reply = sagemaker_runtime.invoke_endpoint(
        Body=request_body,
        ContentType='text/csv',
        EndpointName=ENDPOINT_NAME,
    )
    logging.info('Inference Response: {}'.format(endpoint_reply))

    reply_text = endpoint_reply['Body'].read().decode('utf-8')
    return json.loads(reply_text)


def decode_payload(event_data):
    """Decode a base64-encoded, UTF-8 JSON document and return the parsed object."""
    json_text = base64.b64decode(event_data).decode(encoding="utf-8")
    event_payload = json.loads(json_text)
    logging.info(f'Decoded data from kinesis record: {event_payload}')
    return event_payload

## Lambda to FeatureStore

In [None]:
import json
import base64
import subprocess
import os
import sys
from datetime import datetime
import time

import boto3

print(f'boto3 version: {boto3.__version__}')

# Best effort: a failure to create the clients is reported on stdout and the
# module keeps loading.
try:
    sm = boto3.Session().client(service_name='sagemaker')
    sm_fs = boto3.Session().client(service_name='sagemaker-featurestore-runtime')
except Exception:
    # Catch Exception rather than everything so SystemExit/KeyboardInterrupt
    # still propagate.
    print('Failed while connecting to SageMaker Feature Store')
    print(f'Unexpected error: {sys.exc_info()[0]}')


# Read Environment Vars
# Name of the feature group that lambda_handler passes to update_agg.
# Required: a missing FEATURE_GROUP_NAME raises KeyError at import time.
FEATURE_GROUP = os.environ['FEATURE_GROUP_NAME']

# Names of the fields that update_agg copies from each incoming record into the
# feature-store record, in this order.  Every name must be present as a key in
# the decoded record, otherwise update_agg raises KeyError.
# NOTE(review): 'id' (last entry) is presumably the record identifier - confirm
# against the feature group definition.
features = ['kst_brutto', 'sm', 'tm', 'cl', 'so3', 'k2o', 'na2o', 'south_kiln_feed_01om886__tph__avg', 'south_kiln_feed_01om886__tph__max', 'north_kiln_feed_01om885__tph__avg', 'north_kiln_feed_01om885__tph__max', 'north_fan_speed_01oa943__rpm__avg', 'north_fan_speed_01oa943__rpm__max', 'south_fan_speed_02oa943__rpm__avg', 'south_fan_speed_02oa943__rpm__max', 'lignite_main_burner_03sk820__tph__avg', 'lignite_main_burner_03sk820__tph__max', 'bpg_main_burner_03bf810__tph__avg', 'bpg_main_burner_03bf810__tph__max', 'lignite_calciner_02sk820__tph__avg', 'lignite_calciner_02sk820__tph__max', 'bpg_calciner_02bf810__tph__avg', 'bpg_calciner_02bf810__tph__max', 'kbs_calciner_00kb950__tph__avg', 'kbs_calciner_00kb950__tph__max', 'total_energy_to_main_burner__gj_h__avg_0', 'total_energy_to_main_burner__gj_h__max_0', 'total_energy_to_main_burner__gj_h__avg_1', 'total_energy_to_main_burner__gj_h__max_1', 'total_energy_to_main_burner__gj_h__avg_2', 'total_energy_to_main_burner__gj_h__max_2', 'total_energy_to_main_burner__gj_h__avg_3', 'total_energy_to_main_burner__gj_h__max_3', 'total_energy_to_calciner__gj_h__avg_0', 'total_energy_to_calciner__gj_h__max_0', 'total_energy_to_calciner__gj_h__avg_1', 'total_energy_to_calciner__gj_h__max_1', 'total_energy_to_calciner__gj_h__avg_2', 'total_energy_to_calciner__gj_h__max_2', 'total_energy_to_calciner__gj_h__avg_3', 'total_energy_to_calciner__gj_h__max_3', 'tf__traditional_fuel__avg', 'tf__traditional_fuel__max', 'af__alternative_fuel__avg', 'af__alternative_fuel__max', 'shc__gj__t_kiln_feed__avg', 'shc__gj__t_kiln_feed__max', 'nox_concentration_in_kiln_inlet_00oa983__ppm__avg_0', 'nox_concentration_in_kiln_inlet_00oa983__ppm__max_0', 'nox_concentration_in_kiln_inlet_00oa983__ppm__avg_1', 'nox_concentration_in_kiln_inlet_00oa983__ppm__max_1', 'nox_concentration_in_kiln_inlet_00oa983__ppm__avg_2', 'nox_concentration_in_kiln_inlet_00oa983__ppm__max_2', 'nox_concentration_in_kiln_inlet_00oa983__ppm__avg_3', 
'nox_concentration_in_kiln_inlet_00oa983__ppm__max_3', 'co_concentration_in_kiln_inlet_00oa981__ppm__avg', 'co_concentration_in_kiln_inlet_00oa981__ppm__max', 'o2_concentration_in_kiln_inlet_00oa982_____avg', 'o2_concentration_in_kiln_inlet_00oa982_____max', 'bottom_cyclone_gas_temperature_00oa800___c___avg', 'bottom_cyclone_gas_temperature_00oa800___c___max', 'secondary_air_temperature_01kk827___c___avg_0', 'secondary_air_temperature_01kk827___c___max_0', 'secondary_air_temperature_01kk827___c___avg_1', 'secondary_air_temperature_01kk827___c___max_1', 'secondary_air_temperature_01kk827___c___avg_2', 'secondary_air_temperature_01kk827___c___max_2', 'secondary_air_temperature_01kk827___c___avg_3', 'secondary_air_temperature_01kk827___c___max_3', 'tertiary_air_temperature_01kb801___c___avg_0', 'tertiary_air_temperature_01kb801___c___max_0', 'tertiary_air_temperature_01kb801___c___avg_1', 'tertiary_air_temperature_01kb801___c___max_1', 'tertiary_air_temperature_01kb801___c___avg_2', 'tertiary_air_temperature_01kb801___c___max_2', 'tertiary_air_temperature_01kb801___c___avg_3', 'tertiary_air_temperature_01kb801___c___max_3', 'kiln_amps_01do812__amps__avg_0', 'kiln_amps_01do812__amps__max_0', 'kiln_amps_01do812__amps__avg_1', 'kiln_amps_01do812__amps__max_1', 'kiln_amps_01do812__amps__avg_2', 'kiln_amps_01do812__amps__max_2', 'kiln_amps_01do812__amps__avg_3', 'kiln_amps_01do812__amps__max_3', 'co_bottom_cyclone_north_00oa941__amps__avg', 'co_bottom_cyclone_north_00oa941__amps__max', 'o2_bottom_cyclone_north_00oa942_____avg_0', 'o2_bottom_cyclone_north_00oa942_____max_0', 'o2_bottom_cyclone_north_00oa942_____avg_1', 'o2_bottom_cyclone_north_00oa942_____max_1', 'o2_bottom_cyclone_north_00oa942_____avg_2', 'o2_bottom_cyclone_north_00oa942_____max_2', 'o2_bottom_cyclone_north_00oa942_____avg_3', 'o2_bottom_cyclone_north_00oa942_____max_3', 'kiln_speed_01do811__rpm__avg', 'kiln_speed_01do811__rpm__max', 'primary_air_amount_01of850__nm3_h___avg', 
'primary_air_amount_01of850__nm3_h___max', 'axial_air_pressure_01of811__mbar__avg', 'axial_air_pressure_01of811__mbar__max', 'radial_air_pressure_01of812__mbar__avg', 'radial_air_pressure_01of812__mbar__max', 'central_air_pressure_01of813__mbar__avg', 'central_air_pressure_01of813__mbar__max', 'total_feed_avg', 'total_feed_max', 'total_energy_avg', 'total_energy_max', 'average_fan_speed_avg', 'average_fan_speed_max', 'average_air_temperature_avg', 'average_air_temperature_max', 'kiln_feed_rate_avg', 'kiln_feed_rate_max', 'klinker___nk_c3s_lag_shift', 'klinker___nk_mgo_lag_shift', 'klinker___nk_fe2o3_lag_shift', 'klinker___nk_al2o3_lag_shift', 'klinker___nk_na2o_lag_shift', 'klinker___nk_k2o_lag_shift', 'klinker___nk_cao_lag_shift', 'klinker___nk_sio2_lag_shift', 'heat_of_formation__kcal_kg_clinker__lag_shift', 'klinker___nk_so3_lag_shift', 'klinker___nk_sm_lag_shift', 'klinker___nk_am_lag_shift', 'klinker___nk_lsf_lag_shift', 'id']

def update_agg(fg_name, data):
    """Write one record to the feature group ``fg_name``.

    For every name in the module-level ``features`` list the value
    ``data[name]`` is converted with ``str`` and sent, in list order, through
    ``sm_fs.put_record``.  Returns ``None``.
    """
    feature_values = [
        {'FeatureName': name, 'ValueAsString': str(data[name])}
        for name in features
    ]
    sm_fs.put_record(FeatureGroupName=fg_name, Record=feature_values)
        
def lambda_handler(event, context):
    """Store every record of the incoming batch in the feature group.

    Each entry of ``event['records']`` has its base64 ``data`` field decoded
    and parsed as JSON, is written via ``update_agg`` to ``FEATURE_GROUP``,
    and is then acknowledged with the result ``'Ok'``.

    Returns:
        dict: ``{'records': [...]}`` with one ``recordId``/``result`` entry
        per input record.
    """
    inv_id, app_arn, records = (event['invocationId'],
                                event['applicationArn'],
                                event['records'])
    print(f'Received {len(records)} records, invocation id: {inv_id}, app arn: {app_arn}')

    acknowledged = []
    for rec in records:
        data = json.loads(base64.b64decode(rec['data']))
        print(f' updating features for id: {data}')
        update_agg(FEATURE_GROUP, data)

        # Mark the record as "Ok" so that the sender does not deliver it again.
        acknowledged.append({'recordId': rec['recordId'],
                             'result': 'Ok'})

    return {'records': acknowledged}