# NetworkX Knowledge Graph

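This notebook loads the order dataset and builds a NetworkX knowledge graph from the aggregated customer and product data. It then answers structured questions by traversing that graph and writes a JSON snapshot of the graph to `artifacts/`.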


In [None]:
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import difflib
import networkx as nx
import pandas as pd
from IPython.display import display

from customer_query_agent import OrderAnalytics, load_orders


In [None]:
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
DATA_PATH = Path("Dataset_product_orders.csv")
# Load the raw order lines once; OrderAnalytics derives the customer/product aggregates from them.
rows = load_orders(str(DATA_PATH))
analytics = OrderAnalytics(rows)
print(
    "Loaded {} order lines for {} customers and {} products.".format(
        len(rows), len(analytics.customer_profiles), len(analytics.product_profiles)
    )
)


In [None]:
def _add_product_nodes(graph: nx.DiGraph, analytics: OrderAnalytics) -> None:
    """Add product nodes and their product -> category -> broad-category hierarchy edges."""
    for product_id, profile in analytics.product_profiles.items():
        product_node = f"product::{product_id}"
        graph.add_node(
            product_node,
            type="product",
            product_id=product_id,
            product_name=profile.display_name,
            category=profile.category,
            sub_category=profile.sub_category,
            broad_category=profile.broad_category,
            total_revenue=profile.total_revenue,
            total_quantity=profile.total_quantity,
            average_unit_price=profile.avg_price,
            unique_customers=profile.unique_customers,
        )
        category_node = f"category::{profile.category}"
        graph.add_node(category_node, type="category", category=profile.category)
        graph.add_edge(product_node, category_node, type="belongs_to", weight=1.0)
        broad_node = f"broad::{profile.broad_category}"
        graph.add_node(broad_node, type="broad_category", broad_category=profile.broad_category)
        graph.add_edge(category_node, broad_node, type="rolls_up_to", weight=1.0)


def _add_customer_nodes(graph: nx.DiGraph, analytics: OrderAnalytics) -> None:
    """Add customer nodes plus their ``spend`` (to category) and ``purchased`` (to product) edges."""
    # Purchase counts are looked up by product display name, so map names back to product ids.
    # NOTE(review): if two products share a display name, the later one wins in this mapping and
    # purchases are attributed to it only — confirm display names are unique.
    display_to_product_id = {profile.display_name: product_id for product_id, profile in analytics.product_profiles.items()}
    display_lower_to_product_id = {name.lower(): product_id for name, product_id in display_to_product_id.items()}

    for customer_id, profile in analytics.customer_profiles.items():
        customer_node = f"customer::{customer_id}"
        graph.add_node(
            customer_node,
            type="customer",
            customer_id=customer_id,
            customer_name=profile.customer_name,
            total_spend=profile.total_spend,
            total_orders=profile.total_orders,
            total_items=profile.total_items,
            avg_ticket=profile.avg_ticket,
            first_purchase=profile.first_purchase.isoformat(),
            last_purchase=profile.last_purchase.isoformat(),
        )
        for category, spend in profile.category_spend.items():
            category_node = f"category::{category}"
            graph.add_node(category_node, type="category", category=category)
            graph.add_edge(customer_node, category_node, type="spend", weight=round(spend, 2))
        for display_name, quantity in profile.product_counts.items():
            # Exact name first, then a case-insensitive fallback; unknown names are skipped.
            product_id = display_to_product_id.get(display_name) or display_lower_to_product_id.get(display_name.lower())
            if not product_id:
                continue
            product_node = f"product::{product_id}"
            graph.add_edge(customer_node, product_node, type="purchased", weight=int(quantity))


def _add_cooccurrence_edges(graph: nx.DiGraph, analytics: OrderAnalytics) -> None:
    """Add ``cooccurs`` edges in both directions for product pairs with a positive count."""
    for (prod_a, prod_b), count in analytics.cooccurrence.items():
        if count <= 0:
            continue
        node_a = f"product::{prod_a}"
        node_b = f"product::{prod_b}"
        # Only link products that already have nodes (added by _add_product_nodes).
        if node_a in graph and node_b in graph:
            graph.add_edge(node_a, node_b, type="cooccurs", weight=int(count))
            graph.add_edge(node_b, node_a, type="cooccurs", weight=int(count))


def _add_anomaly_nodes(graph: nx.DiGraph, analytics: OrderAnalytics) -> None:
    """Add one node per (customer, category) unusual event and link it to customer and category."""
    for event in analytics.unusual_events:
        anomaly_node = f"anomaly::{event['customer_id']}::{event['category']}"
        if anomaly_node not in graph:
            # Build the attributes explicitly: ``add_node(node, type="anomaly", **event)`` raises
            # TypeError if the event dict carries its own "type" key. The node keeps
            # type="anomaly" and any event-level type is preserved as "event_type".
            attributes: Dict[str, object] = {"type": "anomaly"}
            attributes.update((key, value) for key, value in event.items() if key != "type")
            if "type" in event:
                attributes["event_type"] = event["type"]
            graph.add_node(anomaly_node, **attributes)
        customer_node = f"customer::{event['customer_id']}"
        category_node = f"category::{event['category']}"
        if customer_node in graph:
            graph.add_edge(customer_node, anomaly_node, type="has_anomaly", weight=event.get("change_ratio", 0.0))
        if category_node in graph:
            graph.add_edge(anomaly_node, category_node, type="anomaly_category", weight=event.get("recent_share", 0.0))


def build_knowledge_graph(analytics: OrderAnalytics) -> nx.DiGraph:
    """Construct a directed knowledge graph from the analytics aggregates.

    Node ids are namespaced strings: ``product::<id>``, ``category::<name>``,
    ``broad::<name>``, ``customer::<id>`` and ``anomaly::<customer_id>::<category>``.
    Every node and edge carries a ``type`` attribute. The edge types are ``belongs_to``,
    ``rolls_up_to``, ``spend``, ``purchased``, ``cooccurs`` (stored in both directions),
    ``has_anomaly`` and ``anomaly_category``.

    Args:
        analytics: Aggregates providing product/customer profiles, co-occurrence counts
            and unusual events.

    Returns:
        A new ``nx.DiGraph`` named ``orders_knowledge_graph``.
    """
    graph = nx.DiGraph(name="orders_knowledge_graph")
    # Products first: the customer and co-occurrence steps attach edges to existing product nodes.
    _add_product_nodes(graph, analytics)
    _add_customer_nodes(graph, analytics)
    _add_cooccurrence_edges(graph, analytics)
    _add_anomaly_nodes(graph, analytics)
    return graph


In [None]:
knowledge_graph = build_knowledge_graph(analytics)
# Lower-cased display name -> product id (name-based product resolution).
PRODUCT_NAME_TO_ID: Dict[str, str] = dict(
    (prod.display_name.lower(), pid) for pid, prod in analytics.product_profiles.items()
)
# Product id -> display name (used when rendering answers).
PRODUCT_ID_TO_NAME: Dict[str, str] = dict(
    (pid, prod.display_name) for pid, prod in analytics.product_profiles.items()
)
# Lower-cased customer name -> customer id; a later customer with the same
# case-insensitive name overwrites an earlier one.
CUSTOMER_NAME_TO_ID: Dict[str, str] = dict(
    (cust.customer_name.lower(), cid) for cid, cust in analytics.customer_profiles.items()
)
print(
    f"Knowledge graph stats — nodes: {knowledge_graph.number_of_nodes()}, "
    f"edges: {knowledge_graph.number_of_edges()}"
)


In [None]:
def _resolve_customer_id(question: str) -> Optional[str]:
    """Resolve the customer a question refers to, or return None.

    Strategies are tried in order:
    1. Explicit ids such as ``CUST_15``, ``cust-015`` or ``CUST 15``, normalised to
       ``CUST_`` plus the number zero-padded to at least 3 digits. Every mention is
       checked until one names a known customer.
    2. Known customer names contained verbatim in the question (case-insensitive).
       The longest matching name wins, so a short name embedded in a longer one
       cannot shadow it.
    3. Fuzzy similarity between the whole question and each name; the best ratio
       above 0.65 wins.
    """
    for match in re.finditer(r"CUST[_\-\s]*(\d+)", question, flags=re.IGNORECASE):
        candidate = f"CUST_{int(match.group(1)):03d}"
        if candidate in analytics.customer_profiles:
            return candidate
    question_lower = question.lower()
    # Ignore empty names: "" is a substring of every question and would always match.
    named = [(name, customer_id) for name, customer_id in CUSTOMER_NAME_TO_ID.items() if name]
    verbatim = [(name, customer_id) for name, customer_id in named if name in question_lower]
    if verbatim:
        # max() keeps the first of equally long names, i.e. dict iteration order.
        return max(verbatim, key=lambda item: len(item[0]))[1]
    best_score: float = 0.0
    best_id: Optional[str] = None
    for name, customer_id in named:
        score = difflib.SequenceMatcher(None, question_lower, name).ratio()
        if score > 0.65 and score > best_score:
            best_score = score
            best_id = customer_id
    return best_id

def _resolve_product_id(question: str) -> Optional[str]:
    """Resolve the product a question refers to, or return None.

    Strategies are tried in order:
    1. Explicit ids: first every ``PROD_7`` / ``prod-007`` style mention, then every
       ``Product 77`` style mention. Each is normalised to ``PROD_`` plus the number
       zero-padded to at least 3 digits, and the first one that names a known product wins.
    2. Known product display names contained verbatim in the question (case-insensitive).
       The longest matching name wins, so a short name embedded in a longer one
       cannot shadow it.
    3. Fuzzy similarity between the whole question and each name; the best ratio
       above 0.6 wins.
    """
    for pattern in (r"PROD[_\-\s]*(\d+)", r"Product[_\-\s]*(\d+)"):
        for match in re.finditer(pattern, question, flags=re.IGNORECASE):
            candidate = f"PROD_{int(match.group(1)):03d}"
            if candidate in analytics.product_profiles:
                return candidate
    question_lower = question.lower()
    # Ignore empty names: "" is a substring of every question and would always match.
    named = [(name, product_id) for name, product_id in PRODUCT_NAME_TO_ID.items() if name]
    verbatim = [(name, product_id) for name, product_id in named if name in question_lower]
    if verbatim:
        # max() keeps the first of equally long names, i.e. dict iteration order.
        return max(verbatim, key=lambda item: len(item[0]))[1]
    best_score: float = 0.0
    best_id: Optional[str] = None
    for name, product_id in named:
        score = difflib.SequenceMatcher(None, question_lower, name).ratio()
        if score > 0.6 and score > best_score:
            best_score = score
            best_id = product_id
    return best_id

def _top_edges(node: str, edge_type: str, top_k: int = 5, direction: str = "out") -> List[Tuple[str, Dict]]:
    """Return up to ``top_k`` edges of ``edge_type`` touching ``node``, heaviest first.

    ``direction="out"`` follows outgoing edges; any other value follows incoming edges.
    Each item is ``(neighbor, edge_attributes)``, where the attribute dict is the graph's
    own (not a copy). The sort is stable, so equal weights keep adjacency order.
    A node that is not in the graph yields an empty list.
    """
    if node not in knowledge_graph:
        return []
    if direction == "out":
        incident = ((head, attrs) for _, head, attrs in knowledge_graph.out_edges(node, data=True))
    else:
        incident = ((tail, attrs) for tail, _, attrs in knowledge_graph.in_edges(node, data=True))
    matching = [pair for pair in incident if pair[1].get("type") == edge_type]
    ranked = sorted(matching, key=lambda pair: pair[1].get("weight", 0), reverse=True)
    return ranked[:top_k]


In [None]:
def _mentions_any(text: str, keywords: Tuple[str, ...]) -> bool:
    """Return True when any keyword starts a word in ``text`` ("pair" matches "pairs", not "repair")."""
    return any(re.search(r"\b" + re.escape(keyword), text) for keyword in keywords)


def _answer_bundle_for_product(response: Dict[str, object], product_id: str, top_k: int) -> Dict[str, object]:
    """Fill ``response`` with the strongest co-occurrence partners of one product."""
    focus_name = PRODUCT_ID_TO_NAME.get(product_id, product_id)
    neighbors = _top_edges(f"product::{product_id}", "cooccurs", top_k=top_k)
    if not neighbors:
        response["answer"] = f"No strong co-occurrence partners found for {focus_name}."
        return response
    lines: List[str] = []
    details: List[Dict[str, object]] = []
    for neighbor, edge_data in neighbors:
        neighbor_id = neighbor.split("::", 1)[1]
        neighbor_profile = analytics.product_profiles[neighbor_id]
        count = int(edge_data.get("weight", 0))
        lines.append(f"{neighbor_profile.display_name} ({neighbor_profile.category}) — {count} baskets")
        details.append(
            {
                "product_id": neighbor_id,
                "product_name": neighbor_profile.display_name,
                "category": neighbor_profile.category,
                "cooccurrence_count": count,
            }
        )
    response["answer"] = f"Products most commonly bought with {focus_name}: " + "; ".join(lines)
    response["details"] = details
    return response


def _answer_top_bundles(response: Dict[str, object], top_k: int) -> Dict[str, object]:
    """Fill ``response`` with the highest-count co-occurring product pairs in the whole graph."""
    pair_counts: Dict[Tuple[str, str], int] = {}
    for source, target, edge_data in knowledge_graph.edges(data=True):
        # Co-occurrence edges exist in both directions; keep one orientation per pair.
        if edge_data.get("type") != "cooccurs" or source >= target:
            continue
        pair = (source.split("::", 1)[1], target.split("::", 1)[1])
        pair_counts[pair] = int(edge_data.get("weight", 0))
    top_pairs = sorted(pair_counts.items(), key=lambda item: item[1], reverse=True)[:top_k]
    if not top_pairs:
        response["answer"] = "No co-occurrence signals available in the knowledge graph."
        return response
    lines: List[str] = []
    details: List[Dict[str, object]] = []
    for (prod_a, prod_b), count in top_pairs:
        profile_a = analytics.product_profiles[prod_a]
        profile_b = analytics.product_profiles[prod_b]
        lines.append(f"{profile_a.display_name} with {profile_b.display_name} — {count} baskets")
        details.append(
            {
                "product_a": prod_a,
                "product_a_name": profile_a.display_name,
                "product_b": prod_b,
                "product_b_name": profile_b.display_name,
                "cooccurrence_count": count,
            }
        )
    response["answer"] = "Top co-occurring product pairs: " + "; ".join(lines)
    response["details"] = details
    return response


def _answer_anomalies(response: Dict[str, object], customer_id: Optional[str], top_k: int) -> Dict[str, object]:
    """Fill ``response`` with category-share shifts for one customer, or the largest overall."""
    if customer_id:
        anomaly_edges = _top_edges(f"customer::{customer_id}", "has_anomaly", top_k=top_k)
        events = [knowledge_graph.nodes[neighbor] for neighbor, _ in anomaly_edges]
    else:
        # .get() matches how build_knowledge_graph reads change_ratio (default 0.0).
        events = sorted(
            analytics.unusual_events, key=lambda item: item.get("change_ratio", 0.0), reverse=True
        )[:top_k]
    if not events:
        response["answer"] = "No notable share shifts detected for the requested scope."
        return response
    lines: List[str] = []
    details: List[Dict[str, object]] = []
    for event in events:
        # Copy: these dicts are graph node attributes / analytics state, and callers
        # mutating the response must not alter them.
        details.append(dict(event))
        lines.append(
            "{name} shifted toward {category} ({recent:.0%} vs {previous:.0%}).".format(
                name=event.get("customer_name") or event.get("customer_id"),
                category=event.get("category"),
                recent=event.get("recent_share", 0.0),
                previous=event.get("previous_share", 0.0),
            )
        )
    response["answer"] = "Recent anomalies: " + " ".join(lines)
    response["details"] = details
    return response


def _answer_customer_profile(response: Dict[str, object], customer_id: str, top_k: int) -> Dict[str, object]:
    """Fill ``response`` with a customer's total spend, top categories and most-purchased products."""
    profile = analytics.customer_profiles[customer_id]
    customer_node = f"customer::{customer_id}"
    category_lines = [
        f"{neighbor.split('::', 1)[1]} (${edge_data.get('weight', 0):,.0f})"
        for neighbor, edge_data in _top_edges(customer_node, "spend", top_k=top_k)
    ]
    product_lines: List[str] = []
    details: List[Dict[str, object]] = []
    for neighbor, edge_data in _top_edges(customer_node, "purchased", top_k=top_k):
        neighbor_id = neighbor.split("::", 1)[1]
        product_profile = analytics.product_profiles[neighbor_id]
        quantity = int(edge_data.get("weight", 0))
        product_lines.append(f"{product_profile.display_name} (qty {quantity})")
        details.append(
            {
                "product_id": neighbor_id,
                "product_name": product_profile.display_name,
                "category": product_profile.category,
                "quantity": quantity,
            }
        )
    answer_parts = [
        f"{profile.customer_name} spent ${profile.total_spend:,.0f} across {profile.total_orders} orders.",
    ]
    if category_lines:
        answer_parts.append("Top categories: " + "; ".join(category_lines) + ".")
    if product_lines:
        answer_parts.append("Frequent products: " + "; ".join(product_lines) + ".")
    response["answer"] = " ".join(answer_parts)
    response["details"] = details
    return response


def _answer_dataset_overview(response: Dict[str, object], top_k: int) -> Dict[str, object]:
    """Fill ``response`` with the dataset size, date range and top broad categories by revenue."""
    overview = analytics.dataset_overview
    broad = analytics.broad_category_summary[:top_k]
    lines = [
        f"Dataset spans {overview.get('rows', 0)} rows from {overview.get('date_range', {}).get('min')} to {overview.get('date_range', {}).get('max')}"
    ]
    if broad:
        cat_text = "; ".join(
            f"{entry['broad_category']} (${entry['total_revenue']:,.0f})" for entry in broad
        )
        lines.append("Top broad categories: " + cat_text)
    response["answer"] = ". ".join(lines)
    response["details"] = broad
    return response


def answer_with_graph(question: str, top_k: int = 3) -> Dict[str, object]:
    """Route a natural-language question through NetworkX traversals.

    The intent is chosen from keywords, matched case-insensitively at the start of a word,
    in this order:
    bundle keywords -> product co-occurrence (for the resolved product, or the top pairs);
    anomaly keywords -> category-share shifts (for the resolved customer, or overall);
    a resolved customer -> that customer's profile;
    profile keywords -> dataset overview.
    Otherwise a fallback message is returned.

    Args:
        question: Free-text question; customer/product ids or names are resolved from it.
        top_k: Maximum number of items listed in the answer and ``details``.

    Returns:
        Dict with ``question``, ``intent``, ``answer`` (text) and ``details`` (records
        backing the answer; empty when nothing matched).
    """
    question_lower = question.lower()
    customer_id = _resolve_customer_id(question)
    product_id = _resolve_product_id(question)
    response: Dict[str, object] = {
        "question": question,
        "intent": "fallback",
        "answer": "",
        "details": [],
    }

    bundle_keywords = ("bundle", "together", "co-occur", "cooccur", "pair")
    anomaly_keywords = ("unusual", "shift", "anomaly", "change")
    profile_keywords = ("spend", "profile", "category", "overview")

    if _mentions_any(question_lower, bundle_keywords):
        response["intent"] = "product_bundle"
        if product_id:
            return _answer_bundle_for_product(response, product_id, top_k)
        return _answer_top_bundles(response, top_k)

    if _mentions_any(question_lower, anomaly_keywords):
        response["intent"] = "anomaly"
        return _answer_anomalies(response, customer_id, top_k)

    if customer_id:
        response["intent"] = "customer_profile"
        return _answer_customer_profile(response, customer_id, top_k)

    if _mentions_any(question_lower, profile_keywords):
        response["intent"] = "dataset_overview"
        return _answer_dataset_overview(response, top_k)

    response["answer"] = "No graph-derived insight matched the request; consider refining the question."
    return response


In [None]:
# A few representative questions, one per intent route, run through the graph QA function.
sample_questions = [
    "Which products do high-value shoppers bundle with Product 77?",
    "What is the overall spend and top categories for CUST_015?",
    "Call out any unusual shift for Bonnie Garrett.",
]
responses: List[Dict[str, object]] = []
for question in sample_questions:
    result = answer_with_graph(question)
    responses.append(result)
    print(f"Q: {question}\nA: {result['answer']}")
    # Only tabulate when the answer came with supporting records.
    if not result.get("details"):
        continue
    display(pd.DataFrame(result["details"]))
responses


In [None]:
# Serialise the graph in node-link form so it can be reloaded with json_graph.node_link_graph.
graph_payload = nx.readwrite.json_graph.node_link_data(knowledge_graph)
artifacts_dir = Path("artifacts")
artifacts_dir.mkdir(exist_ok=True)
graph_path = artifacts_dir.joinpath("knowledge_graph.json")
# json.dumps escapes non-ASCII by default, so the explicit encoding does not change the bytes.
graph_path.write_text(json.dumps(graph_payload, indent=2), encoding="utf-8")
print("Graph snapshot written to {}".format(graph_path))
