Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions libs/db/src/queries/emailQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,40 @@ export const updateQueueStatus = async (
return results[0] || null;
};

/**
* Reschedule a pending queue entry for a future send time. Used by the queue
* processor when an account is over its daily quota — without bumping
* `scheduledFor`, the entry would be picked again on the next batch
* iteration and immediately re-skipped.
*
* Always keeps the entry in `pending` status (only the time and optional
* error message change). `attemptCount` is intentionally NOT incremented
* because no send was actually attempted.
*/
export const rescheduleQueueEntry = async (
id: string,
newScheduledFor: Date,
errorMessage?: string | null
): Promise<EmailQueue | null> => {
const updateData: Record<string, unknown> = {
status: 'pending' as const,
scheduledFor: newScheduledFor,
updatedAt: new Date(),
};

if (errorMessage !== undefined) {
updateData.errorMessage = errorMessage;
}

const results = await db
.update(emailQueue)
.set(updateData)
.where(eq(emailQueue.id, id))
.returning();

return results[0] || null;
};

/**
* Increment attempt count for a queue entry
*/
Expand Down
34 changes: 20 additions & 14 deletions src/lib/emailQueueProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import {
getNextPendingEmails,
updateQueueStatus,
rescheduleQueueEntry,
incrementAttemptCount,
createEmailEvent,
incrementCampaignStat,
getEmailAccountById,
isEmailUnsubscribed,
} from '@coldflow/db';
import { sendEmail, hasAvailableQuota } from './gmailService';
import { computeQuotaRescheduleAt } from './queueScheduling';
import { nanoid } from 'nanoid';

/**
Expand Down Expand Up @@ -84,20 +86,24 @@ export async function processEmailQueue(batchSize: number = 10): Promise<Process
const hasQuota = await hasAvailableQuota(queueEntry.emailAccountId);
if (!hasQuota) {
const account = await getEmailAccountById(queueEntry.emailAccountId);
const quotaResetAt = account?.quotaResetAt ? new Date(account.quotaResetAt) : null;

if (quotaResetAt && quotaResetAt > now) {
// Reschedule for quota reset time
await updateQueueStatus(
queueEntry.id,
'pending', // Keep as pending
null,
'Quota exceeded - rescheduled'
);
result.skipped++;
console.log(`Email ${queueEntry.id} skipped - quota exceeded, rescheduled for ${quotaResetAt}`);
continue;
}
const quotaResetAt = account?.quotaResetAt
? new Date(account.quotaResetAt)
: null;

// Always push the entry's `scheduledFor` forward — without that,
// `getNextPendingEmails` (which filters `scheduledFor <= now`)
// would re-pick this entry on every batch and we'd spin.
const nextAttempt = computeQuotaRescheduleAt(quotaResetAt, now);
await rescheduleQueueEntry(
queueEntry.id,
nextAttempt,
'Quota exceeded - rescheduled'
);
result.skipped++;
console.log(
`Email ${queueEntry.id} skipped - quota exceeded, rescheduled for ${nextAttempt.toISOString()}`
);
continue;
}

// Update status to processing
Expand Down
45 changes: 45 additions & 0 deletions src/lib/queueScheduling.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Pure scheduling helpers for the email queue processor.
*
* Kept here (and not in `emailQueueProcessor.ts`) so the decision logic can
* be unit-tested without the DB / Gmail dependency graph. The processor
* imports from this module instead of inlining the math.
*/

const ONE_MINUTE_MS = 60 * 1000;

/**
* Decide when to next try sending a queue entry whose account is over its
* daily quota.
*
* Inputs:
* - `quotaResetAt` — the account's stored quota reset timestamp (may be
* null if the account record has never been initialized, may be in
* the past if a reset hasn't been written back yet).
* - `now` — wall-clock time of the current processing pass.
* - `minDelayMs` (optional, defaults to 1 minute) — floor on how soon we
* will retry. Prevents a tight loop when `quotaResetAt` is the same
* tick as `now` or already in the past.
*
* Returns a Date strictly in the future relative to `now`. Callers should
* write that Date back to the queue entry's `scheduledFor` so
* `getNextPendingEmails` (which filters by `scheduledFor <= now`) skips it
* until the reset time arrives.
*/
export function computeQuotaRescheduleAt(
quotaResetAt: Date | null | undefined,
now: Date = new Date(),
minDelayMs: number = ONE_MINUTE_MS
): Date {
const floor = new Date(now.getTime() + Math.max(minDelayMs, 0));

if (!quotaResetAt) {
return floor;
}

if (quotaResetAt.getTime() > floor.getTime()) {
return new Date(quotaResetAt.getTime());
}

return floor;
}
91 changes: 91 additions & 0 deletions tests/int/queueScheduling.int.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { describe, expect, it } from 'vitest';
import { computeQuotaRescheduleAt } from '@/lib/queueScheduling';

