Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
Fix the Pulsar schema not being able to exclude the key fields. (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jianyun Zhao committed Sep 8, 2021
1 parent 0280311 commit 02c505d
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 65 deletions.
Expand Up @@ -14,8 +14,11 @@

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.atomic.AtomicRowDataFormatFactory;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
Expand All @@ -25,6 +28,7 @@
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -72,6 +76,13 @@ public PulsarCatalogSupport(String adminUrl, ClientConfigurationData clientConfi
this.schemaTranslator = schemaTranslator;
}

/**
 * Creates a catalog support instance from collaborators supplied by the caller.
 *
 * <p>Marked {@code @VisibleForTesting}: the given reader and translator are assigned
 * directly to this instance's fields, so a test can pass in its own implementations.
 *
 * @param metadataReader the metadata reader stored in {@code pulsarMetadataReader}
 * @param schemaTranslator the schema translator stored in {@code schemaTranslator}
 */
@VisibleForTesting
protected PulsarCatalogSupport(PulsarMetadataReader metadataReader,
SchemaTranslator schemaTranslator) {
this.pulsarMetadataReader = metadataReader;
this.schemaTranslator = schemaTranslator;
}

/**
 * Lists namespaces by delegating to {@code pulsarMetadataReader.listNamespaces()}.
 *
 * @return the list returned by the metadata reader, unmodified
 * @throws PulsarAdminException declared by this method; presumably raised by the
 *     underlying metadata reader call (confirm against PulsarMetadataReader)
 */
public List<String> listNamespaces() throws PulsarAdminException {
return pulsarMetadataReader.listNamespaces();
}
Expand All @@ -89,11 +100,11 @@ public List<String> getTopics(String databaseName) throws PulsarAdminException {
}

public CatalogTable getTableSchema(ObjectPath tablePath,
Map<String, String> properties)
Configuration configuration)
throws PulsarAdminException, IncompatibleSchemaException {
String topicName = objectPath2TopicName(tablePath);
final SchemaInfo pulsarSchema = pulsarMetadataReader.getPulsarSchema(topicName);
return schemaToCatalogTable(pulsarSchema, tablePath, properties);
return schemaToCatalogTable(pulsarSchema, tablePath, configuration);
}

public boolean topicExists(ObjectPath tablePath) throws PulsarAdminException {
Expand All @@ -108,11 +119,12 @@ public void createTopic(ObjectPath tablePath, int defaultNumPartitions, CatalogB
pulsarMetadataReader.createTopic(topicName, defaultNumPartitions);
}

public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String format)
public void putSchema(ObjectPath tablePath, CatalogBaseTable table, Configuration configuration)
throws PulsarAdminException, IncompatibleSchemaException {
String topicName = objectPath2TopicName(tablePath);
final TableSchema schema = table.getSchema();
final SchemaInfo schemaInfo = tableSchemaToPulsarSchema(format, schema, table.getOptions());
String format = configuration.get(PulsarTableOptions.VALUE_FORMAT);
final SchemaInfo schemaInfo = tableSchemaToPulsarSchema(format, schema, configuration);

// Writing schemaInfo#properties causes the client to fail to consume it when it is a Pulsar native type.
if (!StringUtils.equals(format, AtomicRowDataFormatFactory.IDENTIFIER)) {
Expand Down Expand Up @@ -158,15 +170,18 @@ private static Map<String, String> retrieveFlinkProperties(Map<String, String> p
}

private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema,
Map<String, String> options) throws IncompatibleSchemaException {
Configuration configuration) throws IncompatibleSchemaException {
// The exclusion logic for the key is not handled correctly here when the user sets the key-related fields using pulsar
final DataType physicalRowDataType = schema.toPhysicalRowDataType();
return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType, options);
final int[] valueProjection =
PulsarTableOptions.createValueFormatProjection(configuration, physicalRowDataType);
DataType physicalValueDataType = DataTypeUtils.projectRow(physicalRowDataType, valueProjection);
return SchemaUtils.tableSchemaToSchemaInfo(format, physicalValueDataType, configuration);
}

