In [2]:
# memory_mvp_pipeline_colab.py
# A minimal, local prototype for Nightingale's Memory MVP (T2, T3, T5).
# This script is validated to run in Google Colab, uses only the Python
# standard library, reads test data from a specified JSON file, and
# runs a simple pipeline with performance tests.

import time
import math
import json
import os

# --- Mock Memory Store for Task 3 ---
# A simple dictionary to simulate a memory retrieval system.
# Keys are intent labels ("symptom", "appointment", "billing"); each value is
# the list of canned memory strings returned for that intent. Any label that
# is not a key here (for example the classifier's default "other") has no
# entry, so a lookup for it is a miss.
MEMORY_STORE = {
    "symptom": [
        "Memory: Ask about onset, duration, and severity.",
        "Memory: Check for related fever or shortness of breath."
    ],
    "appointment": [
        "Memory: Check patient's availability for next week.",
        "Memory: Confirm preferred location."
    ],
    "billing": [
        "Memory: Pull up the latest statement for the patient.",
        "Memory: Check insurance co-pay details."
    ]
}

# --- Task 2: Intent Gate (Classifier) ---
def classify_intent(sentence_id: str, sentence: str) -> dict:
    """
    Label a sentence with an intent via case-insensitive keyword lookup.

    The sentence is lower-cased and tested against an ordered rule table; the
    first rule with any keyword occurring as a substring wins. Because the
    test is a substring test, "bill" also fires on "billing". When no rule
    fires the label is "other" and no response is requested.

    Args:
        sentence_id: Identifier echoed back unchanged in the result.
        sentence: Raw sentence text to classify.

    Returns:
        A dictionary with the keys "sentence_id", "intent_label",
        "needs_response" and "latency_ms" (wall-clock time spent classifying,
        in milliseconds).
    """
    began = time.perf_counter()
    lowered = sentence.lower()

    # (label, trigger keywords, needs_response) -- order encodes priority.
    rules = (
        ("symptom", ("pain", "headache", "sick", "nausea"), True),
        ("appointment", ("appointment", "book", "schedule", "visit"), True),
        ("billing", ("bill", "incorrect", "charge", "payment"), False),
    )

    # First matching rule, or the ("other", False) fallback.
    intent_label, needs_response = next(
        (
            (label, wants_reply)
            for label, triggers, wants_reply in rules
            if any(trigger in lowered for trigger in triggers)
        ),
        ("other", False),
    )

    elapsed_ms = (time.perf_counter() - began) * 1000

    result = {}
    result["sentence_id"] = sentence_id
    result["intent_label"] = intent_label
    result["needs_response"] = needs_response
    result["latency_ms"] = elapsed_ms
    return result

# --- Task 3: Retrieval / Storage (Mock) ---
def retrieve_memory(sentence_id: str, intent_label: str) -> dict:
    """
    Mocks a retrieval step by looking up an intent in a hardcoded dictionary.

    Comment: This is a placeholder for a more advanced retrieval system.
    Future paths to investigate:
    - Path-A: Use text embeddings (e.g., from Sentence-BERT) for semantic search
      in a vector database. This provides flexibility but adds a dependency.
    - Path-B: Reuse embeddings from an ASR encoder. This could be more efficient
      by avoiding a separate embedding step but might be less semantically rich.

    Args:
        sentence_id: Identifier echoed back unchanged in the result.
        intent_label: Intent used as the lookup key into MEMORY_STORE.

    Returns:
        A dictionary containing the sentence ID, a list of retrieved items,
        a hit flag (True when the intent is a key of MEMORY_STORE), and the
        execution latency in milliseconds. The list is a fresh copy, so the
        caller may modify it without affecting MEMORY_STORE.
    """
    start_time = time.perf_counter()

    hit = intent_label in MEMORY_STORE
    # Hand back a shallow copy rather than the stored list itself: returning
    # the stored object would let any caller that appends to or clears the
    # result silently change what every later lookup returns.
    retrieved_items = list(MEMORY_STORE[intent_label]) if hit else []

    end_time = time.perf_counter()
    latency_ms = (end_time - start_time) * 1000

    return {
        "sentence_id": sentence_id,
        "retrieved_items": retrieved_items,
        "hit": hit,
        "latency_ms": latency_ms,
    }

