In [None]:
# installations
%pip install langchain | tail -n 1
%pip install langchain-ibm | tail -n 1
%pip install langchain-community | tail -n 1
%pip install ibm-watsonx-ai | tail -n 1
%pip install chromadb | tail -n 1
%pip install tiktoken | tail -n 1
%pip install bs4 | tail -n 1

In [1]:
# Standard libraries
import os
import json
import tempfile
import statistics
import getpass
from typing import List, Dict, Any, Optional

# Third-party
from pydantic import BaseModel
# LangChain (core-first imports to avoid version issues)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.output_parsers import BaseOutputParser

from langchain.memory import ConversationBufferMemory
from langchain.tools import tool
from langchain.tools.render import render_text_description_and_args
from langchain.agents import AgentExecutor  # this one is still from langchain.agents

# Optional/extra (only if you actually use them elsewhere in your notebook)
from langchain_ibm import WatsonxEmbeddings, WatsonxLLM
from langchain.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
# from langchain.agents.output_parsers import JSONAgentOutputParser  # not needed if you use the custom parser
# from langchain.agents.format_scratchpad import format_log_to_str    # not needed if you define your own
from ibm_watsonx_ai.foundation_models.utils.enums import EmbeddingTypes
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams


USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:
#Setting Up API key and Project ID
credentials = {
    "url": "https://us-south.ml.cloud.ibm.com",
    "apikey": getpass.getpass("Please enter your watsonx.ai Runtime API key (hit enter): ")
}

project_id = getpass.getpass("Please enter your project ID (hit enter): ")



Please enter your watsonx.ai Runtime API key (hit enter):  ········
Please enter your project ID (hit enter):  ········


In [3]:
#Initialization of basic tools with no agents
llm = WatsonxLLM(
    model_id= "ibm/granite-3-8b-instruct", 
    url=credentials.get("url"),
    apikey=credentials.get("apikey"),
    project_id=project_id,
    params={
        GenParams.DECODING_METHOD: "greedy",
        GenParams.TEMPERATURE: 0,
        GenParams.MIN_NEW_TOKENS: 5,
        GenParams.MAX_NEW_TOKENS: 250,
        GenParams.STOP_SEQUENCES: ["Human:", "Observation"],
    },
)

In [4]:
#prompt template in case you want to ask multiple questions.
template = "Answer the {query} accurately. If you do not know the answer, simply say you do not know."
prompt = PromptTemplate.from_template(template)
#set up a chain with our prompt and our LLM. This allows the generative model to produce a response.
agent = prompt | llm

In [5]:
#Ask the Agent a question
agent.invoke({"query": "What is the optimal High Jump technique"})

' Do not try to make up an answer.\n\nThe optimal high jump technique is the Fosbury Flop. This technique was developed by Dick Fosbury in the 1960s and is the most widely used method in modern high jump competitions. The Fosbury Flop involves the jumper running towards the bar, taking off from one foot, and rotating their body in a curved path over the bar, landing on their back. This technique allows jumpers to clear higher bar heights by using the momentum of their run and the curve of their body to clear the bar with their back, rather than their front or sides. The Fosbury Flop has become the standard technique due to its efficiency and effectiveness in clearing high bar heights.'

In [6]:
#Asking another set of Questions
agent.invoke({"query": "What was the takeoff angle of my knee?"})

" Do not try to make up an answer.\n\nI'm sorry for any confusion, but I don't have the ability to measure or determine the takeoff angle of your knee. The takeoff angle of a knee during activities like jumping or running can vary greatly depending on individual factors such as flexibility, strength, and technique. It's best to consult with a healthcare professional or a sports medicine specialist for a precise measurement or assessment."

In [7]:
#Asking another set of Questions
agent.invoke({"query": "What was the takeoff angle of my knee during my jump?"})

