In [None]:
import openai
from typing import List, Dict, Optional
import networkx as nx
import re
from tqdm import tqdm


In [None]:
class FlowchartEvaluator:
    """Evaluate how well a Mermaid flowchart covers a set of dialogues.

    Every utterance is sent to a chat model that names the flowchart node it
    corresponds to. From those assignments the evaluator reports Complete Path
    Coverage (CPC) and the Utterance Matching Ratio (UMR).
    """

    # Decorations a model may put around an otherwise valid node ID.
    _RESPONSE_WRAPPERS = "`'\"."
    _RESPONSE_PREFIX = "node id:"

    def __init__(self, api_key: str):
        """
        Store the request settings and configure the `openai` module.

        Args:
            api_key: Key assigned to `openai.api_key`.
        """
        # These are module-level settings, so they apply to every evaluator
        # created in the same process.
        openai.api_key = api_key
        openai.api_base = "https://api.xty.app/v1"
        self.model = "gpt-4"
        self.temperature = 0
        self.request_timeout = 10

    def parse_mermaid_flowchart(self, mermaid_text: str) -> List[tuple]:
        """
        Parse Mermaid flowchart text to extract nodes with descriptions and their successors.

        Recognised node forms are `ID["label"]` and `ID{label}`; surrounding
        double quotes of a `{}` label are dropped so both forms yield the bare
        description. Only `-->` links (optionally labelled `|cond|`) are read,
        including chained links such as `A --> B --> C`.

        Returns:
            A list of tuples `(node_id, description, successors)` sorted by
            node ID, with each successor list sorted alphabetically and free
            of duplicates. Edges touching undeclared nodes are ignored.
        """
        node_rect_pattern = r'(\w+)\["([^"]*)"\]'
        node_dec_pattern = r'(\w+)\{([^}]*)\}'

        nodes = {}

        # Rectangular nodes: the quotes are part of the pattern, not the label.
        for match in re.finditer(node_rect_pattern, mermaid_text):
            nodes[match.group(1)] = match.group(2).strip()

        # Decision nodes: the label may or may not be quoted, so strip quotes
        # explicitly; otherwise the type prefix would read '"decision'.
        for match in re.finditer(node_dec_pattern, mermaid_text):
            nodes[match.group(1)] = match.group(2).strip().strip('"').strip()

        # Replace node definitions with bare IDs so edges are easy to match.
        normalized = re.sub(node_rect_pattern, r'\1', mermaid_text)
        normalized = re.sub(node_dec_pattern, r'\1', normalized)
        normalized = re.sub(r'\s+', ' ', normalized).strip()

        # source --> [|cond|] target. The target sits in a lookahead so it is
        # not consumed and can serve as the source of the next link in a chain.
        edge_pattern = r'(\w+)\s*-->\s*(?:\|[^|]*\|\s*)?(?=(\w+))'

        adj = {node_id: [] for node_id in nodes}
        for match in re.finditer(edge_pattern, normalized):
            source, target = match.group(1), match.group(2)
            if source in adj and target in adj and target not in adj[source]:
                adj[source].append(target)

        return [
            (node_id, nodes[node_id], sorted(adj[node_id]))
            for node_id in sorted(nodes)
        ]

    def _extract_node_type(self, description: str) -> str:
        """
        Extract node type from description (e.g., "Start:", "Decision:", "End:").

        The type is the lower-cased text before the first colon; a description
        without a colon is returned lower-cased in full.
        """
        if ':' in description:
            return description.split(':')[0].strip().lower()
        return description.lower()

    def call_gpt4o_mini(self, prompt: str) -> str:
        """
        Send `prompt` as a single user message to `self.model`.

        Despite the method name, the model used is whatever `self.model` holds.

        Returns:
            The stripped reply text, or "" when the request fails for any reason.
        """
        try:
            completion = openai.ChatCompletion.create(
                api_base="https://api.xty.app/v1",
                model=self.model,
                temperature=self.temperature,
                messages=[{'role': 'user', 'content': prompt}],
                request_timeout=self.request_timeout
            )
            return completion.choices[0].message.content.strip()
        except Exception as e:
            # Deliberately best-effort: one failed request must not abort a
            # whole evaluation run. The caller treats "" as "no match".
            print(f"Error calling gpt-4: {e}")
            return ""

    @classmethod
    def _resolve_node_id(cls, response: str, node_descriptions: Dict[str, str]) -> Optional[str]:
        """Map a raw model reply onto a known node ID, or None if it names none."""
        cleaned = response.strip().strip(cls._RESPONSE_WRAPPERS).strip()
        if cleaned.lower().startswith(cls._RESPONSE_PREFIX):
            cleaned = cleaned[len(cls._RESPONSE_PREFIX):]
            cleaned = cleaned.strip().strip(cls._RESPONSE_WRAPPERS).strip()
        return cleaned if cleaned in node_descriptions else None

    def match_utterance_to_node(self, utterance: str, node_descriptions: Dict[str, str]) -> Optional[str]:
        """
        Prompt the model to identify which node the utterance corresponds to.

        Returns:
            A node ID that is a key of `node_descriptions`, or None when the
            model answers 'None', the request failed, or the reply does not
            name a known node. Without this validation a failed call or a
            made-up ID would be counted as a matched utterance.
        """
        if not node_descriptions:
            return None
        node_list = "\n".join([f"Node ID: {node_id}, Description: {desc}" for node_id, desc in node_descriptions.items()])
        prompt = (
            f"Given the following utterance and flowchart node descriptions, identify which node the utterance corresponds to based on the semantic.\n\n"
            f"When the utterance express query or task (e.g., I am looking for), it corresponds to the start node A. When the utterance express appreciation, it means the end of the conversation.\n\n"
            f"Return only the Node ID if a match is found, or 'None' if no node matches.\n\n"
            f"Utterance: {utterance}\n\n"
            f"Node Descriptions:\n{node_list}\n\n"
            f"Output only the Node ID or 'None'."
        )
        response = self.call_gpt4o_mini(prompt)
        return self._resolve_node_id(response, node_descriptions)

    def is_complete_dialogue(self, node_sequence: List[str], end_node: str) -> bool:
        """
        Check if the dialogue is complete.

        A dialogue is complete when `end_node` occurs and no None entry
        follows its first occurrence.
        """
        if end_node not in node_sequence:
            return False
        end_index = node_sequence.index(end_node)
        # When the end node is last the tail is empty, which also passes.
        return None not in node_sequence[end_index + 1:]

    def compute_cpc(self, classification_results: List[Dict]) -> float:
        """
        Compute Complete Path Coverage (CPC): the share of dialogues whose
        "has_complete_path" flag is true. Returns 0.0 for no results.
        """
        if not classification_results:
            return 0.0
        complete_paths = sum(1 for result in classification_results if result["has_complete_path"])
        return complete_paths / len(classification_results)

    def compute_umr(self, node_sequence: List[tuple]) -> float:
        """
        Compute Utterance Matching Ratio (UMR) for a single dialogue.

        Args:
            node_sequence: `(utterance_index, node_id)` pairs; `node_id` is
                None for an unmatched utterance.

        Returns:
            Matched pairs divided by all pairs, or 0.0 for an empty sequence.
        """
        if not node_sequence:
            return 0.0
        matched_utterances = sum(1 for _, node_id in node_sequence if node_id is not None)
        return matched_utterances / len(node_sequence)

    def compute_avg_umr(self, classification_results: List[Dict]) -> float:
        """
        Compute the mean UMR over the "utterance_node_pairs" of every result.
        Returns 0.0 for no results.
        """
        if not classification_results:
            return 0.0
        umr_scores = [self.compute_umr(result["utterance_node_pairs"]) for result in classification_results]
        return sum(umr_scores) / len(umr_scores)

    @staticmethod
    def _error_result(message: str) -> Dict:
        """Build the zero-score result returned when evaluation cannot start."""
        return {
            "CPC": 0.0,
            "Average_UMR": 0.0,
            "Classification_Results": [],
            "Error": message
        }

    def evaluate(self, mermaid_flowchart: str, dialogues: List[List[str]]) -> Dict:
        """
        Evaluate the flowchart against the dialogue dataset.

        Args:
            mermaid_flowchart: Mermaid source of the flowchart.
            dialogues: One list of utterances per dialogue; each utterance is
                interpolated into the prompt as-is.

        Returns:
            A dict with "CPC", "Average_UMR" and "Classification_Results"
            (one entry per dialogue). When the flowchart yields no nodes or
            has no node of type 'end', the scores are 0.0 and an "Error"
            message is added.
        """
        parsed_flowchart = self.parse_mermaid_flowchart(mermaid_flowchart)
        if not parsed_flowchart:
            print("Error: No nodes parsed from flowchart. Verify Mermaid syntax.")
            return self._error_result("No nodes parsed from flowchart. Verify Mermaid syntax.")

        node_descriptions = {node_id: desc for node_id, desc, _ in parsed_flowchart}

        # The first node (in ID order) whose description is typed 'end'.
        end_node = next(
            (
                node_id for node_id, desc, _ in parsed_flowchart
                if self._extract_node_type(desc) == "end"
            ),
            None,
        )
        if end_node is None:
            print("Error: No node with type 'end' found in the flowchart.")
            return self._error_result("No end node found in flowchart")
        print(f"End Node Identified: {end_node}")

        classification_results = []
        print("\nEvaluating Dialogues:")
        for i, dialogue in tqdm(enumerate(dialogues), total=len(dialogues), desc="Processing Dialogues"):
            print(f"\nDialogue {i}:")
            node_sequence = []
            for j, utt in enumerate(tqdm(dialogue, desc=f"Matching Utterances for Dialogue {i}", leave=False)):
                node_id = self.match_utterance_to_node(utt, node_descriptions)
                node_sequence.append((j, node_id))
                print(f"  Utterance {j}: '{utt}' -> Node {node_id if node_id else 'None'}")

            # Drop the utterance indices but keep None entries: the
            # completeness check needs to see unmatched utterances.
            nodes = [node for _, node in node_sequence]
            has_complete_path = self.is_complete_dialogue(nodes, end_node)
            print(f"  Complete: {has_complete_path}")
            classification_results.append({
                "dialogue_id": i,
                "has_complete_path": has_complete_path,
                "utterance_node_pairs": node_sequence
            })

        return {
            "CPC": self.compute_cpc(classification_results),
            "Average_UMR": self.compute_avg_umr(classification_results),
            "Classification_Results": classification_results
        }

