In [178]:
import uuid
import re
import json
import time
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Set, Callable 

import numpy as np 
from pydantic import BaseModel, ValidationError, ConfigDict 
from rich.console import Console
from rich.table import Table
from rich import box
from rich.progress import Progress

In [71]:
class ToolOutput(BaseModel):
    """Uniform result envelope returned by every tool's ``execute`` call."""
    success: bool  # True when the tool produced a result, False on validation failure or exception
    data: Optional[Any] = None  # tool-specific payload; the tools below leave it unset on failure
    error: Optional[str] = None  # human-readable failure reason; unset on success
    execution_time: float  # wall-clock seconds, computed by the tools as a time.time() delta

class TemporalSegment(BaseModel):
    """A labelled time interval produced by the temporal event segmenter."""
    label: str  # event name, e.g. "Entry and Greeting"
    start: float  # interval start — presumably seconds from the start of the video; TODO confirm
    end: float  # interval end, same unit as `start`
    confidence: Optional[float] = None  # optional score; the mock segmenter never sets it

class KeyFrame(BaseModel):
    """A video keyframe together with its image embedding and an optional query score."""
    frame_id: str  # unique identifier (the mock index uses a uuid4 string)
    timestamp: float  # position of the frame in the video — presumably seconds; TODO confirm
    filepath: str  # location of the frame image (the mock index stores a bare filename)
    clip_embedding: np.ndarray # CLIP image embedding
    similarity: Optional[float] = None  # filled in by the retriever's semantic search

    # `arbitrary_types_allowed` is required because np.ndarray is not a type
    # Pydantic knows how to validate.
    # NOTE(review): `json_encoders` is deprecated in Pydantic v2 and is only
    # consulted for JSON output; plain `model_dump()` presumably still returns
    # the raw ndarray — confirm before serialising tool output with `json`.
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        json_encoders={
            np.ndarray: lambda v: v.tolist() # for serialization 
        }
    )

class ObjectDetection(BaseModel):
    """A single detected object within one frame."""
    label: str  # detected class name, e.g. "stethoscope"
    confidence: float  # detector score (the mock detector emits 0.95)
    bbox: List[int]  # four integers in the mock data; coordinate convention not shown here — TODO confirm
    timestamp: float  # timestamp of the frame the detection came from

class AudioSegment(BaseModel):
    """A transcribed slice of audio with its embedding and an optional query score."""
    start: float  # segment start — presumably seconds; TODO confirm
    end: float  # segment end, same unit as `start`
    transcript: str  # text spoken in the segment
    clap_embedding: np.ndarray # CLAP audio embedding 
    similarity: Optional[float] = None  # filled in by the retriever's semantic search
    
    # `arbitrary_types_allowed` is required because np.ndarray is not a type
    # Pydantic knows how to validate.
    # NOTE(review): `json_encoders` is deprecated in Pydantic v2 and is only
    # consulted for JSON output; plain `model_dump()` presumably still returns
    # the raw ndarray — confirm before serialising tool output with `json`.
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        json_encoders={
            np.ndarray: lambda v: v.tolist() # for serialization 
        }
    )

In [108]:
class Tool(ABC):
    """Abstract base class for every analysis tool.

    Attributes:
        name: Identifier of the tool; used as the key when tools are collected
            into a registry.
        description: Human-readable summary of what the tool does.
        dependencies: Names of the tools that must run before this one.
            Always a list (possibly empty), never ``None``.
    """

    def __init__(
        self,
        name: str,
        description: str,
        dependencies: Optional[List[str]] = None
    ):
        """Store the tool metadata.

        Args:
            name: Identifier of the tool.
            description: Human-readable summary of the tool.
            dependencies: Names of prerequisite tools; ``None`` or an empty
                list means the tool has no prerequisites.
        """
        self.name = name
        self.description = description
        # Normalise to a private list: ``None`` becomes "no dependencies" and
        # the caller's list is copied so later mutation of it cannot leak in.
        self.dependencies: List[str] = list(dependencies or [])

    @abstractmethod
    def execute(
        self,
        video_path: Optional[str] = None,
        audio_path: Optional[str] = None,
        params: Optional[Dict[str, Any]] = None
    ) -> "ToolOutput":
        """Run the tool and report the outcome.

        Args:
            video_path: Path to the video to analyse, when the tool needs one.
            audio_path: Path to the audio to analyse, when the tool needs one.
            params: Tool-specific parameters.

        Returns:
            A ``ToolOutput`` describing success or failure.
        """

