diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md b/docs/content.zh/docs/connectors/table/formats/debezium.md
index a6ac486f0f015..7ba62dd408de9 100644
--- a/docs/content.zh/docs/connectors/table/formats/debezium.md
+++ b/docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 format 来
Boolean |
将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8 。当此选项设为 true 时,则会表示为 0.000000027 。 |
+
+ debezium-json.encode.ignore-null-fields |
+ 选填 |
+ false |
+ Boolean |
+ 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 |
+
diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md
index f1acdd7a00170..005485a7a0a8e 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format 参数
Boolean |
将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8 。当此选项设为 true 时,则会表示为 0.000000027 。 |
+
+ json.encode.ignore-null-fields |
+ 选填 |
+ false |
+ Boolean |
+ 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 |
+
decode.json-parser.enabled |
选填 |
diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md b/docs/content.zh/docs/connectors/table/formats/maxwell.md
index a3ac161f23164..0bdedeac6821a 100644
--- a/docs/content.zh/docs/connectors/table/formats/maxwell.md
+++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
Boolean |
Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. |
+
+ maxwell-json.encode.ignore-null-fields |
+ optional |
+ false |
+ Boolean |
+ Encode only non-null fields. By default, all fields will be included. |
+
diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md b/docs/content.zh/docs/connectors/table/formats/ogg.md
index c8e8a7a6c6d55..61ec97b60fdfb 100644
--- a/docs/content.zh/docs/connectors/table/formats/ogg.md
+++ b/docs/content.zh/docs/connectors/table/formats/ogg.md
@@ -216,7 +216,7 @@ Format Options
当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
- debezium-json.timestamp-format.standard |
+ ogg-json.timestamp-format.standard |
可选 |
'SQL' |
String |
@@ -247,6 +247,13 @@ Format Options
String |
当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
+
+ ogg-json.encode.ignore-null-fields |
+ 选填 |
+ false |
+ Boolean |
+ 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 |
+
diff --git a/docs/content/docs/connectors/table/formats/debezium.md b/docs/content/docs/connectors/table/formats/debezium.md
index 790196e258800..f69e3dc5d8f0a 100644
--- a/docs/content/docs/connectors/table/formats/debezium.md
+++ b/docs/content/docs/connectors/table/formats/debezium.md
@@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium Avro messages and for
Boolean |
Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. |
+
+ debezium-json.encode.ignore-null-fields |
+ optional |
+ false |
+ Boolean |
+ Encode only non-null fields. By default, all fields will be included. |
+
diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md
index 52345a42ea10c..df1c9949d8432 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -146,6 +146,14 @@ Format Options
Boolean |
Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. |
+
+ json.encode.ignore-null-fields |
+ optional |
+ no |
+ false |
+ Boolean |
+ Encode only non-null fields. By default, all fields will be included. |
+
decode.json-parser.enabled |
optional |
diff --git a/docs/content/docs/connectors/table/formats/maxwell.md b/docs/content/docs/connectors/table/formats/maxwell.md
index a7a98270f38ff..47c87442c73af 100644
--- a/docs/content/docs/connectors/table/formats/maxwell.md
+++ b/docs/content/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
Boolean |
Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. |
+
+ maxwell-json.encode.ignore-null-fields |
+ optional |
+ false |
+ Boolean |
+ Encode only non-null fields. By default, all fields will be included. |
+
diff --git a/docs/content/docs/connectors/table/formats/ogg.md b/docs/content/docs/connectors/table/formats/ogg.md
index 482273af8ce3e..3b53916e36d95 100644
--- a/docs/content/docs/connectors/table/formats/ogg.md
+++ b/docs/content/docs/connectors/table/formats/ogg.md
@@ -260,6 +260,13 @@ Format Options
String |
Specify string literal to replace null key when 'ogg-json.map-null-key.mode' is LITERAL. |
+
+ ogg-json.encode.ignore-null-fields |
+ optional |
+ false |
+ Boolean |
+ Encode only non-null fields. By default, all fields will be included. |
+
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 35a40ea187fdd..fbc7d89d6ad57 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -108,7 +108,7 @@ public class ThriftObjectConversions {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
- TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+ TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false);
private static final Map TABLE_TYPE_MAPPINGS = buildTableTypeMapping();
// --------------------------------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 562b99e6bc807..55e8f86b5d278 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -46,6 +46,7 @@
import static org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
@@ -147,6 +148,7 @@ public EncodingFormat> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
return new EncodingFormat>() {
@Override
@@ -158,7 +160,8 @@ public SerializationSchema createRuntimeEncoder(
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
@Override
@@ -187,6 +190,7 @@ public Set> optionalOptions() {
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
index 5c9e61068ac39..cc40b325d915c 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
@@ -73,6 +73,13 @@ public class JsonFormatOptions {
.withDescription(
"Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.");
+ public static final ConfigOption ENCODE_IGNORE_NULL_FIELDS =
+ ConfigOptions.key("encode.ignore-null-fields")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to specify whether to ignore null fields when encoding, false by default.");
+
public static final ConfigOption DECODE_JSON_PARSER_ENABLED =
ConfigOptions.key("decode.json-parser.enabled")
.booleanType()
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 376d0d568a33d..4b68bb0c2af74 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements SerializationSchema> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
return new EncodingFormat>() {
@Override
public ChangelogMode getChangelogMode() {
@@ -111,7 +114,8 @@ public SerializationSchema createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -136,6 +140,7 @@ public Set> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
index 362b9df6e6ab2..aaa292ef9df1c 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
@@ -59,14 +59,16 @@ public CanalJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
@Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index d72fcd23debd9..7fec2f43c1e16 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -45,6 +45,7 @@
import java.util.Set;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -92,6 +93,7 @@ public EncodingFormat> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
return new EncodingFormat>() {
@@ -114,7 +116,8 @@ public SerializationSchema createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -138,6 +141,7 @@ public Set> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
index 0dc9a96b01297..7312b30593aee 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
@@ -56,14 +56,16 @@ public DebeziumJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
@Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
index 1bbbec8441457..e56966753a203 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
@@ -44,6 +44,7 @@
import java.util.Set;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -86,6 +87,8 @@ public EncodingFormat> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
return new EncodingFormat>() {
@Override
@@ -107,7 +110,8 @@ public SerializationSchema createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -130,6 +134,7 @@ public Set> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
index 1fe567b08c3cb..ad1accdddd611 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
@@ -56,14 +56,16 @@ public MaxwellJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
this.timestampFormat = timestampFormat;
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
index f853983d43e61..11182e9380616 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
@@ -44,6 +44,7 @@
import java.util.Set;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -99,6 +100,8 @@ public EncodingFormat> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
return new EncodingFormat>() {
@Override
@@ -120,7 +123,8 @@ public SerializationSchema createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -143,6 +147,7 @@ public Set> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
index 635ff3dc7e39d..f44387a586301 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
@@ -57,14 +57,16 @@ public OggJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
private static RowType createJsonRowType(DataType databaseSchema) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index 3559e2b2c8727..4430203b28e46 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -176,6 +176,7 @@ private void testSchemaSerializationSchema(Map options) {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
SerializationSchema actualSer =
@@ -227,6 +228,7 @@ private Map getAllOptions() {
options.put("json.map-null-key.mode", "LITERAL");
options.put("json.map-null-key.literal", "null");
options.put("json.encode.decimal-as-plain-number", "true");
+ options.put("json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index ced449e09365c..916b04f50f8be 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -216,7 +216,8 @@ void testSerDe() throws Exception {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
byte[] actualBytes = serializationSchema.serialize(rowData);
@@ -300,7 +301,8 @@ void testSerDeMultiRows() throws Exception {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
// the first row
@@ -381,7 +383,8 @@ void testSerDeMultiRowsWithNullValues() throws Exception {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
for (int i = 0; i < jsons.length; i++) {
@@ -496,7 +499,8 @@ void testSerDeSQLTimestampFormat() throws Exception {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
ObjectNode root = OBJECT_MAPPER.createObjectNode();
@@ -538,7 +542,8 @@ void testSerializationMapNullKey() {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.FAIL,
"null",
- true);
+ true,
+ false);
open(serializationSchema1);
// expect message for serializationSchema1
String errorMessage1 =
@@ -551,7 +556,8 @@ void testSerializationMapNullKey() {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.DROP,
"null",
- true);
+ true,
+ false);
open(serializationSchema2);
// expect result for serializationSchema2
String expectResult2 = "{\"nestedMap\":{\"no-null key\":{\"no-null key\":1}}}";
@@ -562,7 +568,8 @@ void testSerializationMapNullKey() {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"nullKey",
- true);
+ true,
+ false);
open(serializationSchema3);
// expect result for serializationSchema3
String expectResult3 =
@@ -601,7 +608,8 @@ void testSerializationDecimalEncode() throws Exception {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
plainDecimalSerializer.open(new DummyInitializationContext());
JsonRowDataSerializationSchema scientificDecimalSerializer =
new JsonRowDataSerializationSchema(
@@ -609,6 +617,7 @@ void testSerializationDecimalEncode() throws Exception {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ false,
false);
scientificDecimalSerializer.open(new DummyInitializationContext());
@@ -626,6 +635,62 @@ void testSerializationDecimalEncode() throws Exception {
assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson);
}
+ @TestTemplate
+ void testSerDeMultiRowsWithNullValuesIgnored() throws Exception {
+ String[] jsons =
+ new String[] {
+ "{\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+ "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\", \"svt\":\"2020-02-24T12:58:09.209+0800\"}, "
+ + "\"ids\":[1, 2, 3]}",
+ "{\"ops\":{\"id\":null, \"svt\":\"2020-02-24T12:58:09.209+0800\"}, "
+ + "\"ids\":[1, 2, null]}",
+ "{\"ops\":{},\"ids\":[],\"metrics\":{}}",
+ };
+
+ String[] expected =
+ new String[] {
+ "{\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+ "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\",\"svt\":\"2020-02-24T12:58:09.209+0800\"},"
+ + "\"ids\":[1,2,3]}",
+ "{\"ops\":{\"svt\":\"2020-02-24T12:58:09.209+0800\"},\"ids\":[1,2,null]}",
+ "{\"ops\":{},\"ids\":[],\"metrics\":{}}",
+ };
+
+ RowType rowType =
+ (RowType)
+ ROW(
+ FIELD(
+ "ops",
+ ROW(FIELD("id", STRING()), FIELD("svt", STRING()))),
+ FIELD("ids", ARRAY(INT())),
+ FIELD("metrics", MAP(STRING(), DOUBLE())))
+ .getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema =
+ new JsonRowDataDeserializationSchema(
+ rowType,
+ InternalTypeInfo.of(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601);
+ open(deserializationSchema);
+ JsonRowDataSerializationSchema serializationSchema =
+ new JsonRowDataSerializationSchema(
+ rowType,
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ false,
+ true);
+ open(serializationSchema);
+ for (int i = 0; i < jsons.length; i++) {
+ String json = jsons[i];
+ RowData row = deserializationSchema.deserialize(json.getBytes());
+ String result = new String(serializationSchema.serialize(row));
+ assertThat(result).isEqualTo(expected[i]);
+ }
+ }
+
@TestTemplate
void testJsonParse() throws Exception {
for (TestSpec spec : testData) {
@@ -648,7 +713,8 @@ void testSerializationWithTypesMismatch() {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.FAIL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
String errorMessage = "Fail to serialize at field: f1.";
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index 00bd5a0625e33..bf2b95dac98fa 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -74,6 +74,7 @@ void testDefaultOptions() {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.FAIL,
"null",
+ false,
false);
SerializationSchema actualSer = createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
@@ -89,6 +90,7 @@ void testUserDefinedOptions() {
options.put("canal-json.map-null-key.mode", "LITERAL");
options.put("canal-json.map-null-key.literal", "nullKey");
options.put("canal-json.encode.decimal-as-plain-number", "true");
+ options.put("canal-json.encode.ignore-null-fields", "true");
// test Deser
CanalJsonDeserializationSchema expectedDeser =
@@ -109,6 +111,7 @@ void testUserDefinedOptions() {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"nullKey",
+ true,
true);
SerializationSchema actualSer = createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
index e45bfcc5eeede..cf326f2a3394d 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
@@ -218,7 +218,8 @@ public void runTest(List lines, CanalJsonDeserializationSchema deseriali
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
serializationSchema.open(new DummyInitializationContext());
List result = new ArrayList<>();
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index d000877b2f993..c469e0b2f95f4 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -81,6 +81,7 @@ void testSeDeSchema() {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
@@ -200,6 +201,7 @@ private Map getAllOptions() {
options.put("debezium-json.map-null-key.mode", "LITERAL");
options.put("debezium-json.map-null-key.literal", "null");
options.put("debezium-json.encode.decimal-as-plain-number", "true");
+ options.put("debezium-json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 3b9151f33a9f8..ffe0007f522a4 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -249,7 +249,8 @@ private void testSerializationDeserialization(String resourceFile, boolean schem
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
actual = new ArrayList<>();
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
index bc47d1e68f008..54fe0804a5bd0 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
@@ -69,6 +69,7 @@ void testSeDeSchema() {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
final Map options = getAllOptions();
@@ -165,6 +166,7 @@ private Map getAllOptions() {
options.put("maxwell-json.map-null-key.mode", "LITERAL");
options.put("maxwell-json.map-null-key.literal", "null");
options.put("maxwell-json.encode.decimal-as-plain-number", "true");
+ options.put("maxwell-json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
index 12d64fd99d0f5..d17d6a83534ee 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
@@ -187,7 +187,8 @@ void testSerializationDeserialization() throws Exception {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
List result = new ArrayList<>();
for (RowData rowData : collector.list) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
index c04e991a2de22..f840783ca95b0 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
@@ -55,6 +55,7 @@ void testSeDeSchema() {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
@@ -137,6 +138,7 @@ private Map getAllOptions() {
options.put("ogg-json.map-null-key.mode", "LITERAL");
options.put("ogg-json.map-null-key.literal", "null");
options.put("ogg-json.encode.decimal-as-plain-number", "true");
+ options.put("ogg-json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
index 2fa78c8941216..76e417d4ac11b 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -216,7 +216,8 @@ private void testSerializationDeserialization(String resourceFile) throws Except
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
actual = new ArrayList<>();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
index b796d32ba3e48..fb43f6d62b8b5 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
@@ -68,7 +68,10 @@ public ResultInfoSerializer() {
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
- TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ false);
@Override
public void serialize(