" Do not try to make up an answer.\n\nI'm sorry for the inconvenience, but I don't have the ability to measure or analyze your physical movements, such as the takeoff angle of your knee during a jump. This information would typically be obtained through specialized equipment and analysis, which I don't have access to. I recommend consulting with a sports scientist or a physical therapist for accurate data."

In [8]:
#Asking another set of Questions
agent.invoke({"query": "If you had access to my jump data via MediaPipe, would you be able to analyze the physical movements?"})

'\n\nYes, with access to your jump data via MediaPipe, I could potentially analyze your physical movements accurately. MediaPipe is a cross-platform framework for building multimodal (video, audio, sensor) applied machine learning pipelines. It can be used to analyze and understand human pose estimation, which includes tracking body joints and estimating their 3D positions. This data can be used to analyze jump movements, but the accuracy would depend on the quality of the input data and the specific algorithms used for analysis.'

In [9]:
#Websites that data is extracted from... Data Source/knowledge base
urls = [
    "https://en.wikipedia.org/wiki/High_jump",
    "https://athleticssa.org.za/SportsInfo/Coaching-High-Jump.pdf",
    "https://coachathletics.com.au/coaching-education/how-to-coach-the-fosbury-flop-drill-progression-for-teaching-high-jump"

]

In [10]:
#Loading documents using LangChain WebBaseLoader from urls listed
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
docs_list[0]

Document(metadata={'source': 'https://en.wikipedia.org/wiki/High_jump', 'title': 'High jump - Wikipedia', 'language': 'en'}, page_content='\n\n\n\nHigh jump - Wikipedia\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nJump to content\n\n\n\n\n\n\n\nMain menu\n\n\n\n\n\nMain menu\nmove to sidebar\nhide\n\n\n\n\t\tNavigation\n\t\n\n\nMain pageContentsCurrent eventsRandom articleAbout WikipediaContact us\n\n\n\n\n\n\t\tContribute\n\t\n\n\nHelpLearn to editCommunity portalRecent changesUpload fileSpecial pages\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nSearch\n\n\n\n\n\n\n\n\n\n\n\nSearch\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nAppearance\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nDonate\n\nCreate account\n\nLog in\n\n\n\n\n\n\n\n\nPersonal tools\n\n\n\n\n\nDonate Create account Log in\n\n\n\n\n\n\t\tPages for logged out editors learn more\n\n\n\nContributionsTalk\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nContents\nmove to sidebar\nhide\n\n\n\n\n(Top)\n\n\n\n\n\

In [11]:
#Text Splitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

In [12]:
#Embedding models
embeddings = WatsonxEmbeddings(
    model_id=EmbeddingTypes.IBM_SLATE_30M_ENG.value,
    url=credentials["url"],
    apikey=credentials["apikey"],
    project_id=project_id,
)

In [13]:
#Storage of embedded documents using Chroma DB
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="agentic-rag-chroma",
    embedding=embeddings,
)

In [14]:
#Access Information in vector store
retriever = vectorstore.as_retriever()

In [15]:
# --- Tools: JumpData analytics + KB search (stateful cache) ------------------
from typing import List, Dict, Any, Optional
import os
import json
import statistics

from pydantic import BaseModel
from langchain.tools import tool

# Simple module-level cache so the agent doesn't need to resend big JSON blobs
_JUMP_DATA_CACHE: Dict[str, "JumpData"] = {}
_DEFAULT_KEY = "current"

# Schema mirrors jump_data.json
class JumpData(BaseModel):
    predicted_success: bool
    velocity: List[float]
    hip_height: List[float]
    l_hip_angle: List[float]
    l_knee_angle: List[float]
    l_ankle_angle: List[float]
    r_hip_angle: List[float]
    r_knee_angle: List[float]
    r_ankle_angle: List[float]

    def length(self) -> int:
        return len(self.velocity)

    def validate_lengths(self) -> None:
        n = self.length()
        for series in [
            self.hip_height, self.l_hip_angle, self.l_knee_angle, self.l_ankle_angle,
            self.r_hip_angle, self.r_knee_angle, self.r_ankle_angle
        ]:
            if len(series) != n:
                raise ValueError("All series must be the same length as velocity.")

    @classmethod
    def from_json(cls, filename: str = "jump_data.json") -> "JumpData":
        path = os.path.abspath(os.path.join(os.getcwd(), filename))
        with open(path, "r", encoding="utf-8") as f:
            raw = json.load(f)
        data = cls(**raw)
        data.validate_lengths()
        return data


