From 683d27887735c14a5bb26b90c2e0ac5ec9579d9a Mon Sep 17 00:00:00 2001 From: Aleksey Krichevskiy Date: Thu, 25 Sep 2025 00:19:29 +0300 Subject: [PATCH] fix: KplMessageHandler.setGlueSchema() Signed-off-by: Aleksey Krichevskiy --- .../integration/aws/outbound/KplMessageHandler.java | 2 +- .../aws/outbound/KplMessageHandlerTests.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 5b1d2e7..fdffb87 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -229,7 +229,7 @@ public void setHeaderMapper(HeaderMapper headerMapper) { * @see UserRecord#setSchema(Schema) */ public void setGlueSchema(Schema glueSchema) { - setPartitionKeyExpression(new ValueExpression<>(glueSchema)); + setGlueSchemaExpression(new ValueExpression<>(glueSchema)); } /** diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 1081bb9..b1475ff 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -18,6 +18,7 @@ import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.UserRecord; +import com.amazonaws.services.schemaregistry.common.Schema; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -58,6 +59,9 @@ @DirtiesContext public class KplMessageHandlerTests { + @Autowired + protected Schema schema; + @Autowired protected KinesisProducer kinesisProducer; @@ -89,6 +93,7 @@ void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() { assertThat(userRecord.getStreamName()).isEqualTo("someStream"); assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey"); assertThat(userRecord.getExplicitHashKey()).isNull(); + assertThat(userRecord.getSchema()).isSameAs(this.schema); } @Test @@ -116,6 +121,7 @@ void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() { assertThat(userRecord.getStreamName()).isEqualTo("someStream"); assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey"); assertThat(userRecord.getExplicitHashKey()).isNull(); + assertThat(userRecord.getSchema()).isSameAs(this.schema); } @Test @@ -174,9 +180,14 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); kplMessageHandler.setAsync(true); kplMessageHandler.setStream("someStream"); + kplMessageHandler.setGlueSchema(schema()); return kplMessageHandler; } + @Bean + public Schema schema() { + return new Schema("syntax=\"proto2\";", "PROTOBUF", "testschema"); + } } }