Skip to content

Commit

Permalink
fix: metadata structure correction (#3119)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanpj2292 committed Feb 26, 2024
1 parent 9fa5374 commit 8351b5c
Show file tree
Hide file tree
Showing 34 changed files with 392 additions and 662 deletions.
283 changes: 29 additions & 254 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -64,7 +64,7 @@
"@koa/router": "^12.0.0",
"@ndhoule/extend": "^2.0.0",
"@pyroscope/nodejs": "^0.2.6",
"@rudderstack/integrations-lib": "^0.2.2",
"@rudderstack/integrations-lib": "^0.2.4",
"@rudderstack/workflow-engine": "^0.7.2",
"ajv": "^8.12.0",
"ajv-draft-04": "^1.0.0",
Expand Down
15 changes: 15 additions & 0 deletions src/controllers/destination.ts
Expand Up @@ -15,6 +15,7 @@ import logger from '../logger';
import { getIntegrationVersion } from '../util/utils';
import tags from '../v0/util/tags';
import { DynamicConfigParser } from '../util/dynamicConfigParser';
import { checkInvalidRtTfEvents } from '../v0/util';

export class DestinationController {
public static async destinationTransformAtProcessor(ctx: Context) {
Expand Down Expand Up @@ -101,6 +102,20 @@ export class DestinationController {
const routerRequest = ctx.request.body as RouterTransformationRequest;
const destination = routerRequest.destType;
let events = routerRequest.input;
const errorRespEvents = checkInvalidRtTfEvents(events);
if (errorRespEvents.length > 0) {
errorRespEvents[0].metadata = [
{
destType: destination,
},
];
logger.debug(
`[${destination}] Invalid router transform payload structure: ${JSON.stringify(events)}`,
);
ctx.body = { output: errorRespEvents };
ControllerUtility.postProcess(ctx);
return ctx;
}
const metaTags = MiscService.getMetaTags(events[0].metadata);
stats.histogram('dest_transform_input_events', events.length, {
destination,
Expand Down
3 changes: 1 addition & 2 deletions src/legacy/router.js
Expand Up @@ -7,7 +7,7 @@ const Router = require('@koa/router');
const lodash = require('lodash');
const fs = require('fs');
const path = require('path');
const { PlatformError } = require('@rudderstack/integrations-lib');
const { PlatformError, getErrorRespEvents } = require('@rudderstack/integrations-lib');
const logger = require('../logger');
const stats = require('../util/stats');
const { SUPPORTED_VERSIONS, API_VERSION } = require('../routes/utils/constants');
Expand All @@ -18,7 +18,6 @@ const {
isNonFuncObject,
getMetadata,
generateErrorObject,
getErrorRespEvents,
isCdkDestination,
checkAndCorrectUserId,
} = require('../v0/util');
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/bqstream/transform.js
Expand Up @@ -5,7 +5,6 @@ const { EventType } = require('../../../constants');
const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
groupEventsByType,
} = require('../../util');
Expand Down Expand Up @@ -130,10 +129,6 @@ const processEachTypedEventList = (
};

const processRouterDest = (inputs) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
const finalResp = [];

const batchedEvents = groupEventsByType(inputs);
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/campaign_manager/transform.js
Expand Up @@ -9,7 +9,6 @@ const {
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');
Expand Down Expand Up @@ -245,11 +244,6 @@ const batchEvents = (eventChunksArray) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
Expand Down
8 changes: 0 additions & 8 deletions src/v0/destinations/clevertap/transform.js
Expand Up @@ -22,7 +22,6 @@ const {
handleRtTfSingleEventError,
batchMultiplexedEvents,
getSuccessRespEvents,
checkInvalidRtTfEvents,
} = require('../../util');
const { generateClevertapBatchedPayload } = require('./utils');

Expand Down Expand Up @@ -389,13 +388,6 @@ const processEvent = (message, destination) => {
const process = (event) => processEvent(event.message, event.destination);

const processRouterDest = (inputs, reqMetadata) => {
// const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
// return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = [];
const errorRespList = [];
// const { destination } = inputs[0];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/customerio/transform.js
Expand Up @@ -5,7 +5,6 @@ const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { EventType, MappedToDestinationKey } = require('../../../constants');

const {
getErrorRespEvents,
getSuccessRespEvents,
defaultRequestConfig,
addExternalIdToTraits,
Expand Down Expand Up @@ -174,10 +173,6 @@ const batchEvents = (successRespList) => {
};

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
Expand Down
Expand Up @@ -9,7 +9,6 @@ const {
handleRtTfSingleEventError,
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
Expand Down Expand Up @@ -186,11 +185,6 @@ const batchEvents = (storeSalesEvents) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const storeSalesEvents = []; // list containing store sales events in batched format
const clickCallEvents = []; // list containing click and call events in batched format
const errorRespList = [];
Expand Down
11 changes: 1 addition & 10 deletions src/v0/destinations/google_cloud_function/transform.js
@@ -1,9 +1,5 @@
const lodash = require('lodash');
const {
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const { getSuccessRespEvents, handleRtTfSingleEventError } = require('../../util');

const { generateBatchedPayload, validateDestinationConfig } = require('./util');

Expand Down Expand Up @@ -40,11 +36,6 @@ function batchEvents(successRespList, maxBatchSize = 10) {

// Router transform with batching by default
const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const successResponseList = [];
const errorRespList = [];
const { destination } = inputs[0];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/googlesheets/transform.js
Expand Up @@ -5,7 +5,6 @@ const {
getValueFromMessage,
getSuccessRespEvents,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
} = require('../../util');

const SOURCE_KEYS = ['properties', 'traits', 'context.traits'];
Expand Down Expand Up @@ -111,10 +110,6 @@ const process = (event) => {
const processRouterDest = async (inputs, reqMetadata) => {
const successRespList = [];
const errorRespList = [];
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
await Promise.all(
inputs.map(async (input) => {
try {
Expand Down
10 changes: 1 addition & 9 deletions src/v0/destinations/hs/transform.js
@@ -1,11 +1,7 @@
const get = require('get-value');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { EventType } = require('../../../constants');
const {
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getDestinationExternalIDInfoForRetl,
} = require('../../util');
const { handleRtTfSingleEventError, getDestinationExternalIDInfoForRetl } = require('../../util');
const { API_VERSION } = require('./config');
const {
processLegacyIdentify,
Expand Down Expand Up @@ -71,10 +67,6 @@ const process = async (event) => {
// we are batching by default at routerTransform
const processRouterDest = async (inputs, reqMetadata) => {
let tempInputs = inputs;
const errorRespEvents = checkInvalidRtTfEvents(tempInputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const successRespList = [];
const errorRespList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/iterable/transform.js
Expand Up @@ -18,7 +18,6 @@ const {
const {
constructPayload,
defaultRequestConfig,
checkInvalidRtTfEvents,
defaultPostRequestConfig,
handleRtTfSingleEventError,
removeUndefinedAndNullValues,
Expand Down Expand Up @@ -162,11 +161,6 @@ const process = (event) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchedEvents = batchEvents(inputs);
const response = await Promise.all(
batchedEvents.map(async (listOfEvents) => {
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/kafka/transform.js
Expand Up @@ -6,7 +6,6 @@ const {
getHashFromArray,
removeUndefinedAndNullValues,
getSuccessRespEvents,
getErrorRespEvents,
} = require('../../util');

const filterConfigTopics = (message, destination) => {
Expand Down Expand Up @@ -38,10 +37,6 @@ const filterConfigTopics = (message, destination) => {

const batch = (destEvents) => {
const respList = [];
if (!Array.isArray(destEvents) || destEvents.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}

// Grouping the events by topic
const groupedEvents = groupBy(destEvents, (event) => event.message.topic);
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/klaviyo/transform.js
Expand Up @@ -32,7 +32,6 @@ const {
addExternalIdToTraits,
adduserIdFromExternalId,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
flattenJson,
isNewStatusCodesAccepted,
Expand Down Expand Up @@ -320,10 +319,6 @@ const getEventChunks = (event, subscribeRespList, nonSubscribeRespList) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const subscribeRespList = [];
Expand Down
3 changes: 2 additions & 1 deletion src/v0/destinations/lambda/transform.js
@@ -1,5 +1,6 @@
const _ = require('lodash');
const { getErrorRespEvents, getSuccessRespEvents } = require('../../util');
const { getErrorRespEvents } = require('@rudderstack/integrations-lib');
const { getSuccessRespEvents } = require('../../util');
const { ConfigurationError } = require('@rudderstack/integrations-lib');

const DEFAULT_INVOCATION_TYPE = 'Event'; // asynchronous invocation
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/mailchimp/transform.js
Expand Up @@ -3,7 +3,6 @@ const { InstrumentationError, ConfigurationError } = require('@rudderstack/integ
const {
defaultPutRequestConfig,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
constructPayload,
defaultPostRequestConfig,
isDefinedAndNotNull,
Expand Down Expand Up @@ -162,10 +161,6 @@ const getEventChunks = (event, identifyRespList, trackRespList) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const identifyRespList = [];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/mailjet/transform.js
@@ -1,7 +1,6 @@
const lodash = require('lodash');
const { TransformationError, InstrumentationError } = require('@rudderstack/integrations-lib');
const {
getErrorRespEvents,
getSuccessRespEvents,
defaultRequestConfig,
defaultPostRequestConfig,
Expand Down Expand Up @@ -121,10 +120,6 @@ const batchEvents = (successRespList) => {
};

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/mailmodo/transform.js
Expand Up @@ -10,7 +10,6 @@ const {
defaultPostRequestConfig,
defaultBatchRequestConfig,
removeUndefinedAndNullValues,
getErrorRespEvents,
getSuccessRespEvents,
handleRtTfSingleEventError,
} = require('../../util');
Expand Down Expand Up @@ -191,11 +190,6 @@ function getEventChunks(event, identifyEventChunks, eventResponseList) {
}

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}

const identifyEventChunks = []; // list containing identify events in batched format
const eventResponseList = []; // list containing other events in batched format
const errorRespList = [];
Expand Down
7 changes: 1 addition & 6 deletions src/v0/destinations/marketo/transform.js
Expand Up @@ -7,6 +7,7 @@ const {
InstrumentationError,
ConfigurationError,
UnauthorizedError,
getErrorRespEvents,
} = require('@rudderstack/integrations-lib');
const stats = require('../../../util/stats');
const { EventType, MappedToDestinationKey } = require('../../../constants');
Expand All @@ -28,10 +29,8 @@ const {
getFieldValueFromMessage,
getDestinationExternalID,
getSuccessRespEvents,
getErrorRespEvents,
isDefinedAndNotNull,
generateErrorObject,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const Cache = require('../../util/cache');
Expand Down Expand Up @@ -456,10 +455,6 @@ const process = async (event) => {
const processRouterDest = async (inputs, reqMetadata) => {
// Token needs to be generated for marketo which will be done on input level.
// If destination information is not present Error should be thrown
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let token;
try {
token = await getAuthToken(formatConfig(inputs[0].destination));
Expand Down
12 changes: 6 additions & 6 deletions src/v0/destinations/marketo_static_list/transform.js
@@ -1,6 +1,10 @@
const lodash = require('lodash');
const cloneDeep = require('lodash/cloneDeep');
const { InstrumentationError, UnauthorizedError } = require('@rudderstack/integrations-lib');
const {
InstrumentationError,
UnauthorizedError,
getErrorRespEvents,
} = require('@rudderstack/integrations-lib');
const {
defaultPostRequestConfig,
defaultDeleteRequestConfig,
Expand All @@ -9,11 +13,7 @@ const {
} = require('../../util');
const { AUTH_CACHE_TTL, JSON_MIME_TYPE } = require('../../util/constant');
const { getIds, validateMessageType } = require('./util');
const {
getDestinationExternalID,
defaultRequestConfig,
getErrorRespEvents,
} = require('../../util');
const { getDestinationExternalID, defaultRequestConfig } = require('../../util');
const { formatConfig, MAX_LEAD_IDS_SIZE } = require('./config');
const Cache = require('../../util/cache');
const { getAuthToken } = require('../marketo/transform');
Expand Down

0 comments on commit 8351b5c

Please sign in to comment.