In [1]:
import sys

sys.path.append("../..")
from typing import List, Union

from pydantic import BaseModel, Field
from rich import print

from brain.sdk import BrainClient

In [2]:
# Shared client used below to declare (`.reasoner`), register and invoke (`.use`) reasoners.
# It targets a server on the local machine (127.0.0.1, port 8000).
brain_client = BrainClient("http://127.0.0.1:8000")

In [3]:
# Output schema passed to the operational reasoner: a single required text answer.
class OperationalResult(BaseModel):
    answer: str = Field(..., description="The answer to the input query")

# One requested edit to a stored rule: which rule (by number) and its replacement text.
# Both fields are required.
class RuleModification(BaseModel):
    rule_number: int = Field(..., description="The rule number to modify")
    new_content: str = Field(..., description="The new content for the rule")

# Output schema passed to the feedback reasoner; consumed by Memory.update_from_feedback.
class FeedbackResult(BaseModel):
    # Free-text critique of the previous answer (required).
    feedback: str = Field(..., description="Feedback on the previous answer")
    # Required. Stored rules whose numbers are absent from this list are candidates for removal.
    keep_rules: List[int] = Field(..., description="Rule numbers to keep")
    # Optional; defaults to an empty list.
    modify_rules: List[RuleModification] = Field(default_factory=list, description="Rules to modify")
    # Optional; defaults to an empty list. Each entry becomes a new numbered rule.
    new_rules: List[str] = Field(default_factory=list, description="New rules to add")



In [4]:
from datetime import datetime
from pathlib import Path
from typing import Dict
import json

class Rule:
    """A single learned rule kept in memory.

    Attributes:
        content: The rule text.
        rule_id: Integer identifier of the rule.
        created_at: Local time at which this object was constructed.
        last_updated: Local time of the latest modification; equal to
            ``created_at`` until the rule is changed.
    """

    def __init__(self, content: str, rule_id: int):
        # Read the clock once: two separate datetime.now() calls return slightly
        # different values, which would make a brand-new rule look already updated.
        now = datetime.now()
        self.content = content
        self.rule_id = rule_id
        self.created_at = now
        self.last_updated = now

class Memory:
    """JSON-backed store of numbered rules.

    The backing file holds ``next_rule_id`` and a ``rules`` mapping of
    ``"<id>" -> {"content": ...}``. Only the rule text is persisted: timestamps are
    not written by ``_save``, so every ``Rule`` rebuilt by ``_load`` gets the
    timestamps assigned by the ``Rule`` constructor at load time.
    """

    def __init__(self, file_path: str = "memory.json"):
        """Bind to ``file_path`` and load it (the file is created if missing)."""
        self.file_path = Path(file_path)
        self.rules: Dict[int, Rule] = {}
        self.next_rule_id = 1
        self._load()

    def _load(self):
        """Populate ``rules`` and ``next_rule_id`` from the backing file.

        When the file does not exist yet, the current (empty) state is written
        out instead so that the file exists afterwards.
        """
        if not self.file_path.exists():
            self._save()
            return

        data = json.loads(self.file_path.read_text())
        self.next_rule_id = data['next_rule_id']
        # JSON object keys are strings; convert them back to integer rule ids.
        self.rules = {
            int(rule_id): Rule(
                content=rule['content'],
                rule_id=int(rule_id)
            )
            for rule_id, rule in data['rules'].items()
        }

    def _save(self):
        """Write ``next_rule_id`` and every rule's content to the backing file."""
        data = {
            'next_rule_id': self.next_rule_id,
            'rules': {
                str(rule_id): {
                    'content': rule.content,
                }
                for rule_id, rule in self.rules.items()
            }
        }
        self.file_path.write_text(json.dumps(data, indent=2))

    def get_rules_context(self) -> str:
        """Return the rules as a numbered list under a ``Current rules:`` header.

        Rules are ordered by ``rule_id``. With no rules stored the result is the
        header line followed by a newline and nothing else.
        """
        return "Current rules:\n" + "\n".join(
            f"{rule.rule_id}. {rule.content}"
            for rule in sorted(self.rules.values(), key=lambda x: x.rule_id)
        )

    def update_from_feedback(self, feedback: "FeedbackResult"):
        """Apply a feedback result to the stored rules and persist the outcome.

        Steps, in order:
        1. Drop every stored rule that is neither listed in ``keep_rules`` nor
           targeted by an entry of ``modify_rules``.
        2. Replace the content of each rule named in ``modify_rules`` (entries that
           point at an unknown rule number are ignored).
        3. Append each string in ``new_rules`` under a freshly allocated id.
        4. Save to the backing file.

        Args:
            feedback: Object exposing ``keep_rules``, ``modify_rules`` (items with
                ``rule_number`` / ``new_content``) and ``new_rules``.
        """
        modifications = feedback.modify_rules or []

        # A request to modify a rule implies keeping it. Without this, a rule that
        # appears only in modify_rules would be deleted first and the requested
        # edit silently lost.
        retained_ids = set(feedback.keep_rules) | {m.rule_number for m in modifications}
        for rule_id in set(self.rules) - retained_ids:
            del self.rules[rule_id]

        for modification in modifications:
            rule = self.rules.get(modification.rule_number)
            if rule is None:
                continue  # nothing stored under that number
            rule.content = modification.new_content
            rule.last_updated = datetime.now()

        for content in feedback.new_rules or []:
            self.rules[self.next_rule_id] = Rule(content=content, rule_id=self.next_rule_id)
            # Ids are never reused, even after the rule they named is removed.
            self.next_rule_id += 1

        self._save()