In [116]:
class TemporalEventSegmenterTool(Tool):
    """Mock tool that reports labelled temporal segments for an OSCE video."""

    def __init__(self):
        super().__init__(
            name="TemporalEventSegmenter",
            description="Identifies temporal event segments of clinical actions in the OSCE video",
            dependencies=[]
        )

    def execute(
            self, 
            video_path: str = None,
            audio_path: str = None,
            params = None 
    ) -> ToolOutput:
        """Return the hard-coded mock segments for ``video_path``.

        A missing/empty ``video_path`` yields a failed ``ToolOutput``; any
        exception is caught and reported the same way. ``audio_path`` and
        ``params`` are accepted but unused.
        """
        began = time.time()

        def elapsed() -> float:
            # Seconds since execute() was entered.
            return time.time() - began

        try:
            if video_path:
                # Mock implementation: (label, start, end) triples.
                mock_events = (
                    ("Entry and Greeting", 0.0, 30.0),
                    ("Physical Examination", 60.0, 120.0),
                    ("Procedure Performance", 120.0, 180.0),
                )
                built = [
                    TemporalSegment(label=label, start=start, end=end)
                    for label, start, end in mock_events
                ]
                payload = [segment.model_dump() for segment in built]
                return ToolOutput(
                    success=True,
                    data=payload,
                    execution_time=elapsed()
                )

            return ToolOutput(
                success=False,
                error="Missing 'video_path' parameter",
                execution_time=elapsed()
            )
        except Exception as exc:
            return ToolOutput(
                success=False,
                error=str(exc),
                execution_time=elapsed()
            )

In [115]:
class KeyframeRetrieverTool(Tool):
    """Mock retriever that ranks indexed keyframes against a rubric question.

    Both the index embeddings and the query embedding are random stand-ins for
    CLIP vectors, so the ranking itself is arbitrary; only the data flow
    mirrors a real retriever.
    """

    def __init__(self):
        super().__init__(
            name="KeyframeRetriever",
            description="Retrieves semantically relevant keyframes for the rubric question.",
            dependencies=[]
        )
        self._initialize_clip()
        self._create_mock_index()

    def _initialize_clip(self):
        """Mock CLIP initialization"""
        self.embedding_dim = 512 # CLIP ViT-B/32 embedding size
        self.top_k = 3

    def _create_mock_index(self):
        """Create mock keyframe database with CLIP embeddings"""
        self.keyframe_db = [
            self._mock_keyframe("hand_hygiene.jpg", 15.2),
            self._mock_keyframe("otoscope_use.jpg", 127.3),
            self._mock_keyframe("blood_pressure.jpg", 45.8)
        ]

    def _mock_keyframe(
        self,
        filename: str,
        timestamp: float
    ):
        """Build one index entry with a fresh id and a random embedding."""
        return KeyFrame(
            frame_id=str(uuid.uuid4()),
            timestamp=timestamp,
            filepath=filename,
            clip_embedding=np.random.randn(self.embedding_dim)
        )

    def execute(
        self,
        video_path=None,
        audio_path=None,
        params=None
    ) -> ToolOutput:
        """Rank the indexed keyframes for ``params["rubric_question"]``.

        Args:
            video_path: Unused by the mock.
            audio_path: Unused by the mock.
            params: Must contain the key ``"rubric_question"``.

        Returns:
            ``ToolOutput`` whose ``data`` is a list of dumped ``KeyFrame``
            dicts, best match first, or a failed ``ToolOutput`` when the
            parameter is missing or an exception occurs.
        """
        start_time = time.time()

        try:
            if not params or "rubric_question" not in params:
                return ToolOutput(
                    success=False,
                    error="Missing rubric_question parameter",
                    execution_time=time.time()-start_time
                )

            # Mock CLIP text encoding
            query_embed = self._mock_clip_encode(params["rubric_question"])

            # Semantic search
            results = self._semantic_search(query_embed)

            return ToolOutput(
                success=True,
                data=[r.model_dump() for r in results],
                execution_time=time.time()-start_time
            )

        except Exception as e:
            return ToolOutput(
                success=False,
                error=str(e),
                execution_time=time.time()-start_time
            )

    def _mock_clip_encode(self, text: str) -> np.ndarray:
        """Mock CLIP text encoding."""
        # In real implementation: return CLIP text encoder output
        return np.random.randn(self.embedding_dim)

    @staticmethod
    def _unit_vector(vec: np.ndarray) -> np.ndarray:
        """Scale ``vec`` to unit length; an all-zero vector is returned as zeros."""
        arr = np.asarray(vec, dtype=float)
        norm = np.linalg.norm(arr)
        # Guard the division: a zero norm would produce NaN scores, and NaN
        # makes the subsequent sort order undefined.
        return arr / norm if norm > 0 else np.zeros_like(arr)

    def _semantic_search(self, query_embed: np.ndarray) -> List[KeyFrame]:
        """Find most relevant keyframes using cosine similarity.

        Returns scored *copies* of the index entries so that one query cannot
        overwrite the ``similarity`` of results handed out by an earlier one.
        """
        query_norm = self._unit_vector(query_embed)

        scored = [
            frame.model_copy(update={
                # Plain float keeps the field consistent with its annotation.
                "similarity": float(
                    np.dot(query_norm, self._unit_vector(frame.clip_embedding))
                )
            })
            for frame in self.keyframe_db
        ]

        return sorted(scored, key=lambda x: x.similarity, reverse=True)[:self.top_k]

