-
Notifications
You must be signed in to change notification settings - Fork 2
/
SqsWithUniqueIdGeneration.cs
170 lines (155 loc) · 6.75 KB
/
SqsWithUniqueIdGeneration.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
using System.Collections.Generic;
using Amazon.CDK;
using Amazon.CDK.AWS.APIGateway;
using Amazon.CDK.AWS.IAM;
using Amazon.CDK.AWS.Logs;
using Amazon.CDK.AWS.SQS;
using Amazon.CDK.AWS.StepFunctions;
using Amazon.CDK.AWS.StepFunctions.Tasks;
using Constructs;
namespace ArchitecturePatterns.NET.CDK.Patterns.StorageFirstApi;
internal class SqsWithUniqueIdGeneration : Construct
{
public AwsIntegration WorkflowQueueIntegration { get; private set; }
public StateMachine Workflow { get; }
public IQueue Queue { get; }
public IQueue ErrorQueue { get; }
public SqsWithUniqueIdGeneration(
Construct scope,
string id,
IRole integrationRole,
string integrationName) : base(
scope,
id)
{
// Create role to be assumed by the workflow.
var workflowRole = new Role(this, $"{integrationName}WorkflowRole", new RoleProps
{
AssumedBy = new ServicePrincipal("states.amazonaws.com")
});
// Generate the Queue for storing the messages, as well as a dead letter queue to handle failures.
this.ErrorQueue = new Queue(this, $"{integrationName}StorageDLQ", new QueueProps());
Queue = new Queue(this, $"{integrationName}StorageQueue", new QueueProps
{
DeadLetterQueue = new DeadLetterQueue
{
MaxReceiveCount = 3,
Queue = this.ErrorQueue
}
});
Queue.GrantSendMessages(workflowRole);
// Log group for enabling StepFunctions express workflow logs
var logGroup = new LogGroup(this, $"{integrationName}WorkflowLogGroup", new LogGroupProps
{
Retention = RetentionDays.ONE_DAY,
RemovalPolicy = RemovalPolicy.DESTROY
});
logGroup.GrantWrite(workflowRole);
// Create the workflow definition. The workflow:
// 1. Generates a unique identifier using the States.UUID() Intrinsic function.
// 2. Sends the message to SQS.
// 3. Formats the response to return.
var workflowDefinition = new Pass(scope, "GenerateCaseId", new PassProps
{
Parameters = new Dictionary<string, object>(4)
{
{ "payload", JsonPath.EntirePayload },
{ "identifier.$", "States.UUID()" }
}
})
.Next(new Parallel(this, "SendAcknowledgementAndProcess", new ParallelProps
{
ResultPath = JsonPath.DISCARD
}).Branch(
new SqsSendMessage(this, $"{integrationName}WorkflowSendMessage", new SqsSendMessageProps
{
MessageBody = TaskInput.FromJsonPathAt("$"),
Queue = Queue,
ResultPath = JsonPath.DISCARD
}), new Choice(this, "DoesHaveResponseChannel").When(Condition.IsPresent("$.payload.responseChannel"),
new CallAwsService(this, $"{integrationName}SendMessageToResponseChannel", new CallAwsServiceProps()
{
Service = "sqs",
Action = "sendMessage",
Parameters = new Dictionary<string, object>(2)
{
{"MessageBody", JsonPath.StringAt("$.payload.correlationId")},
{"QueueUrl", JsonPath.StringAt("$.payload.responseChannel")}
},
IamResources = new []{"*"},
ResultPath = JsonPath.DISCARD
}))
.Otherwise(new Pass(this, "NoResponseRequired")))
.Next(new Pass(scope, $"{integrationName}FormatResponse", new PassProps
{
Parameters = new Dictionary<string, object>(4)
{
{ "identifier", JsonPath.StringAt("$.identifier") }
}
})));
// Create the workflow.
Workflow = new StateMachine(
scope,
$"{integrationName}StorageWorkflow",
new StateMachineProps
{
Definition = workflowDefinition,
Role = workflowRole,
StateMachineType = StateMachineType.EXPRESS,
Timeout = Duration.Seconds(30),
TracingEnabled = true,
Logs = new LogOptions
{
Level = LogLevel.ALL,
IncludeExecutionData = true,
Destination = logGroup
}
});
// Allow the API Gateway integration role to start a synchronous execution of the workflow.
Workflow.GrantStartSyncExecution(integrationRole);
// Create the AWS integration.
WorkflowQueueIntegration = new AwsIntegration(
new AwsIntegrationProps
{
Service = "states",
Action = "StartSyncExecution",
IntegrationHttpMethod = "POST",
Options = new IntegrationOptions
{
CredentialsRole = integrationRole,
RequestTemplates = new Dictionary<string, string>
{
// The request template requires both the state machine ARN, as well as the input being passed in. Use the entire request body.
{
"application/json",
"{ \"stateMachineArn\": \"" + Workflow.StateMachineArn +
"\", \"input\": \"$util.escapeJavaScript($input.json('$'))\" }"
}
},
IntegrationResponses = new List<IIntegrationResponse>(3)
{
new IntegrationResponse
{
StatusCode = "200",
ResponseTemplates = new Dictionary<string, string>(1)
{
// For the response, include the output.
{
"application/json",
"{ \"result\": $input.json('$.output'), \"status\": \"$input.json('$.status')\" }"
}
}
},
new IntegrationResponse
{
StatusCode = "400"
},
new IntegrationResponse
{
StatusCode = "500"
}
}.ToArray()
}
});
}
}