-
Notifications
You must be signed in to change notification settings - Fork 0
/
remoteProcessing.js
175 lines (161 loc) · 4.78 KB
/
remoteProcessing.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
const redis = require("./redisConnect");
const { curry, Task } = require("./essentials");
// Module initialisation: tell the redis layer which instance this process is.
// The id is read from the INSTANCE_ID environment variable and falls back to
// "instance 1" when that variable is unset or empty.
//
// NOTE: the previous unconditional dump of the whole process.env object was
// removed on purpose - it wrote every environment variable (which can include
// credentials and tokens) to the log each time this module was loaded.
redis.setInstanceId(process.env.INSTANCE_ID || "instance 1");
//const instanceId = redis.getInstanceId();
// HTTP responses that are still waiting for a reply from a remote instance,
// keyed by responseAction. Each queue entry has the shape { rid, resp }.
const pendingRemoteResponses = new Map();

/**
 * Removes and returns the pending entry that a remote reply belongs to.
 *
 * The entry whose rid equals respObj.rid is preferred; when the reply does not
 * carry a matching rid the oldest outstanding entry for the action is used.
 *
 * @param {string} responseAction - id of the response action the reply was sent for
 * @param {Object} respObj - the reply message received on the channel
 * @returns {{rid: *, resp: Object}|undefined} the pending entry, or undefined
 *   when nothing is waiting for this action
 */
const takePendingRemoteResponse = (responseAction, respObj) => {
  const queue = pendingRemoteResponses.get(responseAction);
  if (!queue || queue.length === 0) {
    return undefined;
  }
  const matchIndex = queue.findIndex(entry => entry.rid === respObj.rid);
  return queue.splice(matchIndex === -1 ? 0 : matchIndex, 1)[0];
};

/**
 * Processes a request for resource `rid`, either locally (when this instance
 * holds the reservation) or by forwarding it over redis pub/sub to the
 * instance returned by redisConnectorForResource.getReserver(rid).
 *
 * Curried: arguments may be supplied in several calls.
 *
 * @param channel - the redis pub/sub channel used for communication
 * @param redisConnectorForResource - resource connector; must expose getReserver(rid)
 * @param rid - resource id
 * @param action - id of the message action
 * @param responseAction - id of the message response action
 * @param obj - input object for the processing
 * @param localProcessingfunction - the local processing function; called with
 *   the merged input object when the resource is reserved by this instance
 * @param resp - the http response object; it is answered from the remote
 *   reply listener when processing happens on another instance
 * @param isReserved - truthy when the resource rid is reserved by this instance
 * @returns whatever localProcessingfunction returns in the local case;
 *   otherwise the getReserver(rid) result chained into Task.of(false)
 *   (presumably a Task - confirm against ./essentials)
 */
const doProcessingForResource = curry((
  channel,
  redisConnectorForResource,
  rid,
  action,
  responseAction,
  obj,
  localProcessingfunction,
  resp,
  isReserved
) => {
  console.log(
    "rid is",
    rid,
    "action",
    action,
    "responseAction",
    responseAction,
    "doProcessingForResource called with isReserved value",
    isReserved,
    "and instanceId is",
    redis.getInstanceId(),
    "channel is",
    channel
  );
  // Fields of obj win over the rid/action/response defaults placed before it.
  const obj_ = Object.assign(
    { rid: rid },
    { action: action, response: resp },
    obj
  );

  if (isReserved) {
    console.log("Reserved by local server");
    return localProcessingfunction(obj_);
  }

  console.log("Not reserved");
  return redisConnectorForResource.getReserver(rid).chain(toInstanceId => {
    console.log("getReserver returned", toInstanceId, "for rid", rid);

    // Remember which http response is waiting for this request. The listener
    // below is shared by every request for this responseAction, so it must not
    // capture a single `resp` - doing so would answer the first request again
    // and again while later requests never receive a reply.
    if (!pendingRemoteResponses.has(responseAction)) {
      pendingRemoteResponses.set(responseAction, []);
    }
    const queue = pendingRemoteResponses.get(responseAction);
    const pendingEntry = { rid: rid, resp: resp };
    queue.push(pendingEntry);

    // Register the reply listener (once per responseAction) before sending,
    // so a reply can never arrive while nobody is listening.
    if (!remoteResponseCallbacks[responseAction]) {
      redis.onMessage(respObj => {
        console.log("Great news - remoteResponseCallback");
        if (
          respObj.toInstance !== redis.getInstanceId() ||
          respObj.action !== responseAction
        ) {
          //This handler not meant for this instance or this action
          return;
        }
        const pending = takePendingRemoteResponse(responseAction, respObj);
        if (!pending) {
          console.log("No pending request found for", responseAction);
          return;
        }
        remoteProcessingResponseHandler(pending.resp, respObj);
      }, channel);
      remoteResponseCallbacks[responseAction] = true;
    }

    //This instance could not reserve the resource, so send the message to the
    //owning instance, toInstanceId, to process it
    try {
      redis.sendMessage(toInstanceId, channel, obj_);
    } catch (err) {
      // Nothing was sent, so no reply will come back for this entry.
      const staleIndex = queue.indexOf(pendingEntry);
      if (staleIndex !== -1) {
        queue.splice(staleIndex, 1);
      }
      throw err;
    }
    return Task.of(false);
  });
});
/**
 * Message handler run on the instance that owns a resource: it executes
 * localProcessingfunction for a request received on `channel` and sends the
 * outcome back to the requesting instance (obj.fromInstance).
 *
 * Curried: typically the first four arguments are bound up front and the
 * resulting function is registered as the channel's message listener.
 *
 * @param channel - the redis pub/sub channel replies are sent on
 * @param action - id of the message action this handler serves
 * @param responseAction - id placed in the `action` field of the reply
 * @param localProcessingfunction - called with the incoming message; its
 *   return value must expose chain/fork (presumably a Task - confirm against
 *   ./essentials)
 * @param obj - the incoming message; toInstance, action, fromInstance and rid
 *   are read from it
 * @returns undefined
 */