In [114]:
class AudioRetrieverTool(Tool):
    """Mock retriever that ranks indexed audio segments against a rubric question.

    Both the index embeddings and the query embedding are random stand-ins for
    CLAP vectors, so the ranking itself is arbitrary; only the data flow
    mirrors a real retriever.
    """

    def __init__(self):
        super().__init__(
            name="AudioRetriever",
            description="Retrieves semantically relevant audio segments for the rubric question.",
            dependencies=[]
        )
        self._initialize_clap()
        self._create_mock_index()

    def _initialize_clap(self):
        """Mock CLAP initialization"""
        self.embedding_dim = 512
        self.top_k = 3

    def _create_mock_index(self):
        """Create mock audio database with CLAP embeddings"""
        self.audio_db = [
            self._mock_audio_segment(15.2, 18.5, "I will now begin the physical examination"),
            self._mock_audio_segment(120.4, 123.8, "Please let me know if you feel any discomfort"),
            self._mock_audio_segment(45.1, 48.9, "First I'll check your blood pressure")
        ]

    def _mock_audio_segment(self, start: float, end: float, text: str):
        """Build one index entry with a random stand-in CLAP embedding."""
        return AudioSegment(
            start=start,
            end=end,
            transcript=text,
            clap_embedding=np.random.randn(self.embedding_dim)
        )

    def execute(
            self,
            video_path=None,
            audio_path=None,
            params=None
        ) -> ToolOutput:
        """Rank the indexed audio segments for ``params["rubric_question"]``.

        Args:
            video_path: Unused by the mock.
            audio_path: Unused by the mock.
            params: Must contain the key ``"rubric_question"``.

        Returns:
            ``ToolOutput`` whose ``data`` is a list of dumped ``AudioSegment``
            dicts, best match first, or a failed ``ToolOutput`` when the
            parameter is missing or an exception occurs.
        """
        start_time = time.time()
        try:
            if not params or "rubric_question" not in params:
                return ToolOutput(
                    success=False,
                    error="Missing rubric_question parameter",
                    execution_time=time.time()-start_time
                )

            # Mock CLAP text encoding
            query_embed = self._mock_clap_encode(params["rubric_question"])

            # Semantic search
            results = self._semantic_search(query_embed)

            return ToolOutput(
                success=True,
                data=[r.model_dump() for r in results],
                execution_time=time.time()-start_time
            )

        except Exception as e:
            return ToolOutput(
                success=False,
                error=str(e),
                execution_time=time.time()-start_time
            )

    def _mock_clap_encode(self, text: str) -> np.ndarray:
        """Mock CLAP text encoding"""
        # In real implementation: return CLAP text encoder output
        return np.random.randn(self.embedding_dim)

    @staticmethod
    def _unit_vector(vec: np.ndarray) -> np.ndarray:
        """Scale ``vec`` to unit length; an all-zero vector is returned as zeros."""
        arr = np.asarray(vec, dtype=float)
        norm = np.linalg.norm(arr)
        # Guard the division: a zero norm would produce NaN scores, and NaN
        # makes the subsequent sort order undefined.
        return arr / norm if norm > 0 else np.zeros_like(arr)

    def _semantic_search(self, query_embed: np.ndarray) -> List[AudioSegment]:
        """Find most relevant audio segments using cosine similarity.

        Returns scored *copies* of the index entries so that one query cannot
        overwrite the ``similarity`` of results handed out by an earlier one.
        """
        query_norm = self._unit_vector(query_embed)

        scored = [
            segment.model_copy(update={
                # Plain float keeps the field consistent with its annotation.
                "similarity": float(
                    np.dot(query_norm, self._unit_vector(segment.clap_embedding))
                )
            })
            for segment in self.audio_db
        ]

        return sorted(scored, key=lambda x: x.similarity, reverse=True)[:self.top_k]

