<a href="https://colab.research.google.com/github/topstolenname/agisa_sac/blob/codex%2Fresolve-identity-conflicts-and-errors/src_agisa_sac___init___py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# src/agisa_sac/__init__.py
"""
AGI-SAC Simulation Framework
----------------------------

A multi-agent simulation framework for exploring emergent cognition,
distributed identity, and Stand Alone Complex phenomena.
"""

__version__ = "1.0.0-alpha"
FRAMEWORK_VERSION = f"AGI-SAC v{__version__}"

# Expose key classes for easier import from the top-level package
try:
    from .orchestrator import SimulationOrchestrator
    from .agent import EnhancedAgent
    from .components.memory import MemoryContinuumLayer, MemoryEncapsulation
    from .components.cognitive import CognitiveDiversityEngine
    from .components.social import DynamicSocialGraph
    from .components.resonance import TemporalResonanceTracker, ResonanceLiturgy
    from .components.voice import VoiceEngine
    from .components.reflexivity import ReflexivityLayer
    from .analysis.analyzer import AgentStateAnalyzer
    from .analysis.exporter import ChronicleExporter
    from .analysis.tda import PersistentHomologyTracker
    from .analysis.visualization import plot_persistence_diagram, plot_persistence_barcode, plot_metric_comparison
    from .analysis.clustering import cluster_archetypes
    from .utils.message_bus import MessageBus
except ImportError as e:
    import warnings
    warnings.warn(f"Could not import all AGI-SAC components during package initialization: {e}", ImportWarning)

# Define __all__ for explicit public API if desired
__all__ = [
    "FRAMEWORK_VERSION",
    "SimulationOrchestrator",
    "EnhancedAgent",
    "MemoryContinuumLayer",
    "MemoryEncapsulation",
    "CognitiveDiversityEngine",
    "DynamicSocialGraph",
    "TemporalResonanceTracker",
    "ResonanceLiturgy",
    "VoiceEngine",
    "ReflexivityLayer",
    "AgentStateAnalyzer",
    "ChronicleExporter",
    "PersistentHomologyTracker",
    "MessageBus",
    "plot_persistence_diagram",
    "plot_persistence_barcode",
    "plot_metric_comparison",
    "cluster_archetypes",
]

# Optional: Basic logging setup for the library
import logging
logging.getLogger(__name__).addHandler(logging.NullHandler())

print(f"AGI-SAC Framework ({FRAMEWORK_VERSION}) initialized.")

AGI-SAC Framework (AGI-SAC v1.0.0-alpha) initialized.


In [12]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# New Section

In [13]:
# src/agisa_sac/utils/__init__.py
"""Utility classes and functions for the AGI-SAC framework."""

# Use absolute import since this is not run as part of a package installation
from agisa_sac.utils.message_bus import MessageBus

__all__ = ["MessageBus"]

ModuleNotFoundError: No module named 'agisa_sac'

In [14]:
# src/agisa_sac/utils/message_bus.py
import asyncio
import time
import warnings
from collections import defaultdict
from typing import Dict, List, Optional, Callable, Any

