# Simple Specification Compliance Calculation Example

This example demonstrates a basic compliance calculations for various specs inside products. It utilizes the **TestMonitor Service** to load the measurements for specs inside the product, the **Spec Service** to update the specs with compliance metrics, and the **File Service** to upload the generated Analysis report.

### How to use this notebook

1. Upload the specs to the product.
1. Upload the measurement data file to the product.
1. Run the BDC extraction notebook on the measurement data file.
1. Select one or more specs, click Analyze button in the tool bar, select this notebook and click Analyze.

### About the notebook

1. The notebook queries the parametric specs from the product.
1. For each of the parametric spec in the product, the valid measurement data is stored as an array.
1. Min, Max, Mean, Median, Standard deviation, Cp, Cpk and Health compliance is calculated for each spec.
1. Compliance selected specs will be updated as custom properties for respective specs and the compliance report is generated and uploaded to the product.

### Rules for condition mapping

1. If spec condition does not have unit, step condition has unit - it will not be considered for
   mapping.
1. If spec condition has unit and step condition does not have unit - it will be considered for
   mapping. Condition value will be assigned directly.
1. If spec condition does not have unit, step condition does not have unit - it will be considered
   for mapping. Condition value will be assigned directly without any unit conversion.
1. If spec condition and step condition both have the same unit - it will be considered for mapping.
   Condition value will be assigned directly.
1. If spec condition and step condition have different units with same base unit - it will be
   considered for mapping. Condition value will be converted to spec condition unit and assigned.
1. If spec condition and step condition have different units with different base unit - it will not
   be considered for mapping.

### Rules for step condition within spec condition

1. If the step condition value is not present - it can be considered as within range.
1. If the step condition value is present and within spec condition range and/or discrete array - it
   can be considered as within range.

### Rules for measurement data

1. If spec and measurement data does not have unit - the measurement data will be considered for
   compliance calculation.
1. If spec has unit and measurement data does not have unit or spec does not have unit and
   measurement data has unit - it will not be considered for compliance calculation.
1. If spec and measurement data has different base unit - it will not be considered for compliance
   calculation.
1. If spec and measurement data has unit - the measurement data will be converted from base unit to 
   the spec unit (Assuming measurement data will be stored in base unit. Ex:BDC extraction notebook
   will store the measurement data in base unit).
1. If measurement data for a row is empty - it will not be considered for compliance calculation.
1. If measurement data for a row is not convertible to Decimal - it will not be considered for
   compliance calculation.

### Rules for spec health calculation

1. If spec min value is null and spec max value is not null, spec health will be 'Pass' if max
   compliance <= spec max, else spec health will be 'Fail'.
1. If spec max value is null and spec min value is not null, spec health will be 'Pass' if min
   compliance >= spec min, else spec health will be 'Fail'.
1. If spec min and spec max value is null, spec health will be 'Pass'.
1. If both spec min and spec max value are not null, spec health will be 'Pass' when min and max
   compliance is within spec min and spec max, else spec health will be 'Fail'.

**Note: Any other cases not mentioned above may/may not work. The notebook is designed to work**
**only with the above mentioned cases**

### Known cases where notebook does not work
1. If a spec has two or more conditions with the same name but different units, the compliance might
   not be calculated properly, only one of these conditions will be displayed in the compliance
   excel report and the condition data might not match with the actual spec condition. (Ex:
   Spec01 has two condition entries vcc(V), Vcc(I))



### Imports

Import Python Modules for executing the notebook. The requests, aiohttp, systemlink and json library are used for communicating with various SystemLink Enterprise's endpoints. Pandas is used for building and handling dataframe. Scrapbook is used for running notebooks and recording data for the SystemLink Notebook Execution Service.

The SYSTEMLINK_API_KEY environment variable specifies an API key created for the user executinge this notebook, which provides Role Based Access Control to the various SystemLink APIs called by this notebook. The API key will expire after 24 hours.
The SYSTEMLINK_HTTP_URI environment variable gives the base URL to the SystemLink instance executing this notebook.

In [None]:
import json
import os
import re
from decimal import Decimal, InvalidOperation
from http import HTTPStatus
from typing import Any, Callable, Dict, List, Tuple

import aiohttp
import backoff
import pandas as pd
import requests
import scrapbook as sb
from ni_unit_converter import convert_from_base_unit, convert_to_base_unit
from requests import Response
from requests.exceptions import HTTPError
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from systemlink.clients.nitestmonitor import StepsApi
from systemlink.clients.nitestmonitor.models import StepsAdvancedQuery

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

api_key = os.getenv("SYSTEMLINK_API_KEY")
systemlink_uri = os.getenv("SYSTEMLINK_HTTP_URI")

### Input Parameters

These are the parameters that the notebook expects to be passed in by SystemLink. For notebooks designed to be triggered by pressing the 'Analyze' button in SystemLink Specs grid inside product details page, they must tag the cell with 'parameters' and at minimum specify the following in the cell metadata using the JupyterLab Property Inspector (double gear icon):

```json
{
  "papermill": {
    "parameters": {
      "spec_ids": [],
      "product_id": ""
    }
  },
  "systemlink": {
    "namespaces": [],
    "parameters": [
      {
        "display_name": "spec_ids",
        "id": "spec_ids",
        "type": "string[]"
      },
      {
        "display_name": "product_id",
        "id": "product_id",
        "type": "string"
      }
    ],
    "version": 2
  },
  "tags": ["parameters"]
}
```

