<a href="https://colab.research.google.com/github/sr606/LLM/blob/main/mermaid_v4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import uuid

def create_rectangle(text, x, y):
    """Build a minimal rectangle element dictionary.

    Args:
        text: Label stored under the "text" key of the element.
        x: Horizontal position of the rectangle.
        y: Vertical position of the rectangle.

    Returns:
        A dict with a fresh UUID4 string id, a fixed 200x80 size, a black
        stroke and a transparent background.
    """
    element_id = str(uuid.uuid4())
    position = {"x": x, "y": y}
    size = {"width": 200, "height": 80}
    colors = {"strokeColor": "#000000", "backgroundColor": "transparent"}
    # Unpacking order keeps the keys in the sequence id, type, position, size, colors, text.
    return {
        "id": element_id,
        "type": "rectangle",
        **position,
        **size,
        **colors,
        "text": text,
    }

# Three labelled boxes stacked vertically at x=100, 150 units apart.
elements = [
    create_rectangle(caption, 100, top)
    for caption, top in (
        ("Input Layer", 100),
        ("Processing Layer", 250),
        ("Output Layer", 400),
    )
]

# Top-level document wrapper around the element list.
diagram = {"type": "excalidraw", "version": 2, "elements": elements, "appState": {}}

# Serialise compactly (no indentation) into the working directory.
with open("diagram.excalidraw", "w") as f:
    f.write(json.dumps(diagram))


In [None]:
import json
import uuid
import time

def generate_id():
    """Return a random 16-character lowercase hexadecimal identifier.

    The value is the first 16 hex digits of a freshly generated UUID4.
    """
    return uuid.uuid4().hex[:16]

def base_element(element_type, x, y):
    """Create the properties shared by every element in this cell.

    Args:
        element_type: Value stored under "type" (e.g. "rectangle", "text", "arrow").
        x: Horizontal position of the element.
        y: Vertical position of the element.

    Returns:
        A new dict holding a generated id, the type and position, the default
        styling values and an "updated" timestamp in milliseconds.
    """
    element = {"id": generate_id(), "type": element_type, "x": x, "y": y}
    # Default appearance and bookkeeping flags; callers override what they need.
    element.update({
        "angle": 0,
        "strokeColor": "#1e1e1e",
        "backgroundColor": "transparent",
        "fillStyle": "solid",
        "strokeWidth": 2,
        "strokeStyle": "solid",
        "roughness": 1,
        "opacity": 100,
        "groupIds": [],
        "frameId": None,
        "isDeleted": False,
        "locked": False,
    })
    # Wall-clock time converted from seconds to whole milliseconds.
    element["updated"] = int(time.time() * 1000)
    return element

def create_rectangle(x, y, width, height):
    """Create a rectangle element of the given size.

    Args:
        x: Horizontal position of the rectangle.
        y: Vertical position of the rectangle.
        width: Rectangle width.
        height: Rectangle height.

    Returns:
        The base element dict extended with "width", "height" and a
        "roundness" entry of type 3.
    """
    return {
        **base_element("rectangle", x, y),
        "width": width,
        "height": height,
        "roundness": {"type": 3},
    }

def create_text(x, y, text, container_id=None):
    """Create a text element, optionally tied to a container element.

    Args:
        x: Horizontal position of the text.
        y: Vertical position of the text.
        text: String to display; also stored as "originalText".
        container_id: Id stored under "containerId", or None for free text.

    Returns:
        The base element dict extended with the text properties. The width
        is estimated as 12 units per character; the height is fixed at 35.
    """
    return {
        **base_element("text", x, y),
        "text": text,
        "fontSize": 28,
        "fontFamily": 6,
        "width": 12 * len(text),  # rough per-character width estimate
        "height": 35,
        "textAlign": "center",
        "verticalAlign": "middle",
        "containerId": container_id,
        "originalText": text,
        "autoResize": True,
        "lineHeight": 1.25,
    }

def create_arrow(x, y, dx, dy):
    """Create a two-point arrow starting at (x, y) and spanning (dx, dy).

    Args:
        x: Horizontal position of the arrow start.
        y: Vertical position of the arrow start.
        dx: Horizontal offset of the arrow end relative to the start.
        dy: Vertical offset of the arrow end relative to the start.

    Returns:
        The base element dict extended with the arrow geometry. "width" and
        "height" hold the absolute offsets; "points" keeps the signed ones.
    """
    start_point = [0, 0]
    end_point = [dx, dy]
    return {
        **base_element("arrow", x, y),
        "width": abs(dx),
        "height": abs(dy),
        "points": [start_point, end_point],
        "roundness": {"type": 2},
        "endArrowhead": "arrow",
    }


# Boxes: three 250x120 rectangles laid out left to right on the same row.
user_rect, api_rect, db_rect = (
    create_rectangle(left, 250, 250, 120) for left in (300, 700, 1100)
)
elements = [user_rect, api_rect, db_rect]

