<a href="https://colab.research.google.com/github/puzis/conet/blob/main/code/GeNet_journal/GeNet_journal_topology_img_to_YAML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Convert Topology image to JSON using OpenAI API

**Notice!** To run this notebook, first add your API keys to the Secrets section in Colab: `api_key` (OpenAI) and `Open_Router_API_KEY` (OpenRouter).

In [None]:
!pip install tqdm



In [None]:
!pip install rapidfuzz

Collecting rapidfuzz
  Downloading rapidfuzz-3.13.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading rapidfuzz-3.13.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz
Successfully installed rapidfuzz-3.13.0


In [None]:
import base64
import os
import pandas as pd
import requests
import time
import json
import tqdm
import math
from google.colab import userdata
from openai import OpenAI
# OpenAI API key, read from the Colab secret named 'api_key'
# (the secret must be added in Colab's Secrets section before running this cell)
api_key = userdata.get('api_key')
# OpenAI client used by phase_1_result when it is called with open_router=False
client = OpenAI(api_key = api_key)
# OpenRouter API key, read from the Colab secret named 'Open_Router_API_KEY'
# (the secret must be added in Colab's Secrets section before running this cell)
Open_Router_API_KEY = userdata.get('Open_Router_API_KEY')

In [None]:
# Mount Google Drive at /content/drive in this Colab runtime
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## API request:

In [None]:
import json

