-
Notifications
You must be signed in to change notification settings - Fork 103
/
transform.js
152 lines (139 loc) · 5.21 KB
/
transform.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
const get = require('get-value');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { EventType } = require('../../../constants');
const { handleRtTfSingleEventError, getDestinationExternalIDInfoForRetl } = require('../../util');
const { API_VERSION } = require('./config');
const {
processLegacyIdentify,
processLegacyTrack,
legacyBatchEvents,
} = require('./HSTransform-v1');
const { MappedToDestinationKey, GENERIC_TRUE_VALUES } = require('../../../constants');
const { processIdentify, processTrack, batchEvents } = require('./HSTransform-v2');
const {
splitEventsForCreateUpdate,
fetchFinalSetOfTraits,
getProperties,
validateDestinationConfig,
} = require('./util');
/**
 * Transforms one event for the destination, dispatching on the message type
 * and on the API version configured on the destination.
 *
 * @param {object} message - event payload; `message.type` selects the handler
 * @param {object} destination - destination definition; `destination.Config.apiVersion` is read
 * @param {*} propertyMap - passed through untouched to the identify/track handlers
 * @returns {Promise<*>} for identify events, a one-element array holding the handler
 *   result; for track events, whatever the track handler resolves to
 * @throws {InstrumentationError} when `message.type` is missing or is neither
 *   identify nor track
 */
const processSingleMessage = async (message, destination, propertyMap) => {
  const { type: messageType } = message;
  if (!messageType) {
    throw new InstrumentationError('Message type is not present. Aborting message.');
  }

  // Config validation happens before any type-specific work
  validateDestinationConfig(destination);

  // Evaluated lazily so the config is only read once a supported type matched
  const targetsV3Api = () => destination.Config.apiVersion === API_VERSION.v3;

  if (messageType === EventType.IDENTIFY) {
    // Anything other than v3 falls back to the legacy API
    const identifyHandler = targetsV3Api() ? processIdentify : processLegacyIdentify;
    const identifyPayload = await identifyHandler(message, destination, propertyMap);
    return [identifyPayload];
  }

  if (messageType === EventType.TRACK) {
    const trackHandler = targetsV3Api() ? processTrack : processLegacyTrack;
    const trackPayload = await trackHandler(message, destination, propertyMap);
    return trackPayload;
  }

  throw new InstrumentationError(`Message type ${messageType} is not supported`);
};
// Deprecated: routerTransform is now used for both API versions
/**
 * Single-event transform.
 *
 * When the message carries a truthy mapped-to-destination flag (as listed in
 * GENERIC_TRUE_VALUES), the event is first run through
 * splitEventsForCreateUpdate; only the first resulting event is transformed.
 * No property map is passed to processSingleMessage from here.
 *
 * @param {{ message: object, destination: object }} event
 * @returns {Promise<*>} result of processSingleMessage for the selected event
 */
const process = async (event) => {
  const { message, destination } = event;
  const mappedFlag = get(message, MappedToDestinationKey);
  const isMappedToDestination =
    Boolean(mappedFlag) && GENERIC_TRUE_VALUES.includes(mappedFlag.toString());

  // get info about existing objects and split accordingly, otherwise use the event as-is
  const candidates = isMappedToDestination
    ? await splitEventsForCreateUpdate([event], destination)
    : [event];

  const selected = candidates[0];
  return processSingleMessage(selected.message, selected.destination);
};
// we are batching by default at routerTransform
/**
 * Router transform: transforms a batch of events and hands the successful
 * ones to the batching routine of the configured API version.
 *
 * The destination of the first input is used for the whole batch.
 * Results are assembled in input order, regardless of the order in which the
 * per-event transformations finish.
 *
 * @param {Array<{ message: object, metadata: object, destination: object }>} inputs
 * @param {*} reqMetadata - forwarded to handleRtTfSingleEventError
 * @returns {Promise<Array>} batched responses followed by per-event error
 *   responses; an empty array when `inputs` is empty
 */
const processRouterDest = async (inputs, reqMetadata) => {
  // Nothing to transform: avoid dereferencing the (missing) first input below
  if (Array.isArray(inputs) && inputs.length === 0) {
    return [];
  }

  let tempInputs = inputs;
  // using the first destination config for transforming the batch
  const { destination } = tempInputs[0];
  let propertyMap;
  const mappedToDestination = get(tempInputs[0].message, MappedToDestinationKey);
  const { objectType } = getDestinationExternalIDInfoForRetl(tempInputs[0].message, 'HS');

  try {
    if (mappedToDestination && GENERIC_TRUE_VALUES.includes(mappedToDestination?.toString())) {
      // skip splitting the batch into inserts and updates if the object is an association
      if (objectType.toLowerCase() !== 'association') {
        propertyMap = await getProperties(destination);
        // get info about existing objects and splitting accordingly.
        tempInputs = await splitEventsForCreateUpdate(tempInputs, destination);
      }
    } else {
      // reduce the no. of calls for properties endpoint
      const traitsFound = tempInputs.some(
        (input) => fetchFinalSetOfTraits(input.message) !== undefined,
      );
      if (traitsFound) {
        propertyMap = await getProperties(destination);
      }
    }
  } catch (error) {
    // Any error thrown from the above try block applies to all the events
    return tempInputs.map((input) => handleRtTfSingleEventError(input, error, reqMetadata));
  }

  // Transform concurrently, but keep every input's outcome in its own slot:
  // Promise.all resolves with values in input order, so the flattened lists
  // below no longer depend on which transformation happened to finish first.
  const outcomes = await Promise.all(
    tempInputs.map(async (input) => {
      try {
        if (input.message.statusCode) {
          // already transformed event
          return {
            successes: [{ message: input.message, metadata: input.metadata, destination }],
            failures: [],
          };
        }
        // event is not transformed
        const receivedResponse = await processSingleMessage(
          input.message,
          destination,
          propertyMap,
        );
        // received response can be in array format [{}, {}, {}, ..., {}]
        // if multiple response is being returned
        const responses = Array.isArray(receivedResponse) ? receivedResponse : [receivedResponse];
        return {
          successes: responses.map((element) => ({
            message: element,
            metadata: input.metadata,
            destination,
          })),
          failures: [],
        };
      } catch (error) {
        return {
          successes: [],
          failures: [handleRtTfSingleEventError(input, error, reqMetadata)],
        };
      }
    }),
  );

  const successRespList = outcomes.flatMap((outcome) => outcome.successes);
  const errorRespList = outcomes.flatMap((outcome) => outcome.failures);

  // batch implementation
  let batchedResponseList = [];
  if (successRespList.length > 0) {
    batchedResponseList =
      destination.Config.apiVersion === API_VERSION.v3
        ? batchEvents(successRespList)
        : legacyBatchEvents(successRespList);
  }
  return [...batchedResponseList, ...errorRespList];
};
// Exposes the single-event transform (`process`) and the batch/router transform (`processRouterDest`).
module.exports = { process, processRouterDest };