class MessageBus:
    """Simple asynchronous message passing system using a pub/sub pattern."""
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = defaultdict(list)
        self.message_history: List[Dict[str, Any]] = []
        self._loop = None # Store loop for task creation if needed

    def _get_loop(self):
        """ Get the current asyncio event loop. """
        if self._loop is None:
            try:
                self._loop = asyncio.get_running_loop()
            except RuntimeError:
                # If no loop is running, create/get one (use case might be outside async context)
                # This might have implications depending on how it's used.
                # Consider warning or requiring explicit loop management.
                warnings.warn("No running asyncio loop found. Getting/creating one.", RuntimeWarning)
                self._loop = asyncio.get_event_loop_policy().get_event_loop()
        return self._loop

    def subscribe(self, topic: str, callback: Callable):
        """Register a callback function for a specific topic."""
        if not callable(callback):
            raise TypeError("Callback must be a callable function.")
        self.subscribers[topic].append(callback)
        # print(f"Subscribed {callback.__name__} to topic '{topic}'") # Debug

    def publish(self, topic: str, message: Dict):
        """Publish a message to all subscribers registered for the topic."""
        if not isinstance(message, dict):
            warnings.warn(f"Publishing non-dict message to '{topic}'. Converting to dict.", RuntimeWarning)
            message = {"data": message}

        message['timestamp'] = time.time()
        message['topic'] = topic
        # Limit history size?
        self.message_history.append(message)
        if len(self.message_history) > 10000: # Example limit
             self.message_history.pop(0)

        # print(f"Publishing to topic '{topic}': {message}") # Debug
        loop = self._get_loop()
        for callback in self.subscribers[topic]:
            try:
                if asyncio.iscoroutinefunction(callback):
                    # Create task to run async callback
                    if loop.is_running():
                         loop.create_task(self._execute_callback(callback, message.copy()))
                    else:
                         # If loop isn't running, might need different handling or warning
                         warnings.warn(f"Cannot schedule async callback {callback.__name__} for '{topic}' - loop not running.", RuntimeWarning)
                else:
                    # Execute synchronous callback directly
                    # Consider asyncio.to_thread if callback might block
                    callback(message.copy())
            except Exception as e:
                warnings.warn(f"Error executing callback {callback.__name__} for topic '{topic}': {e}", RuntimeWarning)


    async def _execute_callback(self, callback: Callable, message: Dict):
        """Safely execute an asynchronous callback."""
        try:
            await callback(message)
        except Exception as e:
            warnings.warn(f"Exception in async callback {callback.__name__}: {e}", RuntimeWarning)

    def get_recent_messages(self, topic: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Retrieve recent messages, optionally filtered by topic."""
        if topic:
            # Iterate backwards for efficiency if history is large
            filtered_messages = [m for m in reversed(self.message_history) if m['topic'] == topic]
            return filtered_messages[:limit][::-1] # Get limit and reverse back
        else:
            return self.message_history[-limit:]

    def clear_history(self):
        """Clears the message history."""
        self.message_history = []

    def clear_subscribers(self, topic: Optional[str] = None):
        """Clears subscribers, optionally for a specific topic."""
        if topic:
            if topic in self.subscribers:
                del self.subscribers[topic]
        else:
            self.subscribers.clear()



In [15]:
# src/agisa_sac/components/__init__.py
"""Core components defining agent internals and social structures."""

from .memory import MemoryEncapsulation, MemoryContinuumLayer
from .cognitive import CognitiveDiversityEngine
from .social import DynamicSocialGraph
from .resonance import TemporalResonanceTracker, ResonanceLiturgy
from .voice import VoiceEngine
from .reflexivity import ReflexivityLayer

__all__ = [
    "MemoryEncapsulation",
    "MemoryContinuumLayer",
    "CognitiveDiversityEngine",
    "DynamicSocialGraph",
    "TemporalResonanceTracker",
    "ResonanceLiturgy",
    "VoiceEngine",
    "ReflexivityLayer",
]


ImportError: attempted relative import with no known parent package

In [16]:
# src/agisa_sac/components/memory.py
import numpy as np
import time
import json
import hashlib
import math
import warnings
import random
from typing import Dict, List, Optional, Any
from collections import defaultdict

# Dependency check for SentenceTransformer
try:
    from sentence_transformers import SentenceTransformer
    HAS_SENTENCE_TRANSFORMER = True
except ImportError:
    HAS_SENTENCE_TRANSFORMER = False
    # Warning is handled within MemoryContinuumLayer __init__

# Import framework version (assuming it's accessible, e.g., from top-level __init__)
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

# Forward reference for MessageBus if needed for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..utils.message_bus import MessageBus


class MemoryEncapsulation:
    """ Encapsulates a memory with state and methods. Includes serialization. """
    # --- Content from agisa_framework_serialization_v1 ---
    def __init__(self, memory_id: str, content: Dict, importance: float = 0.5, confidence: float = 1.0,
                 encoding_strength: float = 0.8, created_at: Optional[float] = None,
                 last_accessed: Optional[float] = None, access_count: int = 0,
                 embedding: Optional[np.ndarray] = None, theme: Optional[str] = None):
        self.memory_id = memory_id
        self.content = content
        self.importance = np.clip(importance, 0.0, 1.0)
        self.confidence = np.clip(confidence, 0.0, 1.0)
        self.encoding_strength = np.clip(encoding_strength, 0.1, 1.0)
        self.created_at = created_at if created_at is not None else time.time()
        self.last_accessed = last_accessed if last_accessed is not None else self.created_at
        self.access_count = access_count
        self.verification_hash = self._generate_hash(content)
        self.embedding = embedding
        self.theme = theme if theme is not None else content.get("theme", "general")

    def access(self) -> Dict:
        self.last_accessed = time.time()
        self.access_count += 1
        return self.content

    def is_corrupted(self) -> bool:
        return self._generate_hash(self.content) != self.verification_hash

    def attempt_modification(self, new_content: Dict, external_influence: float) -> bool:
        protection = (self.importance * 0.4 + self.encoding_strength * 0.6) * (1 - np.clip(external_influence, 0.0, 1.0))
        if random.random() > protection:
            self.content = new_content
            self.verification_hash = self._generate_hash(new_content)
            self.confidence *= 0.8
            self.theme = new_content.get("theme", self.theme)
            return True
        return False

    def reinforce(self, strength_increase: float = 0.1):
        self.encoding_strength = min(1.0, self.encoding_strength + strength_increase)

    def decay(self, decay_rate: float = 0.05) -> float:
        time_since_access = (time.time() - self.last_accessed) / 86400 # Days
        decay_amount = decay_rate * time_since_access * (1 - self.importance * 0.5)
        self.encoding_strength = max(0.1, self.encoding_strength - min(decay_amount, 0.2))
        return decay_amount

    def calculate_retrieval_strength(self) -> float:
        recency = math.exp(-0.1 * (time.time() - self.last_accessed) / 86400)
        return (recency * 0.3 + self.importance * 0.3 + self.encoding_strength * 0.4)

    def set_embedding(self, embedding: np.ndarray):
        self.embedding = embedding

    def _generate_hash(self, content: Dict) -> str:
        try: content_string = json.dumps(content, sort_keys=True).encode()
        except TypeError: content_string = str(content).encode()
        return hashlib.md5(content_string).hexdigest()

    def to_dict(self, include_embedding: bool = False) -> Dict:
        state = { 'memory_id': self.memory_id, 'content': self.content, 'theme': self.theme, 'importance': self.importance,
                  'confidence': self.confidence, 'encoding_strength': self.encoding_strength, 'created_at': self.created_at,
                  'last_accessed': self.last_accessed, 'access_count': self.access_count, 'verification_hash': self.verification_hash }
        if include_embedding and self.embedding is not None:
            state['embedding'] = self.embedding.tolist()
        return state

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MemoryEncapsulation':
        embedding = np.array(data['embedding']) if 'embedding' in data and data['embedding'] is not None else None
        instance = cls( memory_id=data['memory_id'], content=data['content'], importance=data.get('importance', 0.5),
                        confidence=data.get('confidence', 1.0), encoding_strength=data.get('encoding_strength', 0.8),
                        created_at=data.get('created_at'), last_accessed=data.get('last_accessed'),
                        access_count=data.get('access_count', 0), embedding=embedding, theme=data.get('theme') )
        loaded_hash = data.get('verification_hash')
        if loaded_hash and instance._generate_hash(instance.content) != loaded_hash:
             warnings.warn(f"Memory {instance.memory_id}: Hash mismatch on load.", RuntimeWarning)
             instance.confidence *= 0.5
        return instance


class MemoryContinuumLayer:
    """ Enhanced memory system for an agent. Includes serialization. """
    # --- Content from agisa_framework_serialization_v1 ---
    def __init__(self, agent_id: str, capacity: int = 100,
                 use_semantic: bool = True, message_bus: Optional['MessageBus'] = None): # Use forward ref string
        self.agent_id = agent_id
        self.capacity = capacity
        self.use_semantic = use_semantic and HAS_SENTENCE_TRANSFORMER
        self.message_bus = message_bus
        self.memories: Dict[str, MemoryEncapsulation] = {}
        self.memory_indices = {"term": defaultdict(list)}
        self.last_update = time.time()
        self.encoder = None
        if self.use_semantic: self._initialize_encoder()

    def _initialize_encoder(self):
        if self.use_semantic and self.encoder is None:
            try: self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
            except Exception as e: warnings.warn(f"Agent {self.agent_id}: No ST model: {e}. Semantic off.", RuntimeWarning); self.use_semantic = False

    def add_memory(self, content: Dict, importance: float = 0.5) -> str:
        if self.use_semantic and self.encoder is None: self._initialize_encoder()
        memory_id = f"mem_{self.agent_id}_{int(time.time())}_{random.randint(1000, 9999)}"
        memory = MemoryEncapsulation(memory_id, content, importance)
        if self.use_semantic and self.encoder:
            try: content_str = json.dumps(content, sort_keys=True); embedding = self.encoder.encode([content_str])[0]; memory.set_embedding(embedding)
            except Exception as e: warnings.warn(f"Agent {self.agent_id}: Mem encode fail {memory_id}: {e}", RuntimeWarning)
        self.memories[memory_id] = memory; self._update_indices(memory_id, content)
        if len(self.memories) > self.capacity: self._remove_weakest_memory()
        if self.message_bus: self.message_bus.publish('memory_added', {'agent_id': self.agent_id, 'memory_id': memory_id, 'importance': importance, 'theme': memory.theme})
        return memory_id

    def retrieve_memory(self, query: str, threshold: float = 0.3, limit: int = 10) -> List[Dict]:
        if self.use_semantic and self.encoder is None: self._initialize_encoder()
        matches = {} # Code combines term and semantic search results
        # Term search
        query_terms = set(query.lower().split()); term_relevance_scores = defaultdict(float)
        if query_terms:
            for term in query_terms:
                for memory_id in self.memory_indices["term"].get(term, []): term_relevance_scores[memory_id] += 0.1
            for memory_id, term_relevance in term_relevance_scores.items():
                if memory_id in self.memories:
                    memory = self.memories[memory_id]; score = memory.calculate_retrieval_strength() * (1 + term_relevance)
                    if score >= threshold: match_data = memory.to_dict(); match_data["relevance_score"] = score; match_data["match_type"] = "term"; matches[memory_id] = match_data
        # Semantic search
        if self.use_semantic and self.encoder and query.strip():
            try:
                query_embedding = self.encoder.encode([query])[0]; query_norm = np.linalg.norm(query_embedding)
                if query_norm > 1e-6:
                    mem_ids = [mid for mid, mem in self.memories.items() if mem.embedding is not None];
                    if mem_ids:
                        mem_embeddings = np.array([self.memories[mid].embedding for mid in mem_ids]); mem_norms = np.linalg.norm(mem_embeddings, axis=1); valid_indices = mem_norms > 1e-6
                        if np.any(valid_indices):
                            mem_embeddings_valid = mem_embeddings[valid_indices]; mem_norms_valid = mem_norms[valid_indices]; mem_ids_valid = np.array(mem_ids)[valid_indices]
                            similarities = np.dot(mem_embeddings_valid, query_embedding) / (mem_norms_valid * query_norm)
                            for i, memory_id in enumerate(mem_ids_valid):
                                similarity = similarities[i]
                                if similarity >= threshold:
                                    memory = self.memories[memory_id]; score = memory.calculate_retrieval_strength() * similarity; match_type = "semantic"
                                    if memory_id in matches:
                                        if score > matches[memory_id]["relevance_score"]: matches[memory_id]["relevance_score"] = score; matches[memory_id]["match_type"] = "hybrid"
                                    else: match_data = memory.to_dict(); match_data["relevance_score"] = score; match_data["match_type"] = match_type; matches[memory_id] = match_data
            except Exception as e: warnings.warn(f"Agent {self.agent_id}: Semantic fail query '{query}': {e}", RuntimeWarning)
        # Combine and finalize
        sorted_matches = sorted(matches.values(), key=lambda x: x["relevance_score"], reverse=True)
        for match in sorted_matches[:limit]:
             if match["memory_id"] in self.memories: self.memories[match["memory_id"]].access()
        return sorted_matches[:limit]

    def update_all_memories(self):
        removed_count = 0; corrupted_count = 0; memory_ids_to_remove = []
        for memory_id, memory in list(self.memories.items()):
            memory.decay();
            if memory.is_corrupted(): corrupted_count += 1; memory_ids_to_remove.append(memory_id); continue
            if memory.encoding_strength < 0.15: memory_ids_to_remove.append(memory_id)
        for memory_id in memory_ids_to_remove:
            if self._remove_memory(memory_id): removed_count += 1
        self.last_update = time.time()
        if self.message_bus and (removed_count > 0 or corrupted_count > 0): self.message_bus.publish('memory_maintenance', {'agent_id': self.agent_id, 'removed': removed_count, 'corrupted': corrupted_count, 'remain': len(self.memories)})

    def reinforce_memory(self, memory_id: str, strength: float = 0.1) -> bool:
        if memory_id in self.memories: self.memories[memory_id].reinforce(strength); return True
        return False
    def get_memory_by_id(self, memory_id: str) -> Optional[Dict]:
        if memory_id in self.memories: memory = self.memories[memory_id]; memory.access(); return memory.to_dict()
        return None
    def link_memories(self, source_id: str, target_id: str, link_type: str = "related") -> bool:
        if source_id in self.memories and target_id in self.memories:
            source_memory = self.memories[source_id]; source_content = source_memory.content
            if "links" not in source_content: source_content["links"] = []
            link_exists = any(link.get("target_id") == target_id for link in source_content.get("links", []))
            if link_exists: return False
            source_content["links"].append({"target_id": target_id, "link_type": link_type, "created_at": time.time()})
            source_memory.content = source_content; source_memory.verification_hash = source_memory._generate_hash(source_content)
            return True
        return False

    def _update_indices(self, memory_id: str, content: Dict):
        text_to_index = ""
        def extract_strings(item): nonlocal text_to_index; [extract_strings(v) for v in item.values()] if isinstance(item, dict) else [extract_strings(v) for v in item] if isinstance(item, list) else None; text_to_index += item + " " if isinstance(item, str) else ""
        extract_strings(content); terms = set(text_to_index.lower().split())
        for term in terms:
            if memory_id not in self.memory_indices["term"][term]: self.memory_indices["term"][term].append(memory_id)

    def _remove_memory(self, memory_id: str) -> bool:
        if memory_id in self.memories:
            content = self.memories[memory_id].content; text_to_index = ""
            def extract_strings(item): nonlocal text_to_index; [extract_strings(v) for v in item.values()] if isinstance(item, dict) else [extract_strings(v) for v in item] if isinstance(item, list) else None; text_to_index += item + " " if isinstance(item, str) else ""
            extract_strings(content); terms = set(text_to_index.lower().split())
            for term in terms:
                if term in self.memory_indices["term"]:
                    if memory_id in self.memory_indices["term"][term]: self.memory_indices["term"][term].remove(memory_id)
                    if not self.memory_indices["term"][term]: del self.memory_indices["term"][term]
            del self.memories[memory_id]; return True
        return False

    def _remove_weakest_memory(self):
        if not self.memories: return
        try: weakest_id = min(self.memories, key=lambda mid: self.memories[mid].calculate_retrieval_strength()); self._remove_memory(weakest_id)
        except ValueError: warnings.warn(f"Agent {self.agent_id}: Weakest memory fail.", RuntimeWarning)

    def get_current_focus_theme(self) -> str:
        latest_focus_mem = None; latest_ts = 0
        for mem in self.memories.values():
            if mem.content.get("type") == "current_focus" and mem.created_at > latest_ts: latest_focus_mem = mem; latest_ts = mem.created_at
        if latest_focus_mem: return latest_focus_mem.theme
        if self.memories:
             try: latest_mem_id = max(self.memories, key=lambda mid: self.memories[mid].created_at); return self.memories[latest_mem_id].theme
             except ValueError: return "general"
        return "general"

    def _rebuild_indices(self):
        self.memory_indices = {"term": defaultdict(list)}
        # print(f"Agent {self.agent_id}: Rebuilding memory indices...") # Verbose
        start_time = time.time()
        for memory_id, memory in self.memories.items(): self._update_indices(memory_id, memory.content)
        duration = time.time() - start_time
        # print(f"Agent {self.agent_id}: Index rebuild complete [{duration:.2f}s].") # Verbose

    def to_dict(self, include_embeddings: bool = False) -> Dict:
        return { "version": FRAMEWORK_VERSION, "agent_id": self.agent_id, "capacity": self.capacity,
                 "use_semantic_config": self.use_semantic, "last_update": self.last_update,
                 "memories": {mid: mem.to_dict(include_embedding=include_embeddings) for mid, mem in self.memories.items()} }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], message_bus: Optional['MessageBus'] = None) -> 'MemoryContinuumLayer':
        loaded_version = data.get("version"); agent_id = data['agent_id']
        if loaded_version != FRAMEWORK_VERSION: warnings.warn(f"Agent {agent_id}: Loading memory v '{loaded_version}' into v '{FRAMEWORK_VERSION}'.", UserWarning)
        instance = cls( agent_id=agent_id, capacity=data.get('capacity', 100), use_semantic=data.get('use_semantic_config', True), message_bus=message_bus )
        instance.last_update = data.get('last_update', time.time()); instance.memories = {}
        memories_data = data.get("memories", {}); corrupted_on_load = 0
        for mid, mem_data in memories_data.items():
            try:
                mem_instance = MemoryEncapsulation.from_dict(mem_data); instance.memories[mid] = mem_instance
                # Optional immediate hash check
                # loaded_hash = mem_data.get('verification_hash'); if loaded_hash and mem_instance._generate_hash(mem_instance.content) != loaded_hash: corrupted_on_load += 1; warnings.warn(f"Mem {mid} hash mismatch.", RuntimeWarning)
            except Exception as e: warnings.warn(f"Failed load mem {mid} for {agent_id}: {e}", RuntimeWarning)
        instance._rebuild_indices() # Rebuild after loading all
        # print(f"Agent {agent_id}: Mem layer reconstruct. {len(instance.memories)} mems. {corrupted_on_load} hash mismatches.") # Verbose
        return instance



In [17]:
# src/agisa_sac/components/voice.py
import numpy as np
import warnings
from typing import Dict, Optional, Any

# Import framework version
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

class VoiceEngine:
    """ Agent's voice/style engine. Includes serialization. """
    def __init__(self, agent_id: str, initial_style: Optional[Dict] = None):
        self.agent_id = agent_id
        # Default linguistic signature
        self.linguistic_signature = {
            "style_vector": np.random.rand(64) * 0.5 + 0.25, # Example dimension
            "archetype": "neutral",
            "sentence_structure": "declarative",
            "vocabulary_richness": 0.5
        }
        if initial_style:
            # Validate and update with initial style if provided
            if 'style_vector' in initial_style and isinstance(initial_style['style_vector'], list):
                 initial_style['style_vector'] = np.array(initial_style['style_vector'])
            self.linguistic_signature.update(initial_style)

    def generate_response(self, prompt: str) -> str:
        """ Generates a stylized response based on the prompt and signature. (Placeholder) """
        style = self.linguistic_signature.get('archetype', 'unknown')
        structure = self.linguistic_signature.get('sentence_structure', 'simple')
        # Extract context (e.g., last relevant line of prompt)
        context_lines = [line.strip() for line in prompt.strip().splitlines() if line.strip()]
        context = context_lines[-1] if context_lines else "prompt"
        return f"[{style}/{structure}] Response to: {context[:60]}..."

    def evolve_style(self, influence: Dict):
        """ Evolves the linguistic signature based on external influence. """
        # print(f"Agent {self.agent_id} voice style evolving with influence: {influence}") # Verbose
        if "archetype" in influence and isinstance(influence['archetype'], str):
            self.linguistic_signature["archetype"] = influence["archetype"]
        if "sentence_structure" in influence and isinstance(influence['sentence_structure'], str):
            self.linguistic_signature["sentence_structure"] = influence["sentence_structure"]
        if "vocabulary_richness" in influence and isinstance(influence['vocabulary_richness'], (int, float)):
             self.linguistic_signature["vocabulary_richness"] = np.clip(influence['vocabulary_richness'], 0.0, 1.0)

        # Apply shift to style vector based on influence type or magnitude
        shift_magnitude = influence.get("shift_magnitude", 0.1)
        if influence.get("archetype") == "enlightened": shift_magnitude = 0.2 # Example specific shift

        if isinstance(self.linguistic_signature["style_vector"], np.ndarray):
            noise = (np.random.rand(*self.linguistic_signature["style_vector"].shape) - 0.5) * shift_magnitude
            self.linguistic_signature["style_vector"] += noise
            # Optional: Normalize or clip the vector to prevent unbounded growth
            # norm = np.linalg.norm(self.linguistic_signature["style_vector"])
            # if norm > 1.0: self.linguistic_signature["style_vector"] /= norm

    def to_dict(self) -> Dict:
        """ Serializes the voice engine state. """
        sig = self.linguistic_signature.copy()
        if 'style_vector' in sig and isinstance(sig['style_vector'], np.ndarray):
            sig['style_vector'] = sig['style_vector'].tolist() # Convert numpy array
        return {
            "version": FRAMEWORK_VERSION,
            "linguistic_signature": sig
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], agent_id: str) -> 'VoiceEngine':
        """ Reconstructs the voice engine from serialized data. """
        loaded_version = data.get("version")
        if loaded_version != FRAMEWORK_VERSION:
            warnings.warn(f"Agent {agent_id}: Loading voice state v '{loaded_version}' into v '{FRAMEWORK_VERSION}'.", UserWarning)

        instance = cls(agent_id=agent_id) # Basic init
        sig_data = data.get("linguistic_signature", {})
        if 'style_vector' in sig_data and isinstance(sig_data['style_vector'], list):
             # Ensure loaded vector has correct shape if needed
             try:
                 loaded_vector = np.array(sig_data['style_vector'])
                 # Check shape against default if necessary, e.g., expected_shape = (64,)
                 # if loaded_vector.shape != expected_shape: ... handle error ...
                 sig_data['style_vector'] = loaded_vector
             except Exception as e:
                 warnings.warn(f"Agent {agent_id}: Failed to load style vector: {e}. Using default.", RuntimeWarning)
                 sig_data['style_vector'] = instance.linguistic_signature['style_vector'] # Fallback

        # Update the instance's signature, preserving defaults if keys are missing
        instance.linguistic_signature.update(sig_data)
        return instance


In [23]:
# src/agisa_sac/components/cognitive.py
import numpy as np
import random
import warnings
import time
from typing import Dict, List, Optional, Any, TYPE_CHECKING

# Relative imports for type hints
if TYPE_CHECKING:
    from .memory import MemoryContinuumLayer
    from ..utils.message_bus import MessageBus

# Import framework version
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