# Labels: one text element per box, linked through the box id.
elements += [
    create_text(380, 290, "User", user_rect["id"]),
    create_text(780, 290, "API Server", api_rect["id"]),
    create_text(1180, 290, "Database", db_rect["id"]),
]

# Connectors: horizontal arrows bridging the 150-unit gaps between boxes.
elements += [
    create_arrow(550, 300, 150, 0),
    create_arrow(950, 300, 150, 0),
]

# Document wrapper around the element list.
excalidraw_json = {
    "type": "excalidraw",
    "version": 2,
    "source": "https://app.excalidraw.com",
    "elements": elements,
    "appState": {"viewBackgroundColor": "#ffffff"},
    "files": {},
}

# Write the document, pretty-printed, into the working directory.
with open("diagram.excalidraw", "w") as f:
    f.write(json.dumps(excalidraw_json, indent=2))

print("Excalidraw file generated successfully!")


Excalidraw file generated successfully!


In [None]:
import json
import uuid
import time

# -------------------------------------------------
# Utility functions
# -------------------------------------------------

def generate_id():
    """Return the first 16 hex digits of a new random UUID4 as the element id."""
    random_uuid = uuid.uuid4()
    return random_uuid.hex[:16]

def current_timestamp():
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    milliseconds = time.time() * 1000
    return int(milliseconds)

def base_element(element_type, x, y):
    """Create the properties shared by every element in this cell.

    Args:
        element_type: Value stored under "type" (e.g. "rectangle", "text", "arrow").
        x: Horizontal position of the element.
        y: Vertical position of the element.

    Returns:
        A new dict with a generated id, type and position, default styling,
        an "updated" timestamp, "version" 1 and a random "versionNonce"
        below 100000000.
    """
    element = {"id": generate_id(), "type": element_type, "x": x, "y": y}
    # Default appearance and bookkeeping flags; callers override what they need.
    element.update({
        "angle": 0,
        "strokeColor": "#1e1e1e",
        "backgroundColor": "transparent",
        "fillStyle": "solid",
        "strokeWidth": 2,
        "strokeStyle": "solid",
        "roughness": 1,
        "opacity": 100,
        "groupIds": [],
        "frameId": None,
        "isDeleted": False,
        "locked": False,
    })
    element["updated"] = current_timestamp()
    element["version"] = 1
    # Reduce a random 128-bit UUID integer to at most eight decimal digits.
    element["versionNonce"] = uuid.uuid4().int % 100000000
    return element

# -------------------------------------------------
# Create Rectangle Node
# -------------------------------------------------

def create_node(label, x, y):
    """Create a 220x100 rounded rectangle together with its label text.

    Args:
        label: Text shown for the node.
        x: Horizontal position of the rectangle.
        y: Vertical position of the rectangle.

    Returns:
        A (rectangle, text) tuple of element dicts. The text sits 40 units
        right of and 35 units below the rectangle origin, and its
        "containerId" is the rectangle's id.
    """
    box = {
        **base_element("rectangle", x, y),
        "width": 220,
        "height": 100,
        "roundness": {"type": 3},
    }

    caption = {
        **base_element("text", x + 40, y + 35),
        "text": label,
        "fontSize": 24,
        "fontFamily": 6,
        "width": 150,
        "height": 35,
        "textAlign": "center",
        "verticalAlign": "middle",
        "containerId": box["id"],
        "originalText": label,
        "autoResize": True,
        "lineHeight": 1.25,
    }

    return box, caption

# -------------------------------------------------
# Create Arrow
# -------------------------------------------------

def create_arrow(start_node, end_node):
    """Create an arrow from the right edge of one node to the left edge of another.

    The arrow starts at the vertical middle of the start node's right edge
    and ends at the vertical middle of the end node's left edge.

    Args:
        start_node: Rectangle element dict the arrow leaves. Must have "x" and
            "y"; "width" and "height" are used when present.
        end_node: Rectangle element dict the arrow points at. Must have "x"
            and "y"; "height" is used when present.

    Returns:
        An arrow element dict. "points" holds the signed offset to the end
        point, while "width" and "height" hold its absolute size.
    """
    # Fall back to the 220x100 box that create_node produces when a node
    # dict carries no explicit size.
    x1 = start_node["x"] + start_node.get("width", 220)
    y1 = start_node["y"] + start_node.get("height", 100) // 2

    x2 = end_node["x"]
    y2 = end_node["y"] + end_node.get("height", 100) // 2

    dx = x2 - x1
    dy = y2 - y1

    arrow = base_element("arrow", x1, y1)
    arrow.update({
        # Bounding-box size must not go negative when the target lies above
        # or to the left of the source; the direction stays in "points".
        "width": abs(dx),
        "height": abs(dy),
        "points": [[0, 0], [dx, dy]],
        "roundness": {"type": 2},
        "endArrowhead": "arrow"
    })

    return arrow