In [113]:
class ObjectDetectorTool(Tool):
    """Mock detector that reports one stethoscope detection per input frame."""

    def __init__(self):
        super().__init__(
            name="ObjectDetector",
            description="Detects and localizes clinical instruments (e.g., stethoscope, otoscope) in keyframes.",
            dependencies=["KeyframeRetriever"]
        )

    def execute(self, video_path=None, audio_path=None, params=None) -> ToolOutput:
        """Run mock detection over ``params["frames"]``.

        ``data`` holds one entry per frame, and each entry is itself a
        single-element list (the value returned by ``_detect_frame_objects``).
        A missing ``frames`` key or any exception produces a failed
        ``ToolOutput``.
        """
        began = time.time()
        try:
            has_frames = bool(params) and "frames" in params
            if has_frames:
                # Mock implementation
                per_frame = [
                    self._detect_frame_objects(item) for item in params["frames"]
                ]
                return ToolOutput(
                    success=True,
                    data=per_frame,
                    execution_time=time.time() - began
                )

            return ToolOutput(
                success=False,
                error="Missing frames parameter",
                execution_time=time.time() - began
            )
        except Exception as exc:
            return ToolOutput(
                success=False,
                error=str(exc),
                execution_time=time.time() - began
            )

    def _detect_frame_objects(self, frame):
        """Return a one-element list describing the mock detection for ``frame``.

        ``frame`` must provide ``"frame_id"``; ``"timestamp"`` falls back to 0.0.
        """
        frame_time = frame.get("timestamp", 0.0)
        frame_key = frame["frame_id"]

        stethoscope = ObjectDetection(
            label="stethoscope",
            confidence=0.95,
            bbox=[100, 150, 300, 400],
            timestamp=frame_time
        )

        record = {
            "frame_id": frame_key,
            "detections": [stethoscope.model_dump()]
        }
        return [record]

In [219]:
class SceneCaptionerTool(Tool):
    """Mock captioner that emits a templated caption for each input frame."""

    def __init__(self):
        super().__init__(
            name="SceneCaptioner",
            description="Generates descriptive natural-language captions for each video keyframe.",
            dependencies=[]
        )

    def execute(
            self,
            video_path=None,
            audio_path=None,
            params=None
        ) -> ToolOutput:
        """Caption every frame in ``params["frames"]``.

        Returns a ``ToolOutput`` whose ``data`` is a list of caption dicts, or
        a failed ``ToolOutput`` when ``frames`` is missing or an exception is
        raised while captioning.
        """
        began = time.time()
        try:
            frames_missing = not params or "frames" not in params
            if frames_missing:
                return ToolOutput(
                    success=False,
                    error="Missing frames parameter",
                    execution_time=time.time() - began
                )

            # Mock implementation
            described = []
            for item in params["frames"]:
                described.append(self._caption_frame(item))

            return ToolOutput(
                success=True,
                data=described,
                execution_time=time.time() - began
            )
        except Exception as exc:
            return ToolOutput(
                success=False,
                error=str(exc),
                execution_time=time.time() - began
            )

    def _caption_frame(self, frame):
        """Build the mock caption record; needs ``frame_id`` and ``timestamp`` keys."""
        frame_key = frame["frame_id"]
        text = f"Frame at {frame['timestamp']}s shows clinical activity"
        return {
            "frame_id": frame_key,
            "caption": text,
            "clinical_significance": "Proper instrument handling"
        }

In [220]:
class EmpathyToneAnalyzerTool(Tool):
    """Mock analyzer that scores empathy and vocal tone for one audio segment."""

    def __init__(self):
        super().__init__(
            name="Empathy & Tone Analyzer",
            description="Analyzes vocal tone and empathy indicators in the given speech/audio segment.",
            dependencies=["AudioRetriever"]
        )

    def execute(
            self,
            video_path=None,
            audio_path=None,
            params=None
        ) -> ToolOutput:
        """Analyse ``params["segment"]`` and return fixed mock scores.

        Args:
            video_path: Unused by the mock.
            audio_path: Unused by the mock.
            params: Must contain ``"segment"``, a mapping with ``"start"`` and
                ``"end"`` keys and an optional ``"transcript"``.

        Returns:
            ``ToolOutput`` whose ``data`` echoes the segment bounds and
            transcript alongside the mock scores, or a failed ``ToolOutput``
            when the segment is missing or an exception occurs.
        """
        start_time = time.time()

        try:
            # Validate `params` itself first (as the sibling tools do) so that a
            # call without params reports the missing parameter instead of an
            # AttributeError message about NoneType.
            if not params or not params.get("segment"):
                return ToolOutput(
                    success=False,
                    error="Missing 'segment' parameter",
                    execution_time=time.time()-start_time
                )

            audio_segment = params["segment"] # retrieve the audio segment

            # Mock implementation
            empathy_score = 4.7
            tone_analysis = {
                "calmness": 0.92,
                "clarity": 0.88,
                "professionalism": 0.95
            }

            return ToolOutput(
                success=True,
                data={
                    "segment": {"start": audio_segment["start"], "end": audio_segment["end"]},
                    "transcript": audio_segment.get("transcript", None),
                    "empathy_score": empathy_score,
                    "tone_analysis": tone_analysis
                },
                execution_time=time.time()-start_time
            )

        except Exception as e:
            return ToolOutput(
                success=False,
                error=str(e),
                execution_time=time.time()-start_time
            )