# Function to encode the image
def encode_image(image_path):
    """Read the file at ``image_path`` in binary mode and return its content as a base64 string."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode('utf-8')


# Getting the base64 string
def create_image_content(_image_path):
    """Build the ``image_url`` message part for the image stored at ``_image_path``.

    The file is base64-encoded with ``encode_image`` and embedded as a
    ``data:image/jpeg`` URL; the ``detail`` field is always ``"high"``.
    """
    encoded_image = encode_image(_image_path)
    image_url_part = {
        "url": f"data:image/jpeg;base64,{encoded_image}",
        "detail": "high",
    }
    return {"type": "image_url", "image_url": image_url_part}

def phase_1_result(topology_image_path, temp, selected_model,open_router = True, or_model_dict = {}):
  """Ask a vision model to transcribe a topology diagram image into a JSON topology.

  :param topology_image_path: Path of the diagram image that is embedded in the prompt.
  :param temp: Sampling temperature. It is passed to the OpenAI client only.
      NOTE(review): ``prompt_model`` takes no temperature argument, so the OpenRouter
      path does not receive this value.
  :param selected_model: OpenAI model name when ``open_router`` is false, otherwise a
      key of ``or_model_dict``.
  :param open_router: When true (default) the request goes through ``prompt_model``
      (OpenRouter); when false it goes through the module-level OpenAI ``client``.
  :param or_model_dict: Maps model keys to ``{"path": ..., "provider_name": ...}``.
      It is only read, never modified.

  :return: ``(topology, reasoning_text, token_usage_counts, error_text, cost, e)``.
      ``topology`` is the parsed JSON object, or the raw model answer when parsing
      failed; ``e`` is the parsing exception, or ``None`` when parsing succeeded.
  """
  img_content  = create_image_content(topology_image_path)
  prompt = ''' As a network topology analyst, you are given an image of a network topology diagram.
  Analyze it carefully and return **only** a single JSON object in the exact format below, without extra text.

  {
    "nodes": [
      {
        "label": "<NODE_NAME>", //  Avoid adding component type to the name of the component i.e. router R1 should be called "R1" rather than "Router R1"
        "icon": "<one of: pc | cloud | router | ethernet_switch | ids>",
        // If icons are not illustrated in the diagram, use an empty string ("") for the icon field.
        // If the icon’s name differs, map it to the closest option above e.g. "Desktop computer symbol" → "pc".
      }
      // …repeat for every device in the topology (routers, firewalls, switches, PCs, etc.)
    ],

    "links": [
      ["<source_node_label>", "<destination_node_label>", "<source_interface>", "<destination_interface>"]
      // If interfaces are not illustrated in the diagram, use an empty string ("") for both interface fields.
      // …repeat for every edge/connection in the topology
    ]
  }
  '''
  # The same single-turn message (prompt text + image) is sent on both paths.
  messages = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": prompt},
              img_content
          ],
      }
  ]

  # Defaults for the values only the OpenRouter path fills in. Without them the
  # OpenAI path hit a NameError on the return statement below.
  reasoning_text = ""
  token_usage_counts = {'completion_tokens':0, 'prompt_tokens':0, 'total_tokens':0}
  error_text = ""
  cost = 0  # the cost is not looked up on the OpenAI path

  if not open_router:
    response = client.chat.completions.create(
        model= selected_model,
        messages=messages,
        temperature = temp,
        response_format={ "type": "json_object" })
    topology = response.choices[0].message.content
    # Report the token usage when the response carries it.
    usage = getattr(response, "usage", None)
    if usage is not None:
      token_usage_counts = {
          'completion_tokens': getattr(usage, "completion_tokens", 0),
          'prompt_tokens': getattr(usage, "prompt_tokens", 0),
          'total_tokens': getattr(usage, "total_tokens", 0),
      }
  else:
    model_path = or_model_dict[selected_model]["path"]
    provider_name = or_model_dict[selected_model]["provider_name"]
    topology, reasoning_text, token_usage_counts, error_text, cost = prompt_model(model_path,provider_name,messages)

  e = None
  try:
    topology = json.loads(topology)
  except Exception as ex:
    # Keep the raw answer in `topology` and hand the exception back so the caller can log it.
    e = ex
    print(f"Can't parse JSON properly because of {e}")
  return topology, reasoning_text, token_usage_counts, error_text, cost, e

OpenRouter code

In [None]:
def api_call_to_prompt_model(model_path, provider_name, conversation_history, temperature=1.0, max_attempts=3):
    """
    === Prompt Requests ===
    Sends a chat-completion request to the OpenRouter API with the prompt (inside the conversation
    history) and the other parameters. The request is restricted to the provider named by
    ``provider_name`` and asks for a JSON-object response.

    === Request Error Handling ===
    If sending the request raises an exception, it is retried (2 seconds apart) until
    ``max_attempts`` attempts have been made. A response obtained on ANY attempt, including the
    last one, is used. Non-200 responses are not retried.

    === Parameters ===
    :param model_path: Path to model as stated in OpenRouter
    :param provider_name: Name of provider which provides the selected model (which is specified by model_path)
    :param conversation_history: The conversation history containing the prompt.
    :param temperature: The temperature parameter of the model.
    :param max_attempts: Maximum number of times the request is sent when sending raises an exception.

    === Return Values ===
    :return: idx 0 - Parsed response JSON, or 0 when no usable response was received.
    :return: idx 1 - Error message (empty string "" when no error occurred)
    """
    error_message = ""
    response = None

    # Send request to OpenRouter - if sending fails, try again up to max_attempts times
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.post(
                url="https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {Open_Router_API_KEY}",
                },
                data=json.dumps({
                    "model": model_path,
                    "messages": conversation_history, # Contains conversation history
                    "provider": {"only":[provider_name]}, # Specifies which provider to use
                    "temperature": temperature,
                    "response_format": { "type": "json_object" }
                }))
            break # Got a response - stop retrying

        # Error during request
        except Exception as e:
            error_message = str(e)
            if attempt < max_attempts:
                time.sleep(2) # sleep a bit before retrying

    # No attempt produced a response. Checking the response itself (rather than the attempt
    # counter) keeps a response that only arrived on the final attempt.
    if response is None:
        return 0, error_message

    # Successful response
    if response.status_code == 200:
        try:
            return response.json(), ""
        except ValueError as e:
            # A 200 whose body is not JSON cannot be used by the caller
            return 0, f"Response body is not valid JSON: {e}"

    # Response contains error - the body is not guaranteed to be JSON or to have the expected shape
    try:
        error_body = response.json()
    except ValueError:
        error_body = {}
    error_info = error_body.get("error") if isinstance(error_body, dict) else None
    if isinstance(error_info, dict):
        error_message = error_info.get("message", "No specific error message provided.")
    elif error_info:
        error_message = str(error_info)
    else:
        error_message = "No specific error message provided."
    print(f"API returned error status {response.status_code}. Message: {error_message}")
    return 0, error_message

def prompt_model(model_path,provider_name,conversation_history):
    """
    === Prompting ===
    Calls api_call_to_prompt_model to send the request and receive either the response JSON holding
    the model's answer or the error that occurred while handling the request.
    From a successful response it extracts: generation id, response text, reasoning text, token usage.

    === Request Cost ===
    A follow-up request is sent to OpenRouter with the generation id to retrieve the cost of the request.
    This lookup is best-effort: it is attempted at most 3 times (1 second apart) and, if no valid
    answer is obtained, the cost stays 0. A failing lookup never discards the model response.

    === Error Handling ===
    When no usable response was received, the error message is taken from the response JSON if it
    carries one, otherwise from the request error, otherwise a generic message is used.

    === Parameters ===
    :param model_path: Path to model as stated in OpenRouter
    :param provider_name: Name of provider which provides the selected model (which is specified by model_path)
    :param conversation_history: The conversation history containing the prompt.

    === Return Variables ===
    :return: idx 0 - response_text - The model's response to the prompt ("" on failure).
    :return: idx 1 - reasoning_text - The model's reasoning text if the response has one, otherwise "".
    :return: idx 2 - token_usage_counts - Dictionary with the token usage reported in the response;
                     all-zero counts when usage is missing or the request failed.
    :return: idx 3 - error_text - The error text if an error occurred, otherwise "".
    :return: idx 4 - cost - The cost of the prompt request (0 when unknown or on failure).
    """
    # ===== Send prompt using API ===== #
    response, request_error_message = api_call_to_prompt_model(model_path, provider_name, conversation_history)

    # Zeroed usage keeps the keys callers index ("prompt_tokens", "completion_tokens") always present
    empty_usage = {'completion_tokens':0, 'prompt_tokens':0, 'total_tokens':0}

    # ===== Response Successful ===== #
    # A non-empty "choices" list is required: an empty one has no message to read.
    if isinstance(response, dict) and response.get("choices"):
        message = response["choices"][0]["message"]
        generation_id = response.get("id")
        response_text = message["content"] # Get model response text
        reasoning_text = message.get("reasoning") or "" # Reasoning text if available, otherwise ""
        token_usage_counts = response.get("usage") # Get token usage count
        error_text = "" # No error happened

        # Edge case: response without usage information
        if token_usage_counts is None:
            token_usage_counts = dict(empty_usage)
            print("MISSING TOKEN USAGE INFORMATION")

        # ===== Get Request Cost Info ===== #
        cost = 0  # Init cost with 0
        max_attempts = 3
        if generation_id:
            for _ in range(max_attempts):

                # Give time for logs to update / wait between attempts
                time.sleep(1)

                try:
                    generation_response = requests.get(
                        f"https://openrouter.ai/api/v1/generation?id={generation_id}",
                        headers={
                            "Authorization": f"Bearer {Open_Router_API_KEY}"})
                except requests.exceptions.RequestException:
                    continue # network problem - try again

                # Get cost if successful request
                if generation_response.status_code == 200:
                    try:
                        cost = generation_response.json()["data"]["total_cost"]
                        break # exit the retry loop
                    except (ValueError, KeyError, TypeError):
                        continue # stats not in the expected shape - try again

    # ===== Response Failed ===== #
    else:

        # Set response to empty string (no usable response from API/Model)
        response_text = ""
        reasoning_text = ""

        # Assuming no cost when response fails
        cost = 0

        # Get error message from response
        try:
            error_text = response['error']['message']
        except (TypeError, KeyError):
            # Fall back to the error recorded while sending the request
            if request_error_message:
                error_text = request_error_message
            else:
                error_text = "Error happened and failed to get official error message"

        token_usage_counts = dict(empty_usage)

    return response_text, reasoning_text, token_usage_counts, error_text, cost

In [None]:
# Registry of the models that can be queried through OpenRouter.
# Each key is the short model name used in this notebook (phase_1_result looks it up
# as or_model_dict[selected_model]). For every entry:
#   "path"          - the model path sent as the "model" field of the request
#   "provider_name" - the single provider the request is restricted to
models_dict = {
"llama_4_maverick": {
    "path": "meta-llama/llama-4-maverick",
    "provider_name": "deepinfra/base",
    },
"llama-4-scout":{
    "path": "meta-llama/llama-4-scout",
    "provider_name": "lambda/fp8",
    } ,
"Llama_3.2_11B_Vision_Instruct": {
    "path": "meta-llama/llama-3.2-11b-vision-instruct",
    "provider_name": "deepinfra/bf16",
    },
"llama-3.2-90b-vision-instruct":{
    "path": "meta-llama/llama-3.2-90b-vision-instruct",
    "provider_name": "together/fp8",
    },
"gemini-2.5-flash":  {
    "path": "google/gemini-2.5-flash",
    "provider_name": "google-vertex",
    },
"gemini_2.0_Flash":{
    "path": "google/gemini-2.0-flash-001",
    "provider_name": "google-ai-studio",
    },
"gpt-4.1-mini":{
    "path": "openai/gpt-4.1-mini",
    "provider_name": "openai",
    },
"o4-mini": {
    "path": "openai/o4-mini",
    "provider_name": "openai"
    },
"gpt-4o-mini": {
    "path": "openai/gpt-4o-mini",
    "provider_name": "openai",
    }
                }

# Running phase1 experiment

In [None]:
# Create the "path" directory if it does not exist yet (no error when it already exists).
# NOTE(review): "path" looks like a placeholder - presumably replace it with the real data directory.
os.makedirs("path", exist_ok=True)

In [None]:
import time
from tqdm import tqdm
import pandas as pd
error_count = 0
#---- Time evaluation file creation and reading----
phase1_time_calc_file_address = "path/Phase1_time.csv"
if os.path.isfile(phase1_time_calc_file_address):
  phase1_df = pd.read_csv(phase1_time_calc_file_address)
else:
  phase1_df = pd.DataFrame(columns=["Scenario","Platform","Diagram_Type","Phase1_Temp","Run","Model","Time","Cost","Reasoning_Text","Prompt_tokens","Completion_tokens","Error_message"])
#--------------------------------------------------
model = "Llama_3.2_11B_Vision_Instruct"
run = 1
#--------output format - YAML or JSON------------- # Only JSON is needed for this expt
# JSON -> True, YAML -> false
# output_format = True

# diagrams input directory
vision_dir = "path"
# output directory
phase1_output_dir = f"path/{model}"
os.makedirs(phase1_output_dir, exist_ok=True)


#------ main params---------
diagram_types = ["Normal","Messy_Layout", "No_Labels_On_Edges"]
platform_options = ["Paper_Sketches","PowerPoint" ,"GNS3"]
folders = ["Adding_Communication_Servers", "Adding_DMZ", "Adding_DRA", "Adding_Local_PCs", "Internet_Connectivity","Role_Based_CLI_Access","Time_Based_Access_List","Transparent_IOS_Firewall", "Basic_Zone_Based_Firewall", "IP_Traffic_Export" ]
temp = 1.0


#---------------------------

# The try/finally guarantees the timing rows collected so far are written to the CSV even
# when a request or file operation raises half-way through. Without it those rows were lost
# while the already-written answer files made the next execution skip the same runs.
try:
  for platform in tqdm(platform_options, desc="Processing"):
    vision_diagram_dir = os.path.join(vision_dir, platform)
    result_diagram_dir = os.path.join(phase1_output_dir, platform)
    os.makedirs(result_diagram_dir, exist_ok=True)

    for diagram_type in diagram_types:
      vision_diagram_path = os.path.join(vision_diagram_dir, diagram_type)
      result_diagram_path = os.path.join(result_diagram_dir, diagram_type)
      os.makedirs(result_diagram_path, exist_ok=True)

      for scenario in folders:
        vision_scenario_dir = os.path.join(vision_diagram_path, scenario)
        result_scenario_dir = os.path.join(result_diagram_path, scenario)
        os.makedirs(result_scenario_dir, exist_ok=True)
        graph_image_path = f"{vision_scenario_dir}/{scenario}_{platform}_{diagram_type}.jpg"

        for run_number in range(run,run+1):
          ans_file_path = os.path.join(result_scenario_dir,f"Topology(Run{run_number}).json")
          # Skip runs whose answer file already exists, so the cell can be re-executed safely
          if os.path.isfile(ans_file_path):
            continue

          print(f"\n--------------SCENARIO: {scenario}, Model: {model}, RUN:{run_number}, Diagram Type: {diagram_type}, Platform: {platform}  ----------------------------------\n")
          start_time = time.time()
          #------------------calling the phase1 function----------------------------
          answer, reasoning_text, token_usage_counts, error_text, cost, e = phase_1_result(graph_image_path, temp, model, True, models_dict)
          #-------------------------------------------------------------------------
          end_time = time.time()
          time_taken = end_time-start_time
          if e is not None:
            error_count += 1
            with open(f"{phase1_output_dir}/parsing_error_log.txt", "a", encoding="utf-8") as log_file:
              log_file.write(f"Parsing Error occured in Scenario: {scenario}, Platform: {platform}, Diagram Type: {diagram_type}:\n")
              log_file.write(f"{e}\n")
              log_file.write("\n" + "="*60 + "\n")

          # A failed request can come back with an empty usage dict, so read the counts defensively
          usage = token_usage_counts or {}
          # Adding a new line to the time evaluation dataframe. The keys must match the dataframe
          # columns: the temperature goes under "Phase1_Temp" (a "Temp" key matches no column, so
          # the value was dropped and the column stayed empty).
          new_row = {"Scenario":scenario, "Platform":platform, "Diagram_Type":diagram_type, "Phase1_Temp":temp, "Run":run_number,"Model":model, "Time":time_taken, "Reasoning_Text":reasoning_text, "Prompt_tokens": usage.get("prompt_tokens", 0),"Completion_tokens":usage.get("completion_tokens", 0),"Error_message":error_text,"Cost":cost}
          phase1_df.loc[len(phase1_df)] = new_row

          #--------------------------------------------------------------------------------------
          # The answer is stored even when it could not be parsed (it is then the raw model text)
          with open(ans_file_path, 'w') as file:
            json.dump(answer, file)

          print(answer)
          print("\n")
          print(f"Time taken: {time_taken}\n")
finally:
  phase1_df.to_csv(phase1_time_calc_file_address, index=False)

print(f"Finished sucessfully with {error_count} erroneous JSON results which can't be parsed properly")

Processing:  67%|██████▋   | 2/3 [00:16<00:07,  7.45s/it]


--------------SCENARIO: Transparent_IOS_Firewall, Model: Llama_3.2_11B_Vision_Instruct, RUN:5, Diagram Type: No_Labels_On_Edges, Platform: GNS3  ----------------------------------

{'nodes': [{'label': 'PC1', 'icon': 'pc'}, {'label': 'PC2', 'icon': 'pc'}, {'label': 'PC3', 'icon': 'pc'}, {'label': 'VPCS', 'icon': 'pc'}, {'label': 'Switch1', 'icon': 'ethernet_switch'}, {'label': 'Switch2', 'icon': 'ethernet_switch'}, {'label': 'Switch3', 'icon': 'ethernet_switch'}, {'label': 'Switch4', 'icon': 'ethernet_switch'}, {'label': 'Switch5', 'icon': 'ethernet_switch'}, {'label': 'Switch6', 'icon': 'ethernet_switch'}, {'label': 'Switch7', 'icon': 'ethernet_switch'}, {'label': 'Switch8', 'icon': 'ethernet_switch'}, {'label': 'PC10', 'icon': 'pc'}, {'label': 'PC9', 'icon': 'pc'}, {'label': 'PC11', 'icon': 'pc'}, {'label': 'VPCS', 'icon': 'pc'}, {'label': 'PC12', 'icon': 'pc'}, {'label': 'PC5', 'icon': 'pc'}, {'label': 'VPCS', 'icon': 'pc'}], 'links': [['Switch1', 'PC1', '', ''], ['Switch1', 'Switc

Processing: 100%|██████████| 3/3 [02:59<00:00, 59.75s/it]

Can't parse JSON properly because of Expecting value: line 1 column 1 (char 0)
The diagram illustrates a network topology map that uses an interactive application to display between-connected hardware and software components.

Here are the nodes as defined by the image:

NODES
 - **PC1** (& FORGE & Switch1)
 - **IDS** (& Switch1 & Repo & Switch2 & Switch3 & Switch4)
 - *Switch1: [PC3] [Repo]*
 - *Switch3:* [*VPCS*]

LINKS
- **[PC1][IDS](Port1) ** IOS: ""
-[-]*Switch1* [-] [IDS](Port3)
-[*Switch1* [-] [Switch2](Port1) (- **[Repo](Port1)** (- Switch1: [Repo] *Switch3}elseif(ids)*/
-["Switch2","Repo"]
-[*Switch3](Port 1) [VPCS]
-[*Ethernet*]VPCS

 
 ACA et al
 lua   node"Pc"  [NodeA]- swap*

 
“That’s why I like it. Once you have a node extra inter vs nodes heap smells like sent\\.timeout toler gently extinction fiber bisa airschnitt paired jack var yvs the.”


Time taken: 24.7959201335907

Finished sucessfully with 2 erroneous JSON results which can't be parsed properly





## Eval

#### TIUS Code

In [None]:
from itertools import permutations, combinations

def get_equal_pairs(arr1, arr2, attr="label"):
    """Return every ``(item1, item2)`` pair whose ``attr`` values are equal ignoring case.

    All combinations of one item from ``arr1`` and one item from ``arr2`` are checked;
    the result follows the order of ``arr1`` first and ``arr2`` second.
    """
    return [
        (left, right)
        for left in arr1
        for right in arr2
        if left[attr].lower() == right[attr].lower()
    ]

def get_sim_NPL(nodes1, nodes2):
    """Return the node pairs from ``nodes1`` x ``nodes2`` whose ``"label"`` values match ignoring case."""
    return get_equal_pairs(nodes1, nodes2, attr="label")


def get_sim_NPI(sim_NPL):
    """Keep only the pairs of ``sim_NPL`` whose two nodes have the same ``"icon"`` ignoring case."""

    def _icons_agree(pair):
        first, second = pair[0], pair[1]
        return first["icon"].lower() == second["icon"].lower()

    return list(filter(_icons_agree, sim_NPL))

def are_connected(node1, node2, links):
    """Tell whether ``links`` holds a link between the labels of ``node1`` and ``node2``.

    Links are treated as undirected: only the first two elements of each link (its
    endpoint labels) are looked at, in either order. Labels are compared exactly.
    """
    endpoint_sets = [{link[0], link[1]} for link in links]
    wanted = {node1["label"], node2["label"]}
    return wanted in endpoint_sets

def get_edge(node1, node2, links):
    """Return the first link in ``links`` whose two endpoint labels are those of ``node1`` and ``node2``.

    The endpoint order inside the link does not matter. An ``IndexError`` is raised
    when no such link exists.
    """
    endpoints = {node1["label"], node2["label"]}
    matches = [candidate for candidate in links if endpoints == {candidate[0], candidate[1]}]
    return matches[0]

def get_sim_E(sim_NPL, links1, links2):
    """Count the edges that both topologies share between label-matched nodes.

    Every unordered pair of entries of ``sim_NPL`` is examined; it counts once when the
    two first-topology nodes are connected in ``links1`` and the two second-topology
    nodes are connected in ``links2``.
    """
    return sum(
        1
        for first_match, second_match in combinations(sim_NPL, 2)
        if are_connected(first_match[0], second_match[0], links=links1)
        and are_connected(first_match[1], second_match[1], links=links2)
    )

def get_sim_EPlabel(sim_NPL, links1, links2):
    """Count the interface labels that agree on edges shared by both topologies.

    For every unordered pair of entries of ``sim_NPL`` whose nodes are connected in both
    ``links1`` and ``links2``, the two interface labels of each edge (link elements 2
    and 3) are lowercased, sorted and compared position by position; every equal
    position adds one, so a shared edge contributes 0, 1 or 2.

    :param sim_NPL: Label-matched node pairs ``(node_from_topology1, node_from_topology2)``.
    :param links1: Links of the first topology.
    :param links2: Links of the second topology.
    :return: Number of matching interface labels over all shared edges.
    """
    sim_EPlabel = 0
    for first_match, second_match in combinations(sim_NPL, 2):
        connected_by_edge = (
            are_connected(first_match[0], second_match[0], links=links1)
            and are_connected(first_match[1], second_match[1], links=links2)
        )
        if not connected_by_edge:
            continue

        edge1 = get_edge(first_match[0], second_match[0], links1)
        edge2 = get_edge(first_match[1], second_match[1], links2)
        # Lowercase BEFORE sorting: the comparison is case-insensitive, and sorting the raw
        # strings can order e.g. ["Gi0/1", "e0"] and ["gi0/1", "e0"] differently, which made
        # identical interface pairs compare as different.
        edge1_interfaces = sorted(str(edge1[position]).lower() for position in (2, 3))
        edge2_interfaces = sorted(str(edge2[position]).lower() for position in (2, 3))
        for interface1, interface2 in zip(edge1_interfaces, edge2_interfaces):
            if interface1 == interface2:
                sim_EPlabel = sim_EPlabel + 1

    return sim_EPlabel

def total_nodes(topology):
    """Return the number of entries under the topology's ``"nodes"`` key."""
    node_entries = topology["nodes"]
    return len(node_entries)

