To decide if the tool has a battery, the code checks the tool code in the headers.

**Criteria:**
- The tool must have a valid tool code: 'MP34', 'MP36', 'MP38', or 'MP39' (this determines whether the tool is a SonicScope).
- If the tool code is not one of these, the function returns early and does not calculate battery usage.

**Criteria to start calculating battery usage:**
- Required data channels (e.g., BATA_V, STMP, CFG_ID, LTB_V) must be present in the input data.
- Battery usage is calculated only when the battery is ON, defined by:
  - LTB_V <= 18 and BATA_V > 1.0 (or BATB_V > 1.0 for Battery B).

If these conditions are met, the code proceeds to calculate battery usage.
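
As an illustration of the battery-ON criterion above, the following sketch builds the boolean mask on a small made-up DataFrame. The sample values and the helper name `battery_on_mask` are assumptions for this example only; the thresholds are the ones listed in the criteria.

```python
import pandas as pd


def battery_on_mask(data: pd.DataFrame, vol_channel: str = "BATA_V") -> pd.Series:
    """Return True for samples where the tool is running on battery power."""
    # Battery ON: LTB voltage at or below 18 V and battery voltage above 1 V.
    return (data["LTB_V"] <= 18) & (data[vol_channel] > 1.0)


# Made-up samples, for illustration only.
sample = pd.DataFrame({
    "LTB_V": [30.0, 12.0, 0.0, 18.0],
    "BATA_V": [20.5, 20.4, 0.5, 19.8],
})
mask = battery_on_mask(sample)
on_count = int(mask.sum())  # number of battery-ON samples
```

Passing `vol_channel="BATB_V"` would apply the same test to Battery B, provided that column exists in the data.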

In [None]:
"""
Battery Remaining Useful Life Model
note: MP475 has two batteries used in parallel, usage shall be the same
calculate Amp hour then divided by 2 for two batteries case.
three important mappings
1) CFG_ID -> record rate
2) record rate -> current
3) temperature -> battery life

must have channels: BATA_V, STMP, CFG_ID, LTB_V


Changes:
    2021-10-27: Brian Bai, first version (with manual input options)
    1) Added manual input of batteries Serial Numbers, pre-run usage
    2) Calculate used battery life of current run
    3) Add techlog plots for battery voltage & battery current
    4) Calculation of battery A and battery B separately
    5) Cap current usage and cumulative usage to 100%
    2022-02: Brian Bai
    1) replace manual input of record rate by getting data from headers['configurationFile']
     e.g., headers['configurationFile']['file28']['scanRate']['Quadrupole']
        {'file28': {'scanRate': {'SYSTEM': 2,
            'TEST SIG': 60,
            'Monopole-H': 10,
            'Monopole-L': 10,
            'Quadrupole': 10}},
        'file29': {'scanRate': {'SYSTEM': 2,
            'TEST SIG': 60,
            'Monopole-H': 15,
            'Monopole-L': 15,
            'Quadrupole': 15}},
        'file30': {'scanRate': {'SYSTEM': 2,
            'TEST SIG': 60,
            'Monopole-H': 2,
            'Monopole-L': 2,
            'Quadrupole': 2}}}
    2023-09: Karolina Sobczak-Oramus
    1) update the information read from headers to the new json format
    use the info from: headers['configuration']['scanRateGroups']
    where:
          - "configurationRecordId" is in [28, 29, 30]
          - "scanRateGroupName": "Quadrupole"
        {
            'configurationRecordId': 30, 
            'scanRateGroupName': 'Quadrupole', 
            'scanRateGroupInterval': 20, 
            'recordId': 122, 
            'recordName': 'QPI_WF11', 
            'recordInterval': 20, 
            'channels': ['WQP-11']
        }
    depending on the run, there will be one or more configuration_record_id
    {28, 29, 30}, all of the available are used.
    2024-08 Tess Peng
    1) schema for configurationRecord updated
"""

import os
import pandas as pd
import numpy as np
import json
import pickle
from sonicscope.battery_plot_functions import techlog_lines

from pylon import summary, limit, Status, results, info


CHANNELS = ['BATA_I', 'BATB_I', 'BATA_V', 'BATB_V',
            'STMP', 'CFG_ID', 'LTB_V']

The purpose of the **<check>** function is to validate input data and compute battery usage statistics for a tool run.

- It checks for required data channels in the input.
- Reads manual battery info (serial numbers, pre-usage) from a JSON file.
- Validates the tool code.
- Calculates battery usage for Battery A (and Battery B if applicable).
- Generates techlog plots for battery voltage/current and temperature.
- Prepares summary and limit information for reporting, including cumulative usage and pass/fail status.

The function returns a result object containing all relevant battery usage information for the run.
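
The cumulative usage and pass/fail logic can be sketched in isolation as follows. The helper name `cumulative_usage` and the sample numbers are hypothetical; the 100% cap and the 70% limit are taken from the function itself.

```python
def cumulative_usage(pre_usage: float, run_usage: float, limit_pct: float = 70.0):
    """Return (total usage in %, capped at 100) and whether it is below the limit."""
    total = min(float(pre_usage) + float(run_usage), 100.0)
    return total, total < limit_pct


# Battery with 90% life remaining before the run (10% pre-usage),
# and 12.5% of its life used during the current run.
total, passed = cumulative_usage(pre_usage=100 - 90, run_usage=12.5)
```

A total of exactly 70% does not pass, since the status is Pass only while the total is strictly below the limit.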

In [None]:

def check(source: pd.DataFrame, headers: dict):
    # check channels in source
    data_channels = []
    for item in CHANNELS:
        if item in source.columns:
            data_channels.append(item)

    manual_data = get_manual(headers)
    # temporary solution, better to get historical data (not available now)
    # battery pre-usage is 100% (new battery) - life before the run
    battery_life_a = manual_data[manual_data['key']
                              == 'battery_a_life_per']['value']
    battery_life_b = manual_data[manual_data['key']
                              == 'battery_b_life_per']['value']
    battery_a_sn = manual_data[
        manual_data['key'] == 'battery_a_sn']['value']
    battery_b_sn = manual_data[
        manual_data['key'] == 'battery_b_sn']['value']
    
    if len(battery_life_a.index) > 0 and len(battery_a_sn.index) > 0:
        pre_usage_a = 100 - battery_life_a.values[0]
        battery_a_sn = battery_a_sn.values[0]
    else:
        pre_usage_a = 0
        battery_a_sn = 'N/A'
    if len(battery_life_b.index) > 0 and len(battery_b_sn.index) > 0:
        pre_usage_b = 100 - battery_life_b.values[0]
        battery_b_sn = battery_b_sn.values[0]
    else:
        pre_usage_b = 0
        battery_b_sn = 'N/A'

    toolcode = headers['toolCode']
    toolcode_formatted = toolcode.strip().upper()
    # check tool code
    if not('MP34' in toolcode_formatted) and \
        not('MP36' in toolcode_formatted) and \
        not('MP38' in toolcode_formatted) and \
        not('MP39' in toolcode_formatted):
        my_info = [
            info(
                key='toolcode',
                name='Tool code',
                value_string=f'{toolcode} is invalid.'
            )
        ]

        result = results(
            cmd_id=headers['cmdId'],
            information=my_info)

        return result

    # compute current battery usage
    data = source[data_channels].dropna(subset=['BATA_V'])
    current_usage = compute_usage(
        data, 'BATA_V', toolcode, headers, manual_data)

    # techlog plots
    bokehplot_location = headers["folder"]

    dict_battery_a = [
        dict(channel='STMP', title='STMP: tool temperature', y_label='STMP (degC)'),
        dict(channel='LTB_V', title='LTB_V: LTB voltage', y_label='LTB_V (V)'),
        dict(channel='BATA_V', title='BATA_V: battery A voltage',
             y_label='BATA_V (V)'),
        dict(channel='BATA_I', title='BATA_I: battery A current',
             y_label='BATA_I (mA)'),
    ]
    battery_a_plot = techlog_lines(
        data,
        dict_battery_a,
        output_html_name="battery_a_plot",
        output_location=bokehplot_location)

    # add battery serial numbers into result
    my_info = [
        info(
            key='toolcode',
            name='Tool code',
            value_string=toolcode
        ),
        info(
            key='battery_asn',
            name='Battery A serial number',
            value_string=str(battery_a_sn)
        ),
        info(
            key='battery_a_plot',
            name='Battery A techlog plots',
            value_string='',
            chart=str(battery_a_plot)
        ),
        info(
            key='avg_stmp',
            name='Mean tool temperature of current run',
            value_string=str(round(source['STMP'].mean(), 2))
        )
    ]

    total_usage_a = float(pre_usage_a) + current_usage['usage']
    if total_usage_a >= 100:
        total_usage_a = 100

    if 'MP34' in toolcode.strip().upper():
        # append techlog plots
        dict_battery_b = [
            dict(channel='STMP', title='STMP: tool temperature',
                 y_label='STMP (degC)'),
            dict(channel='LTB_V', title='LTB_V: LTB voltage', y_label='LTB_V (V)'),
            dict(channel='BATB_V', title='BATB_V: battery B voltage',
                 y_label='BATB_V (V)'),
            dict(channel='BATB_I', title='BATB_I: battery B current',
                 y_label='BATB_I (mA)'),
        ]
        battery_b_plot = techlog_lines(
            data,
            dict_battery_b,
            output_html_name="battery_b_plot",
            output_location=bokehplot_location)

        # append battery B SN
        my_info.extend([
            info(
                key='battery_bsn',
                name='Battery B serial number',
                value_string=str(battery_b_sn)
            ),
            info(
                key='battery_b_plot',
                name='Battery B techlog plots',
                value_string='',
                chart=str(battery_b_plot)
            )
        ])
        # cumulative usage of battery A and battery B
        data = source[data_channels].dropna(subset=['BATB_V'])
        usage_b = compute_usage(data, 'BATB_V', toolcode, headers, manual_data)
        total_usage_b = float(pre_usage_b) + usage_b['usage']

        if total_usage_b >= 100:
            total_usage_b = 100

        my_limits = [
            limit(
                key='cumusage_a',
                name='Battery A: total cumulative percentage life used',
                value=total_usage_a,
                limit_value=70,
                unit='%',
                status=Status.Pass
                if total_usage_a < 70 else Status.Failed),
            limit(
                key='cumusage_b',
                name='Battery B: total cumulative percentage life used',
                value=total_usage_b,
                limit_value=70,
                unit='%',
                status=Status.Pass
                if total_usage_b < 70 else Status.Failed),
        ]
    else:
        # cumulative usage of battery A, draw power from one battery
        my_limits = [
            limit(
                key='cumusage_a',
                name='Battery A: total cumulative percentage life used',
                value=total_usage_a,
                limit_value=70,
                unit='%',
                status=Status.Pass
                if total_usage_a < 70 else Status.Failed)
        ]

    my_summaries = get_summaries(current_usage, toolcode)

    result = results(
        cmd_id=headers['cmdId'],
        information=my_info,
        summaries=my_summaries,
        limits=my_limits)

    return result


The purpose of the **<get_manual>** function is to read manual battery information (such as serial numbers and pre-usage values) from a JSON file.
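
To make the expected input more concrete, here is a minimal sketch of reading and filtering such a file. The JSON content is hypothetical (the real `manual_input.json` may differ); only the `key`/`type`/`value` columns and the `battery` prefix filter come from the function.

```python
import io

import pandas as pd

# Hypothetical file content, for illustration only.
raw = """[
  {"key": "battery_a_sn", "type": "string", "value": "8039"},
  {"key": "battery_a_life_per", "type": "number", "value": 90},
  {"key": "operator", "type": "string", "value": "n/a"}
]"""

manual_data = pd.read_json(io.StringIO(raw))
# Keep only the rows whose key starts with "battery".
battery_rows = manual_data[manual_data["key"].str.startswith("battery")]
keys = battery_rows["key"].tolist()
```

The vectorized `str.startswith` is used here instead of the row-wise `apply` in the function; both select the same rows.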

In [None]:

def get_manual(headers: dict):
    """
    read manual input battery information from manual_input.json
    e.g.
    key                   type      value
    battery_a_sn          string    8039
    battery_a_preusage    number    10
    battery_b_sn          string    8045
    battery_b_preusage    number    10

    Note: this is a temporary solution;
    a better solution would be to get the battery SN from Qtrac and the
    Quadrupole scan rate from the dump file for files 28, 29 and 30.

    Parameters:
        headers: <dict>

    Returns:
        manual_data: <pd.DataFrame>
    """
    manual_fname = headers['folder'].split('\\')[-1] + '_manual_input.json'
    manual_data = pd.DataFrame()
    parent_folder = os.path.join(headers["folder"], "..")

    if manual_fname in os.listdir(parent_folder):
        with open(os.path.join(headers["folder"], "..", manual_fname)) as file:
            manual_data = pd.read_json(file)

    if not manual_data.empty:
        # get battery serial number and preusage
        mask = manual_data.apply(lambda x: x.key.startswith('battery'), axis=1)
        manual_data = manual_data[mask]

    return manual_data



The purpose of the **<get_recordrate>** function is to extract the Quadrupole scan rate for each configuration code (CFG_ID) from the headers.

- It searches the headers for Quadrupole scan rates and updates the mapping for each relevant configuration.
- If a configuration code is not found, it uses the default value.
- This mapping is used later to associate each CFG_ID with its correct scan rate for battery usage calculations.
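
The lookup can be illustrated with a small stand-in for the headers. The layout below is inferred from how the function reads `headers['configurationRecord']['scanRates']`; the key names such as `file_28` and all values are assumptions, not the actual schema.

```python
# Hypothetical headers fragment, shaped only to match what the function reads.
headers = {
    "configurationRecord": {
        "scanRates": [
            {"name": "SYSTEM", "records": [{"value": {"file_28": 2}}]},
            {"name": "Quadrupole",
             "records": [{"value": {"file_28": 10, "file_29": 15, "file_31": 5}}]},
        ]
    }
}

# Defaults: 1 s for the known configurations, 0 for everything else.
record_rates = {28: 1, 29: 1, 30: 1, 0: 0}

for group in headers["configurationRecord"]["scanRates"]:
    if group["name"] != "Quadrupole":
        continue
    # Only the first record of the Quadrupole group is read, as in the function.
    for file_key, interval in group["records"][0]["value"].items():
        cfg_id = int(file_key.split("_")[-1])
        if cfg_id in record_rates:  # ignore configurations outside 28, 29, 30
            record_rates[cfg_id] = interval
    break
```

In this sketch configuration 30 keeps its default because it is absent from the headers, and configuration 31 is ignored because it is not a key of `record_rates`.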

In [None]:

def get_recordrate(headers: dict):
    """
    read cfg code and corresponding Quadrupole scan rate of each code
    Note: Quadrupole scan rate is not available in the headers now
    Corner case, code not in [28,29,30], use code=0
    e.g., cfg data in headers
    {
        'Slow_Cfg': 29,
        'Medium_Cfg': 28,
        'Fast_Cfg': 30,
    }

    Parameters:
        headers: <dict>

    Returns:
        record_rates: <dict>
    """

    # get from headers: Quadrupole record rate of each code
    record_rates = {
        28: 1,
        29: 1,
        30: 1,
        0: 0
    }
 
    scan_rates = headers['configurationRecord']['scanRates']

    for scan_rate in scan_rates:
        if scan_rate['name'] == 'Quadrupole':
            for record in scan_rate['records']:
                for file in record['value']:
                    file_int = int(file.split('_')[-1])
                    # check 28, 29 and 30 configurations for Quadrupole
                    # save the record intervals
                    if file_int in record_rates:
                        record_rates[file_int] = record['value'][file]
                break
            break
    
    return record_rates



The purpose of the **<map_rate2current>** function is to create a mapping between scan rates and battery current values, depending on the tool code.

- For tool codes MP36, MP38, or MP39, it uses the "current0" values.
- For tool code MP34, it uses the "current1" values.
- Scan rate 0, which is used for CFG_ID values outside [28, 29, 30], maps to a default current of 0.05 A.
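
A short sketch of the lookup for the single-battery tools, using the "current0" values from the function. The constant names are hypothetical, and the `.get` fallback to the default current for an unlisted scan rate is an assumption; the function itself would raise a `KeyError` in that case.

```python
SCAN_RATES = [0, 1, 2, 4, 5, 6, 10, 15, 20, 30, 60]
CURRENT0 = [0.05, 0.470, 0.490, 0.620, 0.590,
            0.550, 0.542, 0.478, 0.446, 0.414, 0.382]

rate_current = dict(zip(SCAN_RATES, CURRENT0))

current_at_10 = rate_current[10]  # current for a 10 s scan rate
# Assumed fallback: treat an unlisted scan rate like scan rate 0.
current_unlisted = rate_current.get(3, rate_current[0])
```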

In [None]:

def map_rate2current(toolcode):
    """
    given toolcode, extract the scan rate to current map
    Note: corner case, if scan rate not in 
    [1, 2, 4, 5, 6, 10, 15, 20, 30, 60], map 0 to current=0.05A

    Parameters:
        toolcode: <string>

    Returns:
        rate_current: <dict>, key:value, scan_rate: current
    """
    rate_current = {}
    second = [0, 1, 2, 4, 5, 6, 10, 15, 20, 30, 60]
    current0 = [0.05, 0.470, 0.490, 0.620, 0.590,
                0.550, 0.542, 0.478, 0.446, 0.414, 0.382]
    current1 = [0.05, 0.418, 0.431, 0.516, 0.496,
                0.470, 0.465, 0.423, 0.402, 0.382, 0.361]

    toolcode_formatted = toolcode.strip().upper()
    if 'MP36' in toolcode_formatted or \
        'MP38' in toolcode_formatted or \
        'MP39' in toolcode_formatted:
        rate_current = {key: value for key, value in zip(second, current0)}
    if 'MP34' in toolcode_formatted:
        rate_current = {key: value for key, value in zip(second, current1)}

    return rate_current



The purpose of the **<map_temp2life>** function is to create a mapping between temperature bins and battery service life values, depending on the tool code.

- For tool codes MP36, MP38, or MP39, it maps each temperature bin (e.g., '0-40', '41-60', etc.) to a value from "service0".
- For tool code MP34, it maps each temperature bin to a value from "service1".
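
The binning step that feeds this mapping can be sketched as follows, using the bin edges from `compute_usage` and the "service0" values. The temperatures are made up. Note that `pd.cut` uses right-inclusive intervals by default, so 40.0 falls in '0-40' and 40.5 in '41-60'.

```python
import pandas as pd

temp_bins = [0, 40, 60, 80, 100, 120, 140, 160]
bin_labels = ['0-40', '41-60', '61-80',
              '81-100', '101-120', '121-140', '141-160']
service0 = [51.6, 84.6, 98.7, 82.0, 86.0, 77.0, 77.0]
temp_life = dict(zip(bin_labels, service0))

# Made-up tool temperatures in degC.
stmp = pd.Series([25.0, 40.0, 40.5, 95.0, 150.0])
temp_cut = pd.cut(stmp, bins=temp_bins, labels=bin_labels)

# Convert the categorical labels to plain strings before the dictionary lookup.
labels = temp_cut.astype(str).tolist()
service_life = [temp_life[label] for label in labels]
```

Temperatures at or below 0 or above 160 fall outside every bin and become NaN, so they would need separate handling.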

In [None]:

def map_temp2life(toolcode):
    """
    given toolcode, map temperature bin to battery life

    Parameters:
        toolcode: <string>

    Returns:
        temp_life: <dict>, key:value, temp_bin: life
    """
    temp_life = {}
    temp_bin = ['0-40', '41-60', '61-80',
                '81-100', '101-120', '121-140', '141-160']
    service0 = [51.6, 84.6, 98.7, 82.0, 86.0, 77.0, 77.0]
    service1 = [12.0, 33.0, 37.0, 33.0, 29.0, 30.0, 30.0]

    toolcode_formatted = toolcode.strip().upper()
    if 'MP36' in toolcode_formatted or \
        'MP38' in toolcode_formatted or \
        'MP39' in toolcode_formatted:
        temp_life = {key: value for key, value in zip(temp_bin, service0)}
    if 'MP34' in toolcode_formatted:
        temp_life = {key: value for key, value in zip(temp_bin, service1)}

    return temp_life



The purpose of the **<compute_usage>** function is to calculate how much battery life was used during the current run.

- It filters the data to periods when the battery is ON.
- It maps configuration IDs to record rates and currents, and temperature bins to battery service life.
- It aggregates the data by temperature bin and CFG_ID and computes amp-hours used, ON hours, and percentage of battery life consumed.

Amp-hours and usage are calculated per group (temperature bin and CFG_ID): the amp-hours used are divided by the service life of the temperature bin to get usage.
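
A worked example of the per-group arithmetic, assuming the service-life values are expressed in amp-hours (as the division in the code suggests). The helper name `group_usage` and the sample count are hypothetical; the current and service life are taken from the single-battery tables.

```python
def group_usage(current_a: float, n_samples: int,
                interval_s: float, service_life_ah: float):
    """Return (amp-hours used, percentage of battery life used) for one group."""
    # Each sample represents interval_s seconds at the given current.
    amp_hours = current_a * n_samples * interval_s / 3600.0
    usage_pct = 100.0 * amp_hours / service_life_ah
    return amp_hours, usage_pct


# 7200 battery-ON samples at a 1 s interval (2 hours), 0.542 A, 61-80 degC bin.
amp_hours, usage_pct = group_usage(0.542, 7200, 1.0, 98.7)
```

Two hours at 0.542 A give about 1.08 Ah, which is roughly 1.1% of a 98.7 Ah service life. The run total is the sum of these per-group values.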

**SKK comment:**
- The CFG_ID values need to be extended: the current code does not account for **CFG_ID = [31, 32, 33, 34]**. The current logic is that if CFG_ID is not in [28, 29, 30], battery consumption is set to the default (0.05 A, sleep/standby mode), which has to be corrected.

In [None]:

def compute_usage(data: pd.DataFrame,
                  vol_channel,
                  toolcode: str,
                  headers: dict,
                  manual_data: pd.DataFrame):
    """
    MP36/8/9 toolcodes -> one battery
    MP34 toolcodes -> two batteries

    Parameters:
        data: <pd.DataFrame>
        vol_channel: <string>
        toolcode: <string>
        headers: <dict>
        manual_data: <pd.DataFrame>

    Returns:
        current_usage: <dict>, 
        usage: % of life used in current run
    """
    current_usage = {
        'amphrs': 0.0,
        'onhrs': 0.0,
        'usage': 0.0
    }

    # step 2: get scan rate from headers
    record_rate = get_recordrate(headers)
    # step 3: get scan rate to current map
    rate_current = map_rate2current(toolcode)
    # step 4: get temperature to life map
    temp_life = map_temp2life(toolcode)

    # compute current battery usage
    # LTB_V <=18 & BATA_V>1.0, battery ON
    battery_on = data[(data['LTB_V'] <= 18) & (data[vol_channel] > 1.0)].copy()
    if not battery_on.empty:
        battery_on['CFG_ID'] = battery_on['CFG_ID'].ffill()
        # CFG_ID code not in [28, 29, 30], map to 0 (current=0.05A)
        battery_on.loc[~battery_on['CFG_ID'].isin([28, 29, 30]), 'CFG_ID'] = 0
        battery_on["V_smoothed"] = battery_on.rolling("1min", min_periods=5)[
            vol_channel].agg("mean")
        # temperature bins
        temp_bins = [0, 40, 60, 80, 100, 120, 140, 160]
        bin_labels = ['0-40', '41-60', '61-80',
                      '81-100', '101-120', '121-140', '141-160']
        battery_on['temp_cut'] = pd.cut(
            battery_on.STMP, bins=temp_bins, labels=bin_labels)
        # aggregate by CFG_ID, temp_cut
        battery_agg = battery_on.groupby(['temp_cut', 'CFG_ID'])[
            'V_smoothed'].count()
        battery_agg = battery_agg.reset_index()
        # map CFG_ID to record rate
        battery_agg['record_rate'] = battery_agg['CFG_ID'].apply(
            lambda x: record_rate[x])
        # map record rate to current
        battery_agg['current'] = battery_agg['record_rate'].apply(
            lambda x: rate_current[x])
        # map temperature to service life
        battery_agg['service_life'] = battery_agg['temp_cut'].apply(
            lambda x: temp_life[x])
        # compute the current usage
        time_interval = get_interval(battery_on, vol_channel)
        battery_agg['amp_hour'] = (
            battery_agg['current']*battery_agg['V_smoothed']*time_interval)/3600
        battery_agg['usage'] = battery_agg['amp_hour'] / \
            battery_agg['service_life']

        current_usage['amphrs'] = battery_agg['amp_hour'].sum()
        current_usage['onhrs'] = battery_agg['V_smoothed'].sum() * \
            time_interval/3600
        current_usage['usage'] = battery_agg['usage'].sum() * \
            100  # convert to percentage

        if current_usage['usage'] >= 100.0:
            current_usage['usage'] = 100.0
        # format float
        current_usage['amphrs'] = round(current_usage['amphrs'], 2)
        current_usage['onhrs'] = round(current_usage['onhrs'], 2)
        current_usage['usage'] = round(current_usage['usage'], 2)

    return current_usage



The purpose of the **<get_summaries>** function is to generate a summary report of battery usage for the current run.

- For tools with two batteries (MP34), it creates summary entries for both Battery A and Battery B, including amp hours used, ON hours, and percentage of life used.
- For other tools, it creates summary entries only for Battery A.
- These summaries are returned as a list and are used for reporting or displaying battery usage statistics.

In [None]:

def get_summaries(current_usage: dict, toolcode: str):
    """
    MP36/8/9 toolcodes -> one battery
    MP34 toolcodes -> two batteries

    Parameters:
        current_usage: <dict>

    Returns:
        my_summaries: <list of summary>
    """

    if 'MP34' in toolcode.strip().upper():

        battery_a = [
            summary(
                key='amphrs_a',
                name='Battery A: Amp hours used on current run',
                value=current_usage['amphrs'],
                unit='Amp.hrs'
            ),
            summary(
                key='onhrs_a',
                name='Battery A: battery ON hours of current run',
                value=current_usage['onhrs'],
                unit='hrs'
            ),
            summary(
                key='currentusage_a',
                name='Battery A: Percentage life used of current run',
                value=current_usage['usage'],
                unit='%'
            )
        ]

        battery_b = [
            summary(
                key='amphrs_b',
                name='Battery B: Amp hours used on current run',
                value=current_usage['amphrs'],
                unit='Amp.hrs'
            ),
            summary(
                key='onhrs_b',
                name='Battery B: battery ON hours of current run',
                value=current_usage['onhrs'],
                unit='hrs'
            ),
            summary(
                key='currentusage_b',
                name='Battery B: Percentage life used of current run',
                value=current_usage['usage'],
                unit='%'
            )
        ]
        battery_a.extend(battery_b)
        my_summaries = battery_a

    else:
        my_summaries = [
            summary(
                key='amphrs_a',
                name='Battery A: Amp hours used on current run',
                value=current_usage['amphrs'],
                unit='Amp.hrs'
            ),
            summary(
                key='onhrs_a',
                name='Battery A: battery ON hours of current run',
                value=current_usage['onhrs'],
                unit='hrs'
            ),
            summary(
                key='currentusage_a',
                name='Battery A: Percentage life used of current run',
                value=current_usage['usage'],
                unit='%'
            )
        ]

    return my_summaries




This function, **<get_interval>**, calculates the most frequent time interval (in seconds) between consecutive measurements for a given channel in a pandas DataFrame.

Purpose:
To determine the typical sampling interval for the data in the specified channel.
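
A small example of the idea, on a made-up datetime index with one irregular gap. The timestamps and voltages are invented for illustration.

```python
import pandas as pd

# Samples every 2 s, followed by a single 5 s gap.
index = pd.to_datetime([
    "2024-01-01 00:00:00",
    "2024-01-01 00:00:02",
    "2024-01-01 00:00:04",
    "2024-01-01 00:00:06",
    "2024-01-01 00:00:11",
])
data = pd.DataFrame({"BATA_V": [20.1, 20.1, 20.0, 20.0, 19.9]}, index=index)

# Differences between consecutive timestamps, in seconds (the first one is NaN).
time_diff = data.index.to_series().diff().dt.total_seconds()
# value_counts sorts by frequency, so the first entry is the most common interval.
interval = time_diff.value_counts().index[0]
```

Taking the most frequent difference rather than the mean keeps occasional gaps in the data from distorting the sampling interval.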

In [None]:

def get_interval(data: pd.DataFrame, channel):
    """
    get time interval of one channel (most frequent one)

    Parameters
        data: <dataframe>
            index: datetime
        channel: <string>

    Returns
        time_step
    """
    data = data[[channel]].dropna()
    if data.shape[0] > 100:
        time_diff = data.index[0:100].to_series().diff().dt.total_seconds()
    else:
        time_diff = data.index.to_series().diff().dt.total_seconds()

    return time_diff.value_counts().index[0]

# SKK Proposal:

1. Keep **LTB_V <= 18 and BATA_V > 1.0 (or BATB_V > 1.0 for Battery B)** as the criterion to start the calculation of battery RUL.

2. Extend **CFG_ID** to include all configurations, including the new ones: **CFG_ID = [28, 29, 30, 31, 32, 33, 34]**. The new configurations are missing because the SonicScope firmware upgrade happened after the battery calculator was developed, so this is a gap we need to close.

3. Proposed criteria to flag battery depletion status:

In [None]:
# if battery life consumed >= 70%, battery should not be re-used and is considered depleted (HSE and SQ reasons)
total_usage_a = float(pre_usage_a) + current_usage['usage']
battery_depletion_status = total_usage_a >= 70

Fault detection, in case the battery RUL is not calculated correctly for any reason (human error, faulty battery):

In [None]:
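# first, check if the tool has a battery at any given point during the run, same logic as the battery RUL calculator
# (source is the input DataFrame passed to check(); evaluating "any sample" is one interpretation of the proposal)
has_battery = ((source['LTB_V'] <= 18) & (source['BATA_V'] > 1)).any()

# second, check battery voltage when the tool is powered on LTB power, because if the battery is faulty
# SonicScope will be OFF and nothing will be recorded.
battery_depletion_status = False
if has_battery:
    on_ltb = source[source['LTB_V'] > 18]
    low_a = (on_ltb['BATA_V'] < 16).any()
    low_b = 'BATB_V' in on_ltb.columns and (on_ltb['BATB_V'] < 16).any()
    battery_depletion_status = bool(low_a or low_b)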