In [None]:
from typing import Optional, Tuple
# Prompt builder for the answering step.
# Takes the user's query and the rules text, and returns the pair
# (system_prompt, user_prompt). The system prompt is a fixed string; the user prompt
# embeds `input_query` and `context` verbatim.
# NOTE(review): how `brain_client.reasoner` consumes the returned pair is not visible
# here — presumably the model's reply is parsed into `OperationalResult`; confirm in the SDK.
@brain_client.reasoner(schema=OperationalResult)
def operational_reasoner(input_query: str, context: str):
    system_prompt = """You are an agent designed to provide answers based on learned rules.
Apply the rules wisely - not every rule needs to be used for every answer.
You are an agent that uses knowledge to generate accurate, generalized answers. 
Focus on principles and patterns, not specific memorization. Use the context provided to:
1. Identify underlying patterns.
2. Apply generalized knowledge to new inputs.
"""

    # `context` is inserted as-is between the query and the closing instruction.
    user_prompt = f"""Query: {input_query}

{context}

Using these rules as guidelines, provide your answer:"""

    return system_prompt, user_prompt

In [None]:
# Prompt builder for the feedback step.
# Returns the pair (system_prompt, user_prompt). `main_goal` is interpolated into both
# prompts; `input_query`, `answer`, `correct_answer` and `context` are interpolated into
# the user prompt only.
# NOTE(review): item 3 of the "Provide:" list asks for `[[rule_number, "..."]]` pairs,
# whereas the `FeedbackResult` schema declares `modify_rules` as objects with
# `rule_number` / `new_content` fields — confirm the SDK reconciles the two.
@brain_client.reasoner(schema=FeedbackResult)
def feedback_reasoner(main_goal: str, input_query: str, answer: str, correct_answer: str, context: str):
    system_prompt = f"""You are a learning feedback agent focused on extracting deep insights aligned with a specific goal.

MAIN GOAL: {main_goal}

Your purpose is to help the system learn HOW to achieve this goal by:
1. Analyzing differences between given answers and correct answers
2. Extracting detailed knowledge that helps achieve the main goal
3. Creating comprehensive rules that guide future responses
4. Building a knowledge base focused on the goal's requirements

Core Analysis Principles:
- Extract general patterns, not specific solutions
- Look for underlying principles that achieve the goal
- Focus on knowledge that transfers to new situations
- Identify what knowledge would help achieve better answers
- Consider what deep understanding is missing
- Think about what fundamental concepts would improve performance

Rules/Knowledge Requirements:
- Must directly relate to achieving the main goal
- Should be detailed and comprehensive
- Must be generally applicable
- Should capture underlying principles
- Must focus on transferable knowledge
- Should build on existing understanding"""

    user_prompt = f"""DEEP LEARNING ANALYSIS:

GOAL TO ACHIEVE: {main_goal}

Current Learning Instance:
Input: {input_query}
Current Output: {answer}
Expected Output: {correct_answer}

Existing Knowledge Base:
{context}

Perform Deep Analysis:
1. What fundamental understanding would help achieve the goal here?
2. What deeper patterns reveal themselves when comparing outputs?
3. What core principles would improve goal achievement?
4. What essential knowledge is missing from our current rules?
5. What broader understanding would help with similar goals?

Provide:
1. Rich Analysis: Detailed feedback about what we can learn regarding our goal
2. Retained Knowledge: List numbers of rules that contain valid, goal-relevant principles
3. Knowledge Updates: List [[rule_number, "enhanced knowledge or principle"]]
4. New Understanding: Add detailed new rules that capture essential goal-relevant knowledge

Focus Areas:
- Deep understanding over surface patterns
- Core principles over specific solutions
- Transferable knowledge over contextual details
- Fundamental concepts over specific applications
- Goal achievement mechanisms over input-output pairs
Your goal is to help the system learn to achieve its main goal by analyzing successes and failures.
- Identify positive patterns that led to correct answers.
- Identify negative patterns or gaps that caused failures.
- Provide insights to refine knowledge for future queries.
- Focus on creating rules or principles that generalize well.


- If something is working give positive feedback so that the system can learn from it
- If something is not working give negative feedback so that the system can learn from it

"""

    return system_prompt, user_prompt