# --- Task 5: Provenance / Topic (Dictionary) ---
# Topic label -> keywords that select it. Insertion order matters: when a
# sentence contains keywords from several topics, the earliest topic wins.
TOPIC_DICTIONARY = {
    "symptom": [
        "pain",
        "headache",
        "fever",
        "nausea",
        "medication",
    ],
    "scheduling": [
        "schedule",
        "book",
        "visit",
        "hours",
    ],
    "payment": [
        "bill",
        "payment",
        "invoice",
        "charge",
        "incorrect",
    ],
    "followup": [
        "follow-up",
    ],
}


def assign_topic(sentence_id: str, sentence: str) -> dict:
    """
    Pick a topic for a sentence from TOPIC_DICTIONARY and record provenance.

    Matching is a case-insensitive substring test. A sentence containing
    "follow-up" is always labelled "followup", ahead of every other topic.
    Otherwise the first topic (in dictionary order) with a matching keyword
    is used, and "general" is the fallback when nothing matches.

    Args:
        sentence_id: Identifier of the sentence; reused as the chunk ID.
        sentence: Raw sentence text.

    Returns:
        A dictionary with "chunk_id" (the sentence ID), "topic_label",
        "provenance" (a one-element list holding the sentence ID) and
        "latency_ms" (time spent assigning the topic, in milliseconds).
    """
    began = time.perf_counter()
    lowered = sentence.lower()

    if "follow-up" in lowered:
        # Takes precedence so scheduling keywords in the same sentence
        # cannot claim a follow-up request.
        topic_label = "followup"
    else:
        matching_topics = (
            name
            for name, terms in TOPIC_DICTIONARY.items()
            if any(term in lowered for term in terms)
        )
        topic_label = next(matching_topics, "general")

    elapsed_ms = (time.perf_counter() - began) * 1000

    return dict(
        chunk_id=sentence_id,  # For this MVP, a chunk is a single sentence
        topic_label=topic_label,
        provenance=[sentence_id],
        latency_ms=elapsed_ms,
    )

# --- Utility Functions ---
def calculate_p95(latencies: list[float]) -> float:
    """
    Return the 95th-percentile value of ``latencies`` (nearest-rank method).

    The values are sorted ascending and the element at 1-based rank
    ceil(0.95 * n) is returned. An empty input yields 0.0. The input is not
    modified.
    """
    if latencies:
        ordered = list(latencies)
        ordered.sort()
        rank = math.ceil(0.95 * len(ordered))  # 1-based nearest rank
        return ordered[rank - 1]
    return 0.0

def load_data(filepath: str) -> list[dict]:
    """
    Loads and validates the test data from a JSON file.

    The file must contain a JSON array of objects, each carrying the keys
    "sentence_id", "text", "expected_intent" and "expected_topic".

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The parsed list of records, or an empty list when the file is
        missing, unreadable, not valid JSON, or fails validation. Every
        failure is reported on stdout rather than raised, so the caller can
        treat an empty result as "nothing to run".
    """
    required_keys = ("sentence_id", "text", "expected_intent", "expected_topic")

    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default encoding, which varies between systems.
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: Data file not found at '{filepath}'")
        return []
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"Error: Could not decode JSON from file '{filepath}'")
        return []
    except OSError as exc:
        # e.g. the path is a directory or the file is not readable.
        print(f"Error: Could not read data file '{filepath}': {exc}")
        return []

    # Basic validation
    if not isinstance(data, list):
        # Without this check a top-level object would be iterated key by key
        # and could be returned even though callers expect a list.
        print(f"Error: Top-level JSON value in {filepath} must be a list of records.")
        return []
    for i, record in enumerate(data):
        if not isinstance(record, dict):
            print(f"Error: Record {i} in {filepath} is not a JSON object.")
            return []
        if not all(k in record for k in required_keys):
            print(f"Error: Record {i} in {filepath} is missing required keys.")
            return []
    return data