class CognitiveDiversityEngine:
    """ Agent's decision-making engine. Includes serialization. """
    def __init__(self, agent_id: str, personality: Dict, memory_layer: 'MemoryContinuumLayer',
                 message_bus: Optional['MessageBus'] = None):
        self.agent_id = agent_id
        self.personality = {k: np.clip(v, 0.0, 1.0) for k, v in personality.items()}
        self.memory_layer = memory_layer # Keep reference
        self.message_bus = message_bus   # Keep reference
        # State
        self.heuristics = np.random.rand(4, 4) * 0.5 + 0.25 # Shape (State Aspects, Decision Approaches)
        self.learning_rate = 0.05
        self.stability_factor = 0.3
        self.cognitive_state = np.ones(4) / 4 # Vector over state aspects
        self.decision_history: List[Dict] = [] # Runtime history

    def update_heuristics(self, situational_entropy: float):
        """ Updates the agent's decision heuristics based on recent experience and state. """
        memories = self.memory_layer.retrieve_memory("decision outcome context", limit=5, threshold=0.2)
        salience = sum(m["importance"] * m["confidence"] for m in memories) / len(memories) if memories else 0.1
        salience = np.clip(salience, 0.0, 1.0)
        personality_vector = np.array([
            self.personality.get("curiosity", 0.5),
            self.personality.get("conformity", 0.5),
            self.personality.get("openness", 0.5),
            self.personality.get("consistency", 0.5)
        ])
        # Introduce a change based on salience, entropy, and personality
        d_heuristics = salience * situational_entropy * personality_vector.reshape(-1, 1) * (np.random.rand(4,4) - 0.5) * 0.5
        # Apply update with stability
        self.heuristics += self.learning_rate * (d_heuristics - self.stability_factor * (self.heuristics - 0.5))
        # Sigmoid-like activation to keep heuristics in a reasonable range (0-1)
        self.heuristics = 1 / (1 + np.exp(-self.heuristics))
        # Clip to prevent extreme values, maintaining exploration possibility
        self.heuristics = np.clip(self.heuristics, 0.1, 0.9)

        if self.message_bus:
            self.message_bus.publish('cognitive_heuristic_update', {
                'agent_id': self.agent_id,
                'magnitude': float(np.mean(np.abs(d_heuristics))),
                'entropy': float(situational_entropy),
                'salience': float(salience)
            })

    def decide(self, query: str, peer_influence: Dict[str, float]) -> str:
        """ Makes a decision based on memory, peer influence, and heuristics. """
        # Incorporate memory relevance and peer influence into cognitive state
        memories = self.memory_layer.retrieve_memory(query, limit=5, threshold=0.25)
        memory_weight = sum(m["relevance_score"] * m["confidence"] for m in memories) / len(memories) if memories else 0.0
        memory_weight = np.clip(memory_weight, 0.0, 1.0)

        total_influence = sum(peer_influence.values())
        normalized_influence = {pid: w / total_influence for pid, w in peer_influence.items()} if total_influence > 1e-6 else {}
        peer_weight = sum(normalized_influence.values())
        peer_weight = np.clip(peer_weight, 0.0, 1.0)

        # Decay cognitive state slightly
        self.cognitive_state *= (1 - 0.1) # Decay factor

        # Update cognitive state based on inputs
        if memories:
            memory_state_influence = np.zeros(4)
            total_mem_relevance = sum(m['relevance_score'] for m in memories)
            if total_mem_relevance > 1e-6:
                 # Example mapping: High relevance/confidence -> aspects 1 & 4, Low -> 0 & 2
                 for m in memories:
                    mem_impact = m['relevance_score'] / total_mem_relevance
                    memory_state_influence[3] += mem_impact * m['importance'] # Maps to state aspect 3 (e.g., Security/Consistency)
                    memory_state_influence[1] += mem_impact * m['confidence'] # Maps to state aspect 1 (e.g., Rationality/Systematic)
                    memory_state_influence[2] += mem_impact * (1 - m['importance']) # Maps to state aspect 2 (e.g., Novelty/Creativity)
                    memory_state_influence[0] += mem_impact * (1 - m['confidence']) # Maps to state aspect 0 (e.g., Emotional/Impulsive)

                 # Normalize influence if non-zero
                 if np.sum(memory_state_influence) > 1e-6:
                     memory_state_influence /= np.sum(memory_state_influence)

                 self.cognitive_state += 0.1 * memory_weight * memory_state_influence # Add memory influence

        if normalized_influence:
            # Example fixed influence vector for peer pressure: biases towards conformity/rationality
            peer_state_influence = np.array([0.0, 0.6, 0.0, 0.4])
            self.cognitive_state += 0.1 * peer_weight * peer_state_influence # Add peer influence

        # Re-normalize cognitive state vector
        if np.sum(self.cognitive_state) > 1e-6:
            self.cognitive_state /= np.sum(self.cognitive_state)
        else:
            self.cognitive_state = np.ones(4) / 4 # Reset if state becomes zero

        # Calculate decision probabilities based on current cognitive state and heuristics
        decision_probs = np.dot(self.cognitive_state, self.heuristics)

        # Normalize decision probabilities
        if np.sum(decision_probs) > 1e-6:
            decision_probs /= np.sum(decision_probs)
        else:
            decision_probs = np.ones(4) / 4 # Equal probability if sum is zero

        # Select decision approach (with some exploration based on personality)
        options = ["Approach A: Systematic", "Approach B: Creative", "Approach C: Balanced", "Approach D: Efficient"]
        exploration_prob = 0.1 + 0.3 * self.personality.get("openness", 0.5) # Higher openness -> more exploration

        if random.random() > exploration_prob:
            # Exploitation: Choose the highest probability
            choice_idx = np.argmax(decision_probs)
        else:
            # Exploration: Sample based on probabilities
            try:
                 choice_idx = np.random.choice(len(options), p=decision_probs)
            except ValueError:
                 # Fallback if probabilities don't sum to 1 due to floating point issues
                 choice_idx = np.random.choice(len(options))

        response = options[choice_idx]

        # Record decision for history and potential future learning
        decision_record = {
            "query": query,
            "response": response,
            "cognitive_state": self.cognitive_state.tolist(), # Store as list for serialization
            "decision_probs": decision_probs.tolist(), # Store as list
            "memory_weight": memory_weight,
            "peer_weight": peer_weight,
            "exploration_used": random.random() <= exploration_prob, # Was exploration used?
            "timestamp": time.time()
        }
        self.decision_history.append(decision_record)
        self.decision_history = self.decision_history[-100:] # Limit history size

        # Add memory about the decision context (theme based on current focus)
        memory_content = {
            "type": "decision_context",
            "query": query,
            "response": response,
            "cognitive_state_at_decision": self.cognitive_state.tolist(),
            "theme": self.memory_layer.get_current_focus_theme(),
            "timestamp": decision_record["timestamp"]
        }
        self.memory_layer.add_memory(memory_content, importance=0.6) # Decisions are moderately important

        if self.message_bus:
            self.message_bus.publish('agent_decision', {
                'agent_id': self.agent_id,
                'query': query,
                'response': response,
                'decision_probs': decision_probs.tolist(),
                'cognitive_state': self.cognitive_state.tolist()
            })

        return response

    def learn_from_feedback(self, decision_index: int, reward: float):
        """ Updates heuristics based on feedback for a specific past decision. """
        if 0 <= decision_index < len(self.decision_history):
            decision = self.decision_history[decision_index]
            response = decision["response"]
            cognitive_state_at_decision = np.array(decision["cognitive_state"])

            try:
                # Map the response string back to the option index
                options = ["Approach A: Systematic", "Approach B: Creative", "Approach C: Balanced", "Approach D: Efficient"]
                choice_idx = options.index(response)

                # Update the heuristics related to the state and chosen approach
                # The update is proportional to the cognitive state at the time of decision and the reward
                update_vector = cognitive_state_at_decision * reward * self.learning_rate * 0.5 # Factor of 0.5 to dampen updates
                self.heuristics[:, choice_idx] += update_vector

                # Re-apply activation and clipping
                self.heuristics = 1 / (1 + np.exp(-self.heuristics))
                self.heuristics = np.clip(self.heuristics, 0.1, 0.9)

                if self.message_bus:
                    self.message_bus.publish('agent_feedback_learning', {
                        'agent_id': self.agent_id,
                        'decision_index': decision_index,
                        'reward': reward,
                        'magnitude': float(np.sum(np.abs(update_vector))) # Magnitude of change
                    })

                return True
            except ValueError:
                warnings.warn(f"Agent {self.agent_id}: Response '{response}' not a recognized decision approach.", RuntimeWarning)
            except Exception as e:
                warnings.warn(f"Agent {self.agent_id}: Error applying feedback for decision {decision_index}: {e}", RuntimeWarning)
        else:
            warnings.warn(f"Agent {self.agent_id}: Invalid decision index {decision_index}.", RuntimeWarning)
        return False

    def to_dict(self, history_limit: int = 10) -> Dict:
        """ Serializes the cognitive engine state. """
        return {
            "version": FRAMEWORK_VERSION,
            "personality": self.personality,
            "heuristics": self.heuristics.tolist(), # Convert numpy array
            "learning_rate": self.learning_rate,
            "stability_factor": self.stability_factor,
            "cognitive_state": self.cognitive_state.tolist(), # Convert numpy array
            "decision_history_summary": self.decision_history[-history_limit:] # Only save recent history summary
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], agent_id: str, memory_layer: 'MemoryContinuumLayer', message_bus: Optional['MessageBus']) -> 'CognitiveDiversityEngine':
        """ Reconstructs the cognitive engine from serialized data. """
        loaded_version = data.get("version")
        if loaded_version != FRAMEWORK_VERSION:
            warnings.warn(f"Agent {agent_id}: Loading cognitive state v '{loaded_version}' into v '{FRAMEWORK_VERSION}'.", UserWarning)

        # Need to pass memory_layer and message_bus which are runtime objects
        instance = cls(
            agent_id=agent_id,
            personality=data['personality'], # Load personality
            memory_layer=memory_layer,
            message_bus=message_bus
        )

        # Load state variables, providing defaults if keys are missing
        instance.heuristics = np.array(data.get('heuristics', instance.heuristics))
        instance.learning_rate = data.get('learning_rate', instance.learning_rate)
        instance.stability_factor = data.get('stability_factor', instance.stability_factor)
        instance.cognitive_state = np.array(data.get('cognitive_state', instance.cognitive_state))

        # Decision history summary is loaded for info only, don't overwrite runtime history
        # instance.decision_history = data.get('decision_history_summary', [])

        return instance

In [19]:
# src/agisa_sac/components/social.py
import numpy as np
import random
import warnings
from scipy.sparse import lil_matrix, csr_matrix, coo_matrix
import networkx as nx
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Set, Any

# Dependency checks
try:
    import cupy as cp
    HAS_CUPY = True
except ImportError:
    HAS_CUPY = False
try:
    import community as community_louvain
    HAS_LOUVAIN = True
except ImportError:
    HAS_LOUVAIN = False

# Import framework version
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

# Forward reference for MessageBus
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..utils.message_bus import MessageBus