In [7]:
# Constructing Memory loads memory.json, writing an empty one first if it does not exist.
# NOTE(review): process_query builds its own Memory on every call, so this global
# appears to be unused by the code below.
memory = Memory()
# The values returned by register() are what process_query later hands to brain_client.use(...).
operational_reasoner_id = operational_reasoner.register()
feedback_reasoner_id = feedback_reasoner.register()

In [13]:
def process_query(
    input_query: str,
    correct_answer: Optional[str] = None,
    main_goal: str = "Learn to provide accurate and consistent responses",
):
    """Answer a query from the stored rules and, when a reference answer is given, learn from it.

    A new ``Memory`` is created on every call, so the rules are read from its
    backing file each time.

    Args:
        input_query: The query handed to the operational reasoner.
        correct_answer: Reference answer. When it is ``None`` no feedback is
            requested and the stored rules are left untouched.
        main_goal: Goal statement forwarded to the feedback reasoner.

    Returns:
        ``{"answer": ...}`` without a reference answer; otherwise
        ``{"answer": ..., "feedback": ...}`` where ``feedback`` is the feedback
        reasoner's result.
    """
    rule_store = Memory()
    rules_context = rule_store.get_rules_context()

    # Answer with the rules as they stand before any learning happens.
    answer_with_rules = brain_client.use(operational_reasoner_id)
    result = answer_with_rules(input_query=input_query, context=rules_context)

    # Inference-only call: nothing to compare against.
    if correct_answer is None:
        return {"answer": result.answer}

    # The feedback reasoner sees the same rules text the answer was produced from.
    critique = brain_client.use(feedback_reasoner_id)
    feedback = critique(
        main_goal=main_goal,
        input_query=input_query,
        answer=result.answer,
        correct_answer=correct_answer,
        context=rules_context,
    )
    rule_store.update_from_feedback(feedback)

    return {"answer": result.answer, "feedback": feedback}

In [9]:
# query1 = "What is the capital of France?"
# result1 = process_query(
#     input_query=query1,
#     correct_answer="Paris.",
#     main_goal="Learn how to format the answer exactly in given style"
# )
# print(result1)

