From 921aeeebdf3c3ac8611acd55f305cfadcc50983b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sergio=20Pe=C3=B1a?=
Date: Wed, 13 Apr 2022 18:59:18 -0500
Subject: [PATCH] fix: Create stream fails when multiple Protobuf schema
definitions exist (#8933)
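
Allows CREATE statements over Protobuf subjects whose registered schema
contains multiple message definitions. Schema inference now fails with a
descriptive error listing the available definitions unless
KEY_SCHEMA_FULL_NAME / VALUE_SCHEMA_FULL_NAME selects one of them, and INSERT
VALUES passes the registered schema id to the serializer so it does not
attempt to register a new schema.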
---
.../ksql/inference/DefaultSchemaInjector.java | 38 +++
.../schema/registry/SchemaRegistryUtil.java | 11 +-
.../registry/SchemaRegistryUtilTest.java | 35 ++-
.../ValueSpecProtobufSerdeSupplier.java | 19 +-
.../confluent/ksql/test/utils/SerdeUtil.java | 2 +-
.../7.3.0_1649453206637/plan.json | 227 ++++++++++++++++++
.../7.3.0_1649453206637/spec.json | 146 +++++++++++
.../7.3.0_1649453206637/topology | 13 +
.../7.3.0_1649453337688/plan.json | 227 ++++++++++++++++++
.../7.3.0_1649453337688/spec.json | 148 ++++++++++++
.../7.3.0_1649453337688/topology | 13 +
.../query-validation-tests/protobuf.json | 75 ++++++
.../with/CreateSourcePropertiesTest.java | 2 +
.../execution/InsertValuesExecutor.java | 65 ++++-
.../execution/InsertValuesExecutorTest.java | 102 ++++++--
.../java/io/confluent/ksql/serde/Format.java | 11 +
.../ksql/serde/connect/ConnectFormat.java | 11 +
.../ksql/serde/protobuf/ProtobufFormat.java | 20 ++
.../protobuf/ProtobufSchemaTranslator.java | 10 +-
.../protobuf/KsqlProtobufSerializerTest.java | 2 +-
.../serde/protobuf/ProtobufFormatTest.java | 55 +++++
.../ProtobufSchemaTranslatorTest.java | 104 ++++++++
22 files changed, 1282 insertions(+), 54 deletions(-)
create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/plan.json
create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/spec.json
create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/topology
create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/plan.json
create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/spec.json
create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/topology
create mode 100644 ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufFormatTest.java
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java
index 35914ea4cec6..659bb74c1cde 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.expression.tree.Type;
@@ -491,6 +492,29 @@ private static CreateAsSelect addSchemaFieldsCas(
return statement.copyWith(newProperties);
}
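+ /**
+ * Throws a {@link KsqlException} if the given schema contains more than one schema
+ * definition (e.g. a Protobuf schema with several message types), listing the available
+ * full names and asking the user to pick one with KEY_SCHEMA_FULL_NAME or
+ * VALUE_SCHEMA_FULL_NAME.
+ */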
+ private static void throwOnMultiSchemaDefinitions(
+ final ParsedSchema schema,
+ final Format schemaFormat,
+ final boolean isKey
+ ) {
+ final List<String> schemaFullNames = schemaFormat.schemaFullNames(schema);
+ if (schemaFullNames.size() > 1) {
+ final String schemaFullNameConfig = isKey
+ ? CommonCreateConfigs.KEY_SCHEMA_FULL_NAME
+ : CommonCreateConfigs.VALUE_SCHEMA_FULL_NAME;
+
+ throw new KsqlException(
+ (isKey ? "Key" : "Value") + " schema has multiple schema definitions. "
+ + System.lineSeparator()
+ + System.lineSeparator()
+ + schemaFullNames.stream().map(n -> "- " + n).collect(Collectors.joining("\n"))
+ + System.lineSeparator()
+ + System.lineSeparator()
+ + "Please specify a schema full name in the WITH clause using "
+ + schemaFullNameConfig);
+ }
+ }
+
private static CreateSource addSchemaFields(
final ConfiguredStatement<CreateSource> preparedStatement,
final Optional<SchemaAndId> keySchema,
@@ -504,6 +528,20 @@ private static CreateSource addSchemaFields(
final Optional<String> keySchemaName;
final Optional<String> valueSchemaName;
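+ // Fail early if an inferred schema has multiple schema definitions and the user did not
+ // select one with KEY_SCHEMA_FULL_NAME / VALUE_SCHEMA_FULL_NAME.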
+ if (keySchema.isPresent() && !properties.getKeySchemaFullName().isPresent()) {
+ final Format keyFormat = FormatFactory.of(
+ SourcePropertiesUtil.getKeyFormat(properties, statement.getName()));
+
+ throwOnMultiSchemaDefinitions(keySchema.get().rawSchema, keyFormat, true);
+ }
+
+ if (valueSchema.isPresent() && !properties.getValueSchemaFullName().isPresent()) {
+ final Format valueFormat = FormatFactory.of(
+ SourcePropertiesUtil.getValueFormat(properties));
+
+ throwOnMultiSchemaDefinitions(valueSchema.get().rawSchema, valueFormat, false);
+ }
+
// Only populate key and value schema names when schema ids are explicitly provided
if (properties.getKeySchemaId().isPresent() && keySchema.isPresent()) {
keySchemaName = Optional.ofNullable(keySchema.get().rawSchema.name());
diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java
index 4c1f69a07f1d..a9af042235c4 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java
@@ -106,7 +106,16 @@ public static boolean subjectExists(
return getLatestSchema(srClient, subject).isPresent();
}
- public static Optional<ParsedSchema> getParsedSchema(
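+ /**
+ * Returns the id of the latest schema registered under the topic's key or value subject,
+ * if one exists.
+ */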
+ public static Optional<Integer> getLatestSchemaId(
+ final SchemaRegistryClient srClient,
+ final String topic,
+ final boolean isKey
+ ) {
+ final String subject = KsqlConstants.getSRSubject(topic, isKey);
+ return getLatestSchema(srClient, subject).map(SchemaMetadata::getId);
+ }
+
+ public static Optional<ParsedSchema> getLatestParsedSchema(
final SchemaRegistryClient srClient,
final String topic,
final boolean isKey
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/registry/SchemaRegistryUtilTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/registry/SchemaRegistryUtilTest.java
index e45f548024e0..9e5c1f20ec9d 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/registry/SchemaRegistryUtilTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/registry/SchemaRegistryUtilTest.java
@@ -77,12 +77,43 @@ public void shouldReturnParsedSchemaFromSubjectValue() throws Exception {
// When:
final Optional<ParsedSchema> parsedSchema =
- SchemaRegistryUtil.getParsedSchema(schemaRegistryClient, "bar", false);
+ SchemaRegistryUtil.getLatestParsedSchema(schemaRegistryClient, "bar", false);
// Then:
assertThat(parsedSchema.get(), equalTo(AVRO_SCHEMA));
}
+ @Test
+ public void shouldReturnSchemaIdFromSubjectKey() throws Exception {
+ // Given:
+ when(schemaMetadata.getId()).thenReturn(123);
+ when(schemaRegistryClient.getLatestSchemaMetadata("bar-key"))
+ .thenReturn(schemaMetadata);
+
+ // When:
+ final Optional<Integer> schemaId =
+ SchemaRegistryUtil.getLatestSchemaId(schemaRegistryClient, "bar", true);
+
+ // Then:
+ assertThat(schemaId.get(), equalTo(123));
+ }
+
+ @Test
+ public void shouldReturnSchemaIdFromSubjectValue() throws Exception {
+ // Given:
+ when(schemaMetadata.getId()).thenReturn(123);
+ when(schemaRegistryClient.getLatestSchemaMetadata("bar-value"))
+ .thenReturn(schemaMetadata);
+
+ // When:
+ final Optional<Integer> schemaId =
+ SchemaRegistryUtil.getLatestSchemaId(schemaRegistryClient, "bar", false);
+
+ // Then:
+ assertThat(schemaId.get(), equalTo(123));
+ }
+
+
@Test
public void shouldReturnParsedSchemaFromSubjectKey() throws Exception {
// Given:
@@ -94,7 +125,7 @@ public void shouldReturnParsedSchemaFromSubjectKey() throws Exception {
// When:
final Optional<ParsedSchema> parsedSchema =
- SchemaRegistryUtil.getParsedSchema(schemaRegistryClient, "bar", true);
+ SchemaRegistryUtil.getLatestParsedSchema(schemaRegistryClient, "bar", true);
// Then:
assertThat(parsedSchema.get(), equalTo(AVRO_SCHEMA));
diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java
index 781405f60996..bb07783b18fe 100644
--- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java
+++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java
@@ -15,31 +15,24 @@
package io.confluent.ksql.test.serde.protobuf;
-import static io.confluent.connect.protobuf.ProtobufDataConfig.SCHEMAS_CACHE_SIZE_CONFIG;
-import static io.confluent.connect.protobuf.ProtobufDataConfig.WRAPPER_FOR_RAW_PRIMITIVES_CONFIG;
-
-import com.google.common.collect.ImmutableMap;
import io.confluent.connect.protobuf.ProtobufConverter;
-import io.confluent.connect.protobuf.ProtobufData;
-import io.confluent.connect.protobuf.ProtobufDataConfig;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.ksql.serde.protobuf.ProtobufProperties;
+import io.confluent.ksql.serde.protobuf.ProtobufSchemaTranslator;
import io.confluent.ksql.test.serde.ConnectSerdeSupplier;
import org.apache.kafka.connect.data.Schema;
public class ValueSpecProtobufSerdeSupplier extends ConnectSerdeSupplier<ProtobufSchema> {
- private final boolean unwrapPrimitives;
+ private final ProtobufSchemaTranslator schemaTranslator;
- public ValueSpecProtobufSerdeSupplier(final boolean unwrapPrimitives) {
+ public ValueSpecProtobufSerdeSupplier(final ProtobufProperties protobufProperties) {
super(ProtobufConverter::new);
- this.unwrapPrimitives = unwrapPrimitives;
+ this.schemaTranslator = new ProtobufSchemaTranslator(protobufProperties);
}
@Override
protected Schema fromParsedSchema(final ProtobufSchema schema) {
- return new ProtobufData(new ProtobufDataConfig(ImmutableMap.of(
- SCHEMAS_CACHE_SIZE_CONFIG, 1,
- WRAPPER_FOR_RAW_PRIMITIVES_CONFIG, unwrapPrimitives
- ))).toConnectSchema(schema);
+ return schemaTranslator.toConnectSchema(schema);
}
}
diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java
index 5330e0d1edd4..e676ce1bf485 100644
--- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java
+++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java
@@ -76,7 +76,7 @@ public static SerdeSupplier<?> getSerdeSupplier(
case AvroFormat.NAME: return new ValueSpecAvroSerdeSupplier();
case ProtobufFormat.NAME:
return new ValueSpecProtobufSerdeSupplier(
- new ProtobufProperties(formatInfo.getProperties()).getUnwrapPrimitives());
+ new ProtobufProperties(formatInfo.getProperties()));
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false, properties);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true, properties);
case DelimitedFormat.NAME: return new StringSerdeSupplier();
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/plan.json
new file mode 100644
index 000000000000..61b4fad2f2b7
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/plan.json
@@ -0,0 +1,227 @@
+{
+ "plan" : [ {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM INPUT (K1 STRING KEY, C1 BIGINT) WITH (FORMAT='PROTOBUF', KAFKA_TOPIC='input', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2');",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "INPUT",
+ "schema" : "`K1` STRING KEY, `C1` BIGINT",
+ "timestampColumn" : null,
+ "topicName" : "input",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "windowInfo" : null,
+ "orReplace" : false,
+ "isSource" : false
+ },
+ "queryPlan" : null
+ }, {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "OUTPUT",
+ "schema" : "`K1` STRING KEY, `C1` BIGINT",
+ "timestampColumn" : null,
+ "topicName" : "OUTPUT",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "windowInfo" : null,
+ "orReplace" : false,
+ "isSource" : false
+ },
+ "queryPlan" : {
+ "sources" : [ "INPUT" ],
+ "sink" : "OUTPUT",
+ "physicalPlan" : {
+ "@type" : "streamSinkV1",
+ "properties" : {
+ "queryContext" : "OUTPUT"
+ },
+ "source" : {
+ "@type" : "streamSelectV1",
+ "properties" : {
+ "queryContext" : "Project"
+ },
+ "source" : {
+ "@type" : "streamSourceV1",
+ "properties" : {
+ "queryContext" : "KsqlTopic/Source"
+ },
+ "topicName" : "input",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "timestampColumn" : null,
+ "sourceSchema" : "`K1` STRING KEY, `C1` BIGINT",
+ "pseudoColumnVersion" : 1
+ },
+ "keyColumnNames" : [ "K1" ],
+ "selectedKeys" : null,
+ "selectExpressions" : [ "C1 AS C1" ]
+ },
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "topicName" : "OUTPUT",
+ "timestampColumn" : null
+ },
+ "queryId" : "CSAS_OUTPUT_0",
+ "runtimeId" : null
+ }
+ } ],
+ "configs" : {
+ "ksql.extension.dir" : "ext",
+ "ksql.streams.cache.max.bytes.buffering" : "0",
+ "metric.reporters" : "",
+ "ksql.query.status.running.threshold.seconds" : "300",
+ "ksql.connect.basic.auth.credentials.reload" : "false",
+ "ksql.output.topic.name.prefix" : "",
+ "ksql.query.pull.stream.enabled" : "true",
+ "ksql.query.push.v2.interpreter.enabled" : "true",
+ "ksql.queryanonymizer.logs_enabled" : "true",
+ "ksql.variable.substitution.enable" : "true",
+ "ksql.streams.shutdown.timeout.ms" : "300000",
+ "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
+ "ksql.query.pull.max.qps" : "2147483647",
+ "ksql.access.validator.enable" : "auto",
+ "ksql.streams.bootstrap.servers" : "localhost:0",
+ "ksql.query.pull.metrics.enabled" : "true",
+ "ksql.metrics.extension" : null,
+ "ksql.query.push.v2.alos.enabled" : "true",
+ "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647",
+ "ksql.query.pull.range.scan.enabled" : "true",
+ "ksql.transient.query.cleanup.service.initial.delay.seconds" : "600",
+ "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
+ "ksql.lambdas.enabled" : "true",
+ "ksql.source.table.materialization.enabled" : "true",
+ "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
+ "ksql.sink.window.change.log.additional.retention" : "1000000",
+ "ksql.query.persistent.active.limit" : "2147483647",
+ "ksql.persistence.wrap.single.values" : null,
+ "ksql.query.transient.max.bytes.buffering.total" : "-1",
+ "ksql.connect.basic.auth.credentials.source" : "NONE",
+ "ksql.schema.registry.url" : "schema_registry.url:0",
+ "ksql.properties.overrides.denylist" : "",
+ "ksql.service.id" : "some.ksql.service.id",
+ "ksql.query.push.v2.max.catchup.consumers" : "5",
+ "ksql.query.push.v2.enabled" : "false",
+ "ksql.transient.query.cleanup.service.enable" : "true",
+ "ksql.query.push.v2.metrics.enabled" : "true",
+ "ksql.rowpartition.rowoffset.enabled" : "true",
+ "ksql.streams.commit.interval.ms" : "2000",
+ "ksql.query.pull.table.scan.enabled" : "true",
+ "ksql.streams.auto.commit.interval.ms" : "0",
+ "ksql.streams.topology.optimization" : "all",
+ "ksql.endpoint.migrate.query" : "true",
+ "ksql.query.push.v2.registry.installed" : "false",
+ "ksql.streams.num.stream.threads" : "4",
+ "ksql.metrics.tags.custom" : "",
+ "ksql.query.push.v2.catchup.consumer.msg.window" : "50",
+ "ksql.runtime.feature.shared.enabled" : "false",
+ "ksql.udf.collect.metrics" : "false",
+ "ksql.new.query.planner.enabled" : "false",
+ "ksql.connect.request.headers.plugin" : null,
+ "ksql.security.extension.class" : null,
+ "ksql.transient.prefix" : "transient_",
+ "ksql.headers.columns.enabled" : "true",
+ "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
+ "ksql.connect.request.timeout.ms" : "5000",
+ "ksql.query.pull.enable.standby.reads" : "false",
+ "ksql.persistence.default.format.key" : "KAFKA",
+ "ksql.query.persistent.max.bytes.buffering.total" : "-1",
+ "ksql.query.error.max.queue.size" : "10",
+ "ksql.query.cleanup.shutdown.timeout.ms" : "30000",
+ "ksql.internal.topic.min.insync.replicas" : "1",
+ "ksql.internal.topic.replicas" : "1",
+ "ksql.insert.into.values.enabled" : "true",
+ "ksql.queryanonymizer.cluster_namespace" : null,
+ "ksql.create.or.replace.enabled" : "true",
+ "ksql.shared.runtimes.count" : "2",
+ "ksql.cast.strings.preserve.nulls" : "true",
+ "ksql.authorization.cache.max.entries" : "10000",
+ "ksql.pull.queries.enable" : "true",
+ "ksql.transient.query.cleanup.service.period.seconds" : "600",
+ "ksql.suppress.enabled" : "false",
+ "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
+ "ksql.connect.basic.auth.credentials.file" : "",
+ "ksql.authorization.cache.expiry.time.secs" : "30",
+ "ksql.query.retry.backoff.initial.ms" : "15000",
+ "ksql.query.pull.max.concurrent.requests" : "2147483647",
+ "ksql.streams.auto.offset.reset" : "earliest",
+ "ksql.connect.url" : "http://localhost:8083",
+ "ksql.query.push.v2.new.latest.delay.ms" : "5000",
+ "ksql.query.push.v2.latest.reset.age.ms" : "30000",
+ "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
+ "ksql.query.pull.interpreter.enabled" : "true",
+ "ksql.query.pull.limit.clause.enabled" : "true",
+ "ksql.query.pull.router.thread.pool.size" : "50",
+ "ksql.query.push.v2.continuation.tokens.enabled" : "false",
+ "ksql.query.retry.backoff.max.ms" : "900000",
+ "ksql.timestamp.throw.on.invalid" : "false",
+ "ksql.persistence.default.format.value" : null,
+ "ksql.udfs.enabled" : "true",
+ "ksql.udf.enable.security.manager" : "true",
+ "ksql.connect.worker.config" : "",
+ "ksql.nested.error.set.null" : "true",
+ "ksql.query.pull.thread.pool.size" : "50",
+ "ksql.persistent.prefix" : "query_",
+ "ksql.metastore.backup.location" : "",
+ "ksql.error.classifier.regex" : "",
+ "ksql.suppress.buffer.size.bytes" : "-1"
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/spec.json
new file mode 100644
index 000000000000..113d35da58be
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/spec.json
@@ -0,0 +1,146 @@
+{
+ "version" : "7.3.0",
+ "timestamp" : 1649453206637,
+ "path" : "query-validation-tests/protobuf.json",
+ "schemas" : {
+ "CSAS_OUTPUT_0.KsqlTopic.Source" : {
+ "schema" : "`K1` STRING KEY, `C1` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "CSAS_OUTPUT_0.OUTPUT" : {
+ "schema" : "`K1` STRING KEY, `C1` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ }
+ },
+ "testCase" : {
+ "name" : "specify a protobuf schema from multiple protobuf schema definitions",
+ "inputs" : [ {
+ "topic" : "input",
+ "key" : {
+ "k1" : "A"
+ },
+ "value" : {
+ "c1" : 100
+ }
+ } ],
+ "outputs" : [ {
+ "topic" : "OUTPUT",
+ "key" : {
+ "K1" : "A"
+ },
+ "value" : {
+ "C1" : 100
+ }
+ } ],
+ "topics" : [ {
+ "name" : "input",
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey1 {\n uint32 k1 = 1;\n}\nmessage ProtobufKey2 {\n string k1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue1 {\n float c1 = 1;\n uint32 c2 = 2;\n}\nmessage ProtobufValue2 {\n uint32 c1 = 1;\n}\n",
+ "keyFormat" : "PROTOBUF",
+ "valueFormat" : "PROTOBUF",
+ "replicas" : 1,
+ "numPartitions" : 1
+ }, {
+ "name" : "OUTPUT",
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey2 {\n string k1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue2 {\n uint32 c1 = 1;\n}\n",
+ "keyFormat" : "PROTOBUF",
+ "valueFormat" : "PROTOBUF",
+ "replicas" : 1,
+ "numPartitions" : 1
+ } ],
+ "statements" : [ "CREATE STREAM INPUT WITH (kafka_topic='input', format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2');", "CREATE STREAM OUTPUT AS SELECT * FROM input;" ],
+ "post" : {
+ "sources" : [ {
+ "name" : "INPUT",
+ "type" : "STREAM",
+ "schema" : "`K1` STRING KEY, `C1` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF"
+ },
+ "valueFormat" : "PROTOBUF",
+ "keyFeatures" : [ ],
+ "valueFeatures" : [ ],
+ "isSource" : false
+ }, {
+ "name" : "OUTPUT",
+ "type" : "STREAM",
+ "schema" : "`K1` STRING KEY, `C1` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF"
+ },
+ "valueFormat" : "PROTOBUF",
+ "keyFeatures" : [ ],
+ "valueFeatures" : [ ],
+ "isSource" : false
+ } ],
+ "topics" : {
+ "topics" : [ {
+ "name" : "input",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "partitions" : 1,
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey2 {\n string k1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue2 {\n uint32 c1 = 1;\n}\n"
+ }, {
+ "name" : "OUTPUT",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "partitions" : 4,
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey2 {\n string K1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue2 {\n int64 C1 = 1;\n}\n"
+ } ]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/topology
new file mode 100644
index 000000000000..a70a4e91e301
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_from_multiple_protobuf_schema_definitions/7.3.0_1649453206637/topology
@@ -0,0 +1,13 @@
+Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-TRANSFORMVALUES-0000000001
+ Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
+ --> Project
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: Project (stores: [])
+ --> KSTREAM-SINK-0000000003
+ <-- KSTREAM-TRANSFORMVALUES-0000000001
+ Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
+ <-- Project
+
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/plan.json
new file mode 100644
index 000000000000..b2bfc77df153
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/plan.json
@@ -0,0 +1,227 @@
+{
+ "plan" : [ {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM INPUT (K1 BIGINT KEY, C1 DOUBLE, C2 BIGINT) WITH (FORMAT='PROTOBUF', KAFKA_TOPIC='input', KEY_SCHEMA_FULL_NAME='ProtobufKey1', VALUE_SCHEMA_FULL_NAME='ProtobufValue1');",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "INPUT",
+ "schema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "timestampColumn" : null,
+ "topicName" : "input",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey1",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue1",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "windowInfo" : null,
+ "orReplace" : false,
+ "isSource" : false
+ },
+ "queryPlan" : null
+ }, {
+ "@type" : "ksqlPlanV1",
+ "statementText" : "CREATE STREAM OUTPUT WITH (FORMAT='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2') AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
+ "ddlCommand" : {
+ "@type" : "createStreamV1",
+ "sourceName" : "OUTPUT",
+ "schema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "timestampColumn" : null,
+ "topicName" : "OUTPUT",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "windowInfo" : null,
+ "orReplace" : false,
+ "isSource" : false
+ },
+ "queryPlan" : {
+ "sources" : [ "INPUT" ],
+ "sink" : "OUTPUT",
+ "physicalPlan" : {
+ "@type" : "streamSinkV1",
+ "properties" : {
+ "queryContext" : "OUTPUT"
+ },
+ "source" : {
+ "@type" : "streamSelectV1",
+ "properties" : {
+ "queryContext" : "Project"
+ },
+ "source" : {
+ "@type" : "streamSourceV1",
+ "properties" : {
+ "queryContext" : "KsqlTopic/Source"
+ },
+ "topicName" : "input",
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey1",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue1",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "timestampColumn" : null,
+ "sourceSchema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "pseudoColumnVersion" : 1
+ },
+ "keyColumnNames" : [ "K1" ],
+ "selectedKeys" : null,
+ "selectExpressions" : [ "C1 AS C1", "C2 AS C2" ]
+ },
+ "formats" : {
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "topicName" : "OUTPUT",
+ "timestampColumn" : null
+ },
+ "queryId" : "CSAS_OUTPUT_0",
+ "runtimeId" : null
+ }
+ } ],
+ "configs" : {
+ "ksql.extension.dir" : "ext",
+ "ksql.streams.cache.max.bytes.buffering" : "0",
+ "metric.reporters" : "",
+ "ksql.query.status.running.threshold.seconds" : "300",
+ "ksql.connect.basic.auth.credentials.reload" : "false",
+ "ksql.output.topic.name.prefix" : "",
+ "ksql.query.pull.stream.enabled" : "true",
+ "ksql.query.push.v2.interpreter.enabled" : "true",
+ "ksql.queryanonymizer.logs_enabled" : "true",
+ "ksql.variable.substitution.enable" : "true",
+ "ksql.streams.shutdown.timeout.ms" : "300000",
+ "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
+ "ksql.query.pull.max.qps" : "2147483647",
+ "ksql.access.validator.enable" : "auto",
+ "ksql.streams.bootstrap.servers" : "localhost:0",
+ "ksql.query.pull.metrics.enabled" : "true",
+ "ksql.metrics.extension" : null,
+ "ksql.query.push.v2.alos.enabled" : "true",
+ "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647",
+ "ksql.query.pull.range.scan.enabled" : "true",
+ "ksql.transient.query.cleanup.service.initial.delay.seconds" : "600",
+ "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
+ "ksql.lambdas.enabled" : "true",
+ "ksql.source.table.materialization.enabled" : "true",
+ "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
+ "ksql.sink.window.change.log.additional.retention" : "1000000",
+ "ksql.query.persistent.active.limit" : "2147483647",
+ "ksql.persistence.wrap.single.values" : null,
+ "ksql.query.transient.max.bytes.buffering.total" : "-1",
+ "ksql.connect.basic.auth.credentials.source" : "NONE",
+ "ksql.schema.registry.url" : "schema_registry.url:0",
+ "ksql.properties.overrides.denylist" : "",
+ "ksql.service.id" : "some.ksql.service.id",
+ "ksql.query.push.v2.max.catchup.consumers" : "5",
+ "ksql.query.push.v2.enabled" : "false",
+ "ksql.transient.query.cleanup.service.enable" : "true",
+ "ksql.query.push.v2.metrics.enabled" : "true",
+ "ksql.rowpartition.rowoffset.enabled" : "true",
+ "ksql.streams.commit.interval.ms" : "2000",
+ "ksql.query.pull.table.scan.enabled" : "true",
+ "ksql.streams.auto.commit.interval.ms" : "0",
+ "ksql.streams.topology.optimization" : "all",
+ "ksql.endpoint.migrate.query" : "true",
+ "ksql.query.push.v2.registry.installed" : "false",
+ "ksql.streams.num.stream.threads" : "4",
+ "ksql.metrics.tags.custom" : "",
+ "ksql.query.push.v2.catchup.consumer.msg.window" : "50",
+ "ksql.runtime.feature.shared.enabled" : "false",
+ "ksql.udf.collect.metrics" : "false",
+ "ksql.new.query.planner.enabled" : "false",
+ "ksql.connect.request.headers.plugin" : null,
+ "ksql.security.extension.class" : null,
+ "ksql.transient.prefix" : "transient_",
+ "ksql.headers.columns.enabled" : "true",
+ "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
+ "ksql.connect.request.timeout.ms" : "5000",
+ "ksql.query.pull.enable.standby.reads" : "false",
+ "ksql.persistence.default.format.key" : "KAFKA",
+ "ksql.query.persistent.max.bytes.buffering.total" : "-1",
+ "ksql.query.error.max.queue.size" : "10",
+ "ksql.query.cleanup.shutdown.timeout.ms" : "30000",
+ "ksql.internal.topic.min.insync.replicas" : "1",
+ "ksql.internal.topic.replicas" : "1",
+ "ksql.insert.into.values.enabled" : "true",
+ "ksql.queryanonymizer.cluster_namespace" : null,
+ "ksql.create.or.replace.enabled" : "true",
+ "ksql.shared.runtimes.count" : "2",
+ "ksql.cast.strings.preserve.nulls" : "true",
+ "ksql.authorization.cache.max.entries" : "10000",
+ "ksql.pull.queries.enable" : "true",
+ "ksql.transient.query.cleanup.service.period.seconds" : "600",
+ "ksql.suppress.enabled" : "false",
+ "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
+ "ksql.connect.basic.auth.credentials.file" : "",
+ "ksql.authorization.cache.expiry.time.secs" : "30",
+ "ksql.query.retry.backoff.initial.ms" : "15000",
+ "ksql.query.pull.max.concurrent.requests" : "2147483647",
+ "ksql.streams.auto.offset.reset" : "earliest",
+ "ksql.connect.url" : "http://localhost:8083",
+ "ksql.query.push.v2.new.latest.delay.ms" : "5000",
+ "ksql.query.push.v2.latest.reset.age.ms" : "30000",
+ "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
+ "ksql.query.pull.interpreter.enabled" : "true",
+ "ksql.query.pull.limit.clause.enabled" : "true",
+ "ksql.query.pull.router.thread.pool.size" : "50",
+ "ksql.query.push.v2.continuation.tokens.enabled" : "false",
+ "ksql.query.retry.backoff.max.ms" : "900000",
+ "ksql.timestamp.throw.on.invalid" : "false",
+ "ksql.persistence.default.format.value" : null,
+ "ksql.udfs.enabled" : "true",
+ "ksql.udf.enable.security.manager" : "true",
+ "ksql.connect.worker.config" : "",
+ "ksql.nested.error.set.null" : "true",
+ "ksql.query.pull.thread.pool.size" : "50",
+ "ksql.persistent.prefix" : "query_",
+ "ksql.metastore.backup.location" : "",
+ "ksql.error.classifier.regex" : "",
+ "ksql.suppress.buffer.size.bytes" : "-1"
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/spec.json
new file mode 100644
index 000000000000..789829a40f54
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/spec.json
@@ -0,0 +1,148 @@
+{
+ "version" : "7.3.0",
+ "timestamp" : 1649453337688,
+ "path" : "query-validation-tests/protobuf.json",
+ "schemas" : {
+ "CSAS_OUTPUT_0.KsqlTopic.Source" : {
+ "schema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey1",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue1",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ },
+ "CSAS_OUTPUT_0.OUTPUT" : {
+ "schema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ }
+ }
+ },
+ "testCase" : {
+ "name" : "specify a protobuf schema name with SCHEMA_FULL_NAME",
+ "inputs" : [ {
+ "topic" : "input",
+ "key" : {
+ "k1" : 10
+ },
+ "value" : {
+ "c1" : 1.1234,
+ "c2" : 100
+ }
+ } ],
+ "outputs" : [ {
+ "topic" : "OUTPUT",
+ "key" : {
+ "K1" : 10
+ },
+ "value" : {
+ "C1" : 1.1233999729156494,
+ "C2" : 100
+ }
+ } ],
+ "topics" : [ {
+ "name" : "input",
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey1 {\n uint32 k1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue1 {\n float c1 = 1;\n uint32 c2 = 2;\n}\n",
+ "keyFormat" : "PROTOBUF",
+ "valueFormat" : "PROTOBUF",
+ "replicas" : 1,
+ "numPartitions" : 1
+ }, {
+ "name" : "OUTPUT",
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey2 {\n uint32 k1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue2 {\n float c1 = 1;\n uint32 c2 = 2;\n}\n",
+ "keyFormat" : "PROTOBUF",
+ "valueFormat" : "PROTOBUF",
+ "replicas" : 1,
+ "numPartitions" : 1
+ } ],
+ "statements" : [ "CREATE STREAM INPUT WITH (kafka_topic='input', format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey1', VALUE_SCHEMA_FULL_NAME='ProtobufValue1');", "CREATE STREAM OUTPUT WITH (format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2') AS SELECT * FROM input;" ],
+ "post" : {
+ "sources" : [ {
+ "name" : "INPUT",
+ "type" : "STREAM",
+ "schema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF"
+ },
+ "valueFormat" : "PROTOBUF",
+ "keyFeatures" : [ ],
+ "valueFeatures" : [ ],
+ "isSource" : false
+ }, {
+ "name" : "OUTPUT",
+ "type" : "STREAM",
+ "schema" : "`K1` BIGINT KEY, `C1` DOUBLE, `C2` BIGINT",
+ "keyFormat" : {
+ "format" : "PROTOBUF"
+ },
+ "valueFormat" : "PROTOBUF",
+ "keyFeatures" : [ ],
+ "valueFeatures" : [ ],
+ "isSource" : false
+ } ],
+ "topics" : {
+ "topics" : [ {
+ "name" : "input",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey1",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue1",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "partitions" : 1,
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey1 {\n uint32 k1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue1 {\n float c1 = 1;\n uint32 c2 = 2;\n}\n"
+ }, {
+ "name" : "OUTPUT",
+ "keyFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufKey2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "valueFormat" : {
+ "format" : "PROTOBUF",
+ "properties" : {
+ "fullSchemaName" : "ProtobufValue2",
+ "unwrapPrimitives" : "true"
+ }
+ },
+ "partitions" : 4,
+ "keySchema" : "syntax = \"proto3\";\n\nmessage ProtobufKey2 {\n int64 K1 = 1;\n}\n",
+ "valueSchema" : "syntax = \"proto3\";\n\nmessage ProtobufValue2 {\n double C1 = 1;\n int64 C2 = 2;\n}\n"
+ } ]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/topology
new file mode 100644
index 000000000000..a70a4e91e301
--- /dev/null
+++ b/ksqldb-functional-tests/src/test/resources/historical_plans/protobuf_-_specify_a_protobuf_schema_name_with_SCHEMA_FULL_NAME/7.3.0_1649453337688/topology
@@ -0,0 +1,13 @@
+Topologies:
+ Sub-topology: 0
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
+ --> KSTREAM-TRANSFORMVALUES-0000000001
+ Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
+ --> Project
+ <-- KSTREAM-SOURCE-0000000000
+ Processor: Project (stores: [])
+ --> KSTREAM-SINK-0000000003
+ <-- KSTREAM-TRANSFORMVALUES-0000000001
+ Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
+ <-- Project
+
diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json
index e5d9e48811db..c6c35e5961c7 100644
--- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json
+++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json
@@ -605,6 +605,81 @@
]
}
},
+ {
+ "name": "specify a protobuf schema name with SCHEMA_FULL_NAME",
+ "statements": [
+ "CREATE STREAM INPUT WITH (kafka_topic='input', format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey1', VALUE_SCHEMA_FULL_NAME='ProtobufValue1');",
+ "CREATE STREAM OUTPUT WITH (format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2') AS SELECT * FROM input;"
+ ],
+ "topics": [
+ {
+ "name": "input",
+ "keyFormat": "PROTOBUF",
+ "keySchema": "syntax = \"proto3\"; message ProtobufKey1 {uint32 k1 = 1;}",
+ "valueFormat": "PROTOBUF",
+ "valueSchema": "syntax = \"proto3\"; message ProtobufValue1 {float c1 = 1; uint32 c2 = 2;}"
+ },
+ {
+ "name": "OUTPUT",
+ "keyFormat": "PROTOBUF",
+ "keySchema": "syntax = \"proto3\"; message ProtobufKey2 {uint32 k1 = 1;}",
+ "valueFormat": "PROTOBUF",
+ "valueSchema": "syntax = \"proto3\"; message ProtobufValue2 {float c1 = 1; uint32 c2 = 2;}"
+ }
+ ],
+ "inputs": [
+ {"topic": "input", "key": {"k1": 10}, "value": {"c1": 1.1234, "c2": 100}}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": {"K1": 10}, "value": {"C1": 1.1233999729156494, "C2": 100}}
+ ]
+ },
+ {
+ "name": "specify a protobuf schema from multiple protobuf schema definitions",
+ "statements": [
+ "CREATE STREAM INPUT WITH (kafka_topic='input', format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2');",
+ "CREATE STREAM OUTPUT AS SELECT * FROM input;"
+ ],
+ "topics": [
+ {
+ "name": "input",
+ "keyFormat": "PROTOBUF",
+ "keySchema": "syntax = \"proto3\"; message ProtobufKey1 {uint32 k1 = 1;} message ProtobufKey2 {string k1 = 1;}",
+ "valueFormat": "PROTOBUF",
+ "valueSchema": "syntax = \"proto3\"; message ProtobufValue1 {float c1 = 1; uint32 c2 = 2;} message ProtobufValue2 {uint32 c1 = 1;}"
+ },
+ {
+ "name": "OUTPUT",
+ "keyFormat": "PROTOBUF",
+ "keySchema": "syntax = \"proto3\"; message ProtobufKey2 {string k1 = 1;}",
+ "valueFormat": "PROTOBUF",
+ "valueSchema": "syntax = \"proto3\"; message ProtobufValue2 {uint32 c1 = 1;}"
+ }
+ ],
+ "inputs": [
+ {"topic": "input", "key": {"k1": "A"}, "value": {"c1": 100}}
+ ],
+ "outputs": [
+ {"topic": "OUTPUT", "key": {"K1": "A"}, "value": {"C1": 100}}
+ ]
+ },
+ {
+ "name": "fail if protobuf schema name is not found",
+ "statements": [
+ "CREATE STREAM INPUT WITH (kafka_topic='input_topic', value_format='PROTOBUF', VALUE_SCHEMA_FULL_NAME='ProtobufValue1000');"
+ ],
+ "topics": [
+ {
+ "name": "input",
+ "valueFormat": "PROTOBUF",
+ "valueSchema": "syntax = \"proto3\"; message ProtobufValue1 {float c1 = 1; uint32 c2 = 2;} message ProtobufValue2 {uint32 c1 = 1;}"
+ }
+ ],
+ "expectedException": {
+ "type": "io.confluent.ksql.util.KsqlStatementException",
+ "message": "Schema for message values on topic 'input_topic' does not exist in the Schema Registry."
+ }
+ },
{
"name": "map with non-string keys - C*",
"comment": ["This is current limitation. PB supports INT/BIGINT keys. See https://github.com/confluentinc/ksql/issues/6177"],
diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/properties/with/CreateSourcePropertiesTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/properties/with/CreateSourcePropertiesTest.java
index 29ec0c08ac22..681ee8ebade7 100644
--- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/properties/with/CreateSourcePropertiesTest.java
+++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/properties/with/CreateSourcePropertiesTest.java
@@ -101,6 +101,8 @@ public void shouldReturnOptionalEmptyForMissingProps() {
assertThat(properties.getReplicas(), is(Optional.empty()));
assertThat(properties.getPartitions(), is(Optional.empty()));
assertThat(properties.getValueSerdeFeatures(), is(SerdeFeatures.of()));
+ assertThat(properties.getKeySchemaFullName(), is(Optional.empty()));
+ assertThat(properties.getValueSchemaFullName(), is(Optional.empty()));
}
@Test
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java
index b4e0819223e4..818db8b4bf57 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutor.java
@@ -16,10 +16,10 @@
package io.confluent.ksql.rest.server.execution;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericKey;
@@ -50,6 +50,9 @@
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.serde.connect.ConnectProperties;
+import io.confluent.ksql.serde.protobuf.ProtobufFormat;
+import io.confluent.ksql.serde.protobuf.ProtobufProperties;
+import io.confluent.ksql.serde.protobuf.ProtobufSchemaTranslator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
@@ -380,9 +383,9 @@ private static void ensureKeySchemasMatch(
.getSchemaTranslator(formatInfo.getProperties())
.toParsedSchema(keySchema);
- final Optional<SchemaMetadata> latest;
+ final Optional<ParsedSchema> latest;
try {
- latest = SchemaRegistryUtil.getLatestSchema(
+ latest = SchemaRegistryUtil.getLatestParsedSchema(
schemaRegistryClient,
dataSource.getKafkaTopicName(),
true);
@@ -394,7 +397,9 @@ private static void ensureKeySchemasMatch(
+ "operation potentially overrides existing key schema in schema registry.", e);
}
- if (latest.isPresent() && !latest.get().getSchema().equals(schema.canonicalString())) {
+ if (latest.isPresent() && !latest.get().canonicalString().equals(schema.canonicalString())) {
+ final Map<String, String> formatProps = keyFormat.getFormatInfo().getProperties();
+
// Hack: skip comparing connect name. See https://github.com/confluentinc/ksql/issues/7211
// Avro schema are registered in source creation time as well data insertion time.
// CONNECT_META_DATA_CONFIG is configured to false in Avro Serializer, but it's true in
@@ -402,17 +407,36 @@ private static void ensureKeySchemasMatch(
// enabling it breaks lots of history QTT test which implies backward compatibility issues.
// So we just bypass the connect name check here.
if (format instanceof AvroFormat) {
- final SchemaTranslator translator = format
- .getSchemaTranslator(keyFormat.getFormatInfo().getProperties());
+ final SchemaTranslator translator = format.getSchemaTranslator(formatProps);
translator.configure(ImmutableMap.of(AvroDataConfig.CONNECT_META_DATA_CONFIG, false));
final ParsedSchema parsedSchema = translator.toParsedSchema(keySchema);
- if (latest.get().getSchema().equals(parsedSchema.canonicalString())) {
+ if (latest.get().canonicalString().equals(parsedSchema.canonicalString())) {
+ return;
+ }
+ } else if (format instanceof ProtobufFormat
+ && formatProps.containsKey(ConnectProperties.FULL_SCHEMA_NAME)) {
+
+ // The SR key schema may contain multiple schema definitions, and FULL_SCHEMA_NAME selects
+ // a single one. To verify that the source key schema matches SR, we extract the single
+ // schema definition identified by FULL_SCHEMA_NAME and compare it with the source schema.
+
+ final ProtobufSchemaTranslator protoTranslator = new ProtobufSchemaTranslator(
+ new ProtobufProperties(formatProps)
+ );
+
+ final ParsedSchema extractedSingleSchema = protoTranslator.fromConnectSchema(
+ protoTranslator.toConnectSchema(latest.get())
+ );
+
+ if (extractedSingleSchema.canonicalString().equals(schema.canonicalString())) {
return;
}
}
+
throw new KsqlException("Cannot INSERT VALUES into data source " + dataSource.getName()
+ ". ksqlDB generated schema would overwrite existing key schema."
- + "\n\tExisting Schema: " + latest.get().getSchema()
+ + "\n\tExisting Schema: " + latest.get().canonicalString()
+ "\n\tksqlDB Generated: " + schema.canonicalString());
}
}
@@ -495,12 +519,35 @@ private static FormatInfo addSerializerMissingFormatFields(
// The FULL_SCHEMA_NAME allows the serializer to choose the schema definition
if (supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)) {
if (!formatInfo.getProperties().containsKey(ConnectProperties.FULL_SCHEMA_NAME)) {
- SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name)
+ SchemaRegistryUtil.getLatestParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name)
.ifPresent(schemaName ->
propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName));
}
}
+ // The SCHEMA_ID is used by the Serde factory (i.e. ProtobufSerdeFactory) to
+ // get the SR schema and extract the full schema name in case multiple schema definitions
+ // are available.
+ if (supportedProperties.contains(ConnectProperties.SCHEMA_ID)) {
+ if (!formatInfo.getProperties().containsKey(ConnectProperties.SCHEMA_ID)) {
+ // Inject the SCHEMA_ID only if the format to serialize is Protobuf and if the SR schema
+ // has multiple schema definitions. This ensures that the serializer does not attempt to
+ // register a new schema during the object serialization.
+ if (format instanceof ProtobufFormat) {
+ final List<String> schemaNames =
+ SchemaRegistryUtil.getLatestParsedSchema(srClient, topicName, isKey)
+ .map(format::schemaFullNames)
+ .orElse(ImmutableList.of());
+
+ if (schemaNames.size() > 1) {
+ SchemaRegistryUtil.getLatestSchemaId(srClient, topicName, isKey)
+ .ifPresent(schemaId ->
+ propertiesBuilder.put(ConnectProperties.SCHEMA_ID, String.valueOf(schemaId)));
+ }
+ }
+ }
+ }
+
return FormatInfo.of(formatInfo.getFormat(), propertiesBuilder.build());
}
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java
index 126414dfa83f..c245d5a27863 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java
@@ -36,6 +36,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.config.SessionConfig;
@@ -98,7 +99,6 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.junit.Before;
import org.junit.Test;
@@ -137,6 +137,13 @@ public class InsertValuesExecutorTest {
+ "{\"name\":\"k0\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ "{\"name\":\"k1\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
+ private static final String AVRO_RAW_ONE_KEY_SCHEMA =
+ "{\"type\":\"record\","
+ + "\"name\":\"KsqlDataSourceSchema\","
+ + "\"namespace\":\"io.confluent.ksql.avro_schemas\","
+ + "\"fields\":["
+ + "{\"name\":\"k0\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
+
private static final LogicalSchema SCHEMA_WITH_MUTI_KEYS = LogicalSchema.builder()
.keyColumn(K0, SqlTypes.STRING)
.keyColumn(K1, SqlTypes.STRING)
@@ -971,6 +978,55 @@ public void shouldThrowOnTablesWithKeyFieldAndNullKeyFieldValueProvided() {
"Failed to insert values into 'TOPIC'. Value for primary key column(s) k0 is required for tables"));
}
+ @Test
+ public void shouldAllowInsertOnMultipleKeySchemaDefinitions() throws Exception {
+ final String protoMultiSchema = "syntax = \"proto3\";\n"
+ + "package io.proto;\n"
+ + "\n"
+ + "message SingleKey {\n"
+ + " string k0 = 1;\n"
+ + "}\n"
+ + "message MultiKeys {\n"
+ + " string k0 = 1;\n"
+ + " string k1 = 2;\n"
+ + "}\n";
+
+ // Given:
+ when(srClient.getLatestSchemaMetadata(Mockito.any()))
+ .thenReturn(new SchemaMetadata(1, 1, protoMultiSchema));
+ when(srClient.getSchemaById(1))
+ .thenReturn(new ProtobufSchema(protoMultiSchema));
+ givenDataSourceWithSchema(
+ TOPIC_NAME,
+ SCHEMA_WITH_MUTI_KEYS,
+ SerdeFeatures.of(SerdeFeature.SCHEMA_INFERENCE),
+ SerdeFeatures.of(),
+ FormatInfo.of(FormatFactory.PROTOBUF.name(), ImmutableMap.of(
+ AvroProperties.FULL_SCHEMA_NAME,"io.proto.MultiKeys",
+ AvroProperties.SCHEMA_ID, "1"
+ )),
+ FormatInfo.of(FormatFactory.JSON.name()),
+ false,
+ false);
+
+ final ConfiguredStatement<InsertValues> statement = givenInsertValues(
+ ImmutableList.of(K0, K1, COL0, COL1),
+ ImmutableList.of(
+ new StringLiteral("K0"),
+ new StringLiteral("K1"),
+ new StringLiteral("V0"),
+ new LongLiteral(21))
+ );
+
+ // When:
+ executor.execute(statement, mock(SessionProperties.class), engine, serviceContext);
+
+ // Then:
+ verify(keySerializer).serialize(TOPIC_NAME, genericKey("K0", "K1"));
+ verify(valueSerializer).serialize(TOPIC_NAME, genericRow("V0", 21L));
+ verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
+ }
+
@Test
public void shouldIgnoreConnectNameComparingKeySchema() throws Exception {
// Given:
@@ -1010,13 +1066,13 @@ public void shouldIgnoreConnectNameComparingKeySchema() throws Exception {
public void shouldSupportInsertIntoWithSchemaInferenceMatch() throws Exception {
// Given:
when(srClient.getLatestSchemaMetadata(Mockito.any()))
- .thenReturn(new SchemaMetadata(1, 1, "\"string\""));
+ .thenReturn(new SchemaMetadata(1, 1, ""));
when(srClient.getSchemaById(1))
- .thenReturn(new AvroSchema(RAW_SCHEMA));
+ .thenReturn(new AvroSchema(AVRO_RAW_ONE_KEY_SCHEMA));
givenDataSourceWithSchema(
TOPIC_NAME,
SCHEMA,
- SerdeFeatures.of(SerdeFeature.SCHEMA_INFERENCE, SerdeFeature.UNWRAP_SINGLES),
+ SerdeFeatures.of(SerdeFeature.SCHEMA_INFERENCE, SerdeFeature.WRAP_SINGLES),
SerdeFeatures.of(),
FormatInfo.of(FormatFactory.AVRO.name()),
FormatInfo.of(FormatFactory.AVRO.name()),
@@ -1043,7 +1099,7 @@ public void shouldSupportInsertIntoWithSchemaInferenceMatch() throws Exception {
public void shouldThrowOnSchemaInferenceMismatchForKey() throws Exception {
// Given:
when(srClient.getLatestSchemaMetadata(Mockito.any()))
- .thenReturn(new SchemaMetadata(1, 1, "schema"));
+ .thenReturn(new SchemaMetadata(1, 1, ""));
when(srClient.getSchemaById(1))
.thenReturn(new AvroSchema(RAW_SCHEMA));
givenDataSourceWithSchema(
@@ -1071,28 +1127,28 @@ public void shouldThrowOnSchemaInferenceMismatchForKey() throws Exception {
// Then:
assertThat(e.getMessage(), containsString("ksqlDB generated schema would overwrite existing key schema"));
- assertThat(e.getMessage(), containsString("Existing Schema: schema"));
+ assertThat(e.getMessage(), containsString("Existing Schema: " + RAW_SCHEMA));
assertThat(e.getMessage(), containsString("ksqlDB Generated: {\"type\":"));
}
@Test
public void shouldBuildSerdeWithSchemaFullName() throws Exception {
- final String AVRO_SCHEMA = "{\"type\":\"record\","
+ final String AVRO_SCHEMA_WITH_CUSTOM_NAME = "{\"type\":\"record\","
+ "\"name\":\"TestSchema\","
+ "\"namespace\":\"io.avro\","
+ "\"fields\":["
- + "{\"name\":\"k0\",\"type\":[\"null\",\"string\"],\"default\":null},"
- + "{\"name\":\"k1\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
+ + "{\"name\":\"k0\",\"type\":[\"null\",\"string\"],\"default\":null}],"
+ + "\"connect.name\":\"io.avro.TestSchema\"}";
// Given:
when(srClient.getLatestSchemaMetadata(Mockito.any()))
- .thenReturn(new SchemaMetadata(1, 1, "\"string\""));
+ .thenReturn(new SchemaMetadata(1, 1, ""));
when(srClient.getSchemaById(1))
- .thenReturn(new AvroSchema(AVRO_SCHEMA));
+ .thenReturn(new AvroSchema(AVRO_SCHEMA_WITH_CUSTOM_NAME));
givenDataSourceWithSchema(
TOPIC_NAME,
SCHEMA,
- SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
+ SerdeFeatures.of(SerdeFeature.WRAP_SINGLES),
SerdeFeatures.of(),
FormatInfo.of(FormatFactory.AVRO.name()),
FormatInfo.of(FormatFactory.AVRO.name()),
@@ -1113,9 +1169,8 @@ public void shouldBuildSerdeWithSchemaFullName() throws Exception {
// Then:
verify(keySerdeFactory).create(
FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of(
- AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema"
- )),
- PersistenceSchema.from(SCHEMA.key(), SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES)),
+ AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema")),
+ PersistenceSchema.from(SCHEMA.key(), SerdeFeatures.of(SerdeFeature.WRAP_SINGLES)),
new KsqlConfig(ImmutableMap.of()),
srClientFactory,
"",
@@ -1125,8 +1180,7 @@ public void shouldBuildSerdeWithSchemaFullName() throws Exception {
verify(valueSerdeFactory).create(
FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of(
- AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema"
- )),
+ AvroProperties.FULL_SCHEMA_NAME,"io.avro.TestSchema")),
PersistenceSchema.from(SCHEMA.value(), SerdeFeatures.of()),
new KsqlConfig(ImmutableMap.of()),
srClientFactory,
@@ -1212,13 +1266,13 @@ public void shouldThrowWhenNotAuthorizedToReadKeySchemaFromSR() throws Exception
public void shouldThrowWhenNotAuthorizedToWriteKeySchemaToSR() throws Exception {
// Given:
when(srClient.getLatestSchemaMetadata(Mockito.any()))
- .thenReturn(new SchemaMetadata(1, 1, "\"string\""));
+ .thenReturn(new SchemaMetadata(1, 1, AVRO_RAW_ONE_KEY_SCHEMA));
when(srClient.getSchemaById(1))
- .thenReturn(new AvroSchema(RAW_SCHEMA));
+ .thenReturn(new AvroSchema(AVRO_RAW_ONE_KEY_SCHEMA));
givenDataSourceWithSchema(
TOPIC_NAME,
SCHEMA,
- SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
+ SerdeFeatures.of(SerdeFeature.WRAP_SINGLES),
SerdeFeatures.of(),
FormatInfo.of(FormatFactory.AVRO.name()),
FormatInfo.of(FormatFactory.AVRO.name()),
@@ -1288,13 +1342,13 @@ public void shouldThrowWhenNotAuthorizedToReadValSchemaFromSR() throws Exception
public void shouldThrowWhenNotAuthorizedToWriteValSchemaToSR() throws Exception {
// Given:
when(srClient.getLatestSchemaMetadata(Mockito.any()))
- .thenReturn(new SchemaMetadata(1, 1, "\"string\""));
+ .thenReturn(new SchemaMetadata(1, 1, AVRO_RAW_ONE_KEY_SCHEMA));
when(srClient.getSchemaById(1))
- .thenReturn(new AvroSchema(RAW_SCHEMA));
+ .thenReturn(new AvroSchema(AVRO_RAW_ONE_KEY_SCHEMA));
givenDataSourceWithSchema(
TOPIC_NAME,
SCHEMA,
- SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
+ SerdeFeatures.of(SerdeFeature.WRAP_SINGLES),
SerdeFeatures.of(),
FormatInfo.of(FormatFactory.AVRO.name()),
FormatInfo.of(FormatFactory.AVRO.name()),
@@ -1474,7 +1528,7 @@ private void givenDataSourceWithSchema(
) {
final KsqlTopic topic = new KsqlTopic(
topicName,
- KeyFormat.nonWindowed(keyFormat, keyFeatures),
+ KeyFormat.of(keyFormat, keyFeatures, Optional.empty()),
ValueFormat.of(valueFormat, valFeatures)
);
diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/Format.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/Format.java
index 15d008373d06..70335bc9146c 100644
--- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/Format.java
+++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/Format.java
@@ -15,6 +15,7 @@
package io.confluent.ksql.serde;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -166,4 +167,14 @@ Serde<List<?>> getSerde(
* @return true if the given sql type is supported
*/
boolean supportsKeyType(SqlType type);
+
+ /**
+ * Returns a list of schema names found in the {@code ParsedSchema}.
+ *
+   * It may return an empty list if no names are found. Names are not found in formats
+   * such as DELIMITED and JSON (non-SR formats).
+ */
+  default List<String> schemaFullNames(final ParsedSchema schema) {
+ return ImmutableList.of();
+ }
}
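For context, here is an illustrative caller of the new Format#schemaFullNames hook. This
is a minimal sketch, not the PR's DefaultSchemaInjector code; the class name, method name,
parameter names, and error message are hypothetical.

    import io.confluent.kafka.schemaregistry.ParsedSchema;
    import io.confluent.ksql.serde.Format;
    import io.confluent.ksql.util.KsqlException;
    import java.util.List;

    final class SchemaFullNameCheck {
      // Verifies that a user-supplied *_SCHEMA_FULL_NAME property refers to one of the
      // names declared in the schema fetched from Schema Registry.
      static void validate(
          final Format format,
          final ParsedSchema parsedSchema,
          final String requestedFullName
      ) {
        final List<String> names = format.schemaFullNames(parsedSchema);

        // An empty list means the format carries no names (e.g. DELIMITED, JSON),
        // so there is nothing to validate against.
        if (!names.isEmpty() && !names.contains(requestedFullName)) {
          throw new KsqlException("Schema name '" + requestedFullName
              + "' is not declared in the registered schema. Declared names: " + names);
        }
      }
    }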
diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java
index bd94e9c55460..2b4a566e8e7d 100644
--- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java
+++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java
@@ -15,6 +15,8 @@
package io.confluent.ksql.serde.connect;
+import com.google.common.collect.ImmutableList;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
@@ -259,4 +261,13 @@ public void close() {
public boolean supportsKeyType(final SqlType type) {
return true;
}
+
+ @Override
+  public List<String> schemaFullNames(final ParsedSchema schema) {
+ if (schema.name() == null) {
+ return ImmutableList.of();
+ }
+
+ return ImmutableList.of(schema.name());
+ }
}
diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java
index 76e6ecf31ff9..5a4f15f2919f 100644
--- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java
+++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java
@@ -15,17 +15,23 @@
package io.confluent.ksql.serde.protobuf;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.FormatProperties;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.connect.ConnectFormat;
import io.confluent.ksql.serde.connect.ConnectSchemaTranslator;
import io.confluent.ksql.util.KsqlConfig;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.ConnectSchema;
@@ -80,4 +86,18 @@ protected <T> Serde<T> getConnectSerde(
return new ProtobufSerdeFactory(new ProtobufProperties(formatProps))
.createSerde(connectSchema, config, srFactory, targetType, isKey);
}
+
+ @Override
+  public List<String> schemaFullNames(final ParsedSchema schema) {
+ if (schema.rawSchema() instanceof ProtoFileElement) {
+ final ProtoFileElement protoFileElement = (ProtoFileElement) schema.rawSchema();
+ final String packageName = protoFileElement.getPackageName();
+
+ return protoFileElement.getTypes().stream()
+ .map(typeElement -> Joiner.on(".").skipNulls().join(packageName, typeElement.getName()))
+ .collect(Collectors.toList());
+ }
+
+ return ImmutableList.of();
+ }
}
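Note on the Joiner.on(".").skipNulls() call above: it is what makes the package prefix
optional, because ProtoFileElement#getPackageName() returns null when the .proto file has
no package declaration. A standalone sketch of that behavior, using Guava only (the
literal message and package names are made up):

    import com.google.common.base.Joiner;

    public final class FullNameJoinExample {
      public static void main(final String[] args) {
        // No `package` declaration: the null prefix is skipped, leaving just the message name.
        System.out.println(Joiner.on(".").skipNulls().join(null, "ProtobufKey1"));
        // expected: ProtobufKey1

        // With a package declaration: the full name is "<package>.<message>".
        System.out.println(Joiner.on(".").skipNulls().join("examples.proto", "ProtobufKey1"));
        // expected: examples.proto.ProtobufKey1
      }
    }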
diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java
index 62fdea6b8080..931421c0a318 100644
--- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java
+++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslator.java
@@ -33,7 +33,7 @@
/**
* Translates between Connect and Protobuf Schema Registry schema types.
*/
-class ProtobufSchemaTranslator implements ConnectSchemaTranslator {
+public class ProtobufSchemaTranslator implements ConnectSchemaTranslator {
private final ProtobufProperties properties;
private final Map baseConfigs;
@@ -42,7 +42,7 @@ class ProtobufSchemaTranslator implements ConnectSchemaTranslator {
private Map updatedConfigs;
private ProtobufData protobufData;
- ProtobufSchemaTranslator(final ProtobufProperties properties) {
+ public ProtobufSchemaTranslator(final ProtobufProperties properties) {
this.properties = Objects.requireNonNull(properties, "properties");
this.baseConfigs = ImmutableMap.of(
WRAPPER_FOR_RAW_PRIMITIVES_CONFIG, properties.getUnwrapPrimitives());
@@ -69,7 +69,7 @@ public String name() {
@Override
public Schema toConnectSchema(final ParsedSchema schema) {
- return protobufData.toConnectSchema((ProtobufSchema) schema);
+ return protobufData.toConnectSchema(withSchemaFullName((ProtobufSchema) schema));
}
@Override
@@ -80,6 +80,10 @@ public ParsedSchema fromConnectSchema(final Schema schema) {
.fromConnectSchema(injectSchemaFullName(schema));
}
+ private ProtobufSchema withSchemaFullName(final ProtobufSchema origSchema) {
+ return fullNameSchema.map(origSchema::copy).orElse(origSchema);
+ }
+
private Schema injectSchemaFullName(final Schema origSchema) {
return fullNameSchema
.map(fullName -> ProtobufSchemas.schemaWithName(origSchema, fullName))
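The new withSchemaFullName step above relies on ProtobufSchema#copy(String), which returns
a copy of the schema pointing at the named message, so toConnectSchema translates the
message selected via ProtobufProperties.FULL_SCHEMA_NAME rather than the first definition
in the file. A minimal sketch of the effect (the two-message schema string and names are
made up):

    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

    public final class SchemaCopyExample {
      public static void main(final String[] args) {
        final ProtobufSchema schema = new ProtobufSchema(
            "syntax = \"proto3\"; package examples.proto; "
                + "message Key1 {uint32 k1 = 1;} "
                + "message Key2 {string k1 = 1;}");

        // With multiple definitions, the first message is the schema's default name.
        System.out.println(schema.name());
        // expected: examples.proto.Key1

        // copy(fullName) re-targets the schema at the requested message.
        System.out.println(schema.copy("examples.proto.Key2").name());
        // expected: examples.proto.Key2
      }
    }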
diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java
index d68bcc55d560..a51c6d4adf4e 100644
--- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java
+++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/KsqlProtobufSerializerTest.java
@@ -307,7 +307,7 @@ public void shouldThrowIfSchemaMissesFieldNotCompatible() throws Exception {
int schemaId = givenPhysicalSchema(getSRSubject(SOME_TOPIC, false),
PROTOBUF_STRUCT_SCHEMA_WITH_MISSING_FIELD);
final Serializer serializer = givenSerializerForSchema(RANDOM_NAME_STRUCT_SCHEMA,
- Struct.class, Optional.of(schemaId), Optional.of("RandomName"));
+ Struct.class, Optional.of(schemaId), Optional.empty());
// When:
final Exception e = assertThrows(
diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufFormatTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufFormatTest.java
new file mode 100644
index 000000000000..ab0cab83e4c3
--- /dev/null
+++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufFormatTest.java
@@ -0,0 +1,55 @@
+package io.confluent.ksql.serde.protobuf;
+
+import com.google.common.collect.ImmutableList;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ProtobufFormatTest {
+ private ProtobufFormat format;
+
+ @Before
+ public void setUp() {
+ format = new ProtobufFormat();
+ }
+
+ @Test
+ public void shouldReturnSchemaNamesFromMultipleSchemaDefinitionsWithPackageName() {
+ // Given
+ final ProtobufSchema protoSchema = new ProtobufSchema(""
+ + "syntax = \"proto3\"; "
+ + "package examples.proto; "
+ + "message ProtobufKey1 {uint32 k1 = 1;} "
+ + "message ProtobufKey2 {string k1 = 1;}"
+ );
+
+ // When
+    final List<String> schemaNames = format.schemaFullNames(protoSchema);
+
+ // Then
+ assertThat(schemaNames, equalTo(ImmutableList.of(
+ "examples.proto.ProtobufKey1",
+ "examples.proto.ProtobufKey2"
+ )));
+ }
+
+ @Test
+ public void shouldReturnSchemaNamesFromMultipleSchemaDefinitionsWithoutPackageName() {
+ // Given
+ final ProtobufSchema protoSchema = new ProtobufSchema(""
+ + "syntax = \"proto3\"; "
+ + "message ProtobufKey1 {uint32 k1 = 1;} "
+ + "message ProtobufKey2 {string k1 = 1;}"
+ );
+
+ // When
+    final List<String> schemaNames = format.schemaFullNames(protoSchema);
+
+ // Then
+ assertThat(schemaNames, equalTo(ImmutableList.of("ProtobufKey1", "ProtobufKey2")));
+ }
+}
diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslatorTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslatorTest.java
index e8b9ab6d8ba0..661c598d60bd 100644
--- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslatorTest.java
+++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSchemaTranslatorTest.java
@@ -20,9 +20,11 @@
import static org.hamcrest.Matchers.is;
import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;
public class ProtobufSchemaTranslatorTest {
@@ -108,6 +110,103 @@ public void shouldKeepWrappingPrimitivesOnConfigure() {
assertThat(schema.field("c5").schema().field("value").schema().type(), is(Type.STRING));
}
+ @Test
+ public void shouldReturnParsedSchemaWithDefaultFullSchemaName() {
+ // Given:
+ givenWrapPrimitives();
+ final Schema connectSchema = SchemaBuilder.struct()
+ .field("id", Schema.OPTIONAL_INT64_SCHEMA)
+ .field("array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA))
+ .field("map", SchemaBuilder.map(Schema.BYTES_SCHEMA, Schema.FLOAT64_SCHEMA))
+ .field("struct", SchemaBuilder.struct()
+ .field("c1", Schema.STRING_SCHEMA)
+ .build())
+ .build();
+
+ // When:
+ schemaTranslator.configure(ImmutableMap.of());
+ final ParsedSchema parsedSchema = schemaTranslator.fromConnectSchema(connectSchema);
+
+ // Then:
+ assertThat(parsedSchema.name(), is("ConnectDefault1"));
+ assertThat(((ProtobufSchema)parsedSchema).rawSchema().toSchema(), is(
+ "// Proto schema formatted by Wire, do not edit.\n" +
+ "// Source: \n" +
+ "\n" +
+ "syntax = \"proto3\";\n" +
+ "\n" +
+ "message ConnectDefault1 {\n" +
+ " int64 id = 1;\n" +
+ "\n" +
+ " repeated string array = 2;\n" +
+ "\n" +
+ " repeated ConnectDefault2Entry map = 3;\n" +
+ "\n" +
+ " ConnectDefault3 struct = 4;\n" +
+ "\n" +
+ " message ConnectDefault2Entry {\n" +
+ " bytes key = 1;\n" +
+ " \n" +
+ " double value = 2;\n" +
+ " }\n" +
+ "\n" +
+ " message ConnectDefault3 {\n" +
+ " string c1 = 1;\n" +
+ " }\n" +
+ "}\n"
+ ));
+ }
+
+ @Test
+ public void shouldReturnParsedSchemaWithFullSchemaName() {
+ // Given:
+ givenSchemaFullName("io.examples.Customer");
+ final Schema connectSchema = SchemaBuilder.struct()
+ .field("id", Schema.OPTIONAL_INT64_SCHEMA)
+ .field("array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA))
+ .field("map", SchemaBuilder.map(Schema.BYTES_SCHEMA, Schema.FLOAT64_SCHEMA).name("InternalMap"))
+ .field("struct", SchemaBuilder.struct()
+ .field("c1", Schema.STRING_SCHEMA)
+ .name("InternalStruct")
+ .build())
+ .build();
+
+ // When:
+ schemaTranslator.configure(ImmutableMap.of());
+ final ParsedSchema parsedSchema = schemaTranslator.fromConnectSchema(connectSchema);
+
+ // Then:
+ assertThat(parsedSchema.name(), is("io.examples.Customer"));
+ assertThat(((ProtobufSchema)parsedSchema).rawSchema().toSchema(), is(
+ "// Proto schema formatted by Wire, do not edit.\n" +
+ "// Source: \n" +
+ "\n" +
+ "syntax = \"proto3\";\n" +
+ "\n" +
+ "package io.examples;\n" +
+ "\n" +
+ "message Customer {\n" +
+ " int64 id = 1;\n" +
+ "\n" +
+ " repeated string array = 2;\n" +
+ "\n" +
+ " repeated InternalMapEntry map = 3;\n" +
+ "\n" +
+ " InternalStruct struct = 4;\n" +
+ "\n" +
+ " message InternalMapEntry {\n" +
+ " bytes key = 1;\n" +
+ " \n" +
+ " double value = 2;\n" +
+ " }\n" +
+ "\n" +
+ " message InternalStruct {\n" +
+ " string c1 = 1;\n" +
+ " }\n" +
+ "}\n"
+ ));
+ }
+
private void givenUnwrapPrimitives() {
schemaTranslator = new ProtobufSchemaTranslator(new ProtobufProperties(ImmutableMap.of(
ProtobufProperties.UNWRAP_PRIMITIVES, ProtobufProperties.UNWRAP
@@ -118,4 +217,9 @@ private void givenWrapPrimitives() {
schemaTranslator = new ProtobufSchemaTranslator(new ProtobufProperties(ImmutableMap.of()));
}
+ private void givenSchemaFullName(final String fullSchemaName) {
+ schemaTranslator = new ProtobufSchemaTranslator(new ProtobufProperties(ImmutableMap.of(
+ ProtobufProperties.FULL_SCHEMA_NAME, fullSchemaName
+ )));
+ }
}
\ No newline at end of file