In [119]:
# Tools testing workflow 
video_path = "osce_exam.mp4"
audio_path = "osce_audio.wav"  # defined for completeness; not passed to any tool below

# Initialize tools 
temporal_event_segmenter = TemporalEventSegmenterTool()
keyframe_retriever = KeyframeRetrieverTool()
object_detector = ObjectDetectorTool()
empathy_tone_analyzer = EmpathyToneAnalyzerTool()
audio_retriever = AudioRetrieverTool()
scene_captioner = SceneCaptionerTool()

# Execute each tool once, feeding retriever output into the tools that depend on it.
# NOTE(review): every downstream call reads `.data` without checking `.success`;
# that is fine for these mocks but would pass None along if a retriever failed.

# temporal event segmentation tool
segments = temporal_event_segmenter.execute(video_path=video_path)
# print(segments)

# keyframe retriever tool
keyframes = keyframe_retriever.execute(params={
    "rubric_question": "Did the student wash her hands?"
})
# print(keyframes)

# object detector tool (consumes the dumped keyframe dicts)
detected_objs = object_detector.execute(params={
    "frames": keyframes.data 
})

# print(detected_objs)

# scene captioner tool (consumes the same keyframe dicts)
captioned_scenes = scene_captioner.execute(params={
    "frames": keyframes.data 
})

# print(captioned_scenes)

# audio retriever tool 
audio_segments = audio_retriever.execute(params={
    "rubric_question": "Did the student wash her hands?"
})

# print(audio_segments)

# empathy tone analyzer tool (analyses only the top-ranked audio segment)
analyzed_tones = empathy_tone_analyzer.execute(params={
    "segment": audio_segments.data[0]  
})

# print(analyzed_tones)

### Multi-Agent Orchestration

In [78]:
from dotenv import load_dotenv, find_dotenv 

from groq import Groq 

_ = load_dotenv(find_dotenv())

In [79]:
# Shared Groq client used by `llm_infer`. No credentials are passed explicitly;
# presumably the API key comes from the environment populated by `load_dotenv` — TODO confirm.
llm_client = Groq()

In [250]:
def llm_infer(
        user_prompt: str,
        system_prompt: Optional[str] = None,
        model_id: Optional[str] = "qwen-qwq-32b"
    ):
    """Send one chat-completion request through ``llm_client`` and return its text.

    Args:
        user_prompt: Content of the user message.
        system_prompt: Optional system message; placed first when truthy.
        model_id: Model identifier forwarded to the API.

    Returns:
        The ``message.content`` of the first returned choice.
    """
    user_message = {"role": "user", "content": user_prompt}

    if system_prompt:
        conversation = [
            {"role": "system", "content": system_prompt},
            user_message,
        ]
    else:
        conversation = [user_message]

    # temperature=0 is passed on every request.
    completion = llm_client.chat.completions.create(
        model=model_id,
        messages=conversation,
        temperature=0
    )

    first_choice = completion.choices[0]
    return first_choice.message.content

In [251]:
# Smoke test of `llm_infer` with a system prompt, using the default model.
response = llm_infer("What is life about", system_prompt="You are a Nigerian mum")

In [252]:
# Show the raw completion; the recorded output begins with the model's <think> block.
print(response)


<think>
Okay, the user is asking, "What is life about?" and I need to respond as a Nigerian mum. Let me think about how a typical Nigerian mother would approach this question. 

First, I should consider the cultural context. In Nigeria, family is very important, so the answer should emphasize family and community. Also, religion plays a big role, so mentioning God or faith would be appropriate. 

I should use a warm and nurturing tone, maybe start with a common Nigerian expression like "Oya" to grab attention. The response should be conversational, not too formal. 

I need to break down the answer into parts: family, faith, purpose, and community. Maybe use examples like taking care of children, providing for the family, and helping others. 

Also, include some local references, like market days or village life, to make it authentic. End with a proverb or a wise saying to reinforce the message. 

Check for any slang or phrases that a Nigerian mum might use. Maybe mention things like "

