In [None]:
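# Make a local TopologicPy source checkout importable. The sys.path line below is
# machine-specific; it is not needed on computers where topologicpy is already importable.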
import sys
sys.path.append("C:/Users/sarwj/OneDrive - Cardiff University/Documents/GitHub/topologicpy/src")

from topologicpy.Graph import Graph
from topologicpy.Vertex import Vertex
from topologicpy.Edge import Edge
from topologicpy.Dictionary import Dictionary
from topologicpy.Topology import Topology

class GraphRAG:
    """
    A class that implements a Graph-based Retrieval-Augmented Generation (GraphRAG) system
    using TopologicPy graphs and structural graph matching.

    This class stores a knowledge base of graph fragments and supports retrieval of
    similar graphs using `Graph.Match`, allowing these to be used
    for generative tasks such as design recommendation or graph expansion.

    Attributes
    ----------
    knowledge_graphs : list
        A list of known TopologicPy graphs used as the knowledge base.
    """

    def __init__(self, fragments):
        """
        Initialize the GraphRAG instance with a list of graph fragments.

        Parameters
        ----------
        fragments : list
            A list of graph fragments represented as lists of tuples: (node_type, [neighbor_types]).
        """
        self.knowledge_graphs = [self._fragment_to_topologic_graph(f) for f in fragments]

    def _fragment_to_topologic_graph(self, fragment):
        """
        Convert one fragment into a TopologicPy graph.

        Each distinct node type becomes one vertex carrying a dictionary with the
        key "type". Vertices are placed along the X axis at x = 0, 1, 2, ... in
        order of creation. Every (node, neighbor) relation becomes a single edge;
        a relation listed from both sides produces only one edge.

        Parameters
        ----------
        fragment : list
            A list of tuples: (node_type, [neighbor_types]).

        Returns
        -------
        The graph built by Graph.ByVerticesEdges from the vertices and edges.
        """
        vertex_map = {}
        vertices = []

        def vertex_for(node_type):
            # Return the vertex for node_type, creating it on first use.
            if node_type not in vertex_map:
                v = Vertex.ByCoordinates(len(vertex_map), 0, 0)
                d = Dictionary.ByKeyValue("type", node_type)
                v = Topology.SetDictionary(v, d)
                vertex_map[node_type] = v
                vertices.append(v)
            return vertex_map[node_type]

        # First pass: declared nodes get their vertices in declaration order,
        # so coordinates do not depend on how neighbors are listed.
        for node, _ in fragment:
            vertex_for(node)

        # Second pass: one edge per undirected pair. Pairs are tracked by node
        # type rather than by comparing vertex objects read back from edges,
        # so de-duplication does not depend on how vertex objects compare.
        edges = []
        seen_pairs = set()
        for node, neighbors in fragment:
            v1 = vertex_map[node]
            for n in neighbors:
                pair = frozenset((node, n))
                if pair in seen_pairs:
                    continue
                seen_pairs.add(pair)
                # A neighbor that was never declared as a node is created here
                # instead of raising KeyError.
                edges.append(Edge.ByVertices(v1, vertex_for(n)))

        return Graph.ByVerticesEdges(vertices, edges)

    def retrieve(self, input_graph, top_k=3):
        """
        Retrieve the top-k highest-scoring graphs from the knowledge base.

        Each knowledge graph is scored as the size of the first result returned
        by Graph.Match divided by that graph's vertex count; a non-list or empty
        result scores 0.

        Parameters
        ----------
        input_graph : topologic_core.Graph
            The input graph to compare against the knowledge base.
        top_k : int, optional
            The number of top matches to return. Default is 3.

        Returns
        -------
        list
            A list of text prompts describing the top-k matching graphs.
        """
        matches = []
        for kg in self.knowledge_graphs:
            result = Graph.Match(kg, input_graph, silent=True)
            if isinstance(result, list) and len(result) > 0:
                # NOTE(review): assumes Graph.Match lists its best mapping first - confirm.
                best_mapping = result[0]
                # max(..., 1) avoids division by zero for a graph without vertices.
                score = len(best_mapping) / max(len(Graph.Vertices(kg)), 1)
            else:
                score = 0
            matches.append((score, kg))
        # Sort on the score only; list.sort is stable, so ties keep knowledge-base order.
        matches.sort(reverse=True, key=lambda x: x[0])
        return [self._graph_to_prompt(g) for _, g in matches[:top_k]]

    def _graph_to_prompt(self, graph):
        """
        Describe a graph as text, one line per vertex.

        Parameters
        ----------
        graph : topologic_core.Graph
            The graph to describe.

        Returns
        -------
        str
            Lines of the form "A '<type>' connected to <neighbor types>.",
            joined with newlines. A missing "type" value is reported as "unknown".
        """
        prompts = []
        for v in Graph.Vertices(graph):
            d = Topology.Dictionary(v)
            t = Dictionary.ValueAtKey(d, "type", "unknown")
            neighbors = Graph.AdjacentVertices(graph, v)
            ntypes = [Dictionary.ValueAtKey(Topology.Dictionary(n), "type", "unknown") for n in neighbors]
            prompts.append(f"A '{t}' connected to {', '.join(ntypes)}.")
        return "\n".join(prompts)


In [None]:
import os
from openai import OpenAI
from topologicpy.Graph import Graph
from topologicpy.Vertex import Vertex
from topologicpy.Edge import Edge
from topologicpy.Dictionary import Dictionary
from topologicpy.Topology import Topology


import getpass
from openai import OpenAI

# Prompt user for API key (input is hidden)
# The banner is printed first so it is visible before the getpass prompt is shown.
print("[[[[[[[[[[[[   PROVIDE THE OPENAI API KEY ABOVE   ]]]]]]]]]]]]")
api_key = getpass.getpass("Enter your OpenAI API key: ")

# Initialize OpenAI client
# `client` is a module-level name used by query_llm() further down.
client = OpenAI(api_key=api_key)

# ---------------------------
# 1. Knowledge Graph Setup
# ---------------------------
# Each fragment is a list of (node_type, [neighbor_types]) tuples, the format
# expected by GraphRAG.__init__.
known_graphs = [
    # Fragment 1: bedroom with bathroom and closet.
    [("bedroom", ["bathroom", "closet"]),
     ("bathroom", ["bedroom"]),
     ("closet", ["bedroom"])],

    # Fragment 2: kitchen with dining room and pantry.
    [("kitchen", ["diningroom", "pantry"]),
     ("diningroom", ["kitchen"]),
     ("pantry", ["kitchen"])],

    # Fragment 3: hallway as a hub. The hallway entry lists only two neighbors;
    # bathroom, kitchen and closet are linked through their own entries.
    [("livingroom", ["hallway"]),
     ("hallway", ["livingroom", "bedroom"]),
     ("bedroom", ["hallway"]),
     ("bathroom", ["hallway"]),
     ("kitchen", ["hallway"]),
     ("closet", ["hallway"])]
]

# Build the retrieval knowledge base; each fragment is converted to a TopologicPy graph.
rag = GraphRAG(known_graphs)

# ---------------------------
# 2. Initialize the Graph
# ---------------------------
def initialize_graph():
    """
    Build the starting graph: one vertex at the origin tagged with type "hallway"
    and no edges.

    Returns
    -------
    The graph produced by Graph.ByVerticesEdges for that single vertex.
    """
    seed = Topology.SetDictionary(
        Vertex.ByCoordinates(0, 0, 0),
        Dictionary.ByKeyValue("type", "hallway"),
    )
    return Graph.ByVerticesEdges([seed], [])

# ---------------------------
# 3. Convert Graph to Prompt
# ---------------------------
def generate_prompt_from_graph(graph):
    """
    Describe the current graph as a text prompt asking for the next node.

    Parameters
    ----------
    graph : topologic_core.Graph
        The graph to describe.

    Returns
    -------
    str
        For an empty graph, a request for the first node. Otherwise one line per
        vertex listing its type and the types of its adjacent vertices, followed
        by a request for a new node. A missing "type" value is reported as
        "unknown"; a vertex without adjacent vertices is reported as having no
        connections instead of producing a dangling "connected to ." line.
    """
    if Graph.IsEmpty(graph):
        return "The graph is empty. Suggest the first node to add."

    prompts = []
    for v in Graph.Vertices(graph):
        t = Dictionary.ValueAtKey(Topology.Dictionary(v), "type", "unknown")
        # `or []` keeps the loop safe should no adjacency list be returned.
        neighbors = Graph.AdjacentVertices(graph, v) or []
        ntypes = [Dictionary.ValueAtKey(Topology.Dictionary(n), "type", "unknown") for n in neighbors]
        if ntypes:
            prompts.append(f"'{t}' connected to {', '.join(ntypes)}.")
        else:
            # The single-vertex seed graph always takes this branch at step 0.
            prompts.append(f"'{t}' has no connections yet.")
    return "Current graph:\n" + "\n".join(prompts) + "\nSuggest a new node to add and what it's connected to."

# ---------------------------
# 4. LLM Query
# ---------------------------
def query_llm(prompt, examples, max_retries=1, model="gpt-4"):
    """
    Query the LLM with retrieval-augmented context. Ensures the reply starts with 'Add' or is 'STOP'.

    Parameters
    ----------
    prompt : str
        The description of the current graph and the request for a new node.
    examples : list
        Text descriptions of retrieved graphs; joined with newlines and placed
        under an "Examples:" heading in the request.
    max_retries : int, optional
        Number of additional attempts after an invalid reply. Default is 1.
    model : str, optional
        Name of the chat model to query. Default is "gpt-4".

    Returns
    -------
    str
        The first valid reply (stripped of surrounding whitespace), or "STOP"
        if every attempt produced an invalid reply.
    """
    retrieval_context = "\n".join(examples)
    full_prompt = (
        "You are an assistant that expands a spatial graph by adding one new node at a time.\n"
        "Respond only with one of the following:\n"
        "- Add 'room_type' connected to 'type1', 'type2', ...\n"
        "- STOP (if no more nodes should be added)\n\n"
        f"Examples:\n{retrieval_context}\n\n{prompt}"
    )
    # The request is identical on every attempt, so build it once.
    messages = [
        {"role": "system", "content": "You are a helpful assistant that expands spatial graphs by adding one node at a time."},
        {"role": "user", "content": full_prompt}
    ]

    for attempt in range(max_retries + 1):
        response = client.chat.completions.create(model=model, messages=messages)
        # The message content can be None; treat that as an empty (invalid) reply.
        suggestion = (response.choices[0].message.content or "").strip()

        # Validate response. Trailing punctuation is ignored the same way
        # apply_suggestion() ignores it, so "STOP." is not rejected and retried.
        normalized = suggestion.lower().rstrip(".# ")
        if normalized.startswith("add ") or normalized == "stop":
            return suggestion

        print(f"⚠️ Invalid LLM response (attempt {attempt+1}): {suggestion}")

    # Final fallback
    return "STOP"


# ---------------------------
# 5. Apply Suggestion
# ---------------------------
def apply_suggestion(graph, suggestion):
    """
    Apply an LLM suggestion of the form
    "Add 'room_type' connected to 'type1', 'type2', ..." to the graph.

    Parameters
    ----------
    graph : topologic_core.Graph
        The current graph.
    suggestion : str
        The LLM reply. Case, surrounding whitespace and trailing '.', '#' or
        spaces are ignored. Double, typographic and backtick quotes are treated
        as single quotes.

    Returns
    -------
    tuple
        (graph, stop). `stop` is True when the suggestion is "stop" or cannot be
        parsed at all. When no neighbor types can be extracted, or none of them
        matches an existing vertex, the original graph is returned with
        stop=False. Otherwise a new graph containing the added vertex and its
        edges is returned with stop=False.
    """
    import re

    # Clean and normalize suggestion
    suggestion = suggestion.strip().lower()
    # Replies often quote names with " or typographic quotes. Without this
    # mapping such a reply fails to parse, which ends the whole generation loop.
    quote_map = str.maketrans({
        '"': "'",
        "`": "'",
        "\u2018": "'",
        "\u2019": "'",
        "\u201c": "'",
        "\u201d": "'",
    })
    suggestion = suggestion.translate(quote_map).rstrip(".# ")

    if suggestion == "stop":
        return graph, True

    # Try to match pattern like: add 'room_type' connected to 'type1', 'type2'
    match = re.match(r"(?:add|a)? ?'([a-z0-9_ ]+)' connected to (.+)", suggestion)

    if not match:
        print("Could not parse suggestion:", suggestion)
        return graph, True

    new_type = match.group(1).strip()
    neighbor_text = match.group(2).strip()

    # Extract quoted neighbor types if possible
    neighbor_types = re.findall(r"'([a-z0-9_ ]+)'", neighbor_text)

    # Fallback: try comma/space-separated tokens
    if not neighbor_types:
        neighbor_text = re.sub(r"[^a-z0-9_, ]", "", neighbor_text)
        neighbor_types = [n.strip() for n in re.split(r"[ ,]+", neighbor_text) if n.strip()]

    if not neighbor_types:
        print(f"Could not extract neighbor types from: {match.group(2)}")
        return graph, False

    # Prepare graph elements
    existing_vertices = Graph.Vertices(graph)
    all_vs = existing_vertices[:]
    all_es = Graph.Edges(graph)

    # Create the new vertex; x is the current vertex count, continuing the
    # one-unit spacing along the X axis.
    new_v = Vertex.ByCoordinates(len(existing_vertices), 0, 0)
    d_new = Dictionary.ByKeyValue("type", new_type)
    new_v = Topology.SetDictionary(new_v, d_new)
    all_vs.append(new_v)

    # Attempt to connect new node: every existing vertex whose type was named
    # as a neighbor gets an edge to the new vertex.
    connected = False
    for v in existing_vertices:
        t = Dictionary.ValueAtKey(Topology.Dictionary(v), "type", "unknown")
        if t in neighbor_types:
            all_es.append(Edge.ByVertices(new_v, v))
            connected = True

    if not connected:
        print(f"Skipping node '{new_type}' because it would be isolated.")
        return graph, False

    return Graph.ByVerticesEdges(all_vs, all_es), False

def draw_graph(graph):
    """
    Display the graph with vertices labelled by type and coloured by room type.

    The graph is first passed through Graph.Reshape. Each vertex of the result
    then gets a "room_color" dictionary entry looked up from its "type" value
    ("black" when the type is not in the table), and the result is shown with
    Topology.Show.

    Parameters
    ----------
    graph : topologic_core.Graph
        The graph to display.
    """
    # Colour table keyed by room type; spaced and unspaced spellings share a colour.
    palette = {
        "bedroom": "lightblue",
        "bathroom": "lightgray",
        "closet": "saddlebrown",
        "kitchen": "peachpuff",
        "diningroom": "khaki",
        "dining room": "khaki",
        "pantry": "linen",
        "livingroom": "mediumseagreen",
        "living room": "mediumseagreen",
        "hallway": "darkslategray",
        "library": "plum",
        "laundryroom": "powderblue",
        "laundry room": "powderblue",
        "garage": "slategray",
        "basement": "dimgray",
        "homeoffice": "lightsteelblue",
        "home office": "lightsteelblue",
        "mudroom": "burlywood",
        "mud room": "burlywood",
        "master bedroom": "cornflowerblue",
        "guest bedroom": "lightskyblue"
    }

    flat_g = Graph.Reshape(graph)

    for vertex in Graph.Vertices(flat_g):
        info = Topology.Dictionary(vertex)
        room_type = Dictionary.ValueAtKey(info, "type")
        colour = palette.get(room_type, "black")
        # Write the colour back onto the vertex so Topology.Show can read it.
        Topology.SetDictionary(vertex, Dictionary.SetValueAtKey(info, "room_color", colour))

    Topology.Show(flat_g, vertexLabelKey="type", vertexSize=20, vertexColorKey="room_color", showVertexLabel=True)
# ---------------------------
# 6. Main Loop
# ---------------------------
# Start from the single-vertex seed graph and expand it one node per step.
graph = initialize_graph()
stop = False
step = 0

# Run until apply_suggestion() signals stop, with a hard cap of 15 steps.
while not stop and step < 15:
    print(f"\n--- Step {step} ---")
    # Describe the current graph for the LLM.
    prompt = generate_prompt_from_graph(graph)
    # Fetch text descriptions of the best-scoring knowledge graphs as examples.
    retrieved_examples = rag.retrieve(graph, top_k=3)
    print(retrieved_examples)
    suggestion = query_llm(prompt, retrieved_examples)
    print("LLM Suggestion:", suggestion)
    # apply_suggestion returns the (possibly unchanged) graph and the stop flag.
    graph, stop = apply_suggestion(graph, suggestion)
    # The graph is drawn on every step, including steps that left it unchanged.
    draw_graph(graph)
    step += 1

# ---------------------------
# 7. Final Graph Summary
# ---------------------------
# List the "type" of every vertex in the final graph ("unknown" when missing).
print("\nFinal Graph Vertices:")
for v in Graph.Vertices(graph):
    t = Dictionary.ValueAtKey(Topology.Dictionary(v), "type", "unknown")
    print("-", t)


In [None]:
# Draw the flat graph with color-coding