# -------------------------------------------------
# Define Graph Structure
# -------------------------------------------------

# Each entry is (label, x, y) for one box of the architecture diagram.
nodes_definition = [
    ("Client", 100, 200),
    ("Load Balancer", 400, 200),
    ("API Gateway", 700, 200),
    ("Auth Service", 1000, 100),
    ("Redis Cache", 1300, 100),
    ("User Service", 1000, 300),
    ("PostgreSQL DB", 1300, 300),
    ("Payment Service", 1000, 500),
    ("MongoDB", 1300, 500),
]

# Directed edges as (start index, end index) pairs into nodes_definition.
edges_definition = [
    (0, 1), (1, 2),
    (2, 3), (3, 4),
    (2, 5), (5, 6),
    (2, 7), (7, 8),
]

# -------------------------------------------------
# Build Diagram
# -------------------------------------------------

elements = []
node_objects = []  # rectangles only, index-aligned with nodes_definition

for label, x, y in nodes_definition:
    rect, text = create_node(label, x, y)
    node_objects.append(rect)
    elements += [rect, text]

for start_idx, end_idx in edges_definition:
    arrow = create_arrow(node_objects[start_idx], node_objects[end_idx])
    elements += [arrow]

# -------------------------------------------------
# Final Excalidraw JSON
# -------------------------------------------------

diagram = {
    "type": "excalidraw",
    "version": 2,
    "source": "https://app.excalidraw.com",
    "elements": elements,
    "appState": {"viewBackgroundColor": "#ffffff"},
    "files": {},
}

# Write the document, pretty-printed, into the working directory.
with open("complex_architecture.excalidraw", "w") as f:
    f.write(json.dumps(diagram, indent=2))

print("Complex Excalidraw graph generated successfully!")


Complex Excalidraw graph generated successfully!


In [None]:
import xml.etree.ElementTree as ET
import json


def parse_alteryx_workflow(xml_path):
    """Parse an Alteryx workflow XML file into a plain nodes/edges dictionary.

    Args:
        xml_path: Path of the workflow file to read.

    Returns:
        A dict with two lists:
        "nodes" - one dict per Node element with "id" (ToolID), "name",
        "plugin" (GuiSettings Plugin attribute or None) and integer "x"/"y"
        (None when no position is given);
        "edges" - one dict per Connection element that has both an Origin and
        a Destination child, with "source", "source_port", "target" and
        "target_port".

    Raises:
        OSError: If the file cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        ValueError: If a position attribute is not an integer string.
    """
    root = ET.parse(xml_path).getroot()

    # -------------------------
    # Parse Nodes
    # -------------------------
    nodes = []
    for node in root.findall(".//Node"):
        tool_id = node.attrib.get("ToolID")

        # Plugin type lives on the GuiSettings element itself.
        gui_settings = node.find("./GuiSettings")
        plugin_name = gui_settings.attrib.get("Plugin") if gui_settings is not None else None

        # Position is optional; keep None when it is absent.
        position = node.find("./GuiSettings/Position")
        x = position.attrib.get("x") if position is not None else None
        y = position.attrib.get("y") if position is not None else None

        # An empty Name element is present but its text is None (or only
        # whitespace), so the fallback must cover that case as well as a
        # missing element.
        name_tag = node.find("./Properties/Annotation/Name")
        name = name_tag.text if name_tag is not None else None
        if not name or not name.strip():
            name = f"Tool_{tool_id}"

        nodes.append({
            "id": tool_id,
            "name": name,
            "plugin": plugin_name,
            "x": int(x) if x else None,
            "y": int(y) if y else None
        })

    # -------------------------
    # Parse Connections
    # -------------------------
    edges = []
    for conn in root.findall(".//Connection"):
        origin = conn.find("Origin")
        dest = conn.find("Destination")

        # Skip connections that lack either endpoint.
        if origin is None or dest is None:
            continue

        edges.append({
            "source": origin.attrib.get("ToolID"),
            "source_port": origin.attrib.get("Connection"),
            "target": dest.attrib.get("ToolID"),
            "target_port": dest.attrib.get("Connection")
        })

    return {
        "nodes": nodes,
        "edges": edges
    }


# ---------------------------------
# Run Conversion
# ---------------------------------

# Workflow to convert; the XML export must be saved under this file name.
xml_file = "alteryx_sttm_example2.yxmd"  # save your XML as this
graph_json = parse_alteryx_workflow(xml_file)

# Persist the nodes/edges dictionary as pretty-printed JSON.
with open("workflow_graph.json", "w") as f:
    f.write(json.dumps(graph_json, indent=2))

