In [None]:
import json
import math
import os
from enum import Enum
from http import HTTPStatus
from typing import Callable

import backoff
import pandas as pd
import requests
import scrapbook as sb
from pandas import DataFrame, ExcelFile
from requests import Response
from requests.exceptions import HTTPError
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [None]:
# Connection settings come from the environment; each is None when its variable is unset.
systemlink_uri = os.environ.get("SYSTEMLINK_HTTP_URI")
api_key = os.environ.get("SYSTEMLINK_API_KEY")

### HTTP Status codes and constants


In [None]:
# Transport settings shared by the HTTP helpers.
MAX_HTTP_RETRIES = 6
TIMEOUT_IN_SECONDS = 60
# NOTE(review): this value is passed as `verify=` on every request, so TLS
# certificate verification is disabled - confirm this is intended.
VERIFY_SSL_CERTIFICATE = False

# Spec limits: specs are created in batches of CREATE_SPECS_BATCH_SIZE and the
# notebook handles at most MAXIMUM_SPEC_LIMIT specs.
CREATE_SPECS_BATCH_SIZE = 1_000
MAXIMUM_SPEC_LIMIT = 10_000

# Status codes for which a failed request is retried.
HTTP_RETRY_CODES = [
    HTTPStatus.TOO_MANY_REQUESTS,
    HTTPStatus.INTERNAL_SERVER_ERROR,
    HTTPStatus.BAD_GATEWAY,
    HTTPStatus.SERVICE_UNAVAILABLE,
    HTTPStatus.GATEWAY_TIMEOUT,
]

### HTTP Routes


In [None]:
class HttpRouteConstants:
    """URL path fragments used to build the service endpoints this notebook calls.

    Full URLs are formed by concatenating the SystemLink base URI, one
    ``*_BASE_ROUTE`` value and one endpoint route.
    """

    # Base path and endpoint used to build the product query URL.
    TM_BASE_ROUTE = "/nitestmonitor"
    POST_QUERY_PRODUCTS = "/v2/query-products"
    # Base path and collection route used for file download and file lookup.
    FILE_BASE_ROUTE = "/nifile"
    AVAILABLE_FILES_ROUTE = "/v1/service-groups/Default/files"
    # Base path and endpoints used to create, delete and query specs.
    SPECS_BASE_ROUTE = "/nispec/v1"
    CREATE_SPECS_ROUTE = "/specs"
    DELETE_SPECS_ROUTE = "/delete-specs"
    QUERY_SPECS_ROUTE = "/query-specs"

### Error Messages


In [None]:
class ErrorMessages:
    """Message text used by the exceptions this notebook raises."""

    EMPTY_SPEC_ID = "Spec ID cannot be empty"
    EMPTY_TYPE = "Spec Type cannot be empty"
    EMPTY_PRODUCT_ID = "Product ID cannot be empty"
    # Raised when the number of input file IDs is not exactly one.
    MULTIPLE_NOTEBOOKS_SELECTED = "This notebook is designed to operate on one file at a time"
    WRONG_FILE_EXTENSION = "File extension is not xlsx"
    # Built once at class-definition time from MAXIMUM_SPEC_LIMIT.
    MAXIMUM_SPEC_INGESTION_LIMIT = (f"Notebook only supports extraction and ingestion of {MAXIMUM_SPEC_LIMIT} specs")
    CONDITION_EXTRACTION_ERROR = "Error when extracting condition"

### Input Parameters


In [None]:
# IDs of the files to ingest; a later cell raises unless this list holds exactly one ID.
# NOTE(review): the placeholder [""] is presumably replaced by notebook parameters at run time - confirm.
file_ids = [""]

### API URLs


In [None]:
# Absolute endpoint URLs: base URI + service base route + endpoint route.
delete_spec_url = "{}{}{}".format(
    systemlink_uri, HttpRouteConstants.SPECS_BASE_ROUTE, HttpRouteConstants.DELETE_SPECS_ROUTE)
create_spec_url = "{}{}{}".format(
    systemlink_uri, HttpRouteConstants.SPECS_BASE_ROUTE, HttpRouteConstants.CREATE_SPECS_ROUTE)
query_spec_url = "{}{}{}".format(
    systemlink_uri, HttpRouteConstants.SPECS_BASE_ROUTE, HttpRouteConstants.QUERY_SPECS_ROUTE)
query_products_url = "{}{}{}".format(
    systemlink_uri, HttpRouteConstants.TM_BASE_ROUTE, HttpRouteConstants.POST_QUERY_PRODUCTS)