# --- Main Pipeline Execution ---
def run_pipeline(sample_data: list[dict]):
    """
    Push every record through T2 (intent), T3 (retrieval) and T5 (topic),
    then print an accuracy / hit-rate / p95-latency report to stdout.

    Args:
        sample_data: Records carrying "sentence_id", "text",
            "expected_intent" and "expected_topic". A falsy value (for
            example an empty list) prints a notice and returns immediately.

    Returns:
        None. All results are printed.
    """
    if not sample_data:
        print("Pipeline execution halted: No valid data loaded.")
        return

    print("--- Running Nightingale Memory MVP Pipeline in Colab ---")

    # Per-stage latency samples (milliseconds) and success counters.
    intent_ms, retrieval_ms, topic_ms = [], [], []
    intent_ok = retrieval_ok = topic_ok = 0

    for record in sample_data:
        sid = record["sentence_id"]
        utterance = record["text"]
        want_intent = record["expected_intent"]
        want_topic = record["expected_topic"]

        # Task 2: Intent Gate
        intent = classify_intent(sid, utterance)
        intent_ms.append(intent["latency_ms"])
        intent_ok += 1 if intent["intent_label"] == want_intent else 0

        # Task 3: Retrieval -- keyed by the predicted intent, not the expected one
        memory = retrieve_memory(sid, intent["intent_label"])
        retrieval_ms.append(memory["latency_ms"])
        retrieval_ok += 1 if memory["hit"] else 0

        # Task 5: Topic Assignment
        topic = assign_topic(sid, utterance)
        topic_ms.append(topic["latency_ms"])
        topic_ok += 1 if topic["topic_label"] == want_topic else 0

    total = len(sample_data)
    print(f"\nProcessed {total} sentences.\n")

    # --- Calculate Metrics ---
    intent_rate = intent_ok / total
    retrieval_rate = retrieval_ok / total
    topic_rate = topic_ok / total

    intent_p95 = calculate_p95(intent_ms)
    retrieval_p95 = calculate_p95(retrieval_ms)
    topic_p95 = calculate_p95(topic_ms)

    # --- Final Report ---
    print("--- Task Metric Test Summary ---")
    print("\n[Task 2: Intent Gate]")
    print(f"  Accuracy: {intent_rate:.2%} ({intent_ok}/{total})")
    print(f"  p95 Latency: {intent_p95:.6f} ms")

    print("\n[Task 3: Retrieval/Storage]")
    print(f"  Hit Rate: {retrieval_rate:.2%} ({retrieval_ok}/{total})")
    print(f"  p95 Latency: {retrieval_p95:.6f} ms")

    print("\n[Task 5: Provenance/Topic]")
    print(f"  Topic Hit Rate: {topic_rate:.2%} ({topic_ok}/{total})")
    print(f"  p95 Latency: {topic_p95:.6f} ms")
    print("\n----------------------------------\n")
    print("✅ Script successfully validated in Google Colab.")


# Script / notebook-cell entry point: load the sample dialogues and run the
# pipeline on them.
if __name__ == "__main__":
    # Define the explicit path for the Colab environment
    DATA_PATH = "/content/Nightingale/sample_dialogues.json"
    # load_data reports problems on stdout and returns an empty list on
    # failure; run_pipeline then prints a "halted" notice instead of a report.
    dialogue_data = load_data(DATA_PATH)
    run_pipeline(dialogue_data)

--- Running Nightingale Memory MVP Pipeline in Colab ---

Processed 6 sentences.

--- Task Metric Test Summary ---

[Task 2: Intent Gate]
  Accuracy: 100.00% (6/6)
  p95 Latency: 0.009528 ms

[Task 3: Retrieval/Storage]
  Hit Rate: 83.33% (5/6)
  p95 Latency: 0.001421 ms

[Task 5: Provenance/Topic]
  Topic Hit Rate: 83.33% (5/6)
  p95 Latency: 0.007424 ms

----------------------------------

✅ Script successfully validated in Google Colab.
