Skip to content

Commit

Permalink
fix: INSERT VALUES fail when SR schema has a non-default name (#8984)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Apr 14, 2022
1 parent 5b4b811 commit 150bc44
Show file tree
Hide file tree
Showing 14 changed files with 801 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,43 @@ public static boolean subjectExists(
return getLatestSchema(srClient, subject).isPresent();
}

/**
 * Fetches the latest registered schema for the topic's key or value subject and
 * returns it in parsed form, or {@code Optional.empty()} if no subject exists.
 */
public static Optional<ParsedSchema> getParsedSchema(
    final SchemaRegistryClient srClient,
    final String topic,
    final boolean isKey
) {
  final String subject = KsqlConstants.getSRSubject(topic, isKey);

  final Optional<SchemaMetadata> latestMetadata = getLatestSchema(srClient, subject);
  if (!latestMetadata.isPresent()) {
    return Optional.empty();
  }

  final int schemaId = latestMetadata.get().getId();
  try {
    return Optional.of(srClient.getSchemaById(schemaId));
  } catch (final Exception e) {
    // Surface an authorization failure as such before falling back to a generic error
    throwOnAuthError(e, subject);

    throw new KsqlException(
        "Could not get schema for subject " + subject + " and id " + schemaId, e);
  }
}

/**
 * If the given exception carries an SR authorization error code with an identifiable
 * denied operation, rethrows it as a {@link KsqlSchemaAuthorizationException};
 * otherwise does nothing.
 */
private static void throwOnAuthError(final Exception e, final String subject) {
  if (!isAuthErrorCode(e)) {
    return;
  }

  final AclOperation denied = SchemaRegistryUtil.getDeniedOperation(e.getMessage());
  if (denied != AclOperation.UNKNOWN) {
    throw new KsqlSchemaAuthorizationException(denied, subject);
  }
}

public static Optional<SchemaMetadata> getLatestSchema(
final SchemaRegistryClient srClient,
final String topic,
Expand All @@ -127,16 +164,7 @@ public static Optional<SchemaMetadata> getLatestSchema(
return Optional.empty();
}

if (isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject
);
}
}
throwOnAuthError(e, subject);