### API Utility functions


In [None]:
@backoff.on_exception(
    backoff.expo,
    HTTPError,
    max_tries=MAX_HTTP_RETRIES,
    giveup=lambda error: error.response.status_code not in HTTP_RETRY_CODES,
)
def retry_request(callable_function: Callable) -> Response:
    """Run a request callable and return its response, retrying on retryable HTTP errors.

    Args:
        callable_function: Zero-argument callable that performs the request
            and returns a ``Response``.

    Returns:
        The response, once ``raise_for_status`` passes.

    Raises:
        HTTPError: When the status code is not in ``HTTP_RETRY_CODES`` or
            ``MAX_HTTP_RETRIES`` attempts have been used.
    """
    result = callable_function()
    # Turns 4xx/5xx responses into HTTPError so the decorator can decide whether to retry.
    result.raise_for_status()
    return result


def create_get_request(url: str, headers: dict | None = None) -> Response:
    """Send a GET request with the API key header, going through ``retry_request``.

    Args:
        url: Absolute URL to request.
        headers: Optional extra headers; they override the defaults on key
            collision. ``None`` (the default) means no extra headers.

    Returns:
        The response returned by ``retry_request``.

    Raises:
        HTTPError: Propagated from ``retry_request`` when the request fails.
    """
    # A None default replaces the shared mutable `{}` default and also lets
    # callers pass headers=None explicitly.
    merged_headers = {'x-ni-api-key': api_key, **(headers or {})}

    return retry_request(
        lambda: requests.get(
            url,
            headers=merged_headers,
            verify=VERIFY_SSL_CERTIFICATE,
            timeout=TIMEOUT_IN_SECONDS,
        )
    )


def create_post_request(url: str, body, headers: dict | None = None) -> Response:
    """Send a JSON POST request with the API key header, going through ``retry_request``.

    Args:
        url: Absolute URL to post to.
        body: Request payload, passed to ``requests.post`` as ``data``; the
            callers in this notebook pass a JSON string.
        headers: Optional extra headers; they override the defaults on key
            collision. ``None`` (the default) means no extra headers.

    Returns:
        The response returned by ``retry_request``.

    Raises:
        HTTPError: Propagated from ``retry_request`` when the request fails.
    """
    # A None default replaces the shared mutable `{}` default and also lets
    # callers pass headers=None explicitly.
    merged_headers = {
        "accept": "application/json",
        "Content-Type": "application/json",
        "x-ni-api-key": api_key,
        **(headers or {}),
    }

    return retry_request(
        lambda: requests.post(
            url,
            data=body,
            headers=merged_headers,
            verify=VERIFY_SSL_CERTIFICATE,
            timeout=TIMEOUT_IN_SECONDS,
        )
    )

### Constants

In [None]:
# Name of the workbook sheet that is skipped when collecting sheets to process.
TEMPLATE_SHEET = "SpecTemplate"
# Maps the header text of standard (STD) columns to the spec field name they are renamed to.
# The keys are also the only header names accepted for STD columns.
SPEC_FILE_MAPPING = {
    "Spec ID": "specId",
    "Category": "category",
    "Block": "block",
    "Spec Symbol": "symbol",
    "Spec Name": "name",
    "Min": "min",
    "Typical": "typical",
    "Max": "max",
    "Unit": "unit",
}


class ColumnTypes(Enum):
    """Column type markers a sheet declares per column (compared after upper-casing)."""

    # Standard column: its header must be one of the SPEC_FILE_MAPPING keys.
    STD = "STD"
    # Condition column: its cells are parsed into spec conditions.
    COND = "COND"
    # Property column: its cells are copied into the spec's properties.
    INF = "INF"


class SpecDataKeys:
    """Dictionary keys of a single spec payload built by this notebook."""

    SPEC_ID = "specId"
    PRODUCT_ID = "productId"
    NAME = "name"
    CATEGORY = "category"
    TYPE = "type"
    SYMBOL = "symbol"
    BLOCK = "block"
    UNIT = "unit"
    # Holds the min/max/typical dictionary; only set for parametric specs.
    LIMIT = "limit"
    CONDITIONS = "conditions"
    WORKSPACE = "workspace"
    PROPERTIES = "properties"


class SpecLimitKeys:
    """Keys of the limit dictionary; the same names are used to read the renamed sheet columns."""

    MIN = "min"
    MAX = "max"
    TYPICAL = "typical"