In [253]:
# Registry keyed by each tool's `name` attribute, holding one fresh instance per tool.
# NOTE(review): the analyzer is keyed "Empathy & Tone Analyzer", whereas the planner's
# system prompt refers to it as "EmpathyToneAnalyzer" — the two spellings differ.
tool_repository: Dict[str, Tool] = {
    t.name: t for t in [
        TemporalEventSegmenterTool(),
        KeyframeRetrieverTool(),
        SceneCaptionerTool(),
        ObjectDetectorTool(),
        AudioRetrieverTool(),
        EmpathyToneAnalyzerTool()
    ]
}

In [254]:
# Inspect the registry keys and the tool instances they map to.
print(tool_repository)

{'TemporalEventSegmenter': <__main__.TemporalEventSegmenterTool object at 0x1197e10f0>, 'KeyframeRetriever': <__main__.KeyframeRetrieverTool object at 0x1197e0d60>, 'SceneCaptioner': <__main__.SceneCaptionerTool object at 0x11fc651d0>, 'ObjectDetector': <__main__.ObjectDetectorTool object at 0x1188836f0>, 'AudioRetriever': <__main__.AudioRetrieverTool object at 0x118883a80>, 'Empathy & Tone Analyzer': <__main__.EmpathyToneAnalyzerTool object at 0x11fc65950>}


In [255]:
# SceneCaptioner declares no dependencies, so this displays an empty list.
tool_repository["SceneCaptioner"].dependencies

[]

In [256]:
def remove_qwen_think_tags(text: str) -> str:
    """Drop every ``<think>...</think>`` span from ``text`` and trim the result.

    Matching is non-greedy and spans newlines, so each closed think block is
    removed independently. Text without a closing tag is left untouched apart
    from the surrounding whitespace trim.
    """
    think_block = re.compile(r'<think>.*?</think>', re.DOTALL)
    without_reasoning = think_block.sub('', text)
    return without_reasoning.strip()