For more information on how parameterization works, review the [papermill documentation](https://papermill.readthedocs.io/en/latest/usage-parameterize.html#how-parameters-work)."


In [None]:
spec_ids = []
product_id = ""

### HTTP Status Codes and Constants

In [None]:
MAX_HTTP_RETRIES = 6
TIMEOUT_IN_SECONDS = 60
VERIFY_SSL_CERTIFICATE = False
QUERY_SPECS_TAKE_COUNT = 1000
QUERY_STEPS_TAKE_COUNT = 1000
HTTP_RETRY_CODES = [
    HTTPStatus.TOO_MANY_REQUESTS,
    HTTPStatus.INTERNAL_SERVER_ERROR,
    HTTPStatus.BAD_GATEWAY,
    HTTPStatus.SERVICE_UNAVAILABLE,
    HTTPStatus.GATEWAY_TIMEOUT
]

### API URL's

In [None]:
class ApiUrls:
    QUERY_PRODUCTS_URL = f"{systemlink_uri}/nitestmonitor/v2/query-products"
    UPDATE_PRODUCT_URL = f"{systemlink_uri}/nitestmonitor/v2/update-products"
    QUERY_SPECS_URL = f"{systemlink_uri}/nispec/v1/query-specs"
    UPDATE_SPECS_URL = f"{systemlink_uri}/nispec/v1/update-specs"
    QUERY_STEPS_URL = f"{systemlink_uri}/nitestmonitor/v2/query-steps"
    GET_PRODUCT_URL = f"{systemlink_uri}/nitestmonitor/v2/products"
    GET_FILE_PROPERTIES_URL = f"{systemlink_uri}/nifile/v1/service-groups/Default/files"
    UPLOAD_FILE_URL = f"{systemlink_uri}/nifile/v1/service-groups/Default/upload-files"

### API Utility functions


In [None]:
@backoff.on_exception(
    backoff.expo,
    HTTPError,
    max_tries=MAX_HTTP_RETRIES,
    giveup=lambda e: e.response.status_code not in HTTP_RETRY_CODES,
)
def retry_request(callable_function: Callable) -> Response:
    """Run the callable function and raise for status

    Args:
        callable_function (Callable): Callable request function

    Returns:
        Response: Callable function response
    """
    response = callable_function()
    response.raise_for_status()

    return response


def create_get_request(url: str, headers: Dict = None) -> Response:
    """Get request

    Args:
        url (str): API URL
        headers (Dict, optional): API request headers. Defaults to None.

    Returns:
        Response: Get request response
    """
    if not headers:
        headers = {}
    default_headers = {"x-ni-api-key": api_key}
    headers = {**default_headers, **headers}

    return retry_request(
        lambda: requests.get(
            url, headers=headers, verify=VERIFY_SSL_CERTIFICATE, timeout=TIMEOUT_IN_SECONDS
        )
    )


def create_post_request(url: str, body, headers: Dict = None) -> Response:
    """Post request

    Args:
        url (str): API URL
        body (_type_): Request body
        headers (Dict, optional): API request headers. Defaults to None.

    Returns:
        Response: Post request response
    """
    if not headers:
        headers = {}
    default_headers = {
        "accept": "application/json",
        "Content-Type": "application/json",
        "x-ni-api-key": api_key,
    }
    headers = {**default_headers, **headers}

    return retry_request(
        lambda: requests.post(
            url,
            json=body,
            headers=headers,
            verify=VERIFY_SSL_CERTIFICATE,
            timeout=TIMEOUT_IN_SECONDS,
        )
    )


def upload_file_request(url: str, file: Any, headers: Dict = None) -> Response:
    """Upload file post request

    Args:
        url (str): API URL
        file (Any): File content
        headers (Dict, optional): API request headers. Defaults to None.

    Returns:
        Response: Upload file request response
    """
    if not headers:
        headers = {}
    default_headers = {"accept": "application/json", "x-ni-api-key": api_key}
    headers = {**default_headers, **headers}

    return retry_request(
        lambda: requests.post(
            url, files=file, headers=headers, verify=False, timeout=TIMEOUT_IN_SECONDS
        )
    )

### Constants

In [None]:
class ApiBody:
    PRODUCT_IDS = "productIds"
    TAKE = "take"
    TYPE = "type"
    PARAMETRIC = "PARAMETRIC"
    SPECS = "specs"
    CONTINUATION_TOKEN = "continuationToken"
    JSON = "JSON"
    RESPONSE_FORMAT = "responseFormat"
    SPEC_ID = "SpecID"
    STEP_ID = "STEP_ID"
    FILTER = "filter"
    RESULT_FILTER = "resultFilter"
    ORDER_BY = "orderBy"
    FILE_IDS = "fileIds"
    PART_NUMBER = "partNumber"
    DATA = "data"
    PARAMETERS = "parameters"
    ANY = "Any"
    IT = "it"
    PROJECTION = "projection"
    PRODUCTS = "products"
    REPLACE = "replace"
    FALSE = "false"
    WORKSPACE = "workspace"


class ApiResponse:
    TYPE = "type"
    ID = "id"
    SPEC_ID = "specId"
    SPECS = "specs"
    STEPS = "steps"
    PARAMETRIC = "PARAMETRIC"
    PRODUCTS = "products"
    AVAILABLE_FILES = "availableFiles"
    WORKSPACE = "workspace"
    FILE_IDS = "fileIds"
    UPDATED_AT = "updatedAt"
    URI = "uri"
    EMPTY = "Empty"


class Projection:
    INPUTS = "INPUTS"
    DATA = "DATA"
    STEP_ID = "STEP_ID"


class Spec:
    SPECID = "specId"
    CATEGORY = "category"
    BLOCK = "block"
    SYMBOL = "symbol"
    LIMIT = "limit"
    CONDITIONS = "conditions"
    NAME = "name"
    VALUE = "value"
    RANGE = "range"
    MIN = "min"
    MAX = "max"
    STEP = "step"
    TYPICAL = "typical"
    UNIT = "unit"
    DISCRETE = "discrete"
    PROPERTIES = "properties"


class Step:
    INPUTS = "inputs"
    NAME = "name"
    VALUE = "value"
    DATA = "data"
    PARAMETERS = "parameters"
    MEASUREMENT = "measurement"
    SPEC_ID = "SpecID"
    UNITS = "units"


class DataframeHeaders:
    SPEC_DETAILS = "Spec Details"
    SPEC_ID = "SpecID"
    CATEGORY = "Category"
    BLOCK = "Block"
    SYMBOL = "Symbol"
    NAME = "Name"
    MIN = "Min"
    TYPICAL = "Typical"
    MAX = "Max"
    UNIT = "Unit"
    DISCRETE = "Discrete"
    STEP = "Step"
    CONDITION = "Condition"
    COMPLIANCE = "Compliance"
    CP = "Cp"
    CPK = "Cpk"
    STANDARD_DEVIATION = "Standard Deviation"


class ComplianceParameters:
    MIN = "Min"
    MAX = "Max"
    MEAN = "Mean"
    MEDIAN = "Median"
    HEALTH = "Health"
    CP = "Cp"
    CPK = "Cpk"
    STANDARD_DEVIATION = "Standard Deviation"


class SpecComplianceProperties:
    MIN = "Compliance Min"
    MAX = "Compliance Max"
    MEAN = "Compliance Mean"
    MEDIAN = "Compliance Median"
    HEALTH = "Compliance Health"
    CP = "Compliance Cp"
    CPK = "Compliance Cpk"
    STANDARD_DEVIATION = "Compliance Standard Deviation"


class SpecHealth:
    PASS = "Pass"
    FAIL = "Fail"
    INF = "inf"
    NEGATIVE_INF = "-inf"
    NAN = "nan"

MAX_SPECS_PER_UPDATE_REQUEST = 100
EXCEL_FILE_NAME = "Spec_Compliance.xlsx"

### Error Messages

In [None]:
class ErrorMessages:
    MISMATCHING_BASE_UNITS = "Input and output units are having different base unit"

### Get Workspace ID of the file

In [None]:
def get_workspace_id(file_id: str) -> str:
    """Retrieve the associated workspace ID for the given file ID

    Args:
        FILE_ID (str): File ID

    Returns:
        str: Workspace ID
    """
    get_file_properties_url = f"{ApiUrls.GET_FILE_PROPERTIES_URL}?id={file_id}"
    resp_json = create_get_request(get_file_properties_url)
    resp = resp_json.json()
    workspace_id = resp[ApiResponse.AVAILABLE_FILES][0][ApiResponse.WORKSPACE]

    return str(workspace_id)

### Get Product Information

In [None]:
def get_product_information(product_id: str) -> Response:
    """Get the product response

    Args:
        product_id (str): Product ID

    Returns:
        Response: Product response
    """
    headers = {"accept": "application/json", "Content-Type": "application/json"}
    response = create_get_request(f"{ApiUrls.GET_PRODUCT_URL}/{product_id}", headers)

    return response

In [None]:
product_information = get_product_information(product_id).json()
part_number = product_information[ApiBody.PART_NUMBER]

### Utility Functions

In [None]:
def __batch_query_request(url: str, body: Dict[str, Any], response_key: str) -> List:
    """Query request in batches, accumulate and return the data

    Args:
        url (str): URL for query request
        body (Dict[str, Any]): Body of the query request
        response_key (str): Key to retrieve data from the response

    Returns:
        List: List of data retrieved from the response
    """
    data = []

    response = create_post_request(url, body)
    response = response.json()
    if response is not None and response[response_key] is not None:
        data.extend(response[response_key])
    while response[ApiBody.CONTINUATION_TOKEN]:
        body[ApiBody.CONTINUATION_TOKEN] = response[ApiBody.CONTINUATION_TOKEN]
        response = create_post_request(url, body)
        response = response.json()
        if response is not None and response[response_key] is not None:
            data.extend(response[response_key])

    return data


def __generate_spec_ids_filter() -> str:
    filter = ''
    for index, spec_id in enumerate(spec_ids):
        filter += f'specId == \"{spec_id}\"'
        if index < len(spec_ids) - 1:
            filter += " || "
    return filter
    
def query_parametric_specs(product_id: str) -> List:
    """
    Query Parametric specs in the product

    Args:
        product_id (str): Product ID

    Returns:
        List: Parametric specs in the product
    """
    spec_ids_filter = __generate_spec_ids_filter()
    body = {
        ApiBody.PRODUCT_IDS: [product_id],
        ApiBody.FILTER: f"(({spec_ids_filter}) && ({ApiBody.TYPE} == \"{ApiBody.PARAMETRIC}\"))",
        ApiBody.TAKE: QUERY_SPECS_TAKE_COUNT
    }
    specs = __batch_query_request(ApiUrls.QUERY_SPECS_URL, body, ApiResponse.SPECS)

    return specs

def update_specs(updated_specs: List) -> Response:
    body = {
        ApiBody.SPECS: updated_specs
    }
    return create_post_request(ApiUrls.UPDATE_SPECS_URL, body)

async def __query_steps(request: StepsAdvancedQuery) -> aiohttp.ClientResponse:
    """Query steps

    Args:
        request (StepsAdvancedQuery): Request body for query steps api

    Returns:
        aiohttp.ClientResponse: Client response
    """    
    return await StepsApi().query_steps_v2(post_body=request, _preload_content=False)


async def __batch_query_steps(part_number: str, spec_id: str) -> List:
    """Batch query steps

    Args:
        part_number (str): Part number
        spec_id (str): Specification ID

    Raises:
        Exception: Raises exception when there is a client response error

    Returns:
        List: Steps which contains the measurement data for the given specification ID
    """    
    data = []
    request = StepsAdvancedQuery(
        filter=f'({ApiBody.DATA}.{ApiBody.PARAMETERS}.{ApiBody.ANY}({ApiBody.IT}["{ApiBody.SPEC_ID}"] = "{spec_id}"))',
        result_filter=f'({ApiBody.PART_NUMBER} == "{part_number}")',
        take=QUERY_STEPS_TAKE_COUNT,
        projection=[Projection.DATA, Projection.INPUTS],
        response_format= ApiBody.JSON,
    )
    try:
        response = await __query_steps(request)
        response = await response.json()
        if response is not None and response[ApiResponse.STEPS] is not None:
            data.extend(response[ApiResponse.STEPS])
        while response[ApiBody.CONTINUATION_TOKEN]:
            request = StepsAdvancedQuery(
                filter=f'({ApiBody.DATA}.{ApiBody.PARAMETERS}.{ApiBody.ANY}({ApiBody.IT}["{ApiBody.SPEC_ID}"] = "{spec_id}"))',
                result_filter=f'({ApiBody.PART_NUMBER} == "{part_number}")',
                take=QUERY_STEPS_TAKE_COUNT,
                projection=[Projection.DATA, Projection.INPUTS],
                response_format= ApiBody.JSON,
                continuation_token=response[ApiBody.CONTINUATION_TOKEN],
            )
            response = await __query_steps(request)
            response = await response.json()
            if response is not None and response[ApiResponse.STEPS] is not None:
                data.extend(response[ApiResponse.STEPS])
    except aiohttp.ClientResponseError:
        raise Exception
    return data


async def query_steps(part_number: str, spec_id: str) -> List:
    """
    Query steps

    Args:
        part_number (str): Part number
        spec_id (str): Specification ID

    Returns:
        List: Steps which contains the measurement data for the given specification ID
    """
    steps = await __batch_query_steps(part_number, spec_id)

    return steps


def __convert_to_decimal(value: str) -> Decimal | None:
    """Converts the value to decimal

    Args:
        value (str): value in string format

    Returns:
        Decimal | None: returns the converted value
    """
    try:
        return Decimal(value)
    except Exception:
        return None


def __convert_to_bool(value: str) -> bool | None:
    """Converts the value to boolean

    Args:
        value (str): value in string format

    Returns:
        bool | None: returns the converted value
    """
    if value in ["True", "true"]:
        return True
    if value in ["False", "false"]:
        return False
    return None


def __convert_to_datatype(value: str) -> bool | Decimal | str:
    """Convert the given string to any one of these types (decimal, bool)

    Args:
        value (str): String value

    Returns:
        bool | Decimal | str: returns the converted value
    """
    bool_value = __convert_to_bool(value=value)
    if bool_value is not None:
        return bool_value

    decimal_value = __convert_to_decimal(value=value)
    if decimal_value is not None:
        return decimal_value

    return value


def get_spec_condition(spec: Dict) -> Dict:
    """
    Get the specification conditions as a dictionary

    Args:
        spec (Dict): Specification data

    Returns:
        Dict: Specification Conditions as a dict
    """
    spec_condition = {}

    for condition in spec[Spec.CONDITIONS]:
        spec_condition[condition[Spec.NAME]] = condition[Spec.VALUE]

    return spec_condition


def extract_condition_name_and_unit(condition_string: str) -> Tuple[str, str | None]:
    """
    Extract the condition name and unit seperately from the condition string

    Args:
        condition_string (str): Condition string which contains the name and unit

    Returns:
        Tuple[str, str | None]: Condition name, Condition Unit
    """
    pattern = r'^(.*?)\((.*?)\)$'
    match = re.match(pattern, condition_string)
    if match:
        condition_name = match.group(1).strip()
        condition_unit = match.group(2).strip()

        return condition_name, condition_unit

    return condition_string.strip(), None


def is_different_base_unit(input_unit: str, output_unit: str) -> bool:
    """Check if both the units have different base unit

    Args:
        input_unit (str): Input unit
        output_unit (str): Output unit

    Returns:
        bool: Returns true if both the units have different base unit, else returns false
    """
    input_base_unit = convert_to_base_unit(input_unit, 1).base_unit
    output_base_unit = convert_to_base_unit(output_unit, 1).base_unit

    if input_base_unit != output_base_unit:
        return True

    return False


def convert_unit(input_unit: str, output_unit: str, value: str) -> Decimal:
    """
    Converts the value from input unit to output unit

    Args:
        input_unit (str): Input unit
        output_unit (str): Ouput unit
        value (str): Value to be converted

    Raises:
        Exception: Input and output units are having different base unit

    Returns:
        Decimal: Value converted from input unit to output unit
    """
    if is_different_base_unit(input_unit, output_unit):
        raise Exception(ErrorMessages.MISMATCHING_BASE_UNITS)
    output_value = convert_to_base_unit(input_unit, value).converted_value
    output_value = convert_from_base_unit(output_unit, output_value)

    return output_value


def get_step_condition_matching_the_spec_condition(step: Dict, spec_condition: Dict) -> Dict:
    """
    Returns the step conditions matching the spec conditions as a dictionary

    Args:
        step (Dict): Step data
        spec_condition (Dict): Spec Conditions dictionary

    Returns:
        Dict: Step conditions matching the spec conditions
    """
    step_condition = {}

    if Step.INPUTS not in step:
        return step_condition
    for step_input in step[Step.INPUTS]:
        step_condition_name, step_condition_unit = extract_condition_name_and_unit(
            step_input[Step.NAME]
        )

        if step_condition_name not in spec_condition:
            continue

        try:
            spec_condition_unit = spec_condition[step_condition_name][Spec.UNIT]
            if spec_condition_unit is None and step_condition_unit:
                continue
        except KeyError:
            spec_condition_unit = None
            if step_condition_unit:
                continue

        if (step_condition_unit == spec_condition_unit) or (
            spec_condition_unit and not step_condition_unit
        ):
            try:
                step_condition[step_condition_name] = (
                    Decimal(str(step_input[Step.VALUE])) if Step.VALUE in step_input else ''
                )
            except InvalidOperation:
                step_condition[step_condition_name] = step_input[Step.VALUE]
        else:
            if is_different_base_unit(step_condition_unit, spec_condition_unit):
                continue
            if Step.VALUE not in step_input or (
                Step.VALUE in step_input and step_input[Step.VALUE] == str()
            ):
                step_condition[step_condition_name] = str()
            else:
                try:
                    step_condition_value = convert_unit(
                        step_condition_unit, spec_condition_unit, str(step_input[Step.VALUE])
                    )
                    step_condition[step_condition_name] = step_condition_value
                except (ValueError, InvalidOperation):
                    step_condition[step_condition_name] = step_input[Step.VALUE]

    return step_condition


def check_step_condition_within_spec_condition_range(
    spec_condition: Dict,
    condition_name: str,
    condition_value: Decimal
) -> bool:
    """
    Checks if the step condition value is within the spec condition range

    Args:
        spec_condition (Dict): Specification condition
        condition_name (str): Condition name
        condition_value (Decimal): Condition value

    Returns:
        bool: Returns true if step condition is within spec condition range , else returns false
    """
    if Spec.RANGE in spec_condition[condition_name] and spec_condition[condition_name][Spec.RANGE]:
        for spec_condition_range in spec_condition[condition_name][Spec.RANGE]:
            min_limit = __convert_to_decimal(str(spec_condition_range[Spec.MIN]))
            max_limit = __convert_to_decimal(str(spec_condition_range[Spec.MAX]))
            if (min_limit is None or condition_value >= min_limit) and (
                max_limit is None or condition_value <= max_limit
            ):
                return True

    return False


def check_step_condition_within_spec_condition_discrete_array(
    spec_condition: Dict,
    condition_name: str,
    condition_value: Decimal | str
) -> bool:
    """
    Check if the step condition value is within the spec condition discrete array

    Args:
        spec_condition (Dict): Specification condition
        condition_name (str): Condition name
        condition_value (Decimal | str): Condition value

    Returns:
        bool: Returns true if step condition value is within the spec condition discrete array else returns false
    """
    if Spec.DISCRETE in spec_condition[condition_name]:
        discrete_values = spec_condition[condition_name][Spec.DISCRETE]
        converted_discrete_values = []
        for discrete_value in discrete_values:
            converted_discrete_values.append(__convert_to_datatype(str(discrete_value)))
        if condition_value in converted_discrete_values:
            return True

    return False


def check_step_condition_within_spec_condition(spec_condition: Dict, step_condition: Dict) -> bool:
    """
    Check if the step condition value is within the spec condition

    Condition mapping rules:
    1. If condition value in measurement data is empty, it is considered as within bounds
    2. If condition value in measurement data is within bounds of either range or discrete array of spec condition, it is considered as within bounds

    Args:
        spec_condition (Dict): Specification condition
        step_condition (Dict): Step condition

    Returns:
        bool: Returns true if  step condition is within the spec condition, else returns false
    """
    condition_within_range = True

    for condition_name, condition_value in step_condition.items():
        try:
            condition_value = Decimal(str(condition_value))
        except InvalidOperation:
            pass

        if not condition_value and isinstance(condition_value, str):
            continue

        condition_within_range = (
            (
                check_step_condition_within_spec_condition_range(
                    spec_condition, condition_name, condition_value
                )
                or check_step_condition_within_spec_condition_discrete_array(
                    spec_condition, condition_name, condition_value
                )
            )
            if isinstance(condition_value, Decimal)
            else check_step_condition_within_spec_condition_discrete_array(
                spec_condition, condition_name, condition_value
            )
        )

        if not condition_within_range:
            return condition_within_range

    return condition_within_range


def fetch_measurement_data_for_spec(
    steps: List,
    spec_id: str,
    spec_condition: Dict,
    spec_unit: str | None
) -> List:
    """
    Returns a list of measurement data associated with the spec after condition mapping

    Measurement data rules:
    1. If Measurement value is empty, it is not considered for compliance
    2. If Measurement value is not a valid number, it is not considered for compliance

    Args:
        steps (List): Steps data
        spec_id (str): Specification ID
        spec_condition (Dict): Specification condition
        spec_unit (str | None): Specification unit

    Returns:
        List: Measurement data
    """
    measurement_data = []

    for step in steps:
        step_condition = get_step_condition_matching_the_spec_condition(step, spec_condition)
        if not check_step_condition_within_spec_condition(spec_condition, step_condition):
            continue
        if Step.DATA not in step or Step.PARAMETERS not in step[Step.DATA]:
            continue
        for parameter in step[Step.DATA][Step.PARAMETERS]:
            step_unit = parameter[Step.UNITS]
            if parameter[Step.UNITS] is str():
                step_unit = None
            if (spec_unit and not step_unit) or (step_unit and not spec_unit):
                continue
            if spec_unit and step_unit and is_different_base_unit(step_unit, spec_unit):
                continue
            if Step.SPEC_ID in parameter and parameter[Step.SPEC_ID] != spec_id:
                continue
            measurement_value = (
                parameter[Step.MEASUREMENT] if Step.MEASUREMENT in parameter else str()
            )
            if measurement_value:
                try:
                    if not spec_unit:
                        measurement_data.append(Decimal(str(measurement_value)))
                    else:
                        measurement_value = convert_from_base_unit(spec_unit, measurement_value)
                        measurement_data.append(measurement_value)
                except (TypeError, ValueError, InvalidOperation):
                    continue

    return measurement_data


def calculate_compliance(measurement_data: pd.DataFrame, parameter: str) -> pd.Series:
    """Calculate compliance for the measurement data

    Args:
        measurement_data (pd.DataFrame): Spec measurement data
        parameter (str): Compliance parameter

    Returns:
        pd.Series: Calculated compliance
    """
    if parameter == ComplianceParameters.MIN:
        return measurement_data.min(skipna=True)
    if parameter == ComplianceParameters.MAX:
        return measurement_data.max(skipna=True)
    if parameter == ComplianceParameters.MEAN:
        return measurement_data.mean(skipna=True)
    if parameter == ComplianceParameters.MEDIAN:
        return measurement_data.median(skipna=True)
    return pd.Series()


def get_spec_health(spec, min_compliance: str, max_compliance: str) -> str | None:
    """Calculate spec health based on min and max compliance

    Args:
        spec (_type_): Specification data
        min_compliance (str): Min compliance value
        max_compliance (str): Max compliance value

    Returns:
        str | None: Returns Pass or Fail, None if there are any errors
    """
    if spec[Spec.UNIT] is not None:
        spec_min = (
            convert_to_base_unit(spec[Spec.UNIT], spec[Spec.LIMIT][Spec.MIN]).converted_value
            if spec[Spec.LIMIT][Spec.MIN] is not None
            else Decimal(SpecHealth.NEGATIVE_INF)
        )
        spec_max = (
            convert_to_base_unit(spec[Spec.UNIT], spec[Spec.LIMIT][Spec.MAX]).converted_value
            if spec[Spec.LIMIT][Spec.MAX] is not None
            else Decimal(SpecHealth.INF)
        )
        min_compliance = convert_to_base_unit(spec[Spec.UNIT], min_compliance).converted_value
        max_compliance = convert_to_base_unit(spec[Spec.UNIT], max_compliance).converted_value
    else:
        spec_min = (
            Decimal(str(spec[Spec.LIMIT][Spec.MIN]))
            if spec[Spec.LIMIT][Spec.MIN] is not None
            else Decimal(SpecHealth.NEGATIVE_INF)
        )
        spec_max = (
            Decimal(str(spec[Spec.LIMIT][Spec.MAX]))
            if spec[Spec.LIMIT][Spec.MAX] is not None
            else Decimal(SpecHealth.INF)
        )
        min_compliance = Decimal(str(min_compliance))
        max_compliance = Decimal(str(max_compliance))

    try:
        if spec_min is Decimal(SpecHealth.NEGATIVE_INF) and spec_max is not None:
            return SpecHealth.PASS if max_compliance <= spec_max else SpecHealth.FAIL
        if spec_max is Decimal(SpecHealth.INF) and spec_min is not None:
            return SpecHealth.PASS if min_compliance <= spec_min else SpecHealth.FAIL
        if spec_min is Decimal(SpecHealth.NEGATIVE_INF) and spec_max is Decimal(SpecHealth.INF):
            return SpecHealth.PASS
        if spec_min <= min_compliance <= spec_max and spec_min <= max_compliance <= spec_max:
            return SpecHealth.PASS
        return SpecHealth.FAIL
    except InvalidOperation:
        return None


def concatenate_dataframes(
    dfs: List[pd.DataFrame],
    reset_index: bool = True,
    axis: int = 1
) -> pd.DataFrame:
    """Concatenate multiple dataframes

    Args:
        dfs (List[pd.DataFrame]): List of dataframes
        reset_index (bool, optional): Reset index. Defaults to True.
        axis (int, optional): Axis to concatenate. Defaults to 1.

    Returns:
        pd.DataFrame: Returns the concatenated dataframe
    """
    if reset_index:
        for df in dfs:
            df.reset_index(drop=True, inplace=True)
    concatenated_df = pd.concat(dfs, axis=axis)

    return concatenated_df


def generate_spec_details_dataframe(spec: Dict) -> pd.DataFrame:
    """Generate spec details dataframe

    Args:
        spec (Dict): Specification data

    Returns:
        pd.DataFrame: Spec detail dataframe
    """
    spec_df = pd.DataFrame(
        {
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.SPEC_ID): [spec[Spec.SPECID]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.CATEGORY): [spec[Spec.CATEGORY]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.BLOCK): [spec[Spec.BLOCK]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.SYMBOL): [spec[Spec.SYMBOL]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.NAME): [spec[Spec.NAME]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.MIN): [spec[Spec.LIMIT][Spec.MIN]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.TYPICAL): [
                spec[Spec.LIMIT][Spec.TYPICAL]
            ],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.MAX): [spec[Spec.LIMIT][Spec.MAX]],
            (DataframeHeaders.SPEC_DETAILS, DataframeHeaders.UNIT): [spec[Spec.UNIT]],
        }
    )

    return spec_df