class SpecConditionKeys:
    """Keys used in a condition entry and in its value dictionary."""

    CONDITION_TYPE = "conditionType"
    NAME = "name"
    VALUE = "value"
    # List of individual values.
    DISCRETE = "discrete"
    # List of dictionaries with MIN / MAX bounds.
    RANGE = "range"
    MIN = "min"
    MAX = "max"
    UNIT = "unit"


class SpecConditionTypeValues:
    """Values written under the condition type key."""

    # Used for cells that are not wrapped in square brackets.
    STRING = "STRING"
    # Used for cells wrapped in square brackets.
    NUMERIC = "NUMERIC"


class SpecType:
    """Lower-case spec type names the sheet's type cell is compared against."""

    # Only parametric specs get a limit dictionary.
    PARAMETRIC = "parametric"
    FUNCTIONAL = "functional"


class ApiBodyKeys:
    """Keys and filter fragments used when building request bodies."""

    PRODUCT_IDS = "productIds"
    TAKE = "take"
    IDS = "ids"
    SPECS = "specs"
    FILTER = "filter"
    # FILE_IDS and CONTAINS are combined into the product filter expression `fileIds.Contains("<id>")`.
    FILE_IDS = "fileIds"
    CONTAINS = "Contains"


class ApiResponseKeys:
    """Keys read from the JSON responses of the called services."""

    ID = "id"
    ERROR = "error"
    SPEC_ID = "specId"
    SPECS = "specs"
    CREATED_SPECS = "createdSpecs"
    AVAILABLE_FILES = "availableFiles"
    WORKSPACE = "workspace"
    PRODUCTS = "products"

### Get File content and write into a file


In [None]:
def get_file_content(file_id: str):
    """Download the data of the file identified by ``file_id``.

    Args:
        file_id: ID of the file to download.

    Returns:
        The successful download response.

    Raises:
        HTTPError: When the download request fails.
    """
    files_route = f"{HttpRouteConstants.FILE_BASE_ROUTE}{HttpRouteConstants.AVAILABLE_FILES_ROUTE}"
    response = create_get_request(
        f"{systemlink_uri}{files_route}/{file_id}/data",
        {'X-NI-API-KEY': api_key},
    )
    response.raise_for_status()
    return response

In [None]:
def write_file_content_into_file(response: "Response"):
    """Save a downloaded workbook into the current directory and return its file name.

    The name is taken from the ``content-disposition`` header. Because that
    header is supplied by the server, any directory part is dropped so the
    file can only be written into the current working directory.

    Args:
        response: Download response exposing ``headers`` and ``content``.

    Returns:
        The bare file name the content was written to.

    Raises:
        Exception: When the file name has no ``xlsx`` extension
            (compared case-insensitively).
        KeyError: When the response has no ``content-disposition`` header.
    """
    content_disposition = response.headers['content-disposition']

    quoted_parts = content_disposition.split('"')
    if len(quoted_parts) >= 2:
        # filename="name.xlsx" - the name is the first quoted segment.
        raw_name = quoted_parts[1]
    else:
        # filename=name.xlsx - unquoted form; stop at the next parameter.
        raw_name = content_disposition.partition('filename=')[2].split(';')[0].strip()

    # Normalise Windows separators first so basename strips them on every platform.
    filename = os.path.basename(raw_name.replace('\\', '/'))

    _, dot, file_extension = filename.rpartition('.')
    if not dot or file_extension.lower() != 'xlsx':
        raise Exception(ErrorMessages.WRONG_FILE_EXTENSION)

    with open(filename, 'wb') as file:
        file.write(response.content)

    return filename

In [None]:
# The notebook processes exactly one file per run.
if len(file_ids) != 1:
    raise Exception(ErrorMessages.MULTIPLE_NOTEBOOKS_SELECTED)

(file_id,) = file_ids

# Download the workbook and store it locally; `filename` is read again when parsing.
file_content_response = get_file_content(file_id)
filename = write_file_content_into_file(file_content_response)

### Get Workspace ID for the file


In [None]:
def get_workspace_id(file_id: str):
    """Return, as a string, the workspace of the first file entry returned for ``file_id``.

    Args:
        file_id: ID used as the ``id`` query parameter of the file lookup.

    Returns:
        ``str`` of the workspace value of the first entry in the response.
    """
    files_route = f"{HttpRouteConstants.FILE_BASE_ROUTE}{HttpRouteConstants.AVAILABLE_FILES_ROUTE}"
    response = create_get_request(f"{systemlink_uri}{files_route}?id={file_id}")
    first_file = response.json()[ApiResponseKeys.AVAILABLE_FILES][0]
    return str(first_file[ApiResponseKeys.WORKSPACE])

