-
Notifications
You must be signed in to change notification settings - Fork 101
/
index.js
328 lines (297 loc) · 11.7 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
'use strict';
const cumulusMessageAdapter = require('@cumulus/cumulus-message-adapter-js');
const { InvalidArgument } = require('@cumulus/common/errors');
const get = require('lodash.get');
const clonedeep = require('lodash.clonedeep');
const flatten = require('lodash.flatten');
const keyBy = require('lodash.keyby');
const path = require('path');
const {
handleDuplicateFile,
unversionFilename,
moveGranuleFile,
duplicateHandlingType
} = require('@cumulus/ingest/granule');
const {
isCMRFile,
metadataObjectFromCMRFile,
granulesToCmrFileObjects,
updateCMRMetadata
} = require('@cumulus/cmrjs');
const {
aws: {
buildS3Uri,
s3ObjectExists
},
BucketsConfig
} = require('@cumulus/common');
const { urlPathTemplate } = require('@cumulus/ingest/url-path-template');
const log = require('@cumulus/common/log');
/**
 * Ensure a file matched exactly one collection.file spec and that the spec's
 * bucket key is present in the stack's bucket configuration.
 *
 * @param {Array<Object>} match - collection.file specs the file matched
 * @param {BucketsConfig} bucketsConfig - instance describing stack configuration.
 * @param {Object} file - the file object being validated
 * @throws {InvalidArgument} when the match is ambiguous, empty, or names an
 *   unknown bucket key
 */
function validateMatch(match, bucketsConfig, file) {
  const matchCount = match.length;
  if (matchCount > 1) {
    throw new InvalidArgument(`File (${file}) matched more than one collection.regexp.`);
  }
  if (matchCount === 0) {
    throw new InvalidArgument(`File (${file}) did not match any collection.regexp.`);
  }
  const bucketKey = match[0].bucket;
  if (!bucketsConfig.keyExists(bucketKey)) {
    // NOTE(review): Object.keys(bucketsConfig) enumerates the instance's own
    // properties, which may not be the configured bucket keys — confirm
    // against the BucketsConfig implementation.
    throw new InvalidArgument(`Collection config specifies a bucket key of ${bucketKey}, `
      + `but the configured bucket keys are: ${Object.keys(bucketsConfig).join(', ')}`);
  }
}
/**
 * Build an updated copy of the granules object in which every file carries its
 * final destination (bucket, filepath, s3 filename, url_path) resolved from
 * `collection.files.regexp`. Untyped CMR metadata files additionally get a
 * 'metadata' type.
 *
 * @param {Object} granulesObject - an object of granules where the key is the granuleId
 * @param {Object} collection - configuration object defining a collection
 * of granules and their files
 * @param {Array<Object>} cmrFiles - array of objects that include CMR xmls uris and granuleIds
 * @param {BucketsConfig} bucketsConfig - instance associated with the stack
 * @returns {Object} new granulesObject where each granules' files are updated with
 * the correct target buckets/paths/and s3uri filenames.
 */
async function updateGranuleMetadata(granulesObject, collection, cmrFiles, bucketsConfig) {
  const result = {};
  const cmrBasenames = cmrFiles.map((cmr) => path.basename(cmr.filename));
  const fileSpecs = collection.files;

  await Promise.all(Object.entries(granulesObject).map(async ([granuleId, granule]) => {
    result[granuleId] = { ...granule };

    const cmrFile = cmrFiles.find((cmr) => cmr.granuleId === granuleId);
    const cmrMetadata = cmrFile ? await metadataObjectFromCMRFile(cmrFile.filename) : {};

    const updatedFiles = granule.files.map((file) => {
      // Tag untyped CMR metadata files so downstream steps can identify them.
      const typeAddition = {};
      if (!file.type && cmrBasenames.includes(file.name)) {
        typeAddition.type = 'metadata';
      }

      const matched = fileSpecs.filter((spec) => unversionFilename(file.name).match(spec.regex));
      validateMatch(matched, bucketsConfig, file);

      // Precedence: file-level url_path, then matched spec, then collection.
      const template = file.url_path || matched[0].url_path || collection.url_path || '';
      const resolvedPath = urlPathTemplate(template, {
        file,
        granule,
        cmrMetadata
      });

      const bucketName = bucketsConfig.nameByKey(matched[0].bucket);
      const filepath = path.join(resolvedPath, file.name);

      return {
        ...file, // retains fields like "name" and "fileStagingDir"
        ...typeAddition, // adds type if the file is a CMR file
        bucket: bucketName,
        filepath,
        filename: `s3://${path.join(bucketName, filepath)}`,
        url_path: template
      };
    });

    result[granuleId].files = updatedFiles;
  }));

  return result;
}
/**
 * Move a single granule file from its staging location to its final bucket and
 * key, returning the moved file object (plus any renamed duplicates when the
 * 'version' duplicate-handling strategy produced them).
 *
 * @param {Object} file - granule file to be moved
 * @param {string} sourceBucket - source bucket location of files
 * @param {string} duplicateHandling - how to handle duplicate files
 * @param {BucketsConfig} bucketsConfig - BucketsConfig instance
 * @param {boolean} markDuplicates - Override to handle cmr metadata files that
    shouldn't be marked as duplicates
 * @returns {Array<Object>} returns the file moved and the renamed existing duplicates if any
 */
