# Workflow Documentation Generator with DSPy

This notebook demonstrates how to use DSPy to optimize a prompt for generating documentation from workflow JSON data. It defines a Semantic Intermediate Representation (SIR), a serializer, a DSPy signature, a custom metric using a Rubric Judge, and runs the GEPA optimizer.

In [None]:
import json
import os
from typing import Any, Optional, Literal
from pydantic import BaseModel
import dspy
from dotenv import load_dotenv

## 1. Data Models (SIR)
Define the Semantic Intermediate Representation (SIR) models. These classes (`SIRWorkflow`, `SIRStep`, `SIROutcome`) represent the simplified structure of a workflow, stripping away UI-specific details from the raw DSL.

In [None]:
class SIROutcome(BaseModel):
    """Represents where the flow goes next and why.

    One instance describes a single outgoing connection of a step.
    """

    # Name of the step this connection leads to.
    target_step_name: str
    # Human-readable reason for following this connection,
    # e.g., "Next Step", "If True", "On Error".
    label: str


class SIRStep(BaseModel):
    """A single step in the workflow.

    Bundles the step's identity, its cleaned configuration and its
    connections to neighbouring steps.
    """

    # Identifier of the step.
    id: str
    # Display name of the step.
    name: str
    node_type: str  # Raw node type from DSL
    # Credential type used by the step, when one is configured.
    credentials: Optional[str] = None

    # The simplified technical details (no noise)
    node_config: dict[str, Any]

    # Graph connections
    # Name of a step that leads into this one; None when there is none.
    parent_step_name: Optional[str] = None
    # Outgoing connections of this step.
    # NOTE(review): mutable default; pydantic is expected to copy it per
    # instance rather than share it — confirm for the pinned version.
    edges: list[SIROutcome] = []


class SIRWorkflow(BaseModel):
    """The complete semantic representation of the workflow.

    Top-level container: workflow metadata plus its list of steps.
    """

    # Identifier of the workflow.
    id: str
    # Display name of the workflow.
    name: str
    # Free-text description; may be None.
    # NOTE(review): no default is declared, so the field presumably still has
    # to be passed explicitly (even as None) — confirm for the pinned pydantic.
    description: Optional[str]
    # The workflow's steps.
    steps: list[SIRStep]

## 2. Workflow Serializer
The `WorkflowSerializer` class is responsible for converting the raw JSON DSL (Domain Specific Language) into the clean SIR format. It handles:
- Building adjacency maps for graph traversal.
- Resolving node names and types.
- Mapping edges to outcomes (Next, True, False, Error).
- Cleaning parameters to remove internal IDs.

