Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/lib/agents/attachment_expiry/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
import type { SupabaseService } from '../../supabase';
import type { LeaseCoordinator } from '../../embeddings/lease';
import { createLogger } from '../../logger.svelte';

const log = createLogger('attachment-expiry-worker');

export type CycleResult =
/** Just took the lease on this cycle — no work yet, caller should recurse immediately. */
Expand Down Expand Up @@ -42,21 +45,42 @@ export async function runOneCycle(ctx: CycleContext): Promise<CycleResult> {

if (!ctx.coordinator.isHolding) {
const acquired = await ctx.coordinator.acquire();
if (!acquired) return 'polling';
if (!acquired) {
// Polling fires every leasePollMs while another device holds
// the lease. Logged at .debug so an idle worker doesn't spam
// the drawer.
log.debug('polling for lease (another device holds it)');
return 'polling';
}
ctx.coordinator.startHeartbeat(ctx.onLeaseLost);
log.info('lease acquired - starting expiry sweep');
return 'acquired-lease';
}

let affected: number;
try {
affected = await ctx.supabase.expireOldAttachments(ctx.expiryDays);
} catch {
} catch (err) {
// Transient failure — Supabase hiccup, network blip. Back off and
// retry. The work isn't time-sensitive so we don't need tight
// retries; the hourly idle poll will eventually catch up.
log.info(
`expire RPC failed: ${err instanceof Error ? err.message : String(err)}`
);
return 'error';
}
if (affected === 0) return 'empty-queue';
if (affected === 0) {
// Empty-queue is the steady state once the backlog drains.
// .debug rather than .info to avoid an hourly heartbeat line in
// the drawer; the user can flip log level to debug when actively
// checking that the worker is alive.
log.debug(`expire sweep found no rows older than ${ctx.expiryDays} days`);
return 'empty-queue';
}
log.info(
`expired ${affected} attachment row${affected === 1 ? '' : 's'} ` +
`older than ${ctx.expiryDays} days`
);
return 'expired';
}

Expand Down
51 changes: 32 additions & 19 deletions src/lib/agents/samskara/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ async function runAssimilatePhase(ctx: CycleContext): Promise<CycleResult> {
return 'error';
}
if (!claim) return 'empty-phase';
log.debug('assimilate: claimed substrate', {
substrateId: claim.id,
threadId: claim.threadId,
userMessageId: claim.userMessageId,
assistantMessageId: claim.assistantMessageId,
});
// Lifecycle headline parallel to "picked up thread X" in the
// other agent workers - logged at .info so the user can see
// samskara firing in the drawer at the default level.
log.info(
`assimilate: claimed substrate ${claim.id} ` +
`(thread ${claim.threadId})`
);

// Fetch the two messages this substrate row anchors. assistantMessageId
// can be null (a turn that errored before the assistant row landed); in
Expand Down Expand Up @@ -267,7 +268,14 @@ async function runAssimilatePhase(ctx: CycleContext): Promise<CycleResult> {
log.debug('assimilate: save failed', err);
return 'error';
}
if (!saved) log.debug('assimilate: save rejected (claim expired?)', { substrateId: claim.id });
if (!saved) {
log.debug('assimilate: save rejected (claim expired?)', { substrateId: claim.id });
} else {
// Lifecycle "finished" line parallel to "finished thread X" in
// the other agent workers. Logged at .info; details about the
// assimilation result stay on the .debug line above.
log.info(`assimilate: saved substrate ${claim.id}`);
}
return saved ? 'progress' : 'save-rejected';
}

Expand Down Expand Up @@ -322,11 +330,12 @@ async function runPairRelatePhase(ctx: CycleContext): Promise<CycleResult> {
}