### Get Product ID from File ID


In [None]:
def get_product_id_for_file(file_id: str):
    """Return, as a string, the ID of the first product whose file IDs contain ``file_id``.

    Args:
        file_id: ID of the file to look up.

    Returns:
        ``str`` of the ``id`` of the first product in the query response.

    Raises:
        Exception: When the query returns no product for the file. Previously
            this surfaced as a bare IndexError/KeyError.
    """
    filter_expression = f'{ApiBodyKeys.FILE_IDS}.{ApiBodyKeys.CONTAINS}("{file_id}")'
    payload = json.dumps({ApiBodyKeys.FILTER: filter_expression})
    response = create_post_request(query_products_url, payload)

    products = response.json().get(ApiResponseKeys.PRODUCTS) or []
    if not products:
        raise Exception(
            f"{ErrorMessages.EMPTY_PRODUCT_ID}: no product is associated with file '{file_id}'")

    return str(products[0][ApiResponseKeys.ID])

In [None]:
# Resolve the product and the workspace of the input file; the spec-building code reads both as globals.
product_id = get_product_id_for_file(file_id)
workspace_id = get_workspace_id(file_id)

### Spec extraction utility functions


In [None]:
def process_condition_values(input_string: str, unit: str | None, sheet_name: str, column, row):
    try:
        if input_string is not None:
            input_string = input_string.strip()

        if not input_string:
            return None

        if input_string.startswith('[') and input_string.endswith(']'):
            input_string = input_string[1:-1]
            discrete = []
            range_min = None
            range_max = None

            if '..' in input_string:
                values = input_string.split('..')

                if (values[0] == '' or values[-1] == ''):
                    if (values[0] == ''):
                        range_max = float(values[-1])
                    elif (values[-1] == ''):
                        range_min = float(values[0])
                    if (len(values) >= 3):
                        for index, value in enumerate(values):
                            if index != 0 and index != len(values)-1:
                                discrete.append(float(value))
                else:
                    range_min = float(values[0])
                    range_max = float(values[-1])

                    if (len(values) >= 3):
                        for index, value in enumerate(values):
                            if index != 0 and index != len(values)-1:
                                discrete.append(float(value))
                return {
                    SpecConditionKeys.CONDITION_TYPE: SpecConditionTypeValues.NUMERIC,
                    SpecConditionKeys.DISCRETE: discrete,
                    SpecConditionKeys.RANGE: [
                        {
                            SpecConditionKeys.MIN: range_min,
                            SpecConditionKeys.MAX: range_max
                        }
                    ],
                    SpecConditionKeys.UNIT: unit
                }

            else:
                if ',' in input_string:
                    discrete = [float(value)
                                for value in input_string.split(',')]
                else:
                    discrete = [float(input_string)]
                return {
                    SpecConditionKeys.CONDITION_TYPE: SpecConditionTypeValues.NUMERIC,
                    SpecConditionKeys.DISCRETE: discrete,
                    SpecConditionKeys.UNIT: unit
                }
        elif (not (input_string.startswith('[') and input_string.endswith(']'))):
            discrete = [str(value).strip() for value in input_string.split(',')]
            return {
                SpecConditionKeys.CONDITION_TYPE: SpecConditionTypeValues.STRING,
                SpecConditionKeys.DISCRETE: discrete
            }
        else:
            return None
    except Exception:
        raise SystemExit(
            f"{ErrorMessages.CONDITION_EXTRACTION_ERROR} in {sheet_name}, {column}, row {row}")


def extract_condition_unit(input_string: str):
    """Return the text between the first '(' and the first ')' of a column header.

    Returns ``None`` when either parenthesis is missing or the first ')'
    comes before the first '('.
    """
    open_at = input_string.find('(')
    close_at = input_string.find(')')
    if 0 <= open_at < close_at:
        return input_string[open_at + 1:close_at]
    return None