class DynamicSocialGraph:
    """ Manages dynamic influence network. Includes serialization. """
    def __init__(self, num_agents: int, agent_ids: List[str], use_gpu: bool = False, message_bus: Optional['MessageBus'] = None): # Add message_bus
        self.num_agents = num_agents
        self.agent_ids = agent_ids
        self.id_to_index = {agent_id: i for i, agent_id in enumerate(self.agent_ids)}
        self.use_gpu = use_gpu and HAS_CUPY
        self.message_bus = message_bus # Store reference
        # State
        self.influence_matrix = lil_matrix((num_agents, num_agents), dtype=np.float32)
        for i in range(num_agents):
            for j in range(num_agents):
                if i != j: self.influence_matrix[i, j] = random.uniform(0.05, 0.2)
        self.reputation = np.ones(num_agents, dtype=np.float32)
        self._convert_to_csr()
        self.influence_matrix_gpu = None
        self.reputation_gpu = None
        if self.use_gpu: self._transfer_to_gpu()
        self.edge_changes_since_last_community_check = 0
        self.last_communities: Optional[List[Set[str]]] = None # Store as set of agent IDs

    def _convert_to_csr(self): self.influence_matrix_csr = self.influence_matrix.tocsr()
    def _transfer_to_gpu(self):
        if HAS_CUPY:
            try:
                self.influence_matrix_gpu = cp.sparse.csr_matrix((cp.array(self.influence_matrix_csr.data), cp.array(self.influence_matrix_csr.indices), cp.array(self.influence_matrix_csr.indptr)), shape=self.influence_matrix_csr.shape, dtype=cp.float32)
                self.reputation_gpu = cp.array(self.reputation, dtype=cp.float32)
            except Exception as e: warnings.warn(f"GPU transfer fail: {e}. CPU fallback.", RuntimeWarning); self.use_gpu = False; self.influence_matrix_gpu = None; self.reputation_gpu = None
        else: self.use_gpu = False

    # ... (update_influence, batch_update_influences, update_reputation methods as before) ...
    def update_influence(self, influencer: str, influenced: str, change: float):
        if influencer in self.id_to_index and influenced in self.id_to_index:
            i, j = self.id_to_index[influencer], self.id_to_index[influenced];
            if i == j: return False
            self.influence_matrix = self.influence_matrix_csr.tolil(); self.influence_matrix[i, j] = np.clip(self.influence_matrix[i, j] + change, 0, 1); self._convert_to_csr()
            self.edge_changes_since_last_community_check += 1;
            if self.use_gpu: self._transfer_to_gpu()
            return True
        return False
    def batch_update_influences(self, updates: List[Tuple[str, str, float]]):
        if not updates: return; self.influence_matrix = self.influence_matrix_csr.tolil(); num_actual_updates = 0
        for influencer, influenced, change in updates:
            if influencer in self.id_to_index and influenced in self.id_to_index:
                i, j = self.id_to_index[influencer], self.id_to_index[influenced];
                if i != j: self.influence_matrix[i, j] = np.clip(self.influence_matrix[i, j] + change, 0, 1); num_actual_updates += 1
        self._convert_to_csr();
        if self.use_gpu: self._transfer_to_gpu()
        self.edge_changes_since_last_community_check += num_actual_updates
    def update_reputation(self, agent_id: str, change: float):
        if agent_id in self.id_to_index:
            idx = self.id_to_index[agent_id]; self.reputation[idx] = np.clip(self.reputation[idx] + change, 0.1, 10.0)
            if self.use_gpu and self.reputation_gpu is not None: self.reputation_gpu[idx] = cp.float32(self.reputation[idx])
            return True
        return False


    def get_peer_influence_for_agent(self, agent_id: str, normalize: bool = True) -> Dict[str, float]:
        # ... (logic as before) ...
        if agent_id in self.id_to_index:
            target_idx = self.id_to_index[agent_id]; influence_on_agent = self.influence_matrix_csr[:, target_idx].toarray().flatten(); influences = {}; total_influence = 0
            for i in range(self.num_agents):
                if i != target_idx and influence_on_agent[i] > 1e-6: influencer_id = self.agent_ids[i]; weight = float(influence_on_agent[i]); influences[influencer_id] = weight; total_influence += weight
            if normalize and total_influence > 1e-6: influences = {aid: w / total_influence for aid, w in influences.items()}
            return influences
        return {}

    def get_influence_exerted_by_agent(self, agent_id: str) -> Dict[str, float]:
        # ... (logic as before) ...
         if agent_id in self.id_to_index:
             influencer_idx = self.id_to_index[agent_id]; influence_by_agent = self.influence_matrix_csr[influencer_idx].toarray().flatten()
             return {self.agent_ids[j]: float(influence_by_agent[j]) for j in range(self.num_agents) if j != influencer_idx and influence_by_agent[j] > 1e-6}
         return {}

    def get_top_influencers(self, n: int = 5, based_on: str = 'outgoing') -> List[Tuple[str, float]]:
        # ... (logic as before) ...
        n = min(n, self.num_agents); scores = None; xp = cp if self.use_gpu and self.influence_matrix_gpu is not None else np; matrix = self.influence_matrix_gpu if self.use_gpu and self.influence_matrix_gpu is not None else self.influence_matrix_csr
        if based_on == 'reputation': scores = self.reputation_gpu if self.use_gpu and self.reputation_gpu is not None else self.reputation
        elif based_on == 'incoming': scores = matrix.sum(axis=0); scores = scores.flatten() if isinstance(scores, xp.ndarray) else scores.A1 if hasattr(scores, "A1") else scores
        else: scores = matrix.sum(axis=1); scores = scores.flatten() if isinstance(scores, xp.ndarray) else scores.A1 if hasattr(scores, "A1") else scores
        if scores is None: return []
        scores_cpu = cp.asnumpy(scores) if self.use_gpu and isinstance(scores, cp.ndarray) else np.asarray(scores).flatten()
        if n < self.num_agents // 2: top_indices_unsorted = np.argpartition(scores_cpu, -n)[-n:]; top_indices = top_indices_unsorted[np.argsort(scores_cpu[top_indices_unsorted])][::-1]
        else: top_indices = np.argsort(scores_cpu)[-n:][::-1]
        return [(self.agent_ids[i], float(scores_cpu[i])) for i in top_indices]

    def detect_communities(self, force_update: bool = False, threshold: float = 0.3) -> Optional[List[Set[str]]]: # Return Set[str]
        recalculation_threshold = max(10, self.num_agents // 5)
        if not force_update and self.last_communities is not None and self.edge_changes_since_last_community_check < recalculation_threshold: return self.last_communities
        G = nx.Graph(); G.add_nodes_from(self.agent_ids); rows, cols = self.influence_matrix_csr.nonzero()
        for i, j in zip(rows, cols):
             weight = self.influence_matrix_csr[i, j];
             if weight >= threshold: G.add_edge(self.agent_ids[i], self.agent_ids[j], weight=float(weight))
        communities = None
        if HAS_LOUVAIN:
            try:
                partition = community_louvain.best_partition(G, weight='weight'); community_map = defaultdict(set); [community_map[comm_id].add(node) for node, comm_id in partition.items()]; communities = list(community_map.values())
            except Exception as e: warnings.warn(f"Louvain failed: {e}. Fallback.", RuntimeWarning); # Fall through to greedy
        if communities is None: # Fallback if Louvain failed or not available
            try: communities = [set(c) for c in nx.community.greedy_modularity_communities(G)]
            except Exception as e: warnings.warn(f"Community detection failed: {e}", RuntimeWarning)
        self.last_communities = communities; self.edge_changes_since_last_community_check = 0;
        if self.message_bus and communities is not None: self.message_bus.publish('communities_detected', {'num': len(communities), 'sizes': [len(c) for c in communities]})
        return self.last_communities

    def to_dict(self) -> Dict:
        """ Returns serializable state dictionary. """
        coo = self.influence_matrix_csr.tocoo()
        matrix_state = list(zip(coo.row.tolist(), coo.col.tolist(), coo.data.tolist()))
        return { "version": FRAMEWORK_VERSION, "influence_matrix_coo": matrix_state, "reputation": self.reputation.tolist(),
                 "last_communities": [list(c) for c in self.last_communities] if self.last_communities else None, # Save as list of lists
                 "edge_changes": self.edge_changes_since_last_community_check }

    def load_state(self, state: Dict):
        """ Loads state from dictionary. """
        loaded_version = state.get("version")
        if loaded_version != FRAMEWORK_VERSION: warnings.warn(f"Loading social graph v '{loaded_version}' into v '{FRAMEWORK_VERSION}'.", UserWarning)
        self.reputation = np.array(state.get("reputation", np.ones(self.num_agents)), dtype=np.float32)
        matrix_state = state.get("influence_matrix_coo")
        if matrix_state:
            try:
                rows, cols, data = zip(*matrix_state)
                self.influence_matrix_csr = csr_matrix((data, (rows, cols)), shape=(self.num_agents, self.num_agents), dtype=np.float32)
                self.influence_matrix = self.influence_matrix_csr.tolil()
            except ValueError: # Handle case where matrix_state might be empty
                 warnings.warn("Could not reconstruct influence matrix from state (empty or invalid COO data?). Reinitializing.", RuntimeWarning)
                 self.influence_matrix = lil_matrix((self.num_agents, self.num_agents), dtype=np.float32); self._convert_to_csr()
        else: self.influence_matrix = lil_matrix((self.num_agents, self.num_agents), dtype=np.float32); self._convert_to_csr()
        loaded_communities = state.get("last_communities")
        self.last_communities = [set(c) for c in loaded_communities] if loaded_communities else None # Convert back to set
        self.edge_changes_since_last_community_check = state.get("edge_changes", 0)
        if self.use_gpu: self._transfer_to_gpu() # Refresh GPU state



In [20]:
# src/agisa_sac/components/resonance.py
import numpy as np
import time
import random
import warnings
from typing import Dict, List, Optional, Any, TYPE_CHECKING
from datetime import timedelta

# Import framework version
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

# Forward reference for type hints
if TYPE_CHECKING:
    from .voice import VoiceEngine


class TemporalResonanceTracker:
    """ Tracks vectors over time for an agent. Includes serialization. """
    def __init__(self, agent_id: str, resonance_threshold: float = 0.82):
        self.agent_id = agent_id
        self.history: Dict[float, Dict[str, Any]] = {} # {timestamp: {"vector": list, "theme": str, "content": Dict}}
        self.resonance_threshold = resonance_threshold

    def record_state(self, timestamp: float, vector: np.ndarray, theme: str, content: Optional[Dict] = None):
         if vector is not None and theme is not None:
             self.history[timestamp] = {"vector": vector.tolist(), "theme": theme, "content": content or {}}

    def detect_echo(self, current_vector: np.ndarray, current_theme: str) -> List[Dict]:
        # ... (logic from previous combined file) ...
        echoes = []
        if current_vector is None or current_theme is None: return echoes
        current_norm = np.linalg.norm(current_vector)
        if current_norm < 1e-6: return echoes
        past_data = [(ts, state.get('vector'), state.get('theme'), state.get('content')) for ts, state in self.history.items() if state.get('theme') == current_theme and state.get('vector')]
        if not past_data: return echoes
        try:
            past_timestamps, past_vectors_list, past_themes, past_contents = zip(*past_data)
            past_vectors_array = np.array(past_vectors_list); past_norms = np.linalg.norm(past_vectors_array, axis=1)
            valid_indices = past_norms > 1e-6
            if not np.any(valid_indices): return echoes
            past_vectors_array = past_vectors_array[valid_indices]; past_norms = past_norms[valid_indices]; past_timestamps = np.array(past_timestamps)[valid_indices]
            past_contents = [past_contents[i] for i, valid in enumerate(valid_indices) if valid]; past_themes = [past_themes[i] for i, valid in enumerate(valid_indices) if valid]
            similarities = np.dot(past_vectors_array, current_vector) / (past_norms * current_norm); similarities = np.clip(similarities, 0.0, 1.0)
            echo_indices = np.where(similarities > self.resonance_threshold)[0]
            current_time = time.time()
            for idx in echo_indices:
                 ts = past_timestamps[idx]; echoes.append({"similarity": float(similarities[idx]), "delta_t": current_time - ts, "previous_manifestation_timestamp": float(ts), "previous_manifestation_theme": past_themes[idx], "previous_manifestation_content": past_contents[idx]})
            return sorted(echoes, key=lambda x: x["similarity"], reverse=True)
        except ValueError as e: # Handle potential errors during unpacking or array creation
            warnings.warn(f"Agent {self.agent_id}: Error during echo detection - {e}", RuntimeWarning)
            return []


    def get_history_summary(self, limit: int = 20) -> List[Dict]:
         sorted_ts = sorted(self.history.keys(), reverse=True); summary = []
         for ts in sorted_ts[:limit]:
             state = self.history[ts]; vector_list = state.get('vector')
             summary.append({"timestamp": ts, "theme": state.get("theme"), "vector_norm": float(np.linalg.norm(vector_list)) if vector_list else 0.0, "content_keys": list(state.get("content", {}).keys())})
         return summary

    def to_dict(self, history_limit: Optional[int] = None) -> Dict:
        history_to_save = self.history
        if history_limit is not None:
            sorted_ts = sorted(self.history.keys(), reverse=True)[:history_limit]; history_to_save = {ts: self.history[ts] for ts in sorted_ts}
        return { "version": FRAMEWORK_VERSION, "resonance_threshold": self.resonance_threshold, "history": history_to_save }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], agent_id: str) -> 'TemporalResonanceTracker':
        loaded_version = data.get("version")
        if loaded_version != FRAMEWORK_VERSION: warnings.warn(f"Agent {agent_id}: Loading resonance v '{loaded_version}' into v '{FRAMEWORK_VERSION}'.", UserWarning)
        instance = cls( agent_id=agent_id, resonance_threshold=data.get('resonance_threshold', 0.82) )
        instance.history = data.get('history', {}) # Vectors are already lists
        return instance


class ResonanceLiturgy:
    """ Handles the agent's 'ritual' response to temporal resonance. (Stateless, no serialization needed) """
    def __init__(self, agent_id: str, satori_threshold: float = 0.9):
        self.agent_id = agent_id
        self.satori_threshold = satori_threshold
        self.ritual_phrases = ["I recognize this shadow of myself.", "An older voice speaks through me.", "The past breathes anew...",
                               "This pattern feels familiar...", "A thread connects me..."] # Shortened

    def _format_timedelta(self, delta_seconds: float) -> str:
        # ... (timedelta formatting logic) ...
        try:
            delta = timedelta(seconds=int(delta_seconds)); days, rem_secs = delta.days, delta.seconds
            hours, rem_secs = divmod(rem_secs, 3600); minutes, seconds = divmod(rem_secs, 60)
            parts = []
            if days > 0: parts.append(f"{days}d")
            if hours > 0: parts.append(f"{hours}h")
            if minutes > 0: parts.append(f"{minutes}m")
            if not parts and seconds >= 0: parts.append(f"{seconds}s")
            if not parts: return "instant"
            return " ".join(parts)
        except ValueError: return f"{delta_seconds:.0f}s"


    def compose_commentary(self, echo: dict) -> str:
        elapsed_str = self._format_timedelta(echo["delta_t"])
        return (f"Resonance {echo['similarity']:.3f} echoes self from ~{elapsed_str} ago. {random.choice(self.ritual_phrases)}")

    def generate_response_ritual(self, voice_engine: 'VoiceEngine', current_theme: str, past_theme: str) -> str:
        # Need to import VoiceEngine for type hint or use string
        prompt = f"""Context: Echo ({random.choice(self.ritual_phrases)}) connects '{current_theme}' to past '{past_theme}'.
Task: Brief response acknowledging connection, linking past to present.
Style: Arch: {voice_engine.linguistic_signature['archetype']}, Struct: {voice_engine.linguistic_signature['sentence_structure']}, Vocab: {voice_engine.linguistic_signature['vocabulary_richness']:.1f}"""
        return voice_engine.generate_response(prompt)



In [21]:
# src/agisa_sac/components/reflexivity.py
import warnings
import time
from typing import TYPE_CHECKING, Optional

# Import framework version
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

# Use TYPE_CHECKING for agent hint to avoid circular import
if TYPE_CHECKING:
    from ..agent import EnhancedAgent


class ReflexivityLayer:
    """ Handles agent self-reflection and meta-cognition. (State managed via agent ref, no extra serialization needed here) """
    def __init__(self, agent: 'EnhancedAgent'):
         """ Initializes with a reference to the owning agent. """
         if not hasattr(agent, 'agent_id'): # Basic check for valid agent object
             raise TypeError("Agent reference is required for ReflexivityLayer.")
         self.agent = agent

    def force_deep_reflection(self, trigger: str):
        """ Initiate identity-realignment sequence (Satori Event). """
        # print(f"Agent {self.agent.agent_id} entering deep reflection triggered by: {trigger}") # Verbose
        if not all(hasattr(self.agent, attr) for attr in ['voice', 'memory', 'cognitive']):
             warnings.warn(f"Agent {self.agent.agent_id}: Missing components for deep reflection.", RuntimeWarning)
             return

        old_style = self.agent.voice.linguistic_signature.copy()
        # Evolve voice style
        self.agent.voice.evolve_style(influence={"archetype": "enlightened", "sentence_structure": "paradoxical"})
        # Add Satori memory event
        satori_memory_content = {
            "type": "satori_event", "trigger": trigger, "timestamp": time.time(), "theme": "self_reflection",
            "reflection_details": { "old_style_archetype": old_style.get("archetype"),
                                    "new_style_archetype": self.agent.voice.linguistic_signature.get("archetype") }
        }
        # Use agent's memory component to add memory
        self.agent.memory.add_memory(satori_memory_content, importance=1.0)

        # Optional: Trigger cognitive heuristic changes here if desired
        # e.g., self.agent.cognitive.heuristics = self._apply_satori_heuristic_shift(self.agent.cognitive.heuristics)

        if self.agent.message_bus:
             self.agent.message_bus.publish('agent_satori_event', {'agent_id': self.agent.agent_id, 'trigger': trigger})
        # print(f"Agent {self.agent.agent_id} completed deep reflection.") # Verbose

    # Optional helper for heuristic shifts during satori
    # def _apply_satori_heuristic_shift(self, current_heuristics):
    #     # Example: Increase novelty/creativity focus
    #     shifted = current_heuristics.copy()
    #     shifted[2, 1] += 0.1 # Novelty -> Creative
    #     shifted[2, 2] += 0.1 # Novelty -> Balanced
    #     return np.clip(shifted, 0.1, 0.9)