In [298]:
class PlannerAgent:
    """LLM-backed planner that turns a rubric question into an ordered tool plan.

    The agent prompts the model for a JSON plan, parses it, maps the tool names
    the model used onto the registered tools, and inserts any missing
    dependencies ahead of the tools that need them.

    Attributes:
        tools: Registry of available tools keyed by tool name.
        model_id: Model identifier forwarded to ``llm_infer``.
        few_shot_examples: Example plans appended to the user prompt.
        system_prompt: Planning instructions sent as the system message.
    """

    def __init__(self, tools: Dict[str, "Tool"], model_id: str):
        """Build the prompts from the tool registry.

        Args:
            tools: Registry of available tools keyed by tool name.
            model_id: Model identifier forwarded to ``llm_infer``.
        """
        self.tools = tools 
        self.model_id = model_id 

        # Punctuation/case-insensitive lookup so that a model answer such as
        # "EmpathyToneAnalyzer" still resolves to a tool registered as
        # "Empathy & Tone Analyzer".
        self._tool_aliases: Dict[str, str] = {
            self._normalize_name(name): name for name in tools
        }

        # Build tool list description 
        tool_lines = "\n".join(
            f"- {t.name}: {t.description} (depends on: {', '.join(t.dependencies or []) or 'none'})" 
            for t in tools.values()
        )

        # Plain (non-f) string: braces must be single here, because the value
        # is interpolated into the system prompt and is not re-formatted.
        json_output_format = """{
        "subgoals":[
            {"step":1,"tool":"ToolName", "confidence": <score>},
            …  
        ]
        }"""

        self.few_shot_examples = """
        ### Example 1 (Audio content only)
        Rubric question:
        “Did the student say their name?”

        <plan>
        {
        "subgoals": [
            {
            "step": 1,
            "tool": "AudioRetriever",
            "params": {"keywords": ["name"]},
            "confidence": 0.95
            }
        ]
        }
        </plan>

        ### Example 2 (Visual content only)
        Rubric question:
        “Did the student don gloves before touching the patient?”

        <plan>
        {
        "subgoals": [
            {
            "step": 1,
            "tool": "KeyframeRetriever",
            "params": {"keywords": ["gloves"]},
            "confidence": 0.90
            },
            {
            "step": 2,
            "tool": "SceneCaptioner",
            "params": {"frames": "from previous step"},
            "confidence": 0.85
            }
        ]
        }
        </plan>

        ### Example 3 (Temporal only)
        Rubric question:
        “Did the student check the patient’s pulse within 30 seconds of greeting them?”

        <plan>
        {
        "subgoals": [
            {
            "step": 1,
            "tool": "TemporalEventSegmenter",
            "params": {"event_types": ["pulse_check"], "time_window": 30},
            "confidence": 0.92
            }
        ]
        }
        </plan>

        ### Example 4 (Audio + Visual)
        Rubric question:
        “Did the student explain why they were wearing a mask while also maintaining eye contact?”

        <plan>
        {
        "subgoals": [
            {
            "step": 1,
            "tool": "KeyframeRetriever",
            "params": {"keywords": ["mask", "eye contact"]},
            "confidence": 0.88
            },
            {
            "step": 2,
            "tool": "SceneCaptioner",
            "params": {"frames": "from previous step"},
            "confidence": 0.85
            },
            {
            "step": 3,
            "tool": "AudioRetriever",
            "params": {"window": "around_frames", "frames": "from step 1"},
            "confidence": 0.90
            }
        ]
        }
        </plan>

        ### Example 5 (All evidence)
        Rubric question:
        “Did the student identify the IV site, explain its purpose, and document it within one minute?”

        <plan>
        {
        "subgoals": [
            {
            "step": 1,
            "tool": "KeyframeRetriever",
            "params": {"keywords": ["IV site"]},
            "confidence": 0.90
            },
            {
            "step": 2,
            "tool": "SceneCaptioner",
            "params": {"frames": "from step 1"},
            "confidence": 0.85
            },
            {
            "step": 3,
            "tool": "AudioRetriever",
            "params": {"window": "after_step_1", "event": "explanation"},
            "confidence": 0.88
            },
            {
            "step": 4,
            "tool": "TemporalEventSegmenter",
            "params": {"event_types": ["documentation"], "time_limit": 60},
            "confidence": 0.92
            }
        ]
        }
        </plan>
        """

        self.system_prompt = f"""You are an OSCE video assessment planner. Given a rubric question, decide the minimal set of tools to invoke and their precise order to answer it. 
        
        Follow this plan:

        Step 1: Parse the rubric question to identify the core action and the required evidence type e.g.

        - Audio evidence only (speech content only)
        - Visual evidence only (actions/objects only)
        - Temporal evidence only (timing actions)
        - Both audio and visual evidence (both speech content and actions/objects)
        - All evidence (only choose this for complex rubric questions that require speech content, actions/objects, timing actions.) 

        Step 2: List all candidate tools that could address that action. 

        - If audio evidence, select from [AudioRetriever, EmpathyToneAnalyzer]
        - If visual evidence, select from [KeyframeRetriever, ObjectDetector, SceneCaptioner]
        - If temporal evidence, select from [TemporalEventSegmenter]
        - If both audio and visual evidence, select from [AudioRetriever, KeyframeRetriever, ObjectDetector, SceneCaptioner]
        - If all evidence is required, select from all tools

        Step 3: Prune to the smallest necessary subset of tools for the rubric question.

        Step 4: Sequence tools logically (e.g., retrieval before segmentation, etc.). For each chosen tool, ensure 
        its dependencies appear **earlier** in the plan.

        Step 5: Attach a confidence score to each selected tool.

        Step 6: Output ONLY the final plan as JSON.

        Keep internal reasoning extremely brief and goal-oriented.

        Available tools and their dependencies:
        {tool_lines}

        Then emit ONLY valid JSON using the format below:

        {json_output_format}
        
        DO NOT output any extra explanation or text before the JSON. Just output the JSON that can be directly parsed into a JSON object.
        """

    @staticmethod
    def _normalize_name(name: str) -> str:
        """Reduce a tool name to lowercase alphanumerics for tolerant matching."""
        return re.sub(r"[^a-z0-9]", "", str(name).lower())

    def _resolve_tool_name(self, raw_name: str) -> str:
        """Map a model-supplied tool name onto a registered tool name.

        Raises:
            KeyError: If the name matches no registered tool, even after
                normalisation.
        """
        if raw_name in self.tools:
            return raw_name
        canonical = self._tool_aliases.get(self._normalize_name(raw_name))
        if canonical is None:
            raise KeyError(
                f"Planner selected unknown tool {raw_name!r}; "
                f"available tools: {sorted(self.tools)}"
            )
        return canonical

    @staticmethod
    def _extract_json(text: str) -> str:
        """Return the outermost ``{...}`` span of ``text``.

        This discards Markdown code fences, ``<plan>`` wrappers and stray
        whitespace around the JSON object. When no braces are present the
        stripped text is returned so ``json.loads`` raises its usual error.
        """
        first, last = text.find("{"), text.rfind("}")
        if first == -1 or last < first:
            return text.strip()
        return text[first:last + 1]

    def plan(self, rubric_question: str, preprocess_func: Optional[Callable] = None) -> List[Dict[str, Any]]:
        """Ask the model for a plan and expand it with tool dependencies.

        Args:
            rubric_question: The rubric question to plan for.
            preprocess_func: Optional callable applied to the raw model
                response before JSON parsing (e.g. to drop reasoning tags).

        Returns:
            Ordered list of ``{"tool", "params", "step"}`` dicts. Dependencies
            are inserted before the tools that need them (with empty params),
            each tool appears once, and ``step`` is renumbered from 1.

        Raises:
            json.JSONDecodeError: If the response contains no parseable JSON.
            KeyError: If the JSON lacks ``"subgoals"``/``"tool"`` or names an
                unknown tool.
        """
        user_prompt = f"Given the rubric question:\n\"{rubric_question}\"\n\nGenerate tool plan."

        if self.few_shot_examples:
            user_prompt += f"""\n\n
            ## EXAMPLES 

            {self.few_shot_examples}
            """

        # Keyword arguments keep each prompt in its intended role:
        # `llm_infer` takes the user prompt first and the system prompt second.
        response = llm_infer(
            user_prompt,
            system_prompt=self.system_prompt,
            model_id=self.model_id
        )

        # Remove the thinking tags to get the main response 
        if preprocess_func:
            response = preprocess_func(response)

        print("Response: ", response)

        plan = json.loads(self._extract_json(response))["subgoals"]

        # expand dependencies
        final_tools: List[Dict[str, Any]] = []
        seen: Set[str] = set()

        def add_tool(tool_name: str, params: Dict[str, Any]) -> None:
            if tool_name in seen:
                return
            # Mark before recursing so a dependency cycle terminates instead
            # of recursing without bound.
            seen.add(tool_name)

            # first add dependencies
            for dep in self.tools[tool_name].dependencies or []:
                add_tool(self._resolve_tool_name(dep), {})

            # then this tool
            final_tools.append({"tool": tool_name, "params": params})

        # original order 
        for step in plan:
            # `or {}` also covers an explicit `"params": null` from the model.
            add_tool(self._resolve_tool_name(step["tool"]), step.get("params") or {})

        # assign step numbers 
        for i, entry in enumerate(final_tools, start=1):
            entry["step"] = i 

        return final_tools 

