Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .changeset/adr-0030-notification-p3b1-quiet-hours.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"@objectstack/service-messaging": minor
---

ADR-0030 P3b-1 — quiet-hours. A notification that lands inside a recipient's
quiet-hours window is **deferred to the window's end** instead of disturbing
them; it then delivers normally.

- Implemented as a **deferred dispatch** on the P1 outbox — no parallel
scheduler: `EnqueueDeliveryInput.notBefore` sets the delivery row's initial
`nextAttemptAt`, and the existing dispatcher already skips pending rows whose
`nextAttemptAt` is in the future. One delivery spine, reusing claim/retry/
observability.
- `PreferenceResolver` reads `quiet_hours` (`{ tz, start, end }`, P2's field) off
a channel-wildcard preference row (quiet hours are a per-person, channel-
agnostic setting), computes the deferral with `quietHoursDeferral()` (HH:MM in
the row's `tz`, default UTC; supports overnight windows that wrap midnight),
and stamps `notBefore` on the target. `emit()` passes it through to the outbox.
- **critical** severity bypasses quiet hours (delivers immediately), like
mandatory topics bypass muting. Honored on the durable outbox path; inline
best-effort fan-out ignores it.

Tests: service-messaging **92 passing** — adds `quietHoursDeferral` unit cases
(same-day / overnight / outside / degenerate) and resolver cases (notBefore
stamped, critical bypass, JSON-string `quiet_hours`).

Follow-up: **P3b-2 — digest** (batch same-`(user, channel, window)` deliveries
into one) builds on this same deferral foundation, adding the window collapse.
20 changes: 13 additions & 7 deletions docs/handoff/adr-0030-notification-convergence.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,19 @@ flipped — the inbox is being populated the whole time.)
`sys_notification_template` (topic×channel×locale) + `{{ payload.x }}`
renderer with generic `payload.title`/`body` fallback. Same `emit` now
reaches inbox + email per prefs.
- **P3b — digest + quiet-hours**: pending. Plan: express both as **deferring
the delivery row's `next_attempt_at`** in the P1 outbox (digest = enqueue to
the next window + collapse same-`(user, channel, window)` rows; quiet-hours =
push `next_attempt_at` to the window end), reusing the dispatcher's
claim/retry/observability — one delivery spine, not a parallel scheduler.
Consumes the `digest`/`quiet_hours` fields P2 added. critical/mandatory
bypass. tz from `quiet_hours.tz``sys_user` → org/UTC.
- **P3b-1 — quiet-hours**: ✅ shipped. Deferred dispatch on the P1 outbox —
`EnqueueDeliveryInput.notBefore` → the row's initial `nextAttemptAt`; the
dispatcher already skips not-yet-due pending rows. `PreferenceResolver` reads
`quiet_hours` off a channel-wildcard row and computes the window end
(`quietHoursDeferral`, HH:MM in tz, overnight-aware). critical bypasses.
(tz currently from `quiet_hours.tz` → UTC; `sys_user` tz fallback is a
follow-up.)
- **P3b-2 — digest**: pending. Builds on the same deferral: enqueue digest
items to the next window, then a **collapse** step merges same-`(user,
channel, window)` rows into one materialization at window time (needs a
`digest_key` on the delivery row + a digest assembler in/beside the
dispatcher + a digest render template). critical/mandatory bypass. Consumes
P2's `digest` field.
- **Deferred (same seam, incremental)**: **Slack** stays a *connector*
(`connector-slack` ships the raw API path today); a Slack notification
*channel* needs identity mapping (`sys_channel_user_link`) + OAuth and is
Expand Down
5 changes: 3 additions & 2 deletions packages/services/service-messaging/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ export type {
ResolveContext,
} from './recipient-resolver.js';

// Preference filter (ADR-0030 P2)
export { PreferenceResolver, PREFERENCE_OBJECT } from './preference-resolver.js';
// Preference filter (ADR-0030 P2) + quiet-hours deferral (P3b)
export { PreferenceResolver, PREFERENCE_OBJECT, quietHoursDeferral } from './preference-resolver.js';
export type {
PreferenceResolverOptions,
PreferenceResolverLogger,
PreferenceContext,
PreferenceTarget,
QuietHours,
} from './preference-resolver.js';