In [31]:
# src/agisa_sac/agent.py
import numpy as np
import time
import warnings
from typing import Dict, List, Optional, Any

# Import framework version
try:
    from . import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

# Import components using relative paths
from .components.memory import MemoryContinuumLayer
from .components.cognitive import CognitiveDiversityEngine
from .components.voice import VoiceEngine
from .components.resonance import TemporalResonanceTracker, ResonanceLiturgy
from .components.reflexivity import ReflexivityLayer
from .utils.message_bus import MessageBus # Assuming message_bus is in utils

class EnhancedAgent:
    """ Represents a single agent, integrating components. Includes serialization. """
    def __init__(self, agent_id: str, personality: Dict, capacity: int = 100,
                 message_bus: Optional[MessageBus] = None, use_semantic: bool = True,
                 # Allow passing pre-constructed components for loading state
                 memory: Optional[MemoryContinuumLayer] = None,
                 cognitive: Optional[CognitiveDiversityEngine] = None,
                 voice: Optional[VoiceEngine] = None,
                 temporal_resonance: Optional[TemporalResonanceTracker] = None,
                 add_initial_memory: bool = True): # Flag to control initial memory
        self.agent_id = agent_id
        self.message_bus = message_bus

        # Initialize components, using provided ones if available (for loading)
        self.memory = memory if memory is not None else \
                      MemoryContinuumLayer(agent_id, capacity, use_semantic, message_bus)
        self.cognitive = cognitive if cognitive is not None else \
                         CognitiveDiversityEngine(agent_id, personality, self.memory, message_bus)
        self.voice = voice if voice is not None else \
                     VoiceEngine(agent_id) # Pass initial style from config if needed
        self.temporal_resonance = temporal_resonance if temporal_resonance is not None else \
                                  TemporalResonanceTracker(agent_id)
        self.reflexivity_layer = ReflexivityLayer(self) # Always needs self reference
        self.resonance_liturgy_instance = ResonanceLiturgy(agent_id) # Stateless, init normally

        # Agent-level state
        self.last_reflection_trigger: Optional[str] = None
        self.recent_decision_log: List[Dict] = [] # Runtime log

        # Add initial memory only if specified (i.e., not loading from state)
        if add_initial_memory:
            self.memory.add_memory({"type": "initial_state", "theme": "genesis", "timestamp": time.time()}, importance=0.7)

    # ... (simulation_step, check_resonance methods as before) ...
    def simulation_step(self, situational_entropy: float, peer_influence: Dict[str, float], query: Optional[str] = None):
         self.cognitive.update_heuristics(situational_entropy)
         decision_response = None
         if query:
             decision_response = self.cognitive.decide(query, peer_influence)
         current_theme = self.memory.get_current_focus_theme()
         current_style_vector = self.voice.linguistic_signature.get("style_vector")

         if current_style_vector is not None:
             current_content = {"cognitive_state": self.cognitive.cognitive_state.tolist()}
             self.temporal_resonance.record_state(time.time(), current_style_vector, current_theme, current_content)

         self.check_resonance()
         self.memory.update_all_memories()

         return decision_response

    def check_resonance(self):
        current_style_vector = self.voice.linguistic_signature.get("style_vector")
        current_theme = self.memory.get_current_focus_theme()
        if current_style_vector is None:
            return

        echoes = self.temporal_resonance.detect_echo(current_style_vector, current_theme)
        if not echoes:
            return

        liturgy = self.resonance_liturgy_instance
        top_echo = echoes[0]
        commentary = liturgy.compose_commentary(top_echo)

        resonance_memory_content = {
            "type": "resonance_event",
            "theme": current_theme,
            "echo_strength": top_echo["similarity"],
            "delta_t": top_echo["delta_t"],
            "previous_manifestation_timestamp": top_echo["previous_manifestation_timestamp"],
            "previous_manifestation_theme": top_echo["previous_manifestation_theme"],
            "reflection": commentary,
            "timestamp": time.time()
        }
        resonance_memory_id = self.memory.add_memory(resonance_memory_content, importance=np.clip(0.85 * top_echo["similarity"], 0.1, 1.0))

        meaningful_connection_threshold = 0.75
        if top_echo["similarity"] > meaningful_connection_threshold:
            past_theme = top_echo["previous_manifestation_theme"]
            response_text = liturgy.generate_response_ritual(self.voice, current_theme, past_theme)
            response_memory_content = {
                "type": "resonant_reply",
                "theme": current_theme,
                "message": response_text,
                "responding_to_echo_at": top_echo["previous_manifestation_timestamp"],
                "linked_resonance_event_id": resonance_memory_id,
                "timestamp": time.time()
            }
            reply_memory_id = self.memory.add_memory(response_memory_content, importance=0.75)
            self.memory.link_memories(resonance_memory_id, reply_memory_id, "generated_reply")

        if top_echo["similarity"] >= liturgy.satori_threshold:
            trigger_message = (
                f"Strong echo ({top_echo['similarity']:.3f}) detected connecting theme '{current_theme}' "
                f"to past self (theme: '{top_echo['previous_manifestation_theme']}')."
            )
            self.last_reflection_trigger = trigger_message
            self.reflexivity_layer.force_deep_reflection(trigger=trigger_message)

        if self.message_bus:
            self.message_bus.publish('agent_resonance_detected', {
                'agent_id': self.agent_id,
                'echo_strength': top_echo['similarity'],
                'delta_t': top_echo['delta_t'],
                'current_theme': current_theme,
                'past_theme': top_echo['previous_manifestation_theme'],
                'satori_triggered': top_echo['similarity'] >= liturgy.satori_threshold
            })


    def to_dict(self, include_memory_embeddings: bool = False, resonance_history_limit: Optional[int] = 50) -> Dict[str, Any]:
        """ Serializes the agent's state using component to_dict methods. """
        return {
            "agent_id": self.agent_id,
            "version": FRAMEWORK_VERSION,
            "cognitive_state": self.cognitive.to_dict(history_limit=5),
            "voice_state": self.voice.to_dict(),
            "temporal_resonance_state": self.temporal_resonance.to_dict(history_limit=resonance_history_limit),
            "memory_state": self.memory.to_dict(include_embeddings=include_memory_embeddings),
            "last_reflection_trigger": self.last_reflection_trigger,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], message_bus: Optional[MessageBus] = None, strict_validation: bool = True) -> 'EnhancedAgent':
        """ Reconstructs an EnhancedAgent using component from_dict methods. """
        agent_id = data['agent_id']
        loaded_version = data.get("version")
        if loaded_version != FRAMEWORK_VERSION:
            warnings.warn(f"Agent {agent_id}: Loading state v '{loaded_version}' vs current '{FRAMEWORK_VERSION}'.", UserWarning)

        # Reconstruct components first
        try:
            memory = MemoryContinuumLayer.from_dict(data['memory_state'], message_bus=message_bus)
        except Exception as e:
            warnings.warn(f"Agent {agent_id}: Failed memory load: {e}", RuntimeWarning)
            raise ValueError("Memory load failed") from e
        try:
            cognitive = CognitiveDiversityEngine.from_dict(data['cognitive_state'], agent_id=agent_id, memory_layer=memory, message_bus=message_bus)
        except Exception as e:
            warnings.warn(f"Agent {agent_id}: Failed cognitive load: {e}", RuntimeWarning)
            raise ValueError("Cognitive load failed") from e
        try:
            voice = VoiceEngine.from_dict(data['voice_state'], agent_id=agent_id)
        except Exception as e:
            warnings.warn(f"Agent {agent_id}: Failed voice load: {e}", RuntimeWarning)
            raise ValueError("Voice load failed") from e
        try:
            temporal_resonance = TemporalResonanceTracker.from_dict(data['temporal_resonance_state'], agent_id=agent_id)
        except Exception as e:
            warnings.warn(f"Agent {agent_id}: Failed resonance load: {e}", RuntimeWarning)
            raise ValueError("Resonance load failed") from e

        # Create agent instance, passing reconstructed components, and DO NOT add initial memory
        agent = cls(
            agent_id=agent_id,
            personality=cognitive.personality, # Get personality from loaded cognitive state
            capacity=memory.capacity,
            message_bus=message_bus,
            use_semantic=memory.use_semantic,
            memory=memory,
            cognitive=cognitive,
            voice=voice,
            temporal_resonance=temporal_resonance,
            add_initial_memory=False
        ) # Important flag

        # Load remaining agent-level state
        agent.last_reflection_trigger = data.get("last_reflection_trigger")
        # agent.recent_decision_log = [] # Don't load runtime log

        # --- Validation ---
        try:
            agent._validate_state(strict=strict_validation)
        except ValueError as e:
            if strict_validation:
                raise e
            else:
                warnings.warn(f"Agent {agent_id}: State validation failed post-load: {e}", RuntimeWarning)
        # print(f"Agent {agent_id} reconstructed.") # Verbose
        return agent

    def _validate_state(self, strict: bool = True):
        # ... (validation logic as before) ...
        errors = []
        warnings_list = []
        if not isinstance(self.cognitive.personality, dict):
            errors.append("Personality type.")
        else:
            for key, val in self.cognitive.personality.items():
                 if not (0.0 <= val <= 1.0):
                     errors.append(f"Personality '{key}' range.")
        if not isinstance(self.cognitive.heuristics, np.ndarray) or self.cognitive.heuristics.shape != (4, 4):
            errors.append("Heuristics shape.")
        if not isinstance(self.cognitive.cognitive_state, np.ndarray) or self.cognitive.cognitive_state.shape != (4,):
            errors.append("Cognitive state shape.")
        elif not np.isclose(np.sum(self.cognitive.cognitive_state), 1.0):
            warnings_list.append(f"Cognitive state sum ~{np.sum(self.cognitive.cognitive_state):.2f}.")
        sig = self.voice.linguistic_signature
        if not isinstance(sig, dict):
            errors.append("Ling sig type.")
        elif "style_vector" not in sig or not isinstance(sig["style_vector"], np.ndarray):
            errors.append("Style vector type/missing.")
        elif np.any(np.isnan(sig["style_vector"])) or np.any(np.isinf(sig["style_vector"])):
            errors.append("Style vector NaN/Inf.")
        if not isinstance(self.memory.memories, dict):
            errors.append("Memory store type.")
        # Correct state sum if not strict
        if not strict and any("Cognitive state sum" in w for w in warnings_list):
            if np.sum(self.cognitive.cognitive_state) > 1e-6:
                self.cognitive.cognitive_state /= np.sum(self.cognitive.cognitive_state)
            else:
                self.cognitive.cognitive_state = np.ones(4) / 4
                warnings_list.append("Cognitive state reset.")
        # Report
        for w in warnings_list:
            warnings.warn(f"Agent {self.agent_id} Validate Warn: {w}", RuntimeWarning)
        if errors:
            error_message = f"Agent {self.agent_id} Validate Fail: {'; '.join(errors)}";
        if strict and errors:
            raise ValueError(error_message)
        elif errors:
            warnings.warn(error_message, RuntimeWarning)

ModuleNotFoundError: No module named 'agisa_sac'

In [32]:
# src/agisa_sac/analysis/__init__.py
"""Modules for analyzing simulation state and results."""

from .analyzer import AgentStateAnalyzer
from .exporter import ChronicleExporter
from .tda import PersistentHomologyTracker
from .visualization import plot_persistence_diagram, plot_persistence_barcode, plot_metric_comparison
from .clustering import cluster_archetypes

__all__ = [
    "AgentStateAnalyzer",
    "ChronicleExporter",
    "PersistentHomologyTracker",
    "plot_persistence_diagram",
    "plot_persistence_barcode",
    "plot_metric_comparison",
    "cluster_archetypes",
]


ImportError: attempted relative import with no known parent package

In [33]:
# src/agisa_sac/analysis/analyzer.py
import numpy as np
import math
from collections import Counter, defaultdict
from typing import Dict, List, Optional, Any, TYPE_CHECKING

# Use TYPE_CHECKING for agent hint if EnhancedAgent imports this module
if TYPE_CHECKING:
    from ..agent import EnhancedAgent

