-
-
Notifications
You must be signed in to change notification settings - Fork 10
fix(billing): plan-change forced reset + atomic usage+extras outbox #3582
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
3102ee5
fix(billing): forceRotateForPlanChange + atomic usage+extras outbox
PierreBrisorgueil 13219d4
fix(billing): guard markFailedAttempt exhaustion against concurrent cβ¦
PierreBrisorgueil 1e980f6
fix(billing): harden outbox + webhook concurrent-safety
PierreBrisorgueil 69be3fc
fix(billing): cron exit code + alertedAt flags on clean-break rotation
PierreBrisorgueil File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /** | ||
| * Cron script β retry pending extras debits from the meter outbox. | ||
| * | ||
| * No-op when config.billing.meterMode === false (default). | ||
| * Intended to run as a Kubernetes CronJob every 5 minutes. | ||
| * | ||
| * Usage: | ||
| * NODE_ENV=production node modules/billing/crons/retry-pending-extras-debit.cron.js | ||
| */ | ||
|
|
||
| process.env.NODE_ENV = process.env.NODE_ENV || 'development'; | ||
|
|
||
| const [{ default: config }, { default: mongooseService }] = await Promise.all([ | ||
| import('../../../config/index.js'), | ||
| import('../../../lib/services/mongoose.js'), | ||
| ]); | ||
|
|
||
| if (!config?.billing?.meterMode) { | ||
| console.log('[billing.retryPendingExtrasDebit] meterMode disabled β skipping.'); | ||
| process.exit(0); | ||
| } | ||
|
|
||
| const { randomInt } = await import('node:crypto'); | ||
| const jitterMs = randomInt(0, 60_000); | ||
| await new Promise((resolve) => setTimeout(resolve, jitterMs)); | ||
|
|
||
| try { | ||
| await mongooseService.loadModels(); | ||
| await mongooseService.connect(); | ||
|
|
||
| const { default: BillingMeterOutboxService } = await import('../services/billing.meter.outbox.service.js'); | ||
| const result = await BillingMeterOutboxService.retryPendingExtrasDebits(5 * 60 * 1000, 100); | ||
|
|
||
| console.log( | ||
| `[billing.retryPendingExtrasDebit] done β scanned: ${result.scanned}, committed: ${result.committed}, failedAttempts: ${result.failedAttempts}, exhausted: ${result.exhausted}`, | ||
| ); | ||
| if (result.exhausted > 0) { | ||
| // Exhausted rows are a handled business outcome (alert event already emitted), | ||
| // not an operational cron failure β log for visibility without failing the job. | ||
| console.warn(`[billing.retryPendingExtrasDebit] exhausted rows (alert emitted): ${result.exhausted}`); | ||
| } | ||
| process.exitCode = 0; | ||
| } catch (err) { | ||
| console.error('[billing.retryPendingExtrasDebit] fatal:', err); | ||
| process.exitCode = 1; | ||
| } finally { | ||
| await mongooseService.disconnect?.(); | ||
| } | ||
| process.exit(process.exitCode ?? 0); | ||
|
PierreBrisorgueil marked this conversation as resolved.
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
40 changes: 40 additions & 0 deletions
40
modules/billing/models/billing.meter.outbox.model.mongoose.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| /** | ||
| * Module dependencies | ||
| */ | ||
| import mongoose from 'mongoose'; | ||
|
|
||
| const Schema = mongoose.Schema; | ||
|
|
||
| /** | ||
| * Meter outbox model. | ||
| * | ||
| * Stores deferred extras debits created after meter usage crosses plan quota. | ||
| * Pending rows are retried by the billing retry-pending-extras-debit cron. | ||
| */ | ||
| const BillingMeterOutboxMongoose = new Schema({ | ||
| organizationId: { type: Schema.ObjectId, required: true, index: true }, | ||
| idempotencyKey: { type: String, required: true, unique: true }, | ||
| extrasUnits: { type: Number, required: true }, | ||
| status: { type: String, enum: ['pending', 'committed', 'failed'], default: 'pending', index: true }, | ||
| attempts: { type: Number, default: 0 }, | ||
| lastError: { type: String, default: null }, | ||
| lastAttemptedAt: { type: Date, default: null }, | ||
| createdAt: { type: Date, default: () => new Date() }, | ||
| }); | ||
|
|
||
| BillingMeterOutboxMongoose.index({ status: 1, lastAttemptedAt: 1 }); | ||
|
|
||
| /** | ||
| * Returns the hex string representation of the document ObjectId. | ||
| * @returns {string} Hex string of the ObjectId. | ||
| */ | ||
| function addID() { | ||
| return this._id.toHexString(); | ||
| } | ||
|
|
||
| BillingMeterOutboxMongoose.virtual('id').get(addID); | ||
| BillingMeterOutboxMongoose.set('toJSON', { | ||
| virtuals: true, | ||
| }); | ||
|
|
||
| mongoose.model('BillingMeterOutbox', BillingMeterOutboxMongoose); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| /** | ||
| * Module dependencies | ||
| */ | ||
| import { z } from 'zod'; | ||
|
|
||
| /** | ||
| * BillingMeterOutbox Zod schema β mirrors billing.meter.outbox.model.mongoose.js | ||
| */ | ||
|
|
||
| const objectIdRegex = /^[a-f\d]{24}$/i; | ||
|
|
||
| const BillingMeterOutboxStatus = z.enum(['pending', 'committed', 'failed']); | ||
|
|
||
| const BillingMeterOutbox = z.object({ | ||
| organizationId: z.string().trim().regex(objectIdRegex, 'organizationId must be a valid ObjectId'), | ||
| idempotencyKey: z.string().trim().min(1, 'idempotencyKey is required'), | ||
| extrasUnits: z.number().int().min(1, 'extrasUnits must be >= 1'), | ||
| status: BillingMeterOutboxStatus.default('pending'), | ||
| attempts: z.number().int().min(0).default(0), | ||
| lastError: z.string().nullable().default(null), | ||
| lastAttemptedAt: z.coerce.date().nullable().default(null), | ||
| createdAt: z.coerce.date().optional(), | ||
| }); | ||
|
|
||
| const BillingMeterOutboxCreate = z.object({ | ||
| organizationId: z.string().trim().regex(objectIdRegex, 'organizationId must be a valid ObjectId'), | ||
| idempotencyKey: z.string().trim().min(1, 'idempotencyKey is required'), | ||
| extrasUnits: z.number().int().min(1, 'extrasUnits must be >= 1'), | ||
| }); | ||
|
|
||
| export default { | ||
| BillingMeterOutboxStatus, | ||
| BillingMeterOutbox, | ||
| BillingMeterOutboxCreate, | ||
| }; |
123 changes: 123 additions & 0 deletions
123
modules/billing/repositories/billing.meter.outbox.repository.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| /** | ||
| * Module dependencies | ||
| */ | ||
| import mongoose from 'mongoose'; | ||
|
|
||
| /** | ||
| * @function BillingMeterOutbox | ||
| * @description Lazily resolves the BillingMeterOutbox Mongoose model. | ||
| * Deferred to keep unit tests importable before model registration. | ||
| * @returns {import('mongoose').Model} The registered BillingMeterOutbox model. | ||
| */ | ||
| // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β Node.js repository, not Qwik | ||
| const BillingMeterOutbox = () => mongoose.model('BillingMeterOutbox'); | ||
|
|
||
| /** | ||
| * @function create | ||
| * @description Insert a pending outbox row for a deferred extras debit. | ||
| * @param {Object} payload - Outbox row fields. | ||
| * @param {string} payload.organizationId - Organization ObjectId. | ||
| * @param {string} payload.idempotencyKey - Usage attribution idempotency key. | ||
| * @param {number} payload.extrasUnits - Extras units to debit. | ||
| * @param {Object} [options={}] - Optional write options. | ||
| * @param {import('mongoose').ClientSession} [options.session] - Optional Mongo session. | ||
| * @returns {Promise<Object>} Inserted outbox document. | ||
| */ | ||
| // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β Node.js repository, not Qwik | ||
| const create = async ({ organizationId, idempotencyKey, extrasUnits }, options = {}) => { | ||
| const docs = await BillingMeterOutbox().create( | ||
| [{ | ||
| organizationId, | ||
| idempotencyKey, | ||
| extrasUnits, | ||
| status: 'pending', | ||
| }], | ||
| options.session ? { session: options.session } : undefined, | ||
| ); | ||
| return docs[0]; | ||
| }; | ||
|
|
||
| /** | ||
| * @function findPendingDue | ||
| * @description Return pending outbox rows whose last attempt is due for retry. | ||
| * Rows with lastAttemptedAt=null are due immediately. | ||
| * @param {number} [thresholdMs=300000] - Retry backoff threshold in milliseconds. | ||
| * @param {number} [limit=100] - Maximum rows to return. | ||
| * @returns {Promise<Object[]>} Pending due outbox rows. | ||
| */ | ||
| // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β Node.js repository, not Qwik | ||
| const findPendingDue = (thresholdMs = 5 * 60 * 1000, limit = 100) => { | ||
| const dueBefore = new Date(Date.now() - thresholdMs); | ||
| return BillingMeterOutbox() | ||
| .find({ | ||
| status: 'pending', | ||
| $or: [ | ||
| { lastAttemptedAt: null }, | ||
| { lastAttemptedAt: { $lt: dueBefore } }, | ||
| ], | ||
| }) | ||
| .sort({ lastAttemptedAt: 1, createdAt: 1 }) | ||
| .limit(limit) | ||
| .lean(); | ||
| }; | ||
|
|
||
| /** | ||
| * @function markCommitted | ||
| * @description Mark an outbox row as committed after a successful extras debit. | ||
| * The `status:'pending'` filter makes this idempotent: committed or | ||
| * failed rows are immutable and concurrent calls are no-ops. | ||
| * @param {string} id - Outbox row id. | ||
| * @returns {Promise<Object>} Mongo update result. | ||
| */ | ||
| // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β Node.js repository, not Qwik | ||
| const markCommitted = (id) => | ||
| BillingMeterOutbox().updateOne( | ||
| { _id: id, status: 'pending' }, | ||
| { $set: { status: 'committed', lastError: null, lastAttemptedAt: new Date() } }, | ||
| ); | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * @function markFailedAttempt | ||
| * @description Record a failed debit attempt. The fifth failed attempt exhausts | ||
| * the row and moves it to failed status atomically. The status | ||
| * transition uses `{ status: 'pending' }` as a filter on the | ||
| * exhaustion update so that concurrent cron runs cannot emit | ||
| * duplicate exhausted events. | ||
| * @param {string} id - Outbox row id. | ||
| * @param {Error|string} error - Failure to record. | ||
| * @returns {Promise<Object|null>} Updated outbox row after failure accounting. | ||
| */ | ||
| // biome-ignore lint/correctness/useQwikValidLexicalScope: false positive β Node.js repository, not Qwik | ||
| const markFailedAttempt = async (id, error) => { | ||
|
PierreBrisorgueil marked this conversation as resolved.
|
||
| const message = error?.message ?? String(error); | ||
| const doc = await BillingMeterOutbox().findOneAndUpdate( | ||
| { _id: id, status: 'pending' }, | ||
| { | ||
| $inc: { attempts: 1 }, | ||
| $set: { | ||
| lastError: message, | ||
| lastAttemptedAt: new Date(), | ||
| }, | ||
| }, | ||
| { returnDocument: 'after' }, | ||
| ).lean(); | ||
|
PierreBrisorgueil marked this conversation as resolved.
|
||
|
|
||
| if (!doc) return null; | ||
| if (doc.attempts >= 5) { | ||
| // Atomic exhaustion transition: filter on status:'pending' ensures only | ||
| // the first concurrent caller wins the status flip and owns the event emit. | ||
| return BillingMeterOutbox().findOneAndUpdate( | ||
| { _id: id, status: 'pending' }, | ||
| { $set: { status: 'failed' } }, | ||
| { returnDocument: 'after' }, | ||
| ).lean(); | ||
| } | ||
| return doc; | ||
| }; | ||
|
|
||
| export default { | ||
| create, | ||
| findPendingDue, | ||
| markCommitted, | ||
| markFailedAttempt, | ||
| }; | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.