# Module 5: Building an AI Knowledge System
## Capstone Project: Intelligent Knowledge Graph Assistant

This module combines everything we've learned about Semantic Kernel to build an intelligent system that:
- Extracts information from text using AI
- Stores it in a knowledge graph
- Uses agents to manage and query the knowledge
- Uses Semantic Kernel processes to orchestrate multi-step operations such as ingestion

In [2]:
import asyncio
from typing import List, Dict, Any
from enum import Enum
from pydantic import BaseModel, Field

import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion

# Graph database client (expects a running Memgraph instance, by default on 127.0.0.1:7687)
from gqlalchemy import Memgraph

### Part 1: Core Knowledge Graph System

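First, let's define the node and relation types, plus a wrapper class, `KnowledgeGraphDB`, that handles the graph database operations against Memgraph through the gqlalchemy client: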

In [None]:
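class NodeType(str, Enum):
    CONCEPT = "CONCEPT"
    ENTITY = "ENTITY"
    EVENT = "EVENT"
    FACT = "FACT"

class RelationType(str, Enum):
    IS_A = "IS_A"
    HAS_PROPERTY = "HAS_PROPERTY"
    RELATED_TO = "RELATED_TO"
    HAPPENED_AT = "HAPPENED_AT"
    PARTICIPATED_IN = "PARTICIPATED_IN"

class KnowledgeGraphDB:
    """Thin wrapper around gqlalchemy's Memgraph client.

    The methods are async to match the agents and process steps, but gqlalchemy
    itself is synchronous, so each call blocks while the query runs.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 7687):
        # Initialize Memgraph connection
        self.db = Memgraph(host=host, port=port)
        self._setup_schema()

    def _setup_schema(self):
        # Memgraph.execute() runs a single statement, so issue them one at a time
        for statement in (
            "CREATE INDEX ON :CONCEPT(name);",
            "CREATE INDEX ON :ENTITY(name);",
            "CREATE CONSTRAINT ON (n:CONCEPT) ASSERT n.name IS UNIQUE;",
        ):
            self.db.execute(statement)

    async def add_node(self, name: str, node_type: NodeType, properties: dict = None) -> str:
        # Labels cannot be query parameters, so interpolate the enum *value* (never raw input).
        # MERGE instead of CREATE keeps re-ingestion idempotent and respects the unique constraint.
        query = f"""
        MERGE (n:{NodeType(node_type).value} {{name: $name}})
        SET n += $properties
        RETURN n.name AS name
        """
        rows = list(self.db.execute_and_fetch(query, {"name": name, "properties": properties or {}}))
        return rows[0]["name"]

    async def add_relation(self, from_node: str, to_node: str, relation_type: RelationType, properties: dict = None):
        query = f"""
        MATCH (a {{name: $from_name}}), (b {{name: $to_name}})
        MERGE (a)-[r:{RelationType(relation_type).value}]->(b)
        SET r += $properties
        RETURN type(r) AS type
        """
        rows = list(self.db.execute_and_fetch(
            query,
            {"from_name": from_node, "to_name": to_node, "properties": properties or {}},
        ))
        # No rows means at least one endpoint does not exist in the graph
        return rows[0]["type"] if rows else None

    async def query_subgraph(self, start_node: str, depth: int = 2) -> List[Dict[str, str]]:
        # Variable-length bounds cannot be parameters either, so coerce depth to int
        query = f"""
        MATCH path = (start {{name: $name}})-[*1..{int(depth)}]-()
        UNWIND relationships(path) AS r
        RETURN DISTINCT startNode(r).name AS source, type(r) AS relation, endNode(r).name AS target
        """
        # One dict per edge: {"source": ..., "relation": ..., "target": ...}
        return list(self.db.execute_and_fetch(query, {"name": start_node}))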
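
To unit-test the agents without a running Memgraph server, an in-memory stand-in can be handy. The sketch below is hypothetical (`InMemoryKnowledgeGraph` is not part of gqlalchemy or Semantic Kernel). It mimics the depth-limited, direction-agnostic neighbourhood that `query_subgraph` asks Memgraph for by running a breadth-first search and returning `(source, relation, target)` triples.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class InMemoryKnowledgeGraph:
    """Hypothetical test double: nodes and typed edges kept in plain Python structures."""
    nodes: dict[str, str] = field(default_factory=dict)  # name -> node type
    edges: list[tuple[str, str, str]] = field(default_factory=list)  # (source, relation, target)

    def add_node(self, name: str, node_type: str) -> str:
        # Behaves like MERGE: re-adding an existing name keeps the first type
        self.nodes.setdefault(name, node_type)
        return name

    def add_relation(self, source: str, target: str, relation: str) -> bool:
        # Like MATCH ... MERGE: only link existing nodes and never duplicate an edge
        if source not in self.nodes or target not in self.nodes:
            return False
        edge = (source, relation, target)
        if edge not in self.edges:
            self.edges.append(edge)
        return True

    def query_subgraph(self, start: str, depth: int = 2) -> list[tuple[str, str, str]]:
        """Return every edge on some path of at most `depth` hops from `start`, ignoring direction."""
        if start not in self.nodes:
            return []
        seen = {start}
        found: list[tuple[str, str, str]] = []
        frontier = deque([(start, 0)])
        while frontier:
            node, hops = frontier.popleft()
            if hops == depth:
                continue  # edges from here would need one hop too many
            for edge in self.edges:
                source, _, target = edge
                if node not in (source, target):
                    continue
                if edge not in found:
                    found.append(edge)
                neighbour = target if node == source else source
                if neighbour not in seen:
                    seen.add(neighbour)
                    frontier.append((neighbour, hops + 1))
        return found
```

Because BFS expands each node at its shortest distance, an edge is reported exactly when one of its endpoints lies fewer than `depth` hops from the start node, which matches the `[*1..depth]` pattern.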

### Part 2: Knowledge Extraction Agents

Now let's create an agent that uses an LLM prompt to extract structured knowledge from text:

In [None]:
from semantic_kernel.functions import KernelArguments

class KnowledgeExtraction(BaseModel):
    """Structure for extracted knowledge"""
    concepts: List[str] = Field(description="Main concepts identified")
    entities: List[str] = Field(description="Named entities found")
    relationships: List[Dict[str, str]] = Field(description="Relationships between concepts/entities")
    facts: List[str] = Field(description="Factual statements extracted")

class KnowledgeExtractionAgent:
    def __init__(self, kernel: sk.Kernel):
        self.kernel = kernel
        # Register a prompt function (kernel.add_function replaces the pre-1.0 create_semantic_function)
        self.extract_knowledge = kernel.add_function(
            plugin_name="extraction",
            function_name="extract_knowledge",
            description="Extracts structured knowledge from text",
            prompt="""
            Analyze the following text and extract key knowledge elements.
            Structure the knowledge into:
            - Main concepts
            - Named entities
            - Relationships between them
            - Key facts

            Text: {{$input}}

            Respond with JSON only (no Markdown code fences) in the following format.
            Each relationship "type" must be one of:
            IS_A, HAS_PROPERTY, RELATED_TO, HAPPENED_AT, PARTICIPATED_IN.
            {
                "concepts": ["concept1", "concept2"],
                "entities": ["entity1", "entity2"],
                "relationships": [
                    {"from": "entity1", "to": "concept1", "type": "IS_A"},
                    {"from": "entity1", "to": "entity2", "type": "RELATED_TO"}
                ],
                "facts": ["fact1", "fact2"]
            }
            """,
        )

    async def process_text(self, text: str) -> KnowledgeExtraction:
        # Invoke through the kernel so the registered chat completion service is used
        result = await self.kernel.invoke(self.extract_knowledge, KernelArguments(input=text))
        return KnowledgeExtraction.model_validate_json(str(result))
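
Even when asked for bare JSON, chat models sometimes wrap it in a Markdown fence or add a sentence before it, and `model_validate_json` then fails. A defensive parser such as the hypothetical `extract_json_object` below (standard library only, not part of Semantic Kernel) can clean the raw output first; it is a heuristic that takes the first fenced block, or else the outermost `{...}` span.

```python
import json
import re

# Captures the body of a ```json ... ``` (or bare ``` ... ```) fenced block
_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.DOTALL)


def extract_json_object(raw: str) -> dict:
    """Return the first JSON object found in an LLM response (hypothetical helper)."""
    fenced = _FENCE_RE.search(raw)
    candidate = fenced.group(1) if fenced else raw
    # Fall back to the outermost {...} span if the model added prose around the JSON
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end < start:
        raise ValueError("No JSON object found in model output")
    # json.JSONDecodeError is a subclass of ValueError, so callers can catch one type
    return json.loads(candidate[start : end + 1])
```

In `process_text`, the last line would then become `KnowledgeExtraction.model_validate(extract_json_object(str(result)))`.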

### Part 3: Knowledge Graph Process

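Let's create a process to handle the flow of knowledge ingestion. It has two steps that communicate through events: `TextProcessingStep` extracts knowledge from raw text and emits it, and `GraphUpdateStep` writes what it receives to the graph: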

In [None]:
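from typing import ClassVar, Optional

from semantic_kernel.functions import kernel_function
# The Process framework is experimental; these import paths follow recent semantic-kernel releases
from semantic_kernel.processes.kernel_process import (
    KernelProcessStep,
    KernelProcessStepContext,
    KernelProcessStepState,
)

class KnowledgeIngestionState(BaseModel):
    text_processed: bool = False
    knowledge_extracted: bool = False
    nodes_created: List[str] = Field(default_factory=list)
    relations_created: List[Dict] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)

class TextProcessingStep(KernelProcessStep[KnowledgeIngestionState]):
    # Steps are pydantic models that the runtime instantiates from the class, so the
    # dependency is a class variable; set TextProcessingStep.extractor before building the process.
    extractor: ClassVar[Optional[KnowledgeExtractionAgent]] = None
    state: KnowledgeIngestionState = Field(default_factory=KnowledgeIngestionState)

    async def activate(self, state: KernelProcessStepState[KnowledgeIngestionState]):
        # Called by the runtime before the step runs; start fresh if no state was saved
        self.state = state.state or KnowledgeIngestionState()

    @kernel_function
    async def process_text(self, context: KernelProcessStepContext, text: str):
        try:
            knowledge = await self.extractor.process_text(text)
            self.state.text_processed = True
            await context.emit_event(process_event="KnowledgeExtracted", data=knowledge)
        except Exception as e:
            self.state.errors.append(f"Text processing failed: {e}")
            await context.emit_event(process_event="ProcessingError", data=str(e))

class GraphUpdateStep(KernelProcessStep[KnowledgeIngestionState]):
    # Set GraphUpdateStep.graph_db before building the process
    graph_db: ClassVar[Optional[KnowledgeGraphDB]] = None
    state: KnowledgeIngestionState = Field(default_factory=KnowledgeIngestionState)

    async def activate(self, state: KernelProcessStepState[KnowledgeIngestionState]):
        self.state = state.state or KnowledgeIngestionState()

    @kernel_function
    async def update_graph(self, context: KernelProcessStepContext, knowledge: KnowledgeExtraction):
        try:
            # Add concepts and entities; add_node returns the stored node name
            for concept in knowledge.concepts:
                self.state.nodes_created.append(await self.graph_db.add_node(concept, NodeType.CONCEPT))
            for entity in knowledge.entities:
                self.state.nodes_created.append(await self.graph_db.add_node(entity, NodeType.ENTITY))

            # Add relationships, skipping (rather than crashing on) types the model invented
            for rel in knowledge.relationships:
                if rel.get("type") not in RelationType.__members__:
                    self.state.errors.append(f"Skipped unknown relation type: {rel.get('type')}")
                    continue
                await self.graph_db.add_relation(rel["from"], rel["to"], RelationType[rel["type"]])
                self.state.relations_created.append(
                    {"from": rel["from"], "to": rel["to"], "type": rel["type"]}
                )

            self.state.knowledge_extracted = True
            await context.emit_event(process_event="GraphUpdated")
        except Exception as e:
            self.state.errors.append(f"Graph update failed: {e}")
            await context.emit_event(process_event="UpdateError", data=str(e))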
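
`GraphUpdateStep` trusts the endpoint names the model returns, so a relationship pointing at something that was never extracted as a concept or entity silently matches nothing. A stricter pre-filter (an assumption, not something the Process framework provides) could normalize the type's case and reject dangling endpoints before any write. `filter_relationships` and `ALLOWED_RELATION_TYPES` are hypothetical names; the set simply mirrors `RelationType`.

```python
ALLOWED_RELATION_TYPES = {"IS_A", "HAS_PROPERTY", "RELATED_TO", "HAPPENED_AT", "PARTICIPATED_IN"}


def filter_relationships(
    relationships: list[dict[str, str]], known_nodes: set[str]
) -> tuple[list[dict[str, str]], list[str]]:
    """Split extracted relationships into (valid ones, rejection reasons). Hypothetical helper."""
    valid, rejected = [], []
    for rel in relationships:
        source, target = rel.get("from"), rel.get("to")
        # Accept "is_a" or " IS_A " but store the canonical upper-case name
        rel_type = str(rel.get("type", "")).strip().upper()
        if rel_type not in ALLOWED_RELATION_TYPES:
            rejected.append(f"unknown relation type {rel.get('type')!r}")
        elif source not in known_nodes or target not in known_nodes:
            rejected.append(f"dangling endpoint in {source!r} -> {target!r}")
        else:
            valid.append({"from": source, "to": target, "type": rel_type})
    return valid, rejected
```

Inside `update_graph`, `known_nodes` would be `set(knowledge.concepts) | set(knowledge.entities)`, and the rejection reasons could be appended to `self.state.errors`.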

### Part 4: Query Agent System

Now let's create an agent that answers questions by pulling relevant subgraphs from the knowledge graph and passing them to the LLM as context:

In [None]:
from semantic_kernel.functions import KernelArguments

class QueryResult(BaseModel):
    """Structure for query results"""
    answer: str = Field(description="Direct answer to the query")
    explanation: str = Field(description="Explanation of how the answer was derived")
    confidence: float = Field(description="Confidence score for the answer")
    supporting_facts: List[str] = Field(description="Facts from the graph that support the answer")

class KnowledgeQueryAgent:
    def __init__(self, kernel: sk.Kernel, graph_db: KnowledgeGraphDB):
        self.kernel = kernel
        self.db = graph_db
        self.query_function = kernel.add_function(
            plugin_name="query",
            function_name="query_knowledge",
            description="Queries knowledge graph to answer questions",
            prompt="""
            Answer the following question using the provided knowledge graph context.

            Question: {{$question}}

            Context from knowledge graph:
            {{$context}}

            Respond with JSON only (no Markdown code fences) in the following format,
            with "confidence" between 0 and 1:
            {
                "answer": "direct answer",
                "explanation": "how you arrived at the answer",
                "confidence": 0.95,
                "supporting_facts": ["fact1", "fact2"]
            }

            Only use information that is directly supported by the context.
            If you're unsure, indicate lower confidence.
            """,
        )

    async def query(self, question: str) -> QueryResult:
        # Find graph nodes mentioned in the question
        key_terms = await self._extract_key_terms(question)

        # Query graph for relevant context
        context = []
        for term in key_terms:
            subgraph = await self.db.query_subgraph(term)
            context.extend(self._format_subgraph(subgraph))

        # Get answer using the (deduplicated, order-preserving) context
        result = await self.kernel.invoke(
            self.query_function,
            KernelArguments(question=question, context="\n".join(dict.fromkeys(context))),
        )
        return QueryResult.model_validate_json(str(result))

    async def _extract_key_terms(self, question: str) -> List[str]:
        # Simple heuristic: names of existing nodes that occur in the question (case-insensitive).
        # An LLM call or an NER model could be used here instead.
        rows = self.db.db.execute_and_fetch("MATCH (n) WHERE n.name IS NOT NULL RETURN n.name AS name")
        lowered = question.lower()
        return [row["name"] for row in rows if str(row["name"]).lower() in lowered]

    def _format_subgraph(self, subgraph) -> List[str]:
        # Turn {"source", "relation", "target"} rows into one readable line per edge
        return [f"{row['source']} -[{row['relation']}]-> {row['target']}" for row in subgraph or []]
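
The substring test in `_extract_key_terms` is deliberately simple, and short node names can match inside longer words (a node named "Py" would match "Python"). A stricter variant, sketched below as the hypothetical `match_node_names`, matches case-insensitively but only on word boundaries and checks longer names first. It uses lookarounds rather than `\b` so that names ending in punctuation, such as "C++", still match.

```python
import re


def match_node_names(question: str, node_names: list[str]) -> list[str]:
    """Return node names mentioned in `question`, longest names first (hypothetical helper)."""
    matches = []
    for name in sorted(set(node_names), key=len, reverse=True):
        # (?<!\w) / (?!\w): the name must not be glued to other word characters
        pattern = r"(?<!\w)" + re.escape(name) + r"(?!\w)"
        if re.search(pattern, question, flags=re.IGNORECASE):
            matches.append(name)
    return matches
```

For example, `match_node_names("Is this Pythonic?", ["Python"])` returns an empty list, whereas the substring heuristic would report "Python".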

### Part 5: Putting It All Together

Here's how to use the complete system. Running it requires a reachable Memgraph instance and an `OPENAI_API_KEY` environment variable:

In [None]:
async def main():
    # Process framework imports (experimental; module paths have moved between releases)
    from semantic_kernel.processes.process_builder import ProcessBuilder
    from semantic_kernel.processes.kernel_process import KernelProcessEvent
    from semantic_kernel.processes.local_runtime.local_kernel_process import start

    # Initialize Kernel (OpenAIChatCompletion reads OPENAI_API_KEY from the environment)
    kernel = sk.Kernel()
    kernel.add_service(
        OpenAIChatCompletion(
            service_id="chat-gpt",
            ai_model_id="gpt-4"
        )
    )

    # Initialize components
    graph_db = KnowledgeGraphDB()
    extractor = KnowledgeExtractionAgent(kernel)
    query_agent = KnowledgeQueryAgent(kernel, graph_db)

    # The runtime instantiates steps from their classes, so inject dependencies first
    TextProcessingStep.extractor = extractor
    GraphUpdateStep.graph_db = graph_db

    # Create ingestion process (add_step takes the step class, not an instance)
    process = ProcessBuilder(name="KnowledgeIngestion")
    text_step = process.add_step(TextProcessingStep)
    graph_step = process.add_step(GraphUpdateStep)

    # Configure process flow; parameter_name routes event data to the step function's argument
    process.on_input_event("StartIngestion").send_event_to(target=text_step, parameter_name="text")
    text_step.on_event("KnowledgeExtracted").send_event_to(target=graph_step, parameter_name="knowledge")
    text_step.on_event("ProcessingError").stop_process()
    graph_step.on_event("UpdateError").stop_process()

    # Example usage
    sample_text = """
    The Python programming language was created by Guido van Rossum and was first released in 1991.
    Python is known for its simple syntax and readability, which makes it popular among beginners.
    It supports multiple programming paradigms, including procedural, object-oriented, and functional programming.
    """

    # Ingest knowledge
    kernel_process = process.build()
    await start(
        process=kernel_process,
        kernel=kernel,
        initial_event=KernelProcessEvent(id="StartIngestion", data=sample_text),
    )

    # Query knowledge
    questions = [
        "Who created Python?",
        "What are the main characteristics of Python?",
        "When was Python first released?"
    ]

    for question in questions:
        result = await query_agent.query(question)
        print(f"\nQ: {question}")
        print(f"A: {result.answer}")
        print(f"Confidence: {result.confidence}")
        print("Supporting facts:")
        for fact in result.supporting_facts:
            print(f"- {fact}")

# In Jupyter an event loop is already running, so run `await main()` in a cell instead.
if __name__ == "__main__":
    asyncio.run(main())
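
To see the event wiring of the ingestion process without an API key or a database, the routing can be imitated with plain `asyncio`. This is a hypothetical toy, not how Semantic Kernel's local runtime is implemented: each step returns the name of the event it emits plus a payload, and a routing table maps each event either to the next step or to `None`, which plays the role of `stop_process()` (or of an event with no outgoing edge).

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Step = Callable[[Any], Awaitable[tuple[str, Any]]]


async def run_pipeline(
    routes: dict[str, Optional[Step]], start_event: str, data: Any, max_steps: int = 100
) -> list[str]:
    """Follow events through `routes` until one maps to None; return the event trace."""
    trace, event = [start_event], start_event
    for _ in range(max_steps):
        step = routes.get(event)
        if step is None:
            return trace
        event, data = await step(data)
        trace.append(event)
    raise RuntimeError("pipeline did not terminate (cycle in routes?)")


async def toy_text_step(text: str) -> tuple[str, Any]:
    # Toy "extraction": treat capitalised words as entities
    if not text.strip():
        return "ProcessingError", None
    return "KnowledgeExtracted", [w.strip(".,") for w in text.split() if w[:1].isupper()]


async def toy_graph_step(entities: list[str]) -> tuple[str, Any]:
    return "GraphUpdated", len(entities)


TOY_ROUTES: dict[str, Optional[Step]] = {
    "StartIngestion": toy_text_step,
    "KnowledgeExtracted": toy_graph_step,
    "ProcessingError": None,  # like text_step.on_event("ProcessingError").stop_process()
    "GraphUpdated": None,     # no outgoing edge, so the run ends
}
```

Running `asyncio.run(run_pipeline(TOY_ROUTES, "StartIngestion", sample))` returns the sequence of events, which makes it easy to check that an error path really stops the run.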


### Part 6: Enhancements and Extensions

1. Add document processing for different file types
2. Implement fact verification using external sources
3. Add temporal reasoning capabilities
4. Create visualization tools for the knowledge graph
5. Implement a belief system (certainty scores for facts)
6. Add automatic knowledge refreshing
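
For the belief system in item 5, one common choice (an assumption here; the module does not prescribe a scoring rule) is the noisy-OR combination: if each independent source supports a fact with confidence p_i, the combined belief is 1 - prod(1 - p_i). The hypothetical `noisy_or` below implements that formula.

```python
from math import prod


def noisy_or(confidences: list[float]) -> float:
    """Combine independent per-source confidences into one belief score (noisy-OR)."""
    for p in confidences:
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"confidence out of range: {p}")
    # With no sources, prod() of an empty sequence is 1, so the belief is 0
    return 1.0 - prod(1.0 - p for p in confidences)
```

Two sources at 0.5 each give 0.75. Each additional source can only raise the score, so correlated sources (for example, two pages copied from the same origin) will overstate certainty under this rule.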

### Best Practices

1. **Knowledge Quality**
   - Validate extracted information
   - Track knowledge provenance
   - Handle contradictions
   - Update outdated information

2. **Performance**
   - Index frequently accessed nodes
   - Cache common query patterns
   - Batch graph updates
   - Implement query optimization

3. **Error Handling**
   - Validate input text
   - Handle extraction failures
   - Manage graph inconsistencies
   - Track failed queries

4. **Security**
   - Validate knowledge sources
   - Implement access control
   - Audit knowledge changes
   - Secure sensitive information
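
For "Batch graph updates", the per-node `add_node` calls can be replaced by a single parameterised Cypher statement that `UNWIND`s a list of names. The hypothetical `build_batch_merge` below only builds the query text and parameters, which you would then pass to gqlalchemy's `Memgraph.execute(query, parameters)`. Because Cypher labels cannot be parameters, the label is checked against an allow-list before it is interpolated.

```python
ALLOWED_LABELS = {"CONCEPT", "ENTITY", "EVENT", "FACT"}  # mirrors NodeType


def build_batch_merge(label: str, names: list[str]) -> tuple[str, dict]:
    """Build one UNWIND + MERGE statement that upserts many nodes with the same label."""
    if label not in ALLOWED_LABELS:
        # The label is spliced into the query text, so never accept arbitrary input here
        raise ValueError(f"unsupported label: {label!r}")
    # Strip whitespace, drop empty names, and de-duplicate while preserving order
    unique_names = list(dict.fromkeys(n.strip() for n in names if n.strip()))
    query = (
        "UNWIND $names AS name "
        f"MERGE (n:{label} {{name: name}}) "
        "RETURN count(n) AS upserted"
    )
    return query, {"names": unique_names}
```

One round trip per label instead of one per node keeps latency roughly constant as extractions grow, and `MERGE` preserves the idempotency of the single-node version.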

### Resources
- [Semantic Kernel Documentation](https://learn.microsoft.com/semantic-kernel/)
- [Memgraph Documentation](https://memgraph.com/docs)
- [Comparing SQL with Cypher (Neo4j guide)](https://neo4j.com/developer/cypher/guide-sql-to-cypher/)