In [2]:
# Setup and imports
import json
import os
from datetime import datetime
from typing import Dict, List, Optional

# For schema validation
from pydantic import BaseModel, Field, ValidationError

# Mock LLM interface (replace with actual OpenAI/LangChain in production)
import re

# Load our extraction schema
from schema import BusinessRule, ExtractedKnowledge, validate_block_names, validate_attributes

print("Knowledge extraction pipeline loaded")
print("Schema validation enabled")
print("Ready for business rule extraction")


Knowledge extraction pipeline loaded
Schema validation enabled
Ready for business rule extraction


* 'schema_extra' has been renamed to 'json_schema_extra'


In [3]:
# Mock LLM Extraction Function
# In production, this would use OpenAI API with the prompt template

def mock_llm_extract(rule_text: str) -> Dict:
    """
    Mock LLM extraction that simulates GPT-4 response.
    In production: replace with actual LangChain + OpenAI call.
    """
    
    # Pre-defined responses for our test cases
    mock_responses = {
        "If energy costs rise above €200/MWh, we will postpone production by one week.": {
            "condition_block": "Energy",
            "condition_attribute": "price_future",
            "comparator": ">",
            "threshold": 200.0,
            "threshold_unit": "€/MWh",
            "action_block": "Production",
            "action_attribute": "schedule_delay",
            "action_value": "7 days",
            "action_type": "delay",
            "confidence": 0.9,
            "temporal_delay": "1 week",
            "probability": None,
            "original_text": rule_text
        },
        
        "Demand in France drops by 60% at a price above €250 per unit.": {
            "condition_block": "Production",
            "condition_attribute": "unit_cost", 
            "comparator": ">",
            "threshold": 250.0,
            "threshold_unit": "€/unit",
            "action_block": "Demand",
            "action_attribute": "demand_shift",
            "action_value": "-0.6",
            "action_type": "decrease", 
            "confidence": 0.85,
            "temporal_delay": None,
            "probability": None,
            "original_text": rule_text
        },
        
        "If we can't deliver for two weeks, there's an 80% chance we'll lose the customer.": {
            "condition_block": "Inventory",
            "condition_attribute": "backlog",
            "comparator": ">=",
            "threshold": 14.0,
            "threshold_unit": "days",
            "action_block": "Demand", 
            "action_attribute": "customer_retention",
            "action_value": "0.2",
            "action_type": "probability",
            "confidence": 0.8,
            "temporal_delay": "2 weeks",
            "probability": 0.8,
            "original_text": rule_text
        }
    }
    
    return mock_responses.get(rule_text, {
        "condition_block": "Unknown",
        "condition_attribute": "unknown",
        "comparator": "==",
        "threshold": 0.0,
        "threshold_unit": "unit",
        "action_block": "Unknown",
        "action_attribute": "unknown_action",
        "action_value": "unknown",
        "action_type": "set",
        "confidence": 0.3,
        "temporal_delay": None,
        "probability": None,
        "original_text": rule_text
    })

print("Mock LLM extraction function ready")
print("In production: replace with OpenAI API + LangChain")


Mock LLM extraction function ready
In production: replace with OpenAI API + LangChain


In [4]:
# Knowledge Extraction Pipeline

def extract_business_rule(rule_text: str) -> BusinessRule:
    """
    Main extraction pipeline: LLM + Validation + Schema mapping
    """
    
    print(f"\nExtracting rule: '{rule_text}'")
    print("-" * 50)
    
    # Step 1: LLM Extraction
    raw_json = mock_llm_extract(rule_text)
    print("LLM Raw Extraction:")
    print(json.dumps(raw_json, indent=2))
    
    # Step 2: Schema Validation
    try:
        rule = BusinessRule(**raw_json)
        print(f"\nValidation: SUCCESS")
        print(f"Confidence: {rule.confidence}")
        
        # Step 3: Business Logic Validation
        block_valid = validate_block_names(rule)
        attr_valid = validate_attributes(rule)
        
        print(f"Block validation: {'PASS' if block_valid else 'NEEDS REVIEW'}")
        print(f"Attribute validation: {'PASS' if attr_valid else 'NEEDS REVIEW'}")
        
        return rule
        
    except ValidationError as e:
        print(f"\nValidation: FAILED")
        print(f"Errors: {e}")
        return None