def validate_column_types(df_type: pd.DataFrame, sheet_name: str):
    """Validate the column-type row (row 0) and column-name row (row 1) of a sheet.

    The first column is skipped in both checks. All type cells are checked
    before any name cell.

    Args:
        df_type: Two-row frame; row 0 holds the declared column types and
            row 1 the column names.
        sheet_name: Sheet name, used in error messages.

    Raises:
        Exception: When a type cell is empty or not a known column type, when
            a name cell is empty, when a SPEC_FILE_MAPPING name is not
            declared STD, or when an STD column has a name outside
            SPEC_FILE_MAPPING.
    """
    allowed_types = [column_type.value for column_type in ColumnTypes]
    checked_columns = df_type.columns[1:]

    for col in checked_columns:
        declared_type = df_type.at[0, col]
        if not isinstance(declared_type, str):
            # This check is on the type row, so report the type (not the name) as missing.
            raise Exception(
                f"Column '{col}' Type cannot be empty in {sheet_name}")
        col_value = declared_type.upper()
        if col_value not in allowed_types:
            raise Exception(f"Column '{col}' has an invalid value: '{col_value}'. "
                            f"Allowed values are: {', '.join(allowed_types)}")

    for col in checked_columns:
        col_value = df_type.at[1, col]
        if not isinstance(col_value, str):
            raise Exception(
                f"Column '{col}' Name cannot be empty in {sheet_name}")

        declared_type = df_type.at[0, col].upper()
        is_standard_name = col_value in SPEC_FILE_MAPPING
        if is_standard_name and declared_type != ColumnTypes.STD.value:
            raise Exception(f"Column '{col_value}' should be {ColumnTypes.STD.value} type, "
                            f"instead it is {declared_type} type in {sheet_name}")
        if not is_standard_name and declared_type == ColumnTypes.STD.value:
            raise Exception(f"Column '{col}' has an invalid value: '{col_value}' in {sheet_name}. "
                            f"Allowed values for STD columns are: {', '.join(SPEC_FILE_MAPPING.keys())}")


def seperate_column_types(df_type: DataFrame):
    """Group column header names by their declared column type.

    Row 0 of ``df_type`` holds the declared type of each column and row 1 its
    header name. Columns whose type cell is not a string, or is not a known
    column type, are left out of every group; a non-string type cell
    previously raised AttributeError.

    Args:
        df_type: Two-row frame of column types and column names.

    Returns:
        Tuple ``(standard_columns, condition_columns, property_columns)`` of
        header-name lists, each in sheet column order, for use with pandas
        ``usecols``.
    """
    columns_by_type = {column_type.value: [] for column_type in ColumnTypes}

    for position, col in enumerate(df_type.columns):
        declared_type = df_type.at[0, col]
        if not isinstance(declared_type, str):
            continue
        group = columns_by_type.get(declared_type.upper())
        if group is not None:
            group.append(df_type.iloc[1, position])

    return (
        columns_by_type[ColumnTypes.STD.value],
        columns_by_type[ColumnTypes.COND.value],
        columns_by_type[ColumnTypes.INF.value],
    )


def extract_column_names(df_type: DataFrame, column_names: list[str]):
    """Return ``column_names`` without duplicates, keeping first-occurrence order.

    ``df_type`` is accepted but not used.
    """
    seen = set()
    unique_names = []
    for name in column_names:
        if name in seen:
            continue
        seen.add(name)
        unique_names.append(name)
    return unique_names


def construct_dataframe_for_column_type(excel_file: ExcelFile, sheet_name: str, column_info, column_type: str):
    """Load the data columns of one column type from a sheet.

    Args:
        excel_file: Open workbook.
        sheet_name: Sheet to read.
        column_info: Mapping ``sheet name -> column type -> header names``.
        column_type: Column type whose columns are loaded.

    Returns:
        A frame with the selected columns read as strings, headers renamed
        through SPEC_FILE_MAPPING and empty strings replaced by ``None``;
        an empty frame when the sheet has no columns of this type.
    """
    headers = column_info[sheet_name][f"{column_type}"]
    if headers:
        # header=3 makes the fourth sheet row (the column-name row) the header row.
        parsed = excel_file.parse(
            sheet_name,
            header=3,
            usecols=headers,
            dtype=str,
            keep_default_na=False
        )
        renamed = parsed.rename(columns=SPEC_FILE_MAPPING)
        return renamed.replace('', None)

    return pd.DataFrame()