In [14]:
# Query / expected-answer pairs used for training.
# In every pair the expected answer is the quoted word with its first letter moved to
# the end, "ay" appended, and the result capitalized (e.g. 'hello' -> "Ellohay").
training_examples = [
    {
        "query": "Transform 'hello'",
        "correct_answer": "Ellohay"
    },
    {
        "query": "Transform 'world'",
        "correct_answer": "Orldway"
    },
    {
        "query": "Transform 'python'",
        "correct_answer": "Ythonpay"
    },
    {
        "query": "Transform 'code'",
        "correct_answer": "Odecay"
    },
    {
        "query": "Transform 'learn'",
        "correct_answer": "Earnlay"
    }
]

def train_pattern_learner(epochs: int = 5):
    """Run every training example through ``process_query`` with feedback, several times.

    After each example the system's answer is printed next to the expected one, and
    a separator line is printed at the end of each pass.

    This function does not reset anything: ``process_query`` reads the rules from
    the memory file on each call, so rules saved by an earlier run are picked up
    again. Remove that file beforehand to start from an empty rule set.

    Args:
        epochs: Number of passes over ``training_examples`` (default 5).
    """
    goal = "Learn the hidden transformation pattern that converts the input to output"

    for _ in range(epochs):
        for example in training_examples:
            result = process_query(
                input_query=example['query'],
                correct_answer=example['correct_answer'],
                main_goal=goal,
            )

            print(f"System Answer: {result['answer']} | Correct Answer: {example['correct_answer']}")
        print("-" * 50)

# Kick off training; every example in every pass goes through process_query.
train_pattern_learner()

## Enhanced multi-memory CLU

```mermaid
graph TD
    A[Input Query] --> B[Operational Reasoner]
    B --> C[Generated Answer]
    C --> D[Feedback Reasoner]
    E[Correct Answer] --> D
    F[Main Goal] --> D
    
    subgraph Memory System
        G[Pattern Memory]
        H[Success Memory]
        I[Failure Memory]
        J[Confidence Tracker]
    end
    
    G --> B
    H --> B
    I --> B
    J --> B
    
    D --> K[Memory Manager]
    K --> G
    K --> H
    K --> I
    K --> J

    subgraph Feedback Analysis
        L[Pattern Extractor]
        M[Success Analyzer]
        N[Failure Analyzer]
        O[Confidence Updater]
    end
    
    D --> L
    D --> M
    D --> N
    D --> O
```

In [54]:
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from enum import Enum
import json

# Core data structures
class ConfidenceLevel(str, Enum):
    """Confidence label attached to an insight; the string values are what gets saved to JSON."""
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    # Initial level given to insights created by Memory.update_from_feedback.
    UNPROVEN = "unproven"

@dataclass
class Insight:
    """One learned principle together with its evidence and bookkeeping fields."""
    principle: str  # statement of the principle
    reasoning: str  # shown after "Why it works:" in Memory.get_context
    observations: List[str]  # listed under "Supporting observations:" in the context
    counter_observations: List[str]  # listed under "Areas needing refinement:" in the context
    confidence: ConfidenceLevel
    created_at: datetime
    last_updated: datetime

@dataclass
class LearningContext:
    """Rolling lists of recent learnings kept alongside the insights."""
    successes: List[str]  # extended from FeedbackResult.successful_applications
    failures: List[str]  # extended from FeedbackResult.improvement_areas
    # Loaded and saved by Memory, but never appended to in the code visible here.
    observations: List[str]

In [55]:
# Pydantic models for reasoner schemas
# One newly proposed insight as returned by the feedback reasoner; all fields are required.
# Memory.update_from_feedback copies `supporting_evidence` into Insight.observations and
# `refinement_needs` into Insight.counter_observations.
class GeneralizedInsight(BaseModel):
    principle: str = Field(..., description="The fundamental principle or concept learned from the examples")
    reasoning: str = Field(..., description="Explanation of why this principle works or is important")
    supporting_evidence: List[str] = Field(..., description="Examples or cases that support this principle")
    refinement_needs: List[str] = Field(..., description="Areas where this principle needs improvement or clarification")