In [None]:
import sys
import os

# Put the directory one level above the working directory at the front of the
# module search path, so the wildcard `utils` import that follows can be resolved
# from there.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
sys.path.insert(0, parent_dir)

from utils import *

# data = load_data()
# len(data)

In [None]:
# Lower-case domain names; rebound with other values by later cells.
DOMAINS = [
    'hotel',
    'restaurant',
    'attraction',
    'train',
    'taxi',
]

In [None]:
def parse_dialog_text(dialog):
    """Collect the user's side of a dialogue.

    Turns in `dialog['log']` are treated as alternating, with even positions
    belonging to the user and odd positions to the assistant.

    Returns:
        A list with one entry per user turn; each entry is a single-element
        list holding that turn's "text".
    """
    user_turns = []
    for position, turn in enumerate(dialog['log']):
        text = turn["text"]
        if position % 2:
            # Odd positions are assistant turns and are not collected.
            continue
        user_turns.append([text])
    return user_turns

In [None]:
from datetime import datetime

# Minute-resolution timestamp (YYYY-MM-DD-HH-MM), e.g. for naming output files.
current_time = f"{datetime.now():%Y-%m-%d-%H-%M}"

In [None]:
# test_data = {}

# for domain in DOMAINS:
#     print(f"--- Domain: {domain} ---")
#     dialogues = []
#     ids = []
#     for i in range(0, 50):
#         dialog, dialog_id = pick_dialog(data, dialog_id='random', domain=domain, exclusive=False)
#         dialogue_text = parse_dialog_text(dialog)
#         dialogues.append(dialogue_text)
#         ids.append(dialog_id)
#     test_data[domain] = dialogues