def generate_spec_conditions_dataframe(spec_condition: Dict) -> pd.DataFrame:
    """Generate spec condition dataframe

    Args:
        spec_condition (Dict): Specification condition dictionary

    Returns:
        pd.DataFrame: Spec condition dataframe
    """
    condition_df = pd.DataFrame()

    for condition_name, condition_data in spec_condition.items():
        condition_header = f"{DataframeHeaders.CONDITION} ({condition_name})"
        if Spec.RANGE in condition_data and len(condition_data[Spec.RANGE]) > 0:
            condition_data = {
                (condition_header, DataframeHeaders.MIN): [
                    range[Spec.MIN] for range in condition_data[Spec.RANGE]
                ],
                (condition_header, DataframeHeaders.STEP): [
                    range[Spec.STEP] for range in condition_data[Spec.RANGE]
                ],
                (condition_header, DataframeHeaders.MAX): [
                    range[Spec.MAX] for range in condition_data[Spec.RANGE]
                ],
                (condition_header, DataframeHeaders.DISCRETE): [
                    condition_data.get(Spec.DISCRETE)
                    if len(condition_data.get(Spec.DISCRETE, [])) > 0
                    else None
                    for _ in condition_data[Spec.RANGE]
                ],
                (condition_header, DataframeHeaders.UNIT): [
                    condition_data.get(Spec.UNIT, None) for _ in condition_data[Spec.RANGE]
                ],
            }
        else:
            condition_data = {
                (condition_header, DataframeHeaders.MIN): [None],
                (condition_header, DataframeHeaders.STEP): [None],
                (condition_header, DataframeHeaders.MAX): [None],
                (condition_header, DataframeHeaders.DISCRETE): [
                    condition_data.get(Spec.DISCRETE)
                    if len(condition_data.get(Spec.DISCRETE, [])) > 0
                    else None
                ],
                (condition_header, DataframeHeaders.UNIT): [
                    condition_data.get(Spec.UNIT, None)
                ],
            }
        spec_condition_df = pd.DataFrame(condition_data)
        condition_df = concatenate_dataframes([condition_df, spec_condition_df])

    return condition_df


