diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java index 9d7c9dc0..bfd2a770 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java @@ -14,8 +14,11 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.atomic.AtomicRowDataFormatFactory; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; @@ -25,6 +28,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; @@ -72,6 +76,13 @@ public PulsarCatalogSupport(String adminUrl, ClientConfigurationData clientConfi this.schemaTranslator = schemaTranslator; } + @VisibleForTesting + protected PulsarCatalogSupport(PulsarMetadataReader metadataReader, + SchemaTranslator schemaTranslator) { + this.pulsarMetadataReader = metadataReader; + this.schemaTranslator = schemaTranslator; + } + public List listNamespaces() throws PulsarAdminException { return pulsarMetadataReader.listNamespaces(); } @@ -89,11 +100,11 @@ public List getTopics(String databaseName) throws PulsarAdminException { } public CatalogTable getTableSchema(ObjectPath tablePath, - Map properties) + Configuration configuration) throws PulsarAdminException, IncompatibleSchemaException { String topicName = objectPath2TopicName(tablePath); final SchemaInfo pulsarSchema = pulsarMetadataReader.getPulsarSchema(topicName); - return schemaToCatalogTable(pulsarSchema, tablePath, properties); + return schemaToCatalogTable(pulsarSchema, tablePath, configuration); } public boolean topicExists(ObjectPath tablePath) throws PulsarAdminException { @@ -108,11 +119,12 @@ public void createTopic(ObjectPath tablePath, int defaultNumPartitions, CatalogB pulsarMetadataReader.createTopic(topicName, defaultNumPartitions); } - public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String format) + public void putSchema(ObjectPath tablePath, CatalogBaseTable table, Configuration configuration) throws PulsarAdminException, IncompatibleSchemaException { String topicName = objectPath2TopicName(tablePath); final TableSchema schema = table.getSchema(); - final SchemaInfo schemaInfo = tableSchemaToPulsarSchema(format, schema, table.getOptions()); + String format = configuration.get(PulsarTableOptions.VALUE_FORMAT); + final SchemaInfo schemaInfo = tableSchemaToPulsarSchema(format, schema, configuration); // Writing schemaInfo#properties causes the client to fail to consume it when it is a Pulsar native type. if (!StringUtils.equals(format, AtomicRowDataFormatFactory.IDENTIFIER)) { @@ -158,15 +170,18 @@ private static Map retrieveFlinkProperties(Map p } private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema, - Map options) throws IncompatibleSchemaException { + Configuration configuration) throws IncompatibleSchemaException { // The exclusion logic for the key is not handled correctly here when the user sets the key-related fields using pulsar final DataType physicalRowDataType = schema.toPhysicalRowDataType(); - return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType, options); + final int[] valueProjection = + PulsarTableOptions.createValueFormatProjection(configuration, physicalRowDataType); + DataType physicalValueDataType = DataTypeUtils.projectRow(physicalRowDataType, valueProjection); + return SchemaUtils.tableSchemaToSchemaInfo(format, physicalValueDataType, configuration); } private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema, - ObjectPath tablePath, - Map flinkProperties) + ObjectPath tablePath, + Configuration configuration) throws IncompatibleSchemaException { boolean isCatalogTopic = Boolean.parseBoolean(pulsarSchema.getProperties().get(IS_CATALOG_TOPIC)); if (isCatalogTopic) { @@ -180,7 +195,7 @@ private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema, List partitionKeys = tableSchemaProps.getPartitionKeys(); // remove the schema from properties properties = CatalogTableImpl.removeRedundant(properties, tableSchema, partitionKeys); - properties.putAll(flinkProperties); + properties.putAll(configuration.toMap()); properties.remove(IS_CATALOG_TOPIC); String comment = properties.remove(PulsarCatalogSupport.COMMENT); return CatalogTable.of( @@ -195,7 +210,7 @@ private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema, tableSchema.toSchema(), "", Collections.emptyList(), - flinkProperties + configuration.toMap() ); } } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java index 1116ff60..e578707c 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java @@ -90,7 +90,6 @@ public class PulsarOptions { PUBLISH_TIME_NAME, EVENT_TIME_NAME); - public static final String DEFAULT_PARTITIONS = "table-default-partitions"; public static final String AUTH_PARAMS_KEY = "auth-params"; public static final String AUTH_PLUGIN_CLASSNAME_KEY = "auth-plugin-classname"; } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java index d331114e..f4083410 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java @@ -14,6 +14,7 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.protobuf.PbFormatOptions; import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType; @@ -40,7 +41,6 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; -import java.util.Map; import static org.apache.avro.Schema.Type.RECORD; import static org.apache.pulsar.shade.com.google.common.base.Preconditions.checkNotNull; @@ -217,7 +217,7 @@ private static Class convertProtobuf(Class rec } public static SchemaInfo tableSchemaToSchemaInfo(String format, DataType dataType, - Map options) + Configuration configuration) throws IncompatibleSchemaException { switch (StringUtils.lowerCase(format)) { case "json": @@ -225,7 +225,7 @@ public static SchemaInfo tableSchemaToSchemaInfo(String format, DataType dataTyp case "avro": return getSchemaInfo(SchemaType.AVRO, dataType); case "protobuf": - final String messageClassName = options.get(PbFormatOptions.MESSAGE_CLASS_NAME.key()); + final String messageClassName = configuration.get(PbFormatOptions.MESSAGE_CLASS_NAME); return getProtobufSchemaInfo(messageClassName, SchemaUtils.class.getClassLoader()); case "atomic": org.apache.pulsar.client.api.Schema pulsarSchema = diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/DynamicPulsarSerializationSchema.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/DynamicPulsarSerializationSchema.java index 32d75012..41ccef66 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/DynamicPulsarSerializationSchema.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/DynamicPulsarSerializationSchema.java @@ -16,6 +16,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.protobuf.PbFormatOptions; import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils; @@ -38,7 +39,6 @@ import javax.annotation.Nullable; import java.io.Serializable; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -222,19 +222,19 @@ private FlinkSchema buildSchema() { if (StringUtils.isBlank(valueFormatType)) { return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), valueSerialization, null); } - Map options = new HashMap<>(); - hackPbSerializationSchema(options); - SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, options); + Configuration configuration = new Configuration(); + hackPbSerializationSchema(configuration); + SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, configuration); return new FlinkSchema<>(schemaInfo, valueSerialization, null); } - private void hackPbSerializationSchema(Map options) { + private void hackPbSerializationSchema(Configuration configuration) { // reflect read PbRowSerializationSchema#messageClassName if (valueSerialization instanceof PbRowDataSerializationSchema) { try { final String messageClassName = (String) FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true); - options.put(PbFormatOptions.MESSAGE_CLASS_NAME.key(), messageClassName); + configuration.set(PbFormatOptions.MESSAGE_CLASS_NAME, messageClassName); } catch (IllegalAccessException e) { e.printStackTrace(); } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java index f196ac01..f3a0fbe7 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java @@ -209,7 +209,7 @@ private PulsarSerializationSchema createPulsarSerializer(SerializationS hasMetadata, metadataPositions, upsertMode, - physicalDataType, + DataTypeUtils.projectRow(physicalDataType, valueProjection), formatType, delayMilliseconds); } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java index 40b0e7c9..54420a40 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java @@ -54,7 +54,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.AT_LEAST_ONCE; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.EXACTLY_ONCE; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.NONE; -import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; /** @@ -68,16 +68,17 @@ public class PulsarTableOptions { // -------------------------------------------------------------------------------------------- public static final ConfigOption KEY_FORMAT = ConfigOptions - .key("key" + FORMAT_SUFFIX) + .key("key." + FORMAT.key()) .stringType() .noDefaultValue() .withDescription("Defines the format identifier for encoding key data. " + "The identifier is used to discover a suitable format factory."); public static final ConfigOption VALUE_FORMAT = ConfigOptions - .key("value" + FORMAT_SUFFIX) + .key("value." + FORMAT.key()) .stringType() .noDefaultValue() + .withFallbackKeys(FORMAT.key()) .withDescription("Defines the format identifier for encoding value data. " + "The identifier is used to discover a suitable format factory."); diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java index 20ad3472..48f1fcd0 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java @@ -14,12 +14,12 @@ package org.apache.flink.table.catalog.pulsar; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCatalogSupport; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; import org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator; import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory; -import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; @@ -39,7 +39,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; -import org.apache.flink.table.descriptors.FormatDescriptorValidator; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; @@ -54,7 +53,7 @@ import java.util.Optional; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR; +import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.DEFAULT_PARTITIONS; /** * Expose a Pulsar instance as a database catalog. @@ -64,16 +63,16 @@ public class PulsarCatalog extends GenericInMemoryCatalog { private String adminUrl; - private Map properties; + private Configuration configuration; private PulsarCatalogSupport catalogSupport; public static final String DEFAULT_DB = "public/default"; - public PulsarCatalog(String adminUrl, String catalogName, Map props, String defaultDatabase) { + public PulsarCatalog(String adminUrl, String catalogName, Configuration configuration, String defaultDatabase) { super(catalogName, defaultDatabase); this.adminUrl = adminUrl; - this.properties = new HashMap<>(props); + this.configuration = configuration; log.info("Created Pulsar Catalog {}", catalogName); } @@ -87,9 +86,11 @@ public void open() throws CatalogException { if (catalogSupport == null) { try { final ClientConfigurationData clientConf = new ClientConfigurationData(); - clientConf.setAuthParams(properties.get(PROPERTIES_PREFIX + PulsarOptions.AUTH_PARAMS_KEY)); + clientConf.setAuthParams( + configuration.getString(PROPERTIES_PREFIX + PulsarOptions.AUTH_PARAMS_KEY, null)); clientConf.setAuthPluginClassName( - properties.get(PROPERTIES_PREFIX + PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY)); + configuration.getString(PROPERTIES_PREFIX + PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY, null)); + catalogSupport = new PulsarCatalogSupport(adminUrl, clientConf, "", new HashMap<>(), -1, -1, new SimpleSchemaTranslator(false)); } catch (PulsarClientException e) { @@ -165,7 +166,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep return super.getTable(tablePath); } try { - return catalogSupport.getTableSchema(tablePath, properties); + return catalogSupport.getTableSchema(tablePath, configuration); } catch (PulsarAdminException.NotFoundException e) { throw new TableNotExistException(getName(), tablePath, e); } catch (PulsarAdminException | IncompatibleSchemaException e) { @@ -193,8 +194,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig if (tablePath.getObjectName().startsWith("_tmp_table_")) { super.createTable(tablePath, table, ignoreIfExists); } - final String key = CONNECTOR + "." + PulsarOptions.DEFAULT_PARTITIONS; - int defaultNumPartitions = Integer.parseInt(properties.getOrDefault(key, "5")); + + int defaultNumPartitions = configuration.getInteger(DEFAULT_PARTITIONS); String databaseName = tablePath.getDatabaseName(); Boolean databaseExists; try { @@ -209,7 +210,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig try { catalogSupport.createTopic(tablePath, defaultNumPartitions, table); - catalogSupport.putSchema(tablePath, table, getFormat()); + catalogSupport.putSchema(tablePath, table, configuration); } catch (PulsarAdminException e) { if (e.getStatusCode() == 409) { throw new TableAlreadyExistException(getName(), tablePath, e); @@ -221,11 +222,6 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } - private String getFormat() { - return Optional.ofNullable(properties.get(FormatDescriptorValidator.FORMAT)) - .orElseGet(() -> properties.get(PulsarTableOptions.VALUE_FORMAT.key())); - } - // ------------------------------------------------------------------------ // Unsupported catalog operations for Pulsar // There should not be such permission in the connector, it is very dangerous diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java index 83e4500a..5b35dfcb 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java @@ -15,6 +15,7 @@ package org.apache.flink.table.catalog.pulsar.factories; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.pulsar.PulsarCatalog; import org.apache.flink.table.factories.CatalogFactory; @@ -28,6 +29,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL; import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC; @@ -53,11 +55,11 @@ public String factoryIdentifier() { public Catalog createCatalog(Context context) { final FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validate(); + helper.validateExcept(PROPERTIES_PREFIX); return new PulsarCatalog( helper.getOptions().get(ADMIN_URL), context.getName(), - context.getOptions(), + (Configuration) helper.getOptions(), helper.getOptions().get(DEFAULT_DATABASE)); } diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java index a8f269f0..46f323ae 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.connectors.pulsar.testutils.EnvironmentFileUtil; import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.pulsar.testutils.SingletonStreamSink; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; @@ -159,8 +160,8 @@ public void testDatabases() throws Exception { assertTrue( Sets.symmetricDifference( - Sets.newHashSet(tableEnv.listTables()), - Sets.newHashSet(Iterables.concat(topics, partitionedTopics))) + Sets.newHashSet(tableEnv.listTables()), + Sets.newHashSet(Iterables.concat(topics, partitionedTopics))) .isEmpty()); } finally { @@ -347,31 +348,45 @@ public void testTableSchema() throws Exception { String tableSinkTopic = newTopic("tableSink"); String tableSinkName = TopicName.get(tableSinkTopic).getLocalName(); - String pulsarCatalog1 = "pulsarcatalog3"; - - Map conf = getStreamingConfs(); - conf.put("$VAR_STARTING", "earliest"); - conf.put("$VAR_FORMAT", "avro"); - - ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf); - TableEnvironment tableEnv = context.getTableEnvironment(); - - tableEnv.useCatalog(pulsarCatalog1); - - String sinkDDL = "create table " + tableSinkName + "(\n" + - " id int,\n" + - " compute as id + 1,\n" + - " log_ts timestamp(3),\n" + - " ts as log_ts + INTERVAL '1' SECOND,\n" + - " watermark for ts as log_ts" + + String useCatalog = "pulsarcatalogtest1"; + + TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.fromConfiguration(new Configuration())); + + String createCatalog = "CREATE CATALOG " + useCatalog + "\n" + + " WITH (\n" + + " 'type' = 'pulsar',\n" + + " 'value.format' = 'avro',\n" + + " 'value.fields-include' = 'EXCEPT_KEY',\n" + + " 'key.format' = 'string',\n" + + " 'key.fields' = 'key',\n" + + " 'default-database' = 'public/default',\n" + + " 'scan.startup.mode' = 'earliest',\n" + + " 'service-url'='" + getServiceUrl() + "',\n" + + " 'admin-url'= '" + getAdminUrl() + "',\n" + + " 'properties.auth-plugin-classname'= '',\n" + + " 'properties.auth-params'= 'auth-params'\n" + + " )"; + tableEnv.executeSql(createCatalog); + tableEnv.useCatalog(useCatalog); + + String sinkDDL = "CREATE TABLE " + tableSinkName + " (\n" + + " `physical_1` STRING,\n" + + " `physical_2` INT,\n" + + " `eventTime` TIMESTAMP(3) METADATA, \n" + + " `properties` MAP METADATA,\n" + + " `topic` STRING METADATA VIRTUAL,\n" + + " `sequenceId` BIGINT METADATA VIRTUAL,\n" + + " `key` STRING ,\n" + + " `physical_3` BOOLEAN\n" + ")"; tableEnv.executeSql(sinkDDL).print(); final TableSchema schema = tableEnv.executeSql("DESCRIBE " + tableSinkName).getTableSchema(); - ExecutionContext context2 = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf); - TableEnvironment tableEnv2 = context2.getTableEnvironment(); - tableEnv2.useCatalog(pulsarCatalog1); + TableEnvironment tableEnv2 = + TableEnvironment.create(EnvironmentSettings.fromConfiguration(new Configuration())); + tableEnv2.executeSql(createCatalog); + tableEnv2.useCatalog(useCatalog); final TableSchema schema2 = tableEnv2.executeSql("DESCRIBE " + tableSinkName).getTableSchema(); assertEquals(schema, schema2); diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupportTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupportTest.java new file mode 100644 index 00000000..5f36766a --- /dev/null +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupportTest.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar.internal; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * unit test for PulsarCatalogSupport. + */ +public class PulsarCatalogSupportTest { + + @Test + public void putSchema() throws PulsarAdminException { + final CatalogBaseTable catalogBaseTable = mock(CatalogBaseTable.class); + final ResolvedSchema schema2 = ResolvedSchema.of( + Column.physical("physical_1", DataTypes.STRING()), + Column.physical("physical_2", DataTypes.INT()), + Column.metadata("eventTime", DataTypes.TIMESTAMP(3), "eventTime", false), + Column.metadata("properties", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), "properties", + false), + Column.metadata("topic", DataTypes.STRING(), "topic", true), + Column.metadata("sequenceId", DataTypes.BIGINT(), "sequenceId", true), + Column.physical("key", DataTypes.STRING()), + Column.physical("physical_3", DataTypes.BOOLEAN()) + ); + when(catalogBaseTable.getSchema()).thenReturn(TableSchema.fromResolvedSchema(schema2)); + when(catalogBaseTable.getUnresolvedSchema()).thenReturn( + Schema.newBuilder().fromResolvedSchema(schema2).build()); + final Configuration configuration = getCatalogConfig(); + + PulsarMetadataReader metadataReader = verify(mock(PulsarMetadataReader.class), data -> { + final SchemaInfo schemaInfo = data.getTarget().getInvocation().getArgument(1, SchemaInfo.class); + final org.apache.avro.Schema schema = + new org.apache.avro.Schema.Parser().parse(new String(schemaInfo.getSchema())); + + Assert.assertEquals(schema.getFields().size(), 3); + Assert.assertEquals(schema.getFields().get(0).name(), "physical_1"); + Assert.assertEquals(schema.getFields().get(1).name(), "physical_2"); + Assert.assertEquals(schema.getFields().get(2).name(), "physical_3"); + }); + final PulsarCatalogSupport catalogSupport = + new PulsarCatalogSupport(metadataReader, new SimpleSchemaTranslator()); + catalogSupport.putSchema(ObjectPath.fromString("public/default.test"), catalogBaseTable, configuration); + } + + private Configuration getCatalogConfig() { + final Configuration configuration = new Configuration(); + configuration.set(PulsarTableOptions.VALUE_FORMAT, "avro"); + configuration.set(PulsarTableOptions.KEY_FORMAT, "string"); + configuration.set(PulsarTableOptions.KEY_FIELDS, Collections.singletonList("key")); + configuration.set(PulsarTableOptions.VALUE_FIELDS_INCLUDE, PulsarTableOptions.ValueFieldsStrategy.EXCEPT_KEY); + return configuration; + } +}