#     with open(f"flowchart_eval/dialogues_{domain}_{current_time}.json", "w") as f:
#         json.dump({"ids": ids}, f)

In [None]:
import json
from datetime import datetime
from utils import *

# Rebuild the evaluation set: read the saved dialogue IDs of each domain,
# look those dialogues up in the full dataset, and write IDs plus parsed user
# utterances to one consolidated JSON file.

# Capitalised here because the names are interpolated into the ID file names below.
DOMAINS = ['Train', 'Hotel', 'Restaurant', 'Attraction', 'Taxi']
# Not used further in this cell; it rebinds the global timestamp with a different format.
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")

# Load JSON files for each domain
# The timestamp in the file name is fixed, so only that specific saved sample is read.
# NOTE(review): the commented-out sampling cell builds file names from lower-case
# domains, while this loop asks for capitalised ones — confirm the files on disk match.
loaded_ids = {}
for domain in DOMAINS:
    filename = f"flowchart_eval/dialogues_{domain}_2025-09-19-16-07.json"
    try:
        with open(filename, "r") as f:
            data = json.load(f)
            # Keys are lower-cased; a missing "ids" key raises KeyError (not handled here).
            loaded_ids[domain.lower()] = data["ids"]
        print(f"Loaded {filename}")
    except FileNotFoundError:
        # A domain whose file is missing or malformed is simply absent from loaded_ids.
        print(f"File {filename} not found")
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {filename}")