In [None]:
class WorkflowSerializer:
    """Converts a raw workflow DSL dict into the Semantic Intermediate Representation.

    The DSL is read as a dict with optional ``nodes`` and ``edges`` lists plus
    optional ``id`` / ``name`` / ``description`` entries. Every node must
    carry an ``id``; every edge must carry ``id``, ``src`` and ``dst``
    (a missing one raises ``KeyError``).
    """

    # Parameter keys that are dropped (at every nesting level) by
    # ``_clean_parameters`` because they add nothing to documentation.
    _NOISY_PARAM_KEYS = frozenset({"credentials", "position", "routes"})

    def __init__(self, workflow_dsl: dict[str, Any]):
        """Index nodes and edges by id and build the adjacency maps.

        Args:
            workflow_dsl: The raw workflow definition.

        Raises:
            KeyError: If a node lacks ``id`` or an edge lacks ``id``/``src``/``dst``.
        """
        self.dsl = workflow_dsl
        self.nodes = {n["id"]: n for n in workflow_dsl.get("nodes", [])}
        self.edges = {e["id"]: e for e in workflow_dsl.get("edges", [])}

        # Adjacency maps: node id -> edges leaving / entering that node,
        # in the order the edges appear in the DSL.
        self.outgoing_edges: dict[str, list[dict[str, Any]]] = {}
        self.incoming_edges: dict[str, list[dict[str, Any]]] = {}

        for edge in workflow_dsl.get("edges", []):
            src, dst = edge["src"], edge["dst"]
            self.outgoing_edges.setdefault(src, []).append(edge)
            self.incoming_edges.setdefault(dst, []).append(edge)

    def serialize(self) -> SIRWorkflow:
        """Converts the raw DSL into the Semantic Intermediate Representation.

        Returns:
            A ``SIRWorkflow`` with one ``SIRStep`` per DSL node, in DSL order.
        """
        steps = [self._process_node(node) for node in self.dsl.get("nodes", [])]

        return SIRWorkflow(
            id=self.dsl.get("id", ""),
            name=self.dsl.get("name", "Untitled Workflow"),
            description=self.dsl.get("description", ""),
            steps=steps,
        )

    def _node_name(self, node_id: str) -> str:
        """Return the display name of a node, falling back to its id."""
        node = self.nodes.get(node_id)
        return node.get("name", node_id) if node else node_id

    def _resolve_target_name(self, edge_id: str) -> str:
        """Return the name of the node an edge points to.

        An unknown edge id is returned unchanged.
        """
        edge = self.edges.get(edge_id)
        if edge is None:
            return edge_id
        return self._node_name(edge["dst"])

    def _edge_label(self, edge: dict[str, Any], params: dict[str, Any]) -> str:
        """Work out the human-readable label for one outgoing edge.

        Starts from the edge's own ``label`` (default ``"Next"``), then lets
        conditional / error references in ``params`` override it, and finally
        lets a switch route (``routes`` + ``rules``) override that.
        """
        edge_id = edge["id"]

        label = edge.get("label")
        if label is None:
            # Covers both a missing key and an explicit JSON null, which
            # would otherwise fail SIROutcome's ``label: str`` validation.
            label = "Next"

        # Check if this edge is referenced in parameters (e.g. conditional)
        if params.get("true_edge_id") == edge_id:
            condition = params.get("condition") or params.get("expression", "")
            label = f"Condition met: {condition}"
        elif params.get("false_edge_id") == edge_id:
            label = "Condition not met"
        elif params.get("error_edge") == edge_id:
            label = "Error"

        # Handle switch node routes: route i is described by rule i; a route
        # with no matching rule is the default branch.
        routes = params.get("routes", [])
        if isinstance(routes, list) and edge_id in routes:
            idx = routes.index(edge_id)
            rules = params.get("rules", [])
            if not isinstance(rules, list):
                rules = []
            if idx >= len(rules):
                label = "Default"
            elif isinstance(rules[idx], dict):
                rule = rules[idx]
                val = rule.get("value", "")
                op = rule.get("operator", "==")
                comp = rule.get("compare", "")
                label = f"Case: {val} {op} {comp}"
            # A malformed (non-dict) rule keeps the label chosen above.

        return label

    def _attach_route_targets(self, raw_params: dict[str, Any]) -> dict[str, Any]:
        """Return a shallow copy of ``raw_params`` with switch routes resolved.

        When ``routes`` is a list, each dict rule gains a ``target`` entry
        naming the node its route leads to, and the first route beyond the
        rules (if any) is recorded as ``default_target``.
        """
        params = dict(raw_params)
        routes = params.get("routes")
        if not isinstance(routes, list):
            return params

        rules = params.get("rules", [])
        rule_count = 0
        if isinstance(rules, list):
            rule_count = len(rules)
            new_rules = []
            for idx, rule in enumerate(rules):
                if isinstance(rule, dict):
                    new_rule = rule.copy()
                    if idx < len(routes):
                        new_rule["target"] = self._resolve_target_name(routes[idx])
                    new_rules.append(new_rule)
            params["rules"] = new_rules

        # Handle default route. A non-list ``rules`` counts as "no rules"
        # here instead of raising on len().
        if len(routes) > rule_count:
            default_edge = routes[rule_count]
            if default_edge:
                params["default_target"] = self._resolve_target_name(default_edge)

        return params

    def _process_node(self, node: dict[str, Any]) -> SIRStep:
        """Build the ``SIRStep`` for one DSL node.

        Args:
            node: The raw node dict; must contain ``id``.

        Returns:
            The step with labelled outgoing edges, the name of its first
            incoming neighbour (if any), its credential type and its cleaned
            parameters.
        """
        node_id = node["id"]
        # ``parameters`` may be absent or an explicit null in the DSL.
        raw_params = node.get("parameters") or {}

        outcomes = [
            SIROutcome(
                target_step_name=self._node_name(edge["dst"]),
                label=self._edge_label(edge, raw_params),
            )
            for edge in self.outgoing_edges.get(node_id, [])
        ]

        # Determine Previous Step: only the first incoming edge is used, so a
        # node with several parents reports just one of them.
        incoming = self.incoming_edges.get(node_id)
        prev_name = self._node_name(incoming[0]["src"]) if incoming else None

        # Extract Credentials
        creds = node.get("credentials")
        cred_type = creds.get("type") if isinstance(creds, dict) else None

        return SIRStep(
            id=node_id,
            name=node.get("name", node_id),
            node_type=node.get("type", "unknown"),  # Use raw type
            credentials=cred_type,
            node_config=self._clean_parameters(self._attach_route_targets(raw_params)),
            parent_step_name=prev_name,
            edges=outcomes,
        )

    def _clean_parameters(self, params: dict[str, Any]) -> dict[str, Any]:
        """Removes internal IDs or noisy fields and substitutes IDs with Names.

        Keys in ``_NOISY_PARAM_KEYS`` are dropped. String values that are
        known edge ids are replaced by the name of the edge's target node.
        Nested dicts are cleaned recursively; every other value (including
        lists) is passed through unchanged.
        """
        clean = {}
        for key, value in params.items():
            if key in self._NOISY_PARAM_KEYS:
                continue

            if isinstance(value, dict):
                # Recursive for nested dicts (e.g. headers)
                clean[key] = self._clean_parameters(value)
            elif isinstance(value, str) and value in self.edges:
                # If value is an edge ID, replace it with the target node name
                clean[key] = self._resolve_target_name(value)
            else:
                clean[key] = value

        return clean