throw new KsqlException("Could not get latest schema for subject " + subject, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
import io.confluent.ksql.util.KsqlException;
import java.io.IOException;
import java.util.Optional;

import org.apache.kafka.common.acl.AclOperation;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -64,6 +66,40 @@ public class SchemaRegistryUtilTest {
@Mock
private SchemaMetadata schemaMetadata;

@Test
public void shouldReturnParsedSchemaFromSubjectValue() throws Exception {
  // Given: the "-value" subject resolves to schema id 123, which parses to AVRO_SCHEMA
  when(schemaMetadata.getId()).thenReturn(123);
  when(schemaRegistryClient.getLatestSchemaMetadata("bar-value")).thenReturn(schemaMetadata);
  when(schemaRegistryClient.getSchemaById(123)).thenReturn(AVRO_SCHEMA);

  // When:
  final Optional<ParsedSchema> result =
      SchemaRegistryUtil.getParsedSchema(schemaRegistryClient, "bar", false);

  // Then:
  assertThat(result.get(), equalTo(AVRO_SCHEMA));
}

@Test
public void shouldReturnParsedSchemaFromSubjectKey() throws Exception {
  // Given: the "-key" subject resolves to schema id 123, which parses to AVRO_SCHEMA
  when(schemaMetadata.getId()).thenReturn(123);
  when(schemaRegistryClient.getLatestSchemaMetadata("bar-key")).thenReturn(schemaMetadata);
  when(schemaRegistryClient.getSchemaById(123)).thenReturn(AVRO_SCHEMA);

  // When:
  final Optional<ParsedSchema> result =
      SchemaRegistryUtil.getParsedSchema(schemaRegistryClient, "bar", true);

  // Then:
  assertThat(result.get(), equalTo(AVRO_SCHEMA));
}

@Test
public void shouldDeleteChangeLogTopicSchema() throws Exception {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.parser.properties.with;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -139,6 +140,24 @@ public Optional<Integer> getValueSchemaId() {
return Optional.ofNullable(props.getInt(CommonCreateConfigs.VALUE_SCHEMA_ID));
}

/**
 * Returns the user-supplied {@code KEY_SCHEMA_FULL_NAME} property, trimmed,
 * or {@code Optional.empty()} if absent or blank.
 */
public Optional<String> getKeySchemaFullName() {
  return parseSchemaFullName(props.getString(CommonCreateConfigs.KEY_SCHEMA_FULL_NAME));
}

/**
 * Returns the user-supplied {@code VALUE_SCHEMA_FULL_NAME} property, trimmed,
 * or {@code Optional.empty()} if absent or blank.
 */
public Optional<String> getValueSchemaFullName() {
  return parseSchemaFullName(props.getString(CommonCreateConfigs.VALUE_SCHEMA_FULL_NAME));
}

// Shared normalization for the two schema-full-name properties: trims whitespace
// and treats null or blank values as "not set".
private static Optional<String> parseSchemaFullName(final String schemaFullName) {
  return Optional.ofNullable(schemaFullName)
      .map(String::trim)
      .filter(name -> !name.isEmpty());
}

public Optional<FormatInfo> getKeyFormat(final SourceName name) {
final String keyFormat = getFormatName()
.orElse(props.getString(CommonCreateConfigs.KEY_FORMAT_PROPERTY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,36 @@ public void shouldReturnOptionalEmptyForMissingProps() {
assertThat(properties.getValueSerdeFeatures(), is(SerdeFeatures.of()));
}

@Test
public void shouldReturnOptionalEmptyForPropsWithSpaces() {
  // When: both schema-full-name properties are set to whitespace-only values
  final ImmutableMap.Builder<String, Literal> propsBuilder = ImmutableMap.builder();
  propsBuilder.putAll(MINIMUM_VALID_PROPS);
  propsBuilder.put(KEY_SCHEMA_FULL_NAME, new StringLiteral(" "));
  propsBuilder.put(VALUE_SCHEMA_FULL_NAME, new StringLiteral(" "));
  final CreateSourceProperties properties = CreateSourceProperties.from(propsBuilder.build());

  // Then: blank values are treated as "not set"
  assertThat(properties.getKeySchemaFullName(), is(Optional.empty()));
  assertThat(properties.getValueSchemaFullName(), is(Optional.empty()));
}

@Test
public void shouldSetValidKeyAndValueFullSchemaName() {
  // When: both schema-full-name properties are set to valid names
  final ImmutableMap.Builder<String, Literal> propsBuilder = ImmutableMap.builder();
  propsBuilder.putAll(MINIMUM_VALID_PROPS);
  propsBuilder.put(CommonCreateConfigs.KEY_SCHEMA_FULL_NAME, new StringLiteral("io.com.key"));
  propsBuilder.put(CommonCreateConfigs.VALUE_SCHEMA_FULL_NAME, new StringLiteral("io.com.val"));
  final CreateSourceProperties properties = CreateSourceProperties.from(propsBuilder.build());

  // Then:
  assertThat(properties.getKeySchemaFullName(), is(Optional.of("io.com.key")));
  assertThat(properties.getValueSchemaFullName(), is(Optional.of("io.com.val")));
}

@Test
public void shouldSetValidTimestampName() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
Expand All @@ -39,6 +40,7 @@
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeyFormat;
Expand All @@ -47,6 +49,7 @@
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.serde.connect.ConnectProperties;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -60,6 +63,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -311,8 +315,15 @@ private byte[] serializeKey(

ensureKeySchemasMatch(physicalSchema.keySchema(), dataSource, serviceContext);

final Serde<GenericKey> keySerde = keySerdeFactory.create(
final FormatInfo formatInfo = addSerializerMissingFormatFields(
serviceContext.getSchemaRegistryClient(),
dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
dataSource.getKafkaTopicName(),
true
);

final Serde<GenericKey> keySerde = keySerdeFactory.create(
formatInfo,
physicalSchema.keySchema(),
config,
serviceContext.getSchemaRegistryClientFactory(),
Expand Down Expand Up @@ -356,14 +367,23 @@ private static void ensureKeySchemasMatch(
return;
}

final SchemaRegistryClient schemaRegistryClient = serviceContext.getSchemaRegistryClient();

final FormatInfo formatInfo = addSerializerMissingFormatFields(
schemaRegistryClient,
dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
dataSource.getKafkaTopicName(),
true
);

final ParsedSchema schema = format
.getSchemaTranslator(keyFormat.getFormatInfo().getProperties())
.getSchemaTranslator(formatInfo.getProperties())
.toParsedSchema(keySchema);

final Optional<SchemaMetadata> latest;
try {
latest = SchemaRegistryUtil.getLatestSchema(
serviceContext.getSchemaRegistryClient(),
schemaRegistryClient,
dataSource.getKafkaTopicName(),
true);

Expand Down Expand Up @@ -409,8 +429,15 @@ private byte[] serializeValue(
dataSource.getKsqlTopic().getValueFormat().getFeatures()
);

final Serde<GenericRow> valueSerde = valueSerdeFactory.create(
final FormatInfo formatInfo = addSerializerMissingFormatFields(
serviceContext.getSchemaRegistryClient(),
dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
dataSource.getKafkaTopicName(),
false
);

final Serde<GenericRow> valueSerde = valueSerdeFactory.create(
formatInfo,
physicalSchema.valueSchema(),
config,
serviceContext.getSchemaRegistryClientFactory(),
Expand All @@ -435,6 +462,48 @@ private byte[] serializeValue(
}
}

/**
 * Add missing required fields to the passed {@code FormatInfo} and return a
 * new {@code FormatInfo} with the new fields. These fields are required to serialize the
 * key and value schema correctly. For instance, if running INSERT on a stream with a SR schema
 * name different from the default name used in Connect (i.e. ConnectDefault1 for Protobuf).
 * <p>
 * Note: I initially thought of injecting the SCHEMA_FULL_NAME property in the WITH clause
 * when creating the stream, keep that property in the metastore and use it here instead
 * of looking at the SR directly. But this is not compatible with existing streams because
 * they were created previous to this fix. Those previous streams would fail with INSERT.
 * The best option was to dynamically look at the SR schema during an INSERT statement.
 *
 * @param srClient the Schema Registry client used to look up the latest schema
 * @param formatInfo the key or value format info to augment
 * @param topicName the Kafka topic whose SR subject is consulted
 * @param isKey whether to look at the key ({@code true}) or value ({@code false}) subject
 * @return the original {@code formatInfo} when no fields are missing or the format does not
 *         use SR; otherwise a copy with the missing fields filled in from the registry
 */
private static FormatInfo addSerializerMissingFormatFields(
    final SchemaRegistryClient srClient,
    final FormatInfo formatInfo,
    final String topicName,
    final boolean isKey
) {
  // Only SR information, such as schema ids and full names are required, which are available
  // for SR formats.
  final Format format = FormatFactory.fromName(formatInfo.getFormat());
  if (!format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
    return formatInfo;
  }

  final Set<String> supportedProperties = format.getSupportedProperties();

  // Typed builder (was a raw ImmutableMap.Builder, which compiled with unchecked warnings)
  final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
  propertiesBuilder.putAll(formatInfo.getProperties());

  // The FULL_SCHEMA_NAME allows the serializer to choose the schema definition; only look
  // it up in SR when the format supports it and the caller did not already provide one.
  if (supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)
      && !formatInfo.getProperties().containsKey(ConnectProperties.FULL_SCHEMA_NAME)) {
    SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey)
        .map(ParsedSchema::name)
        .ifPresent(schemaName ->
            propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName));
  }

  return FormatInfo.of(formatInfo.getFormat(), propertiesBuilder.build());
}

private static void maybeThrowSchemaRegistryAuthError(
final Format format,
final String topicName,
Expand Down
Loading

0 comments on commit 150bc44

Please sign in to comment.