Skip to content

Commit

Permalink
fix: batching with multiplexing (#1926)
Browse files Browse the repository at this point in the history
* fix: update batching logic for iterable

* fix: update batching logic for snapchat_conversion

* fix: update batching logic for facebook_custom_audience

* fix: update batching logic for tiktok_ads

* fix: update batching logic for tiktok_ads_offline_events

* address comment

* add test cases to increase codecov
  • Loading branch information
ItsSudip committed Apr 6, 2023
1 parent f133da4 commit e3fe5b5
Show file tree
Hide file tree
Showing 13 changed files with 764 additions and 776 deletions.
26 changes: 3 additions & 23 deletions src/v0/destinations/fb_custom_audience/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ const {
defaultDeleteRequestConfig,
checkSubsetOfArray,
isDefinedAndNotNullAndNotEmpty,
getSuccessRespEvents,
getErrorRespEvents,
returnArrayOfSubarrays,
isDefinedAndNotNull,
flattenMap,
handleRtTfSingleEventError,
simpleProcessRouterDest,
getDestinationExternalIDInfoForRetl,
} = require('../../util');

Expand Down Expand Up @@ -371,26 +369,8 @@ const processEvent = (message, destination) => {

const process = (event) => processEvent(event.message, event.destination);

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
const respList = inputs.map((input) => {
try {
if (input.message.statusCode) {
// already transformed event
return getSuccessRespEvents(input.message, [input.metadata], input.destination);
}
const transformedList = process(input);
const responseList = transformedList.map((transformedPayload) =>
getSuccessRespEvents(transformedPayload, [input.metadata], input.destination),
);
return responseList;
} catch (error) {
return handleRtTfSingleEventError(input, error, reqMetadata);
}
});
const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return flattenMap(respList);
};

Expand Down
37 changes: 20 additions & 17 deletions src/v0/destinations/iterable/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,30 +422,33 @@ function getEventChunks(event, identifyEventChunks, trackEventChunks, eventRespo
event.message.operation === 'catalogs'
) {
identifyEventChunks.push(event);
} else if (event.message.endpoint.includes('api/events/track')) {
} else if (event.message?.endpoint?.includes('api/events/track')) {
// Checking if it is track type of event
trackEventChunks.push(event);
} else {
// any other type of event
const { message, metadata, destination } = event;
const endpoint = get(message, 'endpoint');
if (Array.isArray(message)) {
eventResponseList.push(getSuccessRespEvents(message, metadata, destination));
} else {
const batchedResponse = defaultBatchRequestConfig();
batchedResponse.batchedRequest.headers = message.headers;
batchedResponse.batchedRequest.endpoint = endpoint;
batchedResponse.batchedRequest.body = message.body;
batchedResponse.batchedRequest.params = message.params;
batchedResponse.batchedRequest.method = defaultPostRequestConfig.requestMethod;
batchedResponse.metadata = [metadata];
batchedResponse.destination = destination;

const batchedResponse = defaultBatchRequestConfig();
batchedResponse.batchedRequest.headers = message.headers;
batchedResponse.batchedRequest.endpoint = endpoint;
batchedResponse.batchedRequest.body = message.body;
batchedResponse.batchedRequest.params = message.params;
batchedResponse.batchedRequest.method = defaultPostRequestConfig.requestMethod;
batchedResponse.metadata = [metadata];
batchedResponse.destination = destination;

eventResponseList.push(
getSuccessRespEvents(
batchedResponse.batchedRequest,
batchedResponse.metadata,
batchedResponse.destination,
),
);
eventResponseList.push(
getSuccessRespEvents(
batchedResponse.batchedRequest,
batchedResponse.metadata,
batchedResponse.destination,
),
);
}
}
}