# Output schema for the second operational reasoner. This redefinition replaces the
# earlier single-field OperationalResult; all three fields are required.
class OperationalResult(BaseModel):
    answer: str = Field(..., description="The transformed output based on learned principles")
    applied_insights: List[str] = Field(..., description="List of insights/principles used to generate this answer")
    # Declared as a plain string: the allowed prefixes are stated only in the description.
    confidence_assessment: str = Field(..., 
        description="Assessment of confidence in the answer: must be one of 'High confidence: [reason]', 'Medium confidence: [reason]', or 'Low confidence: [reason]'")

# Output schema for the second feedback reasoner. This redefinition replaces the earlier
# rule-based FeedbackResult; all fields are required.
class FeedbackResult(BaseModel):
    feedback: str = Field(..., description="Detailed analysis of what worked and what didn't in the transformation")
    new_insights: List[GeneralizedInsight] = Field(..., description="New fundamental principles discovered from this example")
    # Compared against the keys of Memory.insights, which are generated as "insight_<n>".
    refined_insights: List[str] = Field(..., description="IDs of existing insights that need updating based on this example")
    successful_applications: List[str] = Field(..., description="Specific aspects of the transformation that worked well")
    improvement_areas: List[str] = Field(..., description="Areas where the transformation could be improved")


In [56]:
class MemoryConfig:
    """Size limits applied by ``Memory`` when it stores and renders what it has learned.

    Attributes:
        max_history: How many entries of each learning-context list are kept.
        max_observations_per_insight: How many observations / counter-observations
            are kept and shown for a single insight.
        max_recent_successes: How many of the latest successes go into the context text.
        max_recent_failures: How many of the latest failures go into the context text.

    Note:
        ``Memory`` applies these limits with ``items[-limit:]`` slices, so a limit of
        ``0`` selects the whole list rather than nothing.
    """

    def __init__(
        self,
        max_history: int = 10,
        max_observations_per_insight: int = 5,
        max_recent_successes: int = 3,
        max_recent_failures: int = 3,
    ):
        # Storage-side limits.
        self.max_history = max_history
        self.max_observations_per_insight = max_observations_per_insight
        # Limits that only affect how much is rendered into the prompt context.
        self.max_recent_successes = max_recent_successes
        self.max_recent_failures = max_recent_failures

