[ISSUE apache#88] Introduce format parameter configuration to support different serialization and deserialization formats provided by Flink
leosanqing committed Jun 25, 2023
1 parent 45590d1 commit 2be3561
Showing 13 changed files with 687 additions and 23 deletions.
pom.xml: 27 additions & 0 deletions
@@ -54,6 +54,21 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

+        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
@@ -119,11 +134,23 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-broker</artifactId>
<version>${rocketmq.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+            </exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-test</artifactId>
<version>${rocketmq.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+            </exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
RocketMQOptions.java
@@ -20,7 +20,11 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;

+import java.util.List;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;

/** Includes config options of RocketMQ connector type. */
@@ -117,4 +121,83 @@ public class RocketMQOptions {

public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();

+    // --------------------------------------------------------------------------------------------
+    // Format options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> VALUE_FORMAT =
+            ConfigOptions.key("value" + FORMAT_SUFFIX)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding value data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<String> KEY_FORMAT =
+            ConfigOptions.key("key" + FORMAT_SUFFIX)
+                    .stringType()
+                    // .defaultValue("rocketmq-default")
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding key data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<List<String>> KEY_FIELDS =
+            ConfigOptions.key("key.fields")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "Defines an explicit list of physical columns from the table schema "
+                                    + "that configure the data type for the key format. By default, this list is "
+                                    + "empty and thus a key is undefined.");
+
+    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE =
+            ConfigOptions.key("value.fields-include")
+                    .enumType(ValueFieldsStrategy.class)
+                    .defaultValue(ValueFieldsStrategy.ALL)
+                    .withDescription(
+                            String.format(
+                                    "Defines a strategy for handling key columns in the data type "
+                                            + "of the value format. By default, '%s' physical columns of the table schema "
+                                            + "will be included in the value format, which means that the key columns "
+                                            + "appear in the data type for both the key and value format.",
+                                    ValueFieldsStrategy.ALL));

+    public static final ConfigOption<String> KEY_FIELDS_PREFIX =
+            ConfigOptions.key("key.fields-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines a custom prefix for all fields of the key format to avoid "
+                                                    + "name clashes with fields of the value format. "
+                                                    + "By default, the prefix is empty.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "If a custom prefix is defined, both the table schema and '%s' will work with prefixed names.",
+                                                    KEY_FIELDS.key()))
+                                    .linebreak()
+                                    .text(
+                                            "When constructing the data type of the key format, the prefix "
+                                                    + "will be removed and the non-prefixed names will be used within the key format.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "Please note that this option requires that '%s' must be '%s'.",
+                                                    VALUE_FIELDS_INCLUDE.key(),
+                                                    ValueFieldsStrategy.EXCEPT_KEY))
+                                    .build());
+
+    // --------------------------------------------------------------------------------------------
+    // Enums
+    // --------------------------------------------------------------------------------------------
+
+    public enum ValueFieldsStrategy {
+        ALL,
+        EXCEPT_KEY
+    }
}
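
The block above mirrors the key/value format split of Flink's upstream Kafka connector: a format identifier per side, an explicit key field list, an include strategy, and an optional key prefix. As a hedged sketch only (the 'rocketmq' connector identifier and the abbreviated WITH clause are assumptions for illustration, not taken from this commit), a table definition exercising the new options could look like this:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FormatOptionsSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            // 'key.fields' lists the prefixed schema column; inside the key format
            // the prefix is stripped, so the key format sees a field named 'id'.
            // Setting a prefix requires 'value.fields-include' = 'EXCEPT_KEY'.
            tEnv.executeSql(
                    "CREATE TABLE orders (\n"
                            + "  k_id STRING,\n"
                            + "  amount DOUBLE\n"
                            + ") WITH (\n"
                            + "  'connector' = 'rocketmq',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'key.fields' = 'k_id',\n"
                            + "  'key.fields-prefix' = 'k_',\n"
                            + "  'value.format' = 'json',\n"
                            + "  'value.fields-include' = 'EXCEPT_KEY'\n"
                            + ")");
        }
    }

With this layout the key format decodes {"id": ...} while the value format decodes only {"amount": ...}, since key columns are excluded from the value side.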
SimpleKeyValueDeserializationSchema.java
@@ -17,13 +17,14 @@
package org.apache.rocketmq.flink.legacy.common.serialization;

import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;

-public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map<String, String>> {
+public class SimpleKeyValueDeserializationSchema
+        implements KeyValueDeserializationSchema<Map<String, String>> {
public static final String DEFAULT_KEY_FIELD = "key";
public static final String DEFAULT_VALUE_FIELD = "value";

RocketMQSource.java
@@ -29,7 +29,6 @@
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;

-import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -123,12 +122,13 @@ public Boundedness getBoundedness() {
}

@Override
-    public SourceReader<OUT, RocketMQPartitionSplit> createReader(
-            SourceReaderContext readerContext) {
+    public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
new FutureCompletingBlockingQueue<>();
deserializationSchema.open(
-                new DeserializationSchema.InitializationContext() {
+                new org.apache.flink.api.common.serialization.DeserializationSchema
+                        .InitializationContext() {
@Override
public MetricGroup getMetricGroup() {
return readerContext.metricGroup();
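
The fully qualified anonymous class above is Flink's standard DeserializationSchema.InitializationContext hook: it hands the reader's metric group (and user-code ClassLoader) to a format before any record is read. A minimal standalone sketch of the same pattern, detached from any RocketMQ specifics:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.util.UserCodeClassLoader;

    public class OpenFormatSketch {
        public static void main(String[] args) throws Exception {
            DeserializationSchema<String> format = new SimpleStringSchema();
            // In the real reader both methods delegate to the SourceReaderContext;
            // this toy context is never queried because SimpleStringSchema's
            // open() is a no-op.
            format.open(
                    new DeserializationSchema.InitializationContext() {
                        @Override
                        public MetricGroup getMetricGroup() {
                            throw new UnsupportedOperationException("no metrics in this sketch");
                        }

                        @Override
                        public UserCodeClassLoader getUserCodeClassLoader() {
                            throw new UnsupportedOperationException("no user classloader in this sketch");
                        }
                    });
            System.out.println(format.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
        }
    }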
RocketMQDeserializationSchema.java
@@ -42,7 +42,7 @@ public interface RocketMQDeserializationSchema<T>
*/
@Override
@PublicEvolving
-    default void open(InitializationContext context) {}
+    default void open(InitializationContext context) throws Exception {}

/**
* Deserializes the byte message.
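
Widening the default open(...) to throws Exception matters because implementations now typically delegate to a wrapped Flink format whose own open(...) is itself declared to throw. Below is a hedged sketch of an implementation under that contract; the interface's other members are inferred from their usage elsewhere in this commit, so treat the exact signatures and the import path as assumptions:

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;

    import org.apache.rocketmq.common.message.MessageExt;

    // Assumed to live alongside the connector's RocketMQDeserializationSchema.
    public class BodyAsStringDeserializationSchema
            implements RocketMQDeserializationSchema<String> {

        @Override
        public void open(DeserializationSchema.InitializationContext context) throws Exception {
            // A wrapped format opened here may throw during initialization;
            // the widened signature lets that propagate instead of forcing
            // a try/catch-and-rethrow wrapper.
        }

        @Override
        public void deserialize(List<MessageExt> input, Collector<String> collector) {
            for (MessageExt message : input) {
                collector.collect(new String(message.getBody(), StandardCharsets.UTF_8));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }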
RocketMQRowDeserializationSchema.java
@@ -28,12 +28,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
- * A row data wrapper class that wraps a {@link RocketMQDeserializationSchema} to deserialize {@link
+ * A row data wrapper class that wraps a {@link DeserializationSchema} to deserialize {@link
* MessageExt}.
*/
public class RocketMQRowDeserializationSchema implements RocketMQDeserializationSchema<RowData> {
@@ -46,6 +47,10 @@ public class RocketMQRowDeserializationSchema implements RocketMQDeserialization

public RocketMQRowDeserializationSchema(
TableSchema tableSchema,
+            org.apache.flink.api.common.serialization.DeserializationSchema<RowData>
+                    keyDeserialization,
+            org.apache.flink.api.common.serialization.DeserializationSchema<RowData>
+                    valueDeserialization,
Map<String, String> properties,
boolean hasMetadata,
MetadataConverter[] metadataConverters) {
@@ -55,17 +60,20 @@ public RocketMQRowDeserializationSchema(
.setTableSchema(tableSchema)
.setHasMetadata(hasMetadata)
.setMetadataConverters(metadataConverters)
+                        .setKeyDeserialization(keyDeserialization)
+                        .setValueDeserialization(valueDeserialization)
.build();
}

@Override
-    public void open(InitializationContext context) {
+    public void open(InitializationContext context) throws Exception {
deserializationSchema.open(context);
bytesMessages = new ArrayList<>();
}

@Override
-    public void deserialize(List<MessageExt> input, Collector<RowData> collector) {
+    public void deserialize(List<MessageExt> input, Collector<RowData> collector)
+            throws IOException {
extractMessages(input);
deserializationSchema.deserialize(bytesMessages, collector);
}
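
With the two new constructor parameters, the table source can hand pre-built Flink formats straight into the row schema instead of hard-coding one. A hedged wiring sketch using flink-json follows; the MetadataConverter placeholder array, the empty properties map, and passing null for an unset key format are illustration-only assumptions, not verified against this commit, and the TimestampFormat import path varies by Flink version:

    import java.util.Collections;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.common.TimestampFormat;
    import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.logical.RowType;

    public class RowSchemaWiringSketch {
        public static RocketMQRowDeserializationSchema build() {
            TableSchema schema =
                    TableSchema.builder()
                            .field("id", DataTypes.STRING())
                            .field("amount", DataTypes.DOUBLE())
                            .build();
            RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
            DeserializationSchema<RowData> valueDeserialization =
                    new JsonRowDataDeserializationSchema(
                            rowType,
                            InternalTypeInfo.of(rowType),
                            false, // failOnMissingField
                            false, // ignoreParseErrors
                            TimestampFormat.ISO_8601);
            return new RocketMQRowDeserializationSchema(
                    schema,
                    null, // key format left unset in this sketch
                    valueDeserialization,
                    Collections.emptyMap(),
                    false, // hasMetadata
                    new MetadataConverter[0]); // type from the connector package
        }
    }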