# Load dialogue data
# `data` is rebound here: from now on it is the dataset, not an ID file's contents.
data = load_data()
print(f"Total dialogues loaded: {len(data)}")

# Parse dialogues and organize by domain
output_data = {"test_dialogue_ids": {}, "test_dialogue_texts": {}}
for domain, ids in loaded_ids.items():
    dialogue_ids = []
    dialogue_texts = []
    print(f"--- Domain: {domain} ---")
    for dialog_id in ids:
        try:
            dialog, selected_id = pick_dialog(data, dialog_id=dialog_id, domain=domain, exclusive=False)
            dialogue_text = parse_dialog_text(dialog)
            dialogue_ids.append(selected_id)
            dialogue_texts.append(dialogue_text)
            print(f"Dialog ID {selected_id} in {domain}: {dialogue_text}")
        except Exception as e:
            # Best-effort: a dialogue that cannot be picked or parsed is reported and skipped.
            print(f"Error picking dialog ID {dialog_id} in {domain}: {str(e)}")
    
    # Both lists are appended together above, so they stay index-aligned.
    output_data["test_dialogue_ids"][domain] = dialogue_ids
    output_data["test_dialogue_texts"][domain] = dialogue_texts

# Save to a single file
output_filename = f"flowchart_eval/dialogues_all.json"
try:
    with open(output_filename, "w") as f:
        json.dump(output_data, f, indent=4)
    print(f"Saved dialogues to {output_filename}")
except Exception as e:
    # A failed save is only reported; the in-memory output_data remains available.
    print(f"Error saving to {output_filename}: {str(e)}")

In [None]:
import json

# Read the consolidated dialogue file back and index the parsed user
# utterances by lower-case domain name.
filename = "flowchart_eval/dialogues_all.json"
test_data = {}

try:
    with open(filename, "r") as f:
        data = json.load(f)
        for domain in DOMAINS:
            # DOMAINS may hold capitalised names; the file's keys are lower-case.
            domain_lower = domain.lower()
            test_data[domain_lower] = data["test_dialogue_texts"][domain_lower]
    print(f"Loaded dialogues from {filename}")
except FileNotFoundError:
    print(f"File {filename} not found")
except json.JSONDecodeError:
    print(f"Error decoding JSON from {filename}")


In [None]:
import re
import os