def generate_spec_compliance_dataframe(
    spec: Dict,
    rows: int,
    measurement_data: List
) -> pd.DataFrame:
    """Generate spec compliance dataframe

    Args:
        spec (Dict): Specification data
        rows (int): Number of rows to append compliance data
        measurement_data (List): Measurement data

    Returns:
        pd.DataFrame: Spec compliance dataframe
    """
    min_compliance_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.MIN)
    max_compliance_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.MAX)
    mean_compliance_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.MEAN)
    median_compliance_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.MEDIAN)
    spec_health_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.HEALTH)
    spec_cpk_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.CPK)
    spec_ck_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.CP)
    spec_std_heading = (DataframeHeaders.COMPLIANCE, ComplianceParameters.STANDARD_DEVIATION)
    min_rows = max(1, rows)
    if len(measurement_data) > 0:
        measurement_df = pd.DataFrame({spec[Spec.SPECID]: measurement_data})
        min_compliance = calculate_compliance(measurement_df, ComplianceParameters.MIN)
        max_compliance = calculate_compliance(measurement_df, ComplianceParameters.MAX)
        mean_compliance = calculate_compliance(measurement_df, ComplianceParameters.MEAN)
        median_compliance = calculate_compliance(measurement_df, ComplianceParameters.MEDIAN)
        spec_health = (
            get_spec_health(spec, str(min_compliance.iloc[0]), str(max_compliance.iloc[0]))
            if (str(min_compliance.iloc[0]) != SpecHealth.NAN and str(max_compliance.iloc[0])) != SpecHealth.NAN
            else str()
        )

        std = Decimal(measurement_df.astype('double').std(skipna=True).iloc[0])

        maxima = Decimal(max_compliance.iloc[0])
        minima = Decimal(min_compliance.iloc[0])
        mean = Decimal(mean_compliance.iloc[0])

        is_valid_max = bool(maxima)
        is_valid_min = bool(minima)
        cpk_high = (
            np.nan if not is_valid_max
            else __calculate_cpk_high(maxima, mean, std)
        )
        cpk_low = (
            np.nan if not is_valid_min 
            else __calculate_cpk_low(minima, mean, std)
        )
        cpk = min(cpk_high, cpk_low)
        
        cp = (
            np.nan if not (is_valid_max and is_valid_min)
            else __calculate_cp(maxima, minima, std)
        )

        compliance_data = {
            min_compliance_heading: [min_compliance.iloc[0]] * min_rows,
            max_compliance_heading: [max_compliance.iloc[0]] * min_rows,
            mean_compliance_heading: [mean_compliance.iloc[0]] * min_rows,
            median_compliance_heading: [median_compliance.iloc[0]] * min_rows,
            spec_health_heading: [spec_health] * min_rows,
            spec_std_heading: [std] * min_rows,
            spec_cpk_heading: [cpk] * min_rows,
            spec_ck_heading: [cp] * min_rows
        }
    else:
        empty_compliance_data = [None] * min_rows
        compliance_data = {
            min_compliance_heading: empty_compliance_data,
            max_compliance_heading: empty_compliance_data,
            mean_compliance_heading: empty_compliance_data,
            median_compliance_heading: empty_compliance_data,
            spec_health_heading: empty_compliance_data,
            spec_std_heading: empty_compliance_data,
            spec_cpk_heading: empty_compliance_data,
            spec_ck_heading: empty_compliance_data
        }
    compliance_df = pd.DataFrame(compliance_data)

    return compliance_df

