Skip to content

Commit

Permalink
Pass ParsedSchema to SchemaParser
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen2112 committed Jan 4, 2021
1 parent 2aa8c20 commit 23a4334
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
Expand Up @@ -14,6 +14,8 @@
package io.trino.plugin.kafka.schema.confluent;

import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.trino.decoder.avro.AvroRowDecoderFactory;
import io.trino.plugin.kafka.KafkaTopicFieldDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
Expand All @@ -27,6 +29,7 @@
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.kafka.schema.confluent.ConfluentSessionProperties.getEmptyFieldStrategy;
Expand All @@ -44,9 +47,10 @@ public AvroSchemaParser(TypeManager typeManager)
}

@Override
public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, String rawSchema)
public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, ParsedSchema parsedSchema)
{
Schema schema = new Schema.Parser().parse(rawSchema);
checkArgument(parsedSchema instanceof AvroSchema, "parsedSchema should be an instance of AvroSchema");
Schema schema = ((AvroSchema) parsedSchema).rawSchema();
AvroSchemaConverter schemaConverter = new AvroSchemaConverter(typeManager, getEmptyFieldStrategy(session));
List<Type> types = schemaConverter.convertAvroSchema(schema);
ImmutableList.Builder<KafkaTopicFieldDescription> fieldsBuilder = ImmutableList.builder();
Expand Down
Expand Up @@ -192,7 +192,14 @@ private KafkaTopicFieldGroup getFieldGroup(ConnectorSession session, String subj
if (schemaParser == null) {
throw new TrinoException(NOT_SUPPORTED, "Not supported schema: " + schemaMetadata.getSchemaType());
}
return schemaParser.parse(session, subject, schemaMetadata.getSchema());
return schemaParser.parse(
session,
subject,
schemaRegistryClient.parseSchema(
schemaMetadata.getSchemaType(),
schemaMetadata.getSchema(),
schemaMetadata.getReferences())
.orElseThrow());
}

private SchemaMetadata getLatestSchemaMetadata(String subject)
Expand Down
Expand Up @@ -13,10 +13,11 @@
*/
package io.trino.plugin.kafka.schema.confluent;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.spi.connector.ConnectorSession;

public interface SchemaParser
{
KafkaTopicFieldGroup parse(ConnectorSession session, String subject, String rawSchema);
KafkaTopicFieldGroup parse(ConnectorSession session, String subject, ParsedSchema parsedSchema);
}

0 comments on commit 23a4334

Please sign in to comment.