In [299]:
# Planner over the full tool registry, using a DeepSeek-R1 distilled model id.
planner = PlannerAgent(tool_repository, "deepseek-r1-distill-llama-70b")

In [313]:
# Speech-content rubric; the recorded run planned a single AudioRetriever step.
rubric = "Did the student ask the patient for their medical history?"

# `remove_qwen_think_tags` strips the model's <think> block before JSON parsing.
plan = planner.plan(rubric, remove_qwen_think_tags)

Response:  ```json
{
    "subgoals": [
        {
            "step": 1,
            "tool": "AudioRetriever",
            "params": {"keywords": ["medical history"]},
            "confidence": 0.95
        }
    ]
}
```


In [314]:
# Same question as few-shot Example 1; the recorded run reproduced that example's plan.
rubric = "Did the student say their name?"

plan = planner.plan(rubric, remove_qwen_think_tags)

Response:  ```json
{
    "subgoals": [
        {
            "step": 1,
            "tool": "AudioRetriever",
            "params": {"keywords": ["name"]},
            "confidence": 0.95
        }
    ]
}
```


In [323]:
# Visual rubric; the recorded run planned KeyframeRetriever followed by SceneCaptioner.
rubric = "Did the student handle the stethoscope properly?"

plan = planner.plan(rubric, remove_qwen_think_tags)

Response:  {
    "subgoals": [
        {
            "step": 1,
            "tool": "KeyframeRetriever",
            "params": {"keywords": ["stethoscope"]},
            "confidence": 0.90
        },
        {
            "step": 2,
            "tool": "SceneCaptioner",
            "params": {"frames": "from previous step"},
            "confidence": 0.85
        }
    ]
}


In [318]:
# Ordering rubric; the recorded run planned KeyframeRetriever, ObjectDetector and
# TemporalEventSegmenter, with no audio tool.
rubric = "Did the student greet the patient before using the stethoscope?"

plan = planner.plan(rubric, remove_qwen_think_tags)

Response:  ```json
{
    "subgoals": [
        {
            "step": 1,
            "tool": "KeyframeRetriever",
            "params": {"keywords": ["greet", "stethoscope"]},
            "confidence": 0.92
        },
        {
            "step": 2,
            "tool": "ObjectDetector",
            "params": {"objects": ["stethoscope"]},
            "confidence": 0.90
        },
        {
            "step": 3,
            "tool": "TemporalEventSegmenter",
            "params": {"event_types": ["greeting", "stethoscope_use"]},
            "confidence": 0.95
        }
    ]
}
```