Expand Down
64 changes: 19 additions & 45 deletions src/v0/destinations/snapchat_conversion/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
getValidDynamicFormConfig,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
batchMultiplexedEvents,
} = require('../../util');
const {
ENDPOINT,
Expand Down Expand Up @@ -288,32 +289,6 @@ function process(event) {
return response;
}

function batchEvents(eventsChunk) {
const batchedResponseList = [];

// arrayChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const arrayChunks = _.chunk(eventsChunk, MAX_BATCH_SIZE);

arrayChunks.forEach((chunk) => {
const batchEventResponse = generateBatchedPayloadForArray(chunk);
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});

return batchedResponseList;
}

function getEventChunks(event, eventsChunk) {
// build eventsChunk of MAX_BATCH_SIZE
eventsChunk.push(event);
}

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
Expand All @@ -324,35 +299,34 @@ const processRouterDest = async (inputs, reqMetadata) => {
const errorRespList = [];
inputs.forEach((event) => {
try {
if (event.message.statusCode) {
let resp = event.message;
if (!event.message.statusCode) {
// already transformed event
getEventChunks(event, eventsChunk);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
getEventChunks(
{
message: res,
metadata: event.metadata,
destination: event.destination,
},
eventsChunk,
);
});
resp = process(event);
}
eventsChunk.push({
message: Array.isArray(resp) ? resp : [resp],
metadata: event.metadata,
destination: event.destination,
});
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
errorRespList.push(errRespEvent);
}
});

let batchedResponseList = [];
const batchResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
const batchedEvents = batchMultiplexedEvents(eventsChunk, MAX_BATCH_SIZE);
batchedEvents.forEach((batch) => {
const batchedRequest = generateBatchedPayloadForArray(batch.events, batch.destination);
batchResponseList.push(
getSuccessRespEvents(batchedRequest, batch.metadata, batch.destination, true),
);
});
}
return [...batchedResponseList, ...errorRespList];

return [...batchResponseList, ...errorRespList];
};

