In [426]:
# Questions:
#   What is priority and how is it calculated?
#   Can we assume that measure B will always be subtracted from measure A when it appears?

# TODO Implement:
# Apply tests to each vessel
# Return results in given format

# List of each check per file. Return FACILITY_CODE, VESSEL_CODE, CYCLE_CODE, CYCLE_STATUS_CODE, EXPOSURE_STATUS_CODE,
# FILENAME AS CYCLE_RUN_ID, TIMESTAMP_CYCLE_START, CYCLE_EVAL_GROUP_NAME, STAGE_NUM, CYCLE_CHECK_MEASURE_NAME,
# CALCULATION_TYPE_DESCR, STATUS_CHECK_DESCR, CALCULATED_VALUE, UPPER_LIMIT_AMT, LOWER_LIMIT_AMT,
# CALCULATION_LIMIT_AMT, PRIORITY_CODE, STATUS_CHECK_RESULT, CREATED_BY, CREATED_DATE

## Table of Contents:
[Create spark session](#Create-spark-session)  
[Schema sefinition](#Schema-definition)  
[Load data](#Load-data)  
[Helper functions](#Helper-functions)  
[Calculation functions](#Calculation-functions)  
[Functions map](#Functions-map)  
[Evaluation orchestrator](#Evaluation-orchestrator)  
[Run calculations](#Run-calculations)  

In [1]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import lag, col
import pyspark
import pandas as pd
from datetime import datetime

### Create spark session

In [2]:
session = SparkSession.builder.appName("Baxter MPA Process Calculation").getOrCreate()

In [3]:
RUN_TIME = "{date:%Y-%m-%d %H:%M:%S}".format(date=datetime.now())

### Schema definition

In [4]:
cycle_measures_schema = StructType([ \
StructField("FACILITY_CODE", StringType(), True),
StructField("VESSEL_CODE", IntegerType(), True), \
StructField("CYCLE_CODE", StringType(), True), \
StructField("CYCLE_STATUS_CODE", StringType(), True), \
StructField("EXPOSURE_STATUS_CODE", StringType(), True), \
StructField("TIMESTAMP", StringType(), True), \
StructField("PC_TIME", StringType(), True), \
StructField("CYCLE_STAGE", StringType(), True), \
StructField("CYCLE_TIME", FloatType(), True), \
StructField("MAX_CYCLE_STAGE", IntegerType(), True), \
StructField("TIMESTAMP_CYCLE_START", StringType(), True), \
StructField("FILENAME", StringType(), True), \
StructField("VESSEL_TEMPERATURE", FloatType(), True), \
StructField("VESSEL_TEMPERATURE_SETPT", FloatType(), True), \
StructField("STEAM_CONTROL_VALVE", IntegerType(), True), \
StructField("VESSEL_PRESSURE", FloatType(), True), \
StructField("VESSEL_PRESSURE_SETPT", IntegerType(), True), \
StructField("AIR_CONTROL_VALVE", IntegerType(), True), \
StructField("WATER_TEMPERATURE", FloatType(), True), \
StructField("WATER_TEMPERATURE_SETPT", FloatType(), True), \
StructField("COOLING_CONTROL_VALVE", IntegerType(), True), \
StructField("LEFT_AUX_TEMPERATURE", FloatType(), True), \
StructField("RIGHT_AUX_TEMPERATURE", FloatType(), True), \
StructField("VESSEL_REF_TEMPERATURE", FloatType(), True), \
StructField("VESSEL_REF_PRESSURE", FloatType(), True), \
StructField("WATER_REF_TEMPERATURE", FloatType(), True)
])

cycle_eval_schema = StructType([ \
StructField("facility_code", StringType(), True), \
StructField("cycle_code", StringType(), True), \
StructField("stage_number", StringType(), True), \
StructField("calculation_limit", StringType(), True), \
StructField("check_evaluation_group", StringType(), True), \
StructField("cycle_check_measure_name", StringType(), True), \
StructField("measure_1_name", StringType(), True), \
StructField("measure_2_name", StringType(), True), \
StructField("calculation_type_description", StringType(), True), \
StructField("lower_limit", StringType(), True), \
StructField("upper_limit", StringType(), True) \
])

### Load data

In [5]:
sc = session.sparkContext

In [6]:
sqlContext = SQLContext(sc)

In [7]:
cycle_measures = sqlContext.read.parquet('bax-data/cycle-measures')

In [8]:
# Set column names to uppercase
new_col_names = [c.upper() for c in cycle_measures.columns]
cycle_measures = cycle_measures.toDF(*new_col_names)

In [9]:
# for reading from 
# cycle_measures = session.read.format('com.databricks.spark.csv') \
#             .options(delimiter='\t', header='true', inferschema='false', ignoreLeadingWhiteSpace ='true', ignoreTrailingWhiteSpace ='true', treatEmptyValuesAsNulls = 'true') \
#             .load('/home/jovyan/work/bax-data/cycle_measures.txt', schema= cycle_measures_schema)

In [10]:
cycle_measures.printSchema()

root
 |-- FACILITY_CODE: string (nullable = true)
 |-- VESSEL_CODE: string (nullable = true)
 |-- CYCLE_CODE: string (nullable = true)
 |-- CYCLE_STATUS_CODE: string (nullable = true)
 |-- EXPOSURE_STATUS_CODE: string (nullable = true)
 |-- MEASURE_TIMESTAMP: timestamp (nullable = true)
 |-- PC_TIME: string (nullable = true)
 |-- CYCLE_STAGE: string (nullable = true)
 |-- CYCLE_TIME: double (nullable = true)
 |-- MAX_CYCLE_STAGE: string (nullable = true)
 |-- CYCLE_RUN_START_TIMESTAMP: timestamp (nullable = true)
 |-- FILENAME: string (nullable = true)
 |-- VESSEL_TEMPERATURE: double (nullable = true)
 |-- VESSEL_TEMPERATURE_SETPT: double (nullable = true)
 |-- STEAM_CONTROL_VALVE: double (nullable = true)
 |-- VESSEL_PRESSURE: double (nullable = true)
 |-- VESSEL_PRESSURE_SETPT: double (nullable = true)
 |-- AIR_CONTROL_VALVE: double (nullable = true)
 |-- WATER_TEMPERATURE: double (nullable = true)
 |-- WATER_TEMPERATURE_SETPT: double (nullable = true)
 |-- COOLING_CONTROL_VALVE: dou

In [11]:
cycle_eval = session.read.format('com.databricks.spark.csv') \
            .options(delimiter='\t', header='true', inferschema='false', 
                     ignoreLeadingWhiteSpace ='true', ignoreTrailingWhiteSpace ='true', treatEmptyValuesAsNulls = 'true') \
            .load('bax-data/cycle_eval_group_updated.txt', schema= cycle_eval_schema)

In [12]:
cycle_eval.printSchema()

root
 |-- facility_code: string (nullable = true)
 |-- cycle_code: string (nullable = true)
 |-- stage_number: string (nullable = true)
 |-- calculation_limit: string (nullable = true)
 |-- check_evaluation_group: string (nullable = true)
 |-- cycle_check_measure_name: string (nullable = true)
 |-- measure_1_name: string (nullable = true)
 |-- measure_2_name: string (nullable = true)
 |-- calculation_type_description: string (nullable = true)
 |-- lower_limit: string (nullable = true)
 |-- upper_limit: string (nullable = true)



### Helper functions

In [13]:
# def filter_cycle_measures(df, stages, vessel):
#     """Filters the cycle measures data based on the specified stages appearing in MAX_CYCLE_STAGE"""
#     stages = stages.split(',')
#     return df.filter((col("MAX_CYCLE_STAGE").isin(stages)) & (col("VESSEL_CODE") == vessel))

### Calculation functions

In [14]:
def subtract_columns(df, column_pair):
    """Subtracts c specified in column_pair[1] from column_pair[0] within df.  
    Result is assigned to a new column_difference variable """
    df = df.withColumn("column_difference", col(column_pair[0]) - col(column_pair[1]))
    return df

In [15]:
def avg_calc(df, target_measure, upper_limit=None, lower_limit=None):
    """calculates an average of the specified column"""
    return (df.agg(F.avg(col(target_measure))).collect())[0][0]

In [16]:
def min_calc(df, target_measure, upper_limit=None, lower_limit=None):
    """calculates a minimum of the specified column"""
    return (df.agg(F.min(col(target_measure))).collect())[0][0]

In [17]:
def max_calc(df, target_measure, upper_limit=None, lower_limit=None):
    """calculates a minimum of the specified column"""
    return (df.agg(F.max(col(target_measure))).collect())[0][0]

In [18]:
def stddev_calc(df, target_measure, upper_limit=None, lower_limit=None):
    """calculats the standard deviation of the specified column"""
    return (df.agg(F.stddev(col(target_measure))).collect())[0][0]

In [19]:
def range_calc(df, target_measure, upper_limit=None, lower_limit=None):
    max_value = (df.agg(F.max(col(target_measure))).collect())[0][0]
    min_value = (df.agg(F.min(col(target_measure))).collect())[0][0]
    return max_value - min_value

In [20]:
def time_window_calc(df, target_measure, upper_limit=None, lower_limit=None):
    """Finds the range in minutes between the first and last value of a datetime column"""
    df = df.withColumn(target_measure, df[target_measure].cast("timestamp"))
    last_timestamp = (df.agg(F.max(col(target_measure))).collect())[0][0]
    first_timestamp = (df.agg(F.min(col(target_measure))).collect())[0][0]
    result = (last_timestamp - first_timestamp).seconds // 60
    return result

In [21]:
def osc_counter(over_upper, below_lower):
    """Handles the oscillation counting"""
    check_upper = True
    check_lower = False
    osc_count = 0
    for i in range(len(over_upper)):
        if check_upper:
            if over_upper[i]:
                osc_count += 1
                check_upper = False
                check_lower = True
        if check_lower:
            if below_lower[i]:
                osc_count += 1
                check_upper = True
                check_lower = False
    return osc_count

In [22]:
def count_oscillations(df, target_measure, upper_limit=None, lower_limit=None):
    """Calculates the oscillation count on the specified column, based on [column_avg + upper_limit, column_avg + lower_limit]"""
    
    column_avg = (df.agg(F.avg(col(target_measure))).collect())[0][0]
    df = df.withColumn("upper_thresh", F.lit(column_avg + upper_limit))
    df = df.withColumn("lower_thresh", F.lit(column_avg + lower_limit))
    
    df = df.withColumn("over_upper", df[target_measure] > df["upper_thresh"])
    df = df.withColumn("below_lower", df[target_measure] < df["lower_thresh"])
    over_upper = [v[0] for v in df.select("over_upper").collect()]
    below_lower = [v[0] for v in df.select("below_lower").collect()]
    
    result = osc_counter(over_upper, below_lower)
    return result

In [23]:
def find_start_value(column):
    """Finds the first value in the column array"""
    pass

In [24]:
def find_stop_value(column):
    """Finds the last value in the column array"""
    pass

In [25]:
def time_above_upper_limit(column):
    """Calculates the amount of cycle time (in minutes) that the Values[ ] array is 
    greater than the specified Upper Limit."""
    pass

In [26]:
def time_below_lower_limit(df, target_measure, upper_limit=None, lower_limit=None):
    """Calculates the amount of cycle time (in minutes) that the Values[ ] array is 
    below the specified Lower Limit."""
    # Make MEASURE_TIMESTAMP a timestamp type:
    df = df.withColumn("MEASURE_TIMESTAMP", df["MEASURE_TIMESTAMP"].cast("timestamp"))
    # flag where we have the target measure below the set threshold:
    df = df.withColumn('time_below', (df[target_measure] < lower_limit).cast('integer'))
    
    # use lag analytic function to get the next timestamp for all records:
    lagwindow = Window.orderBy('MEASURE_TIMESTAMP')
    df = df.withColumn('lagged_time', (lag('MEASURE_TIMESTAMP', -1).over(lagwindow)).cast("timestamp"))
    
    # filter to just look at records where the time_below flag has been set, and calculate the diff
    df = df.filter(col('time_below') == 1)
    time_incr = F.unix_timestamp('lagged_time') - F.unix_timestamp('MEASURE_TIMESTAMP') 
    df = df.withColumn('time_below_incr', time_incr)
    
    result = df.agg(F.sum(col('time_below_incr'))).collect()[0][0] // 60
    return result

### Functions map

In [27]:
# dictionary mapping calculation descriptions to functions. Uses built-in functions where possible and defines custom functions where not.
calculation_map = {
    'Average': avg_calc,
    'Minimum': min_calc,
    'Maximum': max_calc,
    'Standard Deviation': stddev_calc,
    'Start Value': find_start_value,
    'Stop Value': find_stop_value,
    'Time Window Calc': time_window_calc,
    'Max - Min': range_calc,
    'Oscillation Counter': count_oscillations,
    'Time Above Upper Limit': time_above_upper_limit,
    'Time Below Lower Limit': time_below_lower_limit
}

In [28]:
# test passing functions from the dict:
res = calculation_map['Average'](cycle_measures, "CYCLE_TIME")
res

48.40244376451706

### Evaluation orchestrator

In [29]:
def run_evaluation(df, calc_type, calculation_map, measure_a, measure_b, upper_limit, lower_limit):
    """Orchestrates calculation given the calc_type, calculation_map, and selected measures a and b"""
    if not measure_b: # only have one metric to calculate
        calculated_value = calculation_map[calc_type](df, measure_a, upper_limit, lower_limit)

    else:
        df = subtract_columns(df, [measure_a, measure_b])
        calculated_value = calculation_map[calc_type](df, 'column_difference', upper_limit, lower_limit)
    
    return calculated_value

In [30]:
# group eval_data by the relevant fields listed below
# append other fields from cycle_eval group
# add fields calculated from process
# add admin fields like created_by . . .

In [31]:
#   4 AS CYCLE_CHECK_ROWID,
#   FACILITY_CODE, # from cycle_eval_group
#   VESSEL_CODE, # from cycle_measures
#   CYCLE_CODE, # from cycle_eval_group
#   CYCLE_STATUS_CODE, # from cycle_measures
#   EXPOSURE_STATUS_CODE, # from cycle_measures
#   FILENAME AS CYCLE_RUN_ID, # from cycle_measures
#   TIMESTAMP_CYCLE_START, # from cycle_measures
#   'PBWS 250F ABS LIMIT CHECK' AS CYCLE_EVAL_GROUP_NAME, # from cycle_eval_group
#   MAX_CYCLE_STAGE AS STAGE_NUM, # from cycle_eval_group
#   'Cycle Time' AS CYCLE_CHECK_MEASURE_NAME, # from cycle_eval_group
#   'Time Window' AS CALCULATION_TYPE_DESCR, # from cycle_eval_group
#   'Absolute Limits Check' AS STATUS_CHECK_DESCR, # from cycle_eval_group
#   MAX(CYCLE_TIME) - MIN(CYCLE_TIME) AS CALCULATED_VALUE, # set by process
#   50 AS UPPER_LIMIT_AMT, # from cycle_eval_group
#   10 AS LOWER_LIMIT_AMT, # from cycle_eval_group
#   NULL AS CALCULATION_LIMIT_AMT, # from cycle_eval_group
#   'N/A' AS PRIORITY_CODE, # from cycle_eval_group
#    STATUS_CHECK_RESULT, # set by process
#   CREATED_BY, # system
#   CREATED_DATE # current_timestamp

In [32]:
def get_cycle_measures_metadata(eval_data, output_fields):
    """group fields needed for output from cycle_measures data"""
    eval_data = eval_data.groupby(*output_fields).count()
    result = eval_data[output_fields].collect()
    return result

In [33]:
cycle_measures_outputs = ['VESSEL_CODE', 'CYCLE_STATUS_CODE', 'EXPOSURE_STATUS_CODE', 'FILENAME', 
                          'CYCLE_RUN_START_TIMESTAMP']

outputs = get_cycle_measures_metadata(eval_data=cycle_measures, output_fields=cycle_measures_outputs)
outputs[0:5]

[Row(VESSEL_CODE='2', CYCLE_STATUS_CODE='ACCEPT', EXPOSURE_STATUS_CODE='EC', FILENAME='V2M40D180728T091050_ACCEPT_EC_NC', CYCLE_RUN_START_TIMESTAMP=datetime.datetime(2018, 7, 28, 9, 10, 50)),
 Row(VESSEL_CODE='2', CYCLE_STATUS_CODE='ACCEPT', EXPOSURE_STATUS_CODE='EC', FILENAME='V2M483D180720T221705_ACCEPT_EC_NC', CYCLE_RUN_START_TIMESTAMP=datetime.datetime(2018, 7, 20, 22, 17, 5)),
 Row(VESSEL_CODE='2', CYCLE_STATUS_CODE='ACCEPT', EXPOSURE_STATUS_CODE='EC', FILENAME='V2M40D180727T121946_ACCEPT_EC_NC', CYCLE_RUN_START_TIMESTAMP=datetime.datetime(2018, 7, 27, 12, 19, 46)),
 Row(VESSEL_CODE='2', CYCLE_STATUS_CODE='ACCEPT', EXPOSURE_STATUS_CODE='EC', FILENAME='V2M395D180713T205603_ACCEPT_EC_NC', CYCLE_RUN_START_TIMESTAMP=datetime.datetime(2018, 7, 13, 20, 56, 3)),
 Row(VESSEL_CODE='2', CYCLE_STATUS_CODE='ACCEPT', EXPOSURE_STATUS_CODE='EC', FILENAME='V2M40D180725T200828_ACCEPT_EC_NC', CYCLE_RUN_START_TIMESTAMP=datetime.datetime(2018, 7, 25, 20, 8, 28))]

### Run calculations

In [34]:
# list of evaluation calc metadata:
#cycle_eval_filt = cycle_eval.filter(col("calculation_type_description") == 'Time Below Lower Limit')
# testing adding a comment
evals = cycle_eval.collect()
evals[0:5]

[Row(facility_code='NC', cycle_code='126', stage_number='3,4', calculation_limit='', check_evaluation_group='PBWS 250F Abs Limit Check', cycle_check_measure_name='Cycle Time', measure_1_name='', measure_2_name='', calculation_type_description='Time Window Calc', lower_limit='10', upper_limit='50'),
 Row(facility_code='NC', cycle_code='126', stage_number='4', calculation_limit='', check_evaluation_group='PBWS 250F Abs Limit Check', cycle_check_measure_name='Cycle Time', measure_1_name='', measure_2_name='', calculation_type_description='Time Window Calc', lower_limit='10', upper_limit='50'),
 Row(facility_code='NC', cycle_code='126', stage_number='4', calculation_limit='', check_evaluation_group='PBWS 250F Abs Limit Check', cycle_check_measure_name='Left Aux - Temp Setpt', measure_1_name='Left Aux Temperature', measure_2_name='Vessel Temperature Setpt', calculation_type_description='Average', lower_limit='-10', upper_limit='10'),
 Row(facility_code='NC', cycle_code='126', stage_number='

In [35]:
# set of vessels to use for calculations:
vessels_list = [v[0] for v in cycle_measures.select("VESSEL_CODE").distinct().collect()]
vessels_list[0:5]

['1', '2']

In [36]:
def run_evaluation_checks(cycle_measures, evaluation_list, vessels_list, debug=False):
    """Runs cycle evaluation alerts across cycle measures data for each vessel in a facility according to criteria in 
    cycle evaluation_list"""
    for v in vessels_list:
        print('Running calculation for vessel {}'.format(v))
        eval_data = cycle_measures
        eval_data = eval_data.filter(col("VESSEL_CODE") == v)
    
        for e in evaluation_list:
            # definitions of the checks to run based on the records in cycle_eval_group:
            facility_code, cycle_code, stage_number, calculation_limit, check_evaluation_group, cycle_check_measure_name, \
              measure_1_name, measure_2_name, calculation_type_description, lower_limit, upper_limit = \
                e['facility_code'], e['cycle_code'], e['stage_number'], e['calculation_limit'], e['check_evaluation_group'], \
                e['cycle_check_measure_name'], e['measure_1_name'], e['measure_2_name'], e['calculation_type_description'], \
                e['lower_limit'], e['upper_limit']
            
            # sometimes we don't get a definition of measure_1_name (could COALESCE in the SQL to build this input)
            if measure_1_name is None or measure_1_name == '':
                measure_1_name = cycle_check_measure_name
            
            if upper_limit is None:
                upper_limit = -1
            
            # Remove these when we get the input format right:
            upper_limit = float(upper_limit) 
            lower_limit = float(lower_limit)
            
            # other fields we need for the output:
            max_stage_num = stage_number.split(',')[-1] # stage_number arrives as a list and we want the last value to filter cycle_measures
    
            # TODO: Convert values in metrics data to match casing on column names in cycle_measures data:
            measure_1_name = measure_1_name.upper().replace(' ', '_')
            measure_2_name = measure_2_name.upper().replace(' ', '_')
            
            # filter eval_data based on definitions in cycle_eval_group:
            if not debug:
                eval_data = eval_data.filter((col("MAX_CYCLE_STAGE").isin(max_stage_num)) & \
                                         (col("CYCLE_CODE") == cycle_code))
    
            if eval_data.count() > 0: # only run calcs if we have data:
                print('Calculation used: {}'.format(calculation_type_description))
                print('Measures used: 1) {} and 2) {}'.format(measure_1_name, measure_2_name))
                
                calculated_value = run_evaluation(eval_data, calculation_type_description, calculation_map, 
                                                  measure_1_name, measure_2_name, upper_limit, lower_limit)
                
                status_check_result = True if calculated_value < upper_limit and calculated_value > lower_limit else False
        
            else:
                calculated_value = -1
                status_check_result = False
    
            print('Result: {}'.format(calculated_value))
    

In [37]:
run_evaluation_checks(cycle_measures, evals, vessels_list, debug=True)

Running calculation for vessel 1
Calculation used: Time Window Calc
Measures used: 1) CYCLE_TIME and 2) 
Result: 1
Calculation used: Time Window Calc
Measures used: 1) CYCLE_TIME and 2) 
Result: 1
Calculation used: Average
Measures used: 1) LEFT_AUX_TEMPERATURE and 2) VESSEL_TEMPERATURE_SETPT
Result: -35.1023059215747
Calculation used: Average
Measures used: 1) RIGHT_AUX_TEMPERATURE and 2) VESSEL_TEMPERATURE_SETPT
Result: -35.561823305286474
Calculation used: Average
Measures used: 1) VESSEL_TEMPERATURE and 2) VESSEL_TEMPERATURE_SETPT
Result: -38.21365296078768
Calculation used: Average
Measures used: 1) LEFT_AUX_TEMPERATURE and 2) 
Result: 184.5825607239235


KeyboardInterrupt: 