From 90d678c0f74b4c2e19085d7332325dd2dd0c7517 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Fri, 23 Sep 2022 01:33:19 +0800 Subject: [PATCH] [HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data (#6734) --- .../org/apache/hudi/io/HoodieCDCLogger.java | 36 +- .../org/apache/hudi/io/HoodieMergeHandle.java | 13 +- .../org/apache/hudi/avro/AvroSchemaUtils.java | 8 +- .../hudi/common/table/cdc/HoodieCDCUtils.java | 102 +++--- .../functional/TestHoodieLogFormat.java | 59 +++- .../cdc/TestCDCDataFrameSuite.scala | 324 ++++++++++++++++++ 6 files changed, 448 insertions(+), 94 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index c93489d89096..e4f1e14252af 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -102,16 +102,11 @@ public HoodieCDCLogger( this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse( config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)); - if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { - this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA; - this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING; - } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { - this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE; - this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING; - } else { - this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY; - this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING; - } + this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + cdcSupplementalLoggingMode, + dataSchema + ); + this.cdcSchemaString = this.cdcSchema.toString(); this.cdcData = new ExternalSpillableMap<>( maxInMemorySizeInBytes, @@ -158,18 +153,21 @@ private GenericData.Record createCDCRecord(HoodieCDCOperation operation, GenericRecord newRecord) { GenericData.Record record; if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { - record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime, + record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord); } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) { - record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, + record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord)); } else { - record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey); + record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey); } return record; } private GenericRecord removeCommitMetadata(GenericRecord record) { + if (record == null) { + return null; + } return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>()); } @@ -221,18 +219,6 @@ public void close() { } } - public static Option writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger, - long recordsWritten, - long insertRecordsWritten) { - if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { - // the following cases where 
we do not need to write out the cdc file: - // case 1: all the data from the previous file slice are deleted. and no new data is inserted; - // case 2: all the data are new-coming, - return Option.empty(); - } - return cdcLogger.writeCDCData(); - } - public static void setCDCStatIfNeeded(HoodieWriteStat stat, Option cdcResult, String partitionPath, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index da6b1c6071b8..442256ade348 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -425,6 +425,16 @@ protected void writeIncomingRecords() throws IOException { } } + private Option writeCDCDataIfNeeded() { + if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { + // the following cases where we do not need to write out the cdc file: + // case 1: all the data from the previous file slice are deleted. and no new data is inserted; + // case 2: all the data are new-coming, + return Option.empty(); + } + return cdcLogger.writeCDCData(); + } + @Override public List close() { try { @@ -445,8 +455,7 @@ public List close() { } // if there are cdc data written, set the CDC-related information. - Option cdcResult = - HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten); + Option cdcResult = writeCDCDataIfNeeded(); HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs); long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index d45cfe0351ed..6a87cc3c2990 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -105,8 +105,12 @@ public static Schema resolveNullableSchema(Schema schema) { * wrapping around provided target non-null type */ public static Schema createNullableSchema(Schema.Type avroType) { - checkState(avroType != Schema.Type.NULL); - return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType)); + return createNullableSchema(Schema.create(avroType)); + } + + public static Schema createNullableSchema(Schema schema) { + checkState(schema.getType() != Schema.Type.NULL); + return Schema.createUnion(Schema.create(Schema.Type.NULL), schema); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java index 3cf8315a5434..a741181d4d51 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java @@ -18,12 +18,17 @@ package org.apache.hudi.common.table.cdc; +import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.exception.HoodieException; +import java.util.Arrays; +import java.util.List; + public class HoodieCDCUtils { public static final String CDC_LOGFILE_SUFFIX = "-cdc"; @@ -50,33 +55,6 @@ public class HoodieCDCUtils { CDC_AFTER_IMAGE }; - /** - * This is the 
standard CDC output format. - * Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. - */ - public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\"," - + "\"fields\":[" - + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," - + "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]}," - + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}," - + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}" - + "]}"; - - public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING); - - /** - * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'. - */ - public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\"," - + "\"fields\":[" - + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]}," - + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}," - + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}" - + "]}"; - - public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE = - new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING); - /** * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'. */ @@ -89,32 +67,50 @@ public class HoodieCDCUtils { public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY = new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING); - public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) { - switch (supplementalLoggingMode) { - case WITH_BEFORE_AFTER: - return CDC_SCHEMA; - case WITH_BEFORE: - return CDC_SCHEMA_OP_RECORDKEY_BEFORE; - case OP_KEY: - return CDC_SCHEMA_OP_AND_RECORDKEY; - default: - throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode); + public static Schema schemaBySupplementalLoggingMode( + HoodieCDCSupplementalLoggingMode supplementalLoggingMode, + Schema tableSchema) { + if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) { + return CDC_SCHEMA_OP_AND_RECORDKEY; + } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) { + return createCDCSchema(tableSchema, false); + } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) { + return createCDCSchema(tableSchema, true); + } else { + throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode); } } + private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage) { + Schema imageSchema = AvroSchemaUtils.createNullableSchema(tableSchema); + Schema.Field opField = new Schema.Field(CDC_OPERATION_TYPE, + AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE); + Schema.Field beforeField = new Schema.Field( + CDC_BEFORE_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE); + List fields; + if (withAfterImage) { + Schema.Field tsField = new Schema.Field(CDC_COMMIT_TIMESTAMP, + AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE); + Schema.Field afterField = new Schema.Field( + CDC_AFTER_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE); + fields = Arrays.asList(opField, tsField, beforeField, afterField); + } else { + Schema.Field keyField = new Schema.Field(CDC_RECORD_KEY, + AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE); + fields = Arrays.asList(opField, keyField, 
beforeField); + } + + Schema mergedSchema = Schema.createRecord("CDC", null, tableSchema.getNamespace(), false); + mergedSchema.setFields(fields); + return mergedSchema; + } + /** * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'. */ - public static GenericData.Record cdcRecord( - String op, String commitTime, GenericRecord before, GenericRecord after) { - String beforeJsonStr = recordToJson(before); - String afterJsonStr = recordToJson(after); - return cdcRecord(op, commitTime, beforeJsonStr, afterJsonStr); - } - - public static GenericData.Record cdcRecord( - String op, String commitTime, String before, String after) { - GenericData.Record record = new GenericData.Record(CDC_SCHEMA); + public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime, + GenericRecord before, GenericRecord after) { + GenericData.Record record = new GenericData.Record(cdcSchema); record.put(CDC_OPERATION_TYPE, op); record.put(CDC_COMMIT_TIMESTAMP, commitTime); record.put(CDC_BEFORE_IMAGE, before); @@ -125,20 +121,20 @@ public static GenericData.Record cdcRecord( /** * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'. */ - public static GenericData.Record cdcRecord(String op, String recordKey, GenericRecord before) { - GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_RECORDKEY_BEFORE); + public static GenericData.Record cdcRecord(Schema cdcSchema, String op, + String recordKey, GenericRecord before) { + GenericData.Record record = new GenericData.Record(cdcSchema); record.put(CDC_OPERATION_TYPE, op); record.put(CDC_RECORD_KEY, recordKey); - String beforeJsonStr = recordToJson(before); - record.put(CDC_BEFORE_IMAGE, beforeJsonStr); + record.put(CDC_BEFORE_IMAGE, before); return record; } /** * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'. 
*/ - public static GenericData.Record cdcRecord(String op, String recordKey) { - GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_AND_RECORDKEY); + public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) { + GenericData.Record record = new GenericData.Record(cdcSchema); record.put(CDC_OPERATION_TYPE, op); record.put(CDC_RECORD_KEY, recordKey); return record; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index c037c79dd82f..19552ed5a93a 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.functional; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -37,6 +38,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.table.log.HoodieLogFileReader; @@ -561,46 +563,79 @@ public void testCDCBlock() throws IOException, InterruptedException { .withFs(fs) .build(); - GenericRecord record1 = HoodieCDCUtils.cdcRecord("i", "100", - null, "{\"uuid\": 1, \"name\": \"apple\"}, \"ts\": 1100}"); - GenericRecord record2 = HoodieCDCUtils.cdcRecord("u", "100", - "{\"uuid\": 2, \"name\": \"banana\"}, \"ts\": 1000}", - "{\"uuid\": 2, \"name\": \"blueberry\"}, \"ts\": 1100}"); - GenericRecord record3 = HoodieCDCUtils.cdcRecord("d", "100", - "{\"uuid\": 3, \"name\": \"cherry\"}, \"ts\": 1000}", null); + String dataSchameString = "{\"type\":\"record\",\"name\":\"Record\"," + + "\"fields\":[" + + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]}," + + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]}," + + "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}" + + "]}"; + Schema dataSchema = new Schema.Parser().parse(dataSchameString); + Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema); + GenericRecord insertedRecord = new GenericData.Record(dataSchema); + insertedRecord.put("uuid", 1); + insertedRecord.put("name", "apple"); + insertedRecord.put("ts", 1100L); + + GenericRecord updateBeforeImageRecord = new GenericData.Record(dataSchema); + updateBeforeImageRecord.put("uuid", 2); + updateBeforeImageRecord.put("name", "banana"); + updateBeforeImageRecord.put("ts", 1000L); + GenericRecord updateAfterImageRecord = new GenericData.Record(dataSchema); + updateAfterImageRecord.put("uuid", 2); + updateAfterImageRecord.put("name", "blueberry"); + updateAfterImageRecord.put("ts", 1100L); + + GenericRecord deletedRecord = new GenericData.Record(dataSchema); + deletedRecord.put("uuid", 3); + deletedRecord.put("name", "cherry"); + deletedRecord.put("ts", 1000L); + + GenericRecord record1 = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100", + null, insertedRecord); + GenericRecord record2 = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100", + updateBeforeImageRecord, updateAfterImageRecord); + GenericRecord record3 = HoodieCDCUtils.cdcRecord(cdcSchema, "d", 
"100", + deletedRecord, null); List records = new ArrayList<>(Arrays.asList(record1, record2, record3)); Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, HoodieCDCUtils.CDC_SCHEMA_STRING); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString()); HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.CDC_DATA_BLOCK, records, header); writer.appendBlock(dataBlock); writer.close(); - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), HoodieCDCUtils.CDC_SCHEMA); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), cdcSchema); assertTrue(reader.hasNext()); HoodieLogBlock block = reader.next(); HoodieDataBlock dataBlockRead = (HoodieDataBlock) block; List recordsRead = getRecords(dataBlockRead); assertEquals(3, recordsRead.size(), "Read records size should be equal to the written records size"); - assertEquals(dataBlockRead.getSchema(), HoodieCDCUtils.CDC_SCHEMA); + assertEquals(dataBlockRead.getSchema(), cdcSchema); GenericRecord insert = (GenericRecord) recordsRead.stream() .filter(record -> record.get(0).toString().equals("i")).findFirst().get(); assertNull(insert.get("before")); assertNotNull(insert.get("after")); + assertEquals(((GenericRecord) insert.get("after")).get("name").toString(), "apple"); GenericRecord update = (GenericRecord) recordsRead.stream() .filter(record -> record.get(0).toString().equals("u")).findFirst().get(); assertNotNull(update.get("before")); assertNotNull(update.get("after")); - assertTrue(update.get("before").toString().contains("banana")); - assertTrue(update.get("after").toString().contains("blueberry")); + GenericRecord uBefore = (GenericRecord) update.get("before"); + GenericRecord uAfter = (GenericRecord) update.get("after"); + assertEquals(String.valueOf(uBefore.get("name")), "banana"); + assertEquals(Long.valueOf(uBefore.get("ts").toString()), 1000L); + assertEquals(String.valueOf(uAfter.get("name")), "blueberry"); + assertEquals(Long.valueOf(uAfter.get("ts").toString()), 1100L); GenericRecord delete = (GenericRecord) recordsRead.stream() .filter(record -> record.get(0).toString().equals("d")).findFirst().get(); assertNotNull(delete.get("before")); assertNull(delete.get("after")); + assertEquals(((GenericRecord) delete.get("before")).get("name").toString(), "cherry"); reader.close(); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala new file mode 100644 index 000000000000..6f0731578d63 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional.cdc + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, IndexedRecord} + +import org.apache.hadoop.fs.Path + +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord} +import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils} +import org.apache.hudi.common.table.log.HoodieLogFormat +import org.apache.hudi.common.table.log.block.{HoodieDataBlock} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.testutils.RawTripTestPayload +import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} +import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig} +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.SaveMode + +import org.junit.jupiter.api.{AfterEach, BeforeEach} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNull, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +class TestCDCDataFrameSuite extends HoodieClientTestBase { + + var spark: SparkSession = _ + + val commonOpts = Map( + HoodieTableConfig.CDC_ENABLED.key -> "true", + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + RECORDKEY_FIELD.key -> "_row_key", + PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1", + HoodieCleanConfig.AUTO_CLEAN.key -> "false" + ) + + @BeforeEach override def setUp(): Unit = { + setTableName("hoodie_test") + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown(): Unit = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @ParameterizedTest + @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) + def testCOWDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = { + val options = commonOpts ++ Map( + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode + ) + + // Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf) + .build() + val instant1 = metaClient.reloadActiveTimeline.lastInstant().get() + assertEquals(spark.read.format("hudi").load(basePath).count(), 100) + // all the data is new-coming, it will NOT write out cdc log files.
+ assertFalse(hasCDCLogFile(instant1)) + + val schemaResolver = new TableSchemaResolver(metaClient) + val dataSchema = schemaResolver.getTableAvroSchema(false) + val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema) + + // Upsert Operation + val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50) + val records2 = recordsToStrings(hoodieRecords2).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Append) + .save(basePath) + val instant2 = metaClient.reloadActiveTimeline.lastInstant().get() + assertEquals(spark.read.format("hudi").load(basePath).count(), 100) + + // part of data are updated, it will write out cdc log files + assertTrue(hasCDCLogFile(instant2)) + val cdcData2: Seq[IndexedRecord] = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema)) + // check the num of cdc data + assertEquals(cdcData2.size, 50) + // check op + assert(cdcData2.forall( r => r.get(0).toString == "u")) + // check record key, before, after according to the supplemental logging mode + checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema, + cdcData2, hoodieRecords2, HoodieCDCOperation.UPDATE) + + // Delete Operation + val hoodieKey3 = dataGen.generateUniqueDeletes(20) + val records3 = deleteRecordsToStrings(hoodieKey3).toList + val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(options) + .option(OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + val instant3 = metaClient.reloadActiveTimeline.lastInstant().get() + assertEquals(spark.read.format("hudi").load(basePath).count(), 80) + + // part of data are deleted, it will write out cdc log files + assertTrue(hasCDCLogFile(instant3)) + val cdcData3 = getCDCLogFIle(instant3).flatMap(readCDCLogFile(_, cdcSchema)) + // check the num of cdc data + assertEquals(cdcData3.size, 20) + // check op + assert(cdcData3.forall( r => r.get(0).toString == "d")) + // check record key, before, after according to the supplemental logging mode + checkCDCDataForDelete(cdcSupplementalLoggingMode, cdcSchema, cdcData3, hoodieKey3) + } + + @ParameterizedTest + @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) + def testMORDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = { + val options = commonOpts ++ Map( + TABLE_TYPE.key() -> MOR_TABLE_TYPE_OPT_VAL, + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode + ) + + // 1. Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf) + .build() + + val schemaResolver = new TableSchemaResolver(metaClient) + val dataSchema = schemaResolver.getTableAvroSchema(false) + val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema) + + val instant1 = metaClient.reloadActiveTimeline.lastInstant().get() + // all the data is new-coming, it will NOT write out cdc log files. 
+ assertFalse(hasCDCLogFile(instant1)) + + // 2. Upsert Operation + val records2_1 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList + val inputDF2_1 = spark.read.json(spark.sparkContext.parallelize(records2_1, 2)) + val records2_2 = recordsToStrings(dataGen.generateInserts("001", 20)).toList + val inputDF2_2 = spark.read.json(spark.sparkContext.parallelize(records2_2, 2)) + inputDF2_1.union(inputDF2_2).write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Append) + .save(basePath) + val instant2 = metaClient.reloadActiveTimeline.lastInstant().get() + + // part of data are updated, it will write out cdc log files + assertTrue(hasCDCLogFile(instant2)) + val cdcData2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema)) + assertEquals(cdcData2.size, 50) + // check op + assertEquals(cdcData2.count(r => r.get(0).toString == "u"), 30) + assertEquals(cdcData2.count(r => r.get(0).toString == "i"), 20) + + // 3. Delete Operation + val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList + val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(options) + .option(OPERATION.key, DELETE_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + val instant3 = metaClient.reloadActiveTimeline.lastInstant().get() + // in cases where the data is only written to log files, it will NOT write out cdc log files. + assertFalse(hasCDCLogFile(instant3)) + } + + /** + * whether this instant will create a cdc log file. + */ + private def hasCDCLogFile(instant: HoodieInstant): Boolean = { + val commitMetadata = HoodieCommitMetadata.fromBytes( + metaClient.reloadActiveTimeline().getInstantDetails(instant).get(), + classOf[HoodieCommitMetadata] + ) + val hoodieWriteStats = commitMetadata.getWriteStats.asScala + hoodieWriteStats.exists { hoodieWriteStat => + val cdcPath = hoodieWriteStat.getCdcPath + cdcPath != null && cdcPath.nonEmpty + } + } + + /** + * get the cdc log file paths written by this instant.
+ */ + private def getCDCLogFIle(instant: HoodieInstant): List[String] = { + val commitMetadata = HoodieCommitMetadata.fromBytes( + metaClient.reloadActiveTimeline().getInstantDetails(instant).get(), + classOf[HoodieCommitMetadata] + ) + commitMetadata.getWriteStats.asScala.map(_.getCdcPath).toList + } + + private def readCDCLogFile(relativeLogFile: String, cdcSchema: Schema): List[IndexedRecord] = { + val logFile = new HoodieLogFile( + metaClient.getFs.getFileStatus(new Path(metaClient.getBasePathV2, relativeLogFile))) + val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema); + assertTrue(reader.hasNext); + + val block = reader.next().asInstanceOf[HoodieDataBlock]; + block.getRecordIterator.asScala.toList + } + + private def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String, + cdcSchema: Schema, + dataSchema: Schema, + cdcRecords: Seq[IndexedRecord], + newHoodieRecords: java.util.List[HoodieRecord[_]], + op: HoodieCDCOperation): Unit = { + val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord] + // check schema + assertEquals(cdcRecord.getSchema, cdcSchema) + if (cdcSupplementalLoggingMode == "cdc_op_key") { + // check record key + assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted) + } else if (cdcSupplementalLoggingMode == "cdc_data_before") { + // check record key + assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted) + // check before + if (op == HoodieCDCOperation.INSERT) { + assertNull(cdcRecord.get("before")) + } else { + val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcRecord.get("record_key").toString).get + .getData.asInstanceOf[RawTripTestPayload] + val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord] + val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord] + assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat")) + } + } else { + val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord] + val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord] + if (op == HoodieCDCOperation.INSERT) { + // check before + assertNull(cdcBeforeValue) + // check after + val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get + .getData.asInstanceOf[RawTripTestPayload] + val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord] + assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat")) + } else { + val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get + .getData.asInstanceOf[RawTripTestPayload] + val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord] + // check before + assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat")) + // check after + assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat")) + } + } + } + + private def checkCDCDataForDelete(cdcSupplementalLoggingMode: String, + cdcSchema: Schema, + cdcRecords: Seq[IndexedRecord], + deletedKeys: java.util.List[HoodieKey]): Unit = { + val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord] + // check schema + assertEquals(cdcRecord.getSchema, cdcSchema) + if (cdcSupplementalLoggingMode == "cdc_op_key") { + // check record key + assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted) + } else if (cdcSupplementalLoggingMode == 
"cdc_data_before") { + // check record key + assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted) + } else { + val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord] + val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord] + // check before + assert(deletedKeys.exists(_.getRecordKey == cdcBeforeValue.get("_row_key").toString)) + // check after + assertNull(cdcAfterValue) + } + } +}