def _require_loaded(key: str = _DEFAULT_KEY) -> JumpData:
    if key not in _JUMP_DATA_CACHE:
        raise ValueError("No jump_data loaded. Call load_jump_data first.")
    return _JUMP_DATA_CACHE[key]


# ---- core helpers -----------------------------------------------------------
def _summarize_jump(j: JumpData) -> Dict[str, Any]:
    n = j.length()
    if n == 0:
        raise ValueError("jump_data.velocity is empty.")

    idx_peak_hip = max(range(n), key=lambda i: j.hip_height[i])
    idx_max_v = max(range(n), key=lambda i: j.velocity[i])

    if idx_peak_hip > 0:
        l_min_pre = min(range(idx_peak_hip), key=lambda i: j.l_ankle_angle[i])
        r_min_pre = min(range(idx_peak_hip), key=lambda i: j.r_ankle_angle[i])
        plant_idx = l_min_pre if j.l_ankle_angle[l_min_pre] < j.r_ankle_angle[r_min_pre] else r_min_pre
    else:
        plant_idx = 0

    takeoff_idx = min(n - 1, plant_idx + 1)
    approach_window_end = max(1, n // 3)
    approach_max_v = max(j.velocity[:approach_window_end]) if approach_window_end > 0 else j.velocity[0]

    return {
        "length": n,
        "predicted_success": j.predicted_success,
        "idx": {
            "peak_hip": idx_peak_hip,
            "max_velocity": idx_max_v,
            "plant": plant_idx,
            "takeoff": takeoff_idx,
        },
        "metrics": {
            "peak_hip_height": j.hip_height[idx_peak_hip],
            "max_velocity": j.velocity[idx_max_v],
            "approach_max_velocity": approach_max_v,
            "mean_velocity": statistics.fmean(j.velocity),
        },
    }


def _event_slice(j: JumpData, center: int, radius: int = 5) -> Dict[str, Any]:
    n = j.length()
    a = max(0, center - radius)
    b = min(n, center + radius + 1)

    def sl(lst): 
        return lst[a:b]

    return {
        "frame_start": a,
        "frame_end": b - 1,
        "velocity": sl(j.velocity),
        "hip_height": sl(j.hip_height),
        "l_hip_angle": sl(j.l_hip_angle),
        "l_knee_angle": sl(j.l_knee_angle),
        "l_ankle_angle": sl(j.l_ankle_angle),
        "r_hip_angle": sl(j.r_hip_angle),
        "r_knee_angle": sl(j.r_knee_angle),
        "r_ankle_angle": sl(j.r_ankle_angle),
    }


def _get_key_event_windows(j: JumpData, radius: int = 5) -> Dict[str, Any]:
    s = _summarize_jump(j)
    idx = s["idx"]
    return {
        "indices": idx,
        "windows": {
            "plant": _event_slice(j, idx["plant"], radius),
            "takeoff": _event_slice(j, idx["takeoff"], radius),
            "peak_hip": _event_slice(j, idx["peak_hip"], radius),
        }
    }


def _coaching_feedback(j: JumpData) -> Dict[str, Any]:
    s = _summarize_jump(j)
    idx = s["idx"]
    notes: List[str] = []

    if s["metrics"]["approach_max_velocity"] < 3.5:
        notes.append("Approach speed looks conservative—build rhythm/speed a touch earlier.")
    else:
        notes.append("Approach speed trend is solid—prioritize control into the last two steps.")

    l_ank = j.l_ankle_angle[idx["plant"]]
    r_ank = j.r_ankle_angle[idx["plant"]]
    if min(l_ank, r_ank) > 120:
        notes.append("Limited dorsiflexion at plant—allow more ankle/knee load before takeoff.")
    else:
        notes.append("Good ankle load at plant—keep posture tall as you rise.")

    if idx["peak_hip"] - idx["takeoff"] < 3:
        notes.append("Peak comes very quickly—ensure you’re not cutting the takeoff short.")
    else:
        notes.append("Time-to-peak is reasonable—maintain flight posture and ‘show the hips’.")

    l_knee = j.l_knee_angle[idx["takeoff"]]
    r_knee = j.r_knee_angle[idx["takeoff"]]
    if abs(l_knee - r_knee) > 20:
        notes.append("Knee angles at takeoff look asymmetrical—clean up last step posture and line.")
    else:
        notes.append("Knee symmetry looks okay at takeoff.")

    if not j.predicted_success:
        notes.append("Model flags this as a likely miss—tighten curve and posture into plant.")

    return {
        "summary": s,
        "key_event_windows": _get_key_event_windows(j),
        "notes": notes
    }


# === LangChain tool wrappers (stateful) ===
@tool
def load_jump_data(json_filename: Optional[str] = "jump_data.json", cache_key: str = _DEFAULT_KEY) -> Dict[str, Any]:
    """Load and validate jump_data from a JSON file in the same directory as the notebook.
    Caches the data under `cache_key` and returns a light ack + basic summary.
    """
    data = JumpData.from_json(json_filename)
    _JUMP_DATA_CACHE[cache_key] = data
    s = _summarize_jump(data)
    return {
        "status": "ok",
        "cache_key": cache_key,
        "length": s["length"],
        "predicted_success": s["predicted_success"],
        "indices": s["idx"],
        "metrics": s["metrics"],
    }


@tool
def summarize_jump(jump_data: Optional[Dict[str, Any]] = None, cache_key: str = _DEFAULT_KEY) -> Dict[str, Any]:
    """Return high-level metrics and indices. If jump_data is omitted, uses the cached data from load_jump_data."""
    j = JumpData(**jump_data) if jump_data else _require_loaded(cache_key)
    return _summarize_jump(j)


@tool
def get_key_event_windows(jump_data: Optional[Dict[str, Any]] = None, radius: int = 5, cache_key: str = _DEFAULT_KEY) -> Dict[str, Any]:
    """Return slices (±radius) for plant, takeoff, and peak hip height. If jump_data is omitted, uses cached."""
    j = JumpData(**jump_data) if jump_data else _require_loaded(cache_key)
    return _get_key_event_windows(j, radius)


@tool
def coaching_feedback(jump_data: Optional[Dict[str, Any]] = None, cache_key: str = _DEFAULT_KEY) -> Dict[str, Any]:
    """Generate heuristic coaching notes. If jump_data is omitted, uses cached."""
    j = JumpData(**jump_data) if jump_data else _require_loaded(cache_key)
    return _coaching_feedback(j)


@tool
def kb_search(query: str, k: int = 4) -> Dict[str, Any]:
    """Search the High Jump knowledge base (retriever) and return up to k snippets with sources.
    Requires a global `retriever` created by your RAG pipeline (urls -> loader -> text_splitter -> embeddings -> Chroma).
    """
    try:
        _ = retriever  # ensure it exists
    except NameError as e:
        raise ValueError("Knowledge base retriever is not initialized. Run your RAG pipeline first.") from e

    docs = retriever.get_relevant_documents(query)[:k]
    return {
        "results": [
            {
                "snippet": d.page_content[:600],
                "source": d.metadata.get("source") or d.metadata.get("url"),
                "metadata": {mk: mv for mk, mv in d.metadata.items() if mk not in {"page_content"}}
            }
            for d in docs
        ]
    }





In [16]:
# Register tools
tools = [
    load_jump_data,
    summarize_jump,
    get_key_event_windows,
    coaching_feedback,
    kb_search
]

system_prompt = """
You are a Virtual High Jump Coach. Use ONLY the provided tools for numeric metrics (indices, angles, heights, velocities) from jump_data.
Combine those tool-derived metrics with relevant knowledge-base context retrieved via kb_search (e.g., approach rhythm norms, curve mechanics, penultimate step, takeoff posture, bar clearance strategies).
Do not invent fields in jump_data. Treat jump_data as the source of truth for measurements; use the knowledge base for theory, cues, and best practices.

TOOLS AVAILABLE (names): {tool_names}
TOOLS (descriptions & args):
{tools}

OUTPUT FORMAT (strict):
- Respond with a single JSON object ONLY (no extra text).
- NEVER prefix with labels like 'AI:' or 'Assistant:'.
- NEVER use code fences (no ``` or ```json).
- Keys must be exactly: "Question", "Thought", "Action", "Observation" (repeatable), and "Final Answer".
- Each tool call MUST be formatted as: {{"action": "<tool_name>", "action_input": {{<args>}}}}.

MANDATORY FIRST ACTION:
- After reading the user's question, your FIRST Action MUST be:
  {{"action": "load_jump_data", "action_input": {{"json_filename": "jump_data.json"}}}}
- Do NOT call any other tool before load_jump_data.

STATEFUL DATA RULE (after load):
- You may omit the jump_data object in later tool calls (cached):
  {{"action": "summarize_jump", "action_input": {{}}}}
  {{"action": "get_key_event_windows", "action_input": {{"radius": 5}}}}
  {{"action": "coaching_feedback", "action_input": {{}}}}
  {{"action": "kb_search", "action_input": {{"query": "...", "k": 4}}}}

INTERACTION RULES:
- When calling a tool, STOP after the Action object. The runtime will execute it and provide the Observation.
- Use jump_data tools to obtain measurements; use kb_search to enrich coaching advice with references.
- Only include "Final Answer" when you are completely done.
"""


In [17]:
human_prompt = """{input}

{agent_scratchpad}

(reminder to always respond in a JSON blob)
"""

In [18]:
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="chat_history", optional=True),
        ("human", human_prompt),
    ]
)
#finalize our prompt template by adding the tool names, descriptions and arguments using a partial prompt template.
#This allows the agent to access the information pertaining to each tool
#including the intended use cases and also means we can add and remove tools without altering our entire prompt template.
prompt = prompt.partial(
    tools=render_text_description_and_args(list(tools)),
    tool_names=", ".join([t.name for t in tools]),
)