def total_edges(topology):
    """Return the number of entries under the topology's ``"links"`` key."""
    link_entries = topology["links"]
    return len(link_entries)


# strip labels spaces
def strip_node_names(nodes):
    """Remove every space character from each node's ``"label"``.

    The node dicts are modified in place and the same list is returned.
    """
    for entry in nodes:
        compact_label = entry["label"].replace(" ", "")
        entry["label"] = compact_label
    return nodes

#strip edges spaces
def strip_edge_names(links):
    """Remove every space character from the two endpoint labels (elements 0 and 1) of each link.

    The links are modified in place and the same list is returned; the remaining
    link elements are left untouched.
    """
    for link in links:
        for position in (0, 1):
            link[position] = link[position].replace(" ", "")
    return links


def get_comparison_metrics(topology1, topology2):
    """Compare two topologies and return their counts, partial similarities and weighted score.

    Both topologies are modified in place first: spaces are removed from the node labels
    and from the endpoint labels of the links. An ``Exception`` is raised when either
    topology has no nodes, or when either has no links.

    :return: dict with keys ``N1``, ``N2``, ``E1``, ``E2``, ``NPL``, ``NPI``, ``sim_E``,
        ``sim_EPlabel`` and ``Metric_score``.
    """
    # Weight of each component of the final score
    weights = {"N": 0.3, "NPL": 0.2, "NPI": 0.05, "E": 0.35, "EPlabel": 0.1}

    # remove spaces in names (nodes of both topologies first, then links of both)
    for section, cleaner in (("nodes", strip_node_names), ("links", strip_edge_names)):
        for topology in (topology1, topology2):
            topology[section] = cleaner(topology[section])

    N1 = total_nodes(topology1)
    N2 = total_nodes(topology2)
    if 0 in (N1, N2):
        raise Exception("N1 or N2 is 0, Cannot divide by 0")

    E1 = total_edges(topology1)
    E2 = total_edges(topology2)
    if 0 in (E1, E2):
        raise Exception("E1 or E2 is 0, Cannot divide by 0")

    larger_node_count = max(N1, N2)
    larger_edge_count = max(E1, E2)

    label_matches = get_sim_NPL(topology1["nodes"], topology2["nodes"])
    NPL = len(label_matches) / larger_node_count
    sim_E = get_sim_E(label_matches, topology1["links"], topology2["links"])
    icon_matches = get_sim_NPI(sim_NPL=label_matches)
    NPI = len(icon_matches) / larger_node_count
    sim_EPlabel = get_sim_EPlabel(label_matches, topology1["links"], topology2["links"])

    # Each shared edge can contribute up to two matching interface labels, hence the 2*.
    node_count_term = weights["N"] * min(N1, N2) / larger_node_count
    label_term = weights["NPL"] * NPL
    icon_term = weights["NPI"] * NPI
    edge_term = weights["E"] * sim_E / larger_edge_count
    interface_term = weights["EPlabel"] * sim_EPlabel / (2 * larger_edge_count)
    Metric_results_value = node_count_term + label_term + icon_term + edge_term + interface_term

    return {
        'N1': N1,
        'N2': N2,
        'E1': E1,
        'E2': E2,
        'NPL': NPL,
        'NPI': NPI,
        'sim_E': sim_E,
        'sim_EPlabel': sim_EPlabel,
        'Metric_score': Metric_results_value,
    }

