# Control GAP Analyzer

> The Control GAP Analyzer takes a ZIP file containing the standards as JSON (a source standard and a target standard) as input. The API splits each standard into its controls and computes an embedding for every control. The system then identifies gaps by comparing the embedded controls of the source standard with those of the target standard, categorizing each result as Matched (similarity >= 70%), GAP (similarity from 50% up to, but not including, 70%), or Missing (similarity below 50%). Finally, the system generates a ZIP (GAP_JSON) output containing the gap analysis report.

In [None]:
#| default_exp policy/control_gap_analyzer

In [None]:
#| export
from app.policy.helper.llm_functions import get_openai_embedding
from app.policy.helper.tools import apply_markdown_gap
from sklearn.metrics.pairwise import cosine_distances
import numpy as np

In [None]:
#| export
def extract_controls_gap(input_standards): #JSON input of source and target standard
    """Compare the controls of a source and a target standard.

    All controls of both standards are embedded in one batch. Each control is
    then greedily paired with the first later, still-unpaired control that
    belongs to the other standard and whose cosine similarity is at least 0.5.
    Pairs with similarity >= 0.7 are reported as ``match``, the remaining
    pairs as ``gap``, and controls without a partner as ``missing``.

    Args:
        input_standards: Sequence whose first element is a dict with the keys
            ``source_standard_name``, ``source_standard_info``,
            ``target_standard_name`` and ``target_standard_info``. Each
            ``*_info`` value is an iterable of domain dicts providing
            ``domain_number``, ``domain_name`` and ``controls``; every control
            dict provides ``control_number``, ``control_name`` and
            ``markdown``. The domain dicts are mutated in place (a
            ``Markdown`` key is added).
            NOTE(review): ``control['markdown']`` and the
            ``"<standard>:<domain>:<control>"`` shape of ``control_number``
            are presumably produced by ``apply_markdown_gap`` - confirm.

    Returns:
        dict with ``status``, ``source_standard_name``,
        ``target_standard_name`` and ``comparison_matrix['controls']``, a list
        of rows ordered as: matches, gaps, then missing controls.
    """
    payload = input_standards[0]
    standards = [
        {
            "StandardName": payload['source_standard_name'],
            "controls": payload['source_standard_info'],
        },
        {
            "StandardName": payload['target_standard_name'],
            "controls": payload['target_standard_info'],
        },
    ]

    # Render every domain of both standards before any control text is read.
    for idx, standard in enumerate(standards):
        for domain in standard["controls"]:
            domain['Markdown'] = apply_markdown_gap(domain, idx)

    # Flatten the controls (source standard first, then target standard).
    control_texts = []
    control_ids = []
    control_details = {}
    for standard in standards:
        for domain in standard["controls"]:
            for control in domain['controls']:
                control_id = control['control_number']
                control_texts.append(control['markdown'])
                control_details[control_id] = {
                    'domain': f"{domain['domain_number'].split(':')[1]} - {domain['domain_name']}",
                    'control': f"{control_id.split(':')[2]} - {control['control_name']}",
                }
                control_ids.append(control_id)

    threshold = 0.5        # minimum similarity for two controls to be paired
    match_threshold = 0.7  # minimum similarity for a pair to count as a match

    similar_pairs = []
    # Without any control there is nothing to embed; skipping avoids calling
    # the embedding service and cosine_distances with empty input.
    if control_texts:
        embeddings = np.array(get_openai_embedding(control_texts))
        distances = cosine_distances(embeddings)
        max_distance = 1 - threshold

        # The first ':'-separated field of a control id identifies its standard.
        standard_keys = [control_id.split(":")[0] for control_id in control_ids]
        taken_partners = set()
        total = len(distances)
        for i in range(total):
            # Only the upper triangle is scanned so each pair is seen once.
            for j in range(i + 1, total):
                if j in taken_partners or not distances[i][j] <= max_distance:
                    continue
                if standard_keys[i] == standard_keys[j]:
                    continue  # never pair controls of the same standard
                if int(standard_keys[j]) <= 0:
                    continue  # the partner must not come from standard 0
                similar_pairs.append(
                    (control_ids[i], control_ids[j], 1 - distances[i][j])
                )
                taken_partners.add(j)
                break  # a control is paired at most once as the first element

    matched_ids = set()
    high_pairs = []
    gap_pairs = []
    for first_id, second_id, similarity in similar_pairs:
        matched_ids.update((first_id, second_id))
        bucket = high_pairs if similarity >= match_threshold else gap_pairs
        bucket.append((first_id, second_id))

    rows = []
    for status, pairs in (("match", high_pairs), ("gap", gap_pairs)):
        for first_id, second_id in pairs:
            source = control_details[first_id]
            target = control_details[second_id]
            rows.append({
                "source_control": source['control'],
                "target_control": target['control'],
                "source_domain": source['domain'],
                "target_domain": target['domain'],
                "status": status,
            })

    for control_id in control_ids:
        if control_id in matched_ids:
            continue
        details = control_details[control_id]
        standard_key = control_id.split(":")[0]
        # Every id was already split into at least three fields above, so only
        # the owning standard decides which side of the row is filled in.
        is_source = standard_key == "0"
        rows.append({
            "source_control": details['control'] if is_source else "",
            "target_control": "" if is_source else details['control'],
            "source_domain": details['domain'] if is_source else "",
            "target_domain": "" if is_source else details['domain'],
            "missing_source": standard_key == "1",
            "status": "missing",
        })

    return {
        "status": "success",
        "source_standard_name": standards[0]['StandardName'],
        "target_standard_name": standards[1]['StandardName'],
        "comparison_matrix": {
            "controls": rows,
        },
    }

In [None]:
#| export
def controls_gap_processor(input_standards):#JSON input of source and target standard
    """Run the control gap analysis and return its comparison result.

    A progress message is printed before the analysis starts and again once
    it has finished; the second message is not printed if the analysis raises.
    """
    print('control gap api process started')
    gap_report = extract_controls_gap(input_standards)
    print('control gap api process completed')
    return gap_report

In [None]:
#| hide
# Run nbdev's export step (presumably writes the `#| export` cells to the module named by `#| default_exp` - confirm against the nbdev docs).
import nbdev; nbdev.nbdev_export()