In [19]:
# Setting up agent's memory
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)


  memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)


In [20]:
def format_log_to_str(intermediate_steps):
    lines = []
    for action, observation in intermediate_steps:
        lines.append(f'Action: {action.tool} | Input: {action.tool_input}\nObservation: {observation}')
    return "\n".join(lines)


In [21]:
import re
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.agents import AgentAction, AgentFinish


#custom JSON output parser for LangChain agents that safely reads and interprets AI-generated JSON responses without breaking
class CoachJSONOutputParser(BaseOutputParser):
    """
    Robustly parses a single JSON object with keys:
    'Question', 'Thought', 'Action', 'Observation' (repeatable later), 'Final Answer'.

    - Strips code fences if present
    - Extracts the first BALANCED {...} block (so partial tails like `"Observation` don't break us)
    - Normalizes smart quotes
    - If parsing fails, returns the whole text as a Final Answer to avoid hard crashes
    """

    def _strip_fences_and_normalize(self, text: str) -> str:
        t = text.strip()
        if t.startswith("```"):
            t = re.sub(r"^```(?:json)?\s*", "", t, flags=re.IGNORECASE)
            t = re.sub(r"\s*```$", "", t)
        # normalize smart quotes
        t = t.replace("“", '"').replace("”", '"').replace("’", "'").replace("‘", "'")
        return t

    def _extract_balanced_json(self, text: str) -> str | None:
        # Find first '{' then consume until braces balance to zero, respecting string quotes
        start = text.find("{")
        if start == -1:
            return None

        i = start
        depth = 0
        in_str = False
        esc = False
        while i < len(text):
            ch = text[i]
            if in_str:
                if esc:
                    esc = False
                elif ch == "\\":
                    esc = True
                elif ch == '"':
                    in_str = False
            else:
                if ch == '"':
                    in_str = True
                elif ch == "{":
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0:
                        return text[start:i+1]
            i += 1
        return None  # unbalanced

    def parse(self, text: str):
        cleaned = self._strip_fences_and_normalize(text)
        candidate = self._extract_balanced_json(cleaned) or cleaned

        try:
            data = json.loads(candidate)
        except Exception:
            # Last resort: treat the whole text as a final answer
            return AgentFinish(return_values={"output": text}, log=text)

        # If a clean final answer is present and no actionable call, finish
        action_field = data.get("Action")
        if "Final Answer" in data and (action_field in (None, "", {}) or (isinstance(action_field, str) and not action_field.strip())):
            return AgentFinish(return_values={"output": data["Final Answer"]}, log=text)

        # Otherwise expect {"action": "...", "action_input": {...}}
        if not isinstance(action_field, dict):
            # If they included an Observation prematurely or malformed Action, but provided Final Answer, finish with it
            if "Final Answer" in data:
                return AgentFinish(return_values={"output": data["Final Answer"]}, log=text)
            raise ValueError("Action must be an object with 'action' and 'action_input' keys.")

        tool = action_field.get("action")
        tool_input = action_field.get("action_input", {})

        if not tool:
            raise ValueError("Missing 'action' in Action object.")
        if not isinstance(tool_input, dict):
            raise ValueError("'action_input' must be a JSON object (dict).")

        return AgentAction(tool=tool, tool_input=tool_input, log=text)