# Input Mermaid flowchart
# Read as UTF-8 explicitly so node labels decode the same way on every
# platform (the per-domain files are read back as UTF-8 by load_flowcharts).
with open("selected_flowchart_merged.mermaid", "r", encoding="utf-8") as f:
    mermaid_flowchart = f.read()

# Parse the flowchart to extract domains
def parse_mermaid_flowchart(flowchart):
    """Split a merged Mermaid flowchart into one line list per subgraph.

    A subgraph is recognised by a header of the form `subgraph ID["Title"]`
    and closed by a line that is exactly `end`. For every subgraph a synthetic
    end node `<ID>_END["End: Goodbye"]` is appended, linked from the node the
    subgraph's last line finishes on. An empty subgraph gets the end node on
    its own, since there is nothing to link it from.

    Args:
        flowchart: Full Mermaid source text.

    Returns:
        Dict mapping subgraph ID (e.g. "D_T") to its stripped, non-empty
        lines plus the synthetic end-node line.
    """
    domains = {}
    current_domain = None
    domain_lines = []
    in_subgraph = False

    for line in flowchart.splitlines():
        line = line.strip()
        if not line:
            continue

        # Detect start of a subgraph
        if line.startswith('subgraph'):
            header = re.search(r'subgraph\s+(\w+)\["([^"]+)"\]', line)
            # A header in another form is skipped without disturbing the
            # subgraph currently being collected.
            if header:
                current_domain = header.group(1)  # e.g., D_T, D_H
                domain_lines = []
                in_subgraph = True
            continue

        # Detect end of a subgraph
        if line == 'end' and in_subgraph:
            end_node = f"{current_domain}_END[\"End: Goodbye\"]"
            if domain_lines:
                last_node = _last_node_id(domain_lines[-1])
                domain_lines.append(f"{last_node} --> {end_node}")
            else:
                domain_lines.append(end_node)
            domains[current_domain] = domain_lines
            in_subgraph = False
            current_domain = None
            continue

        # Collect lines within a subgraph
        if in_subgraph:
            domain_lines.append(line)

    return domains


def _last_node_id(line):
    """Return the ID of the node that `line` ends on (its final link target)."""
    tail = line.split('-->')[-1].strip()
    # Drop a leading edge label such as `|yes|` so it is not taken for the ID.
    tail = re.sub(r'^\|[^|]*\|\s*', '', tail)
    node = re.match(r'\w+', tail)
    if node:
        return node.group(0)
    # No identifier found: fall back to the text before any `[` label.
    return tail.split('[')[0].strip()

# Generate individual Mermaid files for each domain
def generate_domain_files(domains, output_dir="./flowcharts_generated"):
    """Write one `<domain>.workflow` Mermaid file per domain.

    Args:
        domains: Dict mapping a subgraph ID (e.g. "D_T") to the list of
            Mermaid lines belonging to it.
        output_dir: Directory the files are written to; created if missing.

    Each file starts with `flowchart TD` followed by the domain's lines
    indented by four spaces. Known subgraph IDs are translated to readable
    names; unknown IDs are used as they are. File names are lower-cased.
    """
    # Built once; the mapping does not depend on the domain being written.
    domain_names = {
        'D_T': 'Train',
        'D_H': 'Hotel',
        'D_R': 'Restaurant',
        'D_A': 'Attraction',
        'D_M': 'Multi',
        'D_X': 'Taxi'
    }

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    for domain_id, lines in domains.items():
        domain_name = domain_names.get(domain_id, domain_id)

        # Create file content
        file_content = "flowchart TD\n" + '\n'.join(f"    {line}" for line in lines)

        # Write as UTF-8 so the files decode correctly when read back as
        # UTF-8, whatever the platform's default encoding is.
        filename = f"{output_dir}/{domain_name.lower()}.workflow"
        with open(filename, 'w', encoding="utf-8") as f:
            f.write(file_content)
        print(f"Generated {filename}")