def __calculate_cpk_high(high_limit, mean, std):
    diff = (high_limit) - (mean)
    if std == 0:
        return np.inf
    return diff / (3 * (std))

def __calculate_cpk_low(low_limit, mean, std):
    diff = (mean) - (low_limit)
    if std == 0:
        return np.inf
    return diff / (3 * (std))

def __calculate_cp(high_limit, low_limit, std):
    diff = (high_limit) - (low_limit)
    if std == 0:
        return np.inf
    return diff / (6 * (std))

async def generate_spec_dataframe(specs: List, part_number: str) -> Dict:
    """Generate a spec dataframe for each spec category

    Args:
        specs (List): Specifications data
        part_number (str): Part number of the product

    Returns:
        Dict: Dictionary of spec category and dataframe
    """
    spec_data = {}
    spec_compliance_data = {}

    for spec in specs:
        category = spec.get(Spec.CATEGORY, ApiResponse.EMPTY)
        spec_id = spec[Spec.SPECID]
        spec_unit = spec[Spec.UNIT]
        spec_condition = get_spec_condition(spec)
        spec_details_df = generate_spec_details_dataframe(spec)
        spec_conditions_df = generate_spec_conditions_dataframe(spec_condition)
        steps = await query_steps(part_number, spec_id)
        measurement_data = fetch_measurement_data_for_spec(
            steps, spec_id, spec_condition, spec_unit
        )
        spec_compliance_df = generate_spec_compliance_dataframe(
            spec, len(spec_conditions_df), measurement_data
        )
        spec_df = concatenate_dataframes([spec_details_df, spec_conditions_df])

        if category not in spec_data:
            spec_data[category] = spec_df
            spec_compliance_data[category] = spec_compliance_df
        else:
            spec_data[category] = concatenate_dataframes([spec_data[category], spec_df], False, 0)
            spec_compliance_data[category] = concatenate_dataframes(
                [spec_compliance_data[category], spec_compliance_df], False, 0
            )

    for category in spec_data:
        spec_data[category] = concatenate_dataframes(
            [spec_data[category], spec_compliance_data[category]]
        )

    return spec_data