private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema,
ObjectPath tablePath,
Map<String, String> flinkProperties)
ObjectPath tablePath,
Configuration configuration)
throws IncompatibleSchemaException {
boolean isCatalogTopic = Boolean.parseBoolean(pulsarSchema.getProperties().get(IS_CATALOG_TOPIC));
if (isCatalogTopic) {
Expand All @@ -180,7 +195,7 @@ private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema,
List<String> partitionKeys = tableSchemaProps.getPartitionKeys();
// remove the schema from properties
properties = CatalogTableImpl.removeRedundant(properties, tableSchema, partitionKeys);
properties.putAll(flinkProperties);
properties.putAll(configuration.toMap());
properties.remove(IS_CATALOG_TOPIC);
String comment = properties.remove(PulsarCatalogSupport.COMMENT);
return CatalogTable.of(
Expand All @@ -195,7 +210,7 @@ private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema,
tableSchema.toSchema(),
"",
Collections.emptyList(),
flinkProperties
configuration.toMap()
);
}
}
Expand Down
Expand Up @@ -90,7 +90,6 @@ public class PulsarOptions {
PUBLISH_TIME_NAME,
EVENT_TIME_NAME);

public static final String DEFAULT_PARTITIONS = "table-default-partitions";
public static final String AUTH_PARAMS_KEY = "auth-params";
public static final String AUTH_PLUGIN_CLASSNAME_KEY = "auth-plugin-classname";
}
Expand Up @@ -14,6 +14,7 @@

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.protobuf.PbFormatOptions;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
Expand All @@ -40,7 +41,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import static org.apache.avro.Schema.Type.RECORD;
import static org.apache.pulsar.shade.com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -217,15 +217,15 @@ private static <T extends GeneratedMessageV3> Class<T> convertProtobuf(Class rec
}

public static SchemaInfo tableSchemaToSchemaInfo(String format, DataType dataType,
Map<String, String> options)
Configuration configuration)
throws IncompatibleSchemaException {
switch (StringUtils.lowerCase(format)) {
case "json":
return getSchemaInfo(SchemaType.JSON, dataType);
case "avro":
return getSchemaInfo(SchemaType.AVRO, dataType);
case "protobuf":
final String messageClassName = options.get(PbFormatOptions.MESSAGE_CLASS_NAME.key());
final String messageClassName = configuration.get(PbFormatOptions.MESSAGE_CLASS_NAME);
return getProtobufSchemaInfo(messageClassName, SchemaUtils.class.getClassLoader());
case "atomic":
org.apache.pulsar.client.api.Schema pulsarSchema =
Expand Down
Expand Up @@ -16,6 +16,7 @@

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.protobuf.PbFormatOptions;
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
Expand All @@ -38,7 +39,6 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -222,19 +222,19 @@ private FlinkSchema<RowData> buildSchema() {
if (StringUtils.isBlank(valueFormatType)) {
return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), valueSerialization, null);
}
Map<String, String> options = new HashMap<>();
hackPbSerializationSchema(options);
SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, options);
Configuration configuration = new Configuration();
hackPbSerializationSchema(configuration);
SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, configuration);
return new FlinkSchema<>(schemaInfo, valueSerialization, null);
}

private void hackPbSerializationSchema(Map<String, String> options) {
private void hackPbSerializationSchema(Configuration configuration) {
// reflect read PbRowSerializationSchema#messageClassName
if (valueSerialization instanceof PbRowDataSerializationSchema) {
try {
final String messageClassName =
(String) FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true);
options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), messageClassName);
configuration.set(PbFormatOptions.MESSAGE_CLASS_NAME, messageClassName);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
Expand Down
Expand Up @@ -209,7 +209,7 @@ private PulsarSerializationSchema<RowData> createPulsarSerializer(SerializationS
hasMetadata,
metadataPositions,
upsertMode,
physicalDataType,
DataTypeUtils.projectRow(physicalDataType, valueProjection),
formatType,
delayMilliseconds);
}
Expand Down
Expand Up @@ -54,7 +54,7 @@
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.AT_LEAST_ONCE;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.EXACTLY_ONCE;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.NONE;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;