module.exports = { process, processRouterDest };
22 changes: 7 additions & 15 deletions src/v0/destinations/snapchat_conversion/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,39 +112,31 @@ function getPriceSum(message) {
* @param {*} events
* @returns
*/
function generateBatchedPayloadForArray(events) {
function generateBatchedPayloadForArray(events, destination) {
const batchResponseList = [];
const metadata = [];

// extracting destination
// from the first event in a batch
const { destination } = events[0];
const { apiKey } = destination.Config;

let batchEventResponse = defaultBatchRequestConfig();
const { batchedRequest } = defaultBatchRequestConfig();

// Batch event into dest batch structure
events.forEach((ev) => {
batchResponseList.push(ev.message.body.JSON);
metadata.push(ev.metadata);
batchResponseList.push(ev.body.JSON);
});

batchEventResponse.batchedRequest.body.JSON_ARRAY = {
batchedRequest.body.JSON_ARRAY = {
batch: JSON.stringify(batchResponseList),
};

batchEventResponse.batchedRequest.endpoint = ENDPOINT;
batchEventResponse.batchedRequest.headers = {
batchedRequest.endpoint = ENDPOINT;
batchedRequest.headers = {
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
};
batchEventResponse = {
...batchEventResponse,
metadata,
destination,
};

return batchEventResponse;
return batchedRequest;
}

module.exports = {
Expand Down
125 changes: 42 additions & 83 deletions src/v0/destinations/tiktok_ads/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const {
getHashFromArrayWithDuplicate,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
batchMultiplexedEvents,
} = require('../../util');
const {
trackMapping,
Expand Down Expand Up @@ -202,100 +203,52 @@ const process = async (event) => {
};

function batchEvents(eventsChunk) {
const batchedResponseList = [];
// arrayChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
// transformed payload of (n) batch size
const arrayChunks = _.chunk(eventsChunk, MAX_BATCH_SIZE);

arrayChunks.forEach((chunk) => {
const batchResponseList = [];
const metadata = [];
const { destination, events } = eventsChunk;
const { accessToken, pixelCode } = destination.Config;
const { batchedRequest } = defaultBatchRequestConfig();

const batchResponseList = [];
events.forEach((transformedEvent) => {
// extracting destination
// from the first event in a batch
const { destination } = chunk[0];
const { accessToken, pixelCode } = destination.Config;

let batchEventResponse = defaultBatchRequestConfig();

// Batch event into dest batch structure
chunk.forEach((ev) => {
// Pixel code must be added above "batch": [..]
delete ev.message.body.JSON.pixel_code;
// Partner name must be added above "batch": [..]
delete ev.message.body.JSON.partner_name;
ev.message.body.JSON.type = 'track';
batchResponseList.push(ev.message.body.JSON);
metadata.push(ev.metadata);
});
const cloneTransformedEvent = _.clone(transformedEvent);
delete cloneTransformedEvent.body.JSON.pixel_code;
// Partner name must be added above "batch": [..]
delete cloneTransformedEvent.body.JSON.partner_name;
cloneTransformedEvent.body.JSON.type = 'track';
batchResponseList.push(cloneTransformedEvent.body.JSON);
});

batchEventResponse.batchedRequest.body.JSON = {
pixel_code: pixelCode,
partner_name: PARTNER_NAME,
batch: batchResponseList,
};
batchedRequest.body.JSON = {
pixel_code: pixelCode,
partner_name: PARTNER_NAME,
batch: batchResponseList,
};

batchEventResponse.batchedRequest.endpoint = BATCH_ENDPOINT;
batchEventResponse.batchedRequest.headers = {
'Access-Token': accessToken,
'Content-Type': 'application/json',
};
batchEventResponse = {
...batchEventResponse,
metadata,
destination,
};
batchedResponseList.push(
getSuccessRespEvents(
batchEventResponse.batchedRequest,
batchEventResponse.metadata,
batchEventResponse.destination,
true,
),
);
});
batchedRequest.endpoint = BATCH_ENDPOINT;
batchedRequest.headers = {
'Access-Token': accessToken,
'Content-Type': 'application/json',
};

return batchedResponseList;
return batchedRequest;
}

function getEventChunks(event, trackResponseList, eventsChunk) {
// only for already transformed payload
// eslint-disable-next-line no-param-reassign
event.message = Array.isArray(event.message) ? event.message : [event.message];

event.message.forEach((element) => {
// Do not apply batching if the payload contains test_event_code
// which corresponds to track endpoint
if (element.body.JSON.test_event_code) {
const message = element;
const { metadata, destination } = event;
const endpoint = get(message, 'endpoint');
delete message.body.JSON.type;

const batchedResponse = defaultBatchRequestConfig();
batchedResponse.batchedRequest.headers = message.headers;
batchedResponse.batchedRequest.endpoint = endpoint;
batchedResponse.batchedRequest.body = message.body;
batchedResponse.batchedRequest.params = message.params;
batchedResponse.batchedRequest.method = defaultPostRequestConfig.requestMethod;
batchedResponse.metadata = [metadata];
batchedResponse.destination = destination;

trackResponseList.push(
getSuccessRespEvents(
batchedResponse.batchedRequest,
batchedResponse.metadata,
batchedResponse.destination,
),
);
} else {
eventsChunk.push({
message: element,
metadata: event.metadata,
destination: event.destination,
});
}
});
if (event.message[0].body.JSON.test_event_code) {
const { metadata, destination, message } = event;
trackResponseList.push(getSuccessRespEvents(message, [metadata], destination));
} else {
eventsChunk.push({
message: event.message,
metadata: event.metadata,
destination: event.destination,
});
}
}

const processRouterDest = async (inputs, reqMetadata) => {
Expand Down Expand Up @@ -332,9 +285,15 @@ const processRouterDest = async (inputs, reqMetadata) => {
}),
);

let batchedResponseList = [];
const batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = await batchEvents(eventsChunk);
const batchedEvents = batchMultiplexedEvents(eventsChunk, MAX_BATCH_SIZE);
batchedEvents.forEach((batch) => {
const batchedRequest = batchEvents(batch);
batchedResponseList.push(
getSuccessRespEvents(batchedRequest, batch.metadata, batch.destination, true),
);
});
}
return [...batchedResponseList.concat(trackResponseList), ...errorRespList];
};
Expand Down
Loading

0 comments on commit e3fe5b5

Please sign in to comment.