def seperate_columns(excel_file: ExcelFile, sheet_names: list):
    """Collect the column names of every sheet, grouped by column type.

    Args:
        excel_file: Open workbook.
        sheet_names: Names of the sheets to inspect.

    Returns:
        Tuple ``(column_info, standard_columns, condition_columns,
        property_columns)``. ``column_info`` maps each sheet name to its
        per-type column lists. The three lists are those of the LAST sheet
        processed, and are empty when ``sheet_names`` is empty (previously an
        UnboundLocalError).
    """
    column_info = {}
    # Pre-initialised so the return statement is valid when there are no sheets.
    standard_columns, condition_columns, property_columns = [], [], []

    for sheet_name in sheet_names:
        # Sheet rows 3 and 4 hold the column types and the column names.
        data_frame = excel_file.parse(sheet_name, skiprows=2, nrows=2, header=None, dtype=object)
        standard_columns, condition_columns, property_columns = seperate_column_types(data_frame)

        column_info[sheet_name] = {
            ColumnTypes.STD.value: standard_columns,
            ColumnTypes.COND.value: condition_columns,
            ColumnTypes.INF.value: property_columns
        }

    return column_info, standard_columns, condition_columns, property_columns


def validate_spec_count(excel_file: ExcelFile, sheet_names: list):
    """Raise when the workbook holds more specs than the notebook supports.

    The spec count is the total number of data rows over all given sheets,
    each read with the fourth sheet row as header.

    Args:
        excel_file: Open workbook.
        sheet_names: Names of the sheets to count.

    Raises:
        Exception: When the total exceeds ``MAXIMUM_SPEC_LIMIT``.
    """
    total_specs = sum(
        len(excel_file.parse(sheet_name, header=3)) for sheet_name in sheet_names
    )

    # Compare against the shared constant so the check and the error message cannot drift apart.
    if total_specs > MAXIMUM_SPEC_LIMIT:
        raise Exception(f"{ErrorMessages.MAXIMUM_SPEC_INGESTION_LIMIT}, total specs in the excel file is {total_specs}")

In [None]:
def generate_base_spec_data(df_type: DataFrame, sheet_name: str, category: str, type: str):
    """Build one spec payload per row of a sheet's standard columns.

    Reads the module-level ``product_id`` and ``workspace_id``.

    Args:
        df_type: Frame of standard columns, already renamed to spec field names.
        sheet_name: Sheet name, used in error messages.
        category: Sheet category; stored as ``None`` when it is not a string.
        type: Sheet spec type. Parametric specs (case-insensitive match) get
            a limit dictionary.

    Returns:
        List of spec dictionaries in row order; empty when the frame has no rows.

    Raises:
        Exception: When the product ID is empty, the spec type is missing
            (``None``, NaN, any non-string value, or blank text), a row has
            no spec ID, or a Min/Max/Typical cell is not a valid number.
    """
    spec_rows = df_type.to_dict("records")
    if not spec_rows:
        # Nothing to validate or build; sheet-level checks only apply when rows exist.
        return []

    # Sheet-level checks do not depend on the row, so they run once up front.
    if not product_id:
        raise Exception(ErrorMessages.EMPTY_PRODUCT_ID)
    # Any non-string type (None, NaN, numbers) would fail later on type.lower().
    if not isinstance(type, str) or not type.strip():
        raise Exception(f"{ErrorMessages.EMPTY_TYPE} in {sheet_name}")

    is_parametric = type.lower() == SpecType.PARAMETRIC

    def text_or_none(value):
        """Return value when it is a string, otherwise None."""
        return value if isinstance(value, str) else None

    def number_or_none(value):
        """Return float(value) for a truthy cell, otherwise None."""
        return float(value) if value else None

    specs = []
    for idx, spec_row in enumerate(spec_rows):
        if not spec_row.get(SpecDataKeys.SPEC_ID):
            # NOTE(review): this message reports the 0-based record index while the
            # numeric-value message below reports idx + 1 - confirm which is intended.
            raise Exception(
                f"{ErrorMessages.EMPTY_SPEC_ID} in {sheet_name} in row {idx}")

        spec_data = {
            SpecDataKeys.PRODUCT_ID: product_id,
            SpecDataKeys.SPEC_ID: spec_row.get(SpecDataKeys.SPEC_ID),
            SpecDataKeys.NAME: text_or_none(spec_row.get(SpecDataKeys.NAME)),
            SpecDataKeys.CATEGORY: text_or_none(category),
            SpecDataKeys.TYPE: type,
            SpecDataKeys.SYMBOL: text_or_none(spec_row.get(SpecDataKeys.SYMBOL)),
            SpecDataKeys.BLOCK: text_or_none(spec_row.get(SpecDataKeys.BLOCK)),
            SpecDataKeys.UNIT: text_or_none(spec_row.get(SpecDataKeys.UNIT)),
            SpecDataKeys.CONDITIONS: [],
            SpecDataKeys.WORKSPACE: workspace_id if workspace_id else None
        }

        if is_parametric:
            try:
                spec_data[SpecDataKeys.LIMIT] = {
                    SpecLimitKeys.MIN: number_or_none(spec_row.get(SpecLimitKeys.MIN)),
                    SpecLimitKeys.MAX: number_or_none(spec_row.get(SpecLimitKeys.MAX)),
                    SpecLimitKeys.TYPICAL: number_or_none(spec_row.get(SpecLimitKeys.TYPICAL))
                }
            except ValueError as e:
                raise Exception(
                    f"Invalid numeric value in {sheet_name} row {idx + 1}. "
                    f"Min, Max, and Typical columns must contain valid numbers. "
                    f"Error: {str(e)}")

        specs.append(spec_data)

    return specs