/**
Expand All @@ -68,16 +68,17 @@ public class PulsarTableOptions {
// --------------------------------------------------------------------------------------------

public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
.key("key" + FORMAT_SUFFIX)
.key("key." + FORMAT.key())
.stringType()
.noDefaultValue()
.withDescription("Defines the format identifier for encoding key data. "
+ "The identifier is used to discover a suitable format factory.");

public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
.key("value" + FORMAT_SUFFIX)
.key("value." + FORMAT.key())
.stringType()
.noDefaultValue()
.withFallbackKeys(FORMAT.key())
.withDescription("Defines the format identifier for encoding value data. "
+ "The identifier is used to discover a suitable format factory.");

Expand Down
Expand Up @@ -14,12 +14,12 @@

package org.apache.flink.table.catalog.pulsar;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCatalogSupport;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
Expand All @@ -39,7 +39,6 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;

Expand All @@ -54,7 +53,7 @@
import java.util.Optional;

import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.DEFAULT_PARTITIONS;

/**
* Expose a Pulsar instance as a database catalog.
Expand All @@ -64,16 +63,16 @@ public class PulsarCatalog extends GenericInMemoryCatalog {

private String adminUrl;

private Map<String, String> properties;
private Configuration configuration;

private PulsarCatalogSupport catalogSupport;

public static final String DEFAULT_DB = "public/default";

public PulsarCatalog(String adminUrl, String catalogName, Map<String, String> props, String defaultDatabase) {
public PulsarCatalog(String adminUrl, String catalogName, Configuration configuration, String defaultDatabase) {
super(catalogName, defaultDatabase);
this.adminUrl = adminUrl;
this.properties = new HashMap<>(props);
this.configuration = configuration;
log.info("Created Pulsar Catalog {}", catalogName);
}

Expand All @@ -87,9 +86,11 @@ public void open() throws CatalogException {
if (catalogSupport == null) {
try {
final ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setAuthParams(properties.get(PROPERTIES_PREFIX + PulsarOptions.AUTH_PARAMS_KEY));
clientConf.setAuthParams(
configuration.getString(PROPERTIES_PREFIX + PulsarOptions.AUTH_PARAMS_KEY, null));
clientConf.setAuthPluginClassName(
properties.get(PROPERTIES_PREFIX + PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY));
configuration.getString(PROPERTIES_PREFIX + PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY, null));

catalogSupport = new PulsarCatalogSupport(adminUrl, clientConf, "",
new HashMap<>(), -1, -1, new SimpleSchemaTranslator(false));
} catch (PulsarClientException e) {
Expand Down Expand Up @@ -165,7 +166,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
return super.getTable(tablePath);
}
try {
return catalogSupport.getTableSchema(tablePath, properties);
return catalogSupport.getTableSchema(tablePath, configuration);
} catch (PulsarAdminException.NotFoundException e) {
throw new TableNotExistException(getName(), tablePath, e);
} catch (PulsarAdminException | IncompatibleSchemaException e) {
Expand Down Expand Up @@ -193,8 +194,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
if (tablePath.getObjectName().startsWith("_tmp_table_")) {
super.createTable(tablePath, table, ignoreIfExists);
}
final String key = CONNECTOR + "." + PulsarOptions.DEFAULT_PARTITIONS;
int defaultNumPartitions = Integer.parseInt(properties.getOrDefault(key, "5"));

int defaultNumPartitions = configuration.getInteger(DEFAULT_PARTITIONS);
String databaseName = tablePath.getDatabaseName();
Boolean databaseExists;
try {
Expand All @@ -209,7 +210,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig

try {
catalogSupport.createTopic(tablePath, defaultNumPartitions, table);
catalogSupport.putSchema(tablePath, table, getFormat());
catalogSupport.putSchema(tablePath, table, configuration);
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 409) {
throw new TableAlreadyExistException(getName(), tablePath, e);
Expand All @@ -221,11 +222,6 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private String getFormat() {
return Optional.ofNullable(properties.get(FormatDescriptorValidator.FORMAT))
.orElseGet(() -> properties.get(PulsarTableOptions.VALUE_FORMAT.key()));
}

// ------------------------------------------------------------------------
// Unsupported catalog operations for Pulsar
// There should not be such permission in the connector, it is very dangerous
Expand Down
Expand Up @@ -15,6 +15,7 @@
package org.apache.flink.table.catalog.pulsar.factories;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.pulsar.PulsarCatalog;
import org.apache.flink.table.factories.CatalogFactory;
Expand All @@ -28,6 +29,7 @@
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC;
Expand All @@ -53,11 +55,11 @@ public String factoryIdentifier() {
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
helper.validateExcept(PROPERTIES_PREFIX);
return new PulsarCatalog(
helper.getOptions().get(ADMIN_URL),
context.getName(),
context.getOptions(),
(Configuration) helper.getOptions(),
helper.getOptions().get(DEFAULT_DATABASE));
}

Expand Down

0 comments on commit 02c505d

Please sign in to comment.