/
sqs-transport.integration.ts
127 lines (113 loc) · 3.57 KB
/
sqs-transport.integration.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import {
CreateTopicCommand,
PublishCommand,
SNSClient
} from '@aws-sdk/client-sns'
import {
DeleteMessageCommand,
DeleteQueueCommand,
PurgeQueueCommand,
ReceiveMessageCommand,
SQSClient
} from '@aws-sdk/client-sqs'
import { Message } from '@node-ts/bus-messages'
import { TestSystemMessage, transportTests } from '@node-ts/bus-test'
import {
SQSMessageBody,
SqsTransport,
fromMessageAttributeMap
} from './sqs-transport'
import { SqsTransportConfiguration } from './sqs-transport-configuration'
/**
 * Looks up a required environment variable.
 *
 * @param key Name of the variable to read from `process.env`.
 * @returns The variable's value.
 * @throws Error when the variable is missing or set to an empty string.
 */
function getEnvVar(key: string): string {
  const resolved = process.env[key]
  if (resolved) {
    return resolved
  }
  // Both `undefined` and '' end up here, so an empty value counts as "not set".
  throw new Error(`Env var not set - ${key}`)
}
// AWS disallows recreating a queue under the same name right after it has been
// deleted, so a randomised prefix lets back-to-back runs avoid that clash.
// The randomised prefix is currently disabled in favour of a fixed one.
// NOTE(review): presumably acceptable because the suite targets a local
// endpoint rather than real AWS - confirm before re-enabling or removing.
// const resourcePrefix = `integration-bus-sqs-${faker.random.number()}`
const resourcePrefix = `integration-bus-sqs-1`

// Required settings; getEnvVar throws at module load if either one is unset.
const AWS_REGION = getEnvVar('AWS_REGION')
const AWS_ACCOUNT_ID = getEnvVar('AWS_ACCOUNT_ID')

// Transport configuration: the application queue and its dead letter queue
// share the resource prefix.
const sqsConfiguration: SqsTransportConfiguration = {
  awsRegion: AWS_REGION,
  awsAccountId: AWS_ACCOUNT_ID,
  queueName: `${resourcePrefix}-test`,
  deadLetterQueueName: `${resourcePrefix}-dead-letter`
}

// Topic that the suite creates and publishes to directly.
const manualTopicName = `${resourcePrefix}-test-system-message`
// The ARN is built from the validated constants above (rather than re-reading
// process.env) so it always agrees with sqsConfiguration.
const manualTopicIdentifier = `arn:aws:sns:${AWS_REGION}:${AWS_ACCOUNT_ID}:${manualTopicName}`

// Jest timeout in milliseconds (15 seconds).
jest.setTimeout(15000)
/**
 * Integration suite for SqsTransport.
 *
 * Builds SQS and SNS clients pointed at http://localhost:4566, creates the
 * manually managed topic before the tests, removes the queues afterwards, and
 * hands the transport plus two helpers to the shared `transportTests` suite.
 */
describe('SqsTransport', () => {
  // Each client gets its own freshly built options object with the same values.
  const localClientOptions = () => ({
    endpoint: 'http://localhost:4566',
    region: AWS_REGION
  })
  const sqs = new SQSClient(localClientOptions())
  const sns = new SNSClient(localClientOptions())

  const sqsTransport = new SqsTransport(sqsConfiguration, sqs, sns)
  // Read once, at suite-definition time; the helpers and teardown reuse it.
  const deadLetterQueueUrl = sqsTransport.deadLetterQueueUrl

  beforeAll(async () => {
    // The topic used by publishSystemMessage must exist before any test runs.
    await sns.send(new CreateTopicCommand({ Name: manualTopicName }))
  })

  afterAll(async () => {
    // The application queue URL is read here, at teardown time.
    const queueUrl = sqsTransport.queueUrl
    await sqs.send(new PurgeQueueCommand({ QueueUrl: queueUrl }))
    // Delete the application queue first, then the dead letter queue.
    for (const doomedQueueUrl of [queueUrl, deadLetterQueueUrl]) {
      await sqs.send(new DeleteQueueCommand({ QueueUrl: doomedQueueUrl }))
    }
  })

  const systemMessage = new TestSystemMessage()

  /**
   * Publishes the shared TestSystemMessage straight to the manual topic.
   *
   * @param systemMessageAttribute Value sent as the
   * `attributes.systemMessage` string message attribute.
   */
  const publishSystemMessage = async (systemMessageAttribute: string) => {
    const publish = new PublishCommand({
      TopicArn: manualTopicIdentifier,
      Message: JSON.stringify(systemMessage),
      MessageAttributes: {
        'attributes.systemMessage': {
          DataType: 'String',
          StringValue: systemMessageAttribute
        }
      }
    })
    await sns.send(publish)
  }

  /**
   * Receives one batch (up to 10 messages, waiting up to 5 seconds) from the
   * dead letter queue, deletes every received message, and returns the parsed
   * results.
   *
   * @returns One `{ message, attributes }` pair per received message.
   */
  const readAllFromDeadLetterQueue = async () => {
    const receive = new ReceiveMessageCommand({
      QueueUrl: deadLetterQueueUrl,
      WaitTimeSeconds: 5,
      MaxNumberOfMessages: 10,
      AttributeNames: ['All']
    })
    const response = await sqs.send(receive)
    const received = response.Messages || []

    // Deletion happens before parsing, so the messages are removed even if a
    // body later fails to parse.
    const deletions = received.map(({ ReceiptHandle }) =>
      sqs.send(
        new DeleteMessageCommand({
          QueueUrl: deadLetterQueueUrl,
          ReceiptHandle: ReceiptHandle!
        })
      )
    )
    await Promise.all(deletions)

    return received.map(({ Body }) => {
      // The body is parsed as an SQSMessageBody; its Message field carries the
      // JSON-serialised bus message.
      const envelope = JSON.parse(Body!) as SQSMessageBody
      return {
        message: JSON.parse(envelope.Message) as Message,
        attributes: fromMessageAttributeMap(envelope.MessageAttributes)
      }
    })
  }

  transportTests(
    sqsTransport,
    publishSystemMessage,
    manualTopicIdentifier,
    readAllFromDeadLetterQueue
  )
})