#### Sorted Damerau-Levenshtein Edit Distance

In [None]:
from langchain.evaluation.parsing.json_distance import JsonEditDistanceEvaluator
import json

def normalize_icon(icon: str) -> str:
    """Return ``icon`` converted to lowercase."""
    lowered_icon = icon.lower()
    return lowered_icon

def normalize_label(label: str) -> str:
    """Return ``label`` converted to lowercase."""
    lowered_label = label.lower()
    return lowered_label

def normalize_bidirectional_link(link):
    """
    Normalize a 4-element bidirectional link like:
    ["PC1", "Switch2", "e0", "e1"] or ["Switch2", "PC1", "e1", "e0"]
    so that both become: ["pc1", "e0", "switch2", "e1"]. This keeps two spellings of the
    same link from being counted as different by the edit distance.

    Every element is converted to a lowercase string. The two (node, interface) endpoints
    are ordered by node name and, when both node names are equal (a self-loop or duplicated
    labels), by interface, so the result does not depend on the order the endpoints were
    written in.

    :param link: Sequence ``[node1, node2, interface1, interface2]``.
    :return: ``[node_a, interface_a, node_b, interface_b]``; a link that does not have
        exactly 4 elements is returned as its lowercased elements in the original order.
    """
    lowered = [str(element).lower() for element in link]
    if len(link) != 4:
        return lowered  # fallback for malformed links

    node1, node2, intf1, intf2 = lowered

    # Comparing whole tuples sorts by node name first and uses the interface as tie-breaker.
    first_endpoint, second_endpoint = sorted([(node1, intf1), (node2, intf2)])

    # Flatten the two endpoints
    return [*first_endpoint, *second_endpoint]