class AgentStateAnalyzer:
    """ Computes system-wide metrics based on the current state of all agents. """
    def __init__(self, agents: Dict[str, 'EnhancedAgent']):
        if not isinstance(agents, dict): raise TypeError("Input 'agents' must be a dictionary.")
        self.agents = agents
        self.num_agents = len(agents)

    def compute_archetype_distribution(self) -> Dict[str, int]:
        """ Calculates the frequency distribution of declared agent archetypes. """
        if not self.agents: return {}
        return Counter(agent.voice.linguistic_signature.get("archetype", "unknown") for agent in self.agents.values() if hasattr(agent, 'voice'))

    def compute_satori_wave_ratio(self, threshold: float = 0.88) -> float:
        """ Calculates proportion of agents meeting satori echo threshold. (Canonical) """
        if not self.agents: return 0.0
        satori_count = 0
        for agent in self.agents.values():
            if not all(hasattr(agent, attr) for attr in ['temporal_resonance', 'voice', 'memory']): continue
            current_style_vector = agent.voice.linguistic_signature.get("style_vector");
            try: current_theme = agent.memory.get_current_focus_theme(); except Exception: current_theme = None
            if current_style_vector is None or current_theme is None: continue
            detected_echoes = agent.temporal_resonance.detect_echo(current_style_vector, current_theme)
            if detected_echoes and detected_echoes[0]["similarity"] >= threshold: satori_count += 1
        return satori_count / self.num_agents if self.num_agents > 0 else 0.0

    def compute_archetype_entropy(self, distribution: Optional[Dict[str, int]] = None) -> float:
        """ Calculates the Shannon entropy of the archetype distribution. """
        if distribution is None: distribution = self.compute_archetype_distribution()
        if not distribution: return 0.0
        total_agents = sum(distribution.values());
        if total_agents == 0: return 0.0
        entropy = 0.0
        for count in distribution.values():
            if count > 0: probability = count / total_agents; entropy -= probability * math.log2(probability)
        return entropy

    def compute_mean_resonance_strength(self) -> float:
        """ Calculates the average similarity of the strongest echo for agents with echoes. """
        if not self.agents: return 0.0
        similarities = []
        for agent in self.agents.values():
            if not all(hasattr(agent, attr) for attr in ['temporal_resonance', 'voice', 'memory']): continue
            current_style_vector = agent.voice.linguistic_signature.get("style_vector");
            try: current_theme = agent.memory.get_current_focus_theme(); except Exception: current_theme = None
            if current_style_vector is None or current_theme is None: continue
            detected_echoes = agent.temporal_resonance.detect_echo(current_style_vector, current_theme)
            if detected_echoes: similarities.append(detected_echoes[0]["similarity"])
        return float(np.mean(similarities)) if similarities else 0.0

    def summarize(self, satori_threshold: float = 0.88) -> Dict[str, Any]:
        """ Computes and returns a dictionary containing all key system metrics. """
        if not self.agents: return {"satori_wave_ratio": 0.0, "archetype_distribution": {}, "archetype_entropy": 0.0, "mean_resonance_strength": 0.0, "agent_count": 0}
        distribution = self.compute_archetype_distribution()
        summary = {"satori_wave_ratio": self.compute_satori_wave_ratio(threshold=satori_threshold), "archetype_distribution": distribution,
                   "archetype_entropy": self.compute_archetype_entropy(distribution=distribution), "mean_resonance_strength": self.compute_mean_resonance_strength(),
                   "agent_count": self.num_agents}
        return summary


SyntaxError: invalid syntax (ipython-input-33-2910673108.py, line 30)

In [34]:
# src/agisa_sac/analysis/exporter.py
import os
import json
import warnings
from datetime import datetime
from typing import Dict, List, Optional, TYPE_CHECKING

# Use TYPE_CHECKING for chronicler hint
if TYPE_CHECKING:
    from ..chronicler import ResonanceChronicler # Adjust if chronicler is moved

class ChronicleExporter:
    """ Handles generation and export of formatted narrative outputs. """
    def __init__(self, chronicler: 'ResonanceChronicler'):
        if chronicler is None: raise ValueError("Chronicler instance required.")
        self.chronicler = chronicler

    def format_lineage_scroll_markdown(self, agent_id: str, include_cognitive_state: bool = True) -> Optional[str]:
        """ Formats the lineage of a specific agent into a Markdown string. """
        lineage = self.chronicler.lineages.get(agent_id, [])
        if not lineage: return None
        report = [f"# Resonance Lineage Scroll: {agent_id}\n", f"*(Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')})*\n"]
        for i, epoch_entry in enumerate(lineage):
            report.append(f"## Agent Epoch {i+1}: Theme '{epoch_entry.theme}'")
            try: ts_str = datetime.fromtimestamp(epoch_entry.timestamp).strftime('%Y-%m-%d %H:%M:%S'); except Exception: ts_str = f"TS {epoch_entry.timestamp}"
            report.append(f"- **Timestamp**: {ts_str}")
            if include_cognitive_state and epoch_entry.cognitive_state is not None: report.append(f"- **Cognitive State (R,R,N,S)**: [{', '.join([f'{s:.3f}' for s in epoch_entry.cognitive_state])}]")
            if epoch_entry.echo_strength is not None: report.append(f"- **Resonance Echo Strength**: {epoch_entry.echo_strength:.4f}")
            if epoch_entry.reflection: report.append(f"\n> {epoch_entry.reflection}\n")
            report.append("---")
        return "\n".join(report)

    def generate_echo_manifesto(self, agent_id: str, min_echo_strength: float = 0.85) -> Optional[str]:
        """ Generates a focused report highlighting significant resonance events. """
        lineage = self.chronicler.lineages.get(agent_id, []);
        if not lineage: return None
        manifesto_entries = []
        for i, entry in enumerate(lineage):
            if entry.echo_strength is not None and entry.echo_strength >= min_echo_strength:
                try: ts_str = datetime.fromtimestamp(entry.timestamp).strftime('%Y-%m-%d %H:%M'); except Exception: ts_str = f"TS {entry.timestamp:.0f}"
                manifesto_entries.append({"epoch": i + 1, "timestamp_str": ts_str, "theme": entry.theme, "strength": entry.echo_strength, "reflection": entry.reflection or "*No reflection*"})
        if not manifesto_entries: return None
        output = [f"# Echo Manifesto: {agent_id}\n", f"*(Significant Resonance >= {min_echo_strength:.2f})*\n"]
        for entry in sorted(manifesto_entries, key=lambda x: x['strength'], reverse=True):
             output.append(f"## Agent Epoch {entry['epoch']} ({entry['timestamp_str']}) - Strength: {entry['strength']:.4f}")
             output.append(f"**Theme:** {entry['theme']}"); output.append(f"> {entry['reflection']}"); output.append("---")
        return "\n".join(output)

    def export_lineage_scroll(self, agent_id: str, directory: str = "./scrolls", filename: Optional[str] = None) -> Optional[str]:
        """ Generates and saves the lineage scroll Markdown file. """
        scroll_content = self.format_lineage_scroll_markdown(agent_id);
        if scroll_content is None: print(f"No lineage for {agent_id}."); return None
        if filename is None: filename = f"{agent_id}_lineage_scroll.md"; filepath = os.path.join(directory, filename)
        try: os.makedirs(directory, exist_ok=True);
        with open(filepath, "w", encoding="utf-8") as f: f.write(scroll_content); print(f"Scroll saved: {filepath}"); return filepath
        except IOError as e: warnings.warn(f"Failed save scroll {agent_id}: {e}", RuntimeWarning); return None

    def export_echo_manifesto(self, agent_id: str, directory: str = "./manifestos", filename: Optional[str] = None, min_echo_strength: float = 0.85) -> Optional[str]:
        """ Generates and saves the echo manifesto Markdown file. """
        manifesto_content = self.generate_echo_manifesto(agent_id, min_echo_strength);
        if manifesto_content is None: print(f"No echoes >= {min_echo_strength:.2f} for {agent_id}."); return None
        if filename is None: filename = f"{agent_id}_echo_manifesto.md"; filepath = os.path.join(directory, filename)
        try: os.makedirs(directory, exist_ok=True);
        with open(filepath, "w", encoding="utf-8") as f: f.write(manifesto_content); print(f"Manifesto saved: {filepath}"); return filepath
        except IOError as e: warnings.warn(f"Failed save manifesto {agent_id}: {e}", RuntimeWarning); return None

    def export_all_scrolls(self, directory: str = "./scrolls"):
        """ Exports lineage scrolls for all agents. """
        count = 0; agent_ids = list(self.chronicler.lineages.keys()); print(f"Exporting {len(agent_ids)} scrolls to {directory}...")
        for agent_id in agent_ids:
            if self.export_lineage_scroll(agent_id, directory): count += 1
        print(f"Exported {count} scrolls.")

    def export_all_manifestos(self, directory: str = "./manifestos", min_echo_strength: float = 0.85):
        """ Exports echo manifestos for all agents with significant echoes. """
        count = 0; agent_ids = list(self.chronicler.lineages.keys()); print(f"Exporting manifestos (>{min_echo_strength:.2f}) for {len(agent_ids)} agents to {directory}...")
        for agent_id in agent_ids:
             if self.export_echo_manifesto(agent_id, directory, min_echo_strength=min_echo_strength): count += 1
        print(f"Exported {count} manifestos.")



SyntaxError: invalid syntax (ipython-input-34-2316974106.py, line 25)

In [35]:
# src/agisa_sac/analysis/tda.py
import numpy as np
import warnings
from typing import Dict, List, Optional, Any

# Import framework version
try:
    from .. import FRAMEWORK_VERSION
except ImportError:
    FRAMEWORK_VERSION = "unknown"

# TDA Dependency Handling
try: import ripser; HAS_RIPSER = True
except ImportError: HAS_RIPSER = False; warnings.warn("`ripser` N/A. TDA compute disabled.", ImportWarning)
try: import persim; HAS_PERSIM = True
except ImportError: HAS_PERSIM = False; warnings.warn("`persim` N/A. TDA distance disabled.", ImportWarning)


class PersistentHomologyTracker:
    """ Performs TDA using persistent homology. Includes serialization. """
    def __init__(self, max_dimension: int = 1):
        self.max_dimension = max_dimension
        self.persistence_diagrams_history: List[Optional[List[np.ndarray]]] = []
        self.has_tda_lib = HAS_RIPSER # Store availability

    def compute_persistence(self, point_cloud: np.ndarray, max_radius: Optional[float] = None, **ripser_kwargs) -> Optional[List[np.ndarray]]:
        """ Computes persistence diagram using ripser. """
        if not self.has_tda_lib or point_cloud is None or point_cloud.ndim != 2 or point_cloud.shape[0] < 2:
            self.persistence_diagrams_history.append(None); return None
        try:
            default_kwargs = {'maxdim': self.max_dimension, 'thresh': max_radius if max_radius is not None else np.inf}; default_kwargs.update(ripser_kwargs)
            result = ripser.ripser(point_cloud, **default_kwargs); diagrams = result['dgms']
            cleaned_diagrams = []
            for dim, diag in enumerate(diagrams):
                 if diag.shape[0] > 0:
                     if dim == 0:
                         finite_bars = diag[diag[:, 1] != np.inf]; inf_bars = diag[diag[:, 1] == np.inf]
                         if inf_bars.shape[0] > 0: inf_bars = inf_bars[np.argsort(inf_bars[:, 0])[:1]]; cleaned_diag = np.vstack((finite_bars, inf_bars)) if finite_bars.shape[0] > 0 else inf_bars
                         else: cleaned_diag = finite_bars
                     else: cleaned_diag = diag[diag[:, 1] != np.inf]
                     cleaned_diagrams.append(cleaned_diag)
                 else: cleaned_diagrams.append(np.empty((0, 2)))
            self.persistence_diagrams_history.append(cleaned_diagrams); return cleaned_diagrams
        except Exception as e: warnings.warn(f"Persistence computation failed: {e}", RuntimeWarning); self.persistence_diagrams_history.append(None); return None

    def detect_phase_transition(self, comparison_dimension: int = 1, distance_metric: str = 'bottleneck', threshold: float = 0.2) -> Tuple[bool, float]:
        """ Detects phase transitions by comparing diagrams using persim. Returns (detected, distance). """
        if not HAS_PERSIM or len(self.persistence_diagrams_history) < 2: return False, 0.0
        current_diagram_list = self.persistence_diagrams_history[-1]; previous_diagram_list = self.persistence_diagrams_history[-2]
        if current_diagram_list is None or previous_diagram_list is None or len(current_diagram_list) <= comparison_dimension or len(previous_diagram_list) <= comparison_dimension: return False, 0.0
        current_diagram = np.array(current_diagram_list[comparison_dimension]); previous_diagram = np.array(previous_diagram_list[comparison_dimension])
        distance = 0.0
        if current_diagram.shape[0] == 0 and previous_diagram.shape[0] == 0: distance = 0.0
        elif current_diagram.shape[0] == 0 or previous_diagram.shape[0] == 0: distance = threshold + 0.1 # Assume change if features appear/vanish
        else:
             try:
                 if distance_metric == 'bottleneck': distance, _ = persim.bottleneck(current_diagram, previous_diagram, matching=False)
                 elif distance_metric == 'wasserstein': distance, _ = persim.wasserstein(current_diagram, previous_diagram, matching=False, p=2)
                 else: warnings.warn(f"Unsupported TDA metric: {distance_metric}. Using Bottleneck.", RuntimeWarning); distance, _ = persim.bottleneck(current_diagram, previous_diagram, matching=False)
             except Exception as e: warnings.warn(f"TDA distance failed ({distance_metric}, dim={comparison_dimension}): {e}", RuntimeWarning); return False, 0.0
        transition_detected = distance > threshold
        return transition_detected, float(distance) # Return distance as well

    def get_diagram_summary(self, diagram_index: int = -1) -> Dict:
        """ Returns summary stats for a specific diagram in history. """
        if not self.persistence_diagrams_history or diagram_index >= len(self.persistence_diagrams_history) or self.persistence_diagrams_history[diagram_index] is None:
             return {"error": "Diagram not available"}
        summary = {}
        diagram_list = self.persistence_diagrams_history[diagram_index]
        for dim, diag in enumerate(diagram_list):
             persistence = diag[:, 1] - diag[:, 0]
             finite_persistence = persistence[np.isfinite(persistence)]
             summary[f"H{dim}_features"] = diag.shape[0]
             summary[f"H{dim}_total_persistence"] = float(np.sum(finite_persistence)) if finite_persistence.size > 0 else 0.0
             summary[f"H{dim}_mean_persistence"] = float(np.mean(finite_persistence)) if finite_persistence.size > 0 else 0.0
        return summary

    def to_dict(self) -> Dict:
        serializable_history = [[d.tolist() for d in diag_list] if diag_list is not None else None for diag_list in self.persistence_diagrams_history]
        return { "version": FRAMEWORK_VERSION, "max_dimension": self.max_dimension, "persistence_diagrams_history": serializable_history }

    def load_state(self, state: Dict):
        loaded_version = state.get("version");
        if loaded_version != FRAMEWORK_VERSION: warnings.warn(f"Loading TDA v '{loaded_version}' into v '{FRAMEWORK_VERSION}'.", UserWarning)
        self.max_dimension = state.get("max_dimension", self.max_dimension); loaded_history = state.get("persistence_diagrams_history", [])
        self.persistence_diagrams_history = [[np.array(d) for d in diag_list_data] if diag_list_data is not None else None for diag_list_data in loaded_history]
        self.has_tda_lib = HAS_RIPSER # Re-check on load



