# 🧠 Symbolic AI & PDDL Mastery Training
## From General AI Concepts to Database Query Optimization

🔬 **Approach**: Theory + Practice + Implementation

--- 

### 📚 Learning Path Overview:
1. **Symbolic AI Fundamentals** - What is symbolic reasoning?
2. **Knowledge Representation** - How to encode knowledge formally
3. **PDDL Language** - Planning Domain Definition Language syntax
4. **Planning Algorithms** - How AI searches for action sequences that reach a goal, ideally at low cost
5. **Practical Implementation** - Real code examples and exercises
6. **Database Application** - Apply to the query optimizer

# 🔬 Section 1: Symbolic AI Fundamentals

## What is Symbolic AI?

**Symbolic AI** (also called Classical AI, GOFAI - Good Old-Fashioned AI) represents knowledge using:
- **Symbols**: Discrete entities representing concepts (table, index, query)
- **Rules**: Logical relationships between symbols (IF-THEN rules)
- **Logic**: Formal reasoning using mathematical logic

### Key Characteristics:
- ✅ **Explicit Knowledge**: Facts and rules are written down rather than encoded in learned weights
- ✅ **Interpretable**: Every conclusion can be traced back to the rules that produced it
- ✅ **Logical**: Based on formal logic and mathematics
- ✅ **Deterministic**: Same input → Same output

### Symbolic vs Other AI Approaches:

| Approach | Knowledge | Reasoning | Interpretability |
|----------|-----------|-----------|------------------|
| **Symbolic AI** | Explicit rules & facts | Logical inference | High ✅ |
| **Neural Networks** | Implicit weights | Pattern matching | Low ❌ |
| **Statistical ML** (non-neural) | Statistical patterns | Probability | Medium 🟡 |

---

**🎯 In Database Context:**
- Symbolic AI: "IF table has many rows AND query filters on column THEN create index"
- Neural AI: "Based on 10,000 similar queries, probably create index" 
- Hybrid: "Use symbolic rules + ML predictions for optimal decisions"
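
The symbolic rule in the first bullet can be written as a plain function. This is a minimal sketch: the `TableStats` record, the `should_create_index` name, and the 10,000-row threshold for "many rows" are assumptions chosen for illustration, not values from a real optimizer.

```python
from dataclasses import dataclass, field
from typing import Set

# Hypothetical threshold for "many rows"; a real system would tune this.
LARGE_TABLE_ROWS = 10_000

@dataclass
class TableStats:
    """Illustrative record of what is known about one table."""
    name: str
    row_count: int
    filtered_columns: Set[str] = field(default_factory=set)

def should_create_index(stats: TableStats, column: str) -> bool:
    """IF table has many rows AND query filters on column THEN create index."""
    return stats.row_count > LARGE_TABLE_ROWS and column in stats.filtered_columns

customers = TableStats("customers", 100_000, {"c_custkey"})
print(should_create_index(customers, "c_custkey"))  # True
print(should_create_index(customers, "c_name"))     # False
```

Because the rule is explicit, the reason for each decision is visible in the two conditions of the `return` statement.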

In [12]:
# 🧪 HANDS-ON: Implement Basic Symbolic Reasoning

# 1. LOGICAL OPERATORS AND PREDICATES
class Predicate:
    """Represents a logical predicate (statement that can be true/false)"""
    
    def __init__(self, name, *args):
        self.name = name
        self.args = args
    
    def __str__(self):
        if self.args:
            return f"{self.name}({', '.join(map(str, self.args))})"
        return self.name
    
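    def __eq__(self, other):
        # Compare only against other predicates; defer to Python for other types
        if not isinstance(other, Predicate):
            return NotImplemented
        return self.name == other.name and self.args == other.args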
    
    def __hash__(self):
        return hash((self.name, self.args))

# Example: Database predicates
table_customers = Predicate("table", "customers")
has_index = Predicate("has_index", "customers", "c_custkey")
large_table = Predicate("large_table", "customers")

print("üîç Basic Predicates:")
print(f"  {table_customers}")
print(f"  {has_index}")
print(f"  {large_table}")
print()

üîç Basic Predicates:
  table(customers)
  has_index(customers, c_custkey)
  large_table(customers)



In [13]:
# 2. KNOWLEDGE BASE - Storage for facts and rules
class KnowledgeBase:
    """Stores facts and rules for symbolic reasoning"""
    
    def __init__(self):
        self.facts = set()  # Known true statements
        self.rules = []     # IF-THEN rules
    
    def add_fact(self, predicate:Predicate):
        """Add a fact (known true statement)"""
        self.facts.add(predicate)
        print(f"Added fact: {predicate}")
    
    def add_rule(self, conditions, conclusion):
        """Add rule: IF conditions THEN conclusion"""
        self.rules.append((conditions, conclusion))
        cond_str = " AND ".join(str(c) for c in conditions)
        print(f"Added rule: IF {cond_str} THEN {conclusion}")
    
    def query(self, predicate):
        """Check if a predicate is true"""
        # Direct fact check
        if predicate in self.facts:
            return True, "Direct fact"
        
        # Rule-based inference
        for conditions, conclusion in self.rules:
            if conclusion == predicate:
                # Check if all conditions are satisfied
                if all(self.query(cond)[0] for cond in conditions):
                    # Render the conditions as text so the explanation is readable
                    cond_str = " AND ".join(str(c) for c in conditions)
                    return True, f"Inferred from rule: {cond_str}"

        return False, "Not provable"

# Example: Database optimization knowledge base
kb = KnowledgeBase()

print("Building Database Knowledge Base:")
print("-" * 40)

# Add basic facts
kb.add_fact(Predicate("table", "customers"))
kb.add_fact(Predicate("table", "orders"))
kb.add_fact(Predicate("column", "customers", "c_custkey"))
kb.add_fact(Predicate("filtered", "customers", "c_custkey"))
kb.add_fact(Predicate("table_size", "customers", 100000))

print()

# Add optimization rules
kb.add_rule(
    [Predicate("table", "customers"), 
     Predicate("filtered", "customers", "c_custkey"),
     Predicate("table_size", "customers", 100000)],
    Predicate("should_create_index", "customers", "c_custkey")
)

kb.add_rule(
    [Predicate("should_create_index", "customers", "c_custkey")],
    Predicate("optimization_action", "create_index", "customers", "c_custkey")
)

print("\nTesting Reasoning:")
print("-" * 25)

# Test queries
test_predicate = Predicate("should_create_index", "customers", "c_custkey")
result, explanation = kb.query(test_predicate)
print(f"Query: {test_predicate}")
print(f"Result: {result}")
print(f"Explanation: {explanation}")

print()

action_predicate = Predicate("optimization_action", "create_index", "customers", "c_custkey")
result, explanation = kb.query(action_predicate)
print(f"Query: {action_predicate}")
print(f"Result: {result}")
print(f"Explanation: {explanation}")

Building Database Knowledge Base:
----------------------------------------
Added fact: table(customers)
Added fact: table(orders)
Added fact: column(customers, c_custkey)
Added fact: filtered(customers, c_custkey)
Added fact: table_size(customers, 100000)

Added rule: IF table(customers) AND filtered(customers, c_custkey) AND table_size(customers, 100000) THEN should_create_index(customers, c_custkey)
Added rule: IF should_create_index(customers, c_custkey) THEN optimization_action(create_index, customers, c_custkey)

Testing Reasoning:
-------------------------
Query: should_create_index(customers, c_custkey)
Result: True
Explanation: Inferred from rule: table(customers) AND filtered(customers, c_custkey) AND table_size(customers, 100000)

Query: optimization_action(create_index, customers, c_custkey)
Result: True
Explanation: Inferred from rule: should_create_index(customers, c_custkey)


# 📊 Section 2: Knowledge Representation Systems

## What is Knowledge Representation?

**Knowledge Representation (KR)** is how we encode information about the world so that AI systems can reason with it.

### Core Components:

1. **Ontology**: Defines concepts and relationships
   - What entities exist? (tables, columns, indexes)
   - How are they related? (table HAS columns, column HAS index)

2. **Facts**: Specific instances of knowledge
   - `table(customers)` - customers is a table
   - `has_index(customers, c_custkey)` - customers has index on c_custkey

3. **Rules**: General patterns of reasoning  
   - `IF large_table(X) AND filtered(X, Y) THEN should_create_index(X, Y)`

4. **Constraints**: Limitations and requirements
   - `table can have maximum 10 indexes`
   - `index creation requires available storage`
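
Rules with variables such as `X` and `Y` need a matching step that binds each variable to a concrete name. The sketch below shows one simple way to do this with forward chaining. The tuple encoding of facts, the `?`-prefixed variable names, and the helpers `match` and `forward_chain` are assumptions made for this example, not part of any library.

```python
from typing import Dict, List, Optional, Set, Tuple

Fact = Tuple[str, ...]          # e.g. ("large_table", "customers")
Rule = Tuple[List[Fact], Fact]  # (conditions, conclusion); "?x" marks a variable

def match(pattern: Fact, fact: Fact, env: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Extend env so that pattern equals fact, or return None on a mismatch."""
    if len(pattern) != len(fact):
        return None
    env = dict(env)
    for p, f in zip(pattern, fact):
        if p.startswith("?"):
            # Bind the variable, or check it against an earlier binding
            if env.setdefault(p, f) != f:
                return None
        elif p != f:
            return None
    return env

def forward_chain(facts: Set[Fact], rules: List[Rule]) -> Set[Fact]:
    """Apply the rules repeatedly until no new fact can be derived."""
    known = set(facts)
    changed = True
    while changed:
        changed = False
        for conditions, conclusion in rules:
            envs = [{}]
            for cond in conditions:
                # Keep every binding that is consistent with some known fact
                envs = [e2 for e in envs for f in known
                        if (e2 := match(cond, f, e)) is not None]
            for env in envs:
                derived = tuple(env.get(term, term) for term in conclusion)
                if derived not in known:
                    known.add(derived)
                    changed = True
    return known

facts = {
    ("large_table", "customers"),
    ("filtered", "customers", "c_custkey"),
    ("filtered", "orders", "o_custkey"),
}
rules = [([("large_table", "?x"), ("filtered", "?x", "?y")],
          ("should_create_index", "?x", "?y"))]

derived = forward_chain(facts, rules) - facts
print(derived)  # {('should_create_index', 'customers', 'c_custkey')}
```

Only `customers` satisfies both conditions, so no index is recommended for `orders` even though one of its columns is filtered.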

---

### Knowledge Representation Languages:

| Language | Use Case | Expressiveness | Example |
|----------|----------|----------------|---------|
| **First-Order Logic** | Mathematical reasoning | High | `∀x,y: table(x) ∧ filtered(x,y) → candidate_index(x,y)` |
| **Production Rules** | Expert systems | Medium | `IF table_size > 1000 AND filtered THEN create_index` |
| **Semantic Networks** | Conceptual relationships | Medium | `table --HAS--> column --FILTERED-BY--> query` |
| **PDDL** | AI Planning | High | `(:action create_index :parameters (?t ?c) ...)` |

**🎯 Why PDDL**: it pairs a logic-based description of state with an explicit model of actions, which is what a planner needs to search for a sequence of steps

In [2]:
# HANDS-ON: Build a Knowledge Representation System

from typing import Any, Dict, List
from dataclasses import dataclass, field

@dataclass
class Entity:
    """Represents an entity in our knowledge base"""
    name: str
    type: str
    # default_factory gives each entity its own dict and avoids a None default
    attributes: Dict[str, Any] = field(default_factory=dict)

@dataclass 
class Relation:
    """Represents a relationship between entities"""
    subject: str
    predicate: str  
    object: str
    
    def __str__(self):
        return f"{self.predicate}({self.subject}, {self.object})"

class DatabaseOntology:
    """Ontology for database optimization domain"""
    
    def __init__(self):
        self.entities = {}      # name -> Entity
        self.relations = []     # List of relations
        self.rules = []         # Inference rules
        self.types = {          # Type hierarchy
            'database_object': ['table', 'column', 'index'],
            'optimization_action': ['create_index', 'reorder_joins', 'filter_pushdown'],
            'query_component': ['select', 'join', 'where', 'order_by']
        }
    
    def add_entity(self, name: str, entity_type: str, **attributes):
        """Add an entity to the ontology"""
        entity = Entity(name, entity_type, attributes)
        self.entities[name] = entity
        print(f"Added {entity_type}: {name}")
        if attributes:
            print(f"   Attributes: {attributes}")
    
    def add_relation(self, subject: str, predicate: str, object: str):
        """Add a relation between entities"""
        relation = Relation(subject, predicate, object)
        self.relations.append(relation)
        print(f"Added relation: {relation}")
    
    def add_inference_rule(self, conditions: List[str], conclusion: str):
        """Add an inference rule"""
        self.rules.append((conditions, conclusion))
        print(f"Added rule: IF {' AND '.join(conditions)} THEN {conclusion}")
    
    def get_related_entities(self, entity_name: str, predicate: str = None):
        """Find entities related to given entity"""
        related = []
        for relation in self.relations:
            if relation.subject == entity_name:
                if predicate is None or relation.predicate == predicate:
                    related.append((relation.predicate, relation.object))
        return related
    
    def infer_facts(self):
        """Apply inference rules to derive new facts"""
        new_facts = []
        
        for conditions, conclusion in self.rules:
            # Simple pattern matching (in real systems, this would be more sophisticated)
            if self._check_conditions(conditions):
                new_facts.append(conclusion)
                print(f"Inferred: {conclusion}")
        
        return new_facts
    
    def _check_conditions(self, conditions):
        """Check if all conditions are satisfied"""
        # Simplified condition checking
        for condition in conditions:
            if not self._condition_satisfied(condition):
                return False
        return True
    
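    def _condition_satisfied(self, condition):
        """Check if a single condition is satisfied"""
        # Simplified: only large_table(<name>) is understood, and it is checked
        # against the named table. Every other condition counts as unsatisfied;
        # a real implementation would evaluate it against facts and relations.
        prefix = "large_table("
        if condition.startswith(prefix) and condition.endswith(")"):
            entity = self.entities.get(condition[len(prefix):-1].strip())
            return (entity is not None and entity.type == "table"
                    and entity.attributes.get("row_count", 0) > 10000)
        return False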

# Example: Build Database Optimization Ontology
print("Building Database Optimization Ontology")
print("=" * 50)

ontology = DatabaseOntology()

# Add database entities
ontology.add_entity("customers", "table", row_count=100000, size_mb=500)
ontology.add_entity("orders", "table", row_count=1000000, size_mb=2000)
ontology.add_entity("c_custkey", "column", data_type="integer", indexed=False)
ontology.add_entity("o_custkey", "column", data_type="integer", indexed=False)

print()

# Add relationships
ontology.add_relation("customers", "has_column", "c_custkey")
ontology.add_relation("orders", "has_column", "o_custkey") 
ontology.add_relation("customers", "joins_with", "orders")
ontology.add_relation("c_custkey", "foreign_key_to", "o_custkey")

print()

# Add optimization rules
ontology.add_inference_rule(
    ["large_table(customers)", "filtered_column(c_custkey)"],
    "recommend_index(customers, c_custkey)"
)

ontology.add_inference_rule(
    ["joins_with(customers, orders)", "foreign_key(c_custkey, o_custkey)"],
    "recommend_join_index(c_custkey)"
)

print("\nQuerying Ontology:")
print("-" * 25)

# Query relationships
customer_relations = ontology.get_related_entities("customers")
print(f"Customers related to: {customer_relations}")

# Check entity attributes
customers = ontology.entities["customers"]
print(f"Customers info: {customers.type}, {customers.attributes}")

print("\nKnowledge Inference:")
print("-" * 25)
inferred = ontology.infer_facts()
if not inferred:
    print("No new facts inferred (conditions not met)")

Building Database Optimization Ontology
==================================================
Added table: customers
   Attributes: {'row_count': 100000, 'size_mb': 500}
Added table: orders
   Attributes: {'row_count': 1000000, 'size_mb': 2000}
Added column: c_custkey
   Attributes: {'data_type': 'integer', 'indexed': False}
Added column: o_custkey
   Attributes: {'data_type': 'integer', 'indexed': False}

Added relation: has_column(customers, c_custkey)
Added relation: has_column(orders, o_custkey)
Added relation: joins_with(customers, orders)
Added relation: foreign_key_to(c_custkey, o_custkey)

Added rule: IF large_table(customers) AND filtered_column(c_custkey) THEN recommend_index(customers, c_custkey)
Added rule: IF joins_with(customers, orders) AND foreign_key(c_custkey, o_custkey) THEN recommend_join_index(c_custkey)

Querying Ontology:
-------------------------
Customers related to: [('has_column', 'c_custkey'), ('joins_with', 'orders')]
Customers info: table, {'row_count': 100000, 'size_mb': 500}

Knowledge Inference:
-------------------------
No new facts inferred (conditions not met)

# 🎯 Section 3: PDDL - Planning Domain Definition Language

## What is PDDL?

**PDDL** is the standard language for AI planning. Think of it as "SQL for AI planning systems."

### 🧬 PDDL History & Evolution:
- **1998**: PDDL 1.2 - Basic planning language
- **2002**: PDDL 2.1 - Added numeric fluents and durative actions
- **2004**: PDDL 2.2 - Derived predicates and timed initial literals
- **2006**: PDDL 3.0 - State-trajectory constraints and preferences (soft constraints)
- **2008**: PDDL 3.1 - Object fluents
- **Today**: Used in AI competitions, research, and production systems

---

## 🏗️ PDDL Structure Overview

### Two Main Files:
1. **DOMAIN** (.pddl): The "physics" of your world
   - What objects exist?
   - What properties can they have?
   - What actions are possible?

2. **PROBLEM** (.pddl): A specific scenario to solve
   - Initial state (how things are now)
   - Goal state (what we want to achieve)
   - Objects in this particular scenario

---

## 🧠 Core PDDL Concepts

### 1. **Requirements** - What PDDL features you need
```lisp
(:requirements :strips :typing :fluents :durative-actions)
```
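
To make the flags concrete, the sketch below extracts them from a `(:requirements ...)` form and prints a short gloss for each. The `REQUIREMENT_NOTES` table and the `parse_requirements` helper are illustrative names introduced here, and the glosses cover only a handful of common flags.

```python
import re
from typing import List

# Short glosses for a few common requirement flags (not an exhaustive list).
REQUIREMENT_NOTES = {
    ":strips": "actions with plain add and delete effects",
    ":typing": "typed objects and parameters, e.g. ?t - table",
    ":negative-preconditions": "(not ...) inside preconditions",
    ":fluents": "numeric functions such as (total-cost)",
    ":durative-actions": "actions that take time to execute",
    ":conditional-effects": "(when ...) effects",
}

def parse_requirements(pddl_text: str) -> List[str]:
    """Return the flags listed in the first (:requirements ...) form."""
    match = re.search(r"\(:requirements\s+([^)]*)\)", pddl_text)
    return match.group(1).split() if match else []

flags = parse_requirements("(:requirements :strips :typing :fluents :durative-actions)")
for flag in flags:
    print(f"{flag:<24} {REQUIREMENT_NOTES.get(flag, 'not described in this sketch')}")
```

A domain whose preconditions contain `(not ...)` would normally also list `:negative-preconditions`.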

### 2. **Types** - Categories of objects (like classes in programming)
```lisp
(:types 
    table column index - database_object
    btree hash bitmap - index_type
)
```
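
A type hierarchy like this one answers a single question at planning time: may an object of one type be used where another type is expected? A minimal sketch of that check follows, assuming the hierarchy is stored as a child-to-parent dictionary; `PARENT` and `is_subtype` are names chosen for this example.

```python
from typing import Dict, Optional

# child type -> parent type, mirroring the (:types ...) block above
PARENT: Dict[str, str] = {
    "table": "database_object",
    "column": "database_object",
    "index": "database_object",
    "btree": "index_type",
    "hash": "index_type",
    "bitmap": "index_type",
}

def is_subtype(child: str, ancestor: str) -> bool:
    """True if child equals ancestor or inherits from it ("object" is the root)."""
    current: Optional[str] = child
    while current is not None:
        if current == ancestor:
            return True
        current = PARENT.get(current)  # None once we pass a top-level type
    return ancestor == "object"

print(is_subtype("table", "database_object"))  # True
print(is_subtype("btree", "database_object"))  # False
print(is_subtype("hash", "object"))            # True
```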

### 3. **Predicates** - Boolean properties and relationships
```lisp
(:predicates
    (table ?t - table)                    ; ?t is a table
    (has_index ?t - table ?c - column)    ; table ?t has index on column ?c
    (joined ?t1 - table ?t2 - table)     ; tables ?t1 and ?t2 are joined
)
```

### 4. **Functions** - Numeric values for optimization
```lisp
(:functions
    (total-cost) - number                 ; Current total execution cost
    (table_size ?t - table) - number     ; Number of rows in table ?t
    (scan_cost ?t - table) - number      ; Cost to scan table ?t
)
```

### 5. **Actions** - What can be done to change the world
```lisp
(:action create_index
    :parameters (?table - table ?column - column)
    :precondition (AND ...)
    :effect (AND ...)
)
```
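
What "changing the world" means can be shown in a few lines of Python. In this sketch a state is a set of facts, an action is applicable when its preconditions hold, and applying it removes the delete effects and adds the add effects. The `GroundAction` class is an illustrative assumption, and since the precondition and effect above are elided, the concrete facts used here are assumed too.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple

Fact = Tuple[str, ...]

@dataclass(frozen=True)
class GroundAction:
    """An action whose parameters are already replaced by concrete objects."""
    name: str
    preconditions: FrozenSet[Fact]           # facts that must hold
    negative_preconditions: FrozenSet[Fact]  # facts that must not hold
    add_effects: FrozenSet[Fact]
    delete_effects: FrozenSet[Fact]

    def applicable(self, state: FrozenSet[Fact]) -> bool:
        return (self.preconditions <= state
                and not (self.negative_preconditions & state))

    def apply(self, state: FrozenSet[Fact]) -> FrozenSet[Fact]:
        if not self.applicable(state):
            raise ValueError(f"{self.name} is not applicable")
        return (state - self.delete_effects) | self.add_effects

# Assumed grounding of create_index for customers.c_custkey
create_index = GroundAction(
    name="create_index customers c_custkey",
    preconditions=frozenset({("table", "customers"),
                             ("filtered", "customers", "c_custkey")}),
    negative_preconditions=frozenset({("has_index", "customers", "c_custkey")}),
    add_effects=frozenset({("has_index", "customers", "c_custkey")}),
    delete_effects=frozenset(),
)

state = frozenset({("table", "customers"), ("filtered", "customers", "c_custkey")})
next_state = create_index.apply(state)
print(("has_index", "customers", "c_custkey") in next_state)  # True
print(create_index.applicable(next_state))                    # False
```

Once the index exists, the negative precondition blocks the action from being applied a second time.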

In [4]:
# HANDS-ON: Parse and Understand PDDL Syntax

import re
from typing import List, Dict, Tuple

def parse_pddl_types(pddl_text: str) -> Dict[str, List[str]]:
    """Parse PDDL types section"""
    type_pattern = r'\(:types\s+(.*?)\)'
    match = re.search(type_pattern, pddl_text, re.DOTALL)
    
    if not match:
        return {}
    
    types_section = match.group(1)
    type_hierarchy = {}
    
    # Simple parsing (real parser would be more robust)
    lines = types_section.strip().split('\n')
    for line in lines:
        line = line.strip()
        if '-' in line:
            # Format: child_type1 child_type2 - parent_type
            parts = line.split('-')
            if len(parts) == 2:
                children = parts[0].strip().split()
                parent = parts[1].strip()
                type_hierarchy[parent] = type_hierarchy.get(parent, []) + children
    
    return type_hierarchy

def parse_pddl_predicates(pddl_text: str) -> List[str]:
    """Parse PDDL predicates section"""
    # Stop at the ")" that sits on its own line. A bare non-greedy ".*?\)" ends
    # at the first predicate's closing parenthesis and captures nothing useful.
    pred_pattern = r'\(:predicates\s+(.*?)\n\s*\)'
    match = re.search(pred_pattern, pddl_text, re.DOTALL)
    
    if not match:
        return []
    
    predicates_section = match.group(1)
    predicates = []
    
    # Find all predicates (format: (predicate_name ?param1 - type1 ?param2 - type2))
    pred_lines = re.findall(r'\([^)]+\)', predicates_section)
    for pred in pred_lines:
        if pred.strip() and not pred.startswith(';'):
            predicates.append(pred.strip())
    
    return predicates

def parse_pddl_action(action_text: str) -> Dict[str, str]:
    """Parse a single PDDL action"""
    action_info = {}
    
    # Extract action name
    name_match = re.search(r':action\s+(\w+)', action_text)
    if name_match:
        action_info['name'] = name_match.group(1)
    
    # Extract parameters
    param_match = re.search(r':parameters\s+\((.*?)\)', action_text, re.DOTALL)
    if param_match:
        action_info['parameters'] = param_match.group(1).strip()
    
    # Extract precondition
    prec_match = re.search(r':precondition\s+(.*?)(?=:effect)', action_text, re.DOTALL)
    if prec_match:
        action_info['precondition'] = prec_match.group(1).strip()
    
    # Extract effect
    effect_match = re.search(r':effect\s+(.*?)$', action_text, re.DOTALL)
    if effect_match:
        action_info['effect'] = effect_match.group(1).strip()
    
    return action_info

# Example PDDL Domain for Database Optimization
sample_domain = '''
(define (domain database_optimization)
    (:requirements :strips :typing :negative-preconditions :fluents)
    
    (:types
        table column index - database_object
        btree_index hash_index - index
        scan_method join_method - execution_method
    )
    
    (:predicates
        (table ?t - table)
        (column ?t - table ?c - column)
        (has_index ?t - table ?c - column)
        (filtered ?t - table ?c - column)
        (joined ?t1 - table ?t2 - table)
        (optimized ?t - table)
    )
    
    (:functions
        (total-cost) - number
        (table_size ?t - table) - number
        (selectivity ?t - table ?c - column) - number
    )
    
    (:action create_index
        :parameters (?table - table ?column - column)
        :precondition (AND
            (table ?table)
            (column ?table ?column)
            (filtered ?table ?column)
            (not (has_index ?table ?column))
            (> (table_size ?table) 1000)
        )
        :effect (AND
            (has_index ?table ?column)
            (decrease (total-cost) (* (table_size ?table) 0.3))
            (optimized ?table)
        )
    )
)
'''

print("PDDL Domain Analysis")
print("=" * 30)

# Parse types
types = parse_pddl_types(sample_domain)
print("Type Hierarchy:")
for parent, children in types.items():
    print(f"  {parent}: {', '.join(children)}")

print()

# Parse predicates  
predicates = parse_pddl_predicates(sample_domain)
print("Predicates:")
for i, pred in enumerate(predicates, 1):
    print(f"  {i}. {pred}")

print()

# Parse action
action_match = re.search(r'(:action.*?)(?=\)(?:\s*\))?$)', sample_domain, re.DOTALL)
if action_match:
    action_text = action_match.group(1)
    action_info = parse_pddl_action(action_text)
    
    print("Action Analysis:")
    print(f"  Name: {action_info.get('name', 'N/A')}")
    print(f"  Parameters: {action_info.get('parameters', 'N/A')}")
    print("  Precondition:")
    print(f"    {action_info.get('precondition', 'N/A')}")
    print("  Effect:")
    print(f"    {action_info.get('effect', 'N/A')}")

print()
print("🎯 Understanding PDDL:")
print("  ✅ Types define object categories") 
print("  ✅ Predicates define boolean properties")
print("  ✅ Functions define numeric values")
print("  ✅ Actions define state transitions")
print("  ✅ Everything is formal and mathematical!")

PDDL Domain Analysis
==============================
Type Hierarchy:
  database_object: table, column, index
  index: btree_index, hash_index
  execution_method: scan_method, join_method

Predicates:
  1. (table ?t - table)
  2. (column ?t - table ?c - column)
  3. (has_index ?t - table ?c - column)
  4. (filtered ?t - table ?c - column)
  5. (joined ?t1 - table ?t2 - table)
  6. (optimized ?t - table)

Action Analysis:
  Name: create_index
  Parameters: ?table - table ?column - column
  Precondition:
    (AND
            (table ?table)
            (column ?table ?column)
            (filtered ?table ?column)
            (not (has_index ?table ?column))
            (> (table_size ?table) 1000)
        )
  Effect:
    (AND
            (has_index ?table ?column)
            (decrease (total-cost) (* (table_size ?table) 0.3))
            (optimized ?table)
        )

🎯 Understanding PDDL:
  ✅ Types define object categories
  ✅ Predicates define boolean properties
  ✅ Functions define numeric values
  ✅ Actions define state transitions
  ✅ Everything is formal and mathematical!


# 🏗️ Section 4: Domain Definition Deep Dive

## PDDL Domain = The Rules of Your World

A PDDL domain defines the **physics** and **logic** of your problem space. Think of it like defining the rules of chess:

- **Pieces** (types): King, Queen, Rook, Pawn...
- **Board positions** (predicates): `(at King e1)`, `(empty d4)`  
- **Legal moves** (actions): `move`, `castle`, `capture`
- **Game state** (functions): `move_count`, `material_value`

For database optimization:
- **Objects**: tables, columns, indexes, queries
- **Properties**: `(has_index table column)`, `(large_table table)`
- **Operations**: `create_index`, `reorder_joins`, `filter_pushdown`
- **Metrics**: `total_cost`, `execution_time`, `storage_used`

---

## 🎯 PDDL Domains: Best Practices

### 1. **Start with Ontology Design**
```
What concepts exist in your domain?
├── Core Entities: table, column, index, query
├── Relationships: table HAS columns, column HAS index
├── Properties: indexed, filtered, joined, optimized
└── Metrics: cost, size, selectivity, performance
```

### 2. **Design Type Hierarchy**
```lisp
(:types
    ;; Database structure
    table column index constraint - database_object
    primary_key foreign_key unique_key - constraint  
    btree_index hash_index bitmap_index - index
    
    ;; Query execution
    scan_method join_method sort_method - execution_method
    sequential_scan index_scan - scan_method
    hash_join nested_loop_join - join_method
)
```
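
When reading a `(:types ...)` block, keep in mind that a typed list groups names: every name before `- parent` receives that parent, and trailing names with no parent default to `object`. Line breaks carry no meaning. The sketch below applies that grouping rule; `parse_typed_list` is a hypothetical helper that assumes well-formed input with no comments.

```python
from typing import Dict, List

def parse_typed_list(text: str) -> Dict[str, str]:
    """Map each name to its parent type; names without one default to "object"."""
    mapping: Dict[str, str] = {}
    pending: List[str] = []   # names seen since the last "- parent"
    tokens = text.split()
    i = 0
    while i < len(tokens):
        if tokens[i] == "-":
            parent = tokens[i + 1]
            for name in pending:
                mapping[name] = parent
            pending = []
            i += 2
        else:
            pending.append(tokens[i])
            i += 1
    for name in pending:      # trailing names have no explicit parent
        mapping[name] = "object"
    return mapping

types = parse_typed_list("""
    sequential_scan index_scan - scan_method
    scan_method join_method - execution_method
    query
""")
print(types["index_scan"])   # scan_method
print(types["scan_method"])  # execution_method
print(types["query"])        # object
```

One consequence is that a bare name placed before a typed group is absorbed into that group, so top-level types are safest written with an explicit `- object` or placed at the end of the list.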

### 3. **Model Properties as Predicates**
```lisp
(:predicates
    ;; Static properties (don't change during planning)
    (table ?t - table)
    (column ?t - table ?c - column)
    
    ;; Dynamic properties (change during planning)
    (has_index ?t - table ?c - column) 
    (optimized ?t - table)
    
    ;; Query-specific properties
    (filtered ?t - table ?c - column)
    (joined ?t1 - table ?t2 - table)
)
```
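
One way to tell static from dynamic predicates mechanically is to check which predicate names appear in any action effect: a predicate that no effect mentions cannot change during planning. A minimal sketch under that assumption follows. The `split_static_dynamic` helper is illustrative, and the effect list for `create_index` is assumed for the example.

```python
from typing import Dict, Iterable, List, Set, Tuple

def split_static_dynamic(predicates: Iterable[str],
                         action_effects: Dict[str, Set[str]]
                         ) -> Tuple[List[str], List[str]]:
    """Split predicate names by whether any action effect mentions them."""
    touched: Set[str] = set().union(*action_effects.values())
    names = list(predicates)
    static = [p for p in names if p not in touched]
    dynamic = [p for p in names if p in touched]
    return static, dynamic

predicates = ["table", "column", "has_index", "optimized", "filtered", "joined"]
# Assumed effects: create_index sets has_index and optimized.
action_effects = {"create_index": {"has_index", "optimized"}}

static, dynamic = split_static_dynamic(predicates, action_effects)
print(static)   # ['table', 'column', 'filtered', 'joined']
print(dynamic)  # ['has_index', 'optimized']
```

Under this test the query-specific predicates `filtered` and `joined` also count as static, because they describe the workload and no action rewrites them.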

### 4. **Use Functions for Optimization**
```lisp
(:functions
    ;; Optimization objectives
    (total-cost) - number
    (execution-time) - number
    
    ;; Table statistics  
    (table_size ?t - table) - number
    (selectivity ?t - table ?c - column) - number
    
    ;; Resource constraints
    (storage_used) - number
    (memory_available) - number
)
```
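
Numeric functions behave like a dictionary of values that action effects update. The sketch below mimics an effect of the form `(decrease (total-cost) (* (table_size ?table) 0.3))`. The dictionary encoding, the `apply_create_index_cost` helper, and the sample numbers are assumptions for illustration.

```python
from typing import Dict, Tuple

FluentKey = Tuple[str, ...]   # e.g. ("table_size", "customers")

def apply_create_index_cost(fluents: Dict[FluentKey, float], table: str,
                            factor: float = 0.3) -> Dict[FluentKey, float]:
    """Mimic (decrease (total-cost) (* (table_size ?table) factor))."""
    updated = dict(fluents)   # leave the input state untouched
    updated[("total-cost",)] -= updated[("table_size", table)] * factor
    return updated

# Hypothetical values chosen so the result stays positive
fluents = {("total-cost",): 1000.0, ("table_size", "customers"): 2000.0}
after = apply_create_index_cost(fluents, "customers")
print(round(after[("total-cost",)], 2))  # 400.0
```

With a table of 100,000 rows and a starting cost of 1000, the same effect would push `total-cost` far below zero, so the 0.3 factor is best read as a placeholder rather than a calibrated cost model.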

In [1]:
# HANDS-ON: Build Production-Quality PDDL Domain

from dataclasses import dataclass
from typing import List, Dict, Any, Optional

@dataclass
class PDDLType:
    """Represents a PDDL type with optional parent type"""
    name: str
    parent: Optional[str] = None
    
    def __str__(self):
        if self.parent:
            return f"{self.name} - {self.parent}"
        return self.name

@dataclass 
class PDDLPredicate:
    """Represents a PDDL predicate"""
    name: str
    parameters: List[str]
    
    def __str__(self):
        param_str = " ".join(self.parameters)
        return f"({self.name} {param_str})"

@dataclass
class PDDLFunction:
    """Represents a PDDL function"""
    name: str
    parameters: List[str] 
    return_type: str = "number"
    
    def __str__(self):
        param_str = " ".join(self.parameters)
        if self.parameters:
            return f"({self.name} {param_str}) - {self.return_type}"
        return f"({self.name}) - {self.return_type}"

class PDDLDomainBuilder:
    """Builder for creating PDDL domains systematically"""
    
    def __init__(self, domain_name: str):
        self.domain_name = domain_name
        self.requirements = []
        self.types = []
        self.predicates = []
        self.functions = []
        self.actions = []
    
    def add_requirements(self, *reqs):
        """Add PDDL requirements"""
        self.requirements.extend(reqs)
        return self
    
    def add_type(self, name: str, parent: Optional[str] = None):
        """Add a type to the domain"""
        self.types.append(PDDLType(name, parent))
        return self
    
    def add_predicate(self, name: str, *parameters):
        """Add a predicate to the domain"""
        self.predicates.append(PDDLPredicate(name, list(parameters)))
        return self
    
    def add_function(self, name: str, *parameters, return_type="number"):
        """Add a function to the domain"""
        self.functions.append(PDDLFunction(name, list(parameters), return_type))
        return self
    
    def generate_domain_string(self) -> str:
        """Generate complete PDDL domain file"""
        domain_parts = []
        
        # Header
        domain_parts.append(f"(define (domain {self.domain_name})")
        
        # Requirements
        if self.requirements:
            req_str = " ".join(f":{req}" for req in self.requirements)
            domain_parts.append(f"    (:requirements {req_str})")
        
        # Types
        if self.types:
            domain_parts.append("    (:types")
            # Group types by parent
            type_groups = {}
            for pddl_type in self.types:
                parent = pddl_type.parent or "object"
                if parent not in type_groups:
                    type_groups[parent] = []
                type_groups[parent].append(pddl_type.name)
            
            for parent, children in type_groups.items():
                # Always state the parent, including the built-in root "object".
                # A PDDL typed list gives every name before "- parent" that parent,
                # so bare top-level names would be absorbed by the next group.
                children_str = " ".join(children)
                domain_parts.append(f"        {children_str} - {parent}")
            domain_parts.append("    )")
        
        # Predicates
        if self.predicates:
            domain_parts.append("    (:predicates")
            for predicate in self.predicates:
                domain_parts.append(f"        {predicate}")
            domain_parts.append("    )")
        
        # Functions
        if self.functions:
            domain_parts.append("    (:functions")
            for function in self.functions:
                domain_parts.append(f"        {function}")
            domain_parts.append("    )")
        
        domain_parts.append(")")
        
        return "\n".join(domain_parts)

# Build Professional Database Optimization Domain
print("Building Professional Database Optimization Domain")
print("=" * 55)

# Create domain builder
builder = PDDLDomainBuilder("database_optimization")

# Add requirements
builder.add_requirements("strips", "typing", "fluents", "durative-actions", "conditional-effects")

print("✅ Added requirements: strips, typing, fluents, durative-actions, conditional-effects")

# Add type hierarchy
(builder.add_type("database_object")
       .add_type("table", "database_object")
       .add_type("column", "database_object")
       .add_type("index", "database_object")
       .add_type("constraint", "database_object"))

(builder.add_type("index_type")
       .add_type("btree_index", "index_type")
       .add_type("hash_index", "index_type")
       .add_type("bitmap_index", "index_type"))

(builder.add_type("execution_method")
       .add_type("scan_method", "execution_method")
       .add_type("join_method", "execution_method"))

print("✅ Added type hierarchy: database objects, index types, execution methods")

# Add predicates for database structure
(builder.add_predicate("table", "?t - table")
       .add_predicate("column", "?t - table", "?c - column")
       .add_predicate("primary_key", "?t - table", "?c - column")
       .add_predicate("foreign_key", "?t1 - table", "?c1 - column", "?t2 - table", "?c2 - column")
       .add_predicate("has_index", "?t - table", "?c - column"))

# Add predicates for query structure  
(builder.add_predicate("selected", "?t - table")
       .add_predicate("joined", "?t1 - table", "?t2 - table")
       .add_predicate("filtered", "?t - table", "?c - column")
       .add_predicate("ordered", "?t - table", "?c - column"))

# Add predicates for optimization state
(builder.add_predicate("optimized", "?t - table")
       .add_predicate("index_exists", "?t - table", "?c - column")
       .add_predicate("scan_method", "?t - table", "?method - scan_method"))

print("Added predicates: structure, query, optimization state")

# Add functions for cost modeling
(builder.add_function("total-cost")
       .add_function("execution-time")
       .add_function("storage-used")
       .add_function("table_size", "?t - table")
       .add_function("selectivity", "?t - table", "?c - column")
       .add_function("join_cost", "?t1 - table", "?t2 - table")
       .add_function("scan_cost", "?t - table", "?method - scan_method"))

print("Added functions: costs, statistics, metrics")

# Generate the complete domain
domain_string = builder.generate_domain_string()

print("\nGenerated PDDL Domain:")
print("=" * 30)
print(domain_string)

print("\n🎯 Domain Analysis:")
print(f"  📊 Types: {len(builder.types)}")
print(f"  🔗 Predicates: {len(builder.predicates)}") 
print(f"  📈 Functions: {len(builder.functions)}")
print(f"  ⚡ Requirements: {len(builder.requirements)}")

print("\n✅ This domain can model:")
print("  • Database schema structure (tables, columns, indexes)")
print("  • Query patterns (joins, filters, ordering)")
print("  • Optimization state (what's been optimized)")
print("  • Cost metrics (execution time, storage, I/O)")
print("  • Complex optimization actions with preconditions and effects")

Building Professional Database Optimization Domain
=======================================================
✅ Added requirements: strips, typing, fluents, durative-actions, conditional-effects
✅ Added type hierarchy: database objects, index types, execution methods
Added predicates: structure, query, optimization state
Added functions: costs, statistics, metrics

Generated PDDL Domain:
==============================
(define (domain database_optimization)
    (:requirements :strips :typing :fluents :durative-actions :conditional-effects)
    (:types
        database_object index_type execution_method - object
        table column index constraint - database_object
        btree_index hash_index bitmap_index - index_type
        scan_method join_method - execution_method
    )
    (:predicates
        (table ?t - table)
        (column ?t - table ?c - column)
        (primary_key ?t - table ?c - column)
        (foreign_key ?t1 - table ?c1 - column ?t2 - table ?c2 - column)
        (has_index ?t - table ?c - column)
        (selected ?t - table)
        (joined ?

# 🎲 Section 5: Problem Definition in PDDL

## PDDL Problem = A Specific Scenario to Solve

While the **domain** defines the rules of your world, a **problem** defines:
- **Initial State**: How things are right now
- **Goal State**: What you want to achieve  
- **Objects**: Specific instances in this scenario

Think of it like a specific chess game:
- **Domain**: Rules of chess (how pieces move)
- **Problem**: Specific board position + goal (checkmate in 3 moves)

---

## 🏗️ PDDL Problem Structure

### 1. **Problem Header**
```lisp
(define (problem optimize_customer_query)
    (:domain database_optimization)  ; Links to domain file
```

### 2. **Objects Declaration** 
```lisp
    (:objects
        customers orders - table              ; Specific tables
        c_custkey o_custkey - column         ; Specific columns  
        idx_customers_custkey - index        ; Specific index
    )
```

### 3. **Initial State** - Facts that are currently true
```lisp
    (:init
        ; Database structure facts
        (table customers)
        (table orders) 
        (column customers c_custkey)
        (column orders o_custkey)
        
        ; Query pattern facts
        (filtered customers c_custkey)
        (joined customers orders)
        
        ; Current state facts
        (= (table_size customers) 100000)
        (= (table_size orders) 500000) 
        (= (total-cost) 1000)
    )
```

### 4. **Goal State** - What we want to achieve
```lisp
    (:goal (and
        (< (total-cost) 200)              ; Reduce cost by 80%
        (optimized customers)             ; Customers table optimized
        (optimized orders)                ; Orders table optimized
    ))
```
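
The "80%" in the goal comment is plain arithmetic on the initial `total-cost` of 1000. A minimal sketch of that calculation follows; the helper names `cost_threshold` and `goal_constraint` are made up for this example and are not part of PDDL or any planner.

```python
def cost_threshold(initial_cost: float, reduction: float) -> float:
    """Return the cost bound that corresponds to a fractional reduction."""
    if not 0.0 <= reduction <= 1.0:
        raise ValueError("reduction must be between 0 and 1")
    return initial_cost - initial_cost * reduction


def goal_constraint(function: str, bound: float) -> str:
    """Render a strict upper bound as a PDDL numeric goal."""
    return f"(< ({function}) {bound:g})"


# Initial total-cost is 1000 and the target is an 80% reduction.
threshold = cost_threshold(1000, 0.80)
print(goal_constraint("total-cost", threshold))
```

Deriving the bound from the initial value keeps the goal in sync when the measured starting cost changes.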

---

## 🎯 Initial State Design Principles

### ✅ **Complete State Description**
- Include ALL relevant facts about current state
- Don't assume anything - make everything explicit
- Use realistic values based on actual data

### ✅ **Fact Categories**
1. **Structural Facts**: `(table customers)`, `(column customers c_custkey)`
2. **Relational Facts**: `(joined customers orders)`, `(foreign_key ...)`  
3. **Query Facts**: `(filtered customers c_custkey)`, `(selected customers)`
4. **Metric Facts**: `(= (table_size customers) 100000)`

### ✅ **Goal Design Patterns**
- **Optimization Goals**: `(< (total-cost) threshold)`
- **State Goals**: `(optimized table)`, `(has_index table column)`
- **Constraint Goals**: `(< (storage-used) limit)`
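
One way to enforce the "make everything explicit" principle is to check a problem before handing it to a planner. The sketch below is illustrative only: `find_undeclared` and the tuple-based fact format are assumptions made for this example, not part of PDDL or of any planner API. It reports every fact argument that was never declared as an object.

```python
from typing import Dict, List, Tuple

Fact = Tuple[str, ...]  # (predicate, arg1, arg2, ...)


def find_undeclared(objects: Dict[str, str], facts: List[Fact]) -> List[Tuple[str, str]]:
    """Return (predicate, argument) pairs whose argument is not a declared object."""
    problems = []
    for predicate, *args in facts:
        for arg in args:
            if arg not in objects:
                problems.append((predicate, arg))
    return problems


# Declared objects, mapping name -> type
objects = {"customers": "table", "orders": "table", "c_custkey": "column"}
facts = [
    ("table", "customers"),
    ("column", "customers", "c_custkey"),
    ("column", "orders", "o_custkey"),  # o_custkey was never declared
]

for predicate, arg in find_undeclared(objects, facts):
    print(f"undeclared object '{arg}' in ({predicate} ...)")
```

A check like this catches typos in object names early, before they surface as a confusing parse error or an unsolvable problem.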

In [2]:
# HANDS-ON: Create PDDL Problem Instances

from dataclasses import dataclass
from typing import List, Dict, Any, Union

@dataclass
class PDDLObject:
    """Represents a PDDL object with its type"""
    name: str
    type: str
    
    def __str__(self):
        return f"{self.name} - {self.type}"

@dataclass
class PDDLFact:
    """Represents a fact in initial state or goal"""
    predicate: str
    arguments: List[str]
    
    def __str__(self):
        if self.arguments:
            args_str = " ".join(self.arguments)
            return f"({self.predicate} {args_str})"
        return f"({self.predicate})"

@dataclass 
class PDDLAssignment:
    """Represents a numeric assignment (= (function arg) value)"""
    function: str
    arguments: List[str]
    value: Union[int, float]
    
    def __str__(self):
        if self.arguments:
            args_str = " ".join(self.arguments)
            return f"(= ({self.function} {args_str}) {self.value})"
        return f"(= ({self.function}) {self.value})"

class PDDLProblemBuilder:
    """Builder for creating PDDL problem instances"""
    
    def __init__(self, problem_name: str, domain_name: str):
        self.problem_name = problem_name
        self.domain_name = domain_name
        self.objects = []
        self.initial_facts = []
        self.initial_assignments = []
        self.goal_facts = []
        self.goal_constraints = []
    
    def add_object(self, name: str, obj_type: str):
        """Add an object to the problem"""
        self.objects.append(PDDLObject(name, obj_type))
        return self
    
    def add_initial_fact(self, predicate: str, *arguments):
        """Add a fact to initial state"""
        self.initial_facts.append(PDDLFact(predicate, list(arguments)))
        return self
    
    def add_initial_assignment(self, function: str, arguments: List[str], value: Union[int, float]):
        """Add a numeric assignment to initial state"""
        self.initial_assignments.append(PDDLAssignment(function, arguments, value))
        return self
    
    def add_goal_fact(self, predicate: str, *arguments):
        """Add a fact to goal state"""
        self.goal_facts.append(PDDLFact(predicate, list(arguments)))
        return self
    
    def add_goal_constraint(self, constraint: str):
        """Add a constraint to goal (e.g., '(< (total-cost) 200)')"""
        self.goal_constraints.append(constraint)
        return self
    
    def generate_problem_string(self) -> str:
        """Generate complete PDDL problem file"""
        problem_parts = []
        
        # Header
        problem_parts.append(f"(define (problem {self.problem_name})")
        problem_parts.append(f"    (:domain {self.domain_name})")
        
        # Objects
        if self.objects:
            problem_parts.append("    (:objects")
            # Group objects by type
            type_groups = {}
            for obj in self.objects:
                if obj.type not in type_groups:
                    type_groups[obj.type] = []
                type_groups[obj.type].append(obj.name)
            
            for obj_type, names in type_groups.items():
                names_str = " ".join(names)
                problem_parts.append(f"        {names_str} - {obj_type}")
            problem_parts.append("    )")
        
        # Initial state
        problem_parts.append("    (:init")
        for fact in self.initial_facts:
            problem_parts.append(f"        {fact}")
        for assignment in self.initial_assignments:
            problem_parts.append(f"        {assignment}")
        problem_parts.append("    )")
        
        # Goal
        if self.goal_facts or self.goal_constraints:
            if len(self.goal_facts) + len(self.goal_constraints) > 1:
                problem_parts.append("    (:goal (and")
                for fact in self.goal_facts:
                    problem_parts.append(f"        {fact}")
                for constraint in self.goal_constraints:
                    problem_parts.append(f"        {constraint}")
                problem_parts.append("    ))")
            else:
                # Single goal
                if self.goal_facts:
                    problem_parts.append(f"    (:goal {self.goal_facts[0]})")
                elif self.goal_constraints:
                    problem_parts.append(f"    (:goal {self.goal_constraints[0]})")
        
        problem_parts.append(")")
        
        # Join with real newlines ("\\n" would emit a literal backslash-n)
        return "\n".join(problem_parts)

# Create Real Database Optimization Problem
print("Creating Database Optimization Problem")
print("=" * 45)

# Create problem for optimizing a customer-orders query
problem = PDDLProblemBuilder("optimize_customer_orders", "database_optimization")

print("Building Problem: Optimize Customer-Orders Query")
print()

# Add objects (specific database entities)
problem.add_object("customers", "table") \
       .add_object("orders", "table") \
       .add_object("c_custkey", "column") \
       .add_object("c_name", "column") \
       .add_object("o_custkey", "column") \
       .add_object("o_orderdate", "column")

print("Added objects: tables (customers, orders) and columns")

# Add initial database structure facts
problem.add_initial_fact("table", "customers") \
       .add_initial_fact("table", "orders") \
       .add_initial_fact("column", "customers", "c_custkey") \
       .add_initial_fact("column", "customers", "c_name") \
       .add_initial_fact("column", "orders", "o_custkey") \
       .add_initial_fact("column", "orders", "o_orderdate")

# Add relationship facts
problem.add_initial_fact("foreign_key", "orders", "o_custkey", "customers", "c_custkey")

print("Added structural facts: tables, columns, relationships")

# Add query pattern facts (what the query does)
problem.add_initial_fact("selected", "customers") \
       .add_initial_fact("selected", "orders") \
       .add_initial_fact("joined", "customers", "orders") \
       .add_initial_fact("filtered", "customers", "c_custkey") \
       .add_initial_fact("ordered", "orders", "o_orderdate")

print("Added query facts: selections, joins, filters, ordering")

# Add initial metric values (current performance)
problem.add_initial_assignment("table_size", ["customers"], 100000) \
       .add_initial_assignment("table_size", ["orders"], 500000) \
       .add_initial_assignment("selectivity", ["customers", "c_custkey"], 0.1) \
       .add_initial_assignment("selectivity", ["orders", "o_orderdate"], 0.05) \
       .add_initial_assignment("total-cost", [], 2000) \
       .add_initial_assignment("execution-time", [], 5.5) \
       .add_initial_assignment("storage-used", [], 1000)

print("Added metrics: table sizes, selectivities, current costs")

# Define optimization goals
problem.add_goal_constraint("(< (total-cost) 500)") \
       .add_goal_constraint("(< (execution-time) 2.0)") \
       .add_goal_fact("optimized", "customers") \
       .add_goal_fact("optimized", "orders")

print("Added goals: cost < 500, time < 2.0s, tables optimized")

# Generate the complete problem
problem_string = problem.generate_problem_string()

print("\nGenerated PDDL Problem:")
print("=" * 32)
print(problem_string)

print("\n📊 Problem Statistics:")
print(f"  📦 Objects: {len(problem.objects)}")
print(f"  📋 Initial facts: {len(problem.initial_facts)}")
print(f"  🔢 Initial assignments: {len(problem.initial_assignments)}")
print(f"  🎯 Goal facts: {len(problem.goal_facts)}")
print(f"  📏 Goal constraints: {len(problem.goal_constraints)}")

print("\n🎯 This problem models:")
print("  • Real database schema (customers, orders tables)")
print("  • Specific query pattern (join + filter + order)")
print("  • Current performance metrics (2000 cost, 5.5s time)")
print("  • Optimization goals (reduce cost 75%, time 64%)")
print("  • Realistic table sizes (100k customers, 500k orders)")

Creating Database Optimization Problem
Building Problem: Optimize Customer-Orders Query

Added objects: tables (customers, orders) and columns
Added structural facts: tables, columns, relationships
Added query facts: selections, joins, filters, ordering
Added metrics: table sizes, selectivities, current costs
Added goals: cost < 500, time < 2.0s, tables optimized

Generated PDDL Problem:
(define (problem optimize_customer_orders)
    (:domain database_optimization)
    (:objects
        customers orders - table
        c_custkey c_name o_custkey o_orderdate - column
    )
    (:init
        (table customers)
        (table orders)
        (column customers c_custkey)
        (column customers c_name)
        (column orders o_custkey)
        (column orders o_orderdate)
        (foreign_key orders o_custkey customers c_custkey)
        (selected customers)
        (selected orders)
        (joined customers orders)
        (filtered customers c_custkey)
        (order

# 🧠 Section 6: Planning Algorithms - How AI Finds Solutions

## What is AI Planning?

**AI Planning** is the process of finding a sequence of actions that transforms an initial state into a goal state.

### 🎯 Planning = Search Through State Space

```
Initial State ──[action1]──> State1 ──[action2]──> State2 ──[action3]──> Goal State
```

For database optimization:
```
Current DB State ──[create_index]──> State1 ──[reorder_joins]──> Optimized State
```

---

## 🔍 Core Planning Algorithms

### 1. **Forward Search (Most Common)**
- Start from initial state
- Apply actions to reach new states  
- Continue until goal is reached

**Advantages**: Natural, intuitive direction
**Disadvantages**: Large search space, many irrelevant states

### 2. **Backward Search**
- Start from goal state
- Work backwards using inverse actions
- Continue until initial state is reached

**Advantages**: Goal-focused; often considers fewer irrelevant actions than forward search
**Disadvantages**: Complex inverse reasoning
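
The "inverse action" step is usually called regression. Here is a minimal sketch for STRIPS-style actions with positive preconditions only; the `Action` tuple and the `regress` helper are hypothetical names for this example. An action is relevant to a goal if it adds at least one goal fact and deletes none of them. The regressed subgoal is the goal minus the action's add effects, plus its preconditions.

```python
from typing import FrozenSet, NamedTuple, Optional


class Action(NamedTuple):
    name: str
    preconditions: FrozenSet[str]
    add_effects: FrozenSet[str]
    delete_effects: FrozenSet[str]


def regress(goal: FrozenSet[str], action: Action) -> Optional[FrozenSet[str]]:
    """Return the subgoal that must hold before `action`, or None if irrelevant."""
    if not (action.add_effects & goal):
        return None  # action contributes nothing to this goal
    if action.delete_effects & goal:
        return None  # action would destroy part of the goal
    return (goal - action.add_effects) | action.preconditions


create_index = Action(
    name="create_index_customers_custkey",
    preconditions=frozenset({"table_customers", "filtered_customers_c_custkey"}),
    add_effects=frozenset({"has_index_customers_c_custkey", "optimized_customers"}),
    delete_effects=frozenset(),
)

subgoal = regress(frozenset({"optimized_customers"}), create_index)
print(sorted(subgoal))
```

Backward search repeats this step until the regressed subgoal is contained in the initial state.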

### 3. **Bidirectional Search**  
- Search forward from initial state
- Search backward from goal state
- Meet in the middle

**Advantages**: Each search only has to reach roughly half the plan depth, so far fewer states are usually explored
**Disadvantages**: Complex implementation
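
To make "meet in the middle" concrete, here is a small sketch of bidirectional breadth-first search. It makes a simplifying assumption: the state space is given as an explicit undirected graph, so the backward search can reuse the same edges. In a real planner the backward direction would need regression over actions instead. The function name and the toy graph are illustrative.

```python
from collections import deque
from typing import Dict, List, Optional


def bidirectional_bfs(graph: Dict[str, List[str]], start: str, goal: str) -> Optional[List[str]]:
    """Find a path in an undirected graph by searching from both ends."""
    if start == goal:
        return [start]
    # Parent maps double as visited sets for each direction
    parents_fwd = {start: None}
    parents_bwd = {goal: None}
    frontier_fwd, frontier_bwd = deque([start]), deque([goal])

    def expand(frontier, parents, other_parents):
        """Expand one full BFS layer; return a meeting node if the searches touch."""
        for _ in range(len(frontier)):
            node = frontier.popleft()
            for neighbor in graph.get(node, []):
                if neighbor not in parents:
                    parents[neighbor] = node
                    if neighbor in other_parents:
                        return neighbor
                    frontier.append(neighbor)
        return None

    while frontier_fwd and frontier_bwd:
        meet = expand(frontier_fwd, parents_fwd, parents_bwd)
        if meet is None:
            meet = expand(frontier_bwd, parents_bwd, parents_fwd)
        if meet is not None:
            # Stitch the two half-paths together at the meeting node
            path, node = [], meet
            while node is not None:
                path.append(node)
                node = parents_fwd[node]
            path.reverse()
            node = parents_bwd[meet]
            while node is not None:
                path.append(node)
                node = parents_bwd[node]
            return path
    return None


graph = {
    "initial": ["indexed_customers", "indexed_orders"],
    "indexed_customers": ["initial", "both_indexed"],
    "indexed_orders": ["initial", "both_indexed"],
    "both_indexed": ["indexed_customers", "indexed_orders", "goal"],
    "goal": ["both_indexed"],
    "unreachable": [],
}

print(bidirectional_bfs(graph, "initial", "goal"))
```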

---

## 🎲 State Space Concepts

### **State**: Complete description of world at one point
```python
state = {
    'facts': {('table', 'customers'), ('has_index', 'customers', 'c_custkey')},
    'functions': {'total-cost': 500, 'table_size_customers': 100000}
}
```

### **Action Application**: State → New State
```python
# Apply create_index(orders, o_orderdate)
new_state = apply_action(current_state, create_index_action)
# Result: new_state has (has_index orders o_orderdate)
```
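
The snippet above calls `apply_action` without defining it. A minimal sketch of what such a function could look like for the dictionary-based state shown earlier is given below. The action format (keys `preconditions`, `add`, `delete`, `cost_deltas`) and the cost delta of -100 are assumptions for illustration.

```python
import copy


def apply_action(state: dict, action: dict) -> dict:
    """Return a new state; the input state is left untouched."""
    if not action["preconditions"] <= state["facts"]:
        raise ValueError(f"{action['name']} is not applicable")
    new_state = copy.deepcopy(state)
    new_state["facts"] -= action["delete"]   # remove delete effects first
    new_state["facts"] |= action["add"]      # then add the new facts
    for name, delta in action["cost_deltas"].items():
        new_state["functions"][name] = new_state["functions"].get(name, 0) + delta
    return new_state


current_state = {
    "facts": {("table", "orders"), ("column", "orders", "o_orderdate")},
    "functions": {"total-cost": 500},
}
create_index_action = {
    "name": "create_index(orders, o_orderdate)",
    "preconditions": {("table", "orders"), ("column", "orders", "o_orderdate")},
    "add": {("has_index", "orders", "o_orderdate")},
    "delete": set(),
    "cost_deltas": {"total-cost": -100},  # illustrative number
}

new_state = apply_action(current_state, create_index_action)
print(("has_index", "orders", "o_orderdate") in new_state["facts"])  # True
print(new_state["functions"]["total-cost"])  # 400
```

Returning a fresh state instead of mutating the input matters for search: the planner has to keep the parent state intact so it can try other actions from it.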

### **Goal Test**: Is current state a goal state?
```python
def is_goal(state):
    return (state['functions']['total-cost'] < 500 and 
            ('optimized', 'customers') in state['facts'])
```

---

## ⚡ Search Strategies

### **Breadth-First Search (BFS)**
- Explore all actions at depth n before depth n+1
- Guarantees the shortest plan (fewest actions), which is cost-optimal only when all actions cost the same
- Memory intensive

### **Depth-First Search (DFS)**  
- Explore one path completely before backtracking
- Memory efficient
- May find suboptimal solutions

### **Best-First Search**
- Always expand the "most promising" state
- Uses heuristic function to estimate promise
- Fast but not guaranteed optimal

### **A\* Search**
- f(state) = g(state) + h(state)
- g(state) = actual cost from start  
- h(state) = estimated cost to goal (heuristic)
- Optimal if the heuristic is admissible (it never overestimates the true remaining cost)
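
The A\* formula can be sketched with Python's `heapq` as a priority queue ordered by f = g + h. Everything specific here is an assumption for illustration: the `Action` tuple, the three toy actions, their costs, and the deliberately weak heuristic, which returns the cheapest action cost while the goal is unmet. That heuristic is admissible because at least one more action is always needed in that case.

```python
import heapq
from itertools import count
from typing import Callable, FrozenSet, List, NamedTuple, Optional, Tuple

State = FrozenSet[str]


class Action(NamedTuple):
    name: str
    preconditions: FrozenSet[str]
    add_effects: FrozenSet[str]
    cost: float


def a_star(initial: State, goal: FrozenSet[str], actions: List[Action],
           h: Callable[[State], float]) -> Optional[Tuple[float, List[str]]]:
    """Return (plan_cost, plan) minimizing summed action cost, or None."""
    tie = count()  # tie-breaker so the heap never has to compare states
    frontier = [(h(initial), next(tie), 0.0, initial, [])]
    best_g = {initial: 0.0}
    while frontier:
        _, _, g, state, plan = heapq.heappop(frontier)
        if goal <= state:
            return g, plan
        if g > best_g.get(state, float("inf")):
            continue  # stale queue entry; a cheaper path was found later
        for action in actions:
            if action.preconditions <= state and not action.add_effects <= state:
                new_state = state | action.add_effects
                new_g = g + action.cost
                if new_g < best_g.get(new_state, float("inf")):
                    best_g[new_state] = new_g
                    entry = (new_g + h(new_state), next(tie), new_g,
                             new_state, plan + [action.name])
                    heapq.heappush(frontier, entry)
    return None


initial = frozenset({"filtered_customers", "joined_customers_orders"})
goal = frozenset({"optimized_customers", "optimized_orders"})
actions = [
    Action("index_customers", frozenset({"filtered_customers"}),
           frozenset({"optimized_customers"}), 3.0),
    Action("index_orders", frozenset({"joined_customers_orders"}),
           frozenset({"optimized_orders"}), 5.0),
    Action("rebuild_both", initial, goal, 10.0),  # one step, but more expensive
]
cheapest = min(a.cost for a in actions)


def h(state: State) -> float:
    """Admissible estimate: zero at the goal, else the cheapest single action."""
    return 0.0 if goal <= state else cheapest


result = a_star(initial, goal, actions, h)
print(result)
```

Breadth-first search would return the one-step `rebuild_both` plan here. A\* prefers the two-step plan because its total cost is lower.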

In [3]:
# HANDS-ON: Implement Simple Planning Algorithm

from collections import deque
from typing import Set, List, Dict, Any, Tuple, Optional
import heapq

class PlanningState:
    """Represents a state in the planning search space"""
    
    def __init__(self, facts: Set[str], functions: Dict[str, float]):
        self.facts = facts.copy()
        self.functions = functions.copy()
        self.hash_value = None
    
    def __hash__(self):
        if self.hash_value is None:
            fact_tuple = tuple(sorted(self.facts))
            func_tuple = tuple(sorted(self.functions.items()))
            self.hash_value = hash((fact_tuple, func_tuple))
        return self.hash_value
    
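    def __eq__(self, other):
        # Comparing against a non-state object should not raise AttributeError
        if not isinstance(other, PlanningState):
            return NotImplemented
        return self.facts == other.facts and self.functions == other.functions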
    
    def copy(self):
        return PlanningState(self.facts, self.functions)
    
    def __str__(self):
        facts_str = ", ".join(sorted(list(self.facts))[:5])  # Show first 5 facts
        if len(self.facts) > 5:
            facts_str += f", ... ({len(self.facts)} total)"
        
        funcs_str = ", ".join(f"{k}={v}" for k, v in list(self.functions.items())[:3])
        if len(self.functions) > 3:
            funcs_str += f", ... ({len(self.functions)} total)"
            
        return f"State(facts=[{facts_str}], functions=[{funcs_str}])"

class SimplePlanningAction:
    """Simple representation of a planning action"""
    
    def __init__(self, name: str, preconditions: List[str], 
                 add_effects: List[str], delete_effects: List[str],
                 function_effects: Dict[str, float]):
        self.name = name
        self.preconditions = preconditions
        self.add_effects = add_effects
        self.delete_effects = delete_effects
        self.function_effects = function_effects
    
    def is_applicable(self, state: PlanningState) -> bool:
        """Check if action can be applied in given state"""
        for precond in self.preconditions:
            if precond.startswith("NOT "):
                # Negative precondition
                fact = precond[4:]  # Remove "NOT "
                if fact in state.facts:
                    return False
            else:
                # Positive precondition
                if precond not in state.facts:
                    return False
        return True
    
    def apply(self, state: PlanningState) -> PlanningState:
        """Apply action to state and return new state"""
        if not self.is_applicable(state):
            raise ValueError(f"Action {self.name} not applicable in current state")
        
        new_state = state.copy()
        
        # Apply add effects
        for effect in self.add_effects:
            new_state.facts.add(effect)
        
        # Apply delete effects  
        for effect in self.delete_effects:
            new_state.facts.discard(effect)
        
        # Apply function effects
        for func, change in self.function_effects.items():
            if func in new_state.functions:
                new_state.functions[func] += change
            else:
                new_state.functions[func] = change
        
        return new_state

class ForwardSearchPlanner:
    """Simple forward search planner"""
    
    def __init__(self, initial_state: PlanningState, goal_facts: List[str], 
                 goal_constraints: List[Tuple[str, str, float]], actions: List[SimplePlanningAction]):
        self.initial_state = initial_state
        self.goal_facts = goal_facts
        self.goal_constraints = goal_constraints  # [(function, operator, value)]
        self.actions = actions
        self.visited = set()
    
    def is_goal(self, state: PlanningState) -> bool:
        """Check if state satisfies goal conditions"""
        # Check goal facts
        for fact in self.goal_facts:
            if fact not in state.facts:
                return False
        
        # Check goal constraints
        for func, op, value in self.goal_constraints:
            if func not in state.functions:
                return False
            
            state_value = state.functions[func]
            if op == "<" and not (state_value < value):
                return False
            elif op == ">" and not (state_value > value):
                return False
            elif op == "=" and not (abs(state_value - value) < 0.001):
                return False
        
        return True
    
    def breadth_first_search(self, max_depth: int = 10) -> Optional[List[str]]:
        """Find plan using breadth-first search"""
        if self.is_goal(self.initial_state):
            return []  # Already at goal
        
        queue = deque([(self.initial_state, [])])  # (state, action_sequence)
        visited = {self.initial_state}
        
        for depth in range(max_depth):
            if not queue:
                break
            
            # Process all states at current depth
            for _ in range(len(queue)):
                current_state, action_sequence = queue.popleft()
                
                # Try each action
                for action in self.actions:
                    if action.is_applicable(current_state):
                        new_state = action.apply(current_state)
                        new_sequence = action_sequence + [action.name]
                        
                        if self.is_goal(new_state):
                            return new_sequence
                        
                        if new_state not in visited:
                            visited.add(new_state)
                            queue.append((new_state, new_sequence))
            
            print(f"  Depth {depth + 1}: {len(queue)} states in queue")
        
        return None  # No plan found

# Example: Database Optimization Planning
print("Database Optimization Planning Example")
print("=" * 45)

# Define initial state
initial_facts = {
    "table_customers", "table_orders",
    "column_customers_c_custkey", "column_orders_o_custkey",
    "filtered_customers_c_custkey", "joined_customers_orders"
}

initial_functions = {
    "total-cost": 2000.0,
    "execution-time": 5.5,
    "table_size_customers": 100000.0,
    "table_size_orders": 500000.0
}

initial_state = PlanningState(initial_facts, initial_functions)
print(f"Initial state: {initial_state}")

# Define actions
actions = [
    SimplePlanningAction(
        name="create_index_customers_custkey",
        preconditions=["table_customers", "column_customers_c_custkey", 
                      "filtered_customers_c_custkey", "NOT has_index_customers_c_custkey"],
        add_effects=["has_index_customers_c_custkey", "optimized_customers"],
        delete_effects=[],
        function_effects={"total-cost": -600.0, "execution-time": -2.0}
    ),
    
    SimplePlanningAction(
        name="create_index_orders_custkey", 
        preconditions=["table_orders", "column_orders_o_custkey",
                      "joined_customers_orders", "NOT has_index_orders_o_custkey"],
        add_effects=["has_index_orders_o_custkey", "optimized_orders"],
        delete_effects=[],
        function_effects={"total-cost": -800.0, "execution-time": -1.5}
    ),
    
    SimplePlanningAction(
        name="reorder_joins_customers_orders",
        preconditions=["joined_customers_orders", "table_customers", "table_orders"],
        add_effects=["optimized_join_order"],
        delete_effects=[],
        function_effects={"total-cost": -300.0, "execution-time": -0.8}
    )
]

print(f"Actions available: {[a.name for a in actions]}")

# Define goals
goal_facts = ["optimized_customers", "optimized_orders"]
goal_constraints = [("total-cost", "<", 500.0), ("execution-time", "<", 2.0)]

print(f"Goal facts: {goal_facts}")
print(f"Goal constraints: {goal_constraints}")

# Create planner and search for solution
planner = ForwardSearchPlanner(initial_state, goal_facts, goal_constraints, actions)

print("\nSearching for optimal plan...")
plan = planner.breadth_first_search(max_depth=5)

# An empty plan ([]) is a valid result, so test against None explicitly
if plan is not None:
    print(f"\nPlan found: {plan}")
    
    # Simulate plan execution
    print("\nPlan Execution Simulation:")
    current_state = initial_state
    
    for i, action_name in enumerate(plan, 1):
        action = next(a for a in actions if a.name == action_name)
        new_state = action.apply(current_state)
        
        cost_change = new_state.functions["total-cost"] - current_state.functions["total-cost"]
        time_change = new_state.functions["execution-time"] - current_state.functions["execution-time"]
        
        print(f"  {i}. {action_name}")
        print(f"     Cost: {current_state.functions['total-cost']:.1f} → {new_state.functions['total-cost']:.1f} ({cost_change:+.1f})")
        print(f"     Time: {current_state.functions['execution-time']:.1f}s → {new_state.functions['execution-time']:.1f}s ({time_change:+.1f}s)")
        
        current_state = new_state
    
    print("\nFinal state achieved:")
    print(f"   Total cost: {current_state.functions['total-cost']:.1f} (goal: < 500)")
    print(f"   Execution time: {current_state.functions['execution-time']:.1f}s (goal: < 2.0s)")
    
    goal_achieved = planner.is_goal(current_state)
    print(f"   Goal achieved: {goal_achieved}" if goal_achieved else f"   Goal achieved: {goal_achieved} ❌")
    
else:
    print("\nNo plan found within depth limit")

print("\n🧠 Planning Concepts Demonstrated:")
print("  ✅ State representation with facts and functions")
print("  ✅ Action preconditions and effects")
print("  ✅ Forward search with breadth-first strategy")
print("  ✅ Goal testing with facts and numeric constraints")
print("  ✅ Plan execution and state transitions")

Database Optimization Planning Example
Initial state: State(facts=[column_customers_c_custkey, column_orders_o_custkey, filtered_customers_c_custkey, joined_customers_orders, table_customers, ... (6 total)], functions=[total-cost=2000.0, execution-time=5.5, table_size_customers=100000.0, ... (4 total)])
Actions available: ['create_index_customers_custkey', 'create_index_orders_custkey', 'reorder_joins_customers_orders']
Goal facts: ['optimized_customers', 'optimized_orders']
Goal constraints: [('total-cost', '<', 500.0), ('execution-time', '<', 2.0)]

Searching for optimal plan...
  Depth 1: 3 states in queue
  Depth 2: 4 states in queue

Plan found: ['create_index_customers_custkey', 'create_index_orders_custkey', 'reorder_joins_customers_orders']

Plan Execution Simulation:
  1. create_index_customers_custkey
     Cost: 2000.0 → 1400.0 (-600.0)
     Time: 5.5s → 3.5s (-2.0s)
  2. create_index_orders_custkey
     Cost: 1400.0 → 600.0 (-800.0)
     Time: 3.5s → 2.0s (-1.5s)
 

# 🔗 Section 7: Connecting to Your Database Optimizer

## 🎯 From Theory to Your Implementation

You now understand the fundamentals! Let's see how this connects to your **SymbolicAIAgent** in the intelligent database optimizer.

---

## 📂 Implementation Analysis

### 🏗️ **PDDL Domain**
```python
domain = {
    'domain_name': 'query_optimization',
    'requirements': [':strips', ':typing', ':fluents'],
    'types': ['table', 'column', 'index', 'join_method', 'scan_method'],
    'predicates': [
        '(table ?t - table)',
        '(has_index ?t - table ?c - column)', 
        '(joined ?t1 - table ?t2 - table)',
        '(filtered ?t - table ?c - column)'
    ],
    'functions': [
        '(total-cost) - number',
        '(table_size ?t - table) - number',
        '(selectivity ?t - table ?c - column) - number'
    ]
}
```
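
A dictionary like this still has to be turned into PDDL text before a planner can read it. The following is a rough sketch of that rendering step. The `render_domain` function is hypothetical and is not taken from the SymbolicAIAgent code; it only assumes the dictionary keys shown above.

```python
def render_domain(domain: dict) -> str:
    """Render a dictionary-based domain description as PDDL text."""
    lines = [f"(define (domain {domain['domain_name']})"]
    lines.append(f"    (:requirements {' '.join(domain['requirements'])})")
    lines.append(f"    (:types {' '.join(domain['types'])})")
    lines.append("    (:predicates")
    lines.extend(f"        {predicate}" for predicate in domain["predicates"])
    lines.append("    )")
    lines.append("    (:functions")
    lines.extend(f"        {function}" for function in domain["functions"])
    lines.append("    )")
    lines.append(")")
    return "\n".join(lines)


# Trimmed copy of the domain dictionary, so the example runs on its own
domain = {
    "domain_name": "query_optimization",
    "requirements": [":strips", ":typing", ":fluents"],
    "types": ["table", "column", "index", "join_method", "scan_method"],
    "predicates": [
        "(table ?t - table)",
        "(has_index ?t - table ?c - column)",
    ],
    "functions": [
        "(total-cost) - number",
        "(table_size ?t - table) - number",
    ],
}

print(render_domain(domain))
```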

### **PDDL Actions** (lines 170-240)
1. **`add_index`**: Creates B-tree indexes with cost reduction
2. **`reorder_joins`**: Optimizes join order based on cardinality
3. **`choose_scan_method`**: Selects sequential vs index scan  
4. **`filter_pushdown`**: Applies predicate pushdown optimization

### **Planning Process**
```python
def _symbolic_planning(self, pddl_problem):
    # 1. Analyze query structure from Knowledge Graph
    # 2. Generate optimization opportunities  
    # 3. Create PDDL plan with actions
    # 4. Return structured optimization plan
```

### **Cost Model** (lines 380-420)
- **Base cost**: 100.0 (starting point)
- **Table scan cost**: tables × 500.0
- **Join cost**: joins² × 1000.0 (quadratic complexity!)
- **Condition cost**: conditions × 200.0
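
As a worked example of these numbers, the sketch below assumes the four components are simply added together. The list above gives the components but not how they are combined, so treat the summation and the function name `estimate_cost` as assumptions.

```python
def estimate_cost(tables: int, joins: int, conditions: int) -> float:
    """Estimate query cost from the component weights listed above."""
    base_cost = 100.0
    table_scan_cost = tables * 500.0
    join_cost = joins ** 2 * 1000.0   # grows quadratically with the join count
    condition_cost = conditions * 200.0
    return base_cost + table_scan_cost + join_cost + condition_cost


# Two tables, one join, one filter condition
print(estimate_cost(tables=2, joins=1, conditions=1))  # 2300.0
# Four tables, three joins: the join term now dominates
print(estimate_cost(tables=4, joins=3, conditions=1))  # 11300.0
```

Going from one join to three raises the join term from 1000 to 9000, which is why join reordering and join elimination matter most for multi-table queries under this model.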

---

## Advanced Features in Your System

### **Knowledge Graph Integration**
Your SymbolicAIAgent doesn't work in isolation - it leverages:
- **Schema Ontology**: Table relationships, foreign keys, constraints
- **Query Structure Analysis**: Parsed SQL with joins, filters, ordering
- **Statistical Information**: Table sizes, selectivities, cardinalities

In [4]:
# HANDS-ON: Analyze Your Real Implementation

# Let's examine your actual SymbolicAIAgent to understand how theory maps to practice

# This is a simplified version of your actual implementation
class SymbolicAIAgentAnalysis:
    """Analysis of your real SymbolicAIAgent implementation"""
    
    def __init__(self):
        self.domain_analysis = self._analyze_domain()
        self.action_analysis = self._analyze_actions()
        self.planning_analysis = self._analyze_planning()
        self.cost_analysis = self._analyze_cost_model()
    
    def _analyze_domain(self):
        """Analyze the PDDL domain in your implementation"""
        return {
            'domain_name': 'query_optimization',
            'complexity': 'Production-grade',
            'types_count': 5,
            'predicates_count': 8,
            'functions_count': 3,
            'features': [
                'Strong typing with type hierarchy',
                'Numeric fluents for cost optimization',
                'Database-specific predicates',
                'Realistic cost functions'
            ]
        }
    
    def _analyze_actions(self):
        """Analyze the PDDL actions in your implementation"""
        actions_info = {
            'add_index': {
                'purpose': 'Create B-tree index on filtered columns',
                'preconditions': [
                    'table validation',
                    'column validation', 
                    'no existing index',
                    'column is filtered in query'
                ],
                'effects': [
                    'index exists',
                    'cost reduction: table_size × 0.3',
                    'optimization flag set'
                ],
                'cost_reduction': 50.0,
                'realism_score': 9  # Very realistic
            },
            
            'reorder_joins': {
                'purpose': 'Optimize join order based on table sizes',
                'preconditions': [
                    'tables are joined',
                    'join order positions defined',
                    'smaller table first strategy'
                ],
                'effects': [
                    'swap join positions',
                    'cost reduction: table_size × 0.2'
                ],
                'cost_reduction': 30.0,
                'realism_score': 8  # Realistic join optimization
            },
            
            'choose_scan_method': {
                'purpose': 'Select optimal scan method (seq vs index)',
                'preconditions': [
                    'table is filtered',
                    'index exists on filter column'
                ],
                'effects': [
                    'set scan method to index scan',
                    'cost reduction based on selectivity'
                ],
                'cost_reduction': 40.0,
                'realism_score': 10  # Extremely realistic
            },
            
            'filter_pushdown': {
                'purpose': 'Apply predicate pushdown optimization',
                'preconditions': [
                    'tables are joined',
                    'filter exists on table'
                ],
                'effects': [
                    'early filter flag set',
                    'cost reduction: table_size × 0.4'
                ],
                'cost_reduction': 25.0,
                'realism_score': 9  # Very realistic optimization
            }
        }
        
        return actions_info
    
    def _analyze_planning(self):
        """Analyze the planning algorithm in your implementation"""
        return {
            'algorithm_type': 'Forward search with greedy selection',
            'search_strategy': 'Opportunity-based planning',
            'steps': [
                '1. Extract tables from query structure',
                '2. Identify index creation opportunities',
                '3. Add join reordering if multiple tables',
                '4. Add filter pushdown for filtered tables',
                '5. Generate structured optimization plan'
            ],
            'complexity': 'O(n) where n = number of tables',
            'optimality': 'Heuristic-based, not guaranteed optimal',
            'strengths': [
                'Fast execution',
                'Practical optimizations', 
                'Domain-specific heuristics'
            ],
            'areas_for_improvement': [
                'Could use formal search algorithms',
                'Could optimize across multiple objectives',
                'Could consider resource constraints'
            ]
        }
    
    def _analyze_cost_model(self):
        """Analyze the cost estimation model"""
        return {
            'base_cost': 100.0,
            'cost_components': {
                'table_scan': 'tables × 500.0',
                'join_cost': 'joins² × 1000.0',  # Quadratic!
                'condition_cost': 'conditions × 200.0'
            },
            'optimization_benefits': {
                'index_creation': 'table_size × 0.3 reduction',
                'join_reordering': 'table_size × 0.2 reduction',
                'scan_method': 'selectivity-based reduction',
                'filter_pushdown': 'table_size × 0.4 reduction'
            },
            'realism': {
                'score': 7,  # Good but could be more sophisticated
                'strengths': [
                    'Considers table sizes',
                    'Quadratic join cost is realistic',
                    'Index benefits scale with table size'
                ],
                'improvements': [
                    'Could model disk I/O vs CPU costs',
                    'Could consider memory constraints',
                    'Could use actual database cost functions'
                ]
            }
        }

# Analyze your implementation
print("Analysis of Your SymbolicAIAgent Implementation")
print("=" * 55)

analyzer = SymbolicAIAgentAnalysis()

# Domain analysis
print("DOMAIN ANALYSIS:")
domain = analyzer.domain_analysis
print(f"  Domain: {domain['domain_name']}")
print(f"  Complexity: {domain['complexity']}")
print(f"  Types: {domain['types_count']}, Predicates: {domain['predicates_count']}, Functions: {domain['functions_count']}")
print("  Features:")
for feature in domain['features']:
    print(f"    ✅ {feature}")

print()

# Actions analysis
print("ACTIONS ANALYSIS:")
actions = analyzer.action_analysis
for action_name, info in actions.items():
    print(f"  {action_name}:")
    print(f"    Purpose: {info['purpose']}")
    print(f"    Cost Reduction: {info['cost_reduction']}")
    print(f"    Realism Score: {info['realism_score']}/10")
    print()

# Planning analysis
print("PLANNING ANALYSIS:")
planning = analyzer.planning_analysis
print(f"  Algorithm: {planning['algorithm_type']}")
print(f"  Complexity: {planning['complexity']}")
print("  Search Steps:")
for step in planning['steps']:
    print(f"    {step}")
print()

# Cost model analysis
print("COST MODEL ANALYSIS:")
cost_model = analyzer.cost_analysis
print(f"  Base Cost: {cost_model['base_cost']}")
print("  Cost Components:")
for component, formula in cost_model['cost_components'].items():
    print(f"    {component}: {formula}")
print(f"  Realism Score: {cost_model['realism']['score']}/10")

print()
print("🎯 KEY INSIGHTS:")
print("  ✅ Your implementation follows PDDL principles correctly")
print("  ✅ Actions model real database optimizations")
print("  ✅ Cost functions are mathematically sound")
print("  ✅ Integration with Knowledge Graph adds semantic understanding")
print("  ✅ Production-ready with error handling and logging")

print()
print("🚀 ADVANCED OPPORTUNITIES:")
print("  📈 Could implement A* search for optimal planning")
print("  🔧 Could add more sophisticated cost models")
print("  ⏱️ Could add temporal planning for durative actions")
print("  🎲 Could handle uncertainty with probabilistic planning")

Analysis of Your SymbolicAIAgent Implementation
DOMAIN ANALYSIS:
  Domain: query_optimization
  Complexity: Production-grade
  Types: 5, Predicates: 8, Functions: 3
  Features:
    ✅ Strong typing with type hierarchy
    ✅ Numeric fluents for cost optimization
    ✅ Database-specific predicates
    ✅ Realistic cost functions

ACTIONS ANALYSIS:
  add_index:
    Purpose: Create B-tree index on filtered columns
    Cost Reduction: 50.0
    Realism Score: 9/10

  reorder_joins:
    Purpose: Optimize join order based on table sizes
    Cost Reduction: 30.0
    Realism Score: 8/10

  choose_scan_method:
    Purpose: Select optimal scan method (seq vs index)
    Cost Reduction: 40.0
    Realism Score: 10/10

  filter_pushdown:
    Purpose: Apply predicate pushdown optimization
    Cost Reduction: 25.0
    Realism Score: 9/10

PLANNING ANALYSIS:
  Algorithm: Forward search with greedy selection
  Complexity: O(n) where n = number of tables
  Search Steps:
    1. Extract tables from que

# Summary

### **Theoretical Foundations:**
1. **Symbolic AI Fundamentals**
   - Knowledge representation with symbols and logic
   - Rule-based reasoning and inference
   - Difference from neural networks and ML approaches

2. **PDDL Language Structure**
   - Requirements, types, predicates, functions, actions
   - Domain vs problem file distinction  
   - Formal logical syntax and semantics

3. **Planning Algorithms**
   - Forward/backward search strategies
   - State space representation and exploration
   - Heuristic functions and optimality guarantees

4. **Production Implementation**
   - Professional PDDL domain engineering
   - Realistic cost modeling and optimization
   - Integration with knowledge systems
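
To make the forward-search idea concrete, here is a minimal sketch of uniform-cost forward search over a set-of-facts state space (equivalent to A* with a zero heuristic, so it returns a cheapest plan). The fact names, the three actions, and their step costs are illustrative assumptions, not values taken from the SymbolicAIAgent.

```python
import heapq
from itertools import count
from typing import Dict, FrozenSet, List, Optional, Tuple

# Each action: (preconditions, add effects, step cost). All values are hypothetical.
ActionSpec = Tuple[FrozenSet[str], FrozenSet[str], float]


def forward_search(
    initial: FrozenSet[str],
    goal: FrozenSet[str],
    actions: Dict[str, ActionSpec],
) -> Optional[Tuple[float, List[str]]]:
    """Uniform-cost forward search: expand the cheapest state first."""
    tie = count()  # tie-breaker so the heap never has to compare states
    frontier = [(0.0, next(tie), initial, [])]
    best = {initial: 0.0}  # cheapest known cost to reach each state
    while frontier:
        cost, _, state, plan = heapq.heappop(frontier)
        if goal <= state:  # every goal fact holds in this state
            return cost, plan
        if cost > best.get(state, float("inf")):
            continue  # stale queue entry, a cheaper path was found later
        for name, (pre, add, step_cost) in actions.items():
            if pre <= state:  # action is applicable
                nxt = state | add
                new_cost = cost + step_cost
                if new_cost < best.get(nxt, float("inf")):
                    best[nxt] = new_cost
                    heapq.heappush(frontier, (new_cost, next(tie), nxt, plan + [name]))
    return None  # goal unreachable


example_actions: Dict[str, ActionSpec] = {
    "add_index": (frozenset({"filter_exists"}), frozenset({"index_exists"}), 5.0),
    "choose_index_scan": (frozenset({"index_exists"}), frozenset({"scan_chosen"}), 1.0),
    "choose_seq_scan": (frozenset(), frozenset({"scan_chosen"}), 10.0),
}

result = forward_search(
    frozenset({"filter_exists"}), frozenset({"scan_chosen"}), example_actions
)
print(result)
```

With these assumed costs, building the index and then using an index scan is cheaper than going straight to a sequential scan, so the search prefers the two-step plan. A greedy planner that always takes the first applicable goal-reaching action would not necessarily find it.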

---

### **SymbolicAIAgent Mastery:**

You now fully understand how your **450+ line SymbolicAIAgent** works:

#### **Domain**
- **Type hierarchy**: database objects, execution methods, indexes
- **Predicates**: table structure, query patterns, optimization state
- **Functions**: cost metrics, table statistics, selectivity measures

#### **Actions**
- **`add_index`**: B-tree index creation with realistic preconditions
- **`reorder_joins`**: Cardinality-based join optimization
- **`choose_scan_method`**: Sequential vs index scan selection
- **`filter_pushdown`**: Predicate pushdown optimization
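
As a rough illustration of how an action's preconditions and effects interact, the sketch below models `filter_pushdown` in a STRIPS-like style. The `Action` class and the fact names (`tables_joined`, `filter_exists`, `early_filter_set`) are assumptions that paraphrase the preconditions and effects listed in the analysis cell; they are not the agent's actual identifiers.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class Action:
    """Hypothetical STRIPS-style action: applicable when all preconditions hold."""
    name: str
    preconditions: FrozenSet[str]
    add_effects: FrozenSet[str]

    def is_applicable(self, state: FrozenSet[str]) -> bool:
        # Every precondition must be a fact in the current state
        return self.preconditions <= state

    def apply(self, state: FrozenSet[str]) -> FrozenSet[str]:
        if not self.is_applicable(state):
            missing = sorted(self.preconditions - state)
            raise ValueError(f"{self.name} not applicable, missing: {missing}")
        # Effects are added to the state; nothing is deleted in this sketch
        return state | self.add_effects


filter_pushdown = Action(
    name="filter_pushdown",
    preconditions=frozenset({"tables_joined", "filter_exists"}),
    add_effects=frozenset({"early_filter_set"}),
)

state = frozenset({"tables_joined", "filter_exists"})
new_state = filter_pushdown.apply(state)
print(sorted(new_state))
```

Representing states as frozen sets keeps them hashable, which matters once a planner needs to remember which states it has already visited.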

#### **Planning Process**
- Query structure analysis from Knowledge Graph
- Opportunity identification and action selection
- Cost estimation with mathematical models
- Plan generation and execution
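
The five planning steps reported in the analysis cell can be sketched as a single greedy pass. The function name `plan_optimizations` and the query dictionary layout (`tables`, `filters`) are hypothetical; the real agent reads its query structure from the Knowledge Graph, which is not reproduced here.

```python
from typing import Dict, List


def plan_optimizations(query: Dict) -> List[Dict]:
    """Greedy, opportunity-based planner: one pass over the tables."""
    # Step 1: extract tables from the query structure
    tables = query.get("tables", [])
    filters = query.get("filters", {})  # assumed shape: table name -> filtered column

    plan: List[Dict] = []

    # Step 2: identify index creation opportunities
    for table in tables:
        if table in filters:
            plan.append({"action": "add_index", "table": table, "column": filters[table]})

    # Step 3: add join reordering if multiple tables are involved
    if len(tables) > 1:
        plan.append({"action": "reorder_joins", "tables": list(tables)})

    # Step 4: add filter pushdown for filtered tables
    for table in tables:
        if table in filters:
            plan.append({"action": "filter_pushdown", "table": table})

    # Step 5: return the structured optimization plan
    return plan


example_query = {
    "tables": ["orders", "customers"],
    "filters": {"orders": "status"},
}
for step in plan_optimizations(example_query):
    print(step)
```

Each table is inspected a fixed number of times, which is consistent with the O(n) complexity reported in the analysis. The trade-off is that no alternative plans are compared, so the result is heuristic rather than guaranteed optimal.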

#### **Cost Modeling**
- Quadratic join costs (realistic!)
- Table-size-based optimizations
- Selectivity-driven index benefits
- Multi-objective optimization goals
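
To see why the join term dominates, the cost components from the analysis cell can be combined into one estimate. The sketch assumes the components are simply added to the base cost; the function name `estimate_query_cost` and the additive form are assumptions, while the constants come from the analysis above.

```python
def estimate_query_cost(tables: int, joins: int, conditions: int) -> float:
    """Hypothetical additive estimate built from the reported cost components."""
    base_cost = 100.0
    table_scan = tables * 500.0
    join_cost = (joins ** 2) * 1000.0  # quadratic in the number of joins
    condition_cost = conditions * 200.0
    return base_cost + table_scan + join_cost + condition_cost


# 100 + 3*500 + 2^2*1000 + 4*200 = 6400.0
print(estimate_query_cost(tables=3, joins=2, conditions=4))

# Holding tables and conditions fixed, the join term quickly dominates
for joins in range(1, 5):
    print(joins, estimate_query_cost(tables=3, joins=joins, conditions=4))
```

Under this assumed form, doubling the number of joins quadruples the join term, which is why join reordering tends to matter more as queries touch more tables.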