describe('computeQuotaRescheduleAt', () => {
const now = new Date('2026-05-03T12:00:00Z');

it('uses quotaResetAt when it is far in the future', () => {
const reset = new Date('2026-05-04T00:00:00Z');
expect(computeQuotaRescheduleAt(reset, now).toISOString()).toBe(
reset.toISOString()
);
});

it('falls back to now+minDelay when quotaResetAt is null', () => {
const out = computeQuotaRescheduleAt(null, now);
expect(out.getTime()).toBe(now.getTime() + 60_000);
});

it('falls back to now+minDelay when quotaResetAt is undefined', () => {
const out = computeQuotaRescheduleAt(undefined, now);
expect(out.getTime()).toBe(now.getTime() + 60_000);
});

it('floors at now+minDelay when quotaResetAt is already in the past', () => {
const past = new Date('2026-05-03T11:00:00Z');
const out = computeQuotaRescheduleAt(past, now);
expect(out.getTime()).toBe(now.getTime() + 60_000);
});

it('floors at now+minDelay when quotaResetAt equals now', () => {
const out = computeQuotaRescheduleAt(now, now);
expect(out.getTime()).toBe(now.getTime() + 60_000);
});

it('floors at now+minDelay when quotaResetAt is sooner than the floor', () => {
const reset = new Date(now.getTime() + 30_000); // 30s ahead of now
const out = computeQuotaRescheduleAt(reset, now);
expect(out.getTime()).toBe(now.getTime() + 60_000);
});

it('uses quotaResetAt when it is just past the floor', () => {
const reset = new Date(now.getTime() + 90_000); // 90s ahead
const out = computeQuotaRescheduleAt(reset, now);
expect(out.toISOString()).toBe(reset.toISOString());
});

it('honors a custom minDelayMs', () => {
const out = computeQuotaRescheduleAt(null, now, 5 * 60_000);
expect(out.getTime()).toBe(now.getTime() + 5 * 60_000);
});

it('treats negative minDelayMs as zero (never schedules earlier than now)', () => {
const out = computeQuotaRescheduleAt(null, now, -1000);
expect(out.getTime()).toBe(now.getTime());
// Strictly: >= now. The processor still won't pick it again until the
// next pass, so this is intentional rather than buggy.
expect(out.getTime()).toBeGreaterThanOrEqual(now.getTime());
});

it('returns a fresh Date instance (not the input)', () => {
const reset = new Date('2026-05-04T00:00:00Z');
const out = computeQuotaRescheduleAt(reset, now);
expect(out).not.toBe(reset);
expect(out.getTime()).toBe(reset.getTime());
});

it('uses the system clock when `now` is omitted', () => {
const before = Date.now();
const out = computeQuotaRescheduleAt(null);
const after = Date.now();
// Output should be in [before+60_000, after+60_000].
expect(out.getTime()).toBeGreaterThanOrEqual(before + 60_000);
expect(out.getTime()).toBeLessThanOrEqual(after + 60_000);
});

it('never returns a Date earlier than `now` for any reset value', () => {
const cases: Array<Date | null | undefined> = [
null,
undefined,
new Date('1970-01-01T00:00:00Z'),
new Date('2026-05-03T11:59:59Z'),
new Date('2026-05-03T12:00:00Z'),
new Date('2026-05-03T12:00:01Z'),
new Date('2099-01-01T00:00:00Z'),
];
for (const reset of cases) {
const out = computeQuotaRescheduleAt(reset, now);
expect(out.getTime()).toBeGreaterThanOrEqual(now.getTime());
}
});
});