def __get_batches(list, batch_size):
    for i in range(0, len(list), batch_size):
        yield list[i:i+batch_size]

def __format_compliance(value: Decimal) -> Decimal:
    return '{:.4f}'.format(value)

def __delete_spec_custom_property(spec, property_to_be_deleted) -> None:
    if property_to_be_deleted in spec[Spec.PROPERTIES]:
        del spec[Spec.PROPERTIES][property_to_be_deleted]

def add_compliance_to_spec_custom_properties(specs: List, dataframe_dict: Dict) -> None:
    specs_to_be_updated = []
    for category, dataframe in dataframe_dict.items():
        specIds_df = dataframe[DataframeHeaders.SPEC_DETAILS][DataframeHeaders.SPEC_ID]
        for spec in specs:
            spec_compliance_metrices_df = dataframe[specIds_df == spec[Spec.SPECID]][DataframeHeaders.COMPLIANCE]
            
            if spec_compliance_metrices_df.empty or spec_compliance_metrices_df.iloc[0].empty or (not spec_compliance_metrices_df.iloc[0][ComplianceParameters.HEALTH]):
                __delete_spec_custom_property(spec, SpecComplianceProperties.MIN)
                __delete_spec_custom_property(spec, SpecComplianceProperties.MAX)
                __delete_spec_custom_property(spec, SpecComplianceProperties.MEAN)
                __delete_spec_custom_property(spec, SpecComplianceProperties.MEDIAN)
                __delete_spec_custom_property(spec, SpecComplianceProperties.HEALTH)
                __delete_spec_custom_property(spec, SpecComplianceProperties.STANDARD_DEVIATION)
                __delete_spec_custom_property(spec, SpecComplianceProperties.CPK)
                __delete_spec_custom_property(spec, SpecComplianceProperties.CP)

            else:
                spec[Spec.PROPERTIES][SpecComplianceProperties.MIN] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.MIN]))
                spec[Spec.PROPERTIES][SpecComplianceProperties.MAX] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.MAX]))
                spec[Spec.PROPERTIES][SpecComplianceProperties.MEAN] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.MEAN]))
                spec[Spec.PROPERTIES][SpecComplianceProperties.MEDIAN] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.MEDIAN]))
                spec[Spec.PROPERTIES][SpecComplianceProperties.HEALTH] = str(spec_compliance_metrices_df.iloc[0][ComplianceParameters.HEALTH])
                
                spec[Spec.PROPERTIES][SpecComplianceProperties.STANDARD_DEVIATION] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.STANDARD_DEVIATION]))
                spec[Spec.PROPERTIES][SpecComplianceProperties.CPK] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.CPK]))
                spec[Spec.PROPERTIES][SpecComplianceProperties.CP] = str(__format_compliance(spec_compliance_metrices_df.iloc[0][ComplianceParameters.CP]))
            
            specs_to_be_updated.append(spec)

    for specs_batch in __get_batches(specs_to_be_updated, MAX_SPECS_PER_UPDATE_REQUEST):
        update_specs_response = update_specs(specs_batch)
        if (update_specs_response.status_code != 200) or ("failedSpecs" in update_specs_response.json()):
            sb.glue("Error updating spec custom properties with the compliance metrices", json.dumps(update_specs_response.json()))
            break

