# KnowledgeReduce Implementation for Google Colab

This notebook implements the KnowledgeReduce framework for building stackable knowledge repositories. It demonstrates the core concepts from the paper, including the mapping phase, reducing phase, and knowledge stacking.

## Overview

The implementation includes the following components:
1. **Data Ingestion**: Loading and preprocessing data
2. **Knowledge Mapping**: Entity and relationship extraction
3. **Knowledge Reduction**: Entity resolution and graph synthesis
4. **Graph Storage**: In-memory graph representation
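5. **Query Interface**: Tools for querying and analyzing the knowledge graph
6. **Knowledge Stacking**: Assigning entities to hierarchical layers
7. **Visualization**: Plotting the resulting graph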

Let's start by installing the required dependencies.

In [None]:
# Install required packages
!pip install pandas numpy networkx spacy scikit-learn fuzzywuzzy python-Levenshtein matplotlib
!python -m spacy download en_core_web_sm

## Configuration

First, let's define the configuration settings for our KnowledgeReduce implementation.

In [None]:
# Configuration settings
class Config:
    # Entity resolution settings
    SIMILARITY_THRESHOLD = 0.8  # Minimum fuzzy-match ratio (0 to 1) for treating two names as the same entity
    MAX_EDIT_DISTANCE = 3  # Maximum edit distance for name matching
    
    # Knowledge mapping settings
    DEFAULT_ENTITY_TYPES = ["Person", "Organization", "Location", "Concept"]
    DEFAULT_RELATIONSHIP_TYPES = ["WORKS_FOR", "LOCATED_IN", "RELATED_TO", "KNOWS"]
    
    # Knowledge stacking settings
    KNOWLEDGE_LAYERS = ["Raw", "Processed", "Abstract"]

config = Config()

## Data Ingestion

Now, let's implement the data ingestion module for loading and preprocessing data.

In [None]:
import pandas as pd
import numpy as np
import json
import os
from typing import Dict, List, Tuple, Any

class DataLoader:
    """Loads data from various file formats."""
    
    @staticmethod
    def load_csv(file_path: str) -> pd.DataFrame:
        """Load data from a CSV file."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        return pd.read_csv(file_path)
    
    @staticmethod
    def load_json(file_path: str) -> Dict:
        """Load data from a JSON file."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    @staticmethod
    def load_dataframe(data: List[Dict]) -> pd.DataFrame:
        """Create a DataFrame from a list of dictionaries."""
        return pd.DataFrame(data)

class DataPreprocessor:
    """Preprocesses data for knowledge mapping."""
    
    @staticmethod
    def clean_text(text: str) -> str:
        """Clean text data by removing extra whitespace and normalizing case."""
        if not text or not isinstance(text, str):
            return ""
        
        # Remove extra whitespace and normalize case
        return " ".join(text.strip().split()).lower()
    
    @staticmethod
    def normalize_entities(entities_df: pd.DataFrame) -> pd.DataFrame:
        """Normalize entity data by cleaning text fields and ensuring required columns."""
        # Ensure required columns exist
        required_columns = ['id', 'name', 'type']
        for col in required_columns:
            if col not in entities_df.columns:
                raise ValueError(f"Required column '{col}' not found in entities data")
        
        # Create a copy to avoid modifying the original
        df = entities_df.copy()
        
        # Clean text fields
        if 'name' in df.columns:
            df['name'] = df['name'].apply(DataPreprocessor.clean_text)
        
        if 'description' in df.columns:
            df['description'] = df['description'].apply(DataPreprocessor.clean_text)
        
        return df
    
    @staticmethod
    def normalize_relationships(relationships_df: pd.DataFrame) -> pd.DataFrame:
        """Normalize relationship data by cleaning text fields and ensuring required columns."""
        # Ensure required columns exist
        required_columns = ['source_id', 'target_id', 'type']
        for col in required_columns:
            if col not in relationships_df.columns:
                raise ValueError(f"Required column '{col}' not found in relationships data")
        
        # Create a copy to avoid modifying the original
        df = relationships_df.copy()
        
        # Clean text fields
        if 'description' in df.columns:
            df['description'] = df['description'].apply(DataPreprocessor.clean_text)
        
        return df

def create_sample_data():
    """Create sample data for demonstration purposes."""
    # Create sample entities
    entities = [
        {"id": 1, "name": "John Smith", "type": "Person", "description": "Software Engineer"},
        {"id": 2, "name": "Jane Doe", "type": "Person", "description": "Data Scientist"},
        {"id": 3, "name": "Acme Corporation", "type": "Organization", "description": "Technology company"},
        {"id": 4, "name": "TechCorp", "type": "Organization", "description": "Software development firm"},
        {"id": 5, "name": "Machine Learning", "type": "Concept", "description": "Field of AI"},
        {"id": 6, "name": "San Francisco", "type": "Location", "description": "City in California"},
        {"id": 7, "name": "John Smith", "type": "Person", "description": "CTO at TechCorp"},  # Duplicate for resolution
        {"id": 8, "name": "J. Smith", "type": "Person", "description": "Engineer"}  # Similar for resolution
    ]
    
    # Create sample relationships
    relationships = [
        {"source_id": 1, "target_id": 3, "type": "WORKS_FOR", "description": "Employed since 2020"},
        {"source_id": 2, "target_id": 4, "type": "WORKS_FOR", "description": "Senior position"},
        {"source_id": 1, "target_id": 2, "type": "KNOWS", "description": "Colleagues"},
        {"source_id": 1, "target_id": 5, "type": "RELATED_TO", "description": "Expertise"},
        {"source_id": 2, "target_id": 5, "type": "RELATED_TO", "description": "Expertise"},
        {"source_id": 3, "target_id": 6, "type": "LOCATED_IN", "description": "Headquarters"},
        {"source_id": 7, "target_id": 4, "type": "WORKS_FOR", "description": "Executive role"}
    ]
    
    # Create DataFrames
    entities_df = pd.DataFrame(entities)
    relationships_df = pd.DataFrame(relationships)
    
    return entities_df, relationships_df

# Create sample data
entities_df, relationships_df = create_sample_data()

# Preprocess data
preprocessor = DataPreprocessor()
entities_df = preprocessor.normalize_entities(entities_df)
relationships_df = preprocessor.normalize_relationships(relationships_df)

# Display sample data
print("Sample Entities:")
display(entities_df.head(3))
print("\nSample Relationships:")
display(relationships_df.head(3))
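
To make the effect of text normalization concrete, the following standalone sketch mirrors the whitespace and case handling of `DataPreprocessor.clean_text` on a few inputs. The helper name `normalize_name` and the sample strings are illustrative assumptions, not part of the notebook's classes.

```python
from typing import Any

def normalize_name(text: Any) -> str:
    """Collapse runs of whitespace and lowercase; return "" for non-strings."""
    if not text or not isinstance(text, str):
        return ""
    return " ".join(text.strip().split()).lower()

# Hypothetical raw values, including non-string inputs
samples = ["  John   Smith ", "JOHN SMITH", "John\tSmith\n", None, 42]
for raw in samples:
    print(repr(raw), "->", repr(normalize_name(raw)))

# The three string variants collapse to a single key
unique_keys = {normalize_name(s) for s in samples[:3]}
print(unique_keys)
```

Names that differ only in spacing or case end up identical after this step, which is what allows the exact-name check during entity resolution to catch them without fuzzy matching.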

## Knowledge Mapping

Next, let's implement the knowledge mapping module, which extracts entities and relationships from the data.

In [None]:
import spacy
from collections import defaultdict

# Load spaCy model
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
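    import subprocess
    import sys
    # Use the running interpreter so the model is installed into the active environment
    subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=True)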
    nlp = spacy.load("en_core_web_sm")

class EntityExtractor:
    """Extracts entities from text data."""
    
    def __init__(self, entity_types=None):
        """Initialize the entity extractor."""
        self.entity_types = entity_types or config.DEFAULT_ENTITY_TYPES
    
    def extract_from_text(self, text: str) -> List[Dict[str, Any]]:
        """Extract entities from text using NLP."""
        if not text:
            return []
        
        doc = nlp(text)
        entities = []
        
        # Extract named entities
        for ent in doc.ents:
            entity_type = self._map_spacy_entity_type(ent.label_)
            if entity_type in self.entity_types:
                entities.append({
                    "name": ent.text,
                    "type": entity_type,
                    "start": ent.start_char,
                    "end": ent.end_char
                })
        
        return entities
    
    def _map_spacy_entity_type(self, spacy_type: str) -> str:
        """Map spaCy entity types to our entity types."""
        # Mapping from spaCy entity types to our types
        mapping = {
            "PERSON": "Person",
            "ORG": "Organization",
            "GPE": "Location",
            "LOC": "Location",
            "PRODUCT": "Concept",
            "WORK_OF_ART": "Concept",
            "EVENT": "Concept"
        }
        
        # Any label not listed above (e.g. DATE, MONEY) falls back to "Concept"
        return mapping.get(spacy_type, "Concept")
    
    def extract_from_structured_data(self, entities_df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Process structured entity data."""
        entities = []
        
        for _, row in entities_df.iterrows():
            entity = {
                "id": row["id"],
                "name": row["name"],
                "type": row["type"]
            }
            
            # Add any additional attributes
            for col in entities_df.columns:
                if col not in ["id", "name", "type"] and not pd.isna(row[col]):
                    entity[col] = row[col]
            
            entities.append(entity)
        
        return entities

class RelationshipExtractor:
    """Extracts relationships between entities."""
    
    def __init__(self, relationship_types=None):
        """Initialize the relationship extractor."""
        self.relationship_types = relationship_types or config.DEFAULT_RELATIONSHIP_TYPES
    
    def extract_from_structured_data(self, relationships_df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Process structured relationship data."""
        relationships = []
        
        for _, row in relationships_df.iterrows():
            relationship = {
                "source_id": row["source_id"],
                "target_id": row["target_id"],
                "type": row["type"]
            }
            
            # Add any additional attributes
            for col in relationships_df.columns:
                if col not in ["source_id", "target_id", "type"] and not pd.isna(row[col]):
                    relationship[col] = row[col]
            
            relationships.append(relationship)
        
        return relationships

class KnowledgeMapper:
    """Implements the mapping phase of KnowledgeReduce."""
    
    def __init__(self):
        """Initialize the knowledge mapper."""
        self.entity_extractor = EntityExtractor()
        self.relationship_extractor = RelationshipExtractor()
    
    def map(self, entities_df: pd.DataFrame, relationships_df: pd.DataFrame) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Map raw data to entities and relationships."""
        # Process structured data
        entities = self.entity_extractor.extract_from_structured_data(entities_df)
        relationships = self.relationship_extractor.extract_from_structured_data(relationships_df)
        
        return entities, relationships

# Map data to entities and relationships
mapper = KnowledgeMapper()
entities, relationships = mapper.map(entities_df, relationships_df)

print(f"Mapped {len(entities)} entities and {len(relationships)} relationships")
print("\nSample mapped entities:")
for entity in entities[:3]:
    print(f"  {entity['name']} ({entity['type']})")

print("\nSample mapped relationships:")
for rel in relationships[:3]:
    print(f"  {rel['source_id']} --[{rel['type']}]--> {rel['target_id']}")
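
The mapping call above only processes structured data, so `extract_from_text` is never exercised. The sketch below illustrates its label-mapping and filtering logic without loading spaCy: the `(text, label)` pairs are hypothetical stand-ins for `doc.ents`, and `map_labels` and its `fallback` parameter are assumed helpers introduced only for illustration.

```python
from typing import Dict, List, Tuple

# Same label mapping as EntityExtractor._map_spacy_entity_type
LABEL_TO_TYPE: Dict[str, str] = {
    "PERSON": "Person",
    "ORG": "Organization",
    "GPE": "Location",
    "LOC": "Location",
    "PRODUCT": "Concept",
    "WORK_OF_ART": "Concept",
    "EVENT": "Concept",
}
ALLOWED_TYPES = {"Person", "Organization", "Location", "Concept"}

def map_labels(spans: List[Tuple[str, str]], fallback: str = "Concept") -> List[Dict[str, str]]:
    """Map NER labels to entity types and keep only the allowed types."""
    entities = []
    for text, label in spans:
        entity_type = LABEL_TO_TYPE.get(label, fallback)
        if entity_type in ALLOWED_TYPES:
            entities.append({"name": text, "type": entity_type})
    return entities

# Hypothetical (text, label) pairs standing in for spaCy's doc.ents
spans = [("Jane Doe", "PERSON"), ("TechCorp", "ORG"),
         ("San Francisco", "GPE"), ("2020", "DATE")]

for entity in map_labels(spans):
    print(entity)

# With a fallback outside ALLOWED_TYPES, unmapped labels are dropped instead
print(len(map_labels(spans, fallback="Other")))
```

Because unmapped labels default to `"Concept"`, spans such as dates would enter the graph as concepts. Whether that is desirable depends on the data; a fallback outside the allowed types is one way to filter them out.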

## Knowledge Reduction

Now, let's implement the knowledge reduction module, which resolves entities and synthesizes the knowledge graph.
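
Once duplicate entities are merged, relationships that pointed at a merged entity have to be re-pointed at the surviving one. The sketch below shows that step in isolation on plain tuples. The `remap_edges` helper, the `Edge` alias, and the ID map are hypothetical and only approximate what a relationship aggregator does.

```python
from typing import Dict, List, Tuple

Edge = Tuple[int, int, str]  # (source_id, target_id, type)

def remap_edges(edges: List[Edge], id_map: Dict[int, int]) -> List[Edge]:
    """Rewrite edge endpoints through id_map, dropping dangling edges,
    self-loops created by merging, and exact duplicates."""
    seen = set()
    result = []
    for source, target, rel_type in edges:
        if source not in id_map or target not in id_map:
            continue  # endpoint is unknown after resolution
        key = (id_map[source], id_map[target], rel_type)
        if key[0] == key[1] or key in seen:
            continue  # self-loop or duplicate after remapping
        seen.add(key)
        result.append(key)
    return result

# Hypothetical resolution result: entity 7 was merged into entity 1
id_map = {1: 1, 2: 2, 4: 4, 7: 1}
edges = [
    (1, 4, "WORKS_FOR"),
    (7, 4, "WORKS_FOR"),  # becomes a duplicate of the first edge
    (1, 7, "KNOWS"),      # becomes a self-loop
    (2, 9, "KNOWS"),      # entity 9 is not in the map
]
print(remap_edges(edges, id_map))
```

Only the first edge survives in this example. Keeping the first occurrence of a duplicate means any attributes on later duplicates are discarded, which is a simplification worth revisiting if edge descriptions matter.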

In [None]:
import networkx as nx
from fuzzywuzzy import fuzz

class EntityResolver:
    """Resolves and merges duplicate entities."""
    
    def __init__(self, similarity_threshold=None, max_edit_distance=None):
        """Initialize the entity resolver."""
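        # Use explicit None checks so that a caller-supplied 0 is not replaced by the default
        self.similarity_threshold = config.SIMILARITY_THRESHOLD if similarity_threshold is None else similarity_threshold
        # NOTE: max_edit_distance is stored but not used by the matching logic in this class
        self.max_edit_distance = config.MAX_EDIT_DISTANCE if max_edit_distance is None else max_edit_distance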
    
    def resolve(self, entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Resolve duplicate entities by identifying and merging them."""
        if not entities:
            return []
        
        # Group entities by type for more efficient comparison
        entities_by_type = defaultdict(list)
        for entity in entities:
            entities_by_type[entity["type"]].append(entity)
        
        resolved_entities = []
        entity_groups = []
        
        # Find duplicate groups within each entity type
        for entity_type, type_entities in entities_by_type.items():
            # Create groups of duplicate entities
            type_entity_groups = self._find_duplicate_groups(type_entities)
            entity_groups.extend(type_entity_groups)
        
        # Merge each group of duplicates
        for group in entity_groups:
            merged_entity = self._merge_entities(group)
            resolved_entities.append(merged_entity)
        
        return resolved_entities
    
    def _find_duplicate_groups(self, entities: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
        """Find groups of duplicate entities."""
        # Initialize each entity as its own group
        groups = [[entity] for entity in entities]
        
        # Merge groups if they contain similar entities
        i = 0
        while i < len(groups):
            j = i + 1
            while j < len(groups):
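                if self._groups_contain_similar_entities(groups[i], groups[j]):
                    # Merge group j into group i, then rescan from i + 1:
                    # the enlarged group may now match groups that were skipped earlier
                    groups[i].extend(groups[j])
                    groups.pop(j)
                    j = i + 1
                else:
                    j += 1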
            i += 1
        
        return groups
    
    def _groups_contain_similar_entities(self, group1: List[Dict[str, Any]], group2: List[Dict[str, Any]]) -> bool:
        """Check if two groups contain similar entities."""
        for entity1 in group1:
            for entity2 in group2:
                if self._are_similar_entities(entity1, entity2):
                    return True
        return False
    
    def _are_similar_entities(self, entity1: Dict[str, Any], entity2: Dict[str, Any]) -> bool:
        """Check if two entities are similar based on name and attributes."""
        # Check for exact ID match
        if "id" in entity1 and "id" in entity2 and entity1["id"] == entity2["id"]:
            return True
        
        # Check for name similarity
        name1 = entity1.get("name", "").lower()
        name2 = entity2.get("name", "").lower()
        
        # Exact name match
        if name1 == name2 and name1:
            return True
        
        # Fuzzy name match (skipped when either name is empty, since an empty name is no evidence)
        if name1 and name2:
            similarity = fuzz.ratio(name1, name2) / 100.0
            if similarity >= self.similarity_threshold:
                return True
        
        return False
    
    def _merge_entities(self, entities: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge a group of duplicate entities into a single entity."""
        if not entities:
            return {}
        
        if len(entities) == 1:
            return entities[0]
        
        # Start with the first entity as the base
        merged = entities[0].copy()
        
        # Track all IDs that this entity represents
        merged["original_ids"] = [entities[0].get("id")]
        
        # Merge attributes from other entities
        for entity in entities[1:]:
            # Add original ID
            if "id" in entity:
                merged["original_ids"].append(entity["id"])
            
            # Merge other attributes
            for key, value in entity.items():
                if key not in merged or not merged[key]:
                    merged[key] = value
                elif key == "description" and value and value != merged[key]:
                    # Concatenate descriptions
                    merged[key] = f"{merged[key]}; {value}"
        
        return merged

class RelationshipAggregator:
    """Aggregates relationships from multiple sources."""
    
    def aggregate(self, relationships: List[Dict[str, Any]], entity_id_map: Dict[Any, Any]) -> List[Dict[str, Any]]:
        """Aggregate relationships, updating entity references and removing duplicates."""
        if not relationships:
            return []
        
        # Update entity references
        updated_relationships = []
        for rel in relationships:
            # Skip if source or target entity was removed during resolution
            if rel["source_id"] not in entity_id_map or rel["target_id"] not in entity_id_map:
                continue
            
            # Create updated relationship with resolved entity IDs
            updated_rel = rel.copy()
            updated_rel["source_id"] = entity_id_map[rel["source_id"]]
            updated_rel["target_id"] = entity_id_map[rel["target_id"]]
            
            # Skip self-relationships (after resolution)
            if updated_rel["source_id"] == updated_rel["target_id"]:
                continue
            
            updated_relationships.append(updated_rel)
        
        # Remove duplicate relationships
        aggregated = self._remove_duplicates(updated_relationships)
        
        return aggregated
    
    def _remove_duplicates(self, relationships: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Remove duplicate relationships."""
        # Use a set to track unique relationship keys
        unique_keys = set()
        unique_relationships = []
        
        for rel in relationships:
            # Create a key for the relationship
            key = (rel["source_id"], rel["target_id"], rel["type"])
            
            if key not in unique_keys:
                unique_keys.add(key)
                unique_relationships.append(rel)
        
        return unique_relationships

class GraphSynthesizer:
    """Synthesizes the knowledge graph from entities and relationships."""
    
    def __init__(self):
        """Initialize the graph synthesizer."""
        self.graph = nx.DiGraph()
    
    def synthesize(self, entities: List[Dict[str, Any]], relationships: List[Dict[str, Any]]) -> nx.DiGraph:
        """Synthesize a knowledge graph from entities and relationships."""
        # Create a new graph
        self.graph = nx.DiGraph()
        
        # Add entities as nodes
        for entity in entities:
            self.graph.add_node(
                entity["id"],
                name=entity.get("name", ""),
                type=entity.get("type", ""),
                description=entity.get("description", ""),
                layer=entity.get("layer", "Raw"),
                attributes={k: v for k, v in entity.items() if k not in ["id", "name", "type", "description", "layer"]}
            )
        
        # Add relationships as edges
        for rel in relationships:
            self.graph.add_edge(
                rel["source_id"],
                rel["target_id"],
                type=rel["type"],
                description=rel.get("description", ""),
                attributes={k: v for k, v in rel.items() if k not in ["source_id", "target_id", "type", "description"]}
            )
        
        return self.graph
    
    def get_graph(self) -> nx.DiGraph:
        """Get the synthesized knowledge graph."""
        return self.graph
    
    def get_statistics(self) -> Dict[str, Any]:
        """Get statistics about the knowledge graph."""
        stats = {
            "num_nodes": self.graph.number_of_nodes(),
            "num_edges": self.graph.number_of_edges(),
            "node_types": defaultdict(int),
            "edge_types": defaultdict(int),
            "avg_degree": sum(dict(self.graph.degree()).values()) / max(1, self.graph.number_of_nodes())
        }
        
        # Count node types
        for node, data in self.graph.nodes(data=True):
            node_type = data.get("type", "Unknown")
            stats["node_types"][node_type] += 1
        
        # Count edge types
        for _, _, data in self.graph.edges(data=True):
            edge_type = data.get("type", "Unknown")
            stats["edge_types"][edge_type] += 1
        
        return stats

class KnowledgeReducer:
    """Implements the reducing phase of KnowledgeReduce."""
    
    def __init__(self):
        """Initialize the knowledge reducer."""
        self.entity_resolver = EntityResolver()
        self.relationship_aggregator = RelationshipAggregator()
        self.graph_synthesizer = GraphSynthesizer()
    
    def reduce(self, entities: List[Dict[str, Any]], relationships: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], nx.DiGraph]:
        """Reduce mapped knowledge by resolving entities, aggregating relationships, and synthesizing the graph."""
        # Resolve entities
        resolved_entities = self.entity_resolver.resolve(entities)
        
        # Create mapping from original entity IDs to resolved entity IDs
        entity_id_map = {}
        for entity in resolved_entities:
            for original_id in entity.get("original_ids", [entity["id"]]):
                entity_id_map[original_id] = entity["id"]
        
        # Aggregate relationships
        aggregated_relationships = self.relationship_aggregator.aggregate(relationships, entity_id_map)
        
        # Synthesize knowledge graph
        knowledge_graph = self.graph_synthesizer.synthesize(resolved_entities, aggregated_relationships)
        
        return resolved_entities, aggregated_relationships, knowledge_graph

# Reduce mapped knowledge
reducer = KnowledgeReducer()
resolved_entities, aggregated_relationships, knowledge_graph = reducer.reduce(entities, relationships)

print(f"Reduced {len(entities)} entities to {len(resolved_entities)} entities")
print(f"Reduced {len(relationships)} relationships to {len(aggregated_relationships)} relationships")

# Print graph statistics
stats = reducer.graph_synthesizer.get_statistics()
print("\nGraph statistics:")
print(f"  Nodes: {stats['num_nodes']}")
print(f"  Edges: {stats['num_edges']}")
print(f"  Node types: {dict(stats['node_types'])}")
print(f"  Edge types: {dict(stats['edge_types'])}")
print(f"  Average degree: {stats['avg_degree']:.2f}")
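
Duplicate grouping is a connected-components problem: if A matches B and B matches C, all three belong together even when A and C do not match directly. One way to make that explicit is a union-find structure, sketched below. This is an alternative formulation, not the notebook's method, and `difflib.SequenceMatcher` is used as a standard-library stand-in for `fuzz.ratio`, so scores may differ slightly from fuzzywuzzy's. The names `name_similarity` and `group_similar` are assumptions.

```python
from difflib import SequenceMatcher
from typing import Dict, List

def name_similarity(a: str, b: str) -> float:
    """Similarity in [0, 1]; difflib stands in for fuzz.ratio(a, b) / 100."""
    return SequenceMatcher(None, a, b).ratio()

def group_similar(names: List[str], threshold: float = 0.8) -> List[List[str]]:
    """Group names so that any chain of pairwise matches lands in one group."""
    parent = list(range(len(names)))

    def find(i: int) -> int:
        # Follow parent links to the root, compressing the path on the way
        while parent[i] != i:
            parent[i] = parent[parent[i]]
            i = parent[i]
        return i

    for i in range(len(names)):
        for j in range(i + 1, len(names)):
            if name_similarity(names[i], names[j]) >= threshold:
                parent[find(j)] = find(i)  # union the two components

    groups: Dict[int, List[str]] = {}
    for i, name in enumerate(names):
        groups.setdefault(find(i), []).append(name)
    return list(groups.values())

# Toy strings chosen so that the first two only match via the third
names = ["abcd", "abcdefg", "abcde", "xyz"]
print(group_similar(names))
```

Here `"abcd"` and `"abcdefg"` score about 0.73 against each other, below the 0.8 threshold, but both match `"abcde"`, so all three fall into one group regardless of input order. The pairwise comparison is still quadratic in the number of names, which is acceptable for small demonstrations only.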

## Graph Storage and Query Interface

Now, let's implement the graph storage and query interface components.

In [None]:
class InMemoryGraphStore:
    """In-memory graph store using NetworkX."""
    
    def __init__(self):
        """Initialize the in-memory graph store."""
        self.graph = nx.DiGraph()
    
    def store_graph(self, graph: nx.DiGraph) -> None:
        """Store a knowledge graph."""
        self.graph = graph.copy()
    
    def get_graph(self) -> nx.DiGraph:
        """Get the stored knowledge graph."""
        return self.graph
    
    def get_node(self, node_id: Any) -> Dict[str, Any]:
        """Get a node by ID."""
        if node_id in self.graph.nodes:
            return {**{"id": node_id}, **self.graph.nodes[node_id]}
        return {}
    
    def get_nodes_by_type(self, node_type: str) -> List[Dict[str, Any]]:
        """Get nodes by type."""
        nodes = []
        for node_id, data in self.graph.nodes(data=True):
            if data.get("type") == node_type:
                nodes.append({**{"id": node_id}, **data})
        return nodes
    
    def get_relationships(self, source_id: Any = None, target_id: Any = None, rel_type: str = None) -> List[Dict[str, Any]]:
        """Get relationships, optionally filtered by source, target, or type."""
        relationships = []
        
        for source, target, data in self.graph.edges(data=True):
            # Apply filters
            if source_id is not None and source != source_id:
                continue
            if target_id is not None and target != target_id:
                continue
            if rel_type is not None and data.get("type") != rel_type:
                continue
            
            relationships.append({
                "source_id": source,
                "target_id": target,
                **data
            })
        
        return relationships
    
    def get_neighbors(self, node_id: Any, direction: str = "both") -> List[Dict[str, Any]]:
        """Get neighboring nodes of a given node."""
        if node_id not in self.graph.nodes:
            return []
        
        neighbors = []
        
        if direction in ["out", "both"]:
            for target in self.graph.successors(node_id):
                neighbors.append({**{"id": target}, **self.graph.nodes[target]})
        
        if direction in ["in", "both"]:
            for source in self.graph.predecessors(node_id):
                if {**{"id": source}, **self.graph.nodes[source]} not in neighbors:
                    neighbors.append({**{"id": source}, **self.graph.nodes[source]})
        
        return neighbors

class GraphQuery:
    """Interface for querying the knowledge graph."""
    
    def __init__(self, graph_store):
        """Initialize the graph query interface."""
        self.graph_store = graph_store
    
    def get_entities_by_type(self, entity_type: str) -> List[Dict[str, Any]]:
        """Get entities of a specific type."""
        return self.graph_store.get_nodes_by_type(entity_type)
    
    def get_entity_by_id(self, entity_id: Any) -> Dict[str, Any]:
        """Get an entity by ID."""
        return self.graph_store.get_node(entity_id)
    
    def get_entity_by_name(self, name: str) -> Dict[str, Any]:
        """Get an entity by name (returns first match)."""
        graph = self.graph_store.get_graph()
        for node_id, data in graph.nodes(data=True):
            if data.get("name", "").lower() == name.lower():
                return {**{"id": node_id}, **data}
        return {}
    
    def get_relationships_between(self, source_id: Any, target_id: Any) -> List[Dict[str, Any]]:
        """Get relationships between two entities."""
        return self.graph_store.get_relationships(source_id=source_id, target_id=target_id)
    
    def get_relationships_by_type(self, rel_type: str) -> List[Dict[str, Any]]:
        """Get relationships of a specific type."""
        return self.graph_store.get_relationships(rel_type=rel_type)
    
    def get_neighbors(self, entity_id: Any, direction: str = "both") -> List[Dict[str, Any]]:
        """Get neighboring entities of a given entity."""
        return self.graph_store.get_neighbors(entity_id, direction)
    
    def find_paths(self, source_id: Any, target_id: Any, max_length: int = 3) -> List[List[Dict[str, Any]]]:
        """Find simple (cycle-free) directed paths between two entities, using at most max_length edges."""
        graph = self.graph_store.get_graph()
        
        if source_id not in graph.nodes or target_id not in graph.nodes:
            return []
        
        paths = []
        
        # Find all simple paths up to max_length
        for path in nx.all_simple_paths(graph, source_id, target_id, cutoff=max_length):
            entity_path = []
            for node_id in path:
                entity_path.append({**{"id": node_id}, **graph.nodes[node_id]})
            paths.append(entity_path)
        
        return paths

class KnowledgeAnalyzer:
    """Analyzes the knowledge graph to extract insights."""
    
    def __init__(self, graph_store):
        """Initialize the knowledge analyzer."""
        self.graph_store = graph_store
    
    def get_central_entities(self, top_n: int = 10) -> List[Dict[str, Any]]:
        """Get the most central entities based on degree centrality."""
        graph = self.graph_store.get_graph()
        
        # Calculate degree centrality
        centrality = nx.degree_centrality(graph)
        
        # Sort entities by centrality
        sorted_entities = sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:top_n]
        
        # Convert to entity dictionaries with centrality scores
        central_entities = []
        for node_id, score in sorted_entities:
            entity = {**{"id": node_id}, **graph.nodes[node_id], "centrality": score}
            central_entities.append(entity)
        
        return central_entities
    
    def generate_summary(self) -> Dict[str, Any]:
        """Generate a summary of the knowledge graph."""
        graph = self.graph_store.get_graph()
        
        # Count entities by type
        entity_types = defaultdict(int)
        for _, data in graph.nodes(data=True):
            entity_type = data.get("type", "Unknown")
            entity_types[entity_type] += 1
        
        # Count relationships by type
        relationship_types = defaultdict(int)
        for _, _, data in graph.edges(data=True):
            rel_type = data.get("type", "Unknown")
            relationship_types[rel_type] += 1
        
        # Calculate average number of relationships per entity
        avg_degree = sum(dict(graph.degree()).values()) / max(1, graph.number_of_nodes())
        
        # Get central entities
        central_entities = self.get_central_entities(5)
        
        return {
            "entity_statistics": {
                "total_entities": graph.number_of_nodes(),
                "entity_types": dict(entity_types),
                "avg_relationships": avg_degree
            },
            "relationship_statistics": {
                "total_relationships": graph.number_of_edges(),
                "relationship_types": dict(relationship_types)
            },
            "central_entities": central_entities
        }

# Store the graph
graph_store = InMemoryGraphStore()
graph_store.store_graph(knowledge_graph)

# Create query interface
query = GraphQuery(graph_store)

# Test queries
print("\nEntities by type:")
for entity_type in set(entity['type'] for entity in resolved_entities):
    entities_of_type = query.get_entities_by_type(entity_type)
    print(f"  {entity_type}: {len(entities_of_type)} entities")

# Find entity by name
john = query.get_entity_by_name("john smith")
if john:
    print(f"\nFound entity: {john['name']} ({john['id']})")
    
    # Get neighbors
    neighbors = query.get_neighbors(john['id'])
    print(f"\nNeighbors of {john['name']}:")
    for neighbor in neighbors:
        print(f"  {neighbor['name']} ({neighbor['type']})")

# Create analyzer
analyzer = KnowledgeAnalyzer(graph_store)

# Get central entities
print("\nMost central entities:")
central = analyzer.get_central_entities(3)
for entity in central:
    print(f"  {entity['name']} (centrality: {entity['centrality']:.3f})")

# Generate summary
summary = analyzer.generate_summary()
print("\nKnowledge Graph Summary:")
print(f"  Entities: {summary['entity_statistics']['total_entities']}")
print(f"  Relationships: {summary['relationship_statistics']['total_relationships']}")
print(f"  Entity types: {summary['entity_statistics']['entity_types']}")
print(f"  Relationship types: {summary['relationship_statistics']['relationship_types']}")
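
The centrality scores above come from `nx.degree_centrality`, which divides each node's degree by `n - 1`. For a directed graph the degree counts incoming and outgoing edges together. The following dependency-free sketch reproduces that calculation on a small hypothetical edge list. It assumes unique edges and no self-loops, and the node names are invented for illustration.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

def degree_centrality(nodes: Iterable[Hashable],
                      edges: Iterable[Tuple[Hashable, Hashable]]) -> Dict[Hashable, float]:
    """Degree centrality: (in-degree + out-degree) / (n - 1)."""
    node_list: List[Hashable] = list(nodes)
    degree: Dict[Hashable, int] = defaultdict(int)
    for source, target in edges:
        degree[source] += 1  # outgoing edge
        degree[target] += 1  # incoming edge
    if len(node_list) <= 1:
        # Matches the NetworkX convention for graphs with at most one node
        return {n: 1.0 for n in node_list}
    scale = 1.0 / (len(node_list) - 1)
    return {n: degree[n] * scale for n in node_list}

# Hypothetical toy graph
nodes = ["john", "jane", "acme", "ml"]
edges = [("john", "acme"), ("john", "jane"), ("john", "ml"), ("jane", "ml")]

scores = degree_centrality(nodes, edges)
for name, score in sorted(scores.items(), key=lambda item: item[1], reverse=True):
    print(f"{name}: {score:.3f}")
```

Since direction is ignored, an entity that many others point to scores as high as one that points to many others. In-degree or out-degree centrality would separate those two cases if the distinction matters for the analysis.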

## Knowledge Stacking

Now, let's implement the knowledge stacking mechanism, which organizes knowledge in hierarchical layers.

In [None]:
def implement_knowledge_stacking(graph_store):
    """Implement knowledge stacking by assigning entities to layers."""
    print("\n=== Knowledge Stacking Implementation ===\n")
    
    # Get the knowledge graph
    graph = graph_store.get_graph()
    
    # Create knowledge layers
    layers = config.KNOWLEDGE_LAYERS
    print(f"Creating {len(layers)} knowledge layers: {', '.join(layers)}")
    
    # Assign entities to layers
    for node_id, data in graph.nodes(data=True):
        # Assign layer based on entity type
        if data.get("type") == "Concept":
            # Concepts go to the Abstract layer
            graph.nodes[node_id]["layer"] = "Abstract"
        elif data.get("type") in ["Person", "Organization"]:
            # People and organizations go to the Processed layer
            graph.nodes[node_id]["layer"] = "Processed"
        else:
            # Everything else goes to the Raw layer
            graph.nodes[node_id]["layer"] = "Raw"
    
    # Count entities by layer
    layer_counts = {}
    for layer in layers:
        count = sum(1 for _, data in graph.nodes(data=True) if data.get("layer") == layer)
        layer_counts[layer] = count
    
    print("\nEntities by layer:")
    for layer, count in layer_counts.items():
        print(f"  {layer}: {count} entities")
    
    # Update the graph in the store
    graph_store.store_graph(graph)
    
    return graph

def demonstrate_cross_layer_query(graph_store, query):
    """Demonstrate cross-layer querying capabilities."""
    print("\n=== Cross-Layer Query Demonstration ===\n")
    
    # Find concepts (Abstract layer) related to people (Processed layer)
    print("Concepts related to people:")
    
    # Get all people
    people = query.get_entities_by_type("Person")
    
    # For each person, find related concepts
    for person in people:
        neighbors = query.get_neighbors(person["id"])
        concepts = [n for n in neighbors if n.get("type") == "Concept"]
        
        if concepts:
            print(f"  {person['name']} is related to concepts:")
            for concept in concepts:
                print(f"    - {concept['name']}")

# Implement knowledge stacking
stacked_graph = implement_knowledge_stacking(graph_store)

# Demonstrate cross-layer querying
demonstrate_cross_layer_query(graph_store, query)

## Visualization

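Finally, let's visualize the knowledge graph to better understand its structure. The `visualize_knowledge_graph` function draws the graph twice with the same layout: once with nodes colored by entity type and edges colored by relationship type, and once with nodes colored by knowledge layer.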
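
When coloring nodes, the color list has to follow the same order in which the nodes are iterated, and attribute values without an assigned color need a fallback. Here is a minimal sketch of that lookup, using plain tuples in place of a real graph. `colors_for_nodes` and the sample nodes are hypothetical and only illustrate the idea.

```python
def colors_for_nodes(nodes, palette, attribute, fallback="gray"):
    """Build one color per node, in iteration order.

    `nodes` is an iterable of (node_id, attribute_dict) pairs, the same
    shape that networkx yields from graph.nodes(data=True).
    """
    return [palette.get(data.get(attribute), fallback) for _, data in nodes]

type_colors = {"Person": "skyblue", "Organization": "lightgreen"}

# Illustrative nodes (not from the notebook's dataset)
sample_nodes = [
    ("e1", {"type": "Person"}),
    ("e2", {"type": "Organization"}),
    ("e3", {"type": "Event"}),  # no color defined for this type
    ("e4", {}),                 # no type attribute at all
]

node_colors = colors_for_nodes(sample_nodes, type_colors, "type")
print(node_colors)
```

The same helper could serve both plots by switching `attribute` between `"type"` and `"layer"` and passing the matching palette.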

In [None]:
import matplotlib.pyplot as plt
import networkx as nx  # Imported here so this cell does not rely on earlier imports

def visualize_knowledge_graph(graph_store):
    """Visualize the knowledge graph."""
    graph = graph_store.get_graph()
    
    # Create a larger figure
    plt.figure(figsize=(12, 10))
    
    # Define node colors by type
    type_colors = {
        "Person": "skyblue",
        "Organization": "lightgreen",
        "Location": "salmon",
        "Concept": "gold"
    }
    
    # Define node colors by layer
    layer_colors = {
        "Raw": "lightgray",
        "Processed": "lightblue",
        "Abstract": "lightyellow"
    }
    
    # Define edge colors by type
    edge_colors = {
        "WORKS_FOR": "blue",
        "LOCATED_IN": "green",
        "KNOWS": "red",
        "RELATED_TO": "purple"
    }
    
    # Create node color map
    node_colors = []
    for node, data in graph.nodes(data=True):
        node_type = data.get("type", "Unknown")
        node_colors.append(type_colors.get(node_type, "gray"))
    
    # Create edge color map
    edge_colors_list = []
    for _, _, data in graph.edges(data=True):
        edge_type = data.get("type", "Unknown")
        edge_colors_list.append(edge_colors.get(edge_type, "gray"))
    
    # Create node labels
    node_labels = {}
    for node, data in graph.nodes(data=True):
        node_labels[node] = data.get("name", str(node))
    
    # Use spring layout for node positioning
    pos = nx.spring_layout(graph, seed=42)
    
    # Draw the graph
    nx.draw_networkx_nodes(graph, pos, node_color=node_colors, node_size=500, alpha=0.8)
    nx.draw_networkx_edges(graph, pos, edge_color=edge_colors_list, width=2, alpha=0.7, arrowsize=15)
    nx.draw_networkx_labels(graph, pos, labels=node_labels, font_size=10, font_weight="bold")
    
    # Add legend for node types
    node_type_patches = [plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=color, markersize=10, label=type_name)
                         for type_name, color in type_colors.items()]
    plt.legend(handles=node_type_patches, title="Entity Types", loc="upper left")
    
    plt.title("KnowledgeReduce Knowledge Graph")
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    
    # Create a second visualization showing layers
    plt.figure(figsize=(12, 10))
    
    # Create node color map by layer
    node_layer_colors = []
    for node, data in graph.nodes(data=True):
        node_layer = data.get("layer", "Raw")
        node_layer_colors.append(layer_colors.get(node_layer, "gray"))
    
    # Draw the graph with layer colors
    nx.draw_networkx_nodes(graph, pos, node_color=node_layer_colors, node_size=500, alpha=0.8)
    nx.draw_networkx_edges(graph, pos, edge_color="gray", width=2, alpha=0.7, arrowsize=15)
    nx.draw_networkx_labels(graph, pos, labels=node_labels, font_size=10, font_weight="bold")
    
    # Add legend for layers
    layer_patches = [plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=color, markersize=10, label=layer_name)
                     for layer_name, color in layer_colors.items()]
    plt.legend(handles=layer_patches, title="Knowledge Layers", loc="upper left")
    
    plt.title("KnowledgeReduce Stackable Knowledge Layers")
    plt.axis('off')
    plt.tight_layout()
    plt.show()

# Visualize the knowledge graph
visualize_knowledge_graph(graph_store)

## Conclusion

This notebook has demonstrated the implementation of the KnowledgeReduce framework for building stackable knowledge repositories. We've covered:

1. **Data Ingestion**: Loading and preprocessing data from various sources
2. **Knowledge Mapping**: Extracting entities and relationships from data
3. **Knowledge Reduction**: Resolving entities and synthesizing the knowledge graph
4. **Query and Analysis**: Querying and analyzing the knowledge graph
5. **Knowledge Stacking**: Organizing knowledge in hierarchical layers
6. **Visualization**: Visualizing the knowledge graph structure

This implementation demonstrates the core concepts of the KnowledgeReduce framework (the mapping phase, the reducing phase, and knowledge stacking) on an in-memory graph, and it can be extended for more complex use cases.