Skip to content

Commit 25a368e

Browse files
committed
Improve Session Summarization to Handle Parallel and Recent Sessions #7861
1 parent 497cab8 commit 25a368e

1 file changed

Lines changed: 145 additions & 10 deletions

File tree

ai/mcp/server/memory-core/services/SessionService.mjs

Lines changed: 145 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,25 @@ import logger from '../logger.mjs';
99

1010
/**
1111
* Service for handling adding, listing, and querying agent memories.
12+
*
13+
* **Architecture & Strategy:**
14+
* The primary challenge in managing session summaries is that agents are stateless and sessions
15+
* rarely have a clean, explicit "end" event (e.g., crashes, network disconnects, or simply stopping work).
16+
*
17+
* To address this, this service employs an **"Eventual Consistency"** strategy for summarization:
18+
* 1. **Startup Scan:** On initialization, we scan for sessions that have been active recently (last 30 days).
19+
* 2. **Drift Detection:** We compare the actual number of memories in the database against the `memoryCount`
20+
* recorded in the existing summary.
21+
* 3. **Self-Healing:** If the counts differ (indicating new memories added or old ones deleted), we
22+
* trigger a re-summarization.
23+
*
24+
* **Parallel Sessions Trade-off:**
25+
* In a scenario where multiple sessions are running in parallel (e.g., Session A and Session B),
26+
* they may continuously update the memory count. This strategy accepts the overhead of
27+
* potentially re-summarizing an active session multiple times to ensure that if any session
28+
* crashes or ends abruptly, its context is preserved and up-to-date for the next agent.
29+
* Data integrity and context availability are prioritized over minimizing LLM token usage.
30+
*
1231
* @class AI.mcp.server.memory-core.services.SessionService
1332
* @extends Neo.core.Base
1433
* @singleton
@@ -122,18 +141,134 @@ class SessionService extends Base {
122141
}
123142

124143
/**
125-
* Finds sessions that have not yet been summarized.
126-
* @returns {Promise<String[]>}
144+
* Finds sessions that need summarization by detecting "Drift" between the database state
145+
* and the summary metadata.
146+
*
147+
* **Logic:**
148+
* 1. **Scope:** Fetches memory and summary metadata for the last 30 days. This optimization
149+
* prevents full-table scans on large databases while covering the vast majority of active work.
150+
* 2. **Grouping:** Aggregates actual memory counts per session from the raw memory data.
151+
* 3. **Comparison:**
152+
* - **Missing Summary:** If a session has memories but no summary, it is flagged.
153+
* - **Count Mismatch:** If `DB_Count !== Summary_Count`, it implies the session has changed
154+
* (new memories added or removed) since the last summary. It is flagged for update.
155+
*
156+
* **Scenarios:**
157+
* - **Happy Path (Sequential):** Session A ends. Summary is created (Count: 10). Next startup sees
158+
* DB: 10, Summary: 10. Match. No action. Zero overhead.
159+
* - **Parallel / Crash Path:** Session A crashes after adding 5 memories. Summary has 10, DB has 15.
160+
* Next startup sees Mismatch. Re-summarizes to capture the lost 5 memories.
161+
*
162+
* @returns {Promise<String[]>} List of Session IDs requiring summarization.
127163
*/
128-
async findUnsummarizedSessions() {
129-
const memories = await this.memoryCollection.get({ include: ['metadatas'] });
130-
if (memories.ids.length === 0) return [];
164+
async findSessionsToSummarize() {
165+
// 1. Get metadata for memories (Last 30 Days only)
166+
// This limits the scope to recent active work.
167+
const ONE_MONTH_MS = 30 * 24 * 60 * 60 * 1000;
168+
const minTimestamp = new Date(Date.now() - ONE_MONTH_MS).toISOString();
169+
170+
let allMetadatas = [];
171+
let offset = 0;
172+
const limit = 2000;
173+
let hasMore = true;
174+
175+
while (hasMore) {
176+
const batch = await this.memoryCollection.get({
177+
include: ['metadatas'],
178+
limit,
179+
offset,
180+
where : {
181+
timestamp: {
182+
'$gt': minTimestamp
183+
}
184+
}
185+
});
186+
187+
if (batch.ids.length === 0) {
188+
hasMore = false;
189+
} else {
190+
allMetadatas = allMetadatas.concat(batch.metadatas);
191+
offset += limit;
192+
193+
if (batch.ids.length < limit) {
194+
hasMore = false;
195+
}
196+
}
197+
}
198+
199+
if (allMetadatas.length === 0) return [];
200+
201+
// 2. Group memories by session
202+
const sessions = {}; // { sessionId: { count: number } }
203+
204+
allMetadatas.forEach(m => {
205+
if (!m.sessionId) return;
206+
207+
if (!sessions[m.sessionId]) {
208+
sessions[m.sessionId] = { count: 0 };
209+
}
210+
211+
sessions[m.sessionId].count++;
212+
});
213+
214+
// 3. Fetch existing summaries (Last 30 Days only)
215+
let allSummaryMetadatas = [];
216+
offset = 0;
217+
hasMore = true;
218+
219+
while (hasMore) {
220+
const batch = await this.sessionsCollection.get({
221+
include: ['metadatas'],
222+
limit,
223+
offset,
224+
where : {
225+
timestamp: {
226+
'$gt': minTimestamp
227+
}
228+
}
229+
});
230+
231+
if (batch.ids.length === 0) {
232+
hasMore = false;
233+
} else {
234+
allSummaryMetadatas = allSummaryMetadatas.concat(batch.metadatas);
235+
offset += limit;
131236

132-
const sessionIds = [...new Set(memories.metadatas.map(m => m.sessionId).filter(Boolean))];
133-
const summaries = await this.sessionsCollection.get({include: ['metadatas']});
134-
const summarizedSessionIds = new Set(summaries.metadatas.map(m => m.sessionId));
237+
if (batch.ids.length < limit) {
238+
hasMore = false;
239+
}
240+
}
241+
}
242+
243+
const summaryMap = {}; // { sessionId: memoryCount }
244+
245+
allSummaryMetadatas.forEach(m => {
246+
if (m.sessionId) {
247+
summaryMap[m.sessionId] = m.memoryCount || 0;
248+
}
249+
});
250+
251+
// 4. Determine candidates
252+
const sessionsToUpdate = [];
253+
254+
Object.keys(sessions).forEach(sessionId => {
255+
const sessionData = sessions[sessionId];
256+
const summaryCount = summaryMap[sessionId];
257+
258+
// Case A: Completely Missing Summary (within the 30-day window)
259+
if (summaryCount === undefined) {
260+
sessionsToUpdate.push(sessionId);
261+
}
262+
// Case B: Partial / Outdated Summary
263+
// If the memory count differs (new memories added OR deleted), we update.
264+
// This self-corrects: once updated, the counts match, and it won't run again.
265+
else if (sessionData.count !== summaryCount) {
266+
logger.info(`[SessionService] Updating active session ${sessionId} (DB: ${sessionData.count} !== Summary: ${summaryCount})`);
267+
sessionsToUpdate.push(sessionId);
268+
}
269+
});
135270

136-
return sessionIds.filter(id => !summarizedSessionIds.has(id));
271+
return sessionsToUpdate;
137272
}
138273

139274
/**
@@ -215,7 +350,7 @@ ${aggregatedContent}
215350
const result = await this.summarizeSession(sessionId);
216351
if (result) processed.push(result);
217352
} else {
218-
const sessionsToSummarize = await this.findUnsummarizedSessions();
353+
const sessionsToSummarize = await this.findSessionsToSummarize();
219354
const promises = sessionsToSummarize.map(id => this.summarizeSession(id));
220355
const results = await Promise.all(promises);
221356

0 commit comments

Comments
 (0)