## 04: Core State Models & Psychological Representation

This notebook is designed to bridge the gap between conceptual descriptions of conversational behaviors and a small, executable codebase. The primary intent is to provide clear, typed data models, straightforward temporal primitives for recording event history, and a few interpretable update rules that capture common conversational phenomena such as abandonment, pauses, and readiness shifts. Each algorithm is intentionally small; the focus is on understandability, testability, and being able to reason about behavior deterministically.

For reproducible downstream processing, we export canonical JSON schemas for the core Pydantic models. The artifact writer serializes the JSON Schema that Pydantic generates for `Message` and `UserContext` and writes both to `artifacts/state_schema.json`. This file can be used by other notebooks, data validation pipelines, or serializer/deserializer code in production. Making the schema an explicit artifact ensures that changes to models are visible and versionable.

## Notebook 03: Core State Models & Psychological Representation

This notebook provides compact, typed data models, temporal graph primitives for append-only event history, and a set of small, interpretable update rules (ABANDON increment, pause detection, readiness update) together with a lightweight runtime and deterministic tests. The goal is understandability, auditability, and reproducible behavior for downstream consumers.

### 1. Enums & Pydantic schemas

This section defines compact enumerations and the Pydantic models that carry them, so downstream systems can treat symbolic states as first-class, typed values. An enumeration corresponds to a categorical variable; if there are $n$ categories then any probability vector over those categories lies on the standard simplex:

$$\Delta^{n-1} = \{\pi \in \mathbb{R}^n_{\ge 0} : \sum_{i=1}^n \pi_i = 1\}$$

Practically, we use enums to represent compact label sets such as emotional valence or intent clarity, and Pydantic to serialize, validate, and emit JSON Schema. For many downstream uses we expose both the symbolic enum and its one-hot or probability encoding: if $e_i$ is the one-hot vector for category $i$, then the mapping to probabilities is $\pi = e_i$ for a deterministic label, or any convex combination of the one-hot vectors for calibrated outputs. Using the schema artifact enables robust input validation and automated contract checks in pipelines. The JSON Schema also documents allowed values, types, and field descriptions so that clients can programmatically discover the contract and perform safe conversions between symbolic enums and numeric vectors.

Enums give us a shared vocabulary between humans and code. The schema makes that vocabulary machine-readable so other programs can validate inputs and avoid subtle mapping bugs.
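
As a minimal sketch of the enum-to-vector mapping described above: the `Valence` enum and the `one_hot` and `mix` helpers are hypothetical names used only for illustration and are not part of the notebook's models. The position of each category is assumed to follow the enum's definition order.

```python
from enum import Enum
from typing import List, Sequence


class Valence(str, Enum):  # hypothetical 3-category enum, for illustration only
    NEGATIVE = 'negative'
    NEUTRAL = 'neutral'
    POSITIVE = 'positive'


def one_hot(label: Valence) -> List[float]:
    """Return e_i: 1.0 at the label's position in definition order, 0.0 elsewhere."""
    return [1.0 if member is label else 0.0 for member in Valence]


def mix(weights: Sequence[float]) -> List[float]:
    """Convex combination of the one-hot vectors: normalize non-negative weights."""
    if len(weights) != len(Valence) or any(w < 0 for w in weights):
        raise ValueError('need one non-negative weight per category')
    total = sum(weights)
    if total <= 0:
        raise ValueError('weights must sum to > 0')
    return [w / total for w in weights]


print(one_hot(Valence.NEUTRAL))  # deterministic label
print(mix([1, 1, 2]))            # soft label on the simplex
```

Both results are points on $\Delta^{2}$: the first is a vertex, the second lies in the interior.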

In [1]:
from __future__ import annotations
from enum import Enum
from typing import Optional, List, Dict, Any, Sequence
from pydantic import BaseModel, Field, validator
import json
from pathlib import Path
ARTIFACTS = Path('artifacts')
ARTIFACTS.mkdir(parents=True, exist_ok=True)

class EmotionalState(str, Enum):
    NEUTRAL = 'neutral'
    POSITIVE = 'positive'
    NEGATIVE = 'negative'
    FRUSTRATED = 'frustrated'
    ENGAGED = 'engaged'

class IntentClarity(str, Enum):
    CLEAR = 'clear'
    AMBIGUOUS = 'ambiguous'
    UNKNOWN = 'unknown'

class Message(BaseModel):
    id: str = Field(..., description='Unique message identifier')
    sender: str
    recipient: Optional[str]
    text: str
    ts: float = Field(..., description='Timestamp in seconds (monotonic/epoch)')
    emotional_state: Optional[EmotionalState] = EmotionalState.NEUTRAL
    intent_clarity: Optional[IntentClarity] = IntentClarity.UNKNOWN
    metadata: Optional[Dict[str, Any]] = None

class UserContext(BaseModel):
    user_id: str
    last_seen_ts: Optional[float] = None
    readiness: Optional[List[float]] = Field(default_factory=lambda: [1.0], description='Normalized readiness distribution')
    emotional_state: EmotionalState = EmotionalState.NEUTRAL
    intent_clarity: IntentClarity = IntentClarity.UNKNOWN
    notes: Optional[str] = None

    @validator('readiness')
    def normalize_readiness(cls, v):
        if not v:
            return [1.0]
        s = sum(v)
        if s <= 0:
            raise ValueError('readiness must sum to > 0')
        return [float(x) / s for x in v]

# quick sanity
m = Message(id='m1', sender='user1', recipient='bot', text='hello', ts=0.0)
print('Message OK:', m.id, m.ts)

Message OK: m1 0.0




_________________________________________________________________________________________________
### 2. Temporal graph primitives

This section describes the append-only temporal graph representation used to record interactions as immutable event lists attached to directed edges. Formally, let $G=(V,E)$ be a directed graph and for each edge $(u,v)\in E$ associate an event sequence $\mathcal{E}_{uv} = \{(t_i, w_i, m_i)\}_{i=1}^T$, where $t_i$ is the timestamp, $w_i$ a scalar weight, and $m_i$ optional metadata. A basic read operation recovers the weight of the most recent event at or before time $t$, and returns $0$ when no such event exists:

$$w_{uv}(t) = w_{i^*},\quad i^* = \arg\max_{i\,:\,t_i \le t} t_i\,,\quad\text{(step-replay)}$$

Alternative analyses use decay kernels $K(\tau)$ to compute a continuous influence score:

$$W_{uv}(t) = \sum_{(t_i,w_i) \in \mathcal{E}_{uv},\ t_i \le t} w_i\,K(t - t_i)\,,$$

with $K$ chosen to trade off recency and memory (for example $K(\tau)=e^{-\lambda\tau}$). Append-only semantics guarantee auditability and deterministic replay: the raw event list is preserved so downstream experiments can re-evaluate $W_{uv}(t)$ under different kernels or aggregation rules without loss. Appends are constant-time and compact, and storage can be controlled by pruning or summarization offline while preserving the canonical timeline for reproducibility.

We keep every event so that any later analysis or fix can be replayed deterministically. This makes debugging and experimenting with different aggregation rules safe and auditable.
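
A minimal sketch of the two read operations, assuming events are plain dicts with `ts` and `weight` keys as in the notebook's event lists. The names `step_replay`, `decayed_score`, and `lam` are illustrative, and the exponential kernel is just the example kernel mentioned above.

```python
import math
from typing import Dict, List

Event = Dict[str, float]  # {'ts': ..., 'weight': ...}


def step_replay(events: List[Event], t: float) -> float:
    """Weight of the latest event with ts <= t, or 0.0 if there is none."""
    eligible = [e for e in events if e['ts'] <= t]
    if not eligible:
        return 0.0
    return max(eligible, key=lambda e: e['ts'])['weight']


def decayed_score(events: List[Event], t: float, lam: float = 1.0) -> float:
    """Sum of w_i * exp(-lam * (t - t_i)) over events with t_i <= t."""
    return sum(e['weight'] * math.exp(-lam * (t - e['ts']))
               for e in events if e['ts'] <= t)


events = [{'ts': 0.0, 'weight': 0.1}, {'ts': 1.0, 'weight': 0.4}]
print(step_replay(events, 0.5))             # only the ts=0.0 event is eligible
print(decayed_score(events, 1.0, lam=1.0))  # older event is down-weighted
```

Because both functions read from the same raw list, the kernel or its rate can be changed later without touching stored data.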

In [2]:
import networkx as nx
from typing import Any, Optional

def make_temporal_graph() -> nx.DiGraph:
    return nx.DiGraph()

def add_event_edge(G: nx.DiGraph, src: str, dst: str, ts: float, weight: float = 0.0, **meta: Any):
    if G.has_edge(src, dst):
        if 'events' not in G[src][dst]:
            G[src][dst]['events'] = []
        G[src][dst]['events'].append({'ts': float(ts), 'weight': float(weight), **meta})
    else:
        G.add_edge(src, dst, events=[{'ts': float(ts), 'weight': float(weight), **meta}])

def edge_weight_at(G: nx.DiGraph, src: str, dst: str, ts: Optional[float] = None) -> float:
    if not G.has_edge(src, dst):
        return 0.0
    events = G[src][dst].get('events', [])
    if not events:
        return 0.0
    if ts is None:
        return float(events[-1]['weight'])
    eligible = [e for e in events if e['ts'] <= ts]
    if not eligible:
        return 0.0
    return float(sorted(eligible, key=lambda e: e['ts'])[-1]['weight'])

# quick check
G = make_temporal_graph()
add_event_edge(G, 'u','v', ts=0.0, weight=0.1)
add_event_edge(G, 'u','v', ts=1.0, weight=0.4)
print('edge weight at t=0.5 ->', edge_weight_at(G,'u','v', ts=0.5))

edge weight at t=0.5 -> 0.1


_________________________________________________________________________________________________
### 3. ABANDON math + helper

The ABANDON rule is a simple, interpretable incremental score that grows when the predicted reply probability is low. Represent the current abandon weight on an edge as $w_t\in[0,1]$. Given a reply probability estimate $p\in[0,1]$ and a learning rate $\alpha\in[0,1]$, the increment is:

$$\delta_t = \alpha\,\left(1 - p\right)\,,$$

and the updated weight before clipping is $w_t + \delta_t$. To ensure the weight remains in the unit interval we apply clipping:

$$w_{t+1} = \operatorname{clip}(w_t + \delta_t,\ 0,\ 1)\,,$$

where $\operatorname{clip}(x,a,b)=\min(b,\max(a,x))$. The implementation appends an event $(t, w_{t+1}, \text{meta})$ to the edge's timeline rather than mutating aggregate state, preserving the full audit trail. This approach is deterministic given fixed inputs and timestamps, and it supports replay: by re-evaluating the same sequence of $(p,\alpha,t)$ updates one recovers the same $w$ trajectory.

The choice of $\alpha$ controls sensitivity: small $\alpha$ yields slow accumulation, large $\alpha$ reacts quickly to single low-probability events. Recording the $(p,\alpha)$ pair per event allows offline calibration and analysis of how ABANDON evolves under different models.

In short, we slowly increase a small abandon score when replies look unlikely. This gives a simple, auditable signal that can drive follow-up actions or monitoring alerts.
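
The replay property can be sketched without the graph at all. The helper below is a hypothetical, stand-alone illustration (`replay_abandon` is not a notebook function): it applies the increment and clipping rule to a list of `(p_reply, alpha)` pairs and returns the weight after each step.

```python
from typing import Iterable, List, Tuple


def replay_abandon(w0: float, updates: Iterable[Tuple[float, float]]) -> List[float]:
    """Replay (p_reply, alpha) pairs from w0 and return the weight after each step."""
    trajectory = []
    w = w0
    for p_reply, alpha in updates:
        delta = alpha * (1.0 - p_reply)    # increment grows as p_reply falls
        w = min(1.0, max(0.0, w + delta))  # clip to [0, 1]
        trajectory.append(w)
    return trajectory


updates = [(0.6, 0.5), (0.1, 0.5), (0.0, 0.5)]
print(replay_abandon(0.2, updates))
```

With these inputs the third step would push the weight past 1, so clipping holds it at the upper bound; running the function again on the same list yields the same trajectory.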

In [3]:
def apply_abandon_increment(G, src: str, dst: str, alpha: float, p_reply: float, ts: float):
    """Apply ABANDON increment and append event. Returns (w_before, w_after)."""
    w_before = edge_weight_at(G, src, dst, ts=ts)
    delta = float(alpha) * (1.0 - float(p_reply))
    w_after = min(1.0, max(0.0, w_before + delta))
    add_event_edge(G, src, dst, ts=ts, weight=w_after, reason='abandon', alpha=alpha, p_reply=p_reply)
    return w_before, w_after

# demonstration
G_demo = make_temporal_graph()
add_event_edge(G_demo, 'user1','bot', ts=0.0, weight=0.2)
print('before', edge_weight_at(G_demo,'user1','bot'))
wb, wa = apply_abandon_increment(G_demo, 'user1','bot', alpha=0.5, p_reply=0.6, ts=1.0)
print('wb,wa', wb, wa)

before 0.2
wb,wa 0.2 0.4


_________________________________________________________________________________________________
### 4. Pause detection math + smoother class

Detecting pauses robustly requires a lightweight online statistic that adapts to recent conversation tempo while remaining resistant to transient spikes. 

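The exponential smoother implemented in this notebook maintains two compact statistics: an exponentially weighted running mean and running second moment. From these we compute variance as $\mathbb{E}[x^2] - (\mathbb{E}[x])^2$, which needs only constant memory per user. This form can lose precision through cancellation when the variance is small relative to the mean, so the implementation clamps any negative result to zero.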

The smoother uses a single hyperparameter $\alpha$ (smoothing factor) that governs the trade-off between responsiveness and stability: larger $\alpha$ means the smoother reacts faster to new intervals, smaller $\alpha$ makes it more conservative.

The pause test itself is intentionally simple and interpretable: given the latest inter-message interval $\Delta t$, compute the z-like score $z = (\Delta t - \mu)/\sigma$ where $\mu$ and $\sigma$ are the smoother mean and standard deviation. If $z$ exceeds a threshold $k$, we flag a pause. The implementation evaluates the equivalent inequality $\Delta t > \mu + k\sigma$, which avoids dividing by zero when $\sigma = 0$ (for example, after a single observation). This leverages empirical variability rather than absolute thresholds, so the detector adapts to different users and contexts without changing parameters.

We detect unusually long pauses relative to the recent tempo so the system can trigger reminders, follow-ups, or logging for later analysis.
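
To make the update concrete, here is the arithmetic for the first two intervals of the diagnostic run in the cell that follows ($x_1 = 0.5$, $x_2 = 0.6$, $\alpha = 0.2$). The symbol $m$ for the running second moment is notation introduced here for illustration. The first observation initializes $\mu = 0.5$ and $m = 0.25$; the second gives:

$$\mu \leftarrow 0.2\,(0.6) + 0.8\,(0.5) = 0.52\,,\qquad m \leftarrow 0.2\,(0.36) + 0.8\,(0.25) = 0.272$$

$$\sigma = \sqrt{m - \mu^2} = \sqrt{0.272 - 0.2704} = \sqrt{0.0016} = 0.04$$

After all four intervals the cell reports $\mu \approx 0.5108$ and $\sigma \approx 0.0454$, so with $k = 2$ the threshold is $\mu + k\sigma \approx 0.6016$. An interval of $\Delta t = 10$ is far above it (a z-like score of roughly 209), which is why the pause is flagged.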

In [4]:
from dataclasses import dataclass
import math

@dataclass
class ExpSmoother:
    alpha: float = 0.2
    mean: float = 0.0
    second_moment: float = 0.0
    initialized: bool = False

    def update(self, x: float):
        x = float(x)
        if not self.initialized:
            self.mean = x
            self.second_moment = x * x
            self.initialized = True
            return self.mean, self.std
        self.mean = self.alpha * x + (1 - self.alpha) * self.mean
        self.second_moment = self.alpha * (x * x) + (1 - self.alpha) * self.second_moment
        return self.mean, self.std

    @property
    def var(self) -> float:
        v = self.second_moment - (self.mean * self.mean)
        return max(0.0, v)

    @property
    def std(self) -> float:
        return math.sqrt(self.var)

def detect_pause(delta_t: float, smoother: ExpSmoother, k: float = 2.0) -> bool:
    if not smoother.initialized:
        return False
    return float(delta_t) > (smoother.mean + float(k) * smoother.std)

# small diagnostics
s = ExpSmoother(alpha=0.2)
for d in [0.5,0.6,0.55,0.45]:
    print('update', d, '->', s.update(d))
print('mean,std after small deltas', s.mean, s.std)
print('detect pause with 10.0 ->', detect_pause(10.0, s, k=2.0))

update 0.5 -> (0.5, 0.0)
update 0.6 -> (0.52, 0.039999999999999876)
update 0.55 -> (0.526, 0.03773592452822616)
update 0.45 -> (0.5108, 0.04542422261305039)
mean,std after small deltas 0.5108 0.04542422261305039
detect pause with 10.0 -> True


_________________________________________________________________________________________________
### 5. Readiness update math + function

Readiness is a compact, probabilistic summary of how prepared an agent or user is to take some next action. We represent readiness as a probability vector π over discrete outcomes and update it on each observation using a multiplicative, discounted rule. 

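The update raises the prior to the power $\gamma$ (giving $\pi_t^{\gamma}$), multiplies it elementwise by the evidence likelihood for each outcome, then re-normalizes to remain on the probability simplex. This form behaves like a tempered Bayesian update: with $\gamma = 1$ it is the ordinary Bayes rule, and with $\gamma < 1$ the prior is flattened toward uniform before the evidence is applied, so older beliefs are gradually discounted and the distribution is less likely to lock onto a single outcome. The multiplicative factor ensures evidence is weighted proportionally to prior belief.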

Numerical safeguards are important: clamp tiny values with a small epsilon before exponentiation or multiplication to avoid underflow or zeroing out the distribution. 

If the normalizer sum becomes zero due to numerical issues, fall back to a uniform distribution to preserve validity. The update is intentionally model-agnostic: the likelihood input can come from a classifier, a heuristic, or a learned model, so long as it represents relative support for outcomes.
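
Written out, with $\ell_i$ denoting the likelihood for outcome $i$ and $\epsilon$ the clamp value (both symbols are notation introduced here), the update for $n$ outcomes reads:

$$\pi_{t+1,i} = \frac{\max(\epsilon,\pi_{t,i})^{\gamma}\,\max(\epsilon,\ell_i)}{\sum_{j=1}^{n}\max(\epsilon,\pi_{t,j})^{\gamma}\,\max(\epsilon,\ell_j)}$$

For the demo inputs in the cell that follows, $\pi_t = (0.6, 0.4)$, $\ell = (0.2, 0.8)$ and $\gamma = 0.8$, the unnormalized terms are $0.6^{0.8}\cdot 0.2 \approx 0.1329$ and $0.4^{0.8}\cdot 0.8 \approx 0.3844$. Their sum is about $0.5173$, which gives $\pi_{t+1} \approx (0.2569, 0.7431)$.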

In [5]:
def update_readiness(pi_t: Sequence[float], likelihood: Sequence[float], gamma: float = 0.9) -> List[float]:
    if len(pi_t) != len(likelihood):
        raise ValueError('pi_t and likelihood must have same length')
    eps = 1e-12
    updated = [(max(eps, p) ** gamma) * max(eps, l) for p, l in zip(pi_t, likelihood)]
    s = sum(updated)
    if s <= 0:
        n = len(updated)
        return [1.0 / n] * n
    return [float(u) / s for u in updated]

# demo: prior favors first state but likelihood favors second
print('update_readiness ->', update_readiness([0.6,0.4], [0.2,0.8], gamma=0.8))

update_readiness -> [0.25694227692548016, 0.74305772307452]
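
The effect of $\gamma$ is easiest to see at its two extremes. The sketch below re-implements the rule under a hypothetical name (`tempered_update`) so it can run on its own; it omits the uniform fallback and is meant as an illustration, not a replacement for `update_readiness`.

```python
from typing import List, Sequence


def tempered_update(prior: Sequence[float], likelihood: Sequence[float],
                    gamma: float, eps: float = 1e-12) -> List[float]:
    """Discounted multiplicative update: normalize prior**gamma * likelihood."""
    unnorm = [(max(eps, p) ** gamma) * max(eps, l)
              for p, l in zip(prior, likelihood)]
    total = sum(unnorm)
    return [u / total for u in unnorm]


prior, likelihood = [0.6, 0.4], [0.2, 0.8]

# gamma = 1: ordinary Bayes rule, prior * likelihood renormalized.
bayes = tempered_update(prior, likelihood, gamma=1.0)
# gamma = 0: the prior is flattened to uniform, leaving the normalized likelihood.
no_memory = tempered_update(prior, likelihood, gamma=0.0)

print('gamma=1.0 ->', bayes)
print('gamma=0.0 ->', no_memory)
```

Values of $\gamma$ between 0 and 1 interpolate between these two behaviors: the smaller $\gamma$ is, the less the prior counts against new evidence.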


_________________________________________________________________________________________________
### 6. Graph + state ops glue

The graph + state glue coordinates the low-level temporal storage and the lightweight stateful primitives so they behave as a cohesive runtime. `ConversationRuntime` in this notebook acts as a small orchestrator: it maintains the temporal `DiGraph` of event histories, a per-user `ExpSmoother` instance to track inter-message statistics, and convenience methods to ingest messages, record ABANDON observations, run pause checks, and apply readiness updates. 

The runtime keeps three concerns separate: storage (graph), smoothing/statistics (smoothers), and decision rules (abandon/readiness). This makes it easier to test and replace individual components.

Key operational patterns: when a message is ingested, the runtime appends a lightweight event to the appropriate temporal edge and, if the message metadata carries a `prev_ts` value, updates the sender's smoother with the observed $\Delta t$. Observing an abandon produces an append-only event on the edge (preserving history) rather than mutating aggregated state.

Readiness updates are computed using the prior stored in the user's `UserContext` and then written back, so downstream code always reads a normalized distribution. 

This design supports auditability and deterministic replay: because events are append-only and timestamped, you can re-run offline analyses with alternative decay kernels or thresholds without losing the raw timeline.
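
In the runtime cell that follows, `ingest_message` reads the previous timestamp from `msg.metadata['prev_ts']`, so the caller has to supply it. One way a caller might derive those intervals is to track the last timestamp seen per sender. The helper below is a hypothetical sketch of that bookkeeping (`intervals_by_sender` is not part of the notebook) and assumes messages arrive as `(sender, ts)` pairs in order.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def intervals_by_sender(messages: Iterable[Tuple[str, float]]) -> Dict[str, List[float]]:
    """Map each sender to the gaps between its consecutive message timestamps."""
    last_ts: Dict[str, float] = {}
    gaps: Dict[str, List[float]] = {}
    for sender, ts in messages:
        prev: Optional[float] = last_ts.get(sender)
        if prev is not None:
            gaps.setdefault(sender, []).append(ts - prev)  # delta_t for this sender
        last_ts[sender] = ts
    return gaps


log = [('user1', 0.0), ('bot', 0.5), ('user1', 2.0), ('user1', 2.5)]
print(intervals_by_sender(log))
```

A sender with a single message has no interval yet, which matches the runtime's behavior of skipping the smoother update when no previous timestamp is available.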

In [6]:
class ConversationRuntime:
    def __init__(self, alpha_smooth=0.2):
        self.G = make_temporal_graph()
        self.smoothers = {}  # user_id -> ExpSmoother
        self.alpha_smooth = alpha_smooth

    def get_smoother(self, user_id: str) -> ExpSmoother:
        if user_id not in self.smoothers:
            self.smoothers[user_id] = ExpSmoother(alpha=self.alpha_smooth)
        return self.smoothers[user_id]

    def ingest_message(self, msg: Message):
        add_event_edge(self.G, msg.sender, msg.recipient or '__channel__', ts=msg.ts, weight=0.0, message_id=msg.id)
        s = self.get_smoother(msg.sender)
        last_ts = msg.metadata.get('prev_ts') if msg.metadata else None
        if last_ts is not None:
            delta = msg.ts - last_ts
            s.update(delta)

    def observe_abandon(self, src: str, dst: str, p_reply: float, alpha: float, ts: float):
        return apply_abandon_increment(self.G, src, dst, alpha=alpha, p_reply=p_reply, ts=ts)

    def check_pause(self, user_id: str, delta_t: float, k: float = 2.0) -> bool:
        s = self.get_smoother(user_id)
        return detect_pause(delta_t, s, k=k)

    def apply_readiness_update(self, user_ctx: UserContext, likelihood: Sequence[float], gamma: float = 0.9):
        new_ready = update_readiness(user_ctx.readiness, likelihood, gamma)
        user_ctx.readiness = new_ready
        return new_ready

# small runtime demonstration
rt = ConversationRuntime()
rt.observe_abandon('user1','bot', p_reply=0.6, alpha=0.5, ts=2.0)
print('observed abandon, latest weight ->', edge_weight_at(rt.G,'user1','bot'))

observed abandon, latest weight -> 0.2


_________________________________________________________________________________________________
### 7. Deterministic unit tests

A compact test harness validates core invariants (ABANDON increments, pause detection, readiness normalization). The tests use fixed inputs and no random sampling, so their results are deterministic.

In [7]:
def _run_tests():
    import math, traceback
    try:
        # ABANDON increment test
        Gt = make_temporal_graph()
        add_event_edge(Gt, 'u1', 'bot', ts=0.0, weight=0.2)
        wb, wa = apply_abandon_increment(Gt, 'u1', 'bot', alpha=0.5, p_reply=0.6, ts=1.0)
        assert math.isclose(wb, 0.2, rel_tol=1e-9), f'wb mismatch {wb}'
        assert math.isclose(wa, 0.4, rel_tol=1e-9), f'wa mismatch {wa}'
        # Pause detection test
        s = ExpSmoother(alpha=0.2)
        for x in [0.5, 0.6, 0.55, 0.45]:
            s.update(x)
        if not detect_pause(10.0, s, k=2.0):
            raise AssertionError(f'Pause detection failed: mean={s.mean}, std={s.std}')
        # readiness update test
        new_pi = update_readiness([0.6,0.4], [0.2,0.8], gamma=0.8)
        assert len(new_pi) == 2 and abs(sum(new_pi)-1.0) < 1e-9 and new_pi[1] > new_pi[0], f'readiness unexpected {new_pi}'
        print('All deterministic core-state tests passed')
    except Exception as e:
        traceback.print_exc()
        raise

_run_tests()

All deterministic core-state tests passed


_________________________________________________________________________________________________
### 8. Worked example / walkthrough

A short deterministic example that shows the runtime primitives interacting.

In [8]:
# Worked example prints
Gx = make_temporal_graph()
add_event_edge(Gx, 'user1', 'bot', ts=0.0, weight=0.2)
print('Initial weight @0.0 ->', edge_weight_at(Gx, 'user1', 'bot', ts=0.0))
wb, wa = apply_abandon_increment(Gx, 'user1', 'bot', alpha=0.5, p_reply=0.6, ts=1.0)
print('ABANDON: w_before=', wb, 'w_after=', wa)
s = ExpSmoother(alpha=0.2)
for d in [0.5,0.6,0.55,0.45]:
    s.update(d)
print('Smoother mean, std ->', round(s.mean, 4), round(s.std, 4))
print('Pause triggered for Δt=10.0 ? ->', detect_pause(10.0, s, k=2.0))
ctx = UserContext(user_id='user1', last_seen_ts=1.0, readiness=[0.7,0.3])
print('readiness before ->', ctx.readiness)
ctx.readiness = update_readiness(ctx.readiness, [0.2,0.8], gamma=0.85)
print('readiness after ->', [round(x,4) for x in ctx.readiness])

Initial weight @0.0 -> 0.2
ABANDON: w_before= 0.2 w_after= 0.4
Smoother mean, std -> 0.5108 0.0454
Pause triggered for Δt=10.0 ? -> True
readiness before -> [0.7, 0.3]
readiness after -> [0.3394, 0.6606]


_________________________________________________________________________________________________
### 9. Artifact writer

Export canonical JSON schemas for `Message` and `UserContext` so downstream notebooks and systems can validate inputs against a stable contract.
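
As a rough sketch of how a downstream consumer might use the artifact, the snippet below looks up the permitted enum values from a schema dict. The inline fragment copies only the `EmotionalState` definition shown in the JSON preview; in practice the dict would be loaded from `artifacts/state_schema.json`. The helper names are hypothetical, and a full JSON Schema validator would check much more than this.

```python
from typing import Any, Dict, List

# Fragment shaped like the exported schema (only the parts used here).
schema: Dict[str, Any] = {
    'Message': {
        '$defs': {
            'EmotionalState': {
                'enum': ['neutral', 'positive', 'negative', 'frustrated', 'engaged'],
                'type': 'string',
            },
        },
    },
}


def allowed_values(schema: Dict[str, Any], model: str, enum_name: str) -> List[str]:
    """Look up the permitted string values for an enum in the schema artifact."""
    return list(schema[model]['$defs'][enum_name]['enum'])


def check_emotional_state(value: str) -> bool:
    """True if `value` is one of the values the Message schema permits."""
    return value in allowed_values(schema, 'Message', 'EmotionalState')


print(check_emotional_state('engaged'))
print(check_emotional_state('bored'))
```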

In [9]:
schema = {
    # model_json_schema() returns a dict, so no JSON round-trip is needed (Pydantic v2)
    'Message': Message.model_json_schema(),
    'UserContext': UserContext.model_json_schema(),
}
with open(ARTIFACTS / 'state_schema.json', 'w', encoding='utf-8') as f:
    json.dump(schema, f, indent=2)
print('Wrote artifact ->', ARTIFACTS / 'state_schema.json')

Wrote artifact -> artifacts\state_schema.json


**JSON preview: artifacts/state_schema.json**

Load and pretty-print the generated schema to verify its contents.
________________________________________________________________________________________________

In [10]:
from pprint import pprint
p = ARTIFACTS / 'state_schema.json'
print('Reading', p)
with open(p, 'r', encoding='utf-8') as f:
    schema = json.load(f)
pprint(schema, width=120)
print('\n--- JSON (indented) ---')
print(json.dumps(schema, indent=2))

Reading artifacts\state_schema.json
{'Message': {'$defs': {'EmotionalState': {'enum': ['neutral', 'positive', 'negative', 'frustrated', 'engaged'],
                                          'title': 'EmotionalState',
                                          'type': 'string'},
                       'IntentClarity': {'enum': ['clear', 'ambiguous', 'unknown'],
                                         'title': 'IntentClarity',
                                         'type': 'string'}},
             'properties': {'emotional_state': {'anyOf': [{'$ref': '#/$defs/EmotionalState'}, {'type': 'null'}],
                                                'default': 'neutral'},
                            'id': {'description': 'Unique message identifier', 'title': 'Id', 'type': 'string'},
                            'intent_clarity': {'anyOf': [{'$ref': '#/$defs/IntentClarity'}, {'type': 'null'}],
                                               'default': 'unknown'},
                            'met