[FLINK-16627][json] Support ignore null fields when serializing into JSON

Close apache#24430
RubyChou authored and libenchao committed Mar 5, 2024
1 parent e0b6c12 commit d422da4
Showing 32 changed files with 235 additions and 38 deletions.
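For orientation, the effect of the new `encode.ignore-null-fields` option can be sketched in plain Java. This is a stand-alone illustration, not Flink code: the class `IgnoreNullFieldsSketch` and its methods are hypothetical, a `LinkedHashMap` stands in for the row, and string values are not escaped. Only the guard `value != null || !ignoreNullFields` is taken from the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class IgnoreNullFieldsSketch {

    /** Encodes a flat row as JSON, optionally skipping null-valued fields. */
    static String encode(Map<String, Object> row, boolean ignoreNullFields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            Object value = entry.getValue();
            // Same guard as in the commit: write the field unless it is null
            // and null fields are being ignored.
            if (value != null || !ignoreNullFields) {
                json.add("\"" + entry.getKey() + "\":" + render(value));
            }
        }
        return json.toString();
    }

    /** Renders a scalar; strings are quoted but not escaped (sketch only). */
    private static String render(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return "\"" + value + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", null);
        row.put("city", "Berlin");
        System.out.println(encode(row, false)); // {"id":1,"name":null,"city":"Berlin"}
        System.out.println(encode(row, true));  // {"id":1,"city":"Berlin"}
    }
}
```

With the default (`false`) every column is written, nulls included; with `true` the `name` key is dropped from the output entirely.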
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 format 来
<td>Boolean</td>
<td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为 <code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td>
</tr>
<tr>
<td><h5>debezium-json.encode.ignore-null-fields</h5></td>
<td>选填</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format 参数
<td>Boolean</td>
<td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为 <code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td>
</tr>
<tr>
<td><h5>json.encode.ignore-null-fields</h5></td>
<td>选填</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td>
</tr>
<tr>
<td><h5>decode.json-parser.enabled</h5></td>
<td>选填</td>
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
<tr>
<td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

9 changes: 8 additions & 1 deletion docs/content.zh/docs/connectors/table/formats/ogg.md
@@ -216,7 +216,7 @@ Format Options
<td>当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。</td>
</tr>
<tr>
<td><h5>debezium-json.timestamp-format.standard</h5></td>
<td><h5>ogg-json.timestamp-format.standard</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
@@ -247,6 +247,13 @@ Format Options
<td>String</td>
<td>当 <code>'ogg-json.map-null-key.mode'</code> 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。</td>
</tr>
<tr>
<td><h5>ogg-json.encode.ignore-null-fields</h5></td>
<td>选填</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/formats/debezium.md
@@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium Avro messages and for
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
<tr>
<td><h5>debezium-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/formats/json.md
@@ -146,6 +146,14 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
<tr>
<td><h5>json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
<tr>
<td><h5>decode.json-parser.enabled</h5></td>
<td>optional</td>
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
<tr>
<td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/formats/ogg.md
@@ -260,6 +260,13 @@ Format Options
<td>String</td>
<td>Specify string literal to replace null key when <code>'ogg-json.map-null-key.mode'</code> is LITERAL.</td>
</tr>
<tr>
<td><h5>ogg-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

@@ -108,7 +108,7 @@ public class ThriftObjectConversions {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false);
private static final Map<String, TableKind> TABLE_TYPE_MAPPINGS = buildTableTypeMapping();

// --------------------------------------------------------------------------------------------
@@ -46,6 +46,7 @@

import static org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
@@ -147,6 +148,7 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(

final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -158,7 +160,8 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
ignoreNullFields);
}

@Override
@@ -187,6 +190,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}

@@ -73,6 +73,13 @@ public class JsonFormatOptions {
.withDescription(
"Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.");

public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS =
ConfigOptions.key("encode.ignore-null-fields")
.booleanType()
.defaultValue(false)
.withDescription(
"Optional flag to specify whether to ignore null fields when encoding, false by default.");

public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED =
ConfigOptions.key("decode.json-parser.enabled")
.booleanType()
@@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;

/** Flag indicating whether to ignore null fields. */
private final boolean ignoreNullFields;

public JsonRowDataSerializationSchema(
RowType rowType,
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
boolean encodeDecimalAsPlainNumber) {
boolean encodeDecimalAsPlainNumber,
boolean ignoreNullFields) {
this.rowType = rowType;
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.ignoreNullFields = ignoreNullFields;
this.runtimeConverter =
new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
new RowDataToJsonConverters(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
ignoreNullFields)
.createConverter(rowType);
}

@@ -95,7 +104,7 @@ public void open(InitializationContext context) throws Exception {

@Override
public byte[] serialize(RowData row) {
if (node == null) {
if (node == null || ignoreNullFields) {
node = mapper.createObjectNode();
}

@@ -120,7 +129,8 @@ public boolean equals(Object o) {
&& timestampFormat.equals(that.timestampFormat)
&& mapNullKeyMode.equals(that.mapNullKeyMode)
&& mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
&& encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber;
&& encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
&& ignoreNullFields == that.ignoreNullFields;
}

@Override
@@ -130,6 +140,7 @@ public int hashCode() {
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
ignoreNullFields);
}
}
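The commit does not explain why `serialize` now recreates `node` whenever `ignoreNullFields` is set. One plausible reading: the node is reused across rows, so a field that is skipped for the current row would otherwise keep the value written for an earlier row. The sketch below illustrates that reading with plain Java collections; `ReusedNodeSketch`, `serializeReusingNode`, and the use of a `LinkedHashMap` in place of Jackson's `ObjectNode` are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class ReusedNodeSketch {
    private final boolean ignoreNullFields;
    // Reused between calls, standing in for the serializer's ObjectNode.
    private Map<String, Object> node;

    ReusedNodeSketch(boolean ignoreNullFields) {
        this.ignoreNullFields = ignoreNullFields;
    }

    /** Mirrors the patched check: start from an empty node whenever nulls are skipped. */
    Map<String, Object> serialize(String[] names, Object[] values) {
        if (node == null || ignoreNullFields) {
            node = new LinkedHashMap<>();
        }
        return fill(names, values);
    }

    /** Mirrors the earlier check (node == null only) to show the stale-field effect. */
    Map<String, Object> serializeReusingNode(String[] names, Object[] values) {
        if (node == null) {
            node = new LinkedHashMap<>();
        }
        return fill(names, values);
    }

    private Map<String, Object> fill(String[] names, Object[] values) {
        for (int i = 0; i < names.length; i++) {
            if (values[i] != null || !ignoreNullFields) {
                node.put(names[i], values[i]);
            }
        }
        return new LinkedHashMap<>(node); // snapshot, so later rows do not alter it
    }

    public static void main(String[] args) {
        String[] names = {"a", "b"};

        ReusedNodeSketch stale = new ReusedNodeSketch(true);
        stale.serializeReusingNode(names, new Object[] {1, 2});
        // Field b is skipped for the second row, so the old value 2 survives.
        System.out.println(stale.serializeReusingNode(names, new Object[] {3, null})); // {a=3, b=2}

        ReusedNodeSketch fresh = new ReusedNodeSketch(true);
        fresh.serialize(names, new Object[] {1, 2});
        System.out.println(fresh.serialize(names, new Object[] {3, null})); // {a=3}
    }
}
```

When null fields are not ignored, every field is overwritten on each row, which would explain why the reused node stays safe in the default mode.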
@@ -68,13 +68,18 @@ public class RowDataToJsonConverters implements Serializable {
/** The string literal when handling mode for map null key LITERAL. is */
private final String mapNullKeyLiteral;

/** Flag indicating whether to ignore null fields. */
private final boolean ignoreNullFields;

public RowDataToJsonConverters(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral) {
String mapNullKeyLiteral,
boolean ignoreNullFields) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.ignoreNullFields = ignoreNullFields;
}

/**
@@ -331,9 +336,11 @@ private RowDataToJsonConverter createRowConverter(RowType type) {
String fieldName = fieldNames[i];
try {
Object field = fieldGetters[i].getFieldOrNull(row);
node.set(
fieldName,
fieldConverters[i].convert(mapper, node.get(fieldName), field));
if (field != null || !ignoreNullFields) {
node.set(
fieldName,
fieldConverters[i].convert(mapper, node.get(fieldName), field));
}
} catch (Throwable t) {
throw new RuntimeException(
String.format("Fail to serialize at field: %s.", fieldName), t);
@@ -44,6 +44,7 @@
import java.util.Set;

import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.DATABASE_INCLUDE;
import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@@ -91,6 +92,8 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);

final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public ChangelogMode getChangelogMode() {
@@ -111,7 +114,8 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
ignoreNullFields);
}
};
}
@@ -136,6 +140,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}

@@ -59,14 +59,16 @@ public CanalJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
boolean encodeDecimalAsPlainNumber) {
boolean encodeDecimalAsPlainNumber,
boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
ignoreNullFields);
}

@Override
@@ -45,6 +45,7 @@
import java.util.Set;

import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -92,6 +93,7 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(

final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);

return new EncodingFormat<SerializationSchema<RowData>>() {

@@ -114,7 +116,8 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
ignoreNullFields);
}
};
}
@@ -138,6 +141,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}

@@ -56,14 +56,16 @@ public DebeziumJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
boolean encodeDecimalAsPlainNumber) {
boolean encodeDecimalAsPlainNumber,
boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
ignoreNullFields);
}

@Override