def sort_json_for_eval(json_string: dict) -> str:
    """Canonicalize a parsed topology so that equivalent topologies serialize identically.

    Despite its name, ``json_string`` is an already-parsed JSON object (a python dict with
    "nodes" and "links"), not a JSON string. The input is not modified: the function works
    on a copy made by a JSON round trip.

    Canonicalization:
      * dict keys are sorted;
      * a list whose items are all dicts with a "label" is treated as nodes: labels and
        icons are lowercased and the nodes are sorted by label;
      * a list whose items are all 4-element lists is treated as links: each link is
        normalized with ``normalize_bidirectional_link`` and the links are sorted;
      * anything else is reported with "Incompatible formats" and kept (strings lowercased).

    A warning is printed when a node carries an icon outside the allowed set. That check
    is diagnostic only and never prevents the canonical string from being returned.

    :param json_string: The parsed topology.
    :return: Compact JSON string of the canonicalized topology.
    """
    obj = json.loads(json.dumps(json_string))  # copy, so the caller's object stays untouched

    def sort_nodes(nodes):
        for node in nodes:
            if "label" in node:
                node["label"] = normalize_label(node["label"])
            if "icon" in node:
                node["icon"] = normalize_icon(node["icon"])
        return sorted(nodes, key=lambda x: x.get("label", ""))

    def sort_links(links):
        normalized_links = [normalize_bidirectional_link(link) for link in links]
        return sorted(normalized_links, key=lambda link: tuple(link))

    def recurse_sort(o):
        if isinstance(o, dict):
            # sorts each value so nested structures (the lists and dicts inside the JSON) are also handled
            return {k: recurse_sort(v) for k, v in sorted(o.items())}
        elif isinstance(o, list):
            if all(isinstance(i, dict) and "label" in i for i in o):
                return sort_nodes(o)
            elif all(isinstance(i, list) and len(i) == 4 for i in o):
                return sort_links(o)
            else:
                print("Incompatible formats")
                return [recurse_sort(i) for i in o]  # fallback: keep the order, canonicalize the items
        elif isinstance(o, str):
            print("Incompatible formats")
            return o.lower()  # fallback for a string outside nodes/links
        else:
            print("Incompatible formats")
            return o  # fallback: returned untouched when no branch matches
    sorted_obj = recurse_sort(obj)

    ### checking if the model hallucinated and assigned an invalid icon
    valid_icons = ["pc", "cloud", "router","ethernet_switch","ids",""]
    # Read the nodes defensively: a topology without a "nodes" list (or with non-dict
    # entries) must not make this diagnostic check raise.
    nodes = sorted_obj.get("nodes") if isinstance(sorted_obj, dict) else None
    if not isinstance(nodes, list):
        nodes = []
    invalid_nodes = [node for node in nodes if isinstance(node, dict) and node.get("icon") not in valid_icons]
    if invalid_nodes:
        print(f"XXXXXXXXXXX Icon hallucination detected in the predicted topology: {invalid_nodes} XXXXXXXXXXX")
    return json.dumps(sorted_obj, sort_keys=True, separators=(",", ":"))

