diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index dbb362dbd5..29cfb23b2a 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -488,31 +488,48 @@ export default class DataSinkService extends LoggerBase { const end = performance.now() const totalTime = end - start - for (const type of types) { - const items = groupedByType.get(type) - const msPerItem = Math.floor(totalTime / items.length) - - const args = { type } - - if (type === IntegrationResultType.ACTIVITY) { - items.forEach((item) => { - const activityArgs = { - ...args, - platform: item.platform, - integrationId: item.integrationId, - onboarding: - item.onboarding === null || item.onboarding === undefined - ? '' - : item.onboarding.toString(), - channel: (item.data.data as IActivityData).channel, - } - telemetry.distribution('data_sink_worker.process_result', msPerItem, activityArgs) - }) - } else { - items.forEach(() => { - telemetry.distribution('data_sink_worker.process_result', msPerItem, args) - }) + try { + for (const type of types) { + const items = groupedByType.get(type) + const msPerItem = Math.floor(totalTime / items.length) + + const args = { type } + + if (type === IntegrationResultType.ACTIVITY) { + items.forEach((item) => { + const activityData = item.data?.data as IActivityData | undefined + if (!activityData) { + this.log.warn( + { + resultId: item.id, + integrationId: item.integrationId, + platform: item.platform, + streamId: item.streamId, + dataType: item.data?.type, + }, + 'Activity result has missing data payload (data.data is undefined)!', + ) + } + const activityArgs = { + ...args, + platform: item.platform, + integrationId: item.integrationId, + onboarding: + item.onboarding === null || item.onboarding === undefined + ? '' + : item.onboarding.toString(), + channel: activityData?.channel, + } + telemetry.distribution('data_sink_worker.process_result', msPerItem, activityArgs) + }) + } else { + items.forEach(() => { + telemetry.distribution('data_sink_worker.process_result', msPerItem, args) + }) + } } + } catch (telemetryErr) { + this.log.error(telemetryErr, 'Error while reporting telemetry for processed results!') } } }