From 2e3b56e37b5cc9e5a4cb895ef2edd7e47ed8fc02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 24 Sep 2025 16:07:20 +0200 Subject: [PATCH 1/2] fix: deduplicate results to process before processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/dataSink.service.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index c0cb8de778..a3db513903 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -247,9 +247,21 @@ export default class DataSinkService extends LoggerBase { } public async processResults( - batch: { resultId: string; data: IResultData | undefined; created: boolean }[], + resultsToProcess: { resultId: string; data: IResultData | undefined; created: boolean }[], postProcess = true, ): Promise { + const batch: { resultId: string; data: IResultData | undefined; created: boolean }[] = [] + for (const result of resultsToProcess) { + const filtered = resultsToProcess.filter((r) => r.resultId === result.resultId) + if (filtered.length > 1) { + this.log.warn( + { resultId: result.resultId }, + 'Found multiple results for the same result id!', + ) + } + batch.push(filtered[0]) + } + this.log.trace(`[RESULTS] Processing ${batch.length} results!`) const start = performance.now() From 977260259b3a8480effd560d3f44d7bdac8b55ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 25 Sep 2025 11:46:05 +0200 Subject: [PATCH 2/2] fix: deduplicate results to process before processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/service/dataSink.service.ts | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index a3db513903..d7db7870f0 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -253,13 +253,18 @@ export default class DataSinkService extends LoggerBase { const batch: { resultId: string; data: IResultData | undefined; created: boolean }[] = [] for (const result of resultsToProcess) { const filtered = resultsToProcess.filter((r) => r.resultId === result.resultId) - if (filtered.length > 1) { - this.log.warn( - { resultId: result.resultId }, - 'Found multiple results for the same result id!', - ) + + // check if we already have this result in the batch + if (!batch.some((b) => b.resultId === filtered[0].resultId)) { + if (filtered.length > 1) { + this.log.warn( + { resultId: result.resultId }, + 'Found multiple results for the same result id!', + ) + } + + batch.push(filtered[0]) } - batch.push(filtered[0]) } this.log.trace(`[RESULTS] Processing ${batch.length} results!`)