#### Running comparison between Phase1 results and Ground truth


In [None]:
import pprint
import json
import time
import pandas as pd
c = 0  # number of predictions that could not be evaluated
model = "Llama_3.2_11B_Vision_Instruct"
run = 1
parsing_dir_path = f"path/{model}"
diagram_types = ["Messy_Layout", "No_Labels_On_Edges", "Normal"]
platform_options =   ["GNS3","Paper_Sketches", "PowerPoint"]
scenarios = ["Adding_Communication_Servers", "Adding_DMZ", "Adding_DRA", "Adding_Local_PCs", "Internet_Connectivity","Role_Based_CLI_Access","Time_Based_Access_List","Transparent_IOS_Firewall", "Basic_Zone_Based_Firewall", "IP_Traffic_Export"]
temp = 1.0
results_list = []
dl_evaluator = JsonEditDistanceEvaluator(canonicalize=sort_json_for_eval) #Sorted DamerauLevenshtein Edit Distance

for platform in platform_options:
  for diagram_type in diagram_types:
    for scenario in scenarios:
      for run_number in range(run,run+1):

        if platform == "PowerPoint":  ## different ground truth representations - only four types are needed to cover the 9 combinations)
          if diagram_type == "No_Labels_On_Edges": #pp-no_lables
            gt_file_path = f"path/{scenario}.json"
          else: #(pp-normal,messy)
            gt_file_path = f"path/{scenario}.json"
        else:
          if diagram_type == "No_Labels_On_Edges": #no_labels(gns,paper)
            gt_file_path = f"/path/{scenario}.json"
          else: #normal-gns,paper, messy-gns,paper) - same gt
            gt_file_path = f"path/{scenario}.json"
        with open(gt_file_path, 'r') as f:
          benchmark_topology = json.load(f)

        phase1_scenario_directory = f"{parsing_dir_path}/{platform}/{diagram_type}/{scenario}"

        print(f"Running {platform} {diagram_type} {scenario} {temp} {run_number} \n")
        with open(f"{phase1_scenario_directory}/Topology(Run{run_number}).json", 'r') as f:
          pred_topology = json.load(f)

        start_time = time.time()
        try:
          # NOTE: get_comparison_metrics strips spaces from the labels of both topologies in
          # place, so the edit-distance evaluation below sees the stripped labels.
          result = get_comparison_metrics(benchmark_topology, pred_topology)

          # Serialize with json.dumps: rewriting the python repr with replace("'", '"')
          # breaks on values that contain an apostrophe, a boolean or null.
          dl_result = dl_evaluator.evaluate_strings(
              prediction=json.dumps(pred_topology),
              reference=json.dumps(benchmark_topology))
          result["DL_Score"] = 1-dl_result["score"]
        except Exception as exc:
          # Keep going, but say why this prediction could not be evaluated
          c += 1
          print(f"Evaluation failed: {type(exc).__name__}: {exc}")
          result = {'N1':None, 'N2':None, 'E1':None, 'E2':None, 'NPL':None, 'NPI':None, 'sim_E':None, 'sim_EPlabel':None, 'Metric_score':None,"DL_Score":None}

        end_time = time.time()
        time_taken = end_time-start_time
        result["Scenario"] = scenario
        result["Platform"] = platform
        result["Diagram_Type"] = diagram_type
        result["Temp"] = temp
        result["Run"] = run_number
        result["Time"] = time_taken
        result["model"] = model
        results_list.append(result)
        pprint.pprint(result)
        print("--------------------------\n")


