-
Notifications
You must be signed in to change notification settings - Fork 103
/
transform.js
110 lines (100 loc) · 3.19 KB
/
transform.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/* eslint-disable no-restricted-syntax */
const groupBy = require('lodash/groupBy');
const cloneDeep = require('lodash/cloneDeep');
const {
getIntegrationsObj,
getHashFromArray,
removeUndefinedAndNullValues,
getSuccessRespEvents,
} = require('../../util');
// Event types whose topic is resolved via the event-type → topic mapping.
const TYPE_MAPPED_EVENTS = ['identify', 'screen', 'page', 'group', 'alias'];

/**
 * Resolves the Kafka topic for an event from the destination's multi-topic config.
 * Non-track event types are looked up in the eventType → topic map; track events
 * are looked up by event name in the event → topic map (case-sensitive keys).
 * Returns null when multi-topic support is disabled or no mapping applies.
 * @param {*} message - the RudderStack event message
 * @param {*} destination - destination definition carrying Config
 * @returns {string|null|undefined} mapped topic, or null/undefined when unmapped
 */
const filterConfigTopics = (message, destination) => {
  const { Config } = destination;
  if (!Config?.enableMultiTopic) {
    return null;
  }
  const eventTypeTopicMap = getHashFromArray(Config?.eventTypeToTopicMap);
  const eventNameTopicMap = getHashFromArray(Config?.eventToTopicMap, 'from', 'to', false);
  const { type, event: eventName } = message;
  if (TYPE_MAPPED_EVENTS.includes(type)) {
    return eventTypeTopicMap[type];
  }
  if (type === 'track' && eventName) {
    return eventNameTopicMap[eventName];
  }
  return null;
};
/**
 * Groups transformed events by their resolved Kafka topic and emits one batched
 * success response per topic.
 * Example: input = [{event1,topic1},{event2,topic1},{event3,topic2}]
 * output = [{batchedRequest:[event1,event2]}, {batchedRequest:[event3]}]
 * (two multiplexed responses).
 * @param {*} destEvents - transformed events, each with message/metadata/destination
 * @returns {*} list of batched success-response events, one per topic
 */
const batch = (destEvents) => {
  // Partition events by topic; each partition becomes a single batched request.
  const eventsByTopic = groupBy(destEvents, (event) => event.message.topic);
  return Object.values(eventsByTopic).map((topicEvents) =>
    getSuccessRespEvents(
      topicEvents.map(({ message }) => message),
      topicEvents.map(({ metadata }) => metadata),
      // All events in a partition share the same destination; take the first.
      topicEvents[0].destination,
      true,
    ),
  );
};
/**
 * Transforms a single event into the Kafka output payload.
 * Topic resolution precedence: integrations object > multi-topic config mapping
 * > destination-level default topic.
 * @param {*} event - input event containing message and destination
 * @returns {*} output payload (message, userId, optional schemaId, topic)
 *              with undefined/null fields stripped
 */
const process = (event) => {
  const { message, destination } = event;
  const integrationsObj = getIntegrationsObj(message, 'kafka');
  const { schemaId } = integrationsObj || {};
  const topic =
    integrationsObj?.topic || filterConfigTopics(message, destination) || destination.Config?.topic;
  // TODO: uncomment this when v.1.3.0 of server is avialble in all envs
  // if (!topic) {
  //   throw new InstrumentationError("Topic is required for Kafka destination");
  // }
  // Fall back to anonymousId when no userId is present.
  const userId = message.userId || message.anonymousId;
  const outputEvent = {
    message,
    userId,
    // schemaId is only attached when truthy (used for schema-registry payloads).
    ...(schemaId ? { schemaId } : {}),
    topic,
  };
  return removeUndefinedAndNullValues(outputEvent);
};
/**
 * Takes event metadata and updates it based on the transformed and raw payload.
 * The outputEvent is the transformed event, which is expected to contain the topic.
 * @param {*} input - object with `metadata` and the transformed `outputEvent`
 * @returns {*} deep-cloned metadata, with the topic appended to rudderId when present
 */
const processMetadata = (input) => {
  const { metadata, outputEvent } = input;
  const { topic } = outputEvent;
  // Deep-clone so the caller's metadata object is never mutated.
  const updatedMetadata = cloneDeep(metadata);
  if (topic) {
    // Encode the topic into rudderId using the '<<>>' separator so downstream
    // consumers can recover the per-topic routing from the id.
    updatedMetadata.rudderId = `${updatedMetadata.rudderId}<<>>${topic}`;
  }
  return updatedMetadata;
};
module.exports = { process, batch, processMetadata };