# Main execution
# Split the merged flowchart text into per-subgraph line lists, then write one
# .workflow file per subgraph. `domains` stays bound to the parsed mapping.
domains = parse_mermaid_flowchart(mermaid_flowchart)
generate_domain_files(domains)

In [None]:
import os
from typing import Dict

def load_flowcharts(scenarios_dir: str) -> Dict[str, str]:
    """
    Load flowchart data from .workflow files in the specified directory.

    Each domain's file is read independently, so one missing or unreadable
    file no longer prevents the remaining domains from being loaded.

    Args:
        scenarios_dir (str): Directory containing the .workflow files.

    Returns:
        Dict[str, str]: Dictionary with domain names as keys and flowchart text
        as values. Domains whose file could not be read are absent, so callers
        should check for a domain before indexing.
    """
    # Initialize dictionary to store flowcharts
    flowcharts = {}

    # Define domains and corresponding flowchart files
    domains = ["restaurant", "hotel", "attraction", "train", "taxi"]

    for domain in domains:
        file_path = os.path.join(scenarios_dir, f"{domain}.workflow")
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                flowcharts[domain] = f.read()
        except FileNotFoundError as e:
            print(f"Error: File not found - {e}")
        except (OSError, UnicodeError) as e:
            print(f"Error reading files: {e}")

    # Only claim success when every domain was actually read.
    if len(flowcharts) == len(domains):
        print("Successfully read all flowchart files.")
    else:
        missing = [domain for domain in domains if domain not in flowcharts]
        print(f"Loaded {len(flowcharts)} of {len(domains)} flowchart files; missing: {missing}")

    return flowcharts

# Example usage
# Load the per-domain flowcharts into the global `flowcharts` mapping.

# scenarios_dir = "./prompts/scenarios/"
# NOTE(review): generate_domain_files writes to "./flowcharts_generated", which is
# not the directory read here — confirm this directory is populated by another step.
scenarios_dir = "./abstraction_flowcharts_generated"
flowcharts = load_flowcharts(scenarios_dir)

    # Print loaded flowcharts
    # for domain, flowchart in flowcharts.items():
    #     print(f"\nDomain: {domain}")
    #     print(f"Flowchart:\n{flowchart[:100]}...")  # Print first 100 chars for brevity

In [None]:
# Evaluate every domain's flowchart against that domain's dialogues and save
# the combined results. Assumes test_data and flowcharts are already defined.
DOMAINS = ["restaurant", "hotel", "attraction", "train", "taxi"]

# Single definition of the results file, so the confirmation message printed
# at the end always names the file that was actually written.
output_path = "flowchart_evaluation_abstraction_new.json"

# Initialize dictionary to store results for all domains
all_results = {}

# The evaluator keeps no per-domain state, so one instance serves all domains.
evaluator = FlowchartEvaluator(api_key="sk-xxxxx")

for domain in DOMAINS:
    print(f"\n=== Evaluating Domain: {domain} ===")

    if domain not in test_data or domain not in flowcharts:
        # Record the gap in the same shape evaluate() uses for its own errors
        # instead of aborting the remaining domains with a KeyError.
        results = {
            "CPC": 0.0,
            "Average_UMR": 0.0,
            "Classification_Results": [],
            "Error": f"Missing dialogues or flowchart for domain '{domain}'"
        }
    else:
        # Use domain-specific dialogues from test_data
        dialogues = test_data[domain]
        # Get the domain-specific flowchart
        mermaid_flowchart = flowcharts[domain]

        # Evaluate
        results = evaluator.evaluate(mermaid_flowchart, dialogues)

    # Print results with domain prefix
    if "Error" not in results:
        print(f"{domain} - Complete Path Coverage (CPC): {results['CPC']:.4f}")
        print(f"{domain} - Average Utterance Matching Ratio (UMR): {results['Average_UMR']:.4f}")
    else:
        print(f"{domain} - Error: {results['Error']}")

    # Store results for this domain
    all_results[domain] = results

# Save all results to a single JSON file
with open(output_path, "w") as f:
    json.dump(all_results, f, indent=2)
print(f"\nResults saved to {output_path}")