def upload_file(file_name: str, workspace_id: str) -> Response:
    """Upload file to SLE

    Args:
        file_name (str): File name
        part_number (str): Part number of the product

    Returns:
        Response: Upload file response
    """
    upload_file_url = f"{ApiUrls.UPLOAD_FILE_URL}?workspace={workspace_id}"
    file = {"file": (f"{part_number}-{file_name}", open(file_name, "rb"))}
    response = upload_file_request(upload_file_url, file)

    return response


def add_file_id_to_product(product_information: Any, file_id: str) -> Response:
    """Add file to the product using update product API

    Args:
        product_information (Any): Product information received for get product API
        file_id (str): File ID

    Returns:
        Response: Update product response
    """
    product_information[ApiResponse.FILE_IDS].append(file_id)
    product_information.pop(ApiResponse.UPDATED_AT)
    body = {ApiBody.PRODUCTS: [product_information], ApiBody.REPLACE: ApiBody.FALSE}
    response = create_post_request(ApiUrls.UPDATE_PRODUCT_URL, body)

    return response


def write_dataframe_to_excel(dataframe_dict: Dict, file_name: str) -> None:
    """Write each dataframe to a new sheet in excel

    Args:
        dataframe_dict (Dict): Dictionary of dataframes
        file_name (str): File name
    """
    with pd.ExcelWriter(file_name, engine="openpyxl") as writer:
        for category, dataframe in dataframe_dict.items():
            dataframe.reset_index(drop=True, inplace=True)
            dataframe.to_excel(writer, sheet_name=f"Spec_{category}", index=True)