async function moveFileRequest(
  file,
  sourceBucket,
  duplicateHandling,
  bucketsConfig,
  markDuplicates = true
) {
  const stagingDir = file.fileStagingDir || 'file-staging';
  const source = {
    Bucket: sourceBucket,
    Key: `${stagingDir}/${file.name}`
  };
  const target = {
    Bucket: file.bucket,
    Key: file.filepath
  };

  // Copy of the input file object describing it at its destination.
  const movedFile = clonedeep(file);
  delete movedFile.fileStagingDir;

  const targetExists = await s3ObjectExists(target);
  log.debug(`file ${target.Key} exists in ${target.Bucket}: ${targetExists}`);

  // Objects landing in public buckets get a public-read ACL.
  const options = bucketsConfig.type(file.bucket).match('public')
    ? { ACL: 'public-read' }
    : null;

  let renamedFiles = [];
  if (targetExists) {
    if (markDuplicates) movedFile.duplicate_found = true;
    // 'version' handling returns the renamed duplicates; others return [].
    renamedFiles = await handleDuplicateFile({
      source,
      target,
      copyOptions: options,
      duplicateHandling
    });
  } else {
    await moveGranuleFile(source, target, options);
  }

  // Return the moved file followed by any renamed duplicate files.
  const renamed = renamedFiles.map((f) => ({
    bucket: f.Bucket,
    name: path.basename(f.Key),
    filename: buildS3Uri(f.Bucket, f.Key),
    filepath: f.Key,
    size: f.size,
    url_path: file.url_path
  }));
  return [movedFile, ...renamed];
}
/**
 * Move every file of every granule from its staging location to its final
 * location, updating each granule's file list in place (including any renamed
 * duplicate files). CMR metadata files are always moved with 'replace'
 * duplicate handling and never flagged as duplicates.
 *
 * @param {Object} granulesObject - an object of the granules where the key is the granuleId
 * @param {string} sourceBucket - source bucket location of files
 * @param {string} duplicateHandling - how to handle duplicate files
 * @param {BucketsConfig} bucketsConfig - BucketsConfig instance
 * @returns {Object} the object with updated granules
 **/
async function moveFilesForAllGranules(
  granulesObject,
  sourceBucket,
  duplicateHandling,
  bucketsConfig
) {
  const granuleMoves = Object.values(granulesObject).map(async (granule) => {
    // Partition files: CMR metadata files are handled separately below.
    const dataFiles = [];
    const cmrFiles = [];
    granule.files.forEach((file) => {
      (isCMRFile(file) ? cmrFiles : dataFiles).push(file);
    });

    const movedData = await Promise.all(dataFiles.map(
      (file) => moveFileRequest(file, sourceBucket, duplicateHandling, bucketsConfig)
    ));
    // CMR files always replace and are never marked as duplicates.
    const movedCmr = await Promise.all(cmrFiles.map(
      (file) => moveFileRequest(file, sourceBucket, 'replace', bucketsConfig, false)
    ));

    granule.files = [...flatten(movedData), ...flatten(movedCmr)];
  });
  await Promise.all(granuleMoves);
  return granulesObject;
}
/**
 * Rewrite each CMR metadata file's OnlineAccessURL fields to point at the
 * files' final (post-move) locations.
 *
 * @param {Array<Object>} cmrFiles - array of objects that include CMR xmls uris and granuleIds
 * @param {Object} granulesObject - an object of the granules where the key is the granuleId
 * @param {string} cmrGranuleUrlType - type of granule CMR url
 * @param {string} distEndpoint - the api distribution endpoint
 * @param {BucketsConfig} bucketsConfig - BucketsConfig instance
 * @returns {Promise} promise resolves when all files have been updated
 **/