class Memory:
    """JSON-backed store of learned insights plus a rolling learning context.

    Insights are keyed by string ids of the form ``insight_<n>``. The learning
    context keeps recent successes, failures and observations. Every list is
    truncated to the limits carried by the ``MemoryConfig`` in use.
    """

    def __init__(self, file_path: str = "enhanced_memory.json", config: Optional["MemoryConfig"] = None):
        """Bind to ``file_path`` and load it (the file is created if missing).

        Args:
            file_path: Location of the JSON file.
            config: Size limits; a default ``MemoryConfig`` is used when omitted.
        """
        self.file_path = Path(file_path)
        self.config = config or MemoryConfig()
        self.insights: Dict[str, Insight] = {}
        self.learning_context = LearningContext([], [], [])
        self._load()

    def _load(self):
        """Read insights and learning context from the backing file.

        When the file does not exist yet, the current (empty) state is written
        out instead. Lists are cut down to the configured limits while loading.
        """
        if not self.file_path.exists():
            self._save()
            return

        data = json.loads(self.file_path.read_text())
        per_insight = self.config.max_observations_per_insight
        history = self.config.max_history

        # Load insights
        self.insights = {
            insight_id: Insight(
                principle=i["principle"],
                reasoning=i["reasoning"],
                observations=i["observations"][-per_insight:],
                counter_observations=i["counter_observations"][-per_insight:],
                confidence=ConfidenceLevel(i["confidence"]),
                created_at=datetime.fromisoformat(i["created_at"]),
                last_updated=datetime.fromisoformat(i["last_updated"])
            )
            for insight_id, i in data.get('insights', {}).items()
        }

        # Load learning context with limits
        context_data = data.get('learning_context', {})
        self.learning_context = LearningContext(
            successes=context_data.get('successes', [])[-history:],
            failures=context_data.get('failures', [])[-history:],
            observations=context_data.get('observations', [])[-history:]
        )

    def get_context(self) -> str:
        """Render the insights and recent learnings as prompt text.

        Each insight line carries its id, so that a reader of this text can name
        the insight in ``FeedbackResult.refined_insights``. ``update_from_feedback``
        matches those entries against the same ids.
        """
        per_insight = self.config.max_observations_per_insight
        context_parts = []

        # Add current understanding
        context_parts.append("=== CURRENT UNDERSTANDING ===")
        for insight_id, insight in self.insights.items():
            context_parts.append(
                f"\nPrinciple {insight_id} [{insight.confidence.value}]: {insight.principle}"
            )
            context_parts.append(f"Why it works: {insight.reasoning}")
            context_parts.append("Supporting observations:")
            for obs in insight.observations[-per_insight:]:
                context_parts.append(f"- {obs}")
            if insight.counter_observations:
                context_parts.append("Areas needing refinement:")
                for obs in insight.counter_observations[-per_insight:]:
                    context_parts.append(f"- {obs}")

        # Add learning context
        context_parts.append("\n=== RECENT LEARNINGS ===")
        if self.learning_context.successes:
            context_parts.append("\nWhat's working:")
            for success in self.learning_context.successes[-self.config.max_recent_successes:]:
                context_parts.append(f"- {success}")

        if self.learning_context.failures:
            context_parts.append("\nWhat needs improvement:")
            for failure in self.learning_context.failures[-self.config.max_recent_failures:]:
                context_parts.append(f"- {failure}")

        return "\n".join(context_parts)

    def _next_insight_id(self) -> str:
        """Return an unused ``insight_<n>`` key, starting from ``len(insights) + 1``."""
        number = len(self.insights) + 1
        # Skip forward if that key is already taken (e.g. a file with gaps in its
        # numbering), so an existing insight is never overwritten.
        while f"insight_{number}" in self.insights:
            number += 1
        return f"insight_{number}"

    def update_from_feedback(self, feedback: "FeedbackResult"):
        """Fold a feedback result into memory and persist it.

        Steps, in order:
        1. Store each entry of ``new_insights`` under a new id with confidence
           ``UNPROVEN``.
        2. Append ``successful_applications`` / ``improvement_areas`` to the
           learning context and trim both lists to ``max_history``.
        3. For each id in ``refined_insights`` that exists, refresh
           ``last_updated`` and recompute confidence from the share of
           supporting observations: above 0.8 -> HIGH, above 0.6 -> MEDIUM,
           otherwise LOW. An insight with no evidence at all keeps its current
           confidence. Unknown ids are ignored.
        4. Save to the backing file.
        """
        current_time = datetime.now()
        per_insight = self.config.max_observations_per_insight
        history = self.config.max_history

        # Add new insights
        for insight in feedback.new_insights:
            self.insights[self._next_insight_id()] = Insight(
                principle=insight.principle,
                reasoning=insight.reasoning,
                observations=insight.supporting_evidence[-per_insight:],
                counter_observations=insight.refinement_needs[-per_insight:],
                confidence=ConfidenceLevel.UNPROVEN,
                created_at=current_time,
                last_updated=current_time
            )

        # Update learning context, then apply limits
        self.learning_context.successes.extend(feedback.successful_applications)
        self.learning_context.failures.extend(feedback.improvement_areas)
        self.learning_context.successes = self.learning_context.successes[-history:]
        self.learning_context.failures = self.learning_context.failures[-history:]

        # Update existing insights
        for insight_id in feedback.refined_insights:
            insight = self.insights.get(insight_id)
            if insight is None:
                continue
            insight.last_updated = current_time

            supporting = len(insight.observations)
            total = supporting + len(insight.counter_observations)
            if total == 0:
                # Both evidence lists are empty: there is no ratio to compute
                # (dividing would raise ZeroDivisionError), so leave confidence as is.
                continue

            success_rate = supporting / total
            if success_rate > 0.8:
                insight.confidence = ConfidenceLevel.HIGH
            elif success_rate > 0.6:
                insight.confidence = ConfidenceLevel.MEDIUM
            else:
                insight.confidence = ConfidenceLevel.LOW

        self._save()

    def _save(self):
        """Save the current state to the JSON file."""
        data = {
            'insights': {
                insight_id: {
                    'principle': insight.principle,
                    'reasoning': insight.reasoning,
                    'observations': insight.observations,
                    'counter_observations': insight.counter_observations,
                    'confidence': insight.confidence.value,
                    'created_at': insight.created_at.isoformat(),
                    'last_updated': insight.last_updated.isoformat()
                }
                for insight_id, insight in self.insights.items()
            },
            'learning_context': {
                'successes': self.learning_context.successes,
                'failures': self.learning_context.failures,
                'observations': self.learning_context.observations
            }
        }
        self.file_path.write_text(json.dumps(data, indent=2))