print("Workflow converted to JSON successfully!")


Workflow converted to JSON successfully!


In [None]:
import xml.etree.ElementTree as ET
import json
import uuid
import time


# -------------------------------------------------
# Utilities
# -------------------------------------------------

def generate_id():
    """Return a 16-character hex id cut from the start of a random UUID4."""
    hex_digits = uuid.uuid4().hex
    return hex_digits[:16]

def timestamp():
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    seconds = time.time()
    return int(seconds * 1000)

def base_element(element_type, x, y):
    """Create the properties shared by every element in this cell.

    Args:
        element_type: Value stored under "type" (e.g. "rectangle", "text", "arrow").
        x: Horizontal position of the element.
        y: Vertical position of the element.

    Returns:
        A new dict with a generated id, type and position, default styling,
        an "updated" timestamp, "version" 1 and a random "versionNonce"
        below 100000000.
    """
    identity = {"id": generate_id(), "type": element_type, "x": x, "y": y}
    # Default appearance and bookkeeping flags; callers override what they need.
    defaults = {
        "angle": 0,
        "strokeColor": "#1e1e1e",
        "backgroundColor": "transparent",
        "fillStyle": "solid",
        "strokeWidth": 2,
        "strokeStyle": "solid",
        "roughness": 1,
        "opacity": 100,
        "groupIds": [],
        "frameId": None,
        "isDeleted": False,
        "locked": False,
    }
    revision = {
        "updated": timestamp(),
        "version": 1,
        # Reduce a random 128-bit UUID integer to at most eight decimal digits.
        "versionNonce": uuid.uuid4().int % 100000000,
    }
    return {**identity, **defaults, **revision}


# -------------------------------------------------
# Create Rectangle + Text
# -------------------------------------------------

def create_node(label, x, y):
    """Create a 180x80 rounded rectangle together with its label text.

    Args:
        label: Text shown for the node.
        x: Horizontal position of the rectangle.
        y: Vertical position of the rectangle.

    Returns:
        A (rectangle, text) tuple of element dicts. The text sits 20 units
        right of and 25 units below the rectangle origin, and its
        "containerId" is the rectangle's id.
    """
    box = {
        **base_element("rectangle", x, y),
        "width": 180,
        "height": 80,
        "roundness": {"type": 3},
    }

    caption = {
        **base_element("text", x + 20, y + 25),
        "text": label,
        "fontSize": 16,
        "fontFamily": 6,
        "width": 140,
        "height": 30,
        "textAlign": "center",
        "verticalAlign": "middle",
        "containerId": box["id"],
        "originalText": label,
        "autoResize": True,
        "lineHeight": 1.25,
    }

    return box, caption


# -------------------------------------------------
# Create Arrow
# -------------------------------------------------

def create_arrow(source_rect, target_rect):
    """Create an arrow from the right edge of one node to the left edge of another.

    The arrow starts at the vertical middle of the source's right edge and
    ends at the vertical middle of the target's left edge.

    Args:
        source_rect: Rectangle element dict the arrow leaves. Must have "x"
            and "y"; "width" and "height" are used when present.
        target_rect: Rectangle element dict the arrow points at. Must have
            "x" and "y"; "height" is used when present.

    Returns:
        An arrow element dict. "points" holds the signed offset to the end
        point, while "width" and "height" hold its absolute size.
    """
    # Fall back to the 180x80 box that create_node produces when a rectangle
    # dict carries no explicit size.
    x1 = source_rect["x"] + source_rect.get("width", 180)
    y1 = source_rect["y"] + source_rect.get("height", 80) // 2

    x2 = target_rect["x"]
    y2 = target_rect["y"] + target_rect.get("height", 80) // 2

    dx = x2 - x1
    dy = y2 - y1

    arrow = base_element("arrow", x1, y1)
    arrow.update({
        # Workflow positions are arbitrary, so the target may lie above or
        # left of the source; keep the size non-negative and leave the
        # direction in "points".
        "width": abs(dx),
        "height": abs(dy),
        "points": [[0, 0], [dx, dy]],
        "roundness": {"type": 2},
        "endArrowhead": "arrow"
    })

    return arrow


# -------------------------------------------------
# Parse XML and Build Diagram
# -------------------------------------------------