const remoteProcessingHandler = curry(
  (channel, action, responseAction, localProcessingfunction, obj) => {
    console.log(
      "remoteProcessingHandler called for instanceId",
      redis.getInstanceId(),
      "action is",
      action,
      "responseAction is",
      responseAction
    );
    if (obj.toInstance !== redis.getInstanceId() || obj.action !== action) {
      //This handler not meant for this instance or this action
      return;
    }
    let respObj;
    localProcessingfunction(obj)
      .chain(result => {
        // Build the reply: defaults first so fields of `result` take priority.
        // rid is echoed so the requesting instance can correlate the reply
        // with the request it sent.
        respObj = Object.assign(
          {
            status: 200,
            action: responseAction,
            rid: obj.rid
          },
          result
        );
        return Task.of(respObj);
      })
      .fork(
        err => {
          console.log("Sending ERROR message to", obj.fromInstance, err);
          // The reply must carry `action` (the requester ignores messages whose
          // action is not its responseAction) and `status` (used as the HTTP
          // status code); without them the failure never reaches the client.
          redis.sendMessage(obj.fromInstance, channel, {
            status: 400,
            action: responseAction,
            rid: obj.rid,
            error: { code: 400, msg: "Save error" }
          });
        },
        () => {
          console.log("Sending message to", obj.fromInstance);
          //send the respObj as a message for the original instance
          redis.sendMessage(obj.fromInstance, channel, respObj);
        }
      );
  }
);
/**
 * Writes a reply message out on an http response object: the message's
 * `status` field is passed to resp.status, then the whole message is passed
 * to resp.send.
 *
 * @param resp - the http response object (must expose status() and send())
 * @param obj - the reply message
 */
const remoteProcessingResponseHandler = curry((resp, obj) => {
  const { status } = obj;
  resp.status(status);
  resp.send(obj);
});
// Communication channels: names of the pub/sub channels exported below as
// remoteCommunicationChannels.
const TEST_INFRA_CHANNEL = "testInfraChannel";
const SAVE_RESOURCE_CHANNEL = "saveResourceChannel";
// Message action ids, exported below as actionIds. Each request id has a
// matching *_RESPONSE id; handlers in this file compare a message's `action`
// field against the action / responseAction they were given.
const SAVE_RESOURCE = "saveResource";
const SAVE_RESOURCE_RESPONSE = "saveResourceResponse";
const TEST_INFRA = "testInfra";
const TEST_INFRA_RESPONSE = "testInfraResponse";
//...
//...
/**
 * Subscribes to `channel` and installs a message listener on it that is
 * remoteProcessingHandler bound to the given channel, action, responseAction
 * and local processing function.
 *
 * @param channel - pub/sub channel to subscribe to and listen on
 * @param action - id of the message action the listener serves
 * @param responseAction - id of the message response action
 * @param localProcessingfunction - the local processing function handed to
 *   remoteProcessingHandler
 */
const registerCallbackHandler = (
  channel,
  action,
  responseAction,
  localProcessingfunction
) => {
  console.log(
    "registerCallbackHandler for action",
    action,
    "responseAction",
    responseAction,
    "channel",
    channel
  );

  // Subscribe first, then attach the listener for this channel.
  redis.subscribeChannel(channel);
  const boundHandler = remoteProcessingHandler(
    channel,
    action,
    responseAction,
    localProcessingfunction
  );
  redis.onMessage(boundHandler, channel);
};
// Registry of response actions that already have a response listener attached.
// doProcessingForResource sets remoteResponseCallbacks[responseAction] = true
// after calling redis.onMessage, so a listener is registered at most once per
// response action. It is declared after the functions that use it, which is
// fine because they only read it when they are called.
const remoteResponseCallbacks = {};
module.exports = {
doProcessingForResource: doProcessingForResource,
actionIds: {
SAVE_RESOURCE,
SAVE_RESOURCE_RESPONSE,
TEST_INFRA,
TEST_INFRA_RESPONSE
},
remoteCommunicationChannels: { SAVE_RESOURCE_CHANNEL, TEST_INFRA_CHANNEL },
registerCallbackHandler: registerCallbackHandler
};