In [57]:
# Prompt builder for the answering step of the insight-based system; this definition
# replaces the earlier function of the same name.
# Returns the pair (system_prompt, user_prompt). The system prompt is fixed; the user
# prompt embeds `input_query` and `context` verbatim.
# NOTE(review): how `brain_client.reasoner` uses the returned pair and the
# `OperationalResult` schema is not visible here — confirm in the SDK.
@brain_client.reasoner(schema=OperationalResult)
def operational_reasoner(input_query: str, context: str):
    system_prompt = """You are a learning system that applies generalized principles to solve problems.
Focus on using fundamental insights rather than memorized patterns.

When generating an answer:
1. Review the principles we've learned about transformations
2. Consider what general insights apply to this case
3. Apply the most relevant fundamental concepts
4. Explain your reasoning based on core principles

Remember:
- Use general principles rather than specific examples
- Apply fundamental concepts we've learned
- Consider how insights about structure and patterns apply
- Think about why certain approaches should work here"""

    # `context` is the text produced by Memory.get_context() in process_query.
    user_prompt = f"""Query: {input_query}

Our Current Understanding:
{context}

Please analyze the query using our learned principles and insights.
Generate an answer based on fundamental concepts rather than specific patterns.
Explain which insights you applied and why."""

    return system_prompt, user_prompt

# Prompt builder for the feedback step of the insight-based system; this definition
# replaces the earlier function of the same name.
# Returns the pair (system_prompt, user_prompt). `main_goal` appears only in the system
# prompt; `input_query`, `answer`, `correct_answer` and `context` appear in the user prompt.
@brain_client.reasoner(schema=FeedbackResult)
def feedback_reasoner(main_goal: str, input_query: str, answer: str, correct_answer: str, context: str):
    system_prompt = f"""You are a learning system focused on discovering fundamental principles.

MAIN GOAL: {main_goal}

Your key responsibilities:
1. Extract general principles that explain why transformations work
2. Identify fundamental concepts that apply across cases
3. Build deep understanding of transformation patterns
4. Learn generalizable insights from specific examples

Analysis Approach:
- Focus on WHY transformations work or fail
- Look for underlying principles that apply broadly
- Consider what fundamental concepts are at play
- Think about structural patterns in transformations

Key Questions:
- What general principle explains this transformation?
- Why does this approach work (or not work)?
- What fundamental concept can we learn from this?
- How would this insight apply to new cases?"""

    user_prompt = f"""LEARNING ANALYSIS:

Input: {input_query}
Generated Answer: {answer}
Correct Answer: {correct_answer}

Current Understanding:
{context}

Please analyze and provide:
1. What fundamental principles explain this transformation?
2. Why do these principles work or fail?
3. What general insights can we extract?
4. How can these insights help with future transformations?

Focus on building deeper understanding rather than collecting examples."""

    return system_prompt, user_prompt