def xml_to_excalidraw(xml_path):
    """Convert an Alteryx workflow XML file into an Excalidraw document dict.

    Every Node element becomes a rectangle with a label, placed at the
    position recorded in the workflow. Every Connection element whose Origin
    and Destination both refer to a known ToolID becomes an arrow.

    Args:
        xml_path: Path of the workflow file to read.

    Returns:
        A dict with "type", "version", "source", "elements", "appState" and
        "files" keys, ready to be serialised as JSON.

    Raises:
        OSError: If the file cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        ValueError: If a position attribute is not an integer string.
    """
    root = ET.parse(xml_path).getroot()

    elements = []
    node_map = {}  # ToolID -> rectangle element, used to anchor arrows

    # ---------------------
    # Parse Nodes
    # ---------------------
    for node in root.findall(".//Node"):
        tool_id = node.attrib.get("ToolID")

        # A missing Position element, or a missing x/y attribute on it,
        # places the node at 0 instead of failing on int(None).
        pos = node.find("./GuiSettings/Position")
        x = int(pos.attrib.get("x", 0)) if pos is not None else 0
        y = int(pos.attrib.get("y", 0)) if pos is not None else 0

        # An empty Name element is present but its text is None (or only
        # whitespace); fall back to the generated label in that case too.
        name_tag = node.find("./Properties/Annotation/Name")
        label = name_tag.text if name_tag is not None else None
        if not label or not label.strip():
            label = f"Tool {tool_id}"

        rect, text = create_node(label, x, y)
        elements.extend([rect, text])
        node_map[tool_id] = rect

    # ---------------------
    # Parse Connections
    # ---------------------
    for conn in root.findall(".//Connection"):
        origin = conn.find("Origin")
        dest = conn.find("Destination")
        if origin is None or dest is None:
            continue

        source_id = origin.attrib.get("ToolID")
        target_id = dest.attrib.get("ToolID")

        # Only draw arrows between tools that were actually parsed above.
        if source_id in node_map and target_id in node_map:
            elements.append(create_arrow(node_map[source_id], node_map[target_id]))

    # ---------------------
    # Final Excalidraw JSON
    # ---------------------
    return {
        "type": "excalidraw",
        "version": 2,
        "source": "https://app.excalidraw.com",
        "elements": elements,
        "appState": {
            "viewBackgroundColor": "#ffffff"
        },
        "files": {}
    }


# -------------------------------------------------
# Run Conversion
# -------------------------------------------------

# Convert the saved workflow export into an Excalidraw document.
diagram = xml_to_excalidraw("alteryx_sttm_example2.yxmd")

# Write the document, pretty-printed, into the working directory.
with open("workflow.excalidraw", "w") as f:
    f.write(json.dumps(diagram, indent=2))

print("Excalidraw file generated successfully!")


Excalidraw file generated successfully!


In [None]:
import xml.etree.ElementTree as ET
import json
import uuid
import time
from collections import defaultdict

# -------------------------------------------------
# CONFIGURATION
# -------------------------------------------------

# Size of every node box and the vertical distance between stacked nodes.
NODE_WIDTH, NODE_HEIGHT = 180, 80
VERTICAL_SPACING = 140

# Per-layer column position ("x") and fill colour, listed left to right.
LAYER_CONFIG = {
    "Input": {
        "x": 100,
        "color": "#2ecc71",
    },
    "Processing": {
        "x": 400,
        "color": "#3498db",
    },
    "Join": {
        "x": 700,
        "color": "#9b59b6",
    },
    "Analytics": {
        "x": 1000,
        "color": "#f39c12",
    },
    "Output": {
        "x": 1300,
        "color": "#e74c3c",
    },
    "Browse": {
        "x": 1600,
        "color": "#95a5a6",
    },
}

# Layer names in column order; identical to the key order of LAYER_CONFIG.
LAYER_ORDER = list(LAYER_CONFIG)


# -------------------------------------------------
# UTILITIES
# -------------------------------------------------

def generate_id():
    """Return a random element id: the leading 16 hex digits of a UUID4."""
    return uuid.uuid4().hex[0:16]

def timestamp():
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    now_ms = time.time() * 1000
    return int(now_ms)

def base_element(element_type, x, y):
    """Create the properties shared by every element in this cell.

    Args:
        element_type: Value stored under "type" (e.g. "rectangle", "text", "arrow").
        x: Horizontal position of the element.
        y: Vertical position of the element.

    Returns:
        A new dict with a generated id, type and position, default styling,
        an "updated" timestamp, "version" 1 and a random "versionNonce"
        below 100000000.
    """
    element = {"id": generate_id(), "type": element_type, "x": x, "y": y}
    # Default appearance and bookkeeping flags; callers override what they need.
    element.update({
        "angle": 0,
        "strokeColor": "#1e1e1e",
        "backgroundColor": "transparent",
        "fillStyle": "solid",
        "strokeWidth": 2,
        "strokeStyle": "solid",
        "roughness": 1,
        "opacity": 100,
        "groupIds": [],
        "frameId": None,
        "isDeleted": False,
        "locked": False,
    })
    element["updated"] = timestamp()
    element["version"] = 1
    # Reduce a random 128-bit UUID integer to at most eight decimal digits.
    element["versionNonce"] = uuid.uuid4().int % 100000000
    return element


# -------------------------------------------------
# CLASSIFY TOOL TYPE
# -------------------------------------------------

