From 3102ee597f194ae9af371502a84e7e39f5411921 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Sat, 2 May 2026 21:24:46 +0200 Subject: [PATCH 1/4] fix(billing): forceRotateForPlanChange + atomic usage+extras outbox MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🟠 B: BillingResetService.forceRotateForPlanChange() updates current week snapshot mid-period when plan changes (was no-op when week doc existed, keeping stale quota until next reset). Webhook plan.changed handler now calls forceRotate with preserveUsage:true default. Consumers needing clean-break downgrade pass {preserveUsage:false}. 🟠 C: Outbox pattern guarantees extras debit completes even if first attempt fails. incrementMeterWithOutbox() inserts BillingMeterOutbox row alongside usage increment; out-of-band cron retry-pending-extras-debit reconciles within 5min. After 5 failed attempts, row marked 'failed' + billing.extras_debit.exhausted event emitted for downstream alerting. unique index on idempotencyKey prevents double-create races. Documents both behaviors in modules/billing/README.md. --- modules/billing/README.md | 14 ++ modules/billing/crons/README.md | 2 + .../crons/retry-pending-extras-debit.cron.js | 43 ++++ modules/billing/lib/events.js | 4 + .../billing.meter.outbox.model.mongoose.js | 40 ++++ .../models/billing.meter.outbox.schema.js | 35 +++ .../billing.meter.outbox.repository.js | 116 ++++++++++ .../repositories/billing.usage.repository.js | 54 ++++- .../services/billing.meter.outbox.service.js | 81 +++++++ .../billing/services/billing.meter.service.js | 38 ++-- .../billing/services/billing.reset.service.js | 58 +++++ .../billing/services/billing.usage.service.js | 30 +++ .../services/billing.webhook.service.js | 19 +- ...cron.retryPendingExtrasDebit.unit.tests.js | 119 ++++++++++ .../billing.lifecycle.integration.tests.js | 211 ++++++++++++++++++ .../tests/billing.meter.outbox.unit.tests.js | 167 ++++++++++++++ .../tests/billing.meter.service.unit.tests.js | 142 +++++++++--- .../tests/billing.reset.service.unit.tests.js | 87 ++++++++ .../billing.usage.repository.unit.tests.js | 48 ++++ .../tests/billing.usage.service.unit.tests.js | 50 +++++ ...billing.webhook.subscription.unit.tests.js | 60 ++--- 21 files changed, 1328 insertions(+), 90 deletions(-) create mode 100644 modules/billing/crons/retry-pending-extras-debit.cron.js create mode 100644 modules/billing/models/billing.meter.outbox.model.mongoose.js create mode 100644 modules/billing/models/billing.meter.outbox.schema.js create mode 100644 modules/billing/repositories/billing.meter.outbox.repository.js create mode 100644 modules/billing/services/billing.meter.outbox.service.js create mode 100644 modules/billing/tests/billing.cron.retryPendingExtrasDebit.unit.tests.js create mode 100644 modules/billing/tests/billing.lifecycle.integration.tests.js create mode 100644 modules/billing/tests/billing.meter.outbox.unit.tests.js diff --git a/modules/billing/README.md b/modules/billing/README.md index 3fd1a1432..9a889cbe0 100644 --- a/modules/billing/README.md +++ b/modules/billing/README.md @@ -89,6 +89,20 @@ step. Passing the cumulative total will double-charge costs already attributed i **Backward compat**: callers that only ever attribute once (no multi-step) continue to work unchanged β€” the default `stepKey='initial'` makes the idempotency key `${history._id}:initial`. +## Plan-change semantics + +When Stripe `plan.changed` webhook fires, devkit calls `forceRotateForPlanChange(orgId, { preserveUsage: true })` by default: +- Updates `meterQuota` and `planVersion` snapshot to the new plan +- Preserves `meterUsed` (no refund, no double-charge) + +Consumers wanting clean-break behavior on downgrade should pass `{ preserveUsage: false }`. + +## Extras debit reliability + +`attribute()` returns optimistically after usage increment + outbox row insert. Extras debit happens out of band; if it fails, cron `retry-pending-extras-debit` reconciles within 5min. After 5 failed attempts, the outbox row is marked `failed` and event `billing.extras_debit.exhausted` is emitted for alerting. + +Consumers should NOT retry on `applied: true` β€” the outbox handles eventual consistency. + ## Stripe β€” `automatic_tax` flag ```js diff --git a/modules/billing/crons/README.md b/modules/billing/crons/README.md index 833443ccf..cdf5682f2 100644 --- a/modules/billing/crons/README.md +++ b/modules/billing/crons/README.md @@ -15,6 +15,7 @@ No `node-cron` dependency β€” orchestration is handled by Kubernetes CronJob man | `billing.weeklyReset.js` | Reset meter counters for orgs whose billing period rolled over | Daily `0 1 * * *` | | `billing.extrasExpiration.js` | Expire topup ledger entries past their `expiresAt` date | Daily `0 2 * * *` | | `billing.dunningSweep.js` | Downgrade stale `past_due` subs (>14d) to `unpaid` + `free` | Daily `0 3 * * *` | +| `retry-pending-extras-debit.cron.js` | Retry pending extras debits from the meter outbox | Every 5 minutes `*/5 * * * *` | ## Usage @@ -22,6 +23,7 @@ No `node-cron` dependency β€” orchestration is handled by Kubernetes CronJob man NODE_ENV=production node modules/billing/crons/billing.weeklyReset.js NODE_ENV=production node modules/billing/crons/billing.extrasExpiration.js NODE_ENV=production node modules/billing/crons/billing.dunningSweep.js +NODE_ENV=production node modules/billing/crons/retry-pending-extras-debit.cron.js ``` Exit code 0 = success (or meterMode disabled). Exit code 1 = at least one error or fatal failure. diff --git a/modules/billing/crons/retry-pending-extras-debit.cron.js b/modules/billing/crons/retry-pending-extras-debit.cron.js new file mode 100644 index 000000000..09d455438 --- /dev/null +++ b/modules/billing/crons/retry-pending-extras-debit.cron.js @@ -0,0 +1,43 @@ +/** + * Cron script β€” retry pending extras debits from the meter outbox. + * + * No-op when config.billing.meterMode === false (default). + * Intended to run as a Kubernetes CronJob every 5 minutes. + * + * Usage: + * NODE_ENV=production node modules/billing/crons/retry-pending-extras-debit.cron.js + */ + +process.env.NODE_ENV = process.env.NODE_ENV || 'development'; + +const [{ default: config }, { default: mongooseService }] = await Promise.all([ + import('../../../config/index.js'), + import('../../../lib/services/mongoose.js'), +]); + +if (!config?.billing?.meterMode) { + console.log('[billing.retryPendingExtrasDebit] meterMode disabled β€” skipping.'); + process.exit(0); +} + +const jitterMs = Math.floor(Math.random() * 60_000); +await new Promise((resolve) => setTimeout(resolve, jitterMs)); + +try { + await mongooseService.loadModels(); + await mongooseService.connect(); + + const { default: BillingMeterOutboxService } = await import('../services/billing.meter.outbox.service.js'); + const result = await BillingMeterOutboxService.retryPendingExtrasDebits(5 * 60 * 1000, 100); + + console.log( + `[billing.retryPendingExtrasDebit] done β€” scanned: ${result.scanned}, committed: ${result.committed}, failedAttempts: ${result.failedAttempts}, exhausted: ${result.exhausted}`, + ); + process.exitCode = result.exhausted > 0 ? 1 : 0; +} catch (err) { + console.error('[billing.retryPendingExtrasDebit] fatal:', err); + process.exitCode = 1; +} finally { + await mongooseService.disconnect?.(); +} +process.exit(process.exitCode ?? 0); diff --git a/modules/billing/lib/events.js b/modules/billing/lib/events.js index c88a218df..ffe2dc237 100644 --- a/modules/billing/lib/events.js +++ b/modules/billing/lib/events.js @@ -9,6 +9,10 @@ import { EventEmitter } from 'events'; * Events: * - `plan.changed` β€” emitted when a subscription's plan changes * Payload: { organizationId, previousPlan, newPlan, subscription, isDowngrade } + * - `billing.plan_change.rotated` β€” emitted after current-week meter snapshot refresh + * Payload: { organizationId, oldQuota, newQuota, oldVersion, newVersion, preserveUsage } + * - `billing.extras_debit.exhausted` β€” emitted when outbox extras debit retries fail 5 times + * Payload: { organizationId, idempotencyKey, extrasUnits, attempts, lastError } * - `payment.failed` β€” emitted when an invoice payment fails (pastDueSince set on first failure) * Payload: { organizationId } */ diff --git a/modules/billing/models/billing.meter.outbox.model.mongoose.js b/modules/billing/models/billing.meter.outbox.model.mongoose.js new file mode 100644 index 000000000..23ac2d6e7 --- /dev/null +++ b/modules/billing/models/billing.meter.outbox.model.mongoose.js @@ -0,0 +1,40 @@ +/** + * Module dependencies + */ +import mongoose from 'mongoose'; + +const Schema = mongoose.Schema; + +/** + * Meter outbox model. + * + * Stores deferred extras debits created after meter usage crosses plan quota. + * Pending rows are retried by the billing retry-pending-extras-debit cron. + */ +const BillingMeterOutboxMongoose = new Schema({ + organizationId: { type: Schema.ObjectId, required: true, index: true }, + idempotencyKey: { type: String, required: true, unique: true }, + extrasUnits: { type: Number, required: true }, + status: { type: String, enum: ['pending', 'committed', 'failed'], default: 'pending', index: true }, + attempts: { type: Number, default: 0 }, + lastError: { type: String, default: null }, + lastAttemptedAt: { type: Date, default: null }, + createdAt: { type: Date, default: () => new Date() }, +}); + +BillingMeterOutboxMongoose.index({ status: 1, lastAttemptedAt: 1 }); + +/** + * Returns the hex string representation of the document ObjectId. + * @returns {string} Hex string of the ObjectId. + */ +function addID() { + return this._id.toHexString(); +} + +BillingMeterOutboxMongoose.virtual('id').get(addID); +BillingMeterOutboxMongoose.set('toJSON', { + virtuals: true, +}); + +mongoose.model('BillingMeterOutbox', BillingMeterOutboxMongoose); diff --git a/modules/billing/models/billing.meter.outbox.schema.js b/modules/billing/models/billing.meter.outbox.schema.js new file mode 100644 index 000000000..780e711d8 --- /dev/null +++ b/modules/billing/models/billing.meter.outbox.schema.js @@ -0,0 +1,35 @@ +/** + * Module dependencies + */ +import { z } from 'zod'; + +/** + * BillingMeterOutbox Zod schema β€” mirrors billing.meter.outbox.model.mongoose.js + */ + +const objectIdRegex = /^[a-f\d]{24}$/i; + +const BillingMeterOutboxStatus = z.enum(['pending', 'committed', 'failed']); + +const BillingMeterOutbox = z.object({ + organizationId: z.string().trim().regex(objectIdRegex, 'organizationId must be a valid ObjectId'), + idempotencyKey: z.string().trim().min(1, 'idempotencyKey is required'), + extrasUnits: z.number().int().min(1, 'extrasUnits must be >= 1'), + status: BillingMeterOutboxStatus.default('pending'), + attempts: z.number().int().min(0).default(0), + lastError: z.string().nullable().default(null), + lastAttemptedAt: z.coerce.date().nullable().default(null), + createdAt: z.coerce.date().optional(), +}); + +const BillingMeterOutboxCreate = z.object({ + organizationId: z.string().trim().regex(objectIdRegex, 'organizationId must be a valid ObjectId'), + idempotencyKey: z.string().trim().min(1, 'idempotencyKey is required'), + extrasUnits: z.number().int().min(1, 'extrasUnits must be >= 1'), +}); + +export default { + BillingMeterOutboxStatus, + BillingMeterOutbox, + BillingMeterOutboxCreate, +}; diff --git a/modules/billing/repositories/billing.meter.outbox.repository.js b/modules/billing/repositories/billing.meter.outbox.repository.js new file mode 100644 index 000000000..af0876801 --- /dev/null +++ b/modules/billing/repositories/billing.meter.outbox.repository.js @@ -0,0 +1,116 @@ +/** + * Module dependencies + */ +import mongoose from 'mongoose'; + +/** + * @function BillingMeterOutbox + * @description Lazily resolves the BillingMeterOutbox Mongoose model. + * Deferred to keep unit tests importable before model registration. + * @returns {import('mongoose').Model} The registered BillingMeterOutbox model. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik +const BillingMeterOutbox = () => mongoose.model('BillingMeterOutbox'); + +/** + * @function create + * @description Insert a pending outbox row for a deferred extras debit. + * @param {Object} payload - Outbox row fields. + * @param {string} payload.organizationId - Organization ObjectId. + * @param {string} payload.idempotencyKey - Usage attribution idempotency key. + * @param {number} payload.extrasUnits - Extras units to debit. + * @param {Object} [options={}] - Optional write options. + * @param {import('mongoose').ClientSession} [options.session] - Optional Mongo session. + * @returns {Promise} Inserted outbox document. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik +const create = async ({ organizationId, idempotencyKey, extrasUnits }, options = {}) => { + const docs = await BillingMeterOutbox().create( + [{ + organizationId, + idempotencyKey, + extrasUnits, + status: 'pending', + }], + options.session ? { session: options.session } : undefined, + ); + return docs[0]; +}; + +/** + * @function findPendingDue + * @description Return pending outbox rows whose last attempt is due for retry. + * Rows with lastAttemptedAt=null are due immediately. + * @param {number} [thresholdMs=300000] - Retry backoff threshold in milliseconds. + * @param {number} [limit=100] - Maximum rows to return. + * @returns {Promise} Pending due outbox rows. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik +const findPendingDue = (thresholdMs = 5 * 60 * 1000, limit = 100) => { + const dueBefore = new Date(Date.now() - thresholdMs); + return BillingMeterOutbox() + .find({ + status: 'pending', + $or: [ + { lastAttemptedAt: null }, + { lastAttemptedAt: { $lt: dueBefore } }, + ], + }) + .sort({ lastAttemptedAt: 1, createdAt: 1 }) + .limit(limit) + .lean(); +}; + +/** + * @function markCommitted + * @description Mark an outbox row as committed after a successful extras debit. + * @param {string} id - Outbox row id. + * @returns {Promise} Mongo update result. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik +const markCommitted = (id) => + BillingMeterOutbox().updateOne( + { _id: id }, + { $set: { status: 'committed', lastError: null, lastAttemptedAt: new Date() } }, + ); + +/** + * @function markFailedAttempt + * @description Record a failed debit attempt. The fifth failed attempt exhausts + * the row and moves it to failed status. + * @param {string} id - Outbox row id. + * @param {Error|string} error - Failure to record. + * @returns {Promise} Updated outbox row after failure accounting. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik +const markFailedAttempt = async (id, error) => { + const message = error?.message ?? String(error); + const doc = await BillingMeterOutbox().findOneAndUpdate( + { _id: id }, + { + $inc: { attempts: 1 }, + $set: { + lastError: message, + lastAttemptedAt: new Date(), + }, + }, + { returnDocument: 'after' }, + ).lean(); + + if (!doc) return null; + if (doc.attempts >= 5 && doc.status !== 'failed') { + return BillingMeterOutbox().findOneAndUpdate( + { _id: id }, + { $set: { status: 'failed' } }, + { returnDocument: 'after' }, + ).lean(); + } + return doc; +}; + +export default { + create, + findPendingDue, + markCommitted, + markFailedAttempt, +}; diff --git a/modules/billing/repositories/billing.usage.repository.js b/modules/billing/repositories/billing.usage.repository.js index deebf01e5..75c0f5cae 100644 --- a/modules/billing/repositories/billing.usage.repository.js +++ b/modules/billing/repositories/billing.usage.repository.js @@ -92,10 +92,12 @@ const findByWeek = (organizationId, weekKey) => { * @param {Object} breakdown - Feature-level breakdown map { featureKey: units }. * @param {String} idempotencyKey - Unique key (usually history._id.toString()) for replay protection. * @param {Object} baseSnapshot - Fields written only on first upsert: { meterQuota, planVersion, resetAt, month }. + * @param {Object} [options={}] - Optional write options. + * @param {import('mongoose').ClientSession} [options.session] - Optional Mongo session for transaction-scoped writes. * @returns {Promise} The updated usage document, or null if this was a replay (no-op). */ // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik -const incrementMeter = async (organizationId, weekKey, units, breakdown, idempotencyKey, baseSnapshot) => { +const incrementMeter = async (organizationId, weekKey, units, breakdown, idempotencyKey, baseSnapshot, options = {}) => { if (!mongoose.Types.ObjectId.isValid(organizationId)) return null; // Build $inc for meterUsed + per-feature breakdown keys @@ -108,6 +110,7 @@ const incrementMeter = async (organizationId, weekKey, units, breakdown, idempot } } } + const hasBreakdownIncrements = Object.keys(incPayload).some((key) => key.startsWith('meterBreakdown.')); // Transition logic β€” remove after migration is confirmed complete on all production deployments. // TODO(PR-A migration): drop legacy consumedHistoryIds dual-read once deployed data is fully migrated. @@ -150,12 +153,19 @@ const incrementMeter = async (organizationId, weekKey, units, breakdown, idempot meterQuota: baseSnapshot?.meterQuota ?? 0, planVersion: baseSnapshot?.planVersion ?? null, resetAt: baseSnapshot?.resetAt ?? null, - meterBreakdown: {}, + ...(hasBreakdownIncrements ? {} : { meterBreakdown: {} }), alertedAt80: null, alertedAt100: null, }, }, - { upsert: true, returnDocument: 'after', runValidators: false, strict: false, strictQuery: false }, + { + upsert: true, + returnDocument: 'after', + runValidators: false, + strict: false, + strictQuery: false, + session: options.session, + }, ); return doc; } catch (err) { @@ -167,7 +177,7 @@ const incrementMeter = async (organizationId, weekKey, units, breakdown, idempot $inc: incPayload, $push: { consumedAttributionKeys: idempotencyKey }, }, - { returnDocument: 'after', strict: false, strictQuery: false }, + { returnDocument: 'after', strict: false, strictQuery: false, session: options.session }, ); } throw err; @@ -215,6 +225,41 @@ const upsertWeekSnapshot = (orgId, weekKey, snapshotFields) => { upsert: true, returnDocument: 'after', runValidators: false }, ); +/** + * @function rotateWeekSnapshotForPlanChange + * @description Update an existing current-week usage document with the active + * plan snapshot. Preserves usage by default; optionally resets + * usage and clears the breakdown for clean-break plan changes. + * Does not upsert β€” plan-change rotation is only needed when a + * current week document already exists. + * @param {string} orgId - The organization ObjectId (string). + * @param {string} weekKey - The current ISO week key. + * @param {Object} snapshotFields - Fields to set: { meterQuota, planVersion, month }. + * @param {boolean} preserveUsage - Whether to keep existing meterUsed and breakdown. + * @returns {Promise} The updated usage document, or null when none exists. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik +const rotateWeekSnapshotForPlanChange = (orgId, weekKey, snapshotFields, preserveUsage = true) => { + const update = { + $set: { + meterQuota: snapshotFields.meterQuota, + planVersion: snapshotFields.planVersion, + month: snapshotFields.month, + }, + }; + + if (!preserveUsage) { + update.$set.meterUsed = 0; + update.$set.meterBreakdown = {}; + } + + return BillingUsage.findOneAndUpdate( + { organizationId: orgId, weekKey }, + update, + { returnDocument: 'after', runValidators: false }, + ).lean(); +}; + /** * @function markThreshold * @description Atomically set a threshold timestamp field on a usage document, @@ -239,5 +284,6 @@ export default { incrementMeter, archiveOtherWeeks, upsertWeekSnapshot, + rotateWeekSnapshotForPlanChange, markThreshold, }; diff --git a/modules/billing/services/billing.meter.outbox.service.js b/modules/billing/services/billing.meter.outbox.service.js new file mode 100644 index 000000000..d2eadf3a9 --- /dev/null +++ b/modules/billing/services/billing.meter.outbox.service.js @@ -0,0 +1,81 @@ +/** + * Module dependencies + */ +import BillingMeterOutboxRepository from '../repositories/billing.meter.outbox.repository.js'; +import BillingExtraService from './billing.extra.service.js'; +import billingEvents from '../lib/events.js'; + +/** + * @function emitExhausted + * @description Emit the alert event for an outbox row that exhausted retries. + * @param {Object} row - Original pending outbox row. + * @param {Object} updated - Updated failed outbox row. + * @returns {void} + */ +const emitExhausted = (row, updated) => { + billingEvents.emit('billing.extras_debit.exhausted', { + organizationId: String(row.organizationId), + idempotencyKey: row.idempotencyKey, + extrasUnits: row.extrasUnits, + attempts: updated.attempts, + lastError: updated.lastError, + }); +}; + +/** + * @function retryPendingExtrasDebits + * @description Retry pending extras debit outbox rows. Successful debits are + * committed; failed attempts are counted and rows move to failed + * after repository retry exhaustion. + * @param {number} [thresholdMs=300000] - Retry threshold in milliseconds. + * @param {number} [limit=100] - Maximum outbox rows to process. + * @returns {Promise<{scanned: number, committed: number, failedAttempts: number, exhausted: number}>} + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js service, not Qwik +const retryPendingExtrasDebits = async (thresholdMs = 5 * 60 * 1000, limit = 100) => { + const pendingRows = await BillingMeterOutboxRepository.findPendingDue(thresholdMs, limit); + let committed = 0; + let failedAttempts = 0; + let exhausted = 0; + + for (const row of pendingRows) { + try { + const debitResult = await BillingExtraService.debit( + String(row.organizationId), + row.extrasUnits, + row.idempotencyKey, + ); + + if (debitResult.applied) { + await BillingMeterOutboxRepository.markCommitted(row._id); + committed += 1; + continue; + } + + const updated = await BillingMeterOutboxRepository.markFailedAttempt(row._id, 'extras debit not applied'); + failedAttempts += 1; + if (updated?.status === 'failed') { + exhausted += 1; + emitExhausted(row, updated); + } + } catch (err) { + const updated = await BillingMeterOutboxRepository.markFailedAttempt(row._id, err); + failedAttempts += 1; + if (updated?.status === 'failed') { + exhausted += 1; + emitExhausted(row, updated); + } + } + } + + return { + scanned: pendingRows.length, + committed, + failedAttempts, + exhausted, + }; +}; + +export default { + retryPendingExtrasDebits, +}; diff --git a/modules/billing/services/billing.meter.service.js b/modules/billing/services/billing.meter.service.js index e15387147..18b8a132b 100644 --- a/modules/billing/services/billing.meter.service.js +++ b/modules/billing/services/billing.meter.service.js @@ -124,9 +124,11 @@ const capBreakdown = (breakdown, cappedUnits, originalTotal) => { /** * @function attribute * @description Attribute meter units from a History-like input to a Usage document - * for the given organization. If the plan quota is exceeded, falls back - * to BillingExtraService.debit (best-effort β€” does not throw when extras - * are exhausted; extrasConsumed=0 is returned instead). + * for the given organization. If the plan quota is exceeded, creates a + * pending BillingMeterOutbox row before attempting BillingExtraService.debit. + * The return is optimistic once usage is counted: debit failures leave the + * outbox pending for retry and still return extrasConsumed so callers do not + * retry an already-applied attribution. * * Per-step idempotency: the idempotency key is `${history._id}:${stepKey}`. * This allows multiple attributions on the same history at different processing @@ -164,9 +166,10 @@ const capBreakdown = (breakdown, cappedUnits, originalTotal) => { * @param {string|null|undefined} [options.ratioVersion=null] - Optional ratio snapshot override * paired with options.planId. Falls back to history.planVersion when omitted. * @returns {Promise<{applied: boolean, meterUsed: number, extrasConsumed: number, reason?: string}>} - * `reason` is present for zero-cost skips and exhausted extras: + * `reason` is present for zero-cost skips: * `{ applied: false, meterUsed: 0, extrasConsumed: 0, reason: 'zero_cost_skipped' }` - * or `{ applied: true, meterUsed, extrasConsumed: 0, reason: 'extras_exhausted' }`. + * Extras debit failures after usage is applied are reconciled by the outbox cron; + * consumers should not retry when `applied: true`. * @throws {Error} If stepKey is non-null/undefined and does not match the expected format. */ // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js service, not Qwik @@ -229,7 +232,10 @@ const attribute = async (history, organizationId, options = {}) => { const idempotencyKey = `${history._id?.toString?.() ?? String(history._id)}:${validatedStepKey}`; - const result = await BillingUsageService.incrementMeter( + // incrementMeterWithOutbox atomically creates the outbox row before returning so there + // is exactly one pending row per (idempotencyKey) when extras overflow. result.outbox is + // always present when extrasConsumed > 0. + const result = await BillingUsageService.incrementMeterWithOutbox( organizationId, cappedUnits, cappedBreakdown, @@ -243,13 +249,19 @@ const attribute = async (history, organizationId, options = {}) => { let extrasConsumed = 0; if (result.extrasConsumed > 0) { - const debitResult = await BillingExtraService.debit(organizationId, result.extrasConsumed, idempotencyKey); - if (debitResult.applied) { - extrasConsumed = result.extrasConsumed; - } else { - // Debit was not applied (balance exhausted or idempotency hit). - // Report extrasConsumed=0 so callers are not misled about charge application. - return { applied: true, meterUsed: result.meterUsed, extrasConsumed: 0, reason: 'extras_exhausted' }; + extrasConsumed = result.extrasConsumed; + const outboxDoc = result.outbox; + + try { + const debitResult = await BillingExtraService.debit(organizationId, extrasConsumed, idempotencyKey); + if (debitResult.applied && outboxDoc) { + // Lazy import to avoid pulling in repository at module load time + const { default: BillingMeterOutboxRepository } = await import('../repositories/billing.meter.outbox.repository.js'); + await BillingMeterOutboxRepository.markCommitted(outboxDoc._id); + } + } catch (err) { + // Usage is already counted and the outbox row is pending. The retry cron owns reconciliation. + console.warn('[billing.meter] extras debit deferred to outbox:', err?.message ?? err); } } diff --git a/modules/billing/services/billing.reset.service.js b/modules/billing/services/billing.reset.service.js index f6bc15472..770fd320e 100644 --- a/modules/billing/services/billing.reset.service.js +++ b/modules/billing/services/billing.reset.service.js @@ -5,6 +5,7 @@ import config from '../../../config/index.js'; import BillingUsageRepository from '../repositories/billing.usage.repository.js'; import BillingSubscriptionRepository from '../repositories/billing.subscription.repository.js'; import BillingPlanService from './billing.plan.service.js'; +import billingEvents from '../lib/events.js'; /** * Compute the ISO week key (YYYY-Www) for a given date. @@ -92,6 +93,62 @@ const resetWeek = async (orgId, periodStart) => { } }; +/** + * @function forceRotateForPlanChange + * @description Refresh the current week's quota/planVersion snapshot after a + * Stripe plan change. Unlike resetWeek, this does not archive or + * upsert weekly documents: if the current week doc does not exist, + * the next attribution lazily creates it with the active plan. + * @param {string} organizationId - The organization ObjectId (string). + * @param {Object} [options={}] - Rotation options. + * @param {boolean} [options.preserveUsage=true] - Keep meterUsed and breakdown when true; reset them when false. + * @returns {Promise} The updated current week usage document, or null when no current doc exists. + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js service, not Qwik +const forceRotateForPlanChange = async (organizationId, options = {}) => { + if (!config?.billing?.meterMode) return null; + + const { preserveUsage = true } = options ?? {}; + const now = new Date(); + const currentWeekKey = isoWeekKey(now); + const existingDoc = await BillingUsageRepository.findByWeek(organizationId, currentWeekKey); + if (!existingDoc) return null; + + const subscription = await BillingSubscriptionRepository.findPlan(organizationId); + const planId = subscription?.plan ?? config?.billing?.defaultPlan ?? 'free'; + const activePlan = await BillingPlanService.getActivePlan(planId); + const newQuota = activePlan?.meterQuota ?? 0; + const newVersion = activePlan?.version ?? null; + const month = `${now.getUTCFullYear()}-${String(now.getUTCMonth() + 1).padStart(2, '0')}`; + + const updatedDoc = await BillingUsageRepository.rotateWeekSnapshotForPlanChange( + organizationId, + currentWeekKey, + { + meterQuota: newQuota, + planVersion: newVersion, + month, + }, + preserveUsage, + ); + + try { + billingEvents.emit('billing.plan_change.rotated', { + organizationId, + oldQuota: existingDoc.meterQuota ?? 0, + newQuota, + oldVersion: existingDoc.planVersion ?? null, + newVersion, + preserveUsage, + }); + } catch (evtErr) { + // Listener errors must not disrupt plan-change rotation β€” log for traceability + console.error('[billing.reset] billing.plan_change.rotated listener error (non-fatal):', evtErr?.message ?? evtErr); + } + + return updatedDoc; +}; + /** * @function resetAllDue * @description Iterate active subscriptions where current_period_start has crossed @@ -135,6 +192,7 @@ const resetAllDue = async () => { export default { resetWeek, + forceRotateForPlanChange, resetAllDue, isoWeekKey, }; diff --git a/modules/billing/services/billing.usage.service.js b/modules/billing/services/billing.usage.service.js index 92a0bb4ab..8a91c14f4 100644 --- a/modules/billing/services/billing.usage.service.js +++ b/modules/billing/services/billing.usage.service.js @@ -4,6 +4,7 @@ import config from '../../../config/index.js'; import UsageRepository from '../repositories/billing.usage.repository.js'; import BillingSubscriptionRepository from '../repositories/billing.subscription.repository.js'; +import BillingMeterOutboxRepository from '../repositories/billing.meter.outbox.repository.js'; import BillingPlanService from './billing.plan.service.js'; /** @@ -214,6 +215,34 @@ const incrementMeter = async (organizationId, units, breakdown, idempotencyKey) }; }; +/** + * @function incrementMeterWithOutbox + * @description Increment meter usage and, when the increment overflows into + * extras, create the pending extras-debit outbox row before + * returning to the caller. This keeps usage idempotency and the + * reconciliation record coupled on the hot path. If Mongo + * transactions are unavailable in the deployment, this is the + * immediate-after fallback described by the billing lifecycle docs. + * @param {string} organizationId - The organization ObjectId (string). + * @param {number} units - Meter units to attribute. + * @param {Object} breakdown - Feature-keyed breakdown: { featureKey: units }. + * @param {string} idempotencyKey - Unique key for replay protection. + * @returns {Promise<{applied: boolean, meterUsed: number, meterQuota: number, extrasConsumed: number, alertCrossed: string|null, outbox?: Object}>} + */ +// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js service, not Qwik +const incrementMeterWithOutbox = async (organizationId, units, breakdown, idempotencyKey) => { + const result = await incrementMeter(organizationId, units, breakdown, idempotencyKey); + if (!result.applied || result.extrasConsumed <= 0) return result; + + const outbox = await BillingMeterOutboxRepository.create({ + organizationId, + idempotencyKey, + extrasUnits: result.extrasConsumed, + }); + + return { ...result, outbox }; +}; + /** * @function getMeter * @description Return the current week's meter document for an organization, @@ -234,5 +263,6 @@ export default { reset, currentWeekKey, incrementMeter, + incrementMeterWithOutbox, getMeter, }; diff --git a/modules/billing/services/billing.webhook.service.js b/modules/billing/services/billing.webhook.service.js index f6da20c3f..04bb49382 100644 --- a/modules/billing/services/billing.webhook.service.js +++ b/modules/billing/services/billing.webhook.service.js @@ -250,24 +250,15 @@ const handleSubscriptionUpdated = async (subscription, event) => { console.error('[billing.webhook] plan.changed listener error (non-fatal):', evtErr?.message ?? evtErr); } - // Memo edge case 5: plan switch mid-cycle = immediate reset of weekly meter. - // Stripe does NOT advance current_period_start on plan switch (proration only), - // so we must trigger resetWeek here independently of the period-start change block. - // Anchor = newPeriodStart only when the period ALSO changed simultaneously (e.g. annualβ†’monthly - // on renewal day). Plain mid-cycle plan switch β†’ anchor is now (current moment), because - // newPeriodStart is the billing-cycle start (potentially weeks in the past) and would resolve - // to a past ISO week bucket. - const periodChanged = - previousPeriodStart !== undefined && - subscription.current_period_start !== previousPeriodStart && - newPeriodStart; - const anchor = periodChanged ? newPeriodStart : new Date(); + // Plan switch mid-cycle = refresh the active week snapshot to the new plan. + // Unlike cron-driven resetWeek, this preserves meterUsed by default so a plan + // change does not refund or double-charge already attributed usage. try { - await BillingResetService.resetWeek(organizationId, anchor); + await BillingResetService.forceRotateForPlanChange(organizationId, { preserveUsage: true }); planChangeResetTriggered = true; } catch (err) { // Log for monitoring β€” not thrown so webhook processing continues - console.error('[billing.webhook] resetWeek on plan change failed (non-fatal):', err?.message ?? err); + console.error('[billing.webhook] forceRotateForPlanChange failed (non-fatal):', err?.message ?? err); } } } diff --git a/modules/billing/tests/billing.cron.retryPendingExtrasDebit.unit.tests.js b/modules/billing/tests/billing.cron.retryPendingExtrasDebit.unit.tests.js new file mode 100644 index 000000000..60a2e03fc --- /dev/null +++ b/modules/billing/tests/billing.cron.retryPendingExtrasDebit.unit.tests.js @@ -0,0 +1,119 @@ +/** + * Module dependencies. + */ +import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals'; + +/** + * Unit tests for retry-pending-extras-debit cron logic. + */ +describe('billing.retryPendingExtrasDebit cron β€” BillingMeterOutboxService:', () => { + let BillingMeterOutboxService; + let mockOutboxRepository; + let mockExtraService; + let mockEvents; + + const orgId = '507f1f77bcf86cd799439011'; + + /** + * @param {Object} [overrides={}] - Fields to override on the pending outbox row. + * @returns {Object} A stub pending outbox row. + */ + const makeOutbox = (overrides = {}) => ({ + _id: '607f1f77bcf86cd799439099', + organizationId: orgId, + idempotencyKey: '507f1f77bcf86cd799439022:initial', + extrasUnits: 100, + attempts: 0, + status: 'pending', + ...overrides, + }); + + beforeEach(async () => { + jest.resetModules(); + + mockOutboxRepository = { + findPendingDue: jest.fn(), + markCommitted: jest.fn(), + markFailedAttempt: jest.fn(), + }; + + mockExtraService = { + debit: jest.fn(), + }; + + mockEvents = { + emit: jest.fn(), + }; + + jest.unstable_mockModule('../repositories/billing.meter.outbox.repository.js', () => ({ + default: mockOutboxRepository, + })); + + jest.unstable_mockModule('../services/billing.extra.service.js', () => ({ + default: mockExtraService, + })); + + jest.unstable_mockModule('../lib/events.js', () => ({ + default: mockEvents, + })); + + const mod = await import('../services/billing.meter.outbox.service.js'); + BillingMeterOutboxService = mod.default; + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('retries a pending row and marks it committed on debit success', async () => { + const row = makeOutbox(); + mockOutboxRepository.findPendingDue.mockResolvedValue([row]); + mockExtraService.debit.mockResolvedValue({ applied: true }); + mockOutboxRepository.markCommitted.mockResolvedValue({ modifiedCount: 1 }); + + const result = await BillingMeterOutboxService.retryPendingExtrasDebits(300000, 100); + + expect(mockOutboxRepository.findPendingDue).toHaveBeenCalledWith(300000, 100); + expect(mockExtraService.debit).toHaveBeenCalledWith(orgId, 100, row.idempotencyKey); + expect(mockOutboxRepository.markCommitted).toHaveBeenCalledWith(row._id); + expect(result).toEqual({ scanned: 1, committed: 1, failedAttempts: 0, exhausted: 0 }); + }); + + test('records failed attempts when debit throws', async () => { + const row = makeOutbox(); + const err = new Error('write failed'); + mockOutboxRepository.findPendingDue.mockResolvedValue([row]); + mockExtraService.debit.mockRejectedValue(err); + mockOutboxRepository.markFailedAttempt.mockResolvedValue(makeOutbox({ attempts: 1 })); + + const result = await BillingMeterOutboxService.retryPendingExtrasDebits(); + + expect(mockOutboxRepository.markFailedAttempt).toHaveBeenCalledWith(row._id, err); + expect(mockEvents.emit).not.toHaveBeenCalled(); + expect(result.failedAttempts).toBe(1); + }); + + test('after 5 failures marks failed and emits billing.extras_debit.exhausted', async () => { + const row = makeOutbox({ attempts: 4 }); + const failed = makeOutbox({ + attempts: 5, + status: 'failed', + lastError: 'still failing', + }); + mockOutboxRepository.findPendingDue.mockResolvedValue([row]); + mockExtraService.debit.mockResolvedValue({ applied: false }); + mockOutboxRepository.markFailedAttempt.mockResolvedValue(failed); + + const result = await BillingMeterOutboxService.retryPendingExtrasDebits(); + + expect(mockOutboxRepository.markFailedAttempt).toHaveBeenCalledWith(row._id, 'extras debit not applied'); + expect(mockEvents.emit).toHaveBeenCalledWith('billing.extras_debit.exhausted', { + organizationId: orgId, + idempotencyKey: row.idempotencyKey, + extrasUnits: 100, + attempts: 5, + lastError: 'still failing', + }); + expect(result).toEqual({ scanned: 1, committed: 0, failedAttempts: 1, exhausted: 1 }); + }); +}); diff --git a/modules/billing/tests/billing.lifecycle.integration.tests.js b/modules/billing/tests/billing.lifecycle.integration.tests.js new file mode 100644 index 000000000..91c8f5476 --- /dev/null +++ b/modules/billing/tests/billing.lifecycle.integration.tests.js @@ -0,0 +1,211 @@ +/** + * Module dependencies. + */ +import mongoose from 'mongoose'; +import { describe, beforeAll, beforeEach, afterAll, afterEach, test, expect, jest } from '@jest/globals'; + +import config from '../../../config/index.js'; +import mongooseService from '../../../lib/services/mongoose.js'; + +/** + * Integration tests for meter lifecycle hardening. + */ +describe('Billing meter lifecycle integration tests:', () => { + let BillingUsage; + let BillingPlan; + let Subscription; + let Organization; + let BillingMeterOutbox; + let BillingExtraBalance; + let BillingResetService; + let BillingWebhookService; + let BillingMeterService; + let BillingMeterOutboxService; + let BillingPlanService; + let billingEvents; + let originalMeterMode; + + /** + * @param {string} planId - Plan identifier. + * @param {string} version - Plan version. + * @param {number} meterQuota - Meter quota. + * @returns {Promise} Created plan document. + */ + const createActivePlan = (planId, version, meterQuota) => + BillingPlan.create({ + planId, + version, + meterQuota, + ratios: { scrap: 1 }, + active: true, + effectiveFrom: new Date('2026-01-01T00:00:00.000Z'), + effectiveUntil: null, + }); + + beforeAll(async () => { + originalMeterMode = config.billing.meterMode; + config.billing.meterMode = true; + await mongooseService.loadModels(); + await mongooseService.connect(); + + BillingUsage = mongoose.model('BillingUsage'); + BillingPlan = mongoose.model('BillingPlan'); + Subscription = mongoose.model('Subscription'); + Organization = mongoose.model('Organization'); + BillingMeterOutbox = mongoose.model('BillingMeterOutbox'); + BillingExtraBalance = mongoose.model('BillingExtraBalance'); + + BillingResetService = (await import('../services/billing.reset.service.js')).default; + BillingWebhookService = (await import('../services/billing.webhook.service.js')).default; + BillingMeterService = (await import('../services/billing.meter.service.js')).default; + BillingMeterOutboxService = (await import('../services/billing.meter.outbox.service.js')).default; + BillingPlanService = (await import('../services/billing.plan.service.js')).default; + billingEvents = (await import('../lib/events.js')).default; + }); + + beforeEach(async () => { + await Promise.all([ + BillingUsage.deleteMany({}), + BillingPlan.deleteMany({}), + Subscription.deleteMany({}), + Organization.deleteMany({}), + BillingMeterOutbox.deleteMany({}), + BillingExtraBalance.deleteMany({}), + ]); + for (const planId of config.billing.plans ?? []) { + BillingPlanService.invalidateCache(planId); + } + }); + + afterEach(() => { + jest.restoreAllMocks(); + billingEvents.removeAllListeners('billing.extras_debit.exhausted'); + }); + + afterAll(async () => { + config.billing.meterMode = originalMeterMode; + await mongooseService.disconnect(); + }); + + test('plan.changed webhook updates active week quota snapshot mid-week', async () => { + const organizationId = new mongoose.Types.ObjectId(); + const weekKey = BillingResetService.isoWeekKey(new Date()); + await Organization.create({ _id: organizationId, name: 'Lifecycle Org', slug: 'lifecycle-org', plan: 'starter' }); + await createActivePlan('pro', 'pro-v2', 1000); + await Subscription.create({ + organization: organizationId, + stripeCustomerId: 'cus_lifecycle', + stripeSubscriptionId: 'sub_lifecycle', + plan: 'starter', + status: 'active', + }); + await BillingUsage.create({ + organizationId, + month: '2026-05', + weekKey, + counters: {}, + meterUsed: 25, + meterQuota: 100, + planVersion: 'starter-v1', + meterBreakdown: { scrap: 25 }, + consumedAttributionKeys: [], + }); + + await BillingWebhookService.handleSubscriptionUpdated( + { + id: 'sub_lifecycle', + status: 'active', + current_period_end: Math.floor(Date.now() / 1000) + 30 * 24 * 60 * 60, + current_period_start: Math.floor(Date.now() / 1000) - 24 * 60 * 60, + cancel_at_period_end: false, + items: { data: [{ price: { metadata: { planId: 'pro' } } }] }, + }, + { + data: { + previous_attributes: { + items: { data: [{ price: { metadata: { planId: 'starter' } } }] }, + }, + }, + }, + ); + + const usage = await BillingUsage.findOne({ organizationId, weekKey }).lean(); + expect(usage.meterQuota).toBe(1000); + expect(usage.planVersion).toBe('pro-v2'); + expect(usage.meterUsed).toBe(25); + expect(usage.meterBreakdown).toEqual({ scrap: 25 }); + }); + + test('attribute returns optimistically and leaves pending outbox when extras debit is not applied', async () => { + const organizationId = new mongoose.Types.ObjectId(); + await createActivePlan('pro', 'pro-v1', 5); + await Subscription.create({ + organization: organizationId, + plan: 'pro', + status: 'active', + }); + + const result = await BillingMeterService.attribute( + { + _id: new mongoose.Types.ObjectId(), + costs: { scrap: 0.01 }, + planId: 'pro', + planVersion: 'pro-v1', + }, + organizationId.toString(), + ); + + expect(result).toEqual({ applied: true, meterUsed: 10, extrasConsumed: 5 }); + const outbox = await BillingMeterOutbox.findOne({ organizationId }).lean(); + expect(outbox.status).toBe('pending'); + expect(outbox.extrasUnits).toBe(5); + }); + + test('retry service commits successful debit and emits alert after exhausted failures', async () => { + const organizationId = new mongoose.Types.ObjectId(); + const committedKey = '507f1f77bcf86cd799439011:initial'; + const failedKey = '507f1f77bcf86cd799439022:initial'; + const exhaustedEvents = []; + billingEvents.on('billing.extras_debit.exhausted', (payload) => exhaustedEvents.push(payload)); + + await BillingExtraBalance.create({ + organization: organizationId, + ledger: [{ kind: 'topup', amount: 100, stripeSessionId: 'cs_retry' }], + cachedBalance: 100, + }); + await BillingMeterOutbox.create({ + organizationId, + idempotencyKey: committedKey, + extrasUnits: 40, + status: 'pending', + lastAttemptedAt: null, + }); + await BillingMeterOutbox.create({ + organizationId, + idempotencyKey: failedKey, + extrasUnits: 500, + status: 'pending', + attempts: 4, + lastAttemptedAt: null, + }); + + const result = await BillingMeterOutboxService.retryPendingExtrasDebits(5 * 60 * 1000, 100); + + expect(result).toEqual({ scanned: 2, committed: 1, failedAttempts: 1, exhausted: 1 }); + const committed = await BillingMeterOutbox.findOne({ idempotencyKey: committedKey }).lean(); + const failed = await BillingMeterOutbox.findOne({ idempotencyKey: failedKey }).lean(); + const balance = await BillingExtraBalance.findOne({ organization: organizationId }).lean(); + expect(committed.status).toBe('committed'); + expect(failed.status).toBe('failed'); + expect(failed.attempts).toBe(5); + expect(balance.cachedBalance).toBe(60); + expect(exhaustedEvents).toEqual([ + expect.objectContaining({ + organizationId: organizationId.toString(), + idempotencyKey: failedKey, + extrasUnits: 500, + attempts: 5, + }), + ]); + }); +}); diff --git a/modules/billing/tests/billing.meter.outbox.unit.tests.js b/modules/billing/tests/billing.meter.outbox.unit.tests.js new file mode 100644 index 000000000..f096a080f --- /dev/null +++ b/modules/billing/tests/billing.meter.outbox.unit.tests.js @@ -0,0 +1,167 @@ +/** + * Module dependencies. + */ +import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals'; + +/** + * Unit tests for billing.meter.outbox repository and schema. + */ +describe('BillingMeterOutbox unit tests:', () => { + describe('Schema validation', () => { + let schema; + + beforeEach(async () => { + const mod = await import('../models/billing.meter.outbox.schema.js'); + schema = mod.default; + }); + + test('accepts a pending outbox row', () => { + const result = schema.BillingMeterOutbox.safeParse({ + organizationId: '507f1f77bcf86cd799439011', + idempotencyKey: '507f1f77bcf86cd799439099:initial', + extrasUnits: 250, + }); + + expect(result.error).toBeFalsy(); + expect(result.data.status).toBe('pending'); + expect(result.data.attempts).toBe(0); + }); + + test('rejects zero extrasUnits', () => { + const result = schema.BillingMeterOutboxCreate.safeParse({ + organizationId: '507f1f77bcf86cd799439011', + idempotencyKey: 'key', + extrasUnits: 0, + }); + + expect(result.error).toBeDefined(); + }); + }); + + describe('Repository', () => { + let BillingMeterOutboxRepository; + let mockModel; + + const orgId = '507f1f77bcf86cd799439011'; + const outboxId = '607f1f77bcf86cd799439099'; + + /** + * @param {Object} [overrides={}] - Fields to override on the stub outbox row. + * @returns {Object} A stub outbox row. + */ + const makeOutbox = (overrides = {}) => ({ + _id: outboxId, + organizationId: orgId, + idempotencyKey: '507f1f77bcf86cd799439022:initial', + extrasUnits: 100, + status: 'pending', + attempts: 0, + lastError: null, + lastAttemptedAt: null, + ...overrides, + }); + + beforeEach(async () => { + jest.resetModules(); + + mockModel = { + create: jest.fn(), + find: jest.fn(), + updateOne: jest.fn(), + findOneAndUpdate: jest.fn(), + }; + + jest.unstable_mockModule('mongoose', () => ({ + default: { + model: jest.fn(() => mockModel), + }, + })); + + const mod = await import('../repositories/billing.meter.outbox.repository.js'); + BillingMeterOutboxRepository = mod.default; + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('create inserts a pending row', async () => { + const row = makeOutbox(); + mockModel.create.mockResolvedValue([row]); + + const result = await BillingMeterOutboxRepository.create({ + organizationId: orgId, + idempotencyKey: row.idempotencyKey, + extrasUnits: 100, + }); + + expect(mockModel.create).toHaveBeenCalledWith( + [{ + organizationId: orgId, + idempotencyKey: row.idempotencyKey, + extrasUnits: 100, + status: 'pending', + }], + undefined, + ); + expect(result).toBe(row); + }); + + test('findPendingDue filters pending rows due for retry', async () => { + const lean = jest.fn().mockResolvedValue([makeOutbox()]); + const limit = jest.fn(() => ({ lean })); + const sort = jest.fn(() => ({ limit })); + mockModel.find.mockReturnValue({ sort }); + + await BillingMeterOutboxRepository.findPendingDue(300000, 100); + + expect(mockModel.find).toHaveBeenCalledWith({ + status: 'pending', + $or: [ + { lastAttemptedAt: null }, + { lastAttemptedAt: { $lt: expect.any(Date) } }, + ], + }); + expect(sort).toHaveBeenCalledWith({ lastAttemptedAt: 1, createdAt: 1 }); + expect(limit).toHaveBeenCalledWith(100); + expect(lean).toHaveBeenCalled(); + }); + + test('markCommitted sets committed status', async () => { + mockModel.updateOne.mockResolvedValue({ modifiedCount: 1 }); + + await BillingMeterOutboxRepository.markCommitted(outboxId); + + expect(mockModel.updateOne).toHaveBeenCalledWith( + { _id: outboxId }, + { + $set: { + status: 'committed', + lastError: null, + lastAttemptedAt: expect.any(Date), + }, + }, + ); + }); + + test('markFailedAttempt increments attempts and marks failed on fifth attempt', async () => { + const leanFirst = jest.fn().mockResolvedValue(makeOutbox({ attempts: 5 })); + const leanSecond = jest.fn().mockResolvedValue(makeOutbox({ attempts: 5, status: 'failed' })); + mockModel.findOneAndUpdate + .mockReturnValueOnce({ lean: leanFirst }) + .mockReturnValueOnce({ lean: leanSecond }); + + const result = await BillingMeterOutboxRepository.markFailedAttempt(outboxId, new Error('debit failed')); + + expect(mockModel.findOneAndUpdate).toHaveBeenCalledTimes(2); + expect(mockModel.findOneAndUpdate.mock.calls[0][1]).toEqual({ + $inc: { attempts: 1 }, + $set: { + lastError: 'debit failed', + lastAttemptedAt: expect.any(Date), + }, + }); + expect(result.status).toBe('failed'); + }); + }); +}); diff --git a/modules/billing/tests/billing.meter.service.unit.tests.js b/modules/billing/tests/billing.meter.service.unit.tests.js index 7fa9e952e..b391145e6 100644 --- a/modules/billing/tests/billing.meter.service.unit.tests.js +++ b/modules/billing/tests/billing.meter.service.unit.tests.js @@ -12,6 +12,7 @@ describe('BillingMeterService unit tests:', () => { let mockConfig; let mockBillingUsageService; let mockBillingExtraService; + let mockBillingMeterOutboxRepository; const orgId = '507f1f77bcf86cd799439011'; @@ -52,12 +53,18 @@ describe('BillingMeterService unit tests:', () => { mockBillingUsageService = { incrementMeter: jest.fn(), + incrementMeterWithOutbox: jest.fn(), }; mockBillingExtraService = { debit: jest.fn(), }; + mockBillingMeterOutboxRepository = { + create: jest.fn(), + markCommitted: jest.fn(), + }; + jest.unstable_mockModule('../../../config/index.js', () => ({ default: mockConfig, })); @@ -74,6 +81,10 @@ describe('BillingMeterService unit tests:', () => { default: mockBillingExtraService, })); + jest.unstable_mockModule('../repositories/billing.meter.outbox.repository.js', () => ({ + default: mockBillingMeterOutboxRepository, + })); + const mod = await import('../services/billing.meter.service.js'); BillingMeterService = mod.default; }); @@ -202,11 +213,80 @@ describe('BillingMeterService unit tests:', () => { }); }); + describe('attribute β€” extras outbox', () => { + test('commits outbox row after successful extras debit (outbox created by incrementMeterWithOutbox)', async () => { + // attribute() calls incrementMeterWithOutbox which creates the outbox row and returns it. + // attribute() then attempts the synchronous debit and marks the outbox committed on success. + mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ + applied: true, + meterUsed: 6000, + extrasConsumed: 1000, + outbox: { _id: 'outbox_1' }, + }); + mockBillingExtraService.debit.mockResolvedValue({ applied: true }); + mockBillingMeterOutboxRepository.markCommitted.mockResolvedValue({ modifiedCount: 1 }); + + const history = { + _id: '507f1f77bcf86cd799439051', + costs: { scrap: 6 }, + planId: 'pro', + planVersion: 'v1', + }; + + const result = await BillingMeterService.attribute(history, orgId); + + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( + orgId, + 6000, + { scrap: 6000 }, + '507f1f77bcf86cd799439051:initial', + ); + // attribute() does NOT call outbox.create directly β€” incrementMeterWithOutbox owns that + expect(mockBillingMeterOutboxRepository.create).not.toHaveBeenCalled(); + expect(mockBillingExtraService.debit).toHaveBeenCalledWith( + orgId, + 1000, + '507f1f77bcf86cd799439051:initial', + ); + expect(mockBillingMeterOutboxRepository.markCommitted).toHaveBeenCalledWith('outbox_1'); + expect(result).toEqual({ applied: true, meterUsed: 6000, extrasConsumed: 1000 }); + }); + + test('debit failure leaves outbox pending and still returns applied=true', async () => { + const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}); + mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ + applied: true, + meterUsed: 6000, + extrasConsumed: 1000, + outbox: { _id: 'outbox_2' }, + }); + mockBillingExtraService.debit.mockRejectedValue(new Error('balance write failed')); + + const history = { + _id: '507f1f77bcf86cd799439052', + costs: { scrap: 6 }, + planId: 'pro', + planVersion: 'v1', + }; + + const result = await BillingMeterService.attribute(history, orgId); + + expect(result).toEqual({ applied: true, meterUsed: 6000, extrasConsumed: 1000 }); + expect(mockBillingMeterOutboxRepository.markCommitted).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledWith( + '[billing.meter] extras debit deferred to outbox:', + 'balance write failed', + ); + }); + }); + describe('attribute β€” maxUnitsPerOperation cap', () => { test('caps metered units when computed units exceed config cap', async () => { const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}); mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 10000, extrasConsumed: 0, @@ -221,7 +301,7 @@ describe('BillingMeterService unit tests:', () => { const result = await BillingMeterService.attribute(history, orgId); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( orgId, 10000, { scrap: 10000 }, @@ -236,7 +316,7 @@ describe('BillingMeterService unit tests:', () => { test('does not clamp when units are within the configured cap', async () => { const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}); mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 5000, extrasConsumed: 0, @@ -251,7 +331,7 @@ describe('BillingMeterService unit tests:', () => { await BillingMeterService.attribute(history, orgId); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( orgId, 5000, { scrap: 5000 }, @@ -264,7 +344,7 @@ describe('BillingMeterService unit tests:', () => { const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}); delete mockConfig.billing.meter.maxUnitsPerOperation; mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 20000, extrasConsumed: 0, @@ -279,7 +359,7 @@ describe('BillingMeterService unit tests:', () => { await BillingMeterService.attribute(history, orgId); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( orgId, 20000, { scrap: 20000 }, @@ -309,13 +389,13 @@ describe('BillingMeterService unit tests:', () => { extrasConsumed: 0, reason: 'zero_cost_skipped', }); - expect(mockBillingUsageService.incrementMeter).not.toHaveBeenCalled(); + expect(mockBillingUsageService.incrementMeterWithOutbox).not.toHaveBeenCalled(); expect(mockBillingExtraService.debit).not.toHaveBeenCalled(); }); test('costsOverride charges only the override delta', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1, digest: 2 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 1000, extrasConsumed: 0, @@ -333,7 +413,7 @@ describe('BillingMeterService unit tests:', () => { costsOverride: { digest: 0.5 }, }); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( orgId, 1000, { digest: 1000 }, @@ -344,7 +424,7 @@ describe('BillingMeterService unit tests:', () => { test('options.costs is accepted as an alias when costsOverride is absent', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { digest: 2 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 1000, extrasConsumed: 0, @@ -362,7 +442,7 @@ describe('BillingMeterService unit tests:', () => { costs: { digest: 0.5 }, }); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( orgId, 1000, { digest: 1000 }, @@ -374,7 +454,7 @@ describe('BillingMeterService unit tests:', () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue( makePlan({ planId: 'override', version: '2026.05', ratios: { digest: 3 } }), ); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 1500, extrasConsumed: 0, @@ -394,7 +474,7 @@ describe('BillingMeterService unit tests:', () => { }); expect(mockBillingPlanService.getPlanByVersion).toHaveBeenCalledWith('override', '2026.05'); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledWith( orgId, 1500, { digest: 1500 }, @@ -405,13 +485,13 @@ describe('BillingMeterService unit tests:', () => { test('same history attributed twice with default stepKey: 2nd is no-op (regression)', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); // First call: applied - mockBillingUsageService.incrementMeter.mockResolvedValueOnce({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValueOnce({ applied: true, meterUsed: 1000, extrasConsumed: 0, }); // Second call: no-op (replay) - mockBillingUsageService.incrementMeter.mockResolvedValueOnce({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValueOnce({ applied: false, meterUsed: 1000, }); @@ -430,17 +510,17 @@ describe('BillingMeterService unit tests:', () => { expect(second.applied).toBe(false); // Both calls must use the same idempotency key (stepKey defaults to 'initial') - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 1, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439040:initial', ); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 2, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439040:initial', ); }); test('same history attributed with {stepKey:"initial"} then {stepKey:"digest"}: both charged', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 500, extrasConsumed: 0, @@ -459,17 +539,17 @@ describe('BillingMeterService unit tests:', () => { expect(initial.applied).toBe(true); expect(digest.applied).toBe(true); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 1, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439041:initial', ); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 2, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439041:digest', ); }); test('same history attributed with {stepKey:"fix:1"} then {stepKey:"fix:2"}: both charged', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 200, extrasConsumed: 0, @@ -488,10 +568,10 @@ describe('BillingMeterService unit tests:', () => { expect(fix1.applied).toBe(true); expect(fix2.applied).toBe(true); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 1, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439042:fix:1', ); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 2, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439042:fix:2', ); }); @@ -512,12 +592,12 @@ describe('BillingMeterService unit tests:', () => { ).rejects.toThrow('[billing.meter] invalid stepKey'); // incrementMeter must NOT be called β€” the throw happens before any DB write - expect(mockBillingUsageService.incrementMeter).not.toHaveBeenCalled(); + expect(mockBillingUsageService.incrementMeterWithOutbox).not.toHaveBeenCalled(); }); test('null and undefined stepKey fall back to initial while invalid values throw', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); - mockBillingUsageService.incrementMeter.mockResolvedValue({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValue({ applied: true, meterUsed: 100, extrasConsumed: 0, @@ -540,10 +620,10 @@ describe('BillingMeterService unit tests:', () => { extrasConsumed: 0, }); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 1, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439098:initial', ); - expect(mockBillingUsageService.incrementMeter).toHaveBeenNthCalledWith( + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenNthCalledWith( 2, orgId, expect.any(Number), expect.any(Object), '507f1f77bcf86cd799439098:initial', ); @@ -558,13 +638,13 @@ describe('BillingMeterService unit tests:', () => { test('replay of {stepKey:"digest"} is blocked (idempotent)', async () => { mockBillingPlanService.getPlanByVersion.mockResolvedValue(makePlan({ ratios: { scrap: 1 } })); // First digest call: applied - mockBillingUsageService.incrementMeter.mockResolvedValueOnce({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValueOnce({ applied: true, meterUsed: 300, extrasConsumed: 0, }); // Replay: no-op - mockBillingUsageService.incrementMeter.mockResolvedValueOnce({ + mockBillingUsageService.incrementMeterWithOutbox.mockResolvedValueOnce({ applied: false, meterUsed: 300, }); @@ -582,9 +662,9 @@ describe('BillingMeterService unit tests:', () => { expect(first.applied).toBe(true); expect(replay.applied).toBe(false); - expect(mockBillingUsageService.incrementMeter).toHaveBeenCalledTimes(2); + expect(mockBillingUsageService.incrementMeterWithOutbox).toHaveBeenCalledTimes(2); // Both calls use the same digest key - for (const call of mockBillingUsageService.incrementMeter.mock.calls) { + for (const call of mockBillingUsageService.incrementMeterWithOutbox.mock.calls) { expect(call[3]).toBe('507f1f77bcf86cd799439043:digest'); } }); diff --git a/modules/billing/tests/billing.reset.service.unit.tests.js b/modules/billing/tests/billing.reset.service.unit.tests.js index 9c07e1559..bb408f5b6 100644 --- a/modules/billing/tests/billing.reset.service.unit.tests.js +++ b/modules/billing/tests/billing.reset.service.unit.tests.js @@ -12,6 +12,7 @@ describe('BillingResetService unit tests:', () => { let mockPlanService; let mockConfig; let mockSubscriptionRepository; + let mockEvents; const orgId = '507f1f77bcf86cd799439011'; @@ -61,6 +62,7 @@ describe('BillingResetService unit tests:', () => { incrementMeter: jest.fn(), archiveOtherWeeks: jest.fn().mockResolvedValue({ modifiedCount: 0 }), upsertWeekSnapshot: jest.fn(), + rotateWeekSnapshotForPlanChange: jest.fn(), }; mockPlanService = { @@ -75,6 +77,10 @@ describe('BillingResetService unit tests:', () => { updateLastResetAt: jest.fn(), }; + mockEvents = { + emit: jest.fn(), + }; + jest.unstable_mockModule('../../../config/index.js', () => ({ default: mockConfig, })); @@ -91,6 +97,10 @@ describe('BillingResetService unit tests:', () => { default: mockPlanService, })); + jest.unstable_mockModule('../lib/events.js', () => ({ + default: mockEvents, + })); + const mod = await import('../services/billing.reset.service.js'); BillingResetService = mod.default; }); @@ -238,6 +248,83 @@ describe('BillingResetService unit tests:', () => { }); }); + describe('forceRotateForPlanChange', () => { + test('updates quota and version while preserving usage by default', async () => { + const existingDoc = makeUsageDoc({ + meterUsed: 1234, + meterQuota: 500000, + planVersion: 'v1', + }); + const updatedDoc = makeUsageDoc({ + meterUsed: 1234, + meterQuota: 1000000, + planVersion: 'v2', + }); + mockUsageRepository.findByWeek.mockResolvedValue(existingDoc); + mockSubscriptionRepository.findPlan.mockResolvedValue({ plan: 'pro' }); + mockPlanService.getActivePlan.mockResolvedValue(makePlan({ meterQuota: 1000000, version: 'v2' })); + mockUsageRepository.rotateWeekSnapshotForPlanChange.mockResolvedValue(updatedDoc); + + const result = await BillingResetService.forceRotateForPlanChange(orgId); + + expect(mockUsageRepository.rotateWeekSnapshotForPlanChange).toHaveBeenCalledWith( + orgId, + '2026-W18', + { meterQuota: 1000000, planVersion: 'v2', month: '2026-05' }, + true, + ); + expect(result).toBe(updatedDoc); + expect(mockEvents.emit).toHaveBeenCalledWith('billing.plan_change.rotated', { + organizationId: orgId, + oldQuota: 500000, + newQuota: 1000000, + oldVersion: 'v1', + newVersion: 'v2', + preserveUsage: true, + }); + }); + + test('resets usage when preserveUsage=false', async () => { + const existingDoc = makeUsageDoc({ + meterUsed: 1234, + meterBreakdown: { scrap: 1234 }, + }); + const updatedDoc = makeUsageDoc({ + meterUsed: 0, + meterBreakdown: {}, + meterQuota: 100000, + planVersion: 'v3', + }); + mockUsageRepository.findByWeek.mockResolvedValue(existingDoc); + mockSubscriptionRepository.findPlan.mockResolvedValue({ plan: 'starter' }); + mockPlanService.getActivePlan.mockResolvedValue(makePlan({ meterQuota: 100000, version: 'v3' })); + mockUsageRepository.rotateWeekSnapshotForPlanChange.mockResolvedValue(updatedDoc); + + const result = await BillingResetService.forceRotateForPlanChange(orgId, { preserveUsage: false }); + + expect(mockUsageRepository.rotateWeekSnapshotForPlanChange).toHaveBeenCalledWith( + orgId, + '2026-W18', + { meterQuota: 100000, planVersion: 'v3', month: '2026-05' }, + false, + ); + expect(result.meterUsed).toBe(0); + expect(result.meterBreakdown).toEqual({}); + }); + + test('returns null without fetching plan when no current week doc exists', async () => { + mockUsageRepository.findByWeek.mockResolvedValue(null); + + const result = await BillingResetService.forceRotateForPlanChange(orgId); + + expect(result).toBeNull(); + expect(mockSubscriptionRepository.findPlan).not.toHaveBeenCalled(); + expect(mockPlanService.getActivePlan).not.toHaveBeenCalled(); + expect(mockUsageRepository.rotateWeekSnapshotForPlanChange).not.toHaveBeenCalled(); + expect(mockEvents.emit).not.toHaveBeenCalled(); + }); + }); + describe('resetAllDue', () => { test('should return processed=0, errors=0 when meterMode is disabled', async () => { mockConfig.billing.meterMode = false; diff --git a/modules/billing/tests/billing.usage.repository.unit.tests.js b/modules/billing/tests/billing.usage.repository.unit.tests.js index 4fa99e743..5706f9378 100644 --- a/modules/billing/tests/billing.usage.repository.unit.tests.js +++ b/modules/billing/tests/billing.usage.repository.unit.tests.js @@ -144,6 +144,7 @@ describe('BillingUsageRepository β€” meter extensions unit tests:', () => { expect(capturedUpdate.$inc['meterBreakdown.scrap']).toBe(100); expect(capturedUpdate.$inc['meterBreakdown.autofix']).toBe(50); + expect(capturedUpdate.$setOnInsert.meterBreakdown).toBeUndefined(); }); test('should drop breakdown entries with non-positive or non-finite values', async () => { @@ -363,6 +364,53 @@ describe('BillingUsageRepository β€” meter extensions unit tests:', () => { }); }); + describe('rotateWeekSnapshotForPlanChange', () => { + test('updates snapshot fields while preserving usage', async () => { + const lean = jest.fn().mockResolvedValue(makeUsageDoc({ meterQuota: 1000000, planVersion: 'v2' })); + mockModel.findOneAndUpdate.mockReturnValue({ lean }); + + await BillingUsageRepository.rotateWeekSnapshotForPlanChange( + orgId, + weekKey, + { meterQuota: 1000000, planVersion: 'v2', month: '2026-05' }, + true, + ); + + expect(mockModel.findOneAndUpdate).toHaveBeenCalledWith( + { organizationId: orgId, weekKey }, + { + $set: { + meterQuota: 1000000, + planVersion: 'v2', + month: '2026-05', + }, + }, + { returnDocument: 'after', runValidators: false }, + ); + expect(lean).toHaveBeenCalled(); + }); + + test('resets meterUsed and meterBreakdown when preserveUsage=false', async () => { + const lean = jest.fn().mockResolvedValue(makeUsageDoc({ meterUsed: 0, meterBreakdown: {} })); + mockModel.findOneAndUpdate.mockReturnValue({ lean }); + + await BillingUsageRepository.rotateWeekSnapshotForPlanChange( + orgId, + weekKey, + { meterQuota: 100000, planVersion: 'v3', month: '2026-05' }, + false, + ); + + expect(mockModel.findOneAndUpdate.mock.calls[0][1].$set).toEqual({ + meterQuota: 100000, + planVersion: 'v3', + month: '2026-05', + meterUsed: 0, + meterBreakdown: {}, + }); + }); + }); + describe('markThreshold', () => { test('should call updateOne with field null filter and $set to current date', async () => { mockModel.updateOne = jest.fn().mockResolvedValue({ modifiedCount: 1 }); diff --git a/modules/billing/tests/billing.usage.service.unit.tests.js b/modules/billing/tests/billing.usage.service.unit.tests.js index 0f0f6f091..b8f047b20 100644 --- a/modules/billing/tests/billing.usage.service.unit.tests.js +++ b/modules/billing/tests/billing.usage.service.unit.tests.js @@ -11,6 +11,7 @@ describe('BillingUsageService β€” meter extensions unit tests:', () => { let mockUsageRepository; let mockPlanService; let mockSubscriptionRepository; + let mockMeterOutboxRepository; let mockConfig; const orgId = '507f1f77bcf86cd799439011'; @@ -77,6 +78,10 @@ describe('BillingUsageService β€” meter extensions unit tests:', () => { findPlan: jest.fn(), }; + mockMeterOutboxRepository = { + create: jest.fn(), + }; + jest.unstable_mockModule('../../../config/index.js', () => ({ default: mockConfig, })); @@ -89,6 +94,10 @@ describe('BillingUsageService β€” meter extensions unit tests:', () => { default: mockSubscriptionRepository, })); + jest.unstable_mockModule('../repositories/billing.meter.outbox.repository.js', () => ({ + default: mockMeterOutboxRepository, + })); + jest.unstable_mockModule('../services/billing.plan.service.js', () => ({ default: mockPlanService, })); @@ -232,6 +241,47 @@ describe('BillingUsageService β€” meter extensions unit tests:', () => { expect(result.extrasConsumed).toBe(0); }); + + test('incrementMeterWithOutbox creates outbox row when extras are consumed', async () => { + mockSubscriptionRepository.findPlan.mockResolvedValue({ plan: 'pro' }); + mockPlanService.getActivePlan.mockResolvedValue(makePlan({ meterQuota: 500000 })); + const updatedDoc = makeUsageDoc({ meterUsed: 510000, meterQuota: 500000 }); + const outbox = { _id: 'outbox_1' }; + mockUsageRepository.incrementMeter.mockResolvedValue(updatedDoc); + mockMeterOutboxRepository.create.mockResolvedValue(outbox); + + const result = await BillingUsageService.incrementMeterWithOutbox( + orgId, + 50000, + {}, + 'hist_overflow:initial', + ); + + expect(mockMeterOutboxRepository.create).toHaveBeenCalledWith({ + organizationId: orgId, + idempotencyKey: 'hist_overflow:initial', + extrasUnits: 10000, + }); + expect(result.outbox).toBe(outbox); + expect(result.extrasConsumed).toBe(10000); + }); + + test('incrementMeterWithOutbox does not create outbox row for replay', async () => { + mockSubscriptionRepository.findPlan.mockResolvedValue({ plan: 'pro' }); + mockPlanService.getActivePlan.mockResolvedValue(makePlan({ meterQuota: 500000 })); + mockUsageRepository.incrementMeter.mockResolvedValue(null); + mockUsageRepository.findByWeek.mockResolvedValue(makeUsageDoc({ meterUsed: 510000 })); + + const result = await BillingUsageService.incrementMeterWithOutbox( + orgId, + 50000, + {}, + 'hist_replay:initial', + ); + + expect(result.applied).toBe(false); + expect(mockMeterOutboxRepository.create).not.toHaveBeenCalled(); + }); }); describe('incrementMeter β€” threshold detection', () => { diff --git a/modules/billing/tests/billing.webhook.subscription.unit.tests.js b/modules/billing/tests/billing.webhook.subscription.unit.tests.js index 44c6ef23d..286e70d26 100644 --- a/modules/billing/tests/billing.webhook.subscription.unit.tests.js +++ b/modules/billing/tests/billing.webhook.subscription.unit.tests.js @@ -36,6 +36,7 @@ describe('Billing webhook subscription unit tests:', () => { mockResetService = { resetWeek: jest.fn().mockResolvedValue({}), + forceRotateForPlanChange: jest.fn().mockResolvedValue({}), }; mockEvents = { emit: jest.fn() }; @@ -187,9 +188,9 @@ describe('Billing webhook subscription unit tests:', () => { ).resolves.not.toThrow(); }); - // ── Fix #3571: plan change mid-cycle triggers resetWeek ─────────────────── + // ── Plan changes refresh the active week snapshot without weekly rollover ── - test('fix #3571: plan change with same period_start β€” resetWeek called once', async () => { + test('plan change with same period_start β€” forceRotateForPlanChange called once', async () => { const periodStart = 1700000000; const existing = { _id: subId, organization: orgId }; mockSubscriptionRepository.findByStripeSubscriptionId.mockResolvedValue(existing); @@ -216,16 +217,15 @@ describe('Billing webhook subscription unit tests:', () => { }, ); - expect(mockResetService.resetWeek).toHaveBeenCalledTimes(1); - // Plan-only change (no period change) β†’ anchor is new Date() (current moment), NOT the - // billing-cycle start. Using newPeriodStart here would resolve to a past ISO week bucket. - expect(mockResetService.resetWeek).toHaveBeenCalledWith(orgId, expect.any(Date)); - const [, anchor] = mockResetService.resetWeek.mock.calls[0]; - // Anchor must NOT be the period start (which is in the past relative to the plan switch) - expect(anchor.getTime()).not.toBe(periodStart * 1000); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledTimes(1); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledWith( + orgId, + { preserveUsage: true }, + ); + expect(mockResetService.resetWeek).not.toHaveBeenCalled(); }); - test('fix #3571: plan change AND period_start change β€” resetWeek called exactly once (not twice)', async () => { + test('plan change AND period_start change β€” forceRotateForPlanChange called exactly once', async () => { const oldPeriodStart = 1700000000; const newPeriodStart = 1700604800; const existing = { _id: subId, organization: orgId }; @@ -253,13 +253,12 @@ describe('Billing webhook subscription unit tests:', () => { }, ); - // Plan-change reset triggers first; period-start reset is skipped to avoid double reset. - expect(mockResetService.resetWeek).toHaveBeenCalledTimes(1); - // When period also changed, anchor must be newPeriodStart (not now) - expect(mockResetService.resetWeek).toHaveBeenCalledWith( + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledTimes(1); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledWith( orgId, - new Date(newPeriodStart * 1000), + { preserveUsage: true }, ); + expect(mockResetService.resetWeek).not.toHaveBeenCalled(); }); test('fix #3571: no plan change β€” resetWeek NOT called on same period_start', async () => { @@ -290,7 +289,7 @@ describe('Billing webhook subscription unit tests:', () => { expect(mockResetService.resetWeek).not.toHaveBeenCalled(); }); - test('fix #3571: plan upgrade Growthβ†’Pro β€” resetWeek called with current anchor (allows full Pro quota)', async () => { + test('plan upgrade Growthβ†’Pro β€” forceRotateForPlanChange preserves usage', async () => { const periodStart = 1700000000; const existing = { _id: subId, organization: orgId }; mockSubscriptionRepository.findByStripeSubscriptionId.mockResolvedValue(existing); @@ -316,14 +315,13 @@ describe('Billing webhook subscription unit tests:', () => { }, ); - expect(mockResetService.resetWeek).toHaveBeenCalledTimes(1); - const [calledOrg, calledAnchor] = mockResetService.resetWeek.mock.calls[0]; + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledTimes(1); + const [calledOrg, options] = mockResetService.forceRotateForPlanChange.mock.calls[0]; expect(calledOrg).toBe(orgId); - // Anchor should be the new period start (Date object from current_period_start) - expect(calledAnchor).toBeInstanceOf(Date); + expect(options).toEqual({ preserveUsage: true }); }); - test('fix #3571: plan downgrade Proβ†’Growth β€” resetWeek called (meterUsed reset to 0)', async () => { + test('plan downgrade Proβ†’Growth β€” forceRotateForPlanChange called with preserveUsage=true', async () => { const periodStart = 1700000000; const existing = { _id: subId, organization: orgId }; mockSubscriptionRepository.findByStripeSubscriptionId.mockResolvedValue(existing); @@ -349,15 +347,19 @@ describe('Billing webhook subscription unit tests:', () => { }, ); - expect(mockResetService.resetWeek).toHaveBeenCalledTimes(1); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledTimes(1); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledWith( + orgId, + { preserveUsage: true }, + ); }); - test('fix #3571: resetWeek on plan change β€” errors do not throw (non-fatal)', async () => { + test('forceRotateForPlanChange errors do not throw (non-fatal)', async () => { const periodStart = 1700000000; const existing = { _id: subId, organization: orgId }; mockSubscriptionRepository.findByStripeSubscriptionId.mockResolvedValue(existing); mockSubscriptionRepository.update.mockResolvedValue({}); - mockResetService.resetWeek.mockRejectedValue(new Error('db unavailable')); + mockResetService.forceRotateForPlanChange.mockRejectedValue(new Error('db unavailable')); await expect( BillingWebhookService.handleSubscriptionUpdated( @@ -382,7 +384,7 @@ describe('Billing webhook subscription unit tests:', () => { ).resolves.not.toThrow(); }); - test('fix #3571: plan change with no newPeriodStart falls back to now for anchor', async () => { + test('plan change with no newPeriodStart still force rotates', async () => { const existing = { _id: subId, organization: orgId }; mockSubscriptionRepository.findByStripeSubscriptionId.mockResolvedValue(existing); mockSubscriptionRepository.update.mockResolvedValue({}); @@ -407,9 +409,11 @@ describe('Billing webhook subscription unit tests:', () => { }, ); - expect(mockResetService.resetWeek).toHaveBeenCalledTimes(1); - const [, anchor] = mockResetService.resetWeek.mock.calls[0]; - expect(anchor).toBeInstanceOf(Date); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledTimes(1); + expect(mockResetService.forceRotateForPlanChange).toHaveBeenCalledWith( + orgId, + { preserveUsage: true }, + ); }); test('should update currentPeriodStart in subscription when period_start is present', async () => { From 13219d4ffc637fc4500179b9f2ae737a271b644f Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Sat, 2 May 2026 21:31:10 +0200 Subject: [PATCH 2/4] fix(billing): guard markFailedAttempt exhaustion against concurrent cron race Add status:'pending' filter to both findOneAndUpdate calls in markFailedAttempt so concurrent K8s CronJob instances cannot double-emit billing.extras_debit.exhausted. The second cron that loses the status-flip race receives null back and skips emit. Add regression test for the concurrent-nil path. --- .../billing.meter.outbox.repository.js | 13 +++++++++---- .../tests/billing.meter.outbox.unit.tests.js | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/modules/billing/repositories/billing.meter.outbox.repository.js b/modules/billing/repositories/billing.meter.outbox.repository.js index af0876801..a1fe28469 100644 --- a/modules/billing/repositories/billing.meter.outbox.repository.js +++ b/modules/billing/repositories/billing.meter.outbox.repository.js @@ -77,7 +77,10 @@ const markCommitted = (id) => /** * @function markFailedAttempt * @description Record a failed debit attempt. The fifth failed attempt exhausts - * the row and moves it to failed status. + * the row and moves it to failed status atomically. The status + * transition uses `{ status: 'pending' }` as a filter on the + * exhaustion update so that concurrent cron runs cannot emit + * duplicate exhausted events. * @param {string} id - Outbox row id. * @param {Error|string} error - Failure to record. * @returns {Promise} Updated outbox row after failure accounting. @@ -86,7 +89,7 @@ const markCommitted = (id) => const markFailedAttempt = async (id, error) => { const message = error?.message ?? String(error); const doc = await BillingMeterOutbox().findOneAndUpdate( - { _id: id }, + { _id: id, status: 'pending' }, { $inc: { attempts: 1 }, $set: { @@ -98,9 +101,11 @@ const markFailedAttempt = async (id, error) => { ).lean(); if (!doc) return null; - if (doc.attempts >= 5 && doc.status !== 'failed') { + if (doc.attempts >= 5) { + // Atomic exhaustion transition: filter on status:'pending' ensures only + // the first concurrent caller wins the status flip and owns the event emit. return BillingMeterOutbox().findOneAndUpdate( - { _id: id }, + { _id: id, status: 'pending' }, { $set: { status: 'failed' } }, { returnDocument: 'after' }, ).lean(); diff --git a/modules/billing/tests/billing.meter.outbox.unit.tests.js b/modules/billing/tests/billing.meter.outbox.unit.tests.js index f096a080f..ba0a33a7d 100644 --- a/modules/billing/tests/billing.meter.outbox.unit.tests.js +++ b/modules/billing/tests/billing.meter.outbox.unit.tests.js @@ -154,6 +154,8 @@ describe('BillingMeterOutbox unit tests:', () => { const result = await BillingMeterOutboxRepository.markFailedAttempt(outboxId, new Error('debit failed')); expect(mockModel.findOneAndUpdate).toHaveBeenCalledTimes(2); + // First call: increment with status:'pending' filter to guard against concurrent updates + expect(mockModel.findOneAndUpdate.mock.calls[0][0]).toEqual({ _id: outboxId, status: 'pending' }); expect(mockModel.findOneAndUpdate.mock.calls[0][1]).toEqual({ $inc: { attempts: 1 }, $set: { @@ -161,7 +163,24 @@ describe('BillingMeterOutbox unit tests:', () => { lastAttemptedAt: expect.any(Date), }, }); + // Second call: exhaustion transition also guarded by status:'pending' so only one cron wins + expect(mockModel.findOneAndUpdate.mock.calls[1][0]).toEqual({ _id: outboxId, status: 'pending' }); expect(result.status).toBe('failed'); }); + + test('markFailedAttempt returns null on exhaustion transition when concurrent cron already marked failed', async () => { + // Simulates second concurrent cron: first update finds pending row, increments to attempts=5, + // second update returns null because status is already 'failed' (first cron won the race). + const leanFirst = jest.fn().mockResolvedValue(makeOutbox({ attempts: 5 })); + const leanSecond = jest.fn().mockResolvedValue(null); + mockModel.findOneAndUpdate + .mockReturnValueOnce({ lean: leanFirst }) + .mockReturnValueOnce({ lean: leanSecond }); + + const result = await BillingMeterOutboxRepository.markFailedAttempt(outboxId, new Error('concurrent')); + + expect(mockModel.findOneAndUpdate).toHaveBeenCalledTimes(2); + expect(result).toBeNull(); + }); }); }); From 1e980f6c09ed6ff6871094a3fcafd158c453e2cc Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Sat, 2 May 2026 21:36:37 +0200 Subject: [PATCH 3/4] fix(billing): harden outbox + webhook concurrent-safety - crypto.randomInt for cron jitter (resolves Codacy critical) - markCommitted: add status:pending guard so committed/failed rows are immutable - emitExhausted: wrap billingEvents.emit in try/catch so listener throws cannot cause markFailedAttempt to double-count (Copilot thread 5) - webhook: forceRotateForPlanChange no longer suppresses resetWeek when period also changed (combined plan+period change now calls both, fixes Copilot thread 8) - tests: align to new markCommitted filter + combined plan+period webhook contract --- .../crons/retry-pending-extras-debit.cron.js | 3 ++- .../billing.meter.outbox.repository.js | 4 +++- .../services/billing.meter.outbox.service.js | 22 +++++++++++++------ .../services/billing.webhook.service.js | 13 +++++++++-- .../tests/billing.meter.outbox.unit.tests.js | 4 ++-- ...billing.webhook.subscription.unit.tests.js | 9 ++++++-- 6 files changed, 40 insertions(+), 15 deletions(-) diff --git a/modules/billing/crons/retry-pending-extras-debit.cron.js b/modules/billing/crons/retry-pending-extras-debit.cron.js index 09d455438..8d3214584 100644 --- a/modules/billing/crons/retry-pending-extras-debit.cron.js +++ b/modules/billing/crons/retry-pending-extras-debit.cron.js @@ -20,7 +20,8 @@ if (!config?.billing?.meterMode) { process.exit(0); } -const jitterMs = Math.floor(Math.random() * 60_000); +const { randomInt } = await import('node:crypto'); +const jitterMs = randomInt(0, 60_000); await new Promise((resolve) => setTimeout(resolve, jitterMs)); try { diff --git a/modules/billing/repositories/billing.meter.outbox.repository.js b/modules/billing/repositories/billing.meter.outbox.repository.js index a1fe28469..4df608a20 100644 --- a/modules/billing/repositories/billing.meter.outbox.repository.js +++ b/modules/billing/repositories/billing.meter.outbox.repository.js @@ -64,13 +64,15 @@ const findPendingDue = (thresholdMs = 5 * 60 * 1000, limit = 100) => { /** * @function markCommitted * @description Mark an outbox row as committed after a successful extras debit. + * The `status:'pending'` filter makes this idempotent: committed or + * failed rows are immutable and concurrent calls are no-ops. * @param {string} id - Outbox row id. * @returns {Promise} Mongo update result. */ // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β€” Node.js repository, not Qwik const markCommitted = (id) => BillingMeterOutbox().updateOne( - { _id: id }, + { _id: id, status: 'pending' }, { $set: { status: 'committed', lastError: null, lastAttemptedAt: new Date() } }, ); diff --git a/modules/billing/services/billing.meter.outbox.service.js b/modules/billing/services/billing.meter.outbox.service.js index d2eadf3a9..5b0158541 100644 --- a/modules/billing/services/billing.meter.outbox.service.js +++ b/modules/billing/services/billing.meter.outbox.service.js @@ -8,18 +8,26 @@ import billingEvents from '../lib/events.js'; /** * @function emitExhausted * @description Emit the alert event for an outbox row that exhausted retries. + * Listener exceptions are swallowed so observer failures cannot affect + * retry accounting (markFailedAttempt must not be called a second time + * due to a misbehaving listener). * @param {Object} row - Original pending outbox row. * @param {Object} updated - Updated failed outbox row. * @returns {void} */ const emitExhausted = (row, updated) => { - billingEvents.emit('billing.extras_debit.exhausted', { - organizationId: String(row.organizationId), - idempotencyKey: row.idempotencyKey, - extrasUnits: row.extrasUnits, - attempts: updated.attempts, - lastError: updated.lastError, - }); + try { + billingEvents.emit('billing.extras_debit.exhausted', { + organizationId: String(row.organizationId), + idempotencyKey: row.idempotencyKey, + extrasUnits: row.extrasUnits, + attempts: updated.attempts, + lastError: updated.lastError, + }); + } catch (evtErr) { + // Listener errors must not disrupt outbox retry accounting β€” log for traceability + console.error('[billing.outbox] billing.extras_debit.exhausted listener error (non-fatal):', evtErr?.message ?? evtErr); + } }; /** diff --git a/modules/billing/services/billing.webhook.service.js b/modules/billing/services/billing.webhook.service.js index 04bb49382..340ee4983 100644 --- a/modules/billing/services/billing.webhook.service.js +++ b/modules/billing/services/billing.webhook.service.js @@ -253,9 +253,16 @@ const handleSubscriptionUpdated = async (subscription, event) => { // Plan switch mid-cycle = refresh the active week snapshot to the new plan. // Unlike cron-driven resetWeek, this preserves meterUsed by default so a plan // change does not refund or double-charge already attributed usage. + // Only mark planChangeResetTriggered when the period did NOT also change: + // when period AND plan change simultaneously (e.g. annualβ†’monthly on renewal), + // resetWeek(newPeriodStart) must still run to archive the old week. + const periodAlsoChanged = + previousPeriodStart !== undefined && + subscription.current_period_start !== previousPeriodStart && + newPeriodStart; try { await BillingResetService.forceRotateForPlanChange(organizationId, { preserveUsage: true }); - planChangeResetTriggered = true; + planChangeResetTriggered = !periodAlsoChanged; } catch (err) { // Log for monitoring β€” not thrown so webhook processing continues console.error('[billing.webhook] forceRotateForPlanChange failed (non-fatal):', err?.message ?? err); @@ -263,7 +270,9 @@ const handleSubscriptionUpdated = async (subscription, event) => { } } - // Detect period start change β€” trigger weekly meter reset (only when not already triggered by plan change) + // Detect period start change β€” trigger weekly meter reset (only when not already triggered by plan change). + // Also runs when plan changed AND period changed simultaneously: forceRotateForPlanChange refreshes the + // snapshot but does not archive the old week; resetWeek handles the week rollover. if ( !planChangeResetTriggered && previousPeriodStart !== undefined && diff --git a/modules/billing/tests/billing.meter.outbox.unit.tests.js b/modules/billing/tests/billing.meter.outbox.unit.tests.js index ba0a33a7d..63caba6f4 100644 --- a/modules/billing/tests/billing.meter.outbox.unit.tests.js +++ b/modules/billing/tests/billing.meter.outbox.unit.tests.js @@ -127,13 +127,13 @@ describe('BillingMeterOutbox unit tests:', () => { expect(lean).toHaveBeenCalled(); }); - test('markCommitted sets committed status', async () => { + test('markCommitted sets committed status and is idempotent via status filter', async () => { mockModel.updateOne.mockResolvedValue({ modifiedCount: 1 }); await BillingMeterOutboxRepository.markCommitted(outboxId); expect(mockModel.updateOne).toHaveBeenCalledWith( - { _id: outboxId }, + { _id: outboxId, status: 'pending' }, { $set: { status: 'committed', diff --git a/modules/billing/tests/billing.webhook.subscription.unit.tests.js b/modules/billing/tests/billing.webhook.subscription.unit.tests.js index 286e70d26..b680e77af 100644 --- a/modules/billing/tests/billing.webhook.subscription.unit.tests.js +++ b/modules/billing/tests/billing.webhook.subscription.unit.tests.js @@ -225,7 +225,10 @@ describe('Billing webhook subscription unit tests:', () => { expect(mockResetService.resetWeek).not.toHaveBeenCalled(); }); - test('plan change AND period_start change β€” forceRotateForPlanChange called exactly once', async () => { + test('plan change AND period_start change β€” forceRotateForPlanChange AND resetWeek both called', async () => { + // Combined plan+period change (e.g. annualβ†’monthly on renewal): + // forceRotateForPlanChange refreshes quota snapshot; resetWeek archives the old week. + // planChangeResetTriggered must NOT suppress resetWeek when period also changed. const oldPeriodStart = 1700000000; const newPeriodStart = 1700604800; const existing = { _id: subId, organization: orgId }; @@ -258,7 +261,9 @@ describe('Billing webhook subscription unit tests:', () => { orgId, { preserveUsage: true }, ); - expect(mockResetService.resetWeek).not.toHaveBeenCalled(); + // resetWeek must also run to archive the old week on the period rollover + expect(mockResetService.resetWeek).toHaveBeenCalledTimes(1); + expect(mockResetService.resetWeek).toHaveBeenCalledWith(orgId, new Date(newPeriodStart * 1000)); }); test('fix #3571: no plan change β€” resetWeek NOT called on same period_start', async () => { From 69be3fccb6a48fdb6ce52c7d81fcf39dba6656c2 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Sat, 2 May 2026 21:40:33 +0200 Subject: [PATCH 4/4] fix(billing): cron exit code + alertedAt flags on clean-break rotation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - cron: exhausted rows are a handled business outcome β€” exit 0, warn instead of exit 1 so K8s CronJob does not treat normal reconciliation as an infrastructure failure - repository: rotateWeekSnapshotForPlanChange with preserveUsage=false now also clears alertedAt80/alertedAt100 so the new quota window can trigger threshold alerts again --- modules/billing/crons/retry-pending-extras-debit.cron.js | 7 ++++++- modules/billing/repositories/billing.usage.repository.js | 3 +++ .../billing/tests/billing.usage.repository.unit.tests.js | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/modules/billing/crons/retry-pending-extras-debit.cron.js b/modules/billing/crons/retry-pending-extras-debit.cron.js index 8d3214584..98f40278e 100644 --- a/modules/billing/crons/retry-pending-extras-debit.cron.js +++ b/modules/billing/crons/retry-pending-extras-debit.cron.js @@ -34,7 +34,12 @@ try { console.log( `[billing.retryPendingExtrasDebit] done β€” scanned: ${result.scanned}, committed: ${result.committed}, failedAttempts: ${result.failedAttempts}, exhausted: ${result.exhausted}`, ); - process.exitCode = result.exhausted > 0 ? 1 : 0; + if (result.exhausted > 0) { + // Exhausted rows are a handled business outcome (alert event already emitted), + // not an operational cron failure β€” log for visibility without failing the job. + console.warn(`[billing.retryPendingExtrasDebit] exhausted rows (alert emitted): ${result.exhausted}`); + } + process.exitCode = 0; } catch (err) { console.error('[billing.retryPendingExtrasDebit] fatal:', err); process.exitCode = 1; diff --git a/modules/billing/repositories/billing.usage.repository.js b/modules/billing/repositories/billing.usage.repository.js index 75c0f5cae..a98a1b267 100644 --- a/modules/billing/repositories/billing.usage.repository.js +++ b/modules/billing/repositories/billing.usage.repository.js @@ -251,6 +251,9 @@ const rotateWeekSnapshotForPlanChange = (orgId, weekKey, snapshotFields, preserv if (!preserveUsage) { update.$set.meterUsed = 0; update.$set.meterBreakdown = {}; + // Clear threshold flags so the new quota window can trigger alerts again. + update.$set.alertedAt80 = null; + update.$set.alertedAt100 = null; } return BillingUsage.findOneAndUpdate( diff --git a/modules/billing/tests/billing.usage.repository.unit.tests.js b/modules/billing/tests/billing.usage.repository.unit.tests.js index 5706f9378..eabfdcf76 100644 --- a/modules/billing/tests/billing.usage.repository.unit.tests.js +++ b/modules/billing/tests/billing.usage.repository.unit.tests.js @@ -407,6 +407,8 @@ describe('BillingUsageRepository β€” meter extensions unit tests:', () => { month: '2026-05', meterUsed: 0, meterBreakdown: {}, + alertedAt80: null, + alertedAt100: null, }); }); });