-
Notifications
You must be signed in to change notification settings - Fork 102
/
index.js
136 lines (121 loc) · 4.02 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
'use strict';
const path = require('path');
const pMap = require('p-map');
const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js');
const errors = require('@cumulus/common/errors');
const lock = require('@cumulus/ingest/lock');
const {
selector: granuleSelector,
duplicateHandlingType
} = require('@cumulus/ingest/granule');
const log = require('@cumulus/common/log');
/**
 * Ingest a list of granules one at a time while holding a download lock.
 *
 * The lock is keyed on the bucket, the provider and the ID of the FIRST
 * granule in the list. It is always released once ingestion finishes,
 * whether ingestion succeeded or failed.
 *
 * @param {Object} ingest - an ingest object exposing `ingest(granule, bucket)`
 * @param {string} bucket - the name of an S3 bucket, used for locking and
 *   passed through to `ingest.ingest`
 * @param {Object} provider - the provider; passed whole to `lock.proceed`,
 *   and its `id` is used when removing the lock
 * @param {Object[]} granules - the granules to be ingested
 * @returns {Promise.<Array>} - the ingested granules, each extended with
 *   `sync_granule_duration` (elapsed milliseconds of its `ingest.ingest` call);
 *   an empty array when `granules` is empty
 * @throws {ResourcesLockedError} - when `lock.proceed` resolves to a falsy value
 */
async function download(ingest, bucket, provider, granules) {
  // Nothing to ingest: return before touching the lock. Without this guard
  // the `granules[0].granuleId` lookup below throws a TypeError.
  if (granules.length === 0) return [];

  // The first granule's ID names the lock for the whole batch.
  const lockName = granules[0].granuleId;

  log.debug(`awaiting lock.proceed in download() bucket: ${bucket}, `
    + `provider: ${JSON.stringify(provider)}, granuleID: ${lockName}`);
  const proceed = await lock.proceed(bucket, provider, lockName);

  if (!proceed) {
    const err = new errors.ResourcesLockedError('Download lock remained in place after multiple tries');
    log.error(err);
    throw err;
  }

  // Ingest a single granule and record how long the call took.
  const ingestGranule = async (granule) => {
    try {
      const startTime = Date.now();
      const r = await ingest.ingest(granule, bucket);
      const endTime = Date.now();

      return {
        ...r,
        sync_granule_duration: endTime - startTime
      };
    } catch (e) {
      // Log here so the original failure is recorded even if a caller
      // later replaces it with a different error.
      log.error(e);
      throw e;
    }
  };

  try {
    // concurrency: 1 => granules are ingested strictly one after another.
    return await pMap(granules, ingestGranule, { concurrency: 1 });
  } finally {
    // Release the lock on success and on failure alike.
    await lock.removeLock(bucket, provider.id, lockName);
  }
}
/**
* Ingest a list of granules
*
* @param {Object} event - contains input and config parameters
* @returns {Promise.<Object>} - a description of the ingested granules
*/
exports.syncGranule = function syncGranule(event) {
const config = event.config;
const input = event.input;
const stack = config.stack;
const buckets = config.buckets;
const provider = config.provider;
const collection = config.collection;
const forceDownload = config.forceDownload || false;
const downloadBucket = config.downloadBucket;
const duplicateHandling = duplicateHandlingType(event);
// use stack and collection names to suffix fileStagingDir
const fileStagingDir = path.join(
(config.fileStagingDir || 'file-staging'),
stack
);
if (!provider) {
const err = new errors.ProviderNotFound('Provider info not provided');
log.error(err);
return Promise.reject(err);
}
const IngestClass = granuleSelector('ingest', provider.protocol);
const ingest = new IngestClass(
buckets,
collection,
provider,
fileStagingDir,
forceDownload,
duplicateHandling
);
return download(ingest, downloadBucket, provider, input.granules)
.then((granules) => {
if (ingest.end) ingest.end();
const output = { granules };
if (collection && collection.process) output.process = collection.process;
if (config.pdr) output.pdr = config.pdr;
log.debug(`SyncGranule Complete. Returning output: ${JSON.stringify(output)}`);
return output;
}).catch((e) => {
log.debug('SyncGranule errored.');
if (ingest.end) ingest.end();
let errorToThrow = e;
if (e.toString().includes('ECONNREFUSED')) {
errorToThrow = new errors.RemoteResourceError('Connection Refused');
} else if (e.details && e.details.status === 'timeout') {
errorToThrow = new errors.ConnectionTimeout('connection Timed out');
}
log.error(errorToThrow);
throw errorToThrow;
});
};
/**
* Lambda handler
*
* @param {Object} event - a Cumulus Message
* @param {Object} context - an AWS Lambda context
* @param {Function} callback - an AWS Lambda handler
* @returns {undefined} - does not return a value
*/
exports.handler = function handler(event, context, callback) {
cumulusMessageAdapter.runCumulusTask(
exports.syncGranule,
event,
context,
callback
);
};