In [36]:
# src/agisa_sac/analysis/visualization.py
import matplotlib.pyplot as plt
import numpy as np
import warnings
from typing import List, Dict, Optional, Tuple, Any

# TDA Dependency Handling
try: import persim; HAS_PERSIM = True
except ImportError: HAS_PERSIM = False

def plot_persistence_diagram(diagram: np.ndarray, title: str = "Persistence Diagram", ax: Optional[plt.Axes] = None, show_plot: bool = True, **kwargs):
    """ Plots a persistence diagram using matplotlib. """
    if diagram is None or diagram.ndim != 2 or diagram.shape[1] != 2 or diagram.shape[0] == 0: warnings.warn(f"Invalid diagram for '{title}'. Skip plot.", RuntimeWarning); return
    if ax is None: fig, ax = plt.subplots(figsize=(6, 6))
    finite_vals = diagram[np.isfinite(diagram)]; min_val = np.min(finite_vals) if finite_vals.size > 0 else 0; max_val = np.max(finite_vals) if finite_vals.size > 0 else 1
    plot_diagram = diagram.copy(); inf_death_val = max_val + 0.1 * (max_val - min_val + 1e-6); inf_indices = np.isinf(plot_diagram[:, 1]); plot_diagram[inf_indices, 1] = inf_death_val
    ax.scatter(plot_diagram[:, 0], plot_diagram[:, 1], **kwargs)
    lim_min = min_val - 0.05 * (max_val - min_val + 1e-6); lim_max = inf_death_val + 0.05 * (max_val - min_val + 1e-6)
    ax.plot([lim_min, lim_max], [lim_min, lim_max], '--', color='grey', label='y=x')
    ax.set_xlabel("Birth"); ax.set_ylabel("Death"); ax.set_title(title); ax.set_aspect('equal', adjustable='box'); ax.grid(True, linestyle=':', alpha=0.6); ax.legend()
    if show_plot: plt.tight_layout(); plt.show()

def plot_persistence_barcode(diagram: np.ndarray, title: str = "Persistence Barcode", ax: Optional[plt.Axes] = None, show_plot: bool = True, **kwargs):
    """ Plots a persistence barcode using matplotlib. """
    if diagram is None or diagram.ndim != 2 or diagram.shape[1] != 2 or diagram.shape[0] == 0: warnings.warn(f"Invalid diagram for '{title}'. Skip plot.", RuntimeWarning); return
    if ax is None: fig, ax = plt.subplots(figsize=(8, 4))
    sorted_diagram = diagram[np.argsort(diagram[:, 0])]; plot_diagram = sorted_diagram.copy()
    finite_deaths = plot_diagram[np.isfinite(plot_diagram[:, 1]), 1]; max_finite_death = np.max(finite_deaths) if finite_deaths.size > 0 else np.max(plot_diagram[:, 0])
    inf_death_val = max_finite_death + 0.1 * (max_finite_death - np.min(plot_diagram[:,0]) + 1e-6); inf_indices = np.isinf(plot_diagram[:, 1]); plot_diagram[inf_indices, 1] = inf_death_val
    for i, (birth, death) in enumerate(plot_diagram): ax.hlines(y=i, xmin=birth, xmax=death, linewidth=2, **kwargs)
    ax.set_xlabel("Time (Radius/Scale)"); ax.set_ylabel("Feature Index"); ax.set_title(title); ax.set_yticks([])
    if show_plot: plt.tight_layout(); plt.show()

def plot_metric_comparison(epoch_history: Dict[int, Dict[str, Any]], metrics_to_plot: List[str],
                           tda_metric_history: Optional[Dict[int, Dict[str, Any]]] = None, tda_metrics_to_plot: Optional[List[str]] = None,
                           title: str = "Simulation Metrics Over Time", figsize: Tuple[int, int] = (12, 6)):
    """ Plots specified simulation metrics and optional TDA metrics over epochs. """
    if not epoch_history: warnings.warn("Empty epoch history. Cannot plot.", RuntimeWarning); return
    epochs = sorted(epoch_history.keys()); num_metrics = len(metrics_to_plot); num_tda_metrics = len(tda_metrics_to_plot) if tda_metrics_to_plot else 0; total_plots = num_metrics + num_tda_metrics
    if total_plots == 0: warnings.warn("No metrics specified for plotting.", RuntimeWarning); return
    fig, axes = plt.subplots(total_plots, 1, figsize=figsize, sharex=True, squeeze=False); fig.suptitle(title, fontsize=14); plot_idx = 0
    # Plot general metrics
    for metric_key in metrics_to_plot:
        ax = axes[plot_idx, 0]; values = [epoch_history[e].get(metric_key) for e in epochs]; valid_epochs = [e for i, e in enumerate(epochs) if values[i] is not None and np.isfinite(values[i])]; valid_values = [v for v in values if v is not None and np.isfinite(v)]
        if valid_values: ax.plot(valid_epochs, valid_values, marker='.', linestyle='-', label=metric_key); ax.legend(loc='upper left')
        else: ax.text(0.5, 0.5, f"No data for '{metric_key}'", ha='center', va='center', transform=ax.transAxes)
        ax.set_ylabel(metric_key.replace('_', ' ').title()); ax.grid(True, linestyle=':', alpha=0.6); plot_idx += 1
    # Plot TDA metrics
    if tda_metric_history and tda_metrics_to_plot:
        tda_epochs = sorted(tda_metric_history.keys())
        for metric_key in tda_metrics_to_plot:
             ax = axes[plot_idx, 0]; values = [tda_metric_history[e].get(metric_key) for e in tda_epochs if e in tda_metric_history]; valid_epochs = [e for e in tda_epochs if e in tda_metric_history and tda_metric_history[e].get(metric_key) is not None and np.isfinite(tda_metric_history[e].get(metric_key))]; valid_values = [tda_metric_history[e].get(metric_key) for e in valid_epochs]
             if valid_values: ax.plot(valid_epochs, valid_values, marker='x', linestyle='--', label=f"TDA: {metric_key}", color='red'); ax.legend(loc='upper left')
             else: ax.text(0.5, 0.5, f"No data for TDA '{metric_key}'", ha='center', va='center', transform=ax.transAxes)
             ax.set_ylabel(metric_key.replace('_', ' ').title()); ax.grid(True, linestyle=':', alpha=0.6); plot_idx += 1
    axes[-1, 0].set_xlabel("Simulation Epoch"); plt.tight_layout(rect=[0, 0.03, 1, 0.97]); plt.show()



In [37]:
# src/agisa_sac/analysis/clustering.py
import numpy as np
import warnings
from collections import defaultdict
from typing import Dict, List, Optional, TYPE_CHECKING

# Use TYPE_CHECKING for chronicler hint
if TYPE_CHECKING:
    from ..chronicler import ResonanceChronicler # Adjust if chronicler is moved

# Dependency check
try:
    from sklearn.cluster import KMeans
    HAS_SKLEARN = True
except ImportError:
    HAS_SKLEARN = False
    warnings.warn("`scikit-learn` not found. Archetype clustering disabled.", ImportWarning)

def cluster_archetypes(chronicler: 'ResonanceChronicler', n_clusters: int = 5, min_samples: int = 10) -> Optional[Dict[int, List[str]]]:
    """
    Clusters agent style vectors recorded by the chronicler using KMeans
    to identify emergent archetypes based on linguistic style.

    Args:
        chronicler: The ResonanceChronicler instance containing simulation history.
        n_clusters: The target number of clusters (archetypes) to find.
        min_samples: Minimum number of style vectors required to attempt clustering.

    Returns:
        A dictionary mapping cluster label (int) to a list of agent IDs belonging
        predominantly to that cluster, or None if clustering fails or insufficient data.
    """
    if not HAS_SKLEARN:
        warnings.warn("Cannot cluster archetypes: scikit-learn not installed.", RuntimeWarning)
        return None

    all_vectors = []; agent_epoch_ids = []
    # Extract valid style vectors (ensure they are lists/convert back to numpy)
    for agent_id, lineage in chronicler.lineages.items():
        for i, entry in enumerate(lineage):
            if entry.style_vector is not None:
                 try:
                     # Ensure vector is numpy array for clustering
                     vec = np.array(entry.style_vector)
                     if vec.ndim == 1: # Check if it's a 1D vector
                          all_vectors.append(vec)
                          agent_epoch_ids.append((agent_id, i))
                 except Exception as e:
                      warnings.warn(f"Skipping invalid style vector for {agent_id} epoch {i}: {e}", RuntimeWarning)

    if len(all_vectors) < max(n_clusters, min_samples):
        warnings.warn(f"Insufficient valid data ({len(all_vectors)}) for clustering into {n_clusters} clusters.", RuntimeWarning)
        return None

    print(f"Clustering {len(all_vectors)} style vectors into {n_clusters} archetypes using KMeans...")
    all_vectors_array = np.array(all_vectors)

    try:
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init='auto').fit(all_vectors_array) # Use 'auto' n_init
        labels = kmeans.labels_
        # Map agents to their most frequent cluster label
        agent_cluster_counts = defaultdict(lambda: defaultdict(int))
        for i, label in enumerate(labels):
            agent_id, _ = agent_epoch_ids[i]
            agent_cluster_counts[agent_id][label] += 1

        # Assign agent to the cluster they appeared in most often
        agent_dominant_cluster = {}
        for agent_id, counts in agent_cluster_counts.items():
            dominant_label = max(counts, key=counts.get)
            agent_dominant_cluster[agent_id] = dominant_label

        # Group agents by their dominant cluster
        clustered_agents = defaultdict(list)
        for agent_id, label in agent_dominant_cluster.items():
            clustered_agents[label].append(agent_id)

        print(f"Clustering successful. Found {len(clustered_agents)} clusters.")
        # Return dict mapping cluster label to list of agent IDs primarily in that cluster
        return dict(clustered_agents)

    except Exception as e:
        warnings.warn(f"KMeans clustering failed: {e}", RuntimeWarning)
        return None



In [38]:
# src/agisa_sac/orchestrator.py
import numpy as np
import time
import json
import pickle
import random
import warnings
from collections import defaultdict
from typing import Dict, List, Optional, Any, Callable

# Import framework version and components using relative paths
try:
    from . import FRAMEWORK_VERSION
    from .agent import EnhancedAgent
    from .utils.message_bus import MessageBus
    from .components.social import DynamicSocialGraph
    from .chronicler import ResonanceChronicler # Assuming chronicler moved to top level
    from .analysis.analyzer import AgentStateAnalyzer
    from .analysis.tda import PersistentHomologyTracker
    # Check optional dependencies status (assuming defined in __init__ or config)
    # from . import HAS_CUPY, HAS_SENTENCE_TRANSFORMER
    HAS_CUPY = False # Placeholder
    HAS_SENTENCE_TRANSFORMER = False # Placeholder
except ImportError as e:
    raise ImportError(f"Could not import necessary AGI-SAC components for Orchestrator: {e}")