def classify_layer(plugin_name):
    """Map a plugin name to the diagram layer it is drawn in.

    The name is checked for known substrings in a fixed priority order, and
    the first rule that matches decides the layer.

    Args:
        plugin_name: Plugin identifier string; may be None or empty.

    Returns:
        One of "Input", "Processing", "Join", "Analytics", "Output" or
        "Browse". Empty, missing or unrecognised names map to "Processing".
    """
    if not plugin_name:
        return "Processing"

    # (substrings, layer) pairs, evaluated top to bottom.
    rules = (
        (("DbFileInput",), "Input"),
        (("DataCleansing", "Formula", "Filter", "Sort", "AutoField"), "Processing"),
        (("Join",), "Join"),
        (("Summarize",), "Analytics"),
        (("DbFileOutput",), "Output"),
        (("Browse",), "Browse"),
    )
    for keywords, layer in rules:
        if any(keyword in plugin_name for keyword in keywords):
            return layer

    return "Processing"


# -------------------------------------------------
# CREATE NODE
# -------------------------------------------------

def create_node(label, layer, y_position):
    """Create a coloured node box plus its label inside a layer column.

    Args:
        label: Text shown for the node.
        layer: Key into LAYER_CONFIG selecting the column position and fill colour.
        y_position: Vertical position of the rectangle.

    Returns:
        A (rectangle, text) tuple of element dicts. The rectangle is
        NODE_WIDTH x NODE_HEIGHT with a solid fill in the layer colour. The
        text sits 20 units right of and 25 units below the rectangle origin,
        and its "containerId" is the rectangle's id.
    """
    layer_style = LAYER_CONFIG[layer]
    left = layer_style["x"]
    fill = layer_style["color"]

    box = {
        **base_element("rectangle", left, y_position),
        "width": NODE_WIDTH,
        "height": NODE_HEIGHT,
        "roundness": {"type": 3},
        "backgroundColor": fill,
        "fillStyle": "solid",
    }

    caption = {
        **base_element("text", left + 20, y_position + 25),
        "text": label,
        "fontSize": 16,
        "fontFamily": 6,
        "width": NODE_WIDTH - 40,
        "height": 30,
        "textAlign": "center",
        "verticalAlign": "middle",
        "containerId": box["id"],
        "originalText": label,
        "autoResize": True,
        "lineHeight": 1.25,
    }

    return box, caption


# -------------------------------------------------
# CREATE SWIMLANE CONTAINER
# -------------------------------------------------

def create_swimlane(layer, max_height):
    """Create the dashed background lane and title text for one layer column.

    Args:
        layer: Key into LAYER_CONFIG; also used as the title text.
        max_height: Height the lane is sized from; 100 units are added to it.

    Returns:
        A (lane, label) tuple of element dicts. The lane starts 40 units left
        of the layer column at y=0 and is NODE_WIDTH + 80 wide. The label is
        a free-standing text element near the lane's top-left corner.
    """
    x = LAYER_CONFIG[layer]["x"] - 40
    y = 0

    lane = base_element("rectangle", x, y)
    lane.update({
        "width": NODE_WIDTH + 80,
        "height": max_height + 100,
        "strokeStyle": "dashed",
        "backgroundColor": "transparent"
    })

    label = base_element("text", x + 20, 20)
    label.update({
        "text": layer,
        "fontSize": 18,
        # Same font family and text bookkeeping fields as the node labels
        # produced by create_node, so all text elements share one shape.
        "fontFamily": 6,
        "width": 150,
        "height": 30,
        "textAlign": "left",
        "verticalAlign": "top",
        # The title is free-standing: it is not bound to the lane rectangle.
        "containerId": None,
        "originalText": layer,
        "autoResize": True,
        "lineHeight": 1.25
    })

    return lane, label


# -------------------------------------------------
# CREATE SMART ARROW
# -------------------------------------------------

def create_arrow(source_rect, target_rect, offset=0):
    """Create an arrow from the right edge of one node to the left edge of another.

    Args:
        source_rect: Rectangle element dict the arrow leaves. Must have "x"
            and "y"; "width" and "height" are used when present.
        target_rect: Rectangle element dict the arrow points at. Must have
            "x" and "y"; "height" is used when present.
        offset: Vertical shift applied to both ends, so that several arrows
            between nearby nodes do not lie exactly on top of each other.

    Returns:
        An arrow element dict. "points" holds the signed offset to the end
        point, while "width" and "height" hold its absolute size.
    """
    # Fall back to the configured node size when a rectangle dict carries no
    # explicit size.
    x1 = source_rect["x"] + source_rect.get("width", NODE_WIDTH)
    y1 = source_rect["y"] + source_rect.get("height", NODE_HEIGHT) // 2 + offset

    x2 = target_rect["x"]
    y2 = target_rect["y"] + target_rect.get("height", NODE_HEIGHT) // 2 + offset

    dx = x2 - x1
    dy = y2 - y1

    arrow = base_element("arrow", x1, y1)
    arrow.update({
        # Connections inside one layer, or back to an earlier layer, run
        # right-to-left; keep the size non-negative and leave the direction
        # in "points".
        "width": abs(dx),
        "height": abs(dy),
        "points": [[0, 0], [dx, dy]],
        "roundness": {"type": 2},
        "endArrowhead": "arrow"
    })

    return arrow


