21
21
import com .squareup .javapoet .CodeBlock ;
22
22
import com .squareup .javapoet .ParameterizedTypeName ;
23
23
import com .squareup .javapoet .TypeName ;
24
+ import com .squareup .javapoet .WildcardTypeName ;
24
25
import java .util .Map ;
25
26
import java .util .Optional ;
26
27
import java .util .concurrent .CompletableFuture ;
28
+ import software .amazon .awssdk .awscore .eventstream .EventStreamAsyncResponseTransformer ;
29
+ import software .amazon .awssdk .awscore .eventstream .EventStreamTaggedUnionPojoSupplier ;
30
+ import software .amazon .awssdk .awscore .eventstream .RestEventStreamAsyncResponseTransformer ;
31
+ import software .amazon .awssdk .awscore .exception .AwsServiceException ;
27
32
import software .amazon .awssdk .codegen .model .config .customization .S3ArnableFieldConfig ;
28
33
import software .amazon .awssdk .codegen .model .intermediate .IntermediateModel ;
29
34
import software .amazon .awssdk .codegen .model .intermediate .OperationModel ;
35
+ import software .amazon .awssdk .codegen .model .intermediate .ShapeModel ;
30
36
import software .amazon .awssdk .codegen .poet .PoetExtensions ;
31
37
import software .amazon .awssdk .codegen .poet .client .traits .HttpChecksumRequiredTrait ;
38
+ import software .amazon .awssdk .codegen .poet .eventstream .EventStreamUtils ;
39
+ import software .amazon .awssdk .codegen .poet .model .EventStreamSpecHelper ;
40
+ import software .amazon .awssdk .core .SdkPojoBuilder ;
32
41
import software .amazon .awssdk .core .client .handler .ClientExecutionParams ;
33
42
import software .amazon .awssdk .core .http .HttpResponseHandler ;
34
43
import software .amazon .awssdk .protocols .xml .AwsXmlProtocolFactory ;
@@ -60,21 +69,26 @@ protected Class<?> protocolFactoryClass() {
60
69
@ Override
61
70
public CodeBlock responseHandler (IntermediateModel model ,
62
71
OperationModel opModel ) {
63
-
64
72
if (opModel .hasStreamingOutput ()) {
65
73
return streamingResponseHandler (opModel );
66
74
}
67
75
68
76
ClassName responseType = poetExtensions .getModelClass (opModel .getReturnType ().getReturnType ());
69
77
78
+ if (opModel .hasEventStreamOutput ()) {
79
+ return CodeBlock .builder ()
80
+ .add (eventStreamResponseHandlers (opModel , responseType ))
81
+ .build ();
82
+ }
83
+
70
84
TypeName handlerType = ParameterizedTypeName .get (
71
85
ClassName .get (HttpResponseHandler .class ),
72
86
ParameterizedTypeName .get (ClassName .get (software .amazon .awssdk .core .Response .class ), responseType ));
73
87
74
88
return CodeBlock .builder ()
75
89
.addStatement ("\n \n $T responseHandler = protocolFactory.createCombinedResponseHandler($T::builder, "
76
90
+ "new $T().withHasStreamingSuccessResponse($L))" ,
77
- handlerType , responseType , XmlOperationMetadata .class , opModel .hasStreamingOutput ())
91
+ handlerType , responseType , XmlOperationMetadata .class , opModel .hasStreamingOutput ())
78
92
.build ();
79
93
}
80
94
@@ -159,35 +173,68 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
159
173
ClassName pojoResponseType = poetExtensions .getModelClass (opModel .getReturnType ().getReturnType ());
160
174
ClassName requestType = poetExtensions .getModelClass (opModel .getInput ().getVariableType ());
161
175
ClassName marshaller = poetExtensions .getRequestTransformClass (opModel .getInputShape ().getShapeName () + "Marshaller" );
176
+ String eventStreamTransformFutureName = "eventStreamTransformFuture" ;
177
+
178
+ CodeBlock .Builder builder = CodeBlock .builder ();
179
+
180
+ if (opModel .hasEventStreamOutput ()) {
181
+ builder .add (eventStreamResponseTransformers (opModel , eventStreamTransformFutureName ));
182
+ }
162
183
163
184
TypeName executeFutureValueType = executeFutureValueType (opModel , poetExtensions );
164
- CodeBlock .Builder builder =
165
- CodeBlock .builder ()
166
- .add ("\n \n $T<$T> executeFuture = clientHandler.execute(new $T<$T, $T>()\n " ,
167
- CompletableFuture .class , executeFutureValueType ,
168
- ClientExecutionParams .class , requestType , pojoResponseType )
169
- .add (".withOperationName(\" $N\" )\n " , opModel .getOperationName ())
170
- .add (".withMarshaller($L)\n " , asyncMarshaller (intermediateModel , opModel , marshaller , "protocolFactory" ))
171
- .add (".withCombinedResponseHandler(responseHandler)\n " )
172
- .add (hostPrefixExpression (opModel ))
173
- .add (".withMetricCollector(apiCallMetricCollector)\n " )
174
- .add (asyncRequestBody (opModel ))
175
- .add (HttpChecksumRequiredTrait .putHttpChecksumAttribute (opModel ));
185
+ String executionResponseTransformerName = "asyncResponseTransformer" ;
186
+
187
+ if (opModel .hasEventStreamOutput ()) {
188
+ executionResponseTransformerName = "restAsyncResponseTransformer" ;
189
+ }
190
+
191
+ builder .add ("\n \n $T<$T> executeFuture = clientHandler.execute(new $T<$T, $T>()\n " ,
192
+ CompletableFuture .class , executeFutureValueType ,
193
+ ClientExecutionParams .class , requestType , pojoResponseType )
194
+ .add (".withOperationName(\" $N\" )\n " , opModel .getOperationName ())
195
+ .add (".withMarshaller($L)\n " , asyncMarshaller (intermediateModel , opModel , marshaller , "protocolFactory" ));
196
+
197
+ if (opModel .hasEventStreamOutput ()) {
198
+ builder .add (".withResponseHandler(responseHandler)" );
199
+ } else {
200
+ builder .add (".withCombinedResponseHandler(responseHandler)" );
201
+ }
202
+
203
+ builder .add (hostPrefixExpression (opModel ))
204
+ .add (".withMetricCollector(apiCallMetricCollector)\n " )
205
+ .add (asyncRequestBody (opModel ))
206
+ .add (HttpChecksumRequiredTrait .putHttpChecksumAttribute (opModel ));
176
207
177
208
s3ArnableFields (opModel , model ).ifPresent (builder ::add );
178
- builder .add (".withInput($L) $L);" , opModel .getInput ().getVariableName (), opModel .hasStreamingOutput () ?
179
- ", asyncResponseTransformer" : "" );
209
+
210
+ builder .add (".withInput($L)" , opModel .getInput ().getVariableName ());
211
+ if (opModel .hasStreamingOutput () || opModel .hasEventStreamOutput ()) {
212
+ builder .add (", $N" , executionResponseTransformerName );
213
+ }
214
+ builder .addStatement (")" );
215
+
180
216
String whenCompleteFutureName = "whenCompleteFuture" ;
181
217
builder .addStatement ("$T $N = null" , ParameterizedTypeName .get (ClassName .get (CompletableFuture .class ),
182
218
executeFutureValueType ), whenCompleteFutureName );
183
- if (opModel .hasStreamingOutput ()) {
219
+
220
+ if (opModel .hasStreamingOutput () || opModel .hasEventStreamOutput ()) {
184
221
builder .addStatement ("$N = executeFuture$L" , whenCompleteFutureName ,
185
- streamingOutputWhenComplete ("asyncResponseTransformer" ));
222
+ whenCompleteBlock (opModel , "asyncResponseHandler" ,
223
+ eventStreamTransformFutureName ));
186
224
} else {
187
225
builder .addStatement ("$N = executeFuture$L" , whenCompleteFutureName , publishMetricsWhenComplete ());
188
226
}
189
- builder .addStatement ("return $T.forwardExceptionTo($N, executeFuture)" , CompletableFutureUtils .class ,
227
+
228
+ builder .addStatement ("$T.forwardExceptionTo($N, executeFuture)" , CompletableFutureUtils .class ,
190
229
whenCompleteFutureName );
230
+
231
+ if (opModel .hasEventStreamOutput ()) {
232
+ builder .addStatement ("return $T.forwardExceptionTo($N, executeFuture)" , CompletableFutureUtils .class ,
233
+ eventStreamTransformFutureName );
234
+ } else {
235
+ builder .addStatement ("return $N" , whenCompleteFutureName );
236
+ }
237
+
191
238
return builder .build ();
192
239
}
193
240
@@ -198,4 +245,122 @@ private String asyncRequestBody(OperationModel opModel) {
198
245
private CodeBlock asyncStreamingExecutionHandler (IntermediateModel intermediateModel , OperationModel opModel ) {
199
246
return super .asyncExecutionHandler (intermediateModel , opModel );
200
247
}
248
+
249
+ private CodeBlock eventStreamResponseHandlers (OperationModel opModel , TypeName pojoResponseType ) {
250
+ CodeBlock streamResponseOpMd = CodeBlock .builder ()
251
+ .add ("$T.builder()" , XmlOperationMetadata .class )
252
+ .add (".hasStreamingSuccessResponse(true)" )
253
+ .add (".build()" )
254
+ .build ();
255
+
256
+
257
+ CodeBlock .Builder builder = CodeBlock .builder ();
258
+
259
+ // Response handler for handling the initial response from the operation. Note, this does not handle the event stream
260
+ // messages, that is the job of "eventResponseHandler" below
261
+ builder .addStatement ("$T<$T> responseHandler = protocolFactory.createResponseHandler($T::builder, $L)" ,
262
+ HttpResponseHandler .class ,
263
+ pojoResponseType ,
264
+ pojoResponseType ,
265
+ streamResponseOpMd );
266
+
267
+ // Response handler responsible for errors for the API call itself, as well as errors sent over the event stream
268
+ builder .addStatement ("$T errorResponseHandler = protocolFactory"
269
+ + ".createErrorResponseHandler()" , ParameterizedTypeName .get (HttpResponseHandler .class ,
270
+ AwsServiceException .class ));
271
+
272
+
273
+ ShapeModel eventStreamShape = EventStreamUtils .getEventStreamInResponse (opModel .getOutputShape ());
274
+ ClassName eventStream = poetExtensions .getModelClassFromShape (eventStreamShape );
275
+ EventStreamSpecHelper eventStreamSpecHelper = new EventStreamSpecHelper (eventStreamShape , intermediateModel );
276
+
277
+ CodeBlock .Builder supplierBuilder = CodeBlock .builder ()
278
+ .add ("$T.builder()" , EventStreamTaggedUnionPojoSupplier .class );
279
+ EventStreamUtils .getEvents (eventStreamShape ).forEach (m -> {
280
+ String builderName = eventStreamSpecHelper .eventBuilderMethodName (m );
281
+ supplierBuilder .add (".putSdkPojoSupplier($S, $T::$N)" , m .getName (), eventStream , builderName );
282
+ });
283
+ supplierBuilder .add (".defaultSdkPojoSupplier(() -> new $T($T.UNKNOWN))" , SdkPojoBuilder .class , eventStream );
284
+ CodeBlock supplierCodeBlock = supplierBuilder .add (".build()" ).build ();
285
+
286
+ CodeBlock nonStreamingOpMd = CodeBlock .builder ()
287
+ .add ("$T.builder()" , XmlOperationMetadata .class )
288
+ .add (".hasStreamingSuccessResponse(false)" )
289
+ .add (".build()" )
290
+ .build ();
291
+
292
+ // The response handler responsible for unmarshalling each event
293
+ builder .addStatement ("$T eventResponseHandler = protocolFactory.createResponseHandler($L, $L)" ,
294
+ ParameterizedTypeName .get (ClassName .get (HttpResponseHandler .class ),
295
+ WildcardTypeName .subtypeOf (eventStream )),
296
+ supplierCodeBlock ,
297
+ nonStreamingOpMd );
298
+
299
+
300
+ return builder .build ();
301
+ }
302
+
303
+ private CodeBlock eventStreamResponseTransformers (OperationModel opModel , String eventTransformerFutureName ) {
304
+ ShapeModel shapeModel = EventStreamUtils .getEventStreamInResponse (opModel .getOutputShape ());
305
+ ClassName pojoResponseType = poetExtensions .getModelClass (opModel .getReturnType ().getReturnType ());
306
+ ClassName eventStreamBaseClass = poetExtensions .getModelClassFromShape (shapeModel );
307
+
308
+ CodeBlock .Builder builder = CodeBlock .builder ();
309
+
310
+ ParameterizedTypeName transformerType = ParameterizedTypeName .get (
311
+ ClassName .get (EventStreamAsyncResponseTransformer .class ),
312
+ pojoResponseType ,
313
+ eventStreamBaseClass );
314
+
315
+ builder .addStatement ("$1T<$2T> $3N = new $1T<>()" , ClassName .get (CompletableFuture .class ),
316
+ ClassName .get (Void .class ), eventTransformerFutureName )
317
+ .add ("$T asyncResponseTransformer = $T.<$T, $T>builder()\n " ,
318
+ transformerType , ClassName .get (EventStreamAsyncResponseTransformer .class ), pojoResponseType ,
319
+ eventStreamBaseClass )
320
+ .add (".eventStreamResponseHandler(asyncResponseHandler)\n " )
321
+ .add (".eventResponseHandler(eventResponseHandler)\n " )
322
+ .add (".initialResponseHandler(responseHandler)\n " )
323
+ .add (".exceptionResponseHandler(errorResponseHandler)\n " )
324
+ .add (".future($N)\n " , eventTransformerFutureName )
325
+ .add (".executor(executor)\n " )
326
+ .add (".serviceName(serviceName())\n " )
327
+ .addStatement (".build()" );
328
+
329
+ ParameterizedTypeName restTransformType =
330
+ ParameterizedTypeName .get (ClassName .get (RestEventStreamAsyncResponseTransformer .class ), pojoResponseType ,
331
+ eventStreamBaseClass );
332
+
333
+ // Wrap the event transformer with this so that the caller's response handler's onResponse() method is invoked. See
334
+ // docs for RestEventStreamAsyncResponseTransformer for more info on why it's needed
335
+ builder .addStatement ("$T restAsyncResponseTransformer = $T.<$T, $T>builder()\n "
336
+ + ".eventStreamAsyncResponseTransformer(asyncResponseTransformer)\n "
337
+ + ".eventStreamResponseHandler(asyncResponseHandler)\n "
338
+ + ".build()" , restTransformType , RestEventStreamAsyncResponseTransformer .class ,
339
+ pojoResponseType , eventStreamBaseClass );
340
+
341
+ return builder .build ();
342
+ }
343
+
344
+ private CodeBlock whenCompleteBlock (OperationModel operationModel , String responseHandlerName ,
345
+ String eventTransformerFutureName ) {
346
+ CodeBlock .Builder whenComplete = CodeBlock .builder ()
347
+ .add (".whenComplete((r, e) -> " )
348
+ .beginControlFlow ("" )
349
+ .beginControlFlow ("if (e != null)" )
350
+ .add ("runAndLogError(log, $S, () -> $N.exceptionOccurred(e));" ,
351
+ "Exception thrown in exceptionOccurred callback, ignoring" ,
352
+ responseHandlerName );
353
+
354
+ if (operationModel .hasEventStreamOutput ()) {
355
+ whenComplete .add ("$N.completeExceptionally(e);" , eventTransformerFutureName );
356
+ }
357
+
358
+ whenComplete .endControlFlow ()
359
+ .add (publishMetrics ())
360
+ .endControlFlow ()
361
+ .add (")" )
362
+ .build ();
363
+
364
+ return whenComplete .build ();
365
+ }
201
366
}
0 commit comments