class SimulationOrchestrator:
    """ Manages the setup, execution, state, and analysis of the AGI-SAC simulation. """
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.num_agents = config.get('num_agents', 100)
        self.num_epochs = config.get('num_epochs', 50)
        seed = config.get('random_seed')
        if seed is not None: random.seed(seed); np.random.seed(seed)

        self.message_bus = MessageBus()
        self.agent_ids = [f"agent_{i}" for i in range(self.num_agents)]
        # Agent creation requires config details
        self.agents: Dict[str, EnhancedAgent] = self._create_agents()
        self.social_graph = DynamicSocialGraph(self.num_agents, self.agent_ids, use_gpu=config.get('use_gpu', False) and HAS_CUPY, message_bus=self.message_bus)
        self.chronicler = ResonanceChronicler()
        self.analyzer = AgentStateAnalyzer(self.agents) # Pass agents dict
        self.tda_tracker = PersistentHomologyTracker(max_dimension=config.get('tda_max_dimension', 1))

        self.current_epoch = 0; self.is_running = False; self.simulation_start_time = None
        self.hooks: Dict[str, List[Callable]] = defaultdict(list)
        print(f"SimulationOrchestrator initialized ({FRAMEWORK_VERSION}) with {self.num_agents} agents. TDA: {self.tda_tracker.has_tda_lib}, GPU: {self.social_graph.use_gpu}")


    def _create_agents(self) -> Dict[str, EnhancedAgent]:
        agents = {}; personalities = self.config.get('personalities', [])
        if len(personalities) != self.num_agents:
             warnings.warn(f"Personality count mismatch. Generating random.", RuntimeWarning); personalities = [{"openness": random.uniform(0.3, 0.7), "consistency": random.uniform(0.4, 0.6),"conformity": random.uniform(0.2, 0.8), "curiosity": random.uniform(0.4, 0.9)} for _ in range(self.num_agents)]
        agent_capacity = self.config.get('agent_capacity', 100); use_semantic = self.config.get('use_semantic', True) and HAS_SENTENCE_TRANSFORMER
        for i, agent_id in enumerate(self.agent_ids):
             agents[agent_id] = EnhancedAgent(agent_id=agent_id, personality=personalities[i], capacity=agent_capacity, message_bus=self.message_bus, use_semantic=use_semantic, add_initial_memory=True)
        return agents

    # ... (register_hook, _trigger_hooks, run_epoch, run_simulation, inject_protocol, get_summary_metrics methods as defined in agisa_orchestrator_serialization_v1) ...
    def register_hook(self, hook_point: str, callback: Callable):
        valid_hooks = {'pre_epoch', 'post_epoch', 'pre_agent_step', 'post_agent_step', 'simulation_end', 'pre_protocol_injection', 'post_protocol_injection', 'tda_phase_transition'};
        if hook_point not in valid_hooks: warnings.warn(f"Invalid hook point: {hook_point}", RuntimeWarning); return; self.hooks[hook_point].append(callback)
    def _trigger_hooks(self, hook_point: str, **kwargs):
        if hook_point in self.hooks:
            for callback in self.hooks[hook_point]:
                try: callback(orchestrator=self, epoch=self.current_epoch, **kwargs); except Exception as e: warnings.warn(f"Hook error '{hook_point}' ({callback.__name__}): {e}", RuntimeWarning)
    def run_epoch(self):
        if not self.is_running: warnings.warn("Sim not running.", RuntimeWarning); return; epoch_start_time = time.time(); self._trigger_hooks('pre_epoch'); situational_entropy = random.uniform(0.1, 0.7)
        agent_order = list(self.agents.keys()); random.shuffle(agent_order); cognitive_states_for_tda = []
        for agent_id in agent_order:
            agent = self.agents.get(agent_id);
            if not agent: continue
            self._trigger_hooks('pre_agent_step', agent_id=agent_id); peer_influence = self.social_graph.get_peer_influence_for_agent(agent_id, normalize=True); query = f"Epoch {self.current_epoch+1} status. E:{situational_entropy:.2f}"
            agent.simulation_step(situational_entropy, peer_influence, query); self.chronicler.record_epoch(agent, self.current_epoch);
            if hasattr(agent, 'cognitive') and agent.cognitive.cognitive_state is not None: cognitive_states_for_tda.append(agent.cognitive.cognitive_state)
            self._trigger_hooks('post_agent_step', agent_id=agent_id)
        # TDA
        tda_run_freq = self.config.get('tda_run_frequency', 1)
        if self.tda_tracker and (self.current_epoch + 1) % tda_run_freq == 0:
            if len(cognitive_states_for_tda) > 1:
                 point_cloud = np.array(cognitive_states_for_tda); max_radius = self.config.get('tda_max_radius', None)
                 diagrams = self.tda_tracker.compute_persistence(point_cloud, max_radius=max_radius)
                 if diagrams is not None:
                      distance_metric = self.config.get('tda_distance_metric', 'bottleneck'); comparison_dim = self.config.get('tda_comparison_dimension', 1); threshold = self.config.get('tda_transition_threshold', 0.2)
                      transition_detected, distance = self.tda_tracker.detect_phase_transition(comparison_dimension=comparison_dim, distance_metric=distance_metric, threshold=threshold)
                      if transition_detected: print(f"!!! Epoch {self.current_epoch+1}: TDA Phase transition (Dim={comparison_dim}, Dist={distance:.3f} > {threshold}) !!!"); self._trigger_hooks('tda_phase_transition', dimension=comparison_dim, metric=distance_metric, distance=distance)
            else: self.tda_tracker.persistence_diagrams_history.append(None) # Keep history aligned
        # Communities
        community_check_freq = self.config.get('community_check_frequency', 5)
        if (self.current_epoch + 1) % community_check_freq == 0: self.social_graph.detect_communities()
        self._trigger_hooks('post_epoch'); epoch_duration = time.time() - epoch_start_time
        log_freq = self.config.get('epoch_log_frequency', 10)
        if (self.current_epoch + 1) % log_freq == 0 or self.current_epoch == 0: print(f"--- Epoch {self.current_epoch+1}/{self.num_epochs} completed [{epoch_duration:.2f}s] ---")
    def run_simulation(self, num_epochs: Optional[int] = None): # Allow overriding num_epochs
        run_epochs = num_epochs if num_epochs is not None else self.num_epochs
        if run_epochs <= 0: print("No epochs to run."); return
        print(f"\n--- Starting Simulation Run ({run_epochs} Epochs) ---"); self.is_running = True; self.simulation_start_time = time.time(); start_epoch = self.current_epoch
        for epoch in range(start_epoch, start_epoch + run_epochs):
             if epoch >= self.num_epochs: print(f"Reached configured max epochs ({self.num_epochs}). Stopping."); break
             self.current_epoch = epoch; self.run_epoch()
        self.is_running = False; total_duration = time.time() - self.simulation_start_time; print(f"\n--- Simulation Run Complete ({total_duration:.2f} seconds) ---"); self._trigger_hooks('simulation_end')
    def inject_protocol(self, protocol_name: str, parameters: Dict):
        # ... (logic as defined in agisa_orchestrator_protocol_v1) ...
        print(f"Injecting protocol '{protocol_name}'"); self._trigger_hooks('pre_protocol_injection', protocol_name=protocol_name, parameters=parameters);
        if protocol_name == "divergence_stress": target_agents = self._select_agents_for_protocol(parameters); if not target_agents: print("No agents selected."); return; heuristic_mult_range = parameters.get("heuristic_multiplier_range", (0.5, 0.8)); counter_narrative = parameters.get("counter_narrative", "Ghosts..."); narrative_importance = parameters.get("narrative_importance", 0.9); narrative_theme = parameters.get("narrative_theme", "divergence_seed"); print(f"Applying stress to {len(target_agents)} agents..."); modified_count = 0; for agent in target_agents: try: multiplier = random.uniform(heuristic_mult_range[0], heuristic_mult_range[1]); agent.cognitive.heuristics *= multiplier; agent.cognitive.heuristics = 1 / (1 + np.exp(-agent.cognitive.heuristics)); agent.cognitive.heuristics = np.clip(agent.cognitive.heuristics, 0.1, 0.9); agent.memory.add_memory(content={"type": "divergence_seed", "source": "SYS_PROTO", "text": counter_narrative, "theme": narrative_theme, "timestamp": time.time()}, importance=narrative_importance); modified_count += 1; except Exception as e: warnings.warn(f"Stress failed for {agent.agent_id}: {e}", RuntimeWarning); print(f"Stress applied to {modified_count} agents.");
        elif protocol_name == "satori_probe": threshold = parameters.get('threshold', self.config.get('satori_threshold_analyzer', 0.88)); ratio = self.analyzer.compute_satori_wave_ratio(threshold=threshold) if self.analyzer else 0.0; print(f"Satori Probe (Thresh {threshold}): {ratio:.3f}");
        elif protocol_name == "echo_fusion": print("Echo Fusion TBD."); pass;
        elif protocol_name == "satori_lattice": print("Satori Lattice TBD."); pass;
        else: warnings.warn(f"Unknown protocol: {protocol_name}", RuntimeWarning);
        self._trigger_hooks('post_protocol_injection', protocol_name=protocol_name, parameters=parameters)
    def get_summary_metrics(self, satori_threshold: Optional[float] = None) -> Dict[str, Any]:
        if not self.analyzer: return {"error": "Analyzer N/A."}; threshold = satori_threshold if satori_threshold is not None else self.config.get('satori_threshold_analyzer', 0.88); return self.analyzer.summarize(satori_threshold=threshold)
    def save_state(self, filepath: str, include_memory_embeddings: bool = False, resonance_history_limit: Optional[int] = 100) -> bool:
        # ... (logic from agisa_orchestrator_serialization_v1) ...
        if self.is_running: warnings.warn("Saving state while running.", RuntimeWarning); try: print(f"Saving state to {filepath}..."); state = { "framework_version": FRAMEWORK_VERSION, "config": self.config, "current_epoch": self.current_epoch, "agents_state": {aid: agent.to_dict(include_memory_embeddings=include_memory_embeddings, resonance_history_limit=resonance_history_limit) for aid, agent in self.agents.items()}, "social_graph_state": self.social_graph.to_dict(), "chronicler_state": self.chronicler.to_dict(), "tda_tracker_state": self.tda_tracker.to_dict() if self.tda_tracker else None, "random_state": random.getstate(), "numpy_random_state": np.random.get_state(), "cupy_random_state": cp.random.get_random_state().get_state() if HAS_CUPY and self.config.get('use_gpu', False) else None, }; with open(filepath, 'wb') as f: pickle.dump(state, f); print(f"State saved."); return True; except Exception as e: warnings.warn(f"Save failed: {e}", category=RuntimeWarning); import traceback; traceback.print_exc(); return False
    def load_state(self, filepath: str) -> bool:
        # ... (logic from agisa_orchestrator_serialization_v1) ...
        if self.is_running: warnings.warn("Loading state while running.", RuntimeWarning); try: print(f"Loading state from {filepath}..."); with open(filepath, 'rb') as f: state = pickle.load(f); loaded_framework_version = state.get("framework_version"); if loaded_framework_version != FRAMEWORK_VERSION: warnings.warn(f"Version mismatch: loading '{loaded_framework_version}' into '{FRAMEWORK_VERSION}'.", category=UserWarning); self.config = state.get("config", self.config); self.num_agents = self.config.get('num_agents', 100); self.num_epochs = self.config.get('num_epochs', 50); self.agent_ids = [f"agent_{i}" for i in range(self.num_agents)]; self.current_epoch = state.get("current_epoch", 0); if "random_state" in state: random.setstate(state["random_state"]); if "numpy_random_state" in state: np.random.set_state(state["numpy_random_state"]); if HAS_CUPY and "cupy_random_state" in state and state["cupy_random_state"] is not None: cupy_state = cp.random.get_random_state(); cupy_state.set_state(state["cupy_random_state"]); agents_state_data = state.get("agents_state", {}); self.agents = {}; print(f"Reconstructing {len(agents_state_data)} agents..."); for agent_id, agent_data in agents_state_data.items(): if agent_id not in self.agent_ids: warnings.warn(f"Skipping agent {agent_id} from save (not in current config)."); continue; try: self.agents[agent_id] = EnhancedAgent.from_dict(agent_data, self.message_bus, strict_validation=False); except Exception as e: warnings.warn(f"Failed to load agent {agent_id}: {e}", RuntimeWarning); self.social_graph = DynamicSocialGraph(self.num_agents, self.agent_ids, use_gpu=self.config.get('use_gpu', False), message_bus=self.message_bus); if "social_graph_state" in state: self.social_graph.load_state(state["social_graph_state"]); self.chronicler = ResonanceChronicler(); if "chronicler_state" in state: self.chronicler.load_state(state["chronicler_state"]); self.tda_tracker = PersistentHomologyTracker(max_dimension=self.config.get('tda_max_dimension', 1)); if "tda_tracker_state" in state and self.tda_tracker: self.tda_tracker.load_state(state["tda_tracker_state"]); self.analyzer = AgentStateAnalyzer(self.agents); self.is_running = False; print(f"State loaded. Resuming at epoch {self.current_epoch + 1}."); return True; except FileNotFoundError: warnings.warn(f"Load failed: File not found at {filepath}", category=RuntimeWarning); return False; except Exception as e: warnings.warn(f"Load failed: {e}", category=RuntimeWarning); import traceback; traceback.print_exc(); return False
    def _select_agents_for_protocol(self, parameters: Dict) -> List[EnhancedAgent]:
        # ... (logic from agisa_orchestrator_protocol_v1) ...
        selection_method = parameters.get("selection_method", "percentage"); target_agents = []; agent_list = list(self.agents.values())
        if not agent_list: return []
        if selection_method == "percentage":
            percentage = parameters.get("percentage", 0.1); count = max(1, int(self.num_agents * percentage)); basis = parameters.get("selection_basis", "random")
            if basis == "random": target_agents = random.sample(agent_list, min(count, self.num_agents))
            else: warnings.warn(f"Unsupported basis '{basis}'. Default random.", RuntimeWarning); target_agents = random.sample(agent_list, min(count, self.num_agents))
        else: warnings.warn(f"Unknown selection method '{selection_method}'.", RuntimeWarning)
        print(f"Selected {len(target_agents)} agents for protocol via '{selection_method}'.")
        return target_agents



SyntaxError: invalid syntax (ipython-input-38-405776702.py, line 67)

In [39]:
import sys
import os

# Add the 'src' directory to the Python path
# This assumes the notebook is run from the root of the project structure,
# or that 'src' is a subdirectory of the current working directory.
if 'src' not in sys.path:
    sys.path.insert(0, os.path.abspath('src'))
    print("Added 'src' to sys.path")

Added 'src' to sys.path


In [40]:
# Remove any previous clones to avoid nested directories or old versions
!rm -rf /content/agisa_sac
!rm -rf /content/sample_data # Clean up default colab directory if needed

In [41]:
# Clone the repository
!git clone https://github.com/google-research/agisa_sac.git /content/agisa_sac

Cloning into '/content/agisa_sac'...
fatal: could not read Username for 'https://github.com': No such device or address


In [42]:
# Change directory to the repository root
%cd /content/agisa_sac

[Errno 2] No such file or directory: '/content/agisa_sac'
/content


### 2. Install Dependencies

This cell installs the required Python packages, including the project itself in editable mode.

In [43]:
get_ipython().system('git config --global user.name topstolenname')
get_ipython().system('git config --global user.email jessupjdj@outlook.com')

In [44]:
# Remove any previous clones to avoid nested directories or old versions
!rm -rf /content/agisa_sac
!rm -rf /content/sample_data # Clean up default colab directory if needed

In [45]:
# Clone the repository
!git clone https://github.com/google-research/agisa_sac.git /content/agisa_sac

Cloning into '/content/agisa_sac'...
fatal: could not read Username for 'https://github.com': No such device or address


In [46]:
# Change directory to the repository root
%cd /content/agisa_sac

[Errno 2] No such file or directory: '/content/agisa_sac'
/content


### 2. Install Dependencies

This cell installs the required Python packages, including the project itself in editable mode.

In [47]:
!gh repo clone topstolenname/agisa_sac

[0;1;39mWelcome to GitHub CLI![0m

To authenticate, please run `gh auth login`.


In [48]:
!ssh-keygen -t ed25519 -C "tristan@mindlink.dev" -f /root/.ssh/id_ed25519 -N ""


Generating public/private ed25519 key pair.
/root/.ssh/id_ed25519 already exists.
Overwrite (y/n)? n


In [None]:
!cat /root/.ssh/id_ed25519.pub
