Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 130 additions & 72 deletions apps/sim/app/api/files/multipart/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,61 @@ import {
isUsingCloudStorage,
type StorageContext,
} from '@/lib/uploads'
import {
signUploadToken,
type UploadTokenPayload,
verifyUploadToken,
} from '@/lib/uploads/core/upload-token'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('MultipartUploadAPI')

const ALLOWED_UPLOAD_CONTEXTS = new Set<StorageContext>([
'knowledge-base',
'chat',
'copilot',
'mothership',
'execution',
'workspace',
'profile-pictures',
'og-images',
'logs',
'workspace-logos',
])

interface InitiateMultipartRequest {
fileName: string
contentType: string
fileSize: number
workspaceId: string
context?: StorageContext
}

interface GetPartUrlsRequest {
uploadId: string
key: string
interface TokenBoundRequest {
uploadToken: string
}

interface GetPartUrlsRequest extends TokenBoundRequest {
partNumbers: number[]
context?: StorageContext
}

interface CompleteSingleRequest extends TokenBoundRequest {
parts: unknown
}

interface CompleteBatchRequest {
uploads: Array<TokenBoundRequest & { parts: unknown }>
}

const verifyTokenForUser = (token: string | undefined, userId: string) => {
if (!token || typeof token !== 'string') {
return null
}
const result = verifyUploadToken(token)
if (!result.valid || result.payload.userId !== userId) {
return null
}
return result.payload
}

export const POST = withRouteHandler(async (request: NextRequest) => {
Expand All @@ -31,6 +71,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const userId = session.user.id

const action = request.nextUrl.searchParams.get('action')

Expand All @@ -45,32 +86,34 @@ export const POST = withRouteHandler(async (request: NextRequest) => {

switch (action) {
case 'initiate': {
const data: InitiateMultipartRequest = await request.json()
const { fileName, contentType, fileSize, context = 'knowledge-base' } = data
const data = (await request.json()) as InitiateMultipartRequest
const { fileName, contentType, fileSize, workspaceId, context = 'knowledge-base' } = data

const config = getStorageConfig(context)
if (!workspaceId || typeof workspaceId !== 'string') {
return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 })
}

if (storageProvider === 's3') {
const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
if (!ALLOWED_UPLOAD_CONTEXTS.has(context)) {
return NextResponse.json({ error: 'Invalid storage context' }, { status: 400 })
}

const result = await initiateS3MultipartUpload({
fileName,
contentType,
fileSize,
})
const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
if (permission !== 'write' && permission !== 'admin') {
return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
}

logger.info(
`Initiated S3 multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
)
const config = getStorageConfig(context)

return NextResponse.json({
uploadId: result.uploadId,
key: result.key,
})
}
if (storageProvider === 'blob') {
const { initiateMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
let uploadId: string
let key: string

if (storageProvider === 's3') {
const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
const result = await initiateS3MultipartUpload({ fileName, contentType, fileSize })
uploadId = result.uploadId
key = result.key
} else if (storageProvider === 'blob') {
const { initiateMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
const result = await initiateMultipartUpload({
fileName,
contentType,
Expand All @@ -82,46 +125,55 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
connectionString: config.connectionString,
},
})

logger.info(
`Initiated Azure multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
uploadId = result.uploadId
key = result.key
} else {
return NextResponse.json(
{ error: `Unsupported storage provider: ${storageProvider}` },
{ status: 400 }
)

return NextResponse.json({
uploadId: result.uploadId,
key: result.key,
})
}

return NextResponse.json(
{ error: `Unsupported storage provider: ${storageProvider}` },
{ status: 400 }
const uploadToken = signUploadToken({
uploadId,
key,
userId,
workspaceId,
context,
})

logger.info(
`Initiated ${storageProvider} multipart upload for ${fileName} (context: ${context}, workspace: ${workspaceId}): ${uploadId}`
)

return NextResponse.json({ uploadId, key, uploadToken })
}

case 'get-part-urls': {
const data: GetPartUrlsRequest = await request.json()
const { uploadId, key, partNumbers, context = 'knowledge-base' } = data
const data = (await request.json()) as GetPartUrlsRequest
const { partNumbers } = data

const tokenPayload = verifyTokenForUser(data.uploadToken, userId)
if (!tokenPayload) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}

const { uploadId, key, context } = tokenPayload
const config = getStorageConfig(context)

if (storageProvider === 's3') {
const { getS3MultipartPartUrls } = await import('@/lib/uploads/providers/s3/client')

const presignedUrls = await getS3MultipartPartUrls(key, uploadId, partNumbers)

return NextResponse.json({ presignedUrls })
}
if (storageProvider === 'blob') {
const { getMultipartPartUrls } = await import('@/lib/uploads/providers/blob/client')

const presignedUrls = await getMultipartPartUrls(key, partNumbers, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})

return NextResponse.json({ presignedUrls })
}

Expand All @@ -132,24 +184,32 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}

case 'complete': {
const data = await request.json()
const context: StorageContext = data.context || 'knowledge-base'
const data = (await request.json()) as CompleteSingleRequest | CompleteBatchRequest

const config = getStorageConfig(context)
if ('uploads' in data && Array.isArray(data.uploads)) {
const verified = data.uploads.map((upload) => {
const payload = verifyTokenForUser(upload.uploadToken, userId)
return payload ? { payload, parts: upload.parts } : null
})

if (verified.some((entry) => entry === null)) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}

const verifiedEntries = verified.filter(
(entry): entry is { payload: UploadTokenPayload; parts: unknown } => entry !== null
)

if ('uploads' in data) {
const results = await Promise.all(
data.uploads.map(async (upload: any) => {
const { uploadId, key } = upload
verifiedEntries.map(async ({ payload, parts }) => {
const { uploadId, key, context } = payload
const config = getStorageConfig(context)

if (storageProvider === 's3') {
const { completeS3MultipartUpload } = await import(
'@/lib/uploads/providers/s3/client'
)
const parts = upload.parts // S3 format: { ETag, PartNumber }

const result = await completeS3MultipartUpload(key, uploadId, parts)

const result = await completeS3MultipartUpload(key, uploadId, parts as any)
return {
success: true,
location: result.location,
Expand All @@ -161,15 +221,12 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
const { completeMultipartUpload } = await import(
'@/lib/uploads/providers/blob/client'
)
const parts = upload.parts // Azure format: { blockId, partNumber }

const result = await completeMultipartUpload(key, parts, {
const result = await completeMultipartUpload(key, parts as any, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})

return {
success: true,
location: result.location,
Expand All @@ -182,19 +239,23 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
})
)

logger.info(`Completed ${data.uploads.length} multipart uploads (context: ${context})`)
logger.info(`Completed ${verifiedEntries.length} multipart uploads`)
return NextResponse.json({ results })
}

const { uploadId, key, parts } = data
const single = data as CompleteSingleRequest
const tokenPayload = verifyTokenForUser(single.uploadToken, userId)
if (!tokenPayload) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}

const { uploadId, key, context } = tokenPayload
const config = getStorageConfig(context)

if (storageProvider === 's3') {
const { completeS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')

const result = await completeS3MultipartUpload(key, uploadId, parts)

const result = await completeS3MultipartUpload(key, uploadId, single.parts as any)
logger.info(`Completed S3 multipart upload for key ${key} (context: ${context})`)

return NextResponse.json({
success: true,
location: result.location,
Expand All @@ -204,16 +265,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}
if (storageProvider === 'blob') {
const { completeMultipartUpload } = await import('@/lib/uploads/providers/blob/client')

const result = await completeMultipartUpload(key, parts, {
const result = await completeMultipartUpload(key, single.parts as any, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})

logger.info(`Completed Azure multipart upload for key ${key} (context: ${context})`)

return NextResponse.json({
success: true,
location: result.location,
Expand All @@ -229,27 +287,27 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}

case 'abort': {
const data = await request.json()
const { uploadId, key, context = 'knowledge-base' } = data
const data = (await request.json()) as TokenBoundRequest
const tokenPayload = verifyTokenForUser(data.uploadToken, userId)
if (!tokenPayload) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}

const config = getStorageConfig(context as StorageContext)
const { uploadId, key, context } = tokenPayload
const config = getStorageConfig(context)

if (storageProvider === 's3') {
const { abortS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')

await abortS3MultipartUpload(key, uploadId)

logger.info(`Aborted S3 multipart upload for key ${key} (context: ${context})`)
} else if (storageProvider === 'blob') {
const { abortMultipartUpload } = await import('@/lib/uploads/providers/blob/client')

await abortMultipartUpload(key, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})

logger.info(`Aborted Azure multipart upload for key ${key} (context: ${context})`)
} else {
return NextResponse.json(
Expand Down
Loading
Loading