Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions runtime/drivers/sqlite/ai_sessions_cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package sqlite

import (
"context"
"fmt"
"strings"
"time"

"go.uber.org/zap"
)

// TTL for AI sessions; sessions with no messages newer than this are deleted by deleteExpiredAISessionsLoop.
var aiSessionTTL = 90 * 24 * time.Hour // 3 months

// aiSessionDeleteBatchSize bounds how many sessions are deleted per transaction.
// Each batch holds the sqlite write lock briefly, so other registry operations can interleave.
const aiSessionDeleteBatchSize = 200

// deleteExpiredAISessionsLoop runs deleteExpiredAISessions once at startup and then daily.
// It runs in the background so it doesn't block Migrate (and therefore runtime startup).
func (c *connection) deleteExpiredAISessionsLoop() {
for {
if err := c.deleteExpiredAISessions(c.ctx); err != nil && c.ctx.Err() == nil {
c.logger.Error("sqlite: failed to delete expired AI sessions", zap.Error(err))
}
select {
case <-c.ctx.Done():
return
case <-time.After(24 * time.Hour):
}
}
}

// deleteExpiredAISessions deletes AI sessions that have no messages newer than aiSessionTTL.
// It snapshots expired session IDs first, then deletes messages and sessions in batched transactions to bound lock-hold time.
func (c *connection) deleteExpiredAISessions(ctx context.Context) error {
cutoff := time.Now().UTC().Add(-aiSessionTTL)

ids, err := c.expiredAISessionIDs(ctx, cutoff)
if err != nil {
return err
}

// Delete in batches; each batch atomically deletes a session's messages and the session itself.
for start := 0; start < len(ids); start += aiSessionDeleteBatchSize {
end := start + aiSessionDeleteBatchSize
if end > len(ids) {
end = len(ids)
}
if err := c.deleteAISessionBatch(ctx, ids[start:end]); err != nil {
return err
}
}

return nil
}

// expiredAISessionIDs returns the IDs of AI sessions older than cutoff with no messages newer than cutoff.
// It does a single full scan of ai_messages (filtered by created_on >= cutoff) and a single scan of ai_sessions.
func (c *connection) expiredAISessionIDs(ctx context.Context, cutoff time.Time) ([]string, error) {
rows, err := c.db.QueryContext(ctx, `
SELECT id FROM ai_sessions
WHERE created_on < ?
AND id NOT IN (SELECT session_id FROM ai_messages WHERE created_on >= ?)
`, cutoff, cutoff)
if err != nil {
return nil, fmt.Errorf("failed to query expired AI sessions: %w", err)
}
defer rows.Close()

var ids []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, fmt.Errorf("failed to scan expired AI session id: %w", err)
}
ids = append(ids, id)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to read expired AI sessions: %w", err)
}
return ids, nil
}

func (c *connection) deleteAISessionBatch(ctx context.Context, ids []string) error {
placeholders := strings.Repeat("?,", len(ids)-1) + "?"
args := make([]any, len(ids))
for i, id := range ids {
args[i] = id
}

tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin AI session cleanup transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()

if _, err := tx.ExecContext(ctx, fmt.Sprintf(`DELETE FROM ai_messages WHERE session_id IN (%s)`, placeholders), args...); err != nil {
return fmt.Errorf("failed to delete expired AI messages: %w", err)
}
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`DELETE FROM ai_sessions WHERE id IN (%s)`, placeholders), args...); err != nil {
return fmt.Errorf("failed to delete expired AI sessions: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit AI session cleanup: %w", err)
}
return nil
}
40 changes: 0 additions & 40 deletions runtime/drivers/sqlite/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path"
"strconv"
"strings"
"time"
)

// Embed migrations directory in the binary
Expand All @@ -18,9 +17,6 @@ var migrationsFS embed.FS
// Name of the table that tracks migrations.
var migrationVersionTable = "runtime_migration_version"

// TTL for AI sessions; sessions with no messages newer than this are deleted on startup.
var aiSessionTTL = 90 * 24 * time.Hour // 3 months

// Migrate implements drivers.Connection.
// Migrate for SQLite may not be safe for concurrent use.
func (c *connection) Migrate(_ context.Context) (err error) {
Expand Down Expand Up @@ -76,13 +72,6 @@ func (c *connection) Migrate(_ context.Context) (err error) {
}
}

// Apply TTL to AI sessions: delete sessions with no messages newer than aiSessionTTL.
// Messages are deleted explicitly before sessions to avoid reliance on ON DELETE CASCADE.
err = c.deleteExpiredAISessions(ctx)
if err != nil {
return fmt.Errorf("failed to delete expired AI sessions: %w", err)
}

return nil
}

Expand Down Expand Up @@ -145,32 +134,3 @@ func (c *connection) MigrationStatus(_ context.Context) (current, desired int, e
func migrationFilenameToVersion(name string) (int, error) {
return strconv.Atoi(strings.TrimSuffix(name, ".sql"))
}

// deleteExpiredAISessions deletes AI sessions that have no messages newer than aiSessionTTL.
// Messages are deleted explicitly before the sessions to avoid reliance on ON DELETE CASCADE.
func (c *connection) deleteExpiredAISessions(ctx context.Context) error {
cutoff := time.Now().UTC().Add(-aiSessionTTL)

// Identify expired sessions: those older than the cutoff with no messages newer than the cutoff.
expiredSessionsQuery := `
SELECT id FROM ai_sessions
WHERE ai_sessions.created_on < ?
AND NOT EXISTS (
SELECT 1 FROM ai_messages
WHERE ai_messages.session_id = ai_sessions.id
AND ai_messages.created_on >= ?
)
`

// Delete messages first, then sessions.
_, err := c.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM ai_messages WHERE session_id IN (%s)`, expiredSessionsQuery), cutoff, cutoff)
if err != nil {
return fmt.Errorf("failed to delete expired AI messages: %w", err)
}
_, err = c.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM ai_sessions WHERE id IN (%s)`, expiredSessionsQuery), cutoff, cutoff)
if err != nil {
return fmt.Errorf("failed to delete expired AI sessions: %w", err)
}

return nil
}
1 change: 1 addition & 0 deletions runtime/drivers/sqlite/migrations/0041.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX IF NOT EXISTS ai_messages_session_id_idx ON ai_messages (session_id);
4 changes: 4 additions & 0 deletions runtime/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (d driver) Open(_, _ string, config map[string]any, st *storage.Client, ac
// Start backups in the background (no-op if backups are not configured)
go h.startBackups()

// Apply TTL to AI sessions in the background.
// This can be slow on large databases, so it must not block Migrate (and therefore runtime startup).
go h.deleteExpiredAISessionsLoop()

return h, nil
}

Expand Down
Loading