forked from eclipse-edc/Connector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nextForState.js
84 lines (72 loc) · 2.76 KB
/
nextForState.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
* Copyright (c) 2020 - 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/
/**
 * Returns the next batch of documents that are in a certain state and acquires an exclusive lock (a "lease")
 * on each of them. Only documents that are not yet leased, or whose lease has expired, are selected. Thus, two
 * subsequent calls with identical parameters will yield different results.
 *
 * The response body is always an array: the leased documents, or an empty array when no document matched.
 *
 * @param state the desired state; parsed as a base-10 integer
 * @param limit the batch size; parsed as a base-10 integer
 * @param connectorId The name of the calling runtime; stored as the holder of each lease
 * @throws Error if state or limit cannot be parsed as an integer, if the query reports an error, or if the
 *         query or a document update is not accepted by the collection
 */
function nextForState(state, limit, connectorId) {
    // Duration of every newly acquired lease, in milliseconds (compared against Date.now() in the query).
    var LEASE_DURATION_MS = 60000;

    // Validate before touching the collection: a NaN parameter would only fail later with an opaque query error.
    var stateCode = parseInt(state, 10);
    if (isNaN(stateCode)) throw new Error("Invalid state: " + state);
    var batchSize = parseInt(limit, 10);
    if (isNaN(batchSize)) throw new Error("Invalid limit: " + limit);

    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var response = context.getResponse();

    // One timestamp per invocation, used both for the expiry check and for the new leases.
    var now = Date.now();

    // Select documents in the requested state that are unleased or whose lease has expired, oldest state first.
    var filterQuery = {
        'query': 'SELECT * FROM t WHERE t.wrappedInstance.state = @state AND (t.lease = null OR (t.lease.leasedAt + t.lease.leaseDuration) < @now) ORDER BY t.wrappedInstance.stateTimestamp OFFSET 0 LIMIT @limit',
        'parameters': [
            {
                'name': '@state', 'value': stateCode
            },
            {
                'name': '@limit', 'value': batchSize
            },
            {
                'name': '@now', 'value': now
            }
        ]
    };

    var accept = collection.queryDocuments(collectionLink, filterQuery, {}, function (err, items, responseOptions) {
        if (err) throw new Error("Error" + err.message);

        // Nothing to lease: answer with an empty batch and stop, instead of dereferencing a missing result.
        if (!items || items.length === 0) {
            console.log("No documents found!");
            response.setBody([]);
            return;
        }

        console.log("found " + items.length + " documents!");

        // add lock to all items
        for (var i = 0; i < items.length; i++) {
            lease(items[i], connectorId);
        }
        response.setBody(items);
    });
    if (!accept) throw new Error("Unable to read document details, abort ");

    /**
     * Stamps the document with a fresh lease held by connectorId and writes it back to the collection.
     */
    function lease(document, connectorId) {
        document.lease = {
            leasedBy: connectorId,
            leasedAt: now,
            leaseDuration: LEASE_DURATION_MS
        };

        var accept = collection.replaceDocument(document._self, document, function (err, itemReplaced) {
            if (err) throw new Error("Unable to update Document, abort ");
            // Log only once the replace has been confirmed.
            console.log("updated lease of document " + document.id);
        });
        if (!accept) throw new Error("Unable to update Document, abort");
    }
}