In [None]:
def generate_spec_conditions(df_type: DataFrame, column_names: list, sheet_name: str, specs: list, index: int):
    """Append the conditions of each row to the matching spec, in place.

    Row ``n`` of ``df_type`` belongs to ``specs[index + n]``. A condition is
    added for every column in ``column_names`` whose cell is truthy.

    Args:
        df_type: Frame of condition columns.
        column_names: Condition column headers to read.
        sheet_name: Sheet name, forwarded for error messages.
        specs: Spec dictionaries to update.
        index: Position in ``specs`` of the spec for the first row.
    """
    for offset, record in enumerate(df_type.to_dict("records")):
        for column in column_names:
            if not record.get(column):
                continue
            # The condition name is the header text before any "(unit)" suffix.
            entry = {SpecConditionKeys.NAME: column.split('(')[0].strip()}
            column_unit = extract_condition_unit(column)
            entry[SpecConditionKeys.VALUE] = process_condition_values(
                record[str(column)], column_unit, sheet_name, column, offset)
            specs[index + offset][SpecDataKeys.CONDITIONS].append(entry)

In [None]:
def generate_spec_properties(df_type: DataFrame, column_names: list, specs: list, index: int):
    """Attach the property values of each row to the matching spec, in place.

    Row ``n`` of ``df_type`` belongs to ``specs[index + n]``. Rows without any
    non-``None`` property value leave their spec untouched but still consume
    one position. Previously the position only advanced for rows that had
    properties, so later rows were written to the wrong spec.

    Args:
        df_type: Frame of property columns.
        column_names: Property column headers to read.
        specs: Spec dictionaries to update.
        index: Position in ``specs`` of the spec for the first row.
    """
    for offset, row in enumerate(df_type.to_dict("records")):
        properties = {
            column: row.get(str(column))
            for column in column_names
            if row.get(str(column)) is not None
        }
        if properties:
            specs[index + offset][SpecDataKeys.PROPERTIES] = properties

### SCM Template to SLE Format extraction logic


In [None]:
def generate_spec_data():
    """Parse the downloaded workbook into a list of spec payloads.

    Reads the module-level ``filename``. Every sheet except the template
    sheet is processed: its column-type and column-name rows are validated,
    its category and type are read from the first two cells of column B, and
    one spec per data row is built from the standard, condition and property
    columns.

    Returns:
        List of spec dictionaries for all processed sheets, in sheet order.

    Raises:
        Exception: Propagated from the validation and extraction helpers.
        SystemExit: Propagated from condition parsing.
    """
    column_info = {}
    specs = []
    index = 0

    # The context manager closes the workbook even when validation or parsing raises.
    with pd.ExcelFile(filename) as xl:
        sheet_names = [
            sheet_name for sheet_name in xl.sheet_names if sheet_name != TEMPLATE_SHEET
        ]

        validate_spec_count(xl, sheet_names)

        # Construct dataframe and generate specs
        for sheet_name in sheet_names:
            # Sheet rows 3 and 4 hold the column types and the column names.
            df_type = xl.parse(sheet_name, skiprows=2, nrows=2, header=None, dtype=object)

            if df_type.empty:
                continue

            validate_column_types(df_type, sheet_name)

            standard_columns, condition_columns, property_columns = seperate_column_types(df_type)

            column_info[sheet_name] = {
                ColumnTypes.STD.value: standard_columns,
                ColumnTypes.COND.value: condition_columns,
                ColumnTypes.INF.value: property_columns
            }

            all_condition_column_names = extract_column_names(df_type, condition_columns)
            all_properties_column_names = extract_column_names(df_type, property_columns)

            # Cells B1 and B2 hold the sheet's category and spec type.
            df_meta = xl.parse(sheet_name, nrows=2, usecols="B", header=None)
            category = df_meta.values[0][0]
            spec_type = df_meta.values[1][0]

            df_standard = construct_dataframe_for_column_type(xl, sheet_name, column_info, ColumnTypes.STD.value)
            df_condition = construct_dataframe_for_column_type(xl, sheet_name, column_info, ColumnTypes.COND.value)
            df_properties = construct_dataframe_for_column_type(xl, sheet_name, column_info, ColumnTypes.INF.value)

            specs.extend(generate_base_spec_data(df_standard, sheet_name, category, spec_type))

            # `index` is the position in `specs` of this sheet's first spec.
            generate_spec_conditions(df_condition, all_condition_column_names, sheet_name, specs, index)
            generate_spec_properties(df_properties, all_properties_column_names, specs, index)

            index += len(df_standard)

    return specs