# Test the pipeline with our three examples

test_rules = [
    "If energy costs rise above €200/MWh, we will postpone production by one week.",
    "Demand in France drops by 60% at a price above €250 per unit.",
    "If we can't deliver for two weeks, there's an 80% chance we'll lose the customer."
]

extracted_rules = []

for rule_text in test_rules:
    rule = extract_business_rule(rule_text)
    if rule:
        extracted_rules.append(rule)
    print("\n" + "="*60)



Extracting rule: 'If energy costs rise above €200/MWh, we will postpone production by one week.'
--------------------------------------------------
LLM Raw Extraction:
{
  "condition_block": "Energy",
  "condition_attribute": "price_future",
  "comparator": ">",
  "threshold": 200.0,
  "threshold_unit": "\u20ac/MWh",
  "action_block": "Production",
  "action_attribute": "schedule_delay",
  "action_value": "7 days",
  "action_type": "delay",
  "confidence": 0.9,
  "temporal_delay": "1 week",
  "probability": null,
  "original_text": "If energy costs rise above \u20ac200/MWh, we will postpone production by one week."
}

Validation: SUCCESS
Confidence: 0.9
Block validation: PASS
Attribute validation: PASS


Extracting rule: 'Demand in France drops by 60% at a price above €250 per unit.'
--------------------------------------------------
LLM Raw Extraction:
{
  "condition_block": "Production",
  "condition_attribute": "unit_cost",
  "comparator": ">",
  "threshold": 250.0,
  "threshold_un

In [5]:
# Graph Mapping: Language → Block-Attribute Relationships

def display_graph_mapping(rule: BusinessRule):
    """
    Show how extracted rule maps to graph representation
    """
    print(f"\nGraph Mapping for: {rule.original_text[:50]}...")
    print("-" * 60)
    
    # Condition node
    condition = f"{rule.condition_block}.{rule.condition_attribute}"
    threshold_desc = f"{rule.comparator} {rule.threshold} {rule.threshold_unit or ''}"
    
    # Action node  
    action = f"{rule.action_block}.{rule.action_attribute}"
    action_desc = f"{rule.action_type}: {rule.action_value}"
    
    # Visual representation
    print("CONDITION:")
    print(f"  Node: {condition}")
    print(f"  Rule: {threshold_desc}")
    
    print("\nACTION:")
    print(f"  Node: {action}")
    print(f"  Effect: {action_desc}")
    if rule.temporal_delay:
        print(f"  Delay: {rule.temporal_delay}")
    if rule.probability:
        print(f"  Probability: {rule.probability}")
    
    print("\nGRAPH EDGE:")
    print(f"  {condition} --[{threshold_desc}]--> {action}")
    
    return {
        "condition_node": condition,
        "action_node": action,
        "rule_edge": f"{condition} → {action}",
        "metadata": {
            "threshold": threshold_desc,
            "action": action_desc,
            "confidence": rule.confidence
        }
    }

# Create knowledge container and map to graph format
if extracted_rules:
    knowledge = ExtractedKnowledge(
        rules=extracted_rules,
        extraction_method="hybrid (llm + validation)",
        validation_status="pending"
    )
    
    print("EXTRACTED KNOWLEDGE SUMMARY")
    print("="*60)
    print(f"Total rules extracted: {len(knowledge.rules)}")
    print(f"Extraction method: {knowledge.extraction_method}")
    print(f"Validation status: {knowledge.validation_status}")
    
    # Show graph mappings
    graph_relationships = []
    for rule in knowledge.rules:
        mapping = display_graph_mapping(rule)
        graph_relationships.append(mapping)
    
    print(f"\nGRAPH RELATIONSHIPS CREATED:")
    print("-" * 40)
    for i, rel in enumerate(graph_relationships, 1):
        print(f"{i}. {rel['rule_edge']}")
        print(f"   Confidence: {rel['metadata']['confidence']}")
else:
    print("No rules successfully extracted")


EXTRACTED KNOWLEDGE SUMMARY
Total rules extracted: 3
Extraction method: hybrid (llm + validation)
Validation status: pending

Graph Mapping for: If energy costs rise above €200/MWh, we will postp...
------------------------------------------------------------
CONDITION:
  Node: Energy.price_future
  Rule: > 200.0 €/MWh

ACTION:
  Node: Production.schedule_delay
  Effect: delay: 7 days
  Delay: 1 week

GRAPH EDGE:
  Energy.price_future --[> 200.0 €/MWh]--> Production.schedule_delay

Graph Mapping for: Demand in France drops by 60% at a price above €25...
------------------------------------------------------------
CONDITION:
  Node: Production.unit_cost
  Rule: > 250.0 €/unit

ACTION:
  Node: Demand.demand_shift
  Effect: decrease: -0.6

GRAPH EDGE:
  Production.unit_cost --[> 250.0 €/unit]--> Demand.demand_shift

Graph Mapping for: If we can't deliver for two weeks, there's an 80% ...
------------------------------------------------------------
CONDITION:
  Node: Inventory.backlog
  Ru

In [6]:
# Integration with Task 1 Simulation & Export

# Save extracted rules for integration
if extracted_rules:
    # Convert to graph database format
    graph_format = knowledge.to_graph_format()
    
    # Save as JSON for integration with simulation
    output_data = {
        "extraction_timestamp": datetime.now().isoformat(),
        "method": "hybrid_llm_pipeline", 
        "total_rules": len(extracted_rules),
        "rules": [rule.dict() for rule in extracted_rules],
        "graph_format": graph_format
    }
    
    with open('sample_rules.json', 'w') as f:
        json.dump(output_data, f, indent=2, default=str)
    
    print("EXPORT COMPLETE")
    print("="*50)
    print(f"Saved {len(extracted_rules)} rules to: sample_rules.json")
    print("Ready for integration with Task 1 simulation")
    
    # Show how rules would integrate with simulation
    print(f"\nINTEGRATION WITH TASK 1:")
    print("-" * 30)
    print("These extracted rules can be integrated into the simulation by:")
    print("1. Adding new calculated attributes to base_data.csv")
    print("2. Injecting rule conditions as scenario overrides") 
    print("3. Creating dynamic formula updates based on thresholds")
    
    print(f"\nExample integration:")
    for rule in extracted_rules[:2]:  # Show first 2 rules
        condition = f"{rule.condition_block}.{rule.condition_attribute}"
        action = f"{rule.action_block}.{rule.action_attribute}"
        print(f"- When {condition} {rule.comparator} {rule.threshold} → modify {action}")
    
    print(f"\nKnowledge extraction pipeline complete!")
    print(f"Language successfully converted to structured Block-Attribute relationships.")
    
else:
    print("No rules to export")


EXPORT COMPLETE
Saved 3 rules to: sample_rules.json
Ready for integration with Task 1 simulation

INTEGRATION WITH TASK 1:
------------------------------
These extracted rules can be integrated into the simulation by:
1. Adding new calculated attributes to base_data.csv
2. Injecting rule conditions as scenario overrides
3. Creating dynamic formula updates based on thresholds

Example integration:
- When Energy.price_future > 200.0 → modify Production.schedule_delay
- When Production.unit_cost > 250.0 → modify Demand.demand_shift

Knowledge extraction pipeline complete!
Language successfully converted to structured Block-Attribute relationships.


/var/folders/6g/4r1h2w_x7296776pt6t941w80000gp/T/ipykernel_30286/4249412267.py:13: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  "rules": [rule.dict() for rule in extracted_rules],
