diff --git a/.changeset/adr-0030-notification-p3b1-quiet-hours.md b/.changeset/adr-0030-notification-p3b1-quiet-hours.md new file mode 100644 index 000000000..2bf220045 --- /dev/null +++ b/.changeset/adr-0030-notification-p3b1-quiet-hours.md @@ -0,0 +1,28 @@ +--- +"@objectstack/service-messaging": minor +--- + +ADR-0030 P3b-1 — quiet-hours. A notification that lands inside a recipient's +quiet-hours window is **deferred to the window's end** instead of disturbing +them; it then delivers normally. + +- Implemented as a **deferred dispatch** on the P1 outbox — no parallel + scheduler: `EnqueueDeliveryInput.notBefore` sets the delivery row's initial + `nextAttemptAt`, and the existing dispatcher already skips pending rows whose + `nextAttemptAt` is in the future. One delivery spine, reusing claim/retry/ + observability. +- `PreferenceResolver` reads `quiet_hours` (`{ tz, start, end }`, P2's field) off + a channel-wildcard preference row (quiet hours are a per-person, channel- + agnostic setting), computes the deferral with `quietHoursDeferral()` (HH:MM in + the row's `tz`, default UTC; supports overnight windows that wrap midnight), + and stamps `notBefore` on the target. `emit()` passes it through to the outbox. +- **critical** severity bypasses quiet hours (delivers immediately), like + mandatory topics bypass muting. Honored on the durable outbox path; inline + best-effort fan-out ignores it. + +Tests: service-messaging **92 passing** — adds `quietHoursDeferral` unit cases +(same-day / overnight / outside / degenerate) and resolver cases (notBefore +stamped, critical bypass, JSON-string `quiet_hours`). + +Follow-up: **P3b-2 — digest** (batch same-`(user, channel, window)` deliveries +into one) builds on this same deferral foundation, adding the window collapse. diff --git a/docs/handoff/adr-0030-notification-convergence.md b/docs/handoff/adr-0030-notification-convergence.md index 9a1ec6462..bfb9bf846 100644 --- a/docs/handoff/adr-0030-notification-convergence.md +++ b/docs/handoff/adr-0030-notification-convergence.md @@ -165,13 +165,19 @@ flipped — the inbox is being populated the whole time.) `sys_notification_template` (topic×channel×locale) + `{{ payload.x }}` renderer with generic `payload.title`/`body` fallback. Same `emit` now reaches inbox + email per prefs. - - **P3b — digest + quiet-hours**: pending. Plan: express both as **deferring - the delivery row's `next_attempt_at`** in the P1 outbox (digest = enqueue to - the next window + collapse same-`(user, channel, window)` rows; quiet-hours = - push `next_attempt_at` to the window end), reusing the dispatcher's - claim/retry/observability — one delivery spine, not a parallel scheduler. - Consumes the `digest`/`quiet_hours` fields P2 added. critical/mandatory - bypass. tz from `quiet_hours.tz` → `sys_user` → org/UTC. + - **P3b-1 — quiet-hours**: ✅ shipped. Deferred dispatch on the P1 outbox — + `EnqueueDeliveryInput.notBefore` → the row's initial `nextAttemptAt`; the + dispatcher already skips not-yet-due pending rows. `PreferenceResolver` reads + `quiet_hours` off a channel-wildcard row and computes the window end + (`quietHoursDeferral`, HH:MM in tz, overnight-aware). critical bypasses. + (tz currently from `quiet_hours.tz` → UTC; `sys_user` tz fallback is a + follow-up.) + - **P3b-2 — digest**: pending. Builds on the same deferral: enqueue digest + items to the next window, then a **collapse** step merges same-`(user, + channel, window)` rows into one materialization at window time (needs a + `digest_key` on the delivery row + a digest assembler in/beside the + dispatcher + a digest render template). critical/mandatory bypass. Consumes + P2's `digest` field. - **Deferred (same seam, incremental)**: **Slack** stays a *connector* (`connector-slack` ships the raw API path today); a Slack notification *channel* needs identity mapping (`sys_channel_user_link`) + OAuth and is diff --git a/packages/services/service-messaging/src/index.ts b/packages/services/service-messaging/src/index.ts index 1fabccb30..0eb2e35fc 100644 --- a/packages/services/service-messaging/src/index.ts +++ b/packages/services/service-messaging/src/index.ts @@ -42,13 +42,14 @@ export type { ResolveContext, } from './recipient-resolver.js'; -// Preference filter (ADR-0030 P2) -export { PreferenceResolver, PREFERENCE_OBJECT } from './preference-resolver.js'; +// Preference filter (ADR-0030 P2) + quiet-hours deferral (P3b) +export { PreferenceResolver, PREFERENCE_OBJECT, quietHoursDeferral } from './preference-resolver.js'; export type { PreferenceResolverOptions, PreferenceResolverLogger, PreferenceContext, PreferenceTarget, + QuietHours, } from './preference-resolver.js'; // Channel seam diff --git a/packages/services/service-messaging/src/memory-outbox.ts b/packages/services/service-messaging/src/memory-outbox.ts index be6e7f096..c252f5163 100644 --- a/packages/services/service-messaging/src/memory-outbox.ts +++ b/packages/services/service-messaging/src/memory-outbox.ts @@ -48,6 +48,9 @@ export class MemoryNotificationOutbox implements INotificationOutbox { partitionKey: hashPartition(input.notificationId, this.partitionCount), status: 'pending', attempts: 0, + // Deferred dispatch (quiet-hours, P3): claim() skips pending rows + // whose nextAttemptAt is still in the future. + nextAttemptAt: input.notBefore, createdAt: now, updatedAt: now, }); diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index f1b82b129..4bcbbcb97 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -216,6 +216,7 @@ export class MessagingService { const targets = await this.preferences.filter(recipients, channels, { topic: input.topic, organizationId: input.organizationId, + severity: input.severity, }); if (targets.length === 0) { this.ctx.logger.info(`[messaging] emit: topic '${input.topic}' suppressed for all recipients by preference`); @@ -269,7 +270,7 @@ export class MessagingService { actionUrl: str(payload.url) ?? str(payload.actionUrl), }; const deliveries: DeliveryOutcome[] = []; - for (const { recipient, channels } of targets) { + for (const { recipient, channels, notBefore } of targets) { for (const channel of channels) { try { const id = await outbox.enqueue({ @@ -279,6 +280,9 @@ export class MessagingService { topic: input.topic, payload: deliveryPayload, organizationId: input.organizationId, + // Quiet-hours deferral (P3b): the dispatcher won't claim + // this row until `notBefore`. Absent ⇒ immediate. + notBefore, }); deliveries.push({ channel, recipient, ok: true, externalId: id }); } catch (err) { diff --git a/packages/services/service-messaging/src/outbox.ts b/packages/services/service-messaging/src/outbox.ts index 5a396c10a..44a829535 100644 --- a/packages/services/service-messaging/src/outbox.ts +++ b/packages/services/service-messaging/src/outbox.ts @@ -54,6 +54,13 @@ export interface EnqueueDeliveryInput { topic?: string; payload: DeliveryPayload; organizationId?: string; + /** + * Earliest dispatch time (epoch ms). When set, the row enqueues with + * `nextAttemptAt = notBefore`, so the dispatcher (which skips pending rows + * whose `nextAttemptAt` is in the future) defers the send until then. Used + * by the ADR-0030 P3 quiet-hours scheduler; absent ⇒ immediate. + */ + notBefore?: number; } export interface ClaimOptions { diff --git a/packages/services/service-messaging/src/preference-resolver.test.ts b/packages/services/service-messaging/src/preference-resolver.test.ts index 5c2cfae1e..e5d389b0c 100644 --- a/packages/services/service-messaging/src/preference-resolver.test.ts +++ b/packages/services/service-messaging/src/preference-resolver.test.ts @@ -1,7 +1,7 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. import { describe, it, expect } from 'vitest'; -import { PreferenceResolver } from './preference-resolver.js'; +import { PreferenceResolver, quietHoursDeferral } from './preference-resolver.js'; function silentLogger() { return { info: () => {}, warn: () => {}, error: () => {} }; @@ -141,3 +141,61 @@ describe('PreferenceResolver', () => { expect(data.queries.every((q) => q.where.organization_id === 'org_1')).toBe(true); }); }); + +describe('quietHoursDeferral (P3b)', () => { + it('defers to the end of a same-day window when now is inside it', () => { + const now = Date.UTC(2026, 0, 1, 9, 0); // 09:00 UTC + const out = quietHoursDeferral({ tz: 'UTC', start: '09:00', end: '17:00' }, now); + expect(out).toBe(Date.UTC(2026, 0, 1, 17, 0)); + }); + + it('defers across midnight for an overnight window', () => { + const now = Date.UTC(2026, 0, 1, 23, 0); // 23:00 UTC, window 22:00–08:00 + const out = quietHoursDeferral({ tz: 'UTC', start: '22:00', end: '08:00' }, now); + expect(out).toBe(Date.UTC(2026, 0, 2, 8, 0)); // 08:00 next day + }); + + it('returns undefined when now is outside the window', () => { + const now = Date.UTC(2026, 0, 1, 12, 0); + expect(quietHoursDeferral({ tz: 'UTC', start: '22:00', end: '08:00' }, now)).toBeUndefined(); + expect(quietHoursDeferral({ tz: 'UTC', start: '09:00', end: '17:00' }, Date.UTC(2026, 0, 1, 18, 0))).toBeUndefined(); + }); + + it('returns undefined for a degenerate window or bad input', () => { + const now = Date.UTC(2026, 0, 1, 12, 0); + expect(quietHoursDeferral({ tz: 'UTC', start: '09:00', end: '09:00' }, now)).toBeUndefined(); + expect(quietHoursDeferral({ start: 'nonsense' } as any, now)).toBeUndefined(); + }); +}); + +describe('PreferenceResolver — quiet hours', () => { + it('stamps notBefore on the target when the recipient is inside quiet hours', async () => { + const now = Date.UTC(2026, 0, 1, 23, 0); + const rows = [pref({ user_id: 'u1', topic: '*', channel: '*', enabled: true })]; + (rows[0] as any).quiet_hours = { tz: 'UTC', start: '22:00', end: '08:00' }; + const data = fakeData(rows); + const r = resolver(() => data.engine); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned', now }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox', 'email'], notBefore: Date.UTC(2026, 0, 2, 8, 0) }]); + }); + + it('does not defer a critical event (bypasses quiet hours)', async () => { + const now = Date.UTC(2026, 0, 1, 23, 0); + const rows = [pref({ user_id: 'u1', topic: '*', channel: '*', enabled: true })]; + (rows[0] as any).quiet_hours = { tz: 'UTC', start: '22:00', end: '08:00' }; + const data = fakeData(rows); + const r = resolver(() => data.engine); + const out = await r.filter(['u1'], ['inbox'], { topic: 'task.assigned', now, severity: 'critical' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox'] }]); // no notBefore + }); + + it('accepts a JSON-string quiet_hours value', async () => { + const now = Date.UTC(2026, 0, 1, 23, 0); + const rows = [pref({ user_id: 'u1', topic: '*', channel: '*', enabled: true })]; + (rows[0] as any).quiet_hours = JSON.stringify({ tz: 'UTC', start: '22:00', end: '08:00' }); + const data = fakeData(rows); + const r = resolver(() => data.engine); + const out = await r.filter(['u1'], ['inbox'], { topic: 'task.assigned', now }); + expect(out[0].notBefore).toBe(Date.UTC(2026, 0, 2, 8, 0)); + }); +}); diff --git a/packages/services/service-messaging/src/preference-resolver.ts b/packages/services/service-messaging/src/preference-resolver.ts index 9a9987681..4f8340fb2 100644 --- a/packages/services/service-messaging/src/preference-resolver.ts +++ b/packages/services/service-messaging/src/preference-resolver.ts @@ -28,12 +28,30 @@ export interface PreferenceResolverOptions { export interface PreferenceContext { topic: string; organizationId?: string; + /** Event severity — `critical` bypasses quiet-hours deferral (P3b). */ + severity?: string; + /** "Now" reference (epoch ms) for quiet-hours math. Defaults to Date.now(). */ + now?: number; } /** A recipient with the channels they accept for this notification. */ export interface PreferenceTarget { recipient: string; channels: string[]; + /** + * Earliest dispatch time (epoch ms) when the recipient is inside their + * quiet-hours window; absent ⇒ send now. Applies to all of this recipient's + * channels (quiet hours are a per-person setting). Honored only on the + * durable outbox path; inline best-effort fan-out ignores it. + */ + notBefore?: number; +} + +/** Quiet-hours window declared on a preference row. */ +export interface QuietHours { + tz?: string; + start?: string; // 'HH:MM' + end?: string; // 'HH:MM' } const WILDCARD = '*'; @@ -94,23 +112,36 @@ export class PreferenceResolver { return all(); } - // Index rows by `${user}|${topic}|${channel}` → enabled. + // Index rows by `${user}|${topic}|${channel}` → { enabled, quietHours }. const recipientSet = new Set(recipients); - const index = new Map(); + const index = new Map(); for (const r of rows) { const user = String(r.user_id ?? ''); if (user !== WILDCARD && !recipientSet.has(user)) continue; // ignore unrelated users const topic = String(r.topic ?? WILDCARD); const channel = String(r.channel ?? WILDCARD); - index.set(`${user}|${topic}|${channel}`, asBool(r.enabled)); + index.set(`${user}|${topic}|${channel}`, { + enabled: asBool(r.enabled), + quietHours: parseQuietHours(r.quiet_hours), + }); } + const nowMs = ctx.now ?? Date.now(); + const critical = ctx.severity === 'critical'; const targets: PreferenceTarget[] = []; for (const recipient of recipients) { - const accepted = channels.filter((channel) => - this.enabledFor(index, recipient, ctx.topic, channel), + const accepted = channels.filter( + (channel) => this.resolveRow(index, recipient, ctx.topic, channel)?.enabled ?? true, ); - if (accepted.length > 0) targets.push({ recipient, channels: accepted }); + if (accepted.length === 0) continue; + // Quiet-hours deferral (per person; declared on a channel-wildcard + // row). Critical events bypass it. + let notBefore: number | undefined; + if (!critical) { + const qh = this.resolveQuietHours(index, recipient, ctx.topic); + notBefore = qh ? quietHoursDeferral(qh, nowMs) : undefined; + } + targets.push(notBefore != null ? { recipient, channels: accepted, notBefore } : { recipient, channels: accepted }); } return targets; } @@ -130,9 +161,9 @@ export class PreferenceResolver { /** * Most-specific-wins lookup for (user, topic, channel). User-specific beats - * the `*` user; topic/channel specific beats their wildcards. Default on. + * the `*` user; topic/channel specific beats their wildcards. */ - private enabledFor(index: Map, user: string, topic: string, channel: string): boolean { + private resolveRow(index: Map, user: string, topic: string, channel: string): PrefRowLite | undefined { for (const u of [user, WILDCARD]) { for (const t of [topic, WILDCARD]) { for (const c of [channel, WILDCARD]) { @@ -141,14 +172,95 @@ export class PreferenceResolver { } } } - return true; // built-in default: opted in + return undefined; // built-in default handled by callers (opted in) } + + /** + * Resolve a recipient's quiet-hours window. Declared on a channel-wildcard + * row (`(user, *, *)` or `(user, topic, *)`) — quiet hours are a per-person, + * channel-agnostic setting. Most-specific user/topic wins. + */ + private resolveQuietHours(index: Map, user: string, topic: string): QuietHours | undefined { + for (const u of [user, WILDCARD]) { + for (const t of [topic, WILDCARD]) { + const hit = index.get(`${u}|${t}|${WILDCARD}`); + if (hit?.quietHours) return hit.quietHours; + } + } + return undefined; + } +} + +interface PrefRowLite { + enabled: boolean; + quietHours?: QuietHours; } function asBool(v: unknown): boolean { return v === true || v === 1 || v === '1' || v === 'true'; } +function parseQuietHours(v: unknown): QuietHours | undefined { + let o: any = v; + if (typeof o === 'string') { + try { o = JSON.parse(o); } catch { return undefined; } + } + if (!o || typeof o !== 'object') return undefined; + if (o.start == null || o.end == null) return undefined; + return { tz: o.tz, start: String(o.start), end: String(o.end) }; +} + +/** + * Compute the deferral target (epoch ms) when `now` falls inside the quiet-hours + * window, else `undefined`. Times are `HH:MM` in `quietHours.tz` (default UTC). + * Supports overnight windows (start > end, e.g. 22:00–08:00). Uses `Intl` to read + * the wall-clock minutes in the tz; returns `now + minutesUntilWindowEnd`. + */ +export function quietHoursDeferral(quietHours: QuietHours, nowMs: number): number | undefined { + const start = parseHHMM(quietHours.start); + const end = parseHHMM(quietHours.end); + if (start == null || end == null || start === end) return undefined; + + const cur = minutesOfDayInTz(nowMs, quietHours.tz ?? 'UTC'); + let untilEnd: number | undefined; + if (start < end) { + if (cur >= start && cur < end) untilEnd = end - cur; + } else { + // Overnight window wrapping midnight. + if (cur >= start) untilEnd = 1440 - cur + end; + else if (cur < end) untilEnd = end - cur; + } + return untilEnd == null ? undefined : nowMs + untilEnd * 60_000; +} + +function parseHHMM(s?: string): number | undefined { + if (!s) return undefined; + const m = /^(\d{1,2}):(\d{2})$/.exec(s.trim()); + if (!m) return undefined; + const h = Number(m[1]); + const min = Number(m[2]); + if (h > 23 || min > 59) return undefined; + return h * 60 + min; +} + +function minutesOfDayInTz(nowMs: number, tz: string): number { + try { + const parts = new Intl.DateTimeFormat('en-US', { + hour12: false, + hour: '2-digit', + minute: '2-digit', + timeZone: tz, + }).formatToParts(new Date(nowMs)); + const hour = Number(parts.find((p) => p.type === 'hour')?.value ?? '0') % 24; // '24' → 0 + const minute = Number(parts.find((p) => p.type === 'minute')?.value ?? '0'); + return hour * 60 + minute; + } catch { + // Unknown tz → treat as UTC. + const d = new Date(nowMs); + return d.getUTCHours() * 60 + d.getUTCMinutes(); + } +} + function msg(err: unknown): string { return (err as Error)?.message ?? String(err); } diff --git a/packages/services/service-messaging/src/sql-outbox.ts b/packages/services/service-messaging/src/sql-outbox.ts index abb838048..0bd369bde 100644 --- a/packages/services/service-messaging/src/sql-outbox.ts +++ b/packages/services/service-messaging/src/sql-outbox.ts @@ -81,6 +81,9 @@ export class SqlNotificationOutbox implements INotificationOutbox { partition_key: hashPartition(input.notificationId, this.partitionCount), status: 'pending', attempts: 0, + // Deferred dispatch (quiet-hours, P3): claim() skips pending rows + // whose next_attempt_at is in the future. + next_attempt_at: input.notBefore ?? null, created_at: now, updated_at: now, };