diff --git a/src/main/java/io/odpf/depot/bigquery/handler/JsonErrorHandler.java b/src/main/java/io/odpf/depot/bigquery/handler/JsonErrorHandler.java index 6472aec0..b1dd4329 100644 --- a/src/main/java/io/odpf/depot/bigquery/handler/JsonErrorHandler.java +++ b/src/main/java/io/odpf/depot/bigquery/handler/JsonErrorHandler.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -28,8 +27,6 @@ the job of the class is to handle unknown field errors and then update the bq ta public class JsonErrorHandler implements ErrorHandler { private final BigQueryClient bigQueryClient; - private final String tablePartitionKey; - private final Optional partitionKeyDataType; private final boolean castAllColumnsToStringDataType; private final Map metadataColumnsTypesMap; private final String bqMetadataNamespace; @@ -40,15 +37,9 @@ public JsonErrorHandler(BigQueryClient bigQueryClient, BigQuerySinkConfig bigQue this.instrumentation = instrumentation; this.bigQueryClient = bigQueryClient; - tablePartitionKey = bigQuerySinkConfig.isTablePartitioningEnabled() ? bigQuerySinkConfig.getTablePartitionKey() : ""; defaultColumnsMap = bigQuerySinkConfig.getSinkBigqueryDefaultColumns() .stream() .collect(Collectors.toMap(TupleString::getFirst, TupleString::getSecond)); - if (bigQuerySinkConfig.isTablePartitioningEnabled()) { - partitionKeyDataType = Optional.of(LegacySQLTypeName.valueOfStrict(defaultColumnsMap.get(tablePartitionKey).toUpperCase())); - } else { - partitionKeyDataType = Optional.empty(); - } castAllColumnsToStringDataType = bigQuerySinkConfig.getSinkBigqueryDefaultDatatypeStringEnable(); bqMetadataNamespace = bigQuerySinkConfig.getBqMetadataNamespace(); if (!bigQuerySinkConfig.shouldAddMetadata()) { @@ -103,11 +94,11 @@ private List getBqErrorsWithNoSuchFields(List valu ).collect(Collectors.toList()); } + /** + * This method only used for unknown fields. + */ private Field getField(String key) { - if (!tablePartitionKey.isEmpty() && tablePartitionKey.equals(key) && partitionKeyDataType.isPresent()) { - return Field.of(key, partitionKeyDataType.get()); - } if (!bqMetadataNamespace.isEmpty()) { throw new UnsupportedOperationException("metadata namespace is not supported, because nested json structure is not supported"); } diff --git a/src/main/java/io/odpf/depot/bigquery/json/BigqueryJsonUpdateListener.java b/src/main/java/io/odpf/depot/bigquery/json/BigqueryJsonUpdateListener.java index 10ddf735..d3c00f48 100644 --- a/src/main/java/io/odpf/depot/bigquery/json/BigqueryJsonUpdateListener.java +++ b/src/main/java/io/odpf/depot/bigquery/json/BigqueryJsonUpdateListener.java @@ -93,29 +93,22 @@ private void addMetadataFields(HashSet fieldsToBeUpdated, List