-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.js
65 lines (58 loc) · 1.49 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
const avro = require("avsc");
const {
SimpleTransform,
PolicyError,
PolicyInjection,
calculateRecordBatchSize
} = require("@vectorizedio/wasm-api");
/* Transform instance that the rest of this file configures and then exports. */
const transform = new SimpleTransform();
/**
 * Input topics that fire the transform function, each paired with a
 * PolicyInjection value. Only "market_activity" is subscribed here, using
 * PolicyInjection.Latest.
 * Injection policies listed by the original author:
 * - Earliest
 * - Stored
 * - Latest
 * NOTE(review): presumably the policy selects the offset from which records
 * start being consumed - confirm against the wasm-api documentation.
 */
transform.subscribe([["market_activity", PolicyInjection.Latest]]);
/**
 * The strategy the transform engine will use when handling errors.
 * Available strategies listed by the original author:
 * - SkipOnFailure (the one selected here)
 * - Deregister
 * NOTE(review): the exact semantics of each strategy are defined by wasm-api
 * and are not visible in this file - confirm before changing.
 */
transform.errorHandler(PolicyError.SkipOnFailure);
/* TODO: Fetch Avro schema from repository */
/**
 * Avro record type for "market_activity" values.
 * Every field is declared as an Avro string, so the field list is generated
 * from the column names instead of being spelled out entry by entry.
 */
const avroType = avro.Type.forSchema({
  name: "market_activity",
  type: "record",
  fields: ["Date", "CloseLast", "Volume", "Open", "High", "Low"].map(
    (name) => ({ name, type: "string" })
  ),
});
/**
 * Auxiliary per-record transform: re-encodes a record's JSON value as Avro.
 *
 * @param {object} record - Record whose `value` holds JSON text (anything
 *   `JSON.parse` accepts after string coercion).
 * @returns {object} A shallow copy of `record` whose `value` is the result of
 *   `avroType.toBuffer` applied to the parsed object; the input record is
 *   left unmodified.
 * @throws {SyntaxError} When `record.value` is not valid JSON. Errors raised
 *   by `avroType.toBuffer` also propagate to the caller.
 */
const toAvro = (record) => {
  const parsedValue = JSON.parse(record.value);
  return { ...record, value: avroType.toBuffer(parsedValue) };
};
/**
 * Transform function: converts every record in the incoming batch with
 * `toAvro`, keeping each entry's header unchanged.
 *
 * The callback resolves to a Map with a single entry, "avro", whose value is
 * the converted batch.
 * NOTE(review): the Map key is presumably the name of the output the engine
 * writes to - confirm against the wasm-api documentation.
 */
transform.processRecord((batch) => {
  const avroBatch = batch.map(({ header, records }) => ({
    header,
    records: records.map(toAvro),
  }));
  // processRecord expects a Promise, so wrap the finished Map in one.
  return Promise.resolve(new Map([["avro", avroBatch]]));
});
/* Module exports: the configured transform as `default`, the Avro type as `schema`. */
exports.default = transform;
exports.schema = avroType;