async function updateEachCmrFileAccessURLs(
  cmrFiles,
  granulesObject,
  cmrGranuleUrlType,
  distEndpoint,
  bucketsConfig
) {
  const updates = cmrFiles.map(async ({ granuleId }) => {
    const granule = granulesObject[granuleId];
    return updateCMRMetadata({
      granuleId,
      cmrFile: granule.files.find(isCMRFile),
      files: granule.files,
      distEndpoint,
      // Publishing happens later, in the publish-to-cmr step.
      published: false,
      inBuckets: bucketsConfig,
      cmrGranuleUrlType
    });
  });
  return Promise.all(updates);
}
/**
 * Move granule files from their staging location to their final location and
 * update the CMR metadata to match.
 * See the schemas directory for detailed input and output schemas
 *
 * @param {Object} event - Lambda function payload
 * @param {Object} event.config - the config object
 * @param {string} event.config.bucket - Bucket name where public/private keys are stored
 * @param {Object} event.config.buckets - Buckets config
 * @param {string} event.config.distribution_endpoint - distribution endpoint for the api
 * @param {Object} event.config.collection - collection configuration
 * https://nasa.github.io/cumulus/docs/data-cookbooks/setup#collections
 * @param {boolean} [event.config.moveStagedFiles=true] - set to false to skip moving files
 * from staging to final bucket. Mostly useful for testing.
 * @param {Object} event.input - a granules object containing an array of granules
 * @returns {Promise} returns the promise of an updated event object
 */
async function moveGranules(event) {
  const { config } = event;
  const bucketsConfig = new BucketsConfig(config.buckets);
  const moveStagedFiles = get(config, 'moveStagedFiles', true);
  const cmrGranuleUrlType = get(config, 'cmrGranuleUrlType', 'distribution');
  const duplicateHandling = duplicateHandlingType(event);

  const granulesInput = event.input.granules;
  const cmrFiles = granulesToCmrFileObjects(granulesInput);
  const granulesByGranuleId = keyBy(granulesInput, 'granuleId');

  // A distribution-style CMR URL cannot be built without an endpoint.
  if (cmrGranuleUrlType === 'distribution' && !config.distribution_endpoint) {
    throw new Error('cmrGranuleUrlType is distribution, but no distribution endpoint is configured.');
  }

  // Moving may be disabled (mostly useful for testing); then the input
  // granules pass through unchanged.
  let movedGranules = granulesByGranuleId;
  if (moveStagedFiles) {
    // Compute where every file should end up after the move.
    const granulesToMove = await updateGranuleMetadata(
      granulesByGranuleId, config.collection, cmrFiles, bucketsConfig
    );
    // Perform the staging-to-final moves.
    movedGranules = await moveFilesForAllGranules(
      granulesToMove, config.bucket, duplicateHandling, bucketsConfig
    );
    // Point the CMR metadata's online access URLs at the new locations.
    await updateEachCmrFileAccessURLs(
      cmrFiles,
      movedGranules,
      cmrGranuleUrlType,
      config.distribution_endpoint,
      bucketsConfig
    );
  }

  return { granules: Object.values(movedGranules) };
}
// Export the task function itself, alongside the Lambda handler below.
exports.moveGranules = moveGranules;
/**
 * AWS Lambda entry point: delegates the incoming Cumulus Message to
 * `moveGranules` via the Cumulus Message Adapter.
 *
 * @param {Object} event - a Cumulus Message
 * @param {Object} context - an AWS Lambda context
 * @param {Function} callback - an AWS Lambda handler callback, invoked by
 *   runCumulusTask with the task result or error
 * @returns {undefined} - does not return a value
 */
function handler(event, context, callback) {
  cumulusMessageAdapter.runCumulusTask(moveGranules, event, context, callback);
}
exports.handler = handler;