# The results are written into the directory the predictions were read from. The path is
# built without a leading "/" so that a relative parsing_dir_path is not turned into an
# absolute path pointing somewhere else.
with open(f"{parsing_dir_path}/phase1_Run{run}_result_parsed_to_json_dict.json", "w", encoding="utf-8") as f:
  json.dump(results_list, f, ensure_ascii=False, indent=4)

df = pd.DataFrame(results_list)
df.to_csv(f"{parsing_dir_path}/phase1_Run{run}_results_parsed_to_json_dict_results.csv", index=False)

print(f"Finished the eval process sucessfully with {c} problematic results which its hard to evaluate automatically")

Running GNS3 Messy_Layout Adding_Communication_Servers 1.0 5 

{'DL_Score': None,
 'Diagram_Type': 'Messy_Layout',
 'E1': None,
 'E2': None,
 'Metric_score': None,
 'N1': None,
 'N2': None,
 'NPI': None,
 'NPL': None,
 'Platform': 'GNS3',
 'Run': 5,
 'Scenario': 'Adding_Communication_Servers',
 'Temp': 1.0,
 'Time': 4.38690185546875e-05,
 'model': 'Llama_3.2_11B_Vision_Instruct',
 'sim_E': None,
 'sim_EPlabel': None}
--------------------------

Running GNS3 Messy_Layout Adding_DMZ 1.0 5 

{'DL_Score': None,
 'Diagram_Type': 'Messy_Layout',
 'E1': None,
 'E2': None,
 'Metric_score': None,
 'N1': None,
 'N2': None,
 'NPI': None,
 'NPL': None,
 'Platform': 'GNS3',
 'Run': 5,
 'Scenario': 'Adding_DMZ',
 'Temp': 1.0,
 'Time': 6.103515625e-05,
 'model': 'Llama_3.2_11B_Vision_Instruct',
 'sim_E': None,
 'sim_EPlabel': None}
--------------------------

Running GNS3 Messy_Layout Adding_DRA 1.0 5 

{'DL_Score': None,
 'Diagram_Type': 'Messy_Layout',
 'E1': None,
 'E2': None,
 'Metric_score': Non