## 3. Execution & Testing
Load the `data.json` file, run the serializer on each workflow, save the results to `serialized_data.json`, and print the first serialized workflow. This section tests the entire pipeline from raw data to SIR.

In [None]:
# Load the raw workflows, serialize each one to its SIR form, and write the
# results to serialized_data.json.
data_file_path = "data.json"

# Defined before the existence check so that code further down the file
# (the training-set construction iterates over it) sees an empty list rather
# than an undefined name when data.json is absent.
serialized_results = []

if os.path.exists(data_file_path):
    with open(data_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    for item in data:
        workflow_data = item.get("workflow_data")
        if not workflow_data:
            continue

        # Backfill metadata that lives on the wrapper record rather than on
        # the workflow definition itself. The id is stringified; name and
        # description are copied as-is.
        if "id" not in workflow_data and "id" in item:
            workflow_data["id"] = str(item["id"])
        for key in ("name", "description"):
            if key not in workflow_data and key in item:
                workflow_data[key] = item[key]

        # One malformed workflow must not abort the whole batch: report it
        # and move on.
        try:
            serializer = WorkflowSerializer(workflow_data)
            sir_workflow = serializer.serialize()
            serialized_results.append(sir_workflow.model_dump())
        except Exception as e:
            print(f"Error serializing workflow {item.get('id')}: {e}")

    print(f"Serialized {len(serialized_results)} workflows.")

    # Save results to file
    with open("serialized_data.json", "w", encoding="utf-8") as f:
        json.dump(serialized_results, f, indent=2)
    print("Saved serialized data to serialized_data.json")

    # Example output of first one
    if serialized_results:
        print(json.dumps(serialized_results[0], indent=2))
else:
    print("data.json not found.")

## 4. DSPy Signature
Define the `WorkflowDocumentation` signature for the LLM. This tells DSPy what the input (workflow JSON, target audience) and output (markdown report) should look like.

In [None]:
class WorkflowDocumentation(dspy.Signature):
    """
    Generates documentation for an automation workflow based on its JSON definition
    and a specific target audience.
    """

    # Input: the workflow definition as a dict.
    # NOTE(review): the training examples in this file are built from the
    # serialized SIR dicts, not from the raw DSL, so the "raw JSON" wording in
    # the desc below may be stale — confirm before changing the prompt text.
    workflow_json: dict[str, Any] = dspy.InputField(
        desc="The raw JSON domain-specific language of the workflow."
    )
    # Input: which of the two supported personas the report is written for.
    target_audience: Literal["Technical Developer", "Executive Summary"] = (
        dspy.InputField(desc="The persona to write for.")
    )

    # Output: the generated Markdown report.
    # NOTE(review): no type annotation here; presumably DSPy treats an
    # unannotated field as str — confirm for the pinned version.
    report = dspy.OutputField(
        desc="The generated documentation report in Markdown format."
    )

## 5. DSPy Configuration
Configure the Language Models (LMs) using Vertex AI credentials.
- **Student**: `gemini-2.5-flash-lite` (for generation)
- **Judge** (`teacher`): `gemini-2.5-pro` (for evaluation)
- **Reflector**: `gemini-2.5-pro` (for feedback/improvement)

In [None]:
# Read the credentials path from the environment (populated from .env) and,
# when the file exists, build the three LMs used in this notebook.
load_dotenv()
file_path = os.getenv("google_creds_path")

if file_path and os.path.exists(file_path):
    # Explicit UTF-8 so decoding does not depend on the platform's default
    # encoding (matches the other file reads in this notebook).
    with open(file_path, "r", encoding="utf-8") as file:
        vertex_credentials = json.load(file)

    # Convert to JSON string
    vertex_credentials_json = json.dumps(vertex_credentials)

    # Settings shared by every model below.
    _lm_settings = {
        "temperature": 0,
        "vertex_credentials": vertex_credentials_json,
    }

    student = dspy.LM("vertex_ai/gemini-2.5-flash-lite", **_lm_settings)
    teacher = dspy.LM("vertex_ai/gemini-2.5-pro", **_lm_settings)
    reflector = dspy.LM("vertex_ai/gemini-2.5-pro", **_lm_settings)

    # The student is the default LM; the other two are passed explicitly
    # where they are needed.
    dspy.configure(lm=student)
else:
    print("Warning: google_creds_path not found or invalid in .env")

## 6. Metric Definition: Rubric Judge
We define a custom `RubricJudge` signature that acts as a QA Specialist. It evaluates the generated report against the ground truth JSON and the target audience requirements based on a strict 1-10 scoring rubric.

In [None]:
class RubricJudge(dspy.Signature):
    """
    You are a meticulous QA Specialist evaluating documentation for a workflow automation platform.

    YOUR TASK:
    Evaluate the 'generated_report' by cross-referencing it against the 'workflow_json' (ground truth)
    and checking appropriateness for the 'target_audience'.

    EVALUATION PROCESS - Follow these steps IN ORDER:

    STEP 1: FACTUAL ACCURACY CHECK (Most Critical)
    - List every node mentioned in generated_report
    - Verify EACH exists in workflow_json (check "type" and "name" fields)
    - Flag any hallucinated nodes, fabricated parameters, or incorrect connections
    - Check if credentials, and execution order match the JSON

    STEP 2: COMPLETENESS CHECK
    - Are all nodes from workflow_json mentioned?
    - Are critical parameters (webhooks, API endpoints, expressions) documented?


    STEP 3: AUDIENCE APPROPRIATENESS CHECK
    Target Audience Expectations:
    - "executive" / "non-technical": No code snippets, no JSON paths, focus on business outcomes
    - "developer" / "technical": Include node types, expressions, data flow details


    STEP 4: FORMATTING & READABILITY CHECK
    - Valid Markdown structure (headers, lists render correctly)?
    - Logical flow (overview → details → conclusion)?
    - Free of orphaned sentences or abrupt endings?

    STEP 5: INSIGHT & VALUE CHECK
    - Does it explain WHY this workflow exists (business purpose)?
    - Does it go beyond listing steps to provide actionable understanding?

    ═══════════════════════════════════════════════════════════
    SCORING RUBRIC (Apply strictly based on steps above):
    ═══════════════════════════════════════════════════════════

    1-2 (REJECT - Critical Failure):
        • ANY hallucinated node not in workflow_json
        • Broken/unrenderable Markdown
        • Empty, truncated, or placeholder output
        • Completely wrong workflow description

    3-4 (MAJOR ISSUES):
        • Factually accurate BUT severe audience mismatch
          (e.g., code snippets for executives, oversimplified for developers)
        • Missing >50% of workflow nodes
        • Structurally confusing (no clear sections)

    5-6 (ACCEPTABLE - Minimum Viable):
        • All nodes accurately documented
        • Audience-appropriate language
        • Readable structure
        • BUT: Merely descriptive ("Node A sends to Node B")
        • No business context

    7-8 (GOOD):
        • Fully accurate with all parameters covered
        • Strong Markdown formatting with clear sections
        • Explains WHAT the workflow accomplishes end-to-end
        • Appropriate depth for target audience
        • Minor omissions only (e.g., optional parameters)

    9-10 (EXEMPLARY):
        • Everything in 7-8 PLUS:
        • Explains WHY this workflow matters to the business
        • Perfect persona voice for the audience
        • Zero factual errors, zero missing critical details
        • Could be published as official documentation

    ═══════════════════════════════════════════════════════════
    OUTPUT INSTRUCTIONS:
    ═══════════════════════════════════════════════════════════
    Provide your integer score and reasoning.
    """

    # NOTE(review): the docstring above is the judge's prompt text, so edits
    # to it change evaluation behavior, not just documentation.
    # NOTE(review): STEP 3 names the audiences "executive"/"non-technical" and
    # "developer"/"technical", while `target_audience` below only admits
    # "Technical Developer" and "Executive Summary" — confirm the judge maps
    # these as intended.

    # Input: ground truth the report is checked against. Declared as `str`:
    # callers are expected to supply JSON text rather than a dict.
    workflow_json: str = dspy.InputField(
        desc="The source-of-truth workflow definition in JSON format.  All claims must be verified against this."
    )
    # Input: the persona the report was written for.
    target_audience: Literal["Technical Developer", "Executive Summary"] = (
        dspy.InputField(desc="The persona to write for.")
    )
    # Input: the report being graded.
    generated_report: str = dspy.InputField(
        desc="The documentation/report to evaluate."
    )
    # Output: free-text justification, declared before the score.
    reasoning: str = dspy.OutputField(
        desc="Your step-by-step reasoning process leading to the score.",
    )
    # Output: the rubric score, requested as a bare integer from 1 to 10.
    score: int = dspy.OutputField(
        desc="Integer score from 1 to 10 inclusive. Output ONLY the number, no text.",
    )

## 7. Metric Function
The `normalized_rubric_metric` function wraps the `RubricJudge`. It runs the judge with the teacher model and normalizes the 1-10 score to a float in the 0.0-1.0 range expected by DSPy optimizers: valid scores map to 0.1-1.0, and a score that cannot be parsed yields 0.0.

In [None]:
def _parse_rubric_score(raw_value):
    """Best-effort conversion of the judge's score output to a float.

    Accepts numbers and numeric strings directly; otherwise falls back to the
    first number embedded in the text (so "7/10" and "Score: 7" both give
    7.0). Returns None when no number can be found.
    """
    import re

    try:
        return float(raw_value)
    except (TypeError, ValueError):
        # TypeError covers None and other non-numeric objects, which float()
        # rejects with a different exception than malformed strings.
        pass

    match = re.search(r"\d+(?:\.\d+)?", str(raw_value))
    return float(match.group()) if match else None


def normalized_rubric_metric(gold, pred, trace=None, pred_name=None, pred_trace=None):
    """Score a generated report with the RubricJudge, normalized to 0.0-1.0.

    Args:
        gold: Example providing ``workflow_json`` and ``target_audience``.
        pred: Prediction providing the generated ``report``.
        trace, pred_name, pred_trace: Accepted for the optimizer's calling
            convention; not used here.

    Returns:
        The judge's 1-10 score clamped to that range and divided by 10
        (0.1-1.0), or 0.0 when the score cannot be parsed.
    """
    # The judge declares workflow_json as a string, while the examples may
    # carry the workflow as a dict; hand it over as JSON text.
    # default=str is deliberate: this is prompt text, so an exotic value
    # should be stringified rather than abort the evaluation.
    workflow_json = gold.workflow_json
    if not isinstance(workflow_json, str):
        workflow_json = json.dumps(workflow_json, default=str)

    judge = dspy.ChainOfThought(RubricJudge)

    # Run the judge on the teacher model instead of the globally configured LM.
    with dspy.context(lm=teacher):
        assessment = judge(
            workflow_json=workflow_json,
            target_audience=gold.target_audience,
            generated_report=pred.report,
        )

    raw_score = _parse_rubric_score(assessment.score)
    if raw_score is None:
        # Fail safe: an unreadable verdict counts as the worst score.
        print(f"Metric Failed to parse score: {assessment.score}")
        return 0.0

    # Clamp score to ensure it is 1-10 (just in case), then normalize.
    return max(1, min(10, raw_score)) / 10.0

## 8. Dataset Preparation
We create a training set from the serialized workflows. For each workflow, we create two examples: one for an "Executive Summary" audience and one for a "Technical Developer" audience.

In [None]:
# Build the training set: every serialized workflow yields one example per
# audience, Executive Summary first and Technical Developer second. The
# report is left empty because there is no reference output.
training_set = []

for workflow in serialized_results:
    for audience in ("Executive Summary", "Technical Developer"):
        example = dspy.Example(
            workflow_json=workflow,
            target_audience=audience,
            report="",
        )
        # Mark which fields are inputs; the remaining field is the label.
        training_set.append(example.with_inputs("workflow_json", "target_audience"))

print(f"Populated training_set with {len(training_set)} examples.")

## 9. Optimization with GEPA
We use the GEPA (Genetic-Pareto) optimizer to improve the prompt of the `WorkflowDocumentation` program. The optimizer uses the `reflector` model to propose improvements and the `normalized_rubric_metric` to evaluate them.

In [None]:
from dspy.teleprompt import GEPA

# Configure the optimizer: candidates are scored with the rubric metric and
# `reflector` is the LM handed to GEPA for reflection.
optimizer = GEPA(
    metric=normalized_rubric_metric,
    reflection_lm=reflector,  # The "Brain" that reads feedback and rewrites prompts
    auto="medium",  # Budget: 'light', 'medium', or 'heavy'
    num_threads=16,
    # reflection_minibatch_size=10,
)
# The program to optimize: chain-of-thought over the documentation signature.
bot = dspy.ChainOfThought(WorkflowDocumentation)
# NOTE(review): only a trainset is passed — confirm how GEPA picks its
# validation data in the pinned DSPy version.
compiled_gepa = optimizer.compile(
    bot,
    trainset=training_set,
)
# Persist the optimized program to disk.
compiled_gepa.save("rune_optimized_prompt.json")