# -------------------------------------------------
# MAIN ENGINE
# -------------------------------------------------

def xml_to_excalidraw_clean(xml_path):
    """Convert an Alteryx workflow XML file into a layered Excalidraw document.

    Every Node element is assigned a layer from its GuiSettings Plugin
    attribute and stacked top to bottom in that layer's column. One dashed
    swimlane per entry of LAYER_ORDER is placed at the front of the element
    list. Every Connection element whose Origin and Destination both refer
    to a known ToolID becomes an arrow.

    Args:
        xml_path: Path of the workflow file to read.

    Returns:
        A dict with "type", "version", "source", "elements", "appState" and
        "files" keys, ready to be serialised as JSON.

    Raises:
        OSError: If the file cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    root = ET.parse(xml_path).getroot()

    elements = []
    node_map = {}  # ToolID -> rectangle element, used to anchor arrows
    layer_positions = defaultdict(int)  # layer -> number of nodes placed so far

    # -------------------------
    # Create Nodes
    # -------------------------
    for node in root.findall(".//Node"):
        tool_id = node.attrib.get("ToolID")
        plugin = node.find("./GuiSettings")
        plugin_name = plugin.attrib.get("Plugin") if plugin is not None else ""

        # An empty Name element is present but its text is None (or only
        # whitespace); fall back to the generated label in that case too.
        name_tag = node.find("./Properties/Annotation/Name")
        label = name_tag.text if name_tag is not None else None
        if not label or not label.strip():
            label = f"Tool {tool_id}"

        layer = classify_layer(plugin_name)

        # Next free slot in this layer's column, counted from the top.
        y_position = 100 + layer_positions[layer] * VERTICAL_SPACING
        layer_positions[layer] += 1

        rect, text = create_node(label, layer, y_position)

        elements.extend([rect, text])
        node_map[tool_id] = rect

    # default=0 keeps a workflow without any Node element from raising
    # ValueError on an empty sequence.
    tallest_column = max(layer_positions.values(), default=0)
    max_height = tallest_column * VERTICAL_SPACING + 200

    # -------------------------
    # Add Swimlanes
    # -------------------------
    # Lanes go to the front of the list, ahead of the node elements.
    for layer in LAYER_ORDER:
        lane, label = create_swimlane(layer, max_height)
        elements.insert(0, lane)
        elements.insert(1, label)

    # -------------------------
    # Create Connections
    # -------------------------
    connection_offset = 0

    for conn in root.findall(".//Connection"):
        origin = conn.find("Origin")
        dest = conn.find("Destination")
        if origin is None or dest is None:
            continue

        source_id = origin.attrib.get("ToolID")
        target_id = dest.attrib.get("ToolID")

        # Only draw arrows between tools that were actually parsed above.
        if source_id in node_map and target_id in node_map:
            arrow = create_arrow(
                node_map[source_id],
                node_map[target_id],
                # Cycle through small vertical shifts (0..9) so successive
                # arrows are slightly staggered.
                offset=(connection_offset % 10)
            )
            connection_offset += 3
            elements.append(arrow)

    return {
        "type": "excalidraw",
        "version": 2,
        "source": "https://app.excalidraw.com",
        "elements": elements,
        "appState": {
            "viewBackgroundColor": "#ffffff"
        },
        "files": {}
    }


# -------------------------------------------------
# RUN
# -------------------------------------------------

# Convert the saved workflow export into the layered Excalidraw document.
diagram = xml_to_excalidraw_clean("alteryx_sttm_example2.yxmd")

# Write the document, pretty-printed, into the working directory.
with open("workflow_clean.excalidraw", "w") as f:
    f.write(json.dumps(diagram, indent=2))

print("Clean Excalidraw workflow generated successfully!")


Clean Excalidraw workflow generated successfully!


In [None]:
#graph_model
class Node:
    """A graph vertex holding an identifier, a display name and a type label."""

    def __init__(self, node_id, name, node_type):
        """Store the three values as the attributes `id`, `name` and `type`."""
        self.id, self.name, self.type = node_id, name, node_type


class Edge:
    """A directed connection between two nodes, stored by their identifiers."""

    def __init__(self, source, target):
        """Store the start and end of the connection as `source` and `target`."""
        self.source, self.target = source, target


class Graph:
    """In-memory directed graph: nodes keyed by id plus an ordered edge list."""

    def __init__(self):
        """Start with no nodes and no edges."""
        self.nodes = dict()
        self.edges = list()

    def add_node(self, node_id, name, node_type):
        """Register a node unless one with the same id already exists.

        The first registration wins; later calls with a known id are ignored.
        """
        if node_id in self.nodes:
            return
        self.nodes[node_id] = Node(node_id, name, node_type)

    def add_edge(self, source, target):
        """Append a directed edge; the endpoints are not checked against `nodes`."""
        connection = Edge(source, target)
        self.edges.append(connection)


In [None]:
#datastage_parser
import re
from models.graph_model import Graph


def parse_datastage(file_path):
    """Build a Graph from a DataStage text dump using simple pattern matching.

    Stage headers of the form "--- [TYPE: NAME] ---" become nodes. Every
    dataset label found on an "Input:" line is then linked to every dataset
    label found on an "Output:" line, across the whole file.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        The populated Graph.
    """
    graph = Graph()

    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()

    # Stage headers: text between "--- [" and "] ---".
    for header in re.findall(r"--- \[(.*?)\] ---", content):
        fields = header.split(":")
        if len(fields) < 2:
            continue
        kind = fields[0].strip()
        stage = fields[1].strip()
        graph.add_node(stage, stage, kind)

    # Dataset labels: the parenthesised text on Input/Output lines.
    sources = re.findall(r"Input: ← dataset_\d+ \((.*?)\)", content)
    targets = re.findall(r"Output: → dataset_\d+ \((.*?)\)", content)

    # Placeholder linking: every input label is connected to every output label.
    pairs = [(src, tgt) for src in sources for tgt in targets]
    for src, tgt in pairs:
        graph.add_edge(src, tgt)

    return graph


In [None]:
#graphviz_renderer
from graphviz import Digraph


def render_graph(graph, output_name="etl_diagram"):
    """Draw a graph with Graphviz and render it in PNG format.

    Args:
        graph: Object exposing `nodes` (a mapping whose values have `id`,
            `name` and `type`) and `edges` (items with `source` and `target`).
        output_name: Name passed to `Digraph.render` for the output.
    """
    dot = Digraph()

    # One Graphviz node per graph node, labelled "name" over "(type)".
    for vertex in graph.nodes.values():
        caption = f"{vertex.name}\n({vertex.type})"
        dot.node(vertex.id, caption)

    # One directed Graphviz edge per graph edge.
    for link in graph.edges:
        dot.edge(link.source, link.target)

    dot.render(output_name, format="png", cleanup=True)


In [None]:
#main.py
from parsers.datastage_parser import parse_datastage
from renderers.graphviz_renderer import render_graph

if __name__ == "__main__":
    # Text file holding the job description to visualise.
    file_path = "Sample_Job1 1 2_detailed_pseudocode.txt"

    # Parse the file into a graph, then render it as "datastage_output".
    graph = parse_datastage(file_path)
    render_graph(graph, output_name="datastage_output")


In [None]:
import re
from models.graph_model import Graph


def parse_datastage(file_path):
    """Build a Graph of stages and datasets from a DataStage text dump.

    The file is cut into sections at every "// --- [" marker. Each section
    header of the form "TYPE : NAME]" becomes a stage node. Dataset labels on
    the section's "Input:" lines are linked into the stage, and those on its
    "Output:" lines are linked out of it.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        The populated Graph, with dataset nodes typed "DATASET".
    """
    graph = Graph()

    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()

    # Text before the first marker is not a stage section and is dropped.
    for section in re.split(r"// --- \[", content)[1:]:
        # First line is the header; the rest (possibly empty) is the body.
        header, _, body_text = section.partition("\n")

        # Header up to the closing bracket, e.g. "CUSTOMSTAGE : Some_Stage_Name".
        stage_info = header.partition("]")[0]
        parts = stage_info.split(":")
        if len(parts) < 2:
            continue

        stage_type, stage_name = (part.strip() for part in parts[:2])
        graph.add_node(stage_name, stage_name, stage_type)

        # Dataset labels are the parenthesised text on Input/Output lines.
        inputs = re.findall(r"Input:\s*←\s*dataset_\d+\s*\((.*?)\)", body_text)
        outputs = re.findall(r"Output:\s*→\s*dataset_\d+\s*\((.*?)\)", body_text)

        # Inputs flow dataset -> stage.
        for dataset in inputs:
            graph.add_node(dataset, dataset, "DATASET")
            graph.add_edge(dataset, stage_name)

        # Outputs flow stage -> dataset.
        for dataset in outputs:
            graph.add_node(dataset, dataset, "DATASET")
            graph.add_edge(stage_name, dataset)

    return graph