### Fetch parametric specs, measurement data, calculate Compliance and generate dataframe

In [None]:
parametric_specs = query_parametric_specs(product_id)
spec_dataframe = await generate_spec_dataframe(parametric_specs, part_number)

### Write compliance metrices to custom properties of specs

In [None]:
add_compliance_to_spec_custom_properties(parametric_specs, spec_dataframe)

### Write dataframe to excel, upload and add the excel report to the product

In [None]:
pd.set_option("display.float_format", "{:.15f}".format)
write_dataframe_to_excel(spec_dataframe, EXCEL_FILE_NAME)
upload_file_response = upload_file(file_name=EXCEL_FILE_NAME, workspace_id=parametric_specs[0]['workspace']).json()
uploaded_file_id = upload_file_response[ApiResponse.URI].split("/")[-1]
product_response = add_file_id_to_product(
    product_information=product_information, file_id=uploaded_file_id
).json()

### Store the result information so that SystemLink can access it

SystemLink uses scrapbook to store result information from each notebook execution to display to the user in the Execution Details slide-out. Here we will display the hyperlink to files grid inside product details page or the error message incase of any error."
   

In [None]:
try:
    if product_response[ApiBody.PRODUCTS]:
        sb.glue(
            "Compliance excel report generated and uploaded to the product",
            f'<a href="../../testinsights/products/product/{product_id}/files">Product Files tab</a>',
        )
except KeyError:
    sb.glue("Error", json.dumps(product_response))

# Next Steps

1. Publish this notebook to SystemLink by right-clicking it in the JupyterLab File Browser with the interface as Specification Analysis.
1. Manually execute this notebook against the specs inside specs grid in product details page.
1. Go to spec details page to view the compliance metrics generated for the specs.