const partner = recent[bestIdx];
log.debug('pair-relate: selected pair', {
seedId: seed.id,
partnerId: partner.id,
cosine: bestSim,
});
// Lifecycle "picked up" line parallel to the other agents' "picked
// up X" - .info so it's visible at the default log level.
log.info(
`pair-relate: selected pair ${seed.id} <> ${partner.id} ` +
`(cosine ${bestSim.toFixed(3)})`
);
const result = await ctx.agent.relate(
{ situation: seed.situation, outcome: seed.outcome },
{ situation: partner.situation, outcome: partner.outcome },
Expand Down Expand Up @@ -380,12 +389,11 @@ async function runPairRelatePhase(ctx: CycleContext): Promise<CycleResult> {
log.debug('pair-relate: upsert threw', err);
return 'error';
}
log.debug('pair-relate: associated', {
aId,
bId,
kind: result.kind,
label: shorten(result.label),
});
// Lifecycle "finished" line - .info so the user sees the
// association land in the drawer at the default log level.
log.info(
`pair-relate: associated ${aId} <> ${bId} (${result.kind}: ${shorten(result.label)})`
);
return 'progress';
}

Expand Down Expand Up @@ -835,7 +843,12 @@ async function runCompoundRegenPhase(ctx: CycleContext): Promise<CycleResult> {
return 'error';
}
if (rows.length === 0) return 'empty-phase';
log.debug('compound-regen: synthesizing', { rows: rows.length, cap });
// Lifecycle "starting work" line - .info so a compound-regen run
// is visible in the drawer.
log.info(
`compound-regen: synthesizing summary from ${rows.length} sample row(s) ` +
`(cap ${cap})`
);

const summary = await ctx.agent.summarizeCompound(
rows.map((r) => ({
Expand Down
40 changes: 31 additions & 9 deletions src/lib/agents/wiki-librarian/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,16 @@ export async function runOneCycle(ctx: CycleContext): Promise<CycleResult> {

if (!ctx.coordinator.isHolding) {
const acquired = await ctx.coordinator.acquire();
if (!acquired) return 'polling';
if (!acquired) {
// Polling fires every leasePollMs; .debug rather than .info so
// an idle librarian doesn't spam the drawer with a "still
// polling" line every minute. The acquired-lease branch below
// does land at .info so the user sees the transition.
log.debug('polling for lease (another device holds it)');
return 'polling';
}
ctx.coordinator.startHeartbeat(ctx.onLeaseLost);
log.info('lease acquired - checking eligibility on the next cycle');
return 'acquired-lease';
}

Expand All @@ -72,10 +80,26 @@ export async function runOneCycle(ctx: CycleContext): Promise<CycleResult> {
claimed = await ctx.supabase.claimWikiLibrarianRun(
ctx.minIntervalSeconds
);
} catch {
} catch (err) {
log.info(
`claim RPC failed: ${err instanceof Error ? err.message : String(err)}`
);
return 'error';
}
if (!claimed) return 'too-soon';
if (!claimed) {
// Logged at .info so the user can see "yes, the librarian is
// alive and waiting" without enabling debug. Idle nap is 1h, so
// this lands ~24 lines/day during the 12h cooldown - not noisy.
// The interval is named explicitly so the user knows the next
// attempt timing without doing the math.
const hours = Math.round(ctx.minIntervalSeconds / 3600);
log.info(
`not yet eligible for a librarian run ` +
`(min interval ${hours}h since last successful run)`
);
return 'too-soon';
}
log.info('claim acquired - starting librarian run');

// Snapshot of every article. Same listing the drawer uses, but
// we cap at 500 (matching listWikiArticles' default) - a librarian
Expand All @@ -85,9 +109,8 @@ export async function runOneCycle(ctx: CycleContext): Promise<CycleResult> {
try {
articles = await ctx.supabase.listWikiArticles({ limit: 500 });
} catch (err) {
log.debug(
'failed to list wiki articles for librarian',
err instanceof Error ? err.message : String(err)
log.info(
`failed to list wiki articles: ${err instanceof Error ? err.message : String(err)}`
);
return 'error';
}
Expand All @@ -113,9 +136,8 @@ export async function runOneCycle(ctx: CycleContext): Promise<CycleResult> {
signal: ctx.signal,
});
} catch (err) {
log.debug(
'librarian agent threw unexpectedly',
err instanceof Error ? err.message : String(err)
log.info(
`librarian agent threw unexpectedly: ${err instanceof Error ? err.message : String(err)}`
);
return 'error';
}
Expand Down
Loading