Skip to content

Commit

Permalink
[minor] following 3304, some code refactoring (apache#6713)
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Sep 19, 2022
1 parent c0eae6d commit 1b2792c
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertVal

static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
Schema schema) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.EMPTY_PROPERTIES);
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.emptyProps());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

import java.util.Properties;

public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

Expand Down Expand Up @@ -64,7 +63,7 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), CollectionUtils.emptyProps());
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

return new HoodieAvroRecord<>(reducedKey, reducedData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
Expand All @@ -39,7 +40,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -102,7 +102,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
final T data1 = rec1.getData();
final T data2 = rec2.getData();

@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties());
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, CollectionUtils.emptyProps());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
Expand All @@ -34,7 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;

public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
Expand Down Expand Up @@ -70,7 +70,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
final Schema schema = new Schema.Parser().parse(schemaStr);
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, new Properties());
T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, CollectionUtils.emptyProps());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@

public class CollectionUtils {

public static final Properties EMPTY_PROPERTIES = new Properties();
private static final Properties EMPTY_PROPERTIES = new Properties();

/**
* Returns an empty {@code Properties} instance. The props instance is a singleton,
* it should not be modified in any case.
*/
public static Properties emptyProps() {
return EMPTY_PROPERTIES;
}

public static boolean isNullOrEmpty(Collection<?> c) {
return Objects.isNull(c) || c.isEmpty();
Expand Down

0 comments on commit 1b2792c

Please sign in to comment.