Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions apps/sim/lib/knowledge/connectors/sync-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,51 @@ export async function dispatchSync(
const connectorRows = await db
.select({
knowledgeBaseId: knowledgeConnector.knowledgeBaseId,
connectorArchivedAt: knowledgeConnector.archivedAt,
connectorDeletedAt: knowledgeConnector.deletedAt,
workspaceId: knowledgeBase.workspaceId,
userId: knowledgeBase.userId,
kbDeletedAt: knowledgeBase.deletedAt,
})
.from(knowledgeConnector)
.innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId))
.where(eq(knowledgeConnector.id, connectorId))
.limit(1)

const row = connectorRows[0]
if (!row) {
logger.warn(`Skipping sync dispatch: connector not found`, { connectorId, requestId })
return
}
if (row.kbDeletedAt) {
logger.warn(`Skipping sync dispatch: knowledge base is deleted`, {
connectorId,
knowledgeBaseId: row.knowledgeBaseId,
requestId,
})
await db
.update(knowledgeConnector)
.set({
status: 'error',
nextSyncAt: null,
lastSyncError: 'Knowledge base deleted',
updatedAt: new Date(),
})
.where(eq(knowledgeConnector.id, connectorId))
return
Comment thread
waleedlatif1 marked this conversation as resolved.
}
Comment thread
waleedlatif1 marked this conversation as resolved.
if (row.connectorArchivedAt || row.connectorDeletedAt) {
logger.warn(`Skipping sync dispatch: connector is archived or deleted`, {
connectorId,
requestId,
})
return
}

const tags = [`connectorId:${connectorId}`]
if (row?.knowledgeBaseId) tags.push(`knowledgeBaseId:${row.knowledgeBaseId}`)
if (row?.workspaceId) tags.push(`workspaceId:${row.workspaceId}`)
if (row?.userId) tags.push(`userId:${row.userId}`)
if (row.knowledgeBaseId) tags.push(`knowledgeBaseId:${row.knowledgeBaseId}`)
if (row.workspaceId) tags.push(`workspaceId:${row.workspaceId}`)
if (row.userId) tags.push(`userId:${row.userId}`)

await knowledgeConnectorSync.trigger(
{
Expand Down Expand Up @@ -261,7 +293,8 @@ export async function executeSync(
.limit(1)

if (connectorRows.length === 0) {
throw new Error(`Connector not found: ${connectorId}`)
logger.warn(`Skipping sync: connector ${connectorId} not found, archived, or deleted`)
return { ...result, error: 'connector_unavailable' }
}

const connector = connectorRows[0]
Expand All @@ -278,7 +311,19 @@ export async function executeSync(
.limit(1)

if (kbRows.length === 0) {
throw new Error(`Knowledge base not found: ${connector.knowledgeBaseId}`)
logger.warn(
`Skipping sync: knowledge base ${connector.knowledgeBaseId} is deleted (connector ${connectorId})`
)
await db
Comment thread
waleedlatif1 marked this conversation as resolved.
.update(knowledgeConnector)
.set({
status: 'error',
nextSyncAt: null,
lastSyncError: 'Knowledge base deleted',
updatedAt: new Date(),
})
.where(eq(knowledgeConnector.id, connectorId))
return { ...result, error: 'knowledge_base_deleted' }
Comment thread
waleedlatif1 marked this conversation as resolved.
}

const userId = kbRows[0].userId
Expand Down
15 changes: 14 additions & 1 deletion apps/sim/lib/knowledge/documents/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,20 @@ export async function processDocumentAsync(
.limit(1)

if (kb.length === 0) {
throw new Error(`Knowledge base not found: ${knowledgeBaseId}`)
logger.warn(
`[${documentId}] Skipping document processing: knowledge base ${knowledgeBaseId} is deleted`
)
await db
Comment thread
waleedlatif1 marked this conversation as resolved.
.update(document)
.set({
processingStatus: 'failed',
processingError: 'Knowledge base deleted',
processingCompletedAt: new Date(),
})
.where(
and(eq(document.id, documentId), isNull(document.archivedAt), isNull(document.deletedAt))
)
return
}
Comment thread
waleedlatif1 marked this conversation as resolved.

await db
Expand Down
Loading