Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/billing/RUNBOOKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Operational runbooks for the billing module. Each runbook references real endpoi

## 2 — Dead-Letter Investigation

**Context**: Stripe webhook events that fail processing 3+ times (or where the idempotency guard fires on a poisoned payload) are marked `deadLetter: true` in `processedStripeEvents`. They accumulate and must be reviewed manually — partial TTL index excludes them from auto-expiry.
**Context**: Stripe webhook events that fail processing 5+ times (or where the idempotency guard fires on a poisoned payload) are marked `deadLetter: true` in `processedStripeEvents`. They accumulate and must be reviewed manually — partial TTL index excludes them from auto-expiry.

**Steps**:

Expand Down Expand Up @@ -123,7 +123,7 @@ Operational runbooks for the billing module. Each runbook references real endpoi

```text
PATCH /api/admin/billing/plans/bump
Body: { "orgId": "...", "planId": "pro", "reason": "manual reconciliation post-mismatch" }
Body: { "orgId": "...", "planId": "pro" }
```

5. Re-run `GET /api/admin/billing/customer/:orgId` to confirm `stripeSnapshot` and `dbSnapshot` now match.
Expand Down
10 changes: 5 additions & 5 deletions modules/billing/billing.init.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export default async (app) => {
if (config.billing?.packs?.length) {
for (const pack of config.billing.packs) {
if (typeof pack.priceUsd !== 'number' || pack.priceUsd <= 0) {
console.warn(`[billing] pack '${pack.packId}' missing valid priceUsd; refundPartial fallback will be inaccurate`);
logger.warn(`[billing] pack '${pack.packId}' missing valid priceUsd; refundPartial fallback will be inaccurate`);
}
}
}
Expand All @@ -33,7 +33,7 @@ export default async (app) => {
const SUPPORTED_THRESHOLD_PERCENTS = new Set([80, 100]);
for (const threshold of getAlertThresholdPercents()) {
if (!SUPPORTED_THRESHOLD_PERCENTS.has(threshold)) {
console.warn(
logger.warn(
`[billing] Configured alert threshold ${threshold}% is not in schema-supported set [80, 100] — alert will be silently skipped`,
);
}
Expand All @@ -45,7 +45,7 @@ export default async (app) => {
try {
AnalyticsService.groupIdentify('company', String(organizationId), { plan: newPlan });
} catch (err) {
console.warn('[billing] analytics groupIdentify failed (non-fatal):', err?.message ?? err);
logger.warn('[billing] analytics groupIdentify failed (non-fatal)', { error: err?.message ?? String(err) });
}
});

Expand Down Expand Up @@ -131,11 +131,11 @@ export default async (app) => {
const distinctPlans = await Subscription.distinct('plan');
for (const plan of distinctPlans) {
if (!knownPlans.has(plan)) {
console.warn(`[billing] Subscription.plan value "${plan}" not in planDefinitions — orphaned plan, may resolve quota=0`);
logger.warn(`[billing] Subscription.plan value "${plan}" not in planDefinitions — orphaned plan, may resolve quota=0`);
}
}
} catch (err) {
console.warn('[billing] Subscription.plan boot validator failed (non-fatal):', err?.message ?? err);
logger.warn('[billing] Subscription.plan boot validator failed (non-fatal)', { error: err?.message ?? String(err) });
}
}
};
33 changes: 27 additions & 6 deletions modules/billing/controllers/billing.admin.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import responses from '../../../lib/helpers/responses.js';
import getStripe from '../lib/stripe.js';
import BillingAdminService from '../services/billing.admin.service.js';
import { AdminDeadLettersQuery } from '../models/billing.subscription.schema.js';
import { AdminDeadLettersQuery, AdminOrgIdParam, AdminEventIdParam } from '../models/billing.subscription.schema.js';
import logger from '../../../lib/services/logger.js';

/**
Expand Down Expand Up @@ -106,7 +106,11 @@ const adminBumpPlan = async (req, res) => {
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
const adminGetCustomerStatus = async (req, res) => {
try {
const { orgId } = req.params;
const parsed = AdminOrgIdParam.safeParse(req.params);
if (!parsed.success) {
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
}
const { orgId } = parsed.data;
const result = await BillingAdminService.getCustomerStatus(orgId);
return responses.success(res, 'customer status')(result);
} catch (err) {
Expand All @@ -126,7 +130,11 @@ const adminGetCustomerStatus = async (req, res) => {
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
const adminSyncFromStripe = async (req, res) => {
try {
const { orgId } = req.params;
const parsed = AdminOrgIdParam.safeParse(req.params);
if (!parsed.success) {
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
}
const { orgId } = parsed.data;
const result = await BillingAdminService.syncOrgFromStripe(orgId);
return responses.success(res, 'subscription synced from Stripe')(result);
} catch (err) {
Expand Down Expand Up @@ -190,7 +198,11 @@ const adminListDeadLetters = async (req, res) => {
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
const adminPurgeDeadLetter = async (req, res) => {
try {
const { eventId } = req.params;
const parsed = AdminEventIdParam.safeParse(req.params);
if (!parsed.success) {
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
}
const { eventId } = parsed.data;
const result = await BillingAdminService.purgeDeadLetter(eventId);
return responses.success(res, 'dead letter purged')(result);
} catch (err) {
Expand All @@ -211,7 +223,11 @@ const adminPurgeDeadLetter = async (req, res) => {
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
const adminCancelSubscription = async (req, res) => {
try {
const { orgId } = req.params;
const parsed = AdminOrgIdParam.safeParse(req.params);
if (!parsed.success) {
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
}
const { orgId } = parsed.data;
const result = await BillingAdminService.cancelSubscription(orgId);
return responses.success(res, 'subscription canceled')(result);
} catch (err) {
Expand All @@ -232,8 +248,13 @@ const adminCancelSubscription = async (req, res) => {
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
const adminDisputeCredit = async (req, res) => {
try {
const parsedParams = AdminOrgIdParam.safeParse(req.params);
if (!parsedParams.success) {
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsedParams.error);
}
const { orgId } = parsedParams.data;

const { chargeId, amountCents, reason, refundRequestId } = req.body;
const { orgId } = req.params;

const rawAdminId = req.user?._id;
if (!rawAdminId) {
Expand Down
17 changes: 17 additions & 0 deletions modules/billing/models/billing.subscription.schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,27 @@ const AdminDisputeCreditRequest = z
})
.strict();

/**
* Path parameter schemas for admin routes.
* orgId: MongoDB ObjectId (24 hex chars, case-insensitive).
* eventId: Stripe event ID (evt_ prefix).
*/
const AdminOrgIdParam = z.object({
orgId: z.string().regex(/^[a-f0-9]{24}$/i, 'orgId must be a valid ObjectId'),
});

const AdminEventIdParam = z.object({
eventId: z.string().regex(/^evt_/, 'eventId must be a Stripe event ID (evt_...)'),
});

export {
AdminRefundRequest,
AdminBumpPlanRequest,
AdminWebhookReplayRequest,
AdminDeadLettersQuery,
AdminDisputeCreditRequest,
AdminOrgIdParam,
AdminEventIdParam,
};

export default {
Expand All @@ -165,4 +180,6 @@ export default {
AdminWebhookReplayRequest,
AdminDeadLettersQuery,
AdminDisputeCreditRequest,
AdminOrgIdParam,
AdminEventIdParam,
};
22 changes: 13 additions & 9 deletions modules/billing/repositories/billing.subscription.repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,26 +312,30 @@ const adminUpdatePlanOnly = (id, planId, adminUserId) => {

/**
* Fetch one page of subscriptions matching given statuses for reconciliation.
* Used by the billing reconcile service (replaces direct mongoose.model() access there).
* Uses cursor-based pagination (_id > lastSeenId) instead of skip+limit for stability:
* skip offsets shift when new documents are inserted mid-run, causing silent skips or
* double-processing. The _id cursor is monotonically increasing and unaffected by inserts.
* @param {string[]} statuses - Subscription statuses to include.
* @param {number} page - 0-based page index.
* @param {string|null} lastSeenId - ObjectId string of the last document from the previous page, or null for first page.
* @param {number} limit - Page size.
* @returns {Promise<Object[]>} Lean subscription documents.
*/
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js repository, not Qwik
const findPageForReconciliation = (statuses, page, limit) =>
Subscription.find(
{
status: { $in: statuses },
stripeSubscriptionId: { $exists: true, $ne: null },
},
const findPageForReconciliation = (statuses, lastSeenId, limit) => {
const filter = {
status: { $in: statuses },
stripeSubscriptionId: { $exists: true, $ne: null },
...(lastSeenId ? { _id: { $gt: new mongoose.Types.ObjectId(lastSeenId) } } : {}),
};
return Subscription.find(
filter,
{ _id: 1, organization: 1, stripeSubscriptionId: 1, stripeCustomerId: 1, plan: 1, status: 1, currentPeriodStart: 1 },
)
.sort({ _id: 1 })
.skip(page * limit)
.limit(limit)
.lean()
.exec();
};

export default {
list,
Expand Down
21 changes: 12 additions & 9 deletions modules/billing/services/billing.reconcile.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ const runReconciliation = async () => {
let checked = 0;
let divergences = 0;
let errors = 0;
let page = 0;
let lastSeenId = null;

let hasMore = true;
while (hasMore) {
// Paginate via skip+limit — for large collections consider cursor-based pagination,
// but skip is acceptable for ops crons running at low frequency.
const subs = await _fetchPage(SubscriptionRepository, page, RECONCILE_PAGE_SIZE);
// Cursor-based pagination via _id > lastSeen — stable across new subscriptions inserted
// mid-run (skip+limit would shift offsets and silently skip or double-process docs).
const subs = await _fetchPage(SubscriptionRepository, lastSeenId, RECONCILE_PAGE_SIZE);
if (!subs || subs.length === 0) break;

for (const sub of subs) {
Expand All @@ -93,7 +93,7 @@ const runReconciliation = async () => {
if (subs.length < RECONCILE_PAGE_SIZE) {
hasMore = false;
} else {
page += 1;
lastSeenId = String(subs[subs.length - 1]._id);
}
}

Expand All @@ -102,17 +102,20 @@ const runReconciliation = async () => {
};

/**
* Fetch one page of active|past_due subscriptions.
* Fetch one page of active|past_due subscriptions using cursor-based pagination.
* Passing lastSeenId advances the cursor to the next page; null fetches the first page.
* Cursor approach is stable across new subscription inserts during a reconcile run —
* skip+limit would shift offsets and silently skip or double-process documents.
* @param {Object} SubscriptionRepository - Subscription repository.
* @param {number} page - 0-based page index.
* @param {string|null} lastSeenId - ObjectId string of the last document from the previous page, or null for first page.
* @param {number} limit - Page size.
* @returns {Promise<Array>}
*/
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js service, not Qwik
const _fetchPage = async (SubscriptionRepository, page, limit) => {
const _fetchPage = async (SubscriptionRepository, lastSeenId, limit) => {
return SubscriptionRepository.findPageForReconciliation(
RECONCILE_STATUSES,
page,
lastSeenId,
limit,
);
};
Expand Down
18 changes: 14 additions & 4 deletions modules/billing/services/billing.webhook.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,17 @@ const handleCheckoutPaymentCompleted = async (session) => {
if (!organizationId || !mongoose.Types.ObjectId.isValid(organizationId)) return;
if (!packId) return;

await BillingExtraService.creditPack(organizationId, packId, stripeSessionId);
const result = await BillingExtraService.creditPack(organizationId, packId, stripeSessionId);

// Observability: log when creditPack returns applied=false (duplicate session detected).
// Debug level — not an error, but useful to detect phantom Stripe redeliveries in prod.
if (result && !result.applied) {
logger.debug('[billing.webhook] creditPack duplicate session detected — idempotency guard fired', {
organizationId,
packId,
stripeSessionId,
});
}

// Backfill PaymentIntent metadata with the real session ID so that charge.refunded
// events can correlate the charge back to this ledger entry.
Expand Down Expand Up @@ -1116,9 +1126,9 @@ const handleChargeDisputeFundsReinstated = async (dispute, event) => {
return;
}

// funds_reinstated is good news (dispute was won back) — log as warn, not error.
// The alert is for ops to apply a manual ledger credit via the admin endpoint.
logger.warn('[billing.webhook] dispute.funds_reinstated received — use POST /api/admin/billing/dispute/credit/:orgId to restore the extras balance', {
// funds_reinstated requires immediate manual action (apply ledger credit via admin endpoint)
// — log as error so this surfaces in error-level alerting dashboards, not buried in warn noise.
logger.error('[billing.webhook] dispute.funds_reinstated received — ACTION REQUIRED: use POST /api/admin/billing/dispute/credit/:orgId to restore the extras balance', {
disputeId,
chargeId,
amount,
Expand Down
Loading
Loading