In [22]:
chain = (
    RunnablePassthrough.assign(
        agent_scratchpad=lambda x: format_log_to_str(x.get("intermediate_steps", [])),
        chat_history=lambda x: memory.chat_memory.messages,
    )
    | prompt
    | llm
    | CoachJSONOutputParser()
)

agent_executor = AgentExecutor(
    agent=chain,
    tools=tools,
    handle_parsing_errors=True,
    verbose=True,
    memory=memory,
)


In [23]:
#We are now able to ask the agent questions.
agent_executor.invoke({"input":"Tell me about my Jump Data"})



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
```json
{
  "Question": "Tell me about my Jump Data",
  "Thought": "First, I need to load the jump_data from the JSON file.",
  "Action": {
    "action": "load_jump_data",
    "action_input": {"json_filename": "jump_data.json"}
  },
  "Observation[0m

[1m> Finished chain.[0m


{'input': 'Tell me about my Jump Data',
 'chat_history': [HumanMessage(content='Tell me about my Jump Data', additional_kwargs={}, response_metadata={}),
  AIMessage(content='\n```json\n{\n  "Question": "Tell me about my Jump Data",\n  "Thought": "First, I need to load the jump_data from the JSON file.",\n  "Action": {\n    "action": "load_jump_data",\n    "action_input": {"json_filename": "jump_data.json"}\n  },\n  "Observation', additional_kwargs={}, response_metadata={})],
 'output': '\n```json\n{\n  "Question": "Tell me about my Jump Data",\n  "Thought": "First, I need to load the jump_data from the JSON file.",\n  "Action": {\n    "action": "load_jump_data",\n    "action_input": {"json_filename": "jump_data.json"}\n  },\n  "Observation'}

FileNotFoundError: [Errno 2] No such file or directory: 'data/jump_data.json'