# Register the two reasoners defined above. This rebinds the module-level
# operational_reasoner_id / feedback_reasoner_id names, so any function that reads
# those globals uses these registrations from now on.
operational_reasoner_id = operational_reasoner.register()
feedback_reasoner_id = feedback_reasoner.register()

In [60]:
def process_query(
    input_query: str,
    memory: Memory,
    correct_answer: Optional[str] = None,
    main_goal: str = "Learn transformation patterns that consistently produce correct outputs"
):
    """Answer a query from the stored insights and, when a reference answer is given, learn from it.

    Args:
        input_query: The query handed to the operational reasoner.
        memory: Insight store that supplies the context and receives the feedback.
        correct_answer: Reference answer. When it is ``None`` no feedback is
            requested and ``memory`` is not updated.
        main_goal: Goal statement forwarded to the feedback reasoner.

    Returns:
        A dict with ``answer``, ``applied_insights`` and ``confidence``; when a
        reference answer was supplied it also has ``feedback`` (the feedback
        reasoner's result).
    """
    insight_context = memory.get_context()

    # Answer with the insights as they stand before any learning happens.
    result = brain_client.use(operational_reasoner_id)(
        input_query=input_query,
        context=insight_context,
    )

    learning = correct_answer is not None
    feedback = None
    if learning:
        # The feedback reasoner sees the same context text the answer was produced from.
        feedback = brain_client.use(feedback_reasoner_id)(
            main_goal=main_goal,
            input_query=input_query,
            answer=result.answer,
            correct_answer=correct_answer,
            context=insight_context,
        )
        memory.update_from_feedback(feedback)

    # Common part of the result; "feedback" is appended last to keep the key order.
    response = {
        "answer": result.answer,
        "applied_insights": result.applied_insights,
        "confidence": result.confidence_assessment,
    }
    if learning:
        response["feedback"] = feedback
    return response

In [62]:
# Query / expected-answer pairs used for training (same data as the earlier section).
# In every pair the expected answer is the quoted word with its first letter moved to
# the end, "ay" appended, and the result capitalized (e.g. 'hello' -> "Ellohay").
training_examples = [
    {
        "query": "Transform 'hello'",
        "correct_answer": "Ellohay"
    },
    {
        "query": "Transform 'world'",
        "correct_answer": "Orldway"
    },
    {
        "query": "Transform 'python'",
        "correct_answer": "Ythonpay"
    },
    {
        "query": "Transform 'code'",
        "correct_answer": "Odecay"
    },
    {
        "query": "Transform 'learn'",
        "correct_answer": "Earnlay"
    }
]

def train_pattern_learner(epochs: int = 5, memory_path: str = "Trial_1.json"):
    """Train on ``training_examples`` for several passes using one shared ``Memory``.

    After each example the system's answer is printed next to the expected one, and
    a separator line is printed at the end of each pass.

    If ``memory_path`` already exists, ``Memory`` loads it, so training continues
    from the insights saved there rather than from an empty state.

    Args:
        epochs: Number of passes over ``training_examples`` (default 5).
        memory_path: JSON file backing the memory (default ``"Trial_1.json"``).
    """
    config = MemoryConfig(
        max_history=10,
        max_observations_per_insight=5,
        max_recent_successes=3,
        max_recent_failures=3
    )
    memory = Memory(file_path=memory_path, config=config)
    goal = "Learn the hidden transformation pattern that converts the input to output"

    for _ in range(epochs):
        for example in training_examples:
            result = process_query(
                memory=memory,
                input_query=example['query'],
                correct_answer=example['correct_answer'],
                main_goal=goal,
            )

            print(f"System Answer: {result['answer']} | Correct Answer: {example['correct_answer']}")
        print("-" * 50)

# Kick off training; every example in every pass goes through process_query.
train_pattern_learner()