// Channel seam
Expand Down
3 changes: 3 additions & 0 deletions packages/services/service-messaging/src/memory-outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ export class MemoryNotificationOutbox implements INotificationOutbox {
partitionKey: hashPartition(input.notificationId, this.partitionCount),
status: 'pending',
attempts: 0,
// Deferred dispatch (quiet-hours, P3): claim() skips pending rows
// whose nextAttemptAt is still in the future.
nextAttemptAt: input.notBefore,
createdAt: now,
updatedAt: now,
});
Expand Down
6 changes: 5 additions & 1 deletion packages/services/service-messaging/src/messaging-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ export class MessagingService {
const targets = await this.preferences.filter(recipients, channels, {
topic: input.topic,
organizationId: input.organizationId,
severity: input.severity,
});
if (targets.length === 0) {
this.ctx.logger.info(`[messaging] emit: topic '${input.topic}' suppressed for all recipients by preference`);
Expand Down Expand Up @@ -269,7 +270,7 @@ export class MessagingService {
actionUrl: str(payload.url) ?? str(payload.actionUrl),
};
const deliveries: DeliveryOutcome[] = [];
for (const { recipient, channels } of targets) {
for (const { recipient, channels, notBefore } of targets) {
for (const channel of channels) {
try {
const id = await outbox.enqueue({
Expand All @@ -279,6 +280,9 @@ export class MessagingService {
topic: input.topic,
payload: deliveryPayload,
organizationId: input.organizationId,
// Quiet-hours deferral (P3b): the dispatcher won't claim
// this row until `notBefore`. Absent ⇒ immediate.
notBefore,
});
deliveries.push({ channel, recipient, ok: true, externalId: id });
} catch (err) {
Expand Down
7 changes: 7 additions & 0 deletions packages/services/service-messaging/src/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ export interface EnqueueDeliveryInput {
topic?: string;
payload: DeliveryPayload;
organizationId?: string;
/**
* Earliest dispatch time (epoch ms). When set, the row enqueues with
* `nextAttemptAt = notBefore`, so the dispatcher (which skips pending rows
* whose `nextAttemptAt` is in the future) defers the send until then. Used
* by the ADR-0030 P3 quiet-hours scheduler; absent ⇒ immediate.
*/
notBefore?: number;
}

export interface ClaimOptions {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect } from 'vitest';
import { PreferenceResolver } from './preference-resolver.js';
import { PreferenceResolver, quietHoursDeferral } from './preference-resolver.js';

function silentLogger() {
return { info: () => {}, warn: () => {}, error: () => {} };
Expand Down Expand Up @@ -141,3 +141,61 @@ describe('PreferenceResolver', () => {
expect(data.queries.every((q) => q.where.organization_id === 'org_1')).toBe(true);
});
});

describe('quietHoursDeferral (P3b)', () => {
it('defers to the end of a same-day window when now is inside it', () => {
const now = Date.UTC(2026, 0, 1, 9, 0); // 09:00 UTC
const out = quietHoursDeferral({ tz: 'UTC', start: '09:00', end: '17:00' }, now);
expect(out).toBe(Date.UTC(2026, 0, 1, 17, 0));
});

it('defers across midnight for an overnight window', () => {
const now = Date.UTC(2026, 0, 1, 23, 0); // 23:00 UTC, window 22:00–08:00
const out = quietHoursDeferral({ tz: 'UTC', start: '22:00', end: '08:00' }, now);
expect(out).toBe(Date.UTC(2026, 0, 2, 8, 0)); // 08:00 next day
});

it('returns undefined when now is outside the window', () => {
const now = Date.UTC(2026, 0, 1, 12, 0);
expect(quietHoursDeferral({ tz: 'UTC', start: '22:00', end: '08:00' }, now)).toBeUndefined();
expect(quietHoursDeferral({ tz: 'UTC', start: '09:00', end: '17:00' }, Date.UTC(2026, 0, 1, 18, 0))).toBeUndefined();
});

it('returns undefined for a degenerate window or bad input', () => {
const now = Date.UTC(2026, 0, 1, 12, 0);
expect(quietHoursDeferral({ tz: 'UTC', start: '09:00', end: '09:00' }, now)).toBeUndefined();
expect(quietHoursDeferral({ start: 'nonsense' } as any, now)).toBeUndefined();
});
});

describe('PreferenceResolver — quiet hours', () => {
it('stamps notBefore on the target when the recipient is inside quiet hours', async () => {
const now = Date.UTC(2026, 0, 1, 23, 0);
const rows = [pref({ user_id: 'u1', topic: '*', channel: '*', enabled: true })];
(rows[0] as any).quiet_hours = { tz: 'UTC', start: '22:00', end: '08:00' };
const data = fakeData(rows);
const r = resolver(() => data.engine);
const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned', now });
expect(out).toEqual([{ recipient: 'u1', channels: ['inbox', 'email'], notBefore: Date.UTC(2026, 0, 2, 8, 0) }]);
});

it('does not defer a critical event (bypasses quiet hours)', async () => {
const now = Date.UTC(2026, 0, 1, 23, 0);
const rows = [pref({ user_id: 'u1', topic: '*', channel: '*', enabled: true })];
(rows[0] as any).quiet_hours = { tz: 'UTC', start: '22:00', end: '08:00' };
const data = fakeData(rows);
const r = resolver(() => data.engine);
const out = await r.filter(['u1'], ['inbox'], { topic: 'task.assigned', now, severity: 'critical' });
expect(out).toEqual([{ recipient: 'u1', channels: ['inbox'] }]); // no notBefore
});

it('accepts a JSON-string quiet_hours value', async () => {
const now = Date.UTC(2026, 0, 1, 23, 0);
const rows = [pref({ user_id: 'u1', topic: '*', channel: '*', enabled: true })];
(rows[0] as any).quiet_hours = JSON.stringify({ tz: 'UTC', start: '22:00', end: '08:00' });
const data = fakeData(rows);
const r = resolver(() => data.engine);
const out = await r.filter(['u1'], ['inbox'], { topic: 'task.assigned', now });
expect(out[0].notBefore).toBe(Date.UTC(2026, 0, 2, 8, 0));
});
});
130 changes: 121 additions & 9 deletions packages/services/service-messaging/src/preference-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,30 @@ export interface PreferenceResolverOptions {
export interface PreferenceContext {
topic: string;
organizationId?: string;
/** Event severity — `critical` bypasses quiet-hours deferral (P3b). */
severity?: string;
/** "Now" reference (epoch ms) for quiet-hours math. Defaults to Date.now(). */
now?: number;
}

/** A recipient with the channels they accept for this notification. */
export interface PreferenceTarget {
recipient: string;
channels: string[];
/**
* Earliest dispatch time (epoch ms) when the recipient is inside their
* quiet-hours window; absent ⇒ send now. Applies to all of this recipient's
* channels (quiet hours are a per-person setting). Honored only on the
* durable outbox path; inline best-effort fan-out ignores it.
*/
notBefore?: number;
}

/** Quiet-hours window declared on a preference row. */
export interface QuietHours {
tz?: string;
start?: string; // 'HH:MM'
end?: string; // 'HH:MM'
}

const WILDCARD = '*';
Expand Down Expand Up @@ -94,23 +112,36 @@ export class PreferenceResolver {
return all();
}

// Index rows by `${user}|${topic}|${channel}` → enabled.
// Index rows by `${user}|${topic}|${channel}` → { enabled, quietHours }.
const recipientSet = new Set(recipients);
const index = new Map<string, boolean>();
const index = new Map<string, PrefRowLite>();
for (const r of rows) {
const user = String(r.user_id ?? '');
if (user !== WILDCARD && !recipientSet.has(user)) continue; // ignore unrelated users
const topic = String(r.topic ?? WILDCARD);
const channel = String(r.channel ?? WILDCARD);
index.set(`${user}|${topic}|${channel}`, asBool(r.enabled));
index.set(`${user}|${topic}|${channel}`, {
enabled: asBool(r.enabled),
quietHours: parseQuietHours(r.quiet_hours),
});
}

const nowMs = ctx.now ?? Date.now();
const critical = ctx.severity === 'critical';
const targets: PreferenceTarget[] = [];
for (const recipient of recipients) {
const accepted = channels.filter((channel) =>
this.enabledFor(index, recipient, ctx.topic, channel),
const accepted = channels.filter(
(channel) => this.resolveRow(index, recipient, ctx.topic, channel)?.enabled ?? true,
);
if (accepted.length > 0) targets.push({ recipient, channels: accepted });
if (accepted.length === 0) continue;
// Quiet-hours deferral (per person; declared on a channel-wildcard
// row). Critical events bypass it.
let notBefore: number | undefined;
if (!critical) {
const qh = this.resolveQuietHours(index, recipient, ctx.topic);
notBefore = qh ? quietHoursDeferral(qh, nowMs) : undefined;
}
targets.push(notBefore != null ? { recipient, channels: accepted, notBefore } : { recipient, channels: accepted });
}
return targets;
}
Expand All @@ -130,9 +161,9 @@ export class PreferenceResolver {

/**
* Most-specific-wins lookup for (user, topic, channel). User-specific beats
* the `*` user; topic/channel specific beats their wildcards. Default on.
* the `*` user; topic/channel specific beats their wildcards.
*/
private enabledFor(index: Map<string, boolean>, user: string, topic: string, channel: string): boolean {
private resolveRow(index: Map<string, PrefRowLite>, user: string, topic: string, channel: string): PrefRowLite | undefined {
for (const u of [user, WILDCARD]) {
for (const t of [topic, WILDCARD]) {
for (const c of [channel, WILDCARD]) {
Expand All @@ -141,14 +172,95 @@ export class PreferenceResolver {
}
}
}
return true; // built-in default: opted in
return undefined; // built-in default handled by callers (opted in)
}

/**
* Resolve a recipient's quiet-hours window. Declared on a channel-wildcard
* row (`(user, *, *)` or `(user, topic, *)`) — quiet hours are a per-person,
* channel-agnostic setting. Most-specific user/topic wins.
*/
private resolveQuietHours(index: Map<string, PrefRowLite>, user: string, topic: string): QuietHours | undefined {
for (const u of [user, WILDCARD]) {
for (const t of [topic, WILDCARD]) {
const hit = index.get(`${u}|${t}|${WILDCARD}`);
if (hit?.quietHours) return hit.quietHours;
}
}
return undefined;
}
}

interface PrefRowLite {
enabled: boolean;
quietHours?: QuietHours;
}

function asBool(v: unknown): boolean {
return v === true || v === 1 || v === '1' || v === 'true';
}

function parseQuietHours(v: unknown): QuietHours | undefined {
let o: any = v;
if (typeof o === 'string') {
try { o = JSON.parse(o); } catch { return undefined; }
}
if (!o || typeof o !== 'object') return undefined;
if (o.start == null || o.end == null) return undefined;
return { tz: o.tz, start: String(o.start), end: String(o.end) };
}

/**
* Compute the deferral target (epoch ms) when `now` falls inside the quiet-hours
* window, else `undefined`. Times are `HH:MM` in `quietHours.tz` (default UTC).
* Supports overnight windows (start > end, e.g. 22:00–08:00). Uses `Intl` to read
* the wall-clock minutes in the tz; returns `now + minutesUntilWindowEnd`.
*/
export function quietHoursDeferral(quietHours: QuietHours, nowMs: number): number | undefined {
const start = parseHHMM(quietHours.start);
const end = parseHHMM(quietHours.end);
if (start == null || end == null || start === end) return undefined;

const cur = minutesOfDayInTz(nowMs, quietHours.tz ?? 'UTC');
let untilEnd: number | undefined;
if (start < end) {
if (cur >= start && cur < end) untilEnd = end - cur;
} else {
// Overnight window wrapping midnight.
if (cur >= start) untilEnd = 1440 - cur + end;
else if (cur < end) untilEnd = end - cur;
}
return untilEnd == null ? undefined : nowMs + untilEnd * 60_000;
}

function parseHHMM(s?: string): number | undefined {
if (!s) return undefined;
const m = /^(\d{1,2}):(\d{2})$/.exec(s.trim());
if (!m) return undefined;
const h = Number(m[1]);
const min = Number(m[2]);
if (h > 23 || min > 59) return undefined;
return h * 60 + min;
}

function minutesOfDayInTz(nowMs: number, tz: string): number {
try {
const parts = new Intl.DateTimeFormat('en-US', {
hour12: false,
hour: '2-digit',
minute: '2-digit',
timeZone: tz,
}).formatToParts(new Date(nowMs));
const hour = Number(parts.find((p) => p.type === 'hour')?.value ?? '0') % 24; // '24' → 0
const minute = Number(parts.find((p) => p.type === 'minute')?.value ?? '0');
return hour * 60 + minute;
} catch {
// Unknown tz → treat as UTC.
const d = new Date(nowMs);
return d.getUTCHours() * 60 + d.getUTCMinutes();
}
}

function msg(err: unknown): string {
return (err as Error)?.message ?? String(err);
}
3 changes: 3 additions & 0 deletions packages/services/service-messaging/src/sql-outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ export class SqlNotificationOutbox implements INotificationOutbox {
partition_key: hashPartition(input.notificationId, this.partitionCount),
status: 'pending',
attempts: 0,
// Deferred dispatch (quiet-hours, P3): claim() skips pending rows
// whose next_attempt_at is in the future.
next_attempt_at: input.notBefore ?? null,
created_at: now,
updated_at: now,
};
Expand Down