Dla surety (#3621)
* error back out to top level in parsing

* correctly test for [] with bad granules

* making sure messageBody is logged on a parsing failure

* looking at how this passes through in integration tests

* looking at how this passes through in integration tests

* did adding a logger change the structure of the sqs message??

* is body:Body an auto-nesting based on some conformity?

* Revert "is body:Body an auto-nesting based on some conformity?"

This reverts commit b862c33.

* de-nesting in sf-event

* if sqs message bodies can get nested, we need to recursively unpack

* fix typing in sf-event

* tracing the structure of the message as it gets de-nested

* fixing DLQRecord relationship with SQS record

* obvious mistake in recursive de-nesting to proper object

* switch no-granules return to null as per review request

* integration test with nulls and executionArn for fileName

* more complete testing for valid values and why isn't error showing up

* does it need to be just inside the first nesting?

* use identical invocation to master to identify difference

* and some logging to check the state in transit

* further investigate raw incoming message

* working with the de-nesting and original send side

* back to error inclusion in sf-event

* add variable nesting to unit tests

* typing for unwrapping messageBody

* linter errors

* having given null as no granule entry found, have to check for it

* further type securing in write-db-dlq

* further fixed typing in message parsing

* added small update to changelog

* cleanups in write-db

* success and failure cases for parsing in Dlq test

* lint fixes

* linting

* move SQSRecord typing to SQS

* make log argument actually optional in SQS.sendSQSMessage

* DLQRecord and DLA Record definitions in DeadLetterMessage

* suggested jsdoc update

* removed unnecessary spacer line

* kick out test output on failure

* added flexibility in error catchment

* correct-er params

* typedef to remove multiline doc type declaration

* refactor to put deadletter message definitions in the types package

* docstring typing removal

* inclusion of provider id and collectionId

* fix docstring description of hoist

* tests and test error fixes

* get setup error output

* expand checking surrounding the provider ID extraction

* adding tests for type check behaviors

* get build cache working right again

* expanding and fixing tests

* clearer typing in write-db

* linter errors

* update integration test to be correct

* add logger to debug integration tests

* lint fix

* weird error comes from strange provider config

* rearrange for better optimization

* WIP

* remove jsdocstring from ts file

* remove bad .only test

* expand dlq tests to include sqs tests

* Update packages/api/lambdas/write-db-dlq-records-to-s3.js

Co-authored-by: Jonathan Kovarik <kovarik@nsidc.org>

* further WIP typing in hoist

* simplify typecheck in Providers.ts

* more concrete naming in sqs message nestedness

* typo fix

* fixing sqs vs dlq overlap in testing

* WIP super broken

* works with heavy babying

* cleanup from testing

* cleanup from testing

* first commit

* CHANGELOG

* shift s3List objects to have better type output

* works right now

* back to forEach syntax

* rename to be less inane

* fixing and changing tests to new format

* fixing tests

* fixing tests

* removing test code

* fixing test (final)

* cleaning

* fixing int test

* output to dist and gitignore appropriately

* random typing cleanup in S3.ts

* updates found in the course of writing tests

* test all functionalities

* needs aws-sdk/client-s3

* changing to new path

* typing cleanup in S3

* linting and docstrings

* cleanup imports

* further testing

* move non-user scripts to 'ci_scripts'

* move update script to scripts

* console log cleanups

* update changelog

* mistaken replace fixed

* correctly point the coverage python script

* correctly move testing of hoist over to test-DeadLetterMessage

* package.json cleanup

* add newline to changelog

* removing eventdate

* changing unknown time

* changing unknown time

* link with lerna to get dependencies right

* turn on the rest of the tests

* lint fixes

* fixing test

* add prepare to package.json

* typing and concurrency fixes

* current coverage values

* PR feedback

* small test fix

* stupid solution but works?

* trying to track down difference

* PR feedback

* seeing that we skip 'prepare' trying to figure out how to correctly link inc i

* seeing that we skip 'prepare' trying to figure out how to correctly link inc i

* build and package both just in case

* small jsdoc fixes

* use hoist as declared in DeadLetterMessage.ts

* trying to figure out how to build in ci

* use batch listObjects Iterable

* work with timestamp locations

* move to lambdas directory

* tf linkages

* move into single file: index.ts

* include linkage to updateDLA in api

* lint fixes

* lowering coverage thresholds temporarily while I get CI to work

* shift ci_scripts back to scripts

* cleanup debug from unit_tests.sh

* CUMULUS-3617: Add lambdas to migrate DLA messages

* more updates

* add unit test and fix recursivelyDeleteS3Bucket

* move recursivelyDeleteS3Bucket to batched v2

* skip maxKeys question

* Update changelog

* update test coverage

* add tf config fix test coverage

* 3609 incorporated first pass

* excising references to old dla format

* WIP conversion

* temp breaking ci to get a clear output

* fix errors in lambda

* Revert "temp breaking ci to get a clear output"

This reverts commit cc9c651.

* more merging in tests and functionality

* get me some kind of ts build cache output

* add cumulus/logger dependency

* does having a precompile kill the second compile??

* index is able to compile

* fix up some debris from translation

* linter fixes

* proper import to fix structure

* add coverage script

* updates to bring tests up to proper function

* remove manipulateTrailingSlash

* changelog

* remove leftover test of generate-build-cache- printout

* move to skipping subdirectories

* docs and cleanup according to PR review

* cumulus imports as the last packages imported

* remove stopOnError: false for internal error catchment

* warning note for future developers

* bolder warning

* remove unused line in tsconfig.eslint

* added tests for graceful handling of bad files

* need to be serial

* some fixes from pr review

* move to use of aws's types without override

* additional notes on dla migration in changelog

* @Cumulus import order

* decrease log chattiness in test

* cleanup test pathing

* coverage fix

* maybe tell me what's wrong?

* fixed missing dependency

* linter errors

* added in missed import of aws-lambda

* de-nest sqsMessages in hoistDLA

* docs update to reflect non-nested body attribute

* CHANGELOG with dummy ticket number while I can't access jira

* fix body in test-index.js

* investigating states in the process lambda

* super dumb error logs

* investigating states in the process lambda

* move to non-nested dates structure

* little bit more debugging

* WIP looks successful but doesn't transfer files as expected

* still trying to debug vanished message

* mistaken function name fixed

* cleanup from debugging

* fix ticket number in changelog

* fix error log messages from mangled merge

* linter cleanup

* fix unit test Body vs body

* ensure ':' character doesn't come into filename by extracting name

* handle / or :

* update changelog

* better target path parsing

* integration test broken in two for clearer test results

* unit test updates and associated code changes

* thinking a little more about naming

* docs format

* dlaSpec working

* trying to have a better name

* able to pass through sensible executionArn and name into fake dla

* check on what cumulusMessage is actually arriving

* trying to parse where the message is going wrong

* fixed dlaprocessing spec

* remove old debug message

* default args to new function parameters to ensure backwards compat

* use dead-letter-archive/sqs replace as per pr review request

* get process.env.stackName as default stackName

* flip over key getter with comment

* cleanup comments

* getDLAFailureKey to simplify out replace calls

* fix to make sure dla-migration has access to dlaroot

* clearer documentation

* change it

* slightly clearer documentation

* update supertest version, appears to all work

* does this fix my error?

---------

Co-authored-by: ecarton <ethan.j.carton@nasa.gov>
Co-authored-by: Jonathan Kovarik <kovarik@nsidc.org>
Co-authored-by: Naga Nages <nnagavenkata@yahoo.com>
Co-authored-by: Naga Nages <66387215+Nnaga1@users.noreply.github.com>
Co-authored-by: jennyhliu <jenny.h.liu@nasa.gov>
Co-authored-by: jennyhliu <34660846+jennyhliu@users.noreply.github.com>
7 people committed Apr 24, 2024
1 parent ae1da0c commit 43944c1
Showing 13 changed files with 468 additions and 369 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -94,6 +94,9 @@ the CloudWatch logs for your async operations (e.g. `PREFIX-AsyncOperationEcsLog

 ### Changed

+- **CUMULUS-3629**
+  - dla guarantees de-nested SQS message bodies, preferring outermost metadata as found.
+  - dla uses execution Name as filename and ensures no ':' or '/' characters in name
 - **CUMULUS-3570**
   - Updated Kinesis docs to support latest AWS UI and recommend server-side encryption.
 - **CUMULUS-3519**
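To picture the two CUMULUS-3629 behaviors above (recursive de-nesting of SQS message bodies, and a filename taken from the execution name with no ':' or '/' characters), here is a minimal sketch. The helper names are invented for illustration and are not the shipped Cumulus implementation:

// Sketch only: invented helper names, not the shipped Cumulus code.

// SQS bodies can arrive wrapped several layers deep, each layer exposing a
// `body` or `Body` attribute holding a JSON string; unwrap until neither exists.
function unwrapBody(messageBody) {
  const inner = messageBody.body || messageBody.Body;
  if (inner === undefined) return messageBody;
  return unwrapBody(typeof inner === 'string' ? JSON.parse(inner) : inner);
}

// Execution ARNs contain ':' and '/', which are awkward in S3 keys, so the
// archive filename keeps only the execution name (the last ARN segment).
function executionNameFromArn(executionArn) {
  return executionArn.split(/[/:]/).pop();
}

// e.g. 'arn:aws:states:us-east-1:123456789012:execution:SM:run-1' => 'run-1'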
4 changes: 1 addition & 3 deletions docs/features/dead_letter_archive.md
@@ -54,8 +54,6 @@ The body attribute should be a JSON string containing an event bridge event

 Note that

-- The body attribute *can* come nested, such that you will have to de-nest a series of body attributes to get to the heart of your message
-- The word body can be interchanged with Body (capitalized)
 - Because this message body arrived in the Dead Letter Archive because of issues in processing it, there is no strict guarantee that it is a valid json object, or conforms to expected structure. the *expected* structure follows.
 - Automated processing of these messages *must* be prepared for attributes to be missing.
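Given those caveats, automated consumers should parse defensively. A minimal sketch, assuming only the structure documented above (the function name and the fallback shape are illustrative, not a Cumulus API):

// Illustrative only, not a Cumulus API. Assumes just the caveats above.
function parseArchivedBody(body) {
  let event;
  try {
    event = JSON.parse(body);
  } catch (error) {
    // The message already failed processing once; it may not be valid JSON.
    return { parseFailure: String(error), rawBody: body };
  }
  // Any attribute may be missing; normalize to null rather than assume structure.
  return {
    time: event?.time ?? null,
    executionArn: event?.detail?.executionArn ?? null,
    status: event?.detail?.status ?? null,
  };
}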

@@ -144,4 +142,4 @@ to `date` folder under Dead Letter Archive S3 storage location.
 )
 ```

-See `[SQL reference for Athena](https://docs.aws.amazon.com/athena/latest/ug/ddl-sql-reference.html)` for the complete SQL guide.
+See [SQL reference for Athena](https://docs.aws.amazon.com/athena/latest/ug/ddl-sql-reference.html) for the complete SQL guide.
225 changes: 225 additions & 0 deletions example/spec/parallel/dbRecords/DLQSpec.js
@@ -0,0 +1,225 @@
'use strict';

const { InvokeCommand } = require('@aws-sdk/client-lambda');
const moment = require('moment');
const { lambda, s3 } = require('@cumulus/aws-client/services');
const { randomString } = require('@cumulus/common/test-utils');
const {
  deleteS3Object,
  s3ObjectExists,
  listS3ObjectsV2,
  getObject,
  getObjectStreamContents,
} = require('@cumulus/aws-client/S3');
const {
  deleteAsyncOperation,
  getAsyncOperation,
} = require('@cumulus/api-client/asyncOperations');

const { waitForListObjectsV2ResultCount } = require('@cumulus/integration-tests');
const { postRecoverCumulusMessages } = require('@cumulus/api-client/deadLetterArchive');
const { loadConfig } = require('../../helpers/testUtils');
const {
  waitForApiStatus,
} = require('../../helpers/apiUtils');
describe('when a bad record is ingested', () => {
  let stackName;
  let systemBucket;
  let executionArn;
  let leftoverS3Key;

  beforeAll(async () => {
    const config = await loadConfig();
    stackName = config.stackName;
    systemBucket = config.bucket;
  });
  afterAll(async () => {
    await deleteS3Object(
      systemBucket,
      leftoverS3Key
    );
  });
  describe('with full metadata present', () => {
    let failedMessageS3Key;
    let beforeAllFailed;
    beforeAll(async () => {
      executionArn = `execution-${randomString(16)}`;
      const { $metadata } = await lambda().send(new InvokeCommand({
        FunctionName: `${stackName}-sfEventSqsToDbRecords`,
        InvocationType: 'RequestResponse',
        Payload: JSON.stringify({
          env: {},
          Records: [{
            Body: JSON.stringify({
              time: '2024-03-11T18:58:27Z',
              detail: {
                executionArn: executionArn,
                stateMachineArn: '1234',
                status: 'RUNNING',
                input: JSON.stringify({
                  meta: {
                    collection: {
                      name: 'A_COLLECTION',
                      version: '12',
                    },
                    provider: {
                      id: 'abcd',
                      protocol: 'a',
                      host: 'b',
                    },
                  },
                  payload: {
                    granules: [{ granuleId: 'a' }],
                  },
                }),
              },
            }),
          }],
        }),
      }));
      if ($metadata.httpStatusCode >= 400) {
        console.log(`lambda invocation to set up failed, code ${$metadata.httpStatusCode}`);
        beforeAllFailed = true;
        return;
      }
      console.log(`Waiting for the creation of failed message for execution ${executionArn}`);
      const prefix = `${stackName}/dead-letter-archive/sqs/2024-03-11/${executionArn}`;
      try {
        await expectAsync(waitForListObjectsV2ResultCount({
          bucket: systemBucket,
          prefix,
          desiredCount: 1,
          interval: 5 * 1000,
          timeout: 30 * 1000,
        })).toBeResolved();
        // fetch key for cleanup
        const listResults = await listS3ObjectsV2({
          Bucket: systemBucket,
          Prefix: prefix,
        });
        failedMessageS3Key = listResults[0].Key;
      } catch (error) {
        console.log(`Did not find expected S3 Object: ${error}`);
        beforeAllFailed = true;
      }
    });
    it('is sent to the DLA and processed to have expected metadata fields', async () => {
      if (beforeAllFailed) fail('beforeAllFailed');
      const s3Object = await getObject(
        s3(),
        {
          Bucket: systemBucket,
          Key: failedMessageS3Key,
        }
      );
      const fileBody = await getObjectStreamContents(s3Object.Body);
      const parsed = JSON.parse(fileBody);
      expect(parsed.status).toEqual('RUNNING');
      expect(parsed.time).toEqual('2024-03-11T18:58:27Z');
      expect(parsed.stateMachineArn).toEqual('1234');
      expect(parsed.collectionId).toEqual('A_COLLECTION___12');
      expect(parsed.executionArn).toEqual(executionArn);
      expect(parsed.granules).toEqual(['a']);
      expect(parsed.providerId).toEqual('abcd');
      expect(parsed.error).toEqual('UnmetRequirementsError: Could not satisfy requirements for writing records to PostgreSQL. No records written to the database.');
    });
    it('yields a message to the DLA which can be handled correctly by process-s3-dead-letter-archive', async () => {
      if (beforeAllFailed) fail('beforeAllFailed');
      const postRecoverResponse = await postRecoverCumulusMessages(
        {
          prefix: stackName,
          payload: {
            bucket: systemBucket,
            path: failedMessageS3Key,
          },
        }
      );
      const deadLetterRecoveryAsyncOpId = JSON.parse(postRecoverResponse.body).id;
      await waitForApiStatus(
        getAsyncOperation,
        {
          prefix: stackName,
          asyncOperationId: deadLetterRecoveryAsyncOpId,
        },
        'SUCCEEDED'
      );
      const postRecoveryFailedKey = failedMessageS3Key.replace('sqs/', 'failed-sqs/');
      expect(await s3ObjectExists({
        Bucket: systemBucket,
        Key: failedMessageS3Key,
      })).toEqual(false);
      expect(await s3ObjectExists({
        Bucket: systemBucket,
        Key: postRecoveryFailedKey,
      })).toEqual(true);
      leftoverS3Key = postRecoveryFailedKey;
      await deleteAsyncOperation({
        prefix: stackName,
        asyncOperationId: deadLetterRecoveryAsyncOpId,
      });
    });
  });
  it('is sent to the DLA and processed to have expected metadata fields even when data is not found', async () => {
    executionArn = `execution-${randomString(16)}`;
    const { $metadata } = await lambda().send(new InvokeCommand({
      FunctionName: `${stackName}-sfEventSqsToDbRecords`,
      InvocationType: 'RequestResponse',
      Payload: JSON.stringify({
        env: {},
        Records: [{
          Body: JSON.stringify({
            detail: {
              executionArn: executionArn,
              input: JSON.stringify({
                a: 'sldkj',
              }),
            },
          }),
        }],
      }),
    }));
    if ($metadata.httpStatusCode >= 400) {
      fail(`lambda invocation to set up failed, code ${$metadata.httpStatusCode}`);
    }
    console.log(`Waiting for the creation of failed message for execution ${executionArn}`);
    const prefix = `${stackName}/dead-letter-archive/sqs/${moment.utc().format('YYYY-MM-DD')}/${executionArn}`;
    let failedMessageS3Key;
    try {
      await expectAsync(waitForListObjectsV2ResultCount({
        bucket: systemBucket,
        prefix,
        desiredCount: 1,
        interval: 5 * 1000,
        timeout: 30 * 1000,
      })).toBeResolved();
      // fetch key for cleanup
      const listResults = await listS3ObjectsV2({
        Bucket: systemBucket,
        Prefix: prefix,
      });
      failedMessageS3Key = listResults[0].Key;
    } catch (error) {
      fail(`Did not find expected S3 Object: ${error}`);
    }
    const s3Object = await getObject(
      s3(),
      {
        Bucket: systemBucket,
        Key: failedMessageS3Key,
      }
    );
    const fileBody = await getObjectStreamContents(s3Object.Body);

    const parsed = JSON.parse(fileBody);

    expect(parsed.status).toEqual(null);
    expect(parsed.time).toEqual(null);
    expect(parsed.stateMachineArn).toEqual(null);
    expect(parsed.collectionId).toEqual(null);
    expect(parsed.executionArn).toEqual(executionArn);
    expect(parsed.granules).toEqual(null);
    expect(parsed.providerId).toEqual(null);
    expect(parsed.error).toEqual('CumulusMessageError: getMessageWorkflowStartTime on a message without a workflow_start_time');
    leftoverS3Key = failedMessageS3Key;
  });
});
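Taken together, the assertions in these two tests pin down the shape of an archived DLA record. As a reference, here is that shape inferred from the expectations above; this is a sketch, not the definition from the Cumulus types package:

/**
 * DLA record shape as pinned down by the assertions above (inferred, not
 * copied from Cumulus). Fields are null, not undefined, when unknown.
 *
 * @typedef {Object} DLARecordSketch
 * @property {string} executionArn - always present; its name segment (no ':' or '/') becomes the S3 filename
 * @property {string|null} status - e.g. 'RUNNING'
 * @property {string|null} time
 * @property {string|null} stateMachineArn
 * @property {string|null} collectionId - '<name>___<version>', e.g. 'A_COLLECTION___12'
 * @property {string|null} providerId
 * @property {string[]|null} granules - granuleIds only
 * @property {string|null} error
 */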