### Query Specs


In [None]:
def query_specs(product_ids: str):
    """Query the specs of one product and return the decoded JSON response.

    Args:
        product_ids: A single product ID; it is sent as a one-element list.

    Returns:
        The parsed JSON body of the query response. At most
        ``MAXIMUM_SPEC_LIMIT`` specs are requested.
    """
    query = {
        ApiBodyKeys.PRODUCT_IDS: [product_ids],
        ApiBodyKeys.TAKE: MAXIMUM_SPEC_LIMIT,
    }
    return create_post_request(query_spec_url, json.dumps(query, indent=4)).json()

### Delete Specs


In [None]:
def delete_specs(query_spec_ids: list):
    """Request deletion of the specs with the given IDs.

    Args:
        query_spec_ids: IDs of the specs to delete.

    Returns:
        The response of the delete request.
    """
    payload = json.dumps({ApiBodyKeys.IDS: query_spec_ids}, indent=4)
    return create_post_request(delete_spec_url, payload)

### Create Specs


In [None]:
def create_specs(spec_data: list):
    """Create specs in batches and return the decoded response of every batch.

    Args:
        spec_data: Spec dictionaries to create.

    Returns:
        List with one parsed JSON response per batch of at most
        ``CREATE_SPECS_BATCH_SIZE`` specs, in request order; empty when
        ``spec_data`` is empty.
    """
    batch_starts = range(0, len(spec_data), CREATE_SPECS_BATCH_SIZE)
    batches = (spec_data[start:start + CREATE_SPECS_BATCH_SIZE] for start in batch_starts)
    return [
        create_post_request(
            create_spec_url,
            json.dumps({ApiBodyKeys.SPECS: batch}, indent=4),
        ).json()
        for batch in batches
    ]

In [None]:
# Parse the downloaded workbook into the list of spec payloads that the upload cell sends.
spec_data = generate_spec_data()

### Call APIs and process responses


In [None]:
# Result holders, initialised up front and reassigned below.
error_messages = []
query_spec_ids = []
created_spec_ids = []

# Fetch the specs that currently exist for the product (at most MAXIMUM_SPEC_LIMIT are requested).
query_spec_response = query_specs(product_id)

query_spec_ids = [spec[ApiResponseKeys.ID]
                  for spec in query_spec_response.get(ApiResponseKeys.SPECS, [])]

# NOTE(review): existing specs are deleted before the new ones are created, so a
# failed or partially failed create leaves the product without its previous specs - confirm intended.
delete_spec_response = delete_specs(query_spec_ids) if len(query_spec_ids) > 0 else None
create_spec_response = create_specs(spec_data)

# Collect the "error" entry of every batch response that has one.
error_messages = [response[ApiResponseKeys.ERROR] for response in create_spec_response
                  if ApiResponseKeys.ERROR in response]

# NOTE(review): `error` is only assigned here and is not read anywhere in the visible notebook.
if error_messages:
    error = True

created_spec_ids = [spec[ApiResponseKeys.SPEC_ID] for response in create_spec_response
                    for spec in response.get(ApiResponseKeys.CREATED_SPECS, [])]

# Despite the `_df` suffix this is a list of {'Created Spec Id': ...} records, not a DataFrame.
created_spec_ids_df = pd.DataFrame(
    {'Created Spec Id': created_spec_ids}).to_dict(orient='records')

### Print the execution result


In [None]:
# Publish the outcome as scraps: errors (plus any specs that were still created) or the success result.
if error_messages:
    sb.glue("Error", error_messages)
    if created_spec_ids:
        sb.glue("Created Specs", created_spec_ids_df)
else:
    sb.glue("Result", "All specs uploaded successfully")
    sb.glue("Created Specs", created_spec_ids_df)