Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/sim/app/api/billing/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
members: rawBillingData.members.map((m) => ({
...m,
joinedAt: m.joinedAt.toISOString(),
lastActive: m.lastActive?.toISOString() || null,
Comment thread
icecrasher321 marked this conversation as resolved.
})),
}

Expand Down
7 changes: 6 additions & 1 deletion apps/sim/app/api/knowledge/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ describe('Knowledge Utils', () => {
{}
)

expect(dbOps.order).toEqual(['insert', 'updateDoc'])
// Embeddings are inserted first, then the document counter update. A
// usage_log billing insert (recordUsage) may trail after updateDoc and is
// irrelevant to this ordering invariant, so assert position rather than
// exact array equality.
expect(dbOps.order[0]).toBe('insert')
expect(dbOps.order.indexOf('updateDoc')).toBeGreaterThan(0)

expect(dbOps.updatePayloads[0]).toMatchObject({
processingStatus: 'completed',
Expand Down
15 changes: 12 additions & 3 deletions apps/sim/app/api/logs/execution/[executionId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { executionIdParamsSchema } from '@/lib/api/contracts/logs'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { materializeExecutionData } from '@/lib/logs/execution/trace-store'
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'

const logger = createLogger('LogsByExecutionIdAPI')
Expand All @@ -39,13 +40,14 @@ export const GET = withRouteHandler(
.select({
id: workflowExecutionLogs.id,
workflowId: workflowExecutionLogs.workflowId,
workspaceId: workflowExecutionLogs.workspaceId,
executionId: workflowExecutionLogs.executionId,
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
costTotal: workflowExecutionLogs.costTotal,
executionData: workflowExecutionLogs.executionData,
})
.from(workflowExecutionLogs)
Expand Down Expand Up @@ -119,7 +121,14 @@ export const GET = withRouteHandler(
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
}

const executionData = workflowLog.executionData as WorkflowExecutionLog['executionData']
const executionData = (await materializeExecutionData(
workflowLog.executionData as Record<string, unknown> | null,
{
workspaceId: workflowLog.workspaceId,
workflowId: workflowLog.workflowId,
executionId: workflowLog.executionId,
}
)) as WorkflowExecutionLog['executionData']
const traceSpans = (executionData?.traceSpans as TraceSpan[]) || []
const childSnapshotIds = new Set<string>()
const collectSnapshotIds = (spans: TraceSpan[]) => {
Expand Down Expand Up @@ -163,7 +172,7 @@ export const GET = withRouteHandler(
startedAt: workflowLog.startedAt.toISOString(),
endedAt: workflowLog.endedAt?.toISOString(),
totalDurationMs: workflowLog.totalDurationMs,
cost: workflowLog.cost || null,
cost: workflowLog.costTotal != null ? { total: Number(workflowLog.costTotal) } : null,
},
}

Expand Down
41 changes: 33 additions & 8 deletions apps/sim/app/api/logs/export/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
import { and, desc, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { MATERIALIZE_CONCURRENCY, mapWithConcurrency } from '@/lib/core/utils/concurrency'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { materializeExecutionData } from '@/lib/logs/execution/trace-store'
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
import { expandFolderIdsWithDescendants } from '@/lib/logs/folder-expansion'

Expand Down Expand Up @@ -41,7 +43,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
costTotal: workflowExecutionLogs.costTotal,
executionData: workflowExecutionLogs.executionData,
workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
}
Expand Down Expand Up @@ -96,32 +98,55 @@ export const GET = withRouteHandler(async (request: NextRequest) => {

if (!rows.length) break

for (const r of rows as any[]) {
// Heavy execution data may live in object storage; materialize per
// row with bounded concurrency so a 1000-row page doesn't fan out
// into 1000 simultaneous reads.
const materialized = await mapWithConcurrency(
rows as any[],
MATERIALIZE_CONCURRENCY,
(r) =>
materializeExecutionData(r.executionData as Record<string, unknown> | null, {
workspaceId: params.workspaceId,
workflowId: r.workflowId,
executionId: r.executionId,
})
)

for (let j = 0; j < rows.length; j++) {
const r = rows[j] as any
const ed = materialized[j] as Record<string, any>
// A single malformed/unserializable row must not abort the whole CSV
// stream — derive the message/trace columns defensively and fall back
// to empty on error so the row's metadata still exports.
let message = ''
let traces: any = null
let tracesJson = ''
try {
const ed = (r as any).executionData
if (ed) {
if (ed.finalOutput)
message =
typeof ed.finalOutput === 'string'
? ed.finalOutput
: JSON.stringify(ed.finalOutput)
if (ed.message) message = ed.message
if (ed.traceSpans) traces = ed.traceSpans
if (ed.traceSpans) tracesJson = JSON.stringify(ed.traceSpans)
}
} catch {}
} catch (rowError) {
logger.warn('Skipping unserializable execution data for export row', {
executionId: r.executionId,
error: rowError instanceof Error ? rowError.message : String(rowError),
})
}
Comment thread
icecrasher321 marked this conversation as resolved.
const line = [
escapeCsv(r.startedAt?.toISOString?.() || r.startedAt),
escapeCsv(r.level),
escapeCsv(r.workflowName),
escapeCsv(r.trigger),
escapeCsv(r.totalDurationMs ?? ''),
escapeCsv(r.cost?.total ?? r.cost?.value?.total ?? ''),
escapeCsv(r.costTotal ?? ''),
escapeCsv(r.workflowId ?? ''),
escapeCsv(r.executionId ?? ''),
escapeCsv(message),
escapeCsv(traces ? JSON.stringify(traces) : ''),
escapeCsv(tracesJson),
].join(',')
controller.enqueue(encoder.encode(`${line}\n`))
}
Expand Down
12 changes: 8 additions & 4 deletions apps/sim/app/api/logs/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { listLogsContract, type WorkflowLogSummary } from '@/lib/api/contracts/l
import { parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { jobCostTotal } from '@/lib/logs/fetch-log-detail'
import { buildFilterConditions } from '@/lib/logs/filters'
import { expandFolderIdsWithDescendants } from '@/lib/logs/folder-expansion'

Expand Down Expand Up @@ -81,7 +82,8 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
case 'duration':
return sql`${workflowExecutionLogs.totalDurationMs}`
case 'cost':
return sql`(${workflowExecutionLogs.cost}->>'total')::numeric`
// Indexed projection of the usage_log ledger (dollars); no live aggregation.
return sql`${workflowExecutionLogs.costTotal}`
case 'status':
return sql`${workflowExecutionLogs.status}`
default:
Expand Down Expand Up @@ -201,7 +203,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
costTotal: workflowExecutionLogs.costTotal,
createdAt: workflowExecutionLogs.createdAt,
workflowName: workflow.name,
workflowDescription: workflow.description,
Expand Down Expand Up @@ -379,7 +381,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
}
: null,
jobTitle: null,
cost: (log.cost as WorkflowLogSummary['cost']) ?? null,
// List cost is the cost_total projection (faithful ledger sum). Null until
// completion (running) or until the one-time legacy backfill populates it.
cost: log.costTotal != null ? { total: Number(log.costTotal) } : null,
Comment thread
icecrasher321 marked this conversation as resolved.
pauseSummary: {
status: log.pausedStatus ?? null,
total: totalPauseCount,
Expand All @@ -405,7 +409,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
createdAt: log.startedAt.toISOString(),
workflow: null,
jobTitle: log.jobTitle ?? null,
cost: (log.cost as WorkflowLogSummary['cost']) ?? null,
cost: jobCostTotal(log.cost),
pauseSummary: { status: null, total: 0, resumed: 0 },
hasPendingPause: false,
}
Expand Down
16 changes: 16 additions & 0 deletions apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
import { parseRequest } from '@/lib/api/server'
import { getSession } from '@/lib/auth'
import { setActiveOrganizationForCurrentSession } from '@/lib/auth/active-organization'
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
import { getUserUsageData } from '@/lib/billing/core/usage'
import {
removeExternalUserFromOrganizationWorkspaces,
Expand Down Expand Up @@ -101,10 +102,25 @@ export const GET = withRouteHandler(
const computed = await getUserUsageData(memberId)

if (usageData.length > 0) {
// currentPeriodCost is only a baseline; add this member's attributed
// usage_log for the period. (getUserUsageData returns the org POOL for
// org-scoped members, so it can't supply the per-member figure.)
const memberLedger =
(
await getOrgMemberLedgerByUser(
organizationId,
computed.billingPeriodStart && computed.billingPeriodEnd
? { start: computed.billingPeriodStart, end: computed.billingPeriodEnd }
: null
)
).get(memberId) ?? 0
memberData = {
...memberData,
usage: {
...usageData[0],
currentPeriodCost: (
Number(usageData[0].currentPeriodCost ?? 0) + memberLedger
).toString(),
billingPeriodStart: computed.billingPeriodStart,
billingPeriodEnd: computed.billingPeriodEnd,
},
Expand Down
14 changes: 14 additions & 0 deletions apps/sim/app/api/organizations/[id]/members/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from '@/lib/api/contracts/organization'
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
import { getSession } from '@/lib/auth'
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils'
import { validateSeatAvailability } from '@/lib/billing/validation/seat-management'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
Expand Down Expand Up @@ -139,8 +140,21 @@ export const GET = withRouteHandler(
const billingPeriodStart = orgSub?.periodStart ?? null
const billingPeriodEnd = orgSub?.periodEnd ?? null

// currentPeriodCost is only a baseline; add each member's attributed
// usage_log for the period (batched, one query) so the roster shows real
// usage rather than the frozen baseline.
const usageByUser = await getOrgMemberLedgerByUser(
organizationId,
billingPeriodStart && billingPeriodEnd
? { start: billingPeriodStart, end: billingPeriodEnd }
: null
)

const membersWithUsage = base.map((row) => ({
...row,
currentPeriodCost: (
Number(row.currentPeriodCost ?? 0) + (usageByUser.get(row.userId) ?? 0)
).toString(),
billingPeriodStart,
billingPeriodEnd,
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
adminV1UpdateOrganizationMemberContract,
} from '@/lib/api/contracts/v1/admin'
import { parseRequest } from '@/lib/api/server'
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
import { removeUserFromOrganization } from '@/lib/billing/organizations/membership'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
Expand Down Expand Up @@ -86,7 +87,6 @@ export const GET = withRouteHandler(
userEmail: user.email,
currentPeriodCost: userStats.currentPeriodCost,
currentUsageLimit: userStats.currentUsageLimit,
lastActive: userStats.lastActive,
billingBlocked: userStats.billingBlocked,
})
.from(member)
Expand All @@ -99,6 +99,10 @@ export const GET = withRouteHandler(
return notFoundResponse('Member')
}

// currentPeriodCost is only a baseline; add this member's attributed
// usage_log for the org's period so admin shows real current usage.
const ledgerByUser = await getOrgMemberLedgerByUser(organizationId)

const data: AdminMemberDetail = {
id: memberData.id,
userId: memberData.userId,
Expand All @@ -107,9 +111,10 @@ export const GET = withRouteHandler(
createdAt: memberData.createdAt.toISOString(),
userName: memberData.userName,
userEmail: memberData.userEmail,
currentPeriodCost: memberData.currentPeriodCost ?? '0',
currentPeriodCost: (
Number(memberData.currentPeriodCost ?? 0) + (ledgerByUser.get(memberData.userId) ?? 0)
).toString(),
currentUsageLimit: memberData.currentUsageLimit,
lastActive: memberData.lastActive?.toISOString() ?? null,
billingBlocked: memberData.billingBlocked ?? false,
}

Expand Down
12 changes: 9 additions & 3 deletions apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
adminV1ListOrganizationMembersContract,
} from '@/lib/api/contracts/v1/admin'
import { parseRequest } from '@/lib/api/server'
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
import { addUserToOrganization } from '@/lib/billing/organizations/membership'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
Expand Down Expand Up @@ -96,7 +97,6 @@ export const GET = withRouteHandler(
userEmail: user.email,
currentPeriodCost: userStats.currentPeriodCost,
currentUsageLimit: userStats.currentUsageLimit,
lastActive: userStats.lastActive,
billingBlocked: userStats.billingBlocked,
})
.from(member)
Expand All @@ -109,6 +109,11 @@ export const GET = withRouteHandler(
])

const total = countResult[0].count

// currentPeriodCost is only a baseline; add each member's attributed
// usage_log for the org's period so admin shows real current usage.
const usageByUser = await getOrgMemberLedgerByUser(organizationId)

const data: AdminMemberDetail[] = membersData.map((m) => ({
id: m.id,
userId: m.userId,
Expand All @@ -117,9 +122,10 @@ export const GET = withRouteHandler(
createdAt: m.createdAt.toISOString(),
userName: m.userName,
userEmail: m.userEmail,
currentPeriodCost: m.currentPeriodCost ?? '0',
currentPeriodCost: (
Number(m.currentPeriodCost ?? 0) + (usageByUser.get(m.userId) ?? 0)
).toString(),
currentUsageLimit: m.currentUsageLimit,
lastActive: m.lastActive?.toISOString() ?? null,
billingBlocked: m.billingBlocked ?? false,
}))

Expand Down
10 changes: 0 additions & 10 deletions apps/sim/app/api/v1/admin/organizations/[id]/seats/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,6 @@ export const GET = withRouteHandler(
subscriptionPlan: analytics.subscriptionPlan,
canAddSeats: analytics.canAddSeats,
utilizationRate: analytics.utilizationRate,
activeMembers: analytics.activeMembers,
inactiveMembers: analytics.inactiveMembers,
memberActivity: analytics.memberActivity.map((m) => ({
userId: m.userId,
userName: m.userName,
userEmail: m.userEmail,
role: m.role,
joinedAt: m.joinedAt.toISOString(),
lastActive: m.lastActive?.toISOString() ?? null,
})),
}

logger.info(`Admin API: Retrieved seat analytics for organization ${organizationId}`)
Expand Down
Loading
Loading