diff --git a/.github/ISSUE_TEMPLATE/SUPPORT_REQUEST.md b/.github/ISSUE_TEMPLATE/SUPPORT_REQUEST.md
index adf727399355..9ce26e9d2bd3 100644
--- a/.github/ISSUE_TEMPLATE/SUPPORT_REQUEST.md
+++ b/.github/ISSUE_TEMPLATE/SUPPORT_REQUEST.md
@@ -8,7 +8,7 @@ labels: question
**_Tips before filing an issue_**
-- Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)?
+- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 0652de980e16..fc79fefd2065 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -18,8 +18,20 @@ jobs:
include:
- scala: "scala-2.11"
spark: "spark2"
+ - scala: "scala-2.11"
+ spark: "spark2,spark-shade-unbundle-avro"
+ - scala: "scala-2.12"
+ spark: "spark3,spark3.0.x"
+ - scala: "scala-2.12"
+ spark: "spark3,spark3.0.x,spark-shade-unbundle-avro"
+ - scala: "scala-2.12"
+ spark: "spark3,spark3.1.x"
+ - scala: "scala-2.12"
+ spark: "spark3,spark3.1.x,spark-shade-unbundle-avro"
- scala: "scala-2.12"
spark: "spark3"
+ - scala: "scala-2.12"
+ spark: "spark3,spark-shade-unbundle-avro"
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
diff --git a/.gitignore b/.gitignore
index 413e0a0c4c0d..f465a1992bb1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -61,7 +61,8 @@ local.properties
# IntelliJ specific files/directories #
#######################################
.out
-.idea
+.idea/*
+!.idea/vcs.xml
*.ipr
*.iws
*.iml
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000000000000..10142b516e46
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project version="4">
+  <component name="IssueNavigationConfiguration">
+    <option name="links">
+      <list>
+        <IssueNavigationLink>
+          <option name="issueRegexp" value="HUDI-\d+" />
+          <option name="linkRegexp" value="https://issues.apache.org/jira/browse/$0" />
+        </IssueNavigationLink>
+      </list>
+    </option>
+  </component>
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>
diff --git a/NOTICE b/NOTICE
index 9b249331f343..437b974ac217 100644
--- a/NOTICE
+++ b/NOTICE
@@ -159,3 +159,9 @@ its NOTICE file:
This product includes software developed at
StreamSets (http://www.streamsets.com/).
+--------------------------------------------------------------------------------
+
+This product includes code from hilbert-curve project
+ * Copyright https://github.com/davidmoten/hilbert-curve
+ * Licensed under the Apache-2.0 License
+
diff --git a/README.md b/README.md
index 764ba0d48bcc..4f48fc6c7f9b 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,7 @@ Prerequisites for building Apache Hudi:
* Unix-like system (like Linux, Mac OS X)
* Java 8 (Java 9 or 10 may work)
* Git
-* Maven
+* Maven (>=3.3.1)
```
# Checkout code and build
@@ -78,12 +78,19 @@ The default Scala version supported is 2.11. To build for Scala 2.12 version, bu
mvn clean package -DskipTests -Dscala-2.12
```
-### Build with Spark 3.0.0
+### Build with Spark 3
-The default Spark version supported is 2.4.4. To build for Spark 3.0.0 version, build using `spark3` profile
+The default Spark version supported is 2.4.4. To build for different Spark 3 versions, use the corresponding profile
```
+# Build against Spark 3.2.0 (default build shipped with the public jars)
mvn clean package -DskipTests -Dspark3
+
+# Build against Spark 3.1.2
+mvn clean package -DskipTests -Dspark3.1.x
+
+# Build against Spark 3.0.3
+mvn clean package -DskipTests -Dspark3.0.x
```
### Build without spark-avro module
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index ef68cd52e468..cd75e28dae24 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -26,13 +26,14 @@ variables:
MAVEN_OPTS: '-Dmaven.repo.local=$(MAVEN_CACHE_FOLDER) -Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true'
SPARK_VERSION: '2.4.4'
HADOOP_VERSION: '2.7'
- SPARK_HOME: $(Pipeline.Workspace)/spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)
+ SPARK_ARCHIVE: spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)
stages:
- stage: test
jobs:
- job: UT_FT_1
displayName: UT FT common & flink & UT client/spark-client
+ timeoutInMinutes: '90'
steps:
- task: Cache@2
displayName: set cache
@@ -47,7 +48,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
- options: -DskipTests
+ options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -71,6 +72,7 @@ stages:
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
- job: UT_FT_2
displayName: FT client/spark-client
+ timeoutInMinutes: '90'
steps:
- task: Cache@2
displayName: set cache
@@ -85,7 +87,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
- options: -DskipTests
+ options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -99,7 +101,8 @@ stages:
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
- job: UT_FT_3
- displayName: UT FT cli & utilities & sync/hive-sync
+ displayName: UT FT clients & cli & utilities & sync/hive-sync
+ timeoutInMinutes: '90'
steps:
- task: Cache@2
displayName: set cache
@@ -114,30 +117,31 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
- options: -DskipTests
+ options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
- task: Maven@3
- displayName: UT cli & utilities & sync/hive-sync
+ displayName: UT clients & cli & utilities & sync/hive-sync
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Punit-tests -pl hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
+ options: -Punit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
- task: Maven@3
- displayName: FT cli & utilities & sync/hive-sync
+ displayName: FT clients & cli & utilities & sync/hive-sync
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Pfunctional-tests -pl hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
+ options: -Pfunctional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
- job: UT_FT_4
displayName: UT FT other modules
+ timeoutInMinutes: '90'
steps:
- task: Cache@2
displayName: set cache
@@ -152,7 +156,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'install'
- options: -DskipTests
+ options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -161,7 +165,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Punit-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
+ options: -Punit-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -170,16 +174,23 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Pfunctional-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
+ options: -Pfunctional-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
- job: IT
steps:
+ - task: AzureCLI@2
+ displayName: Prepare for IT
+ inputs:
+ azureSubscription: apachehudici-service-connection
+ scriptType: bash
+ scriptLocation: inlineScript
+ inlineScript: |
+ echo 'Downloading $(SPARK_ARCHIVE)'
+ az storage blob download -c ci-caches -n $(SPARK_ARCHIVE).tgz -f $(Pipeline.Workspace)/$(SPARK_ARCHIVE).tgz --account-name apachehudici
+ tar -xvf $(Pipeline.Workspace)/$(SPARK_ARCHIVE).tgz -C $(Pipeline.Workspace)/
+ mkdir /tmp/spark-events/
- script: |
- echo 'Downloading spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)'
- wget https://archive.apache.org/dist/spark/spark-$(SPARK_VERSION)/spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION).tgz -O $(Pipeline.Workspace)/spark-$(SPARK_VERSION).tgz
- tar -xvf $(Pipeline.Workspace)/spark-$(SPARK_VERSION).tgz -C $(Pipeline.Workspace)/
- mkdir /tmp/spark-events/
mvn $(MAVEN_OPTS) -Pintegration-tests verify
displayName: IT
diff --git a/conf/hudi-defaults.conf.template b/conf/hudi-defaults.conf.template
new file mode 100644
index 000000000000..175dbaf23d73
--- /dev/null
+++ b/conf/hudi-defaults.conf.template
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default system properties included when running Hudi jobs.
+# This is useful for setting default environmental settings.
+
+# Example:
+# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
+# hoodie.datasource.hive_sync.use_jdbc true
+# hoodie.datasource.hive_sync.support_timestamp false
+# hoodie.index.type BLOOM
+# hoodie.metadata.enable false
diff --git a/doap_HUDI.rdf b/doap_HUDI.rdf
index fc474d767fba..33f64ecf82ec 100644
--- a/doap_HUDI.rdf
+++ b/doap_HUDI.rdf
@@ -76,6 +76,11 @@
         <created>2021-08-26</created>
         <revision>0.9.0</revision>
       </Version>
     </release>
+    <release>
+      <Version>
+        <name>Apache Hudi 0.10.0</name>
+        <created>2021-12-08</created>
+        <revision>0.10.0</revision>
+      </Version>
+    </release>
diff --git a/docker/hoodie/hadoop/base/pom.xml b/docker/hoodie/hadoop/base/pom.xml
index 5361e90684b9..bb0ec788e0f3 100644
--- a/docker/hoodie/hadoop/base/pom.xml
+++ b/docker/hoodie/hadoop/base/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/datanode/pom.xml b/docker/hoodie/hadoop/datanode/pom.xml
index 8f3f90f0559c..e9b4e4fbac6a 100644
--- a/docker/hoodie/hadoop/datanode/pom.xml
+++ b/docker/hoodie/hadoop/datanode/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/historyserver/pom.xml b/docker/hoodie/hadoop/historyserver/pom.xml
index fdf41e15a537..dbdb7182f682 100644
--- a/docker/hoodie/hadoop/historyserver/pom.xml
+++ b/docker/hoodie/hadoop/historyserver/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/hive_base/pom.xml b/docker/hoodie/hadoop/hive_base/pom.xml
index c7a828e5c3f1..e267e8487e79 100644
--- a/docker/hoodie/hadoop/hive_base/pom.xml
+++ b/docker/hoodie/hadoop/hive_base/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/namenode/pom.xml b/docker/hoodie/hadoop/namenode/pom.xml
index a0e6af4fad3b..71998198f53c 100644
--- a/docker/hoodie/hadoop/namenode/pom.xml
+++ b/docker/hoodie/hadoop/namenode/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml
index 443badc11161..db47f3924e4b 100644
--- a/docker/hoodie/hadoop/pom.xml
+++ b/docker/hoodie/hadoop/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -54,7 +54,7 @@
     <hive.version>2.3.3</hive.version>
     <hadoop.version>2.8.4</hadoop.version>
     <presto.version>0.217</presto.version>
-    <dockerfile.maven.version>1.4.3</dockerfile.maven.version>
+    <dockerfile.maven.version>1.4.13</dockerfile.maven.version>
     <docker.build.skip>true</docker.build.skip>
     <main.basedir>${project.parent.basedir}</main.basedir>
diff --git a/docker/hoodie/hadoop/prestobase/pom.xml b/docker/hoodie/hadoop/prestobase/pom.xml
index 3d53485654dd..98502aa0fe2f 100644
--- a/docker/hoodie/hadoop/prestobase/pom.xml
+++ b/docker/hoodie/hadoop/prestobase/pom.xml
@@ -20,7 +20,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/spark_base/pom.xml b/docker/hoodie/hadoop/spark_base/pom.xml
index b7c49e34a516..8cd1ee2dea3c 100644
--- a/docker/hoodie/hadoop/spark_base/pom.xml
+++ b/docker/hoodie/hadoop/spark_base/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkadhoc/pom.xml b/docker/hoodie/hadoop/sparkadhoc/pom.xml
index 2da40edf1fa4..20a9cab164c3 100644
--- a/docker/hoodie/hadoop/sparkadhoc/pom.xml
+++ b/docker/hoodie/hadoop/sparkadhoc/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkmaster/pom.xml b/docker/hoodie/hadoop/sparkmaster/pom.xml
index b000eb675282..3a8dabc4afc3 100644
--- a/docker/hoodie/hadoop/sparkmaster/pom.xml
+++ b/docker/hoodie/hadoop/sparkmaster/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/docker/hoodie/hadoop/sparkworker/pom.xml b/docker/hoodie/hadoop/sparkworker/pom.xml
index cd44580c7ea1..6a79f8debc16 100644
--- a/docker/hoodie/hadoop/sparkworker/pom.xml
+++ b/docker/hoodie/hadoop/sparkworker/pom.xml
@@ -19,7 +19,7 @@
     <artifactId>hudi-hadoop-docker</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <packaging>pom</packaging>
diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml
new file mode 100644
index 000000000000..d44a389a61f6
--- /dev/null
+++ b/hudi-aws/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hudi</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.11.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-aws</artifactId>
+  <version>0.11.0-SNAPSHOT</version>
+
+  <name>hudi-aws</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <dynamodb-local.version>1.15.0</dynamodb-local.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>dynamodb-lock-client</artifactId>
+      <version>${dynamodb.lockclient.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-cloudwatch</artifactId>
+      <version>${aws.sdk.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-dynamodb</artifactId>
+      <version>${aws.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-core</artifactId>
+      <version>${aws.sdk.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>prepare-it-database</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>start</goal>
+            </goals>
+            <configuration>
+              <images>
+                <image>
+                  <name>amazon/dynamodb-local:${dynamodb-local.version}</name>
+                  <alias>it-database</alias>
+                  <run>
+                    <ports>
+                      <port>${dynamodb-local.port}:${dynamodb-local.port}</port>
+                    </ports>
+                    <wait>
+                      <http>
+                        <url>${dynamodb-local.endpoint}/shell/</url>
+                      </http>
+                    </wait>
+                  </run>
+                </image>
+              </images>
+            </configuration>
+          </execution>
+          <execution>
+            <id>remove-it-database</id>
+            <phase>post-integration-test</phase>
+            <goals>
+              <goal>stop</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>src/test/resources</directory>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java
new file mode 100644
index 000000000000..e4bc598ce293
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.cloudwatch;
+
+import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.util.Option;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+import com.amazonaws.services.cloudwatch.model.StandardUnit;
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Counting;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A reporter for publishing metrics to Amazon CloudWatch. It is responsible for collecting and converting
+ * DropWizard metrics to CloudWatch metrics and for composing the metrics payload.
+ */
+public class CloudWatchReporter extends ScheduledReporter {
+
+ static final String DIMENSION_TABLE_NAME_KEY = "Table";
+ static final String DIMENSION_METRIC_TYPE_KEY = "Metric Type";
+ static final String DIMENSION_GAUGE_TYPE_VALUE = "gauge";
+ static final String DIMENSION_COUNT_TYPE_VALUE = "count";
+
+ private static final Logger LOG = LogManager.getLogger(CloudWatchReporter.class);
+
+ private final AmazonCloudWatchAsync cloudWatchClientAsync;
+ private final Clock clock;
+ private final String prefix;
+ private final String namespace;
+ private final int maxDatumsPerRequest;
+
+ public static Builder forRegistry(MetricRegistry registry) {
+ return new Builder(registry);
+ }
+
+ public static class Builder {
+ private MetricRegistry registry;
+ private Clock clock;
+ private String prefix;
+ private TimeUnit rateUnit;
+ private TimeUnit durationUnit;
+ private MetricFilter filter;
+ private String namespace;
+ private int maxDatumsPerRequest;
+
+ private Builder(MetricRegistry registry) {
+ this.registry = registry;
+ this.clock = Clock.defaultClock();
+ this.rateUnit = TimeUnit.SECONDS;
+ this.durationUnit = TimeUnit.MILLISECONDS;
+ this.filter = MetricFilter.ALL;
+ this.maxDatumsPerRequest = 20;
+ }
+
+ public Builder withClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ public Builder prefixedWith(String prefix) {
+ this.prefix = prefix;
+ return this;
+ }
+
+ public Builder convertRatesTo(TimeUnit rateUnit) {
+ this.rateUnit = rateUnit;
+ return this;
+ }
+
+ public Builder convertDurationsTo(TimeUnit durationUnit) {
+ this.durationUnit = durationUnit;
+ return this;
+ }
+
+ public Builder filter(MetricFilter filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ public Builder namespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ public Builder maxDatumsPerRequest(int maxDatumsPerRequest) {
+ this.maxDatumsPerRequest = maxDatumsPerRequest;
+ return this;
+ }
+
+ public CloudWatchReporter build(Properties props) {
+ return new CloudWatchReporter(registry,
+ getAmazonCloudWatchClient(props),
+ clock,
+ prefix,
+ namespace,
+ maxDatumsPerRequest,
+ filter,
+ rateUnit,
+ durationUnit);
+ }
+
+ CloudWatchReporter build(AmazonCloudWatchAsync amazonCloudWatchAsync) {
+ return new CloudWatchReporter(registry,
+ amazonCloudWatchAsync,
+ clock,
+ prefix,
+ namespace,
+ maxDatumsPerRequest,
+ filter,
+ rateUnit,
+ durationUnit);
+ }
+ }
+
+ protected CloudWatchReporter(MetricRegistry registry,
+ AmazonCloudWatchAsync cloudWatchClientAsync,
+ Clock clock,
+ String prefix,
+ String namespace,
+ int maxDatumsPerRequest,
+ MetricFilter filter,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit) {
+ super(registry, "hudi-cloudWatch-reporter", filter, rateUnit, durationUnit);
+ this.cloudWatchClientAsync = cloudWatchClientAsync;
+ this.clock = clock;
+ this.prefix = prefix;
+ this.namespace = namespace;
+ this.maxDatumsPerRequest = maxDatumsPerRequest;
+ }
+
+ private static AmazonCloudWatchAsync getAmazonCloudWatchClient(Properties props) {
+ return AmazonCloudWatchAsyncClientBuilder.standard()
+ .withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
+ .build();
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges,
+ SortedMap<String, Counter> counters,
+ SortedMap<String, Histogram> histograms,
+ SortedMap<String, Meter> meters,
+ SortedMap<String, Timer> timers) {
+ LOG.info("Reporting Metrics to CloudWatch.");
+
+ final long timestampMilliSec = clock.getTime();
+ List<MetricDatum> metricsData = new ArrayList<>();
+
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ processGauge(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ }
+
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ }
+
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ //TODO: Publish other Histogram metrics to cloud watch
+ }
+
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ //TODO: Publish other Meter metrics to cloud watch
+ }
+
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ //TODO: Publish other Timer metrics to cloud watch
+ }
+
+ report(metricsData);
+ }
+
+ private void report(List<MetricDatum> metricsData) {
+ List<Future<PutMetricDataResult>> cloudWatchFutures = new ArrayList<>(metricsData.size());
+ List<List<MetricDatum>> partitions = new ArrayList<>();
+
+ for (int i = 0; i < metricsData.size(); i += maxDatumsPerRequest) {
+ int end = Math.min(metricsData.size(), i + maxDatumsPerRequest);
+ partitions.add(metricsData.subList(i, end));
+ }
+
+ for (List<MetricDatum> partition : partitions) {
+ PutMetricDataRequest request = new PutMetricDataRequest()
+ .withNamespace(namespace)
+ .withMetricData(partition);
+
+ cloudWatchFutures.add(cloudWatchClientAsync.putMetricDataAsync(request));
+ }
+
+ for (final Future<PutMetricDataResult> cloudWatchFuture : cloudWatchFutures) {
+ try {
+ cloudWatchFuture.get(30, TimeUnit.SECONDS);
+ } catch (final Exception ex) {
+ LOG.error("Error reporting metrics to CloudWatch. The data in this CloudWatch request "
+ + "may have been discarded, and not made it to CloudWatch.", ex);
+ }
+ }
+ }
+
+ private void processGauge(final String metricName,
+ final Gauge<?> gauge,
+ final long timestampMilliSec,
+ final List<MetricDatum> metricData) {
+ Option.ofNullable(gauge.getValue())
+ .toJavaOptional()
+ .filter(value -> value instanceof Number)
+ .map(value -> (Number) value)
+ .ifPresent(value -> stageMetricDatum(metricName,
+ value.doubleValue(),
+ DIMENSION_GAUGE_TYPE_VALUE,
+ StandardUnit.None,
+ timestampMilliSec,
+ metricData));
+ }
+
+ private void processCounter(final String metricName,
+ final Counting counter,
+ final long timestampMilliSec,
+ final List<MetricDatum> metricData) {
+ stageMetricDatum(metricName,
+ counter.getCount(),
+ DIMENSION_COUNT_TYPE_VALUE,
+ StandardUnit.Count,
+ timestampMilliSec,
+ metricData);
+ }
+
+ private void stageMetricDatum(String metricName,
+ double metricValue,
+ String metricType,
+ StandardUnit standardUnit,
+ long timestampMilliSec,
+ List<MetricDatum> metricData) {
+ String[] metricNameParts = metricName.split("\\.", 2);
+ String tableName = metricNameParts[0];
+
+ metricData.add(new MetricDatum()
+ .withTimestamp(new Date(timestampMilliSec))
+ .withMetricName(prefix(metricNameParts[1]))
+ .withValue(metricValue)
+ .withDimensions(getDimensions(tableName, metricType))
+ .withUnit(standardUnit));
+ }
+
+ private List<Dimension> getDimensions(String tableName, String metricType) {
+ List dimensions = new ArrayList<>();
+ dimensions.add(new Dimension()
+ .withName(DIMENSION_TABLE_NAME_KEY)
+ .withValue(tableName));
+ dimensions.add(new Dimension()
+ .withName(DIMENSION_METRIC_TYPE_KEY)
+ .withValue(metricType));
+ return dimensions;
+ }
+
+ private String prefix(String... components) {
+ return MetricRegistry.name(prefix, components);
+ }
+
+ @Override
+ public void stop() {
+ try {
+ super.stop();
+ } finally {
+ try {
+ cloudWatchClientAsync.shutdown();
+ } catch (Exception ex) {
+ LOG.warn("Exception while shutting down CloudWatch client.", ex);
+ }
+ }
+ }
+}
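
A minimal usage sketch of the builder above (not part of this change): the registry, metric name, namespace, and reporting period are illustrative assumptions, and `start`/`stop` come from DropWizard's `ScheduledReporter`.

```java
import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;

import com.codahale.metrics.MetricRegistry;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CloudWatchReporterExample {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    // Metric names are expected as "<table>.<metric>"; stageMetricDatum splits on the first dot.
    registry.counter("my_table.commit.totalFilesInsert").inc();

    Properties props = new Properties(); // credentials resolved via HoodieAWSCredentialsProviderFactory
    CloudWatchReporter reporter = CloudWatchReporter.forRegistry(registry)
        .namespace("Hudi")               // CloudWatch namespace (illustrative)
        .maxDatumsPerRequest(20)         // CloudWatch caps PutMetricData at 20 datums per request
        .build(props);

    reporter.start(60, TimeUnit.SECONDS); // inherited from ScheduledReporter
    // ... workload runs and registers metrics ...
    reporter.stop();                      // also shuts down the async CloudWatch client
  }
}
```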
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java
new file mode 100644
index 000000000000..631b0fa8d534
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.credentials;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Factory class for Hoodie AWSCredentialsProvider.
+ */
+public class HoodieAWSCredentialsProviderFactory {
+ public static AWSCredentialsProvider getAwsCredentialsProvider(Properties props) {
+ return getAwsCredentialsProviderChain(props);
+ }
+
+ private static AWSCredentialsProvider getAwsCredentialsProviderChain(Properties props) {
+ List<AWSCredentialsProvider> providers = new ArrayList<>();
+ providers.add(new HoodieConfigAWSCredentialsProvider(props));
+ providers.add(new DefaultAWSCredentialsProviderChain());
+ AWSCredentialsProviderChain providerChain = new AWSCredentialsProviderChain(providers);
+ providerChain.setReuseLastProvider(true);
+ return providerChain;
+ }
+}
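
A small sketch of the chain behavior above (illustrative only): when the Hudi keys are set they are tried first; otherwise resolution falls through to `DefaultAWSCredentialsProviderChain` (environment variables, profiles, IAM roles).

```java
import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;

import com.amazonaws.auth.AWSCredentials;

import java.util.Properties;

public class CredentialsChainExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.aws.access.key", "my-access-key"); // HoodieAWSConfig.AWS_ACCESS_KEY
    props.setProperty("hoodie.aws.secret.key", "my-secret-key"); // HoodieAWSConfig.AWS_SECRET_KEY

    AWSCredentials credentials =
        HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props).getCredentials();
    System.out.println(credentials.getAWSAccessKeyId());
  }
}
```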
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
new file mode 100644
index 000000000000..4e9cf383906a
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.credentials;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import org.apache.hudi.config.HoodieAWSConfig;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+
+/**
+ * Credentials provider which fetches AWS access key from Hoodie config.
+ */
+public class HoodieConfigAWSCredentialsProvider implements AWSCredentialsProvider {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieConfigAWSCredentialsProvider.class);
+
+ private AWSCredentials awsCredentials;
+
+ public HoodieConfigAWSCredentialsProvider(Properties props) {
+ String accessKey = props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key());
+ String secretKey = props.getProperty(HoodieAWSConfig.AWS_SECRET_KEY.key());
+ String sessionToken = props.getProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key());
+
+ if (StringUtils.isNullOrEmpty(accessKey) || StringUtils.isNullOrEmpty(secretKey)) {
+ LOG.debug("AWS access key or secret key not found in the Hudi configuration. "
+ + "Use default AWS credentials");
+ } else {
+ this.awsCredentials = createCredentials(accessKey, secretKey, sessionToken);
+ }
+ }
+
+ private static AWSCredentials createCredentials(String accessKey, String secretKey,
+ String sessionToken) {
+ return (sessionToken == null)
+ ? new BasicAWSCredentials(accessKey, secretKey)
+ : new BasicSessionCredentials(accessKey, secretKey, sessionToken);
+ }
+
+ @Override
+ public AWSCredentials getCredentials() {
+ return this.awsCredentials;
+ }
+
+ @Override
+ public void refresh() {
+
+ }
+}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
new file mode 100644
index 000000000000..57348bea426c
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.DynamoDbBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows locking table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG = LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
+
+ private final AmazonDynamoDBLockClient client;
+ private final String tableName;
+ private final String dynamoDBPartitionKey;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
+ this(lockConfiguration, conf, null);
+ }
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key());
+ this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key());
+ long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ if (dynamoDB == null) {
+ dynamoDB = getDynamoDBClient();
+ }
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+
+ if (!this.client.lockTableExists()) {
+ createLockTableInDynamoDB(dynamoDB, tableName);
+ }
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING, generateLogSuffixString()));
+ try {
+ lock = client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
+ .withAdditionalTimeToWaitForLock(time)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED, generateLogSuffixString()));
+ } catch (InterruptedException e) {
+ throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()), e);
+ } catch (LockNotGrantedException e) {
+ return false;
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING, generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED, generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoDBClient() {
+ String region = this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key());
+ String endpointURL = this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key()) == null
+ ? RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX)
+ : this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key());
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+ .withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) {
+ String billingMode = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key());
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME);
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List<KeySchemaElement> keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new AttributeDefinition().withAttributeName(DYNAMODB_ATTRIBUTE_NAME).withAttributeType(ScalarAttributeType.S));
+
+ CreateTableRequest createTableRequest = new CreateTableRequest(tableName, keySchema);
+ createTableRequest.setAttributeDefinitions(attributeDefinitions);
+ createTableRequest.setBillingMode(billingMode);
+ if (billingMode.equals(BillingMode.PROVISIONED.name())) {
+ createTableRequest.setProvisionedThroughput(new ProvisionedThroughput()
+ .withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key())))
+ .withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key()))));
+ }
+ dynamoDB.createTable(createTableRequest);
+
+ LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active");
+ try {
+ TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000);
+ } catch (TableUtils.TableNeverTransitionedToStateException e) {
+ throw new HoodieLockException("Created dynamoDB table never transits to active", e);
+ } catch (InterruptedException e) {
+ throw new HoodieLockException("Thread interrupted while waiting for dynamoDB table to turn active", e);
+ }
+ LOG.info("Created dynamoDB table " + tableName);
+ }
+
+ private void checkRequiredProps(final LockConfiguration config) {
+ ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()) != null);
+ ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()) != null);
+ ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key()) != null);
+ config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
+ config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "20");
+ config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10");
+ config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000");
+ }
+
+ private String generateLogSuffixString() {
+ return StringUtils.join("DynamoDb table = ", tableName, ", partition key = ", dynamoDBPartitionKey);
+ }
+
+ protected String generateLogStatement(LockState state, String suffix) {
+ return StringUtils.join(state.name(), " lock at ", suffix);
+ }
+}
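
A minimal sketch of driving the provider directly (assumptions: `LockConfiguration` can be built from plain `java.util.Properties`, and the lease duration is read from `hoodie.write.lock.wait_time_ms`, the key behind `LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY`; the table and partition key values are placeholders):

```java
import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;

import org.apache.hadoop.conf.Configuration;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class DynamoDBLockExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.write.lock.dynamodb.table", "hudi-locks");       // required
    props.setProperty("hoodie.write.lock.dynamodb.partition_key", "my_table"); // required
    props.setProperty("hoodie.write.lock.dynamodb.region", "us-east-1");       // required
    props.setProperty("hoodie.write.lock.wait_time_ms", "60000");              // lease duration (assumed key)

    DynamoDBBasedLockProvider provider =
        new DynamoDBBasedLockProvider(new LockConfiguration(props), new Configuration());
    try {
      if (provider.tryLock(10_000, TimeUnit.MILLISECONDS)) {
        // ... perform the table operation guarded by the lock ...
        provider.unlock();
      }
    } finally {
      provider.close(); // releases a still-held lock and closes the client
    }
  }
}
```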
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
new file mode 100644
index 000000000000..19c63ea78655
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
+
+/**
+ * Hoodie Configs for Locks.
+ */
+@ConfigClassProperty(name = "DynamoDB based Locks Configurations",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Configs that control DynamoDB based locking mechanisms required for concurrency control "
+ + " between writers to a Hudi table. Concurrency between Hudi's own table services "
+ + " are auto managed internally.")
+public class DynamoDbBasedLockConfig extends HoodieConfig {
+
+ // configs for DynamoDb based locks
+ public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb.";
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_NAME = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table")
+ .noDefaultValue()
+ .sinceVersion("0.10.0")
+ .withDocumentation("For DynamoDB based lock provider, the name of the DynamoDB table acting as lock table");
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_PARTITION_KEY = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key")
+ .noDefaultValue()
+ .sinceVersion("0.10.0")
+ .withInferFunction(cfg -> {
+ if (cfg.contains(HoodieTableConfig.NAME)) {
+ return Option.of(cfg.getString(HoodieTableConfig.NAME));
+ }
+ return Option.empty();
+ })
+ .withDocumentation("For DynamoDB based lock provider, the partition key for the DynamoDB lock table. "
+ + "Each Hudi dataset should has it's unique key so concurrent writers could refer to the same partition key."
+ + " By default we use the Hudi table name specified to be the partition key");
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_REGION = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "region")
+ .defaultValue("us-east-1")
+ .sinceVersion("0.10.0")
+ .withInferFunction(cfg -> {
+ String regionFromEnv = System.getenv("AWS_REGION");
+ if (regionFromEnv != null) {
+ return Option.of(RegionUtils.getRegion(regionFromEnv).getName());
+ }
+ return Option.empty();
+ })
+ .withDocumentation("For DynamoDB based lock provider, the region used in endpoint for Amazon DynamoDB service."
+ + " Would try to first get it from AWS_REGION environment variable. If not find, by default use us-east-1");
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_BILLING_MODE = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "billing_mode")
+ .defaultValue(BillingMode.PAY_PER_REQUEST.name())
+ .sinceVersion("0.10.0")
+ .withDocumentation("For DynamoDB based lock provider, by default it is PAY_PER_REQUEST mode");
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_READ_CAPACITY = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "read_capacity")
+ .defaultValue("20")
+ .sinceVersion("0.10.0")
+ .withDocumentation("For DynamoDB based lock provider, read capacity units when using PROVISIONED billing mode");
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_WRITE_CAPACITY = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "write_capacity")
+ .defaultValue("10")
+ .sinceVersion("0.10.0")
+ .withDocumentation("For DynamoDB based lock provider, write capacity units when using PROVISIONED billing mode");
+
+ public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout")
+ .defaultValue(String.valueOf(10 * 60 * 1000))
+ .sinceVersion("0.10.0")
+ .withDocumentation("For DynamoDB based lock provider, the maximum number of milliseconds to wait for creating DynamoDB table");
+
+ public static final ConfigProperty<String> DYNAMODB_ENDPOINT_URL = ConfigProperty
+ .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "endpoint_url")
+ .noDefaultValue()
+ .sinceVersion("0.11.0")
+ .withDocumentation("For DynamoDB based lock provider, the url endpoint used for Amazon DynamoDB service."
+ + " Useful for development with a local dynamodb instance.");
+}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
new file mode 100644
index 000000000000..623704232e41
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE;
+import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY;
+import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY;
+import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION;
+import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME;
+import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY;
+
+/**
+ * Configurations used by the AWS credentials and AWS DynamoDB based lock.
+ */
+@Immutable
+@ConfigClassProperty(name = "Amazon Web Services Configs",
+ groupName = ConfigGroups.Names.AWS,
+ description = "Amazon Web Services configurations to access resources like Amazon DynamoDB (for locks),"
+ + " Amazon CloudWatch (metrics).")
+public class HoodieAWSConfig extends HoodieConfig {
+ public static final ConfigProperty<String> AWS_ACCESS_KEY = ConfigProperty
+ .key("hoodie.aws.access.key")
+ .noDefaultValue()
+ .sinceVersion("0.10.0")
+ .withDocumentation("AWS access key id");
+
+ public static final ConfigProperty<String> AWS_SECRET_KEY = ConfigProperty
+ .key("hoodie.aws.secret.key")
+ .noDefaultValue()
+ .sinceVersion("0.10.0")
+ .withDocumentation("AWS secret key");
+
+ public static final ConfigProperty<String> AWS_SESSION_TOKEN = ConfigProperty
+ .key("hoodie.aws.session.token")
+ .noDefaultValue()
+ .sinceVersion("0.10.0")
+ .withDocumentation("AWS session token");
+
+ private HoodieAWSConfig() {
+ super();
+ }
+
+ public static HoodieAWSConfig.Builder newBuilder() {
+ return new HoodieAWSConfig.Builder();
+ }
+
+ public String getAWSAccessKey() {
+ return getString(AWS_ACCESS_KEY);
+ }
+
+ public String getAWSSecretKey() {
+ return getString(AWS_SECRET_KEY);
+ }
+
+ public String getAWSSessionToken() {
+ return getString(AWS_SESSION_TOKEN);
+ }
+
+ public static class Builder {
+
+ private final HoodieAWSConfig awsConfig = new HoodieAWSConfig();
+
+ public HoodieAWSConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.awsConfig.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public HoodieAWSConfig.Builder fromProperties(Properties props) {
+ this.awsConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieAWSConfig.Builder withAccessKey(String accessKey) {
+ awsConfig.setValue(AWS_ACCESS_KEY, accessKey);
+ return this;
+ }
+
+ public HoodieAWSConfig.Builder withSecretKey(String secretKey) {
+ awsConfig.setValue(AWS_SECRET_KEY, secretKey);
+ return this;
+ }
+
+ public HoodieAWSConfig.Builder withSessionToken(String sessionToken) {
+ awsConfig.setValue(AWS_SESSION_TOKEN, sessionToken);
+ return this;
+ }
+
+ public Builder withDynamoDBTable(String dynamoDbTableName) {
+ awsConfig.setValue(DYNAMODB_LOCK_TABLE_NAME, dynamoDbTableName);
+ return this;
+ }
+
+ public Builder withDynamoDBPartitionKey(String partitionKey) {
+ awsConfig.setValue(DYNAMODB_LOCK_PARTITION_KEY, partitionKey);
+ return this;
+ }
+
+ public Builder withDynamoDBRegion(String region) {
+ awsConfig.setValue(DYNAMODB_LOCK_REGION, region);
+ return this;
+ }
+
+ public Builder withDynamoDBBillingMode(String mode) {
+ awsConfig.setValue(DYNAMODB_LOCK_BILLING_MODE, mode);
+ return this;
+ }
+
+ public Builder withDynamoDBReadCapacity(String capacity) {
+ awsConfig.setValue(DYNAMODB_LOCK_READ_CAPACITY, capacity);
+ return this;
+ }
+
+ public Builder withDynamoDBWriteCapacity(String capacity) {
+ awsConfig.setValue(DYNAMODB_LOCK_WRITE_CAPACITY, capacity);
+ return this;
+ }
+
+ public HoodieAWSConfig build() {
+ awsConfig.setDefaults(HoodieAWSConfig.class.getName());
+ return awsConfig;
+ }
+ }
+}
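
The builder exposed above composes AWS credentials and DynamoDB lock settings in one place; a short sketch with placeholder values (all methods appear in the class above):

```java
import org.apache.hudi.config.HoodieAWSConfig;

public class HoodieAWSConfigExample {
  public static void main(String[] args) {
    HoodieAWSConfig awsConfig = HoodieAWSConfig.newBuilder()
        .withAccessKey("my-access-key")       // hoodie.aws.access.key
        .withSecretKey("my-secret-key")       // hoodie.aws.secret.key
        .withDynamoDBTable("hudi-locks")
        .withDynamoDBPartitionKey("my_table")
        .withDynamoDBRegion("us-east-1")
        .withDynamoDBBillingMode("PAY_PER_REQUEST")
        .build();                             // applies declared defaults for unset keys
    System.out.println(awsConfig.getAWSAccessKey());
  }
}
```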
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieMetricsCloudWatchConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieMetricsCloudWatchConfig.java
new file mode 100644
index 000000000000..e4e46d5a1f7b
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieMetricsCloudWatchConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.util.Properties;
+
+@ConfigClassProperty(
+ name = "Metrics Configurations for Amazon CloudWatch",
+ groupName = ConfigGroups.Names.METRICS,
+ description =
+ "Enables reporting on Hudi metrics using Amazon CloudWatch. "
+ + " Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HoodieMetricsCloudWatchConfig extends HoodieConfig {
+
+ public static final String CLOUDWATCH_PREFIX = "hoodie.metrics.cloudwatch";
+
+ public static final ConfigProperty<Integer> REPORT_PERIOD_SECONDS = ConfigProperty
+ .key(CLOUDWATCH_PREFIX + ".report.period.seconds")
+ .defaultValue(60)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Reporting interval in seconds");
+
+ public static final ConfigProperty<String> METRIC_PREFIX = ConfigProperty
+ .key(CLOUDWATCH_PREFIX + ".metric.prefix")
+ .defaultValue("")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Metric prefix of reporter");
+
+ public static final ConfigProperty<String> METRIC_NAMESPACE = ConfigProperty
+ .key(CLOUDWATCH_PREFIX + ".namespace")
+ .defaultValue("Hudi")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Namespace of reporter");
+ /*
+ Amazon CloudWatch allows a maximum of 20 metrics per request. Choosing this as the default maximum.
+ Reference: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
+ */
+ public static final ConfigProperty<Integer> MAX_DATUMS_PER_REQUEST =
+ ConfigProperty.key(CLOUDWATCH_PREFIX + ".maxDatumsPerRequest")
+ .defaultValue(20)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Max number of Datums per request");
+
+ public HoodieMetricsCloudWatchConfig() {
+ super();
+ }
+
+ public static HoodieMetricsCloudWatchConfig.Builder newBuilder() {
+ return new HoodieMetricsCloudWatchConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private HoodieMetricsCloudWatchConfig hoodieMetricsCloudWatchConfig = new HoodieMetricsCloudWatchConfig();
+
+ public HoodieMetricsCloudWatchConfig.Builder fromProperties(Properties props) {
+ this.hoodieMetricsCloudWatchConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieMetricsCloudWatchConfig build() {
+ hoodieMetricsCloudWatchConfig.setDefaults(HoodieMetricsCloudWatchConfig.class.getName());
+ return hoodieMetricsCloudWatchConfig;
+ }
+ }
+}
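
A sketch of wiring these knobs into a write config. The `hoodie.metrics.cloudwatch.*` keys come from the class above; the general switches `hoodie.metrics.on` and `hoodie.metrics.reporter.type`, and the `CLOUDWATCH` reporter type value, are assumptions not shown in this diff.

```java
import java.util.Properties;

public class CloudWatchMetricsProps {
  public static Properties build() {
    Properties props = new Properties();
    props.setProperty("hoodie.metrics.on", "true");                             // assumed general switch
    props.setProperty("hoodie.metrics.reporter.type", "CLOUDWATCH");            // assumed reporter type
    props.setProperty("hoodie.metrics.cloudwatch.report.period.seconds", "60"); // REPORT_PERIOD_SECONDS
    props.setProperty("hoodie.metrics.cloudwatch.namespace", "Hudi");           // METRIC_NAMESPACE
    props.setProperty("hoodie.metrics.cloudwatch.metric.prefix", "prod");       // METRIC_PREFIX
    return props;
  }
}
```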
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java b/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java
new file mode 100644
index 000000000000..051fe81e8b0f
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws;
+
+import com.amazonaws.auth.BasicSessionCredentials;
+import org.apache.hudi.config.HoodieAWSConfig;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieAWSCredentialsProviderFactory {
+
+ @Test
+ public void testGetAWSCredentials() {
+ HoodieConfig cfg = new HoodieConfig();
+ cfg.setValue(HoodieAWSConfig.AWS_ACCESS_KEY, "random-access-key");
+ cfg.setValue(HoodieAWSConfig.AWS_SECRET_KEY, "random-secret-key");
+ cfg.setValue(HoodieAWSConfig.AWS_SESSION_TOKEN, "random-session-token");
+ BasicSessionCredentials credentials = (BasicSessionCredentials) org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps()).getCredentials();
+ assertEquals("random-access-key", credentials.getAWSAccessKeyId());
+ assertEquals("random-secret-key", credentials.getAWSSecretKey());
+ assertEquals("random-session-token", credentials.getSessionToken());
+ }
+}
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
new file mode 100644
index 000000000000..85f551e6fda8
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.cloudwatch;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_COUNT_TYPE_VALUE;
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_GAUGE_TYPE_VALUE;
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_METRIC_TYPE_KEY;
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_TABLE_NAME_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(MockitoExtension.class)
+public class TestCloudWatchReporter {
+
+ private static final String NAMESPACE = "Hudi Test";
+ private static final String PREFIX = "testPrefix";
+ private static final String TABLE_NAME = "testTable";
+ private static final int MAX_DATUMS_PER_REQUEST = 2;
+
+ @Mock
+ MetricRegistry metricRegistry;
+
+ @Mock
+ AmazonCloudWatchAsync cloudWatchAsync;
+
+ @Mock
+ CompletableFuture<PutMetricDataResult> cloudWatchFuture;
+
+ @Captor
+ ArgumentCaptor<PutMetricDataRequest> putMetricDataRequestCaptor;
+
+ CloudWatchReporter reporter;
+
+ @BeforeEach
+ public void setup() {
+ reporter = CloudWatchReporter.forRegistry(metricRegistry)
+ .namespace(NAMESPACE)
+ .prefixedWith(PREFIX)
+ .maxDatumsPerRequest(MAX_DATUMS_PER_REQUEST)
+ .withClock(Clock.defaultClock())
+ .filter(MetricFilter.ALL)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build(cloudWatchAsync);
+
+ Mockito.when(cloudWatchAsync.putMetricDataAsync(ArgumentMatchers.any())).thenReturn(cloudWatchFuture);
+ }
+
+ @Test
+ public void testReporter() {
+ SortedMap<String, Gauge> gauges = new TreeMap<>();
+ Gauge<Long> gauge1 = () -> 100L;
+ Gauge<Double> gauge2 = () -> 100.1;
+ gauges.put(TABLE_NAME + ".gauge1", gauge1);
+ gauges.put(TABLE_NAME + ".gauge2", gauge2);
+
+ SortedMap<String, Counter> counters = new TreeMap<>();
+ Counter counter1 = new Counter();
+ counter1.inc(200);
+ counters.put(TABLE_NAME + ".counter1", counter1);
+
+ SortedMap<String, Histogram> histograms = new TreeMap<>();
+ Histogram histogram1 = new Histogram(new ExponentiallyDecayingReservoir());
+ histogram1.update(300);
+ histograms.put(TABLE_NAME + ".histogram1", histogram1);
+
+ SortedMap<String, Meter> meters = new TreeMap<>();
+ Meter meter1 = new Meter();
+ meter1.mark(400);
+ meters.put(TABLE_NAME + ".meter1", meter1);
+
+ SortedMap<String, Timer> timers = new TreeMap<>();
+ Timer timer1 = new Timer();
+ timer1.update(100, TimeUnit.SECONDS);
+ timers.put(TABLE_NAME + ".timer1", timer1);
+
+ Mockito.when(metricRegistry.getGauges(MetricFilter.ALL)).thenReturn(gauges);
+ Mockito.when(metricRegistry.getCounters(MetricFilter.ALL)).thenReturn(counters);
+ Mockito.when(metricRegistry.getHistograms(MetricFilter.ALL)).thenReturn(histograms);
+ Mockito.when(metricRegistry.getMeters(MetricFilter.ALL)).thenReturn(meters);
+ Mockito.when(metricRegistry.getTimers(MetricFilter.ALL)).thenReturn(timers);
+
+ reporter.report();
+
+ // Since there are 6 metrics in total and max datums per request is 2, we would expect 3 calls to CloudWatch
+ // with 2 datums in each
+ Mockito.verify(cloudWatchAsync, Mockito.times(3)).putMetricDataAsync(putMetricDataRequestCaptor.capture());
+ Assertions.assertEquals(NAMESPACE, putMetricDataRequestCaptor.getValue().getNamespace());
+
+ List<PutMetricDataRequest> putMetricDataRequests = putMetricDataRequestCaptor.getAllValues();
+ putMetricDataRequests.forEach(request -> assertEquals(2, request.getMetricData().size()));
+
+ List<MetricDatum> metricDataBatch1 = putMetricDataRequests.get(0).getMetricData();
+ assertEquals(PREFIX + ".gauge1", metricDataBatch1.get(0).getMetricName());
+ assertEquals(Double.valueOf(gauge1.getValue()), metricDataBatch1.get(0).getValue());
+ assertDimensions(metricDataBatch1.get(0).getDimensions(), DIMENSION_GAUGE_TYPE_VALUE);
+
+ assertEquals(PREFIX + ".gauge2", metricDataBatch1.get(1).getMetricName());
+ assertEquals(gauge2.getValue(), metricDataBatch1.get(1).getValue());
+ assertDimensions(metricDataBatch1.get(1).getDimensions(), DIMENSION_GAUGE_TYPE_VALUE);
+
+ List<MetricDatum> metricDataBatch2 = putMetricDataRequests.get(1).getMetricData();
+ assertEquals(PREFIX + ".counter1", metricDataBatch2.get(0).getMetricName());
+ assertEquals(counter1.getCount(), metricDataBatch2.get(0).getValue().longValue());
+ assertDimensions(metricDataBatch2.get(0).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ assertEquals(PREFIX + ".histogram1", metricDataBatch2.get(1).getMetricName());
+ assertEquals(histogram1.getCount(), metricDataBatch2.get(1).getValue().longValue());
+ assertDimensions(metricDataBatch2.get(1).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ List<MetricDatum> metricDataBatch3 = putMetricDataRequests.get(2).getMetricData();
+ assertEquals(PREFIX + ".meter1", metricDataBatch3.get(0).getMetricName());
+ assertEquals(meter1.getCount(), metricDataBatch3.get(0).getValue().longValue());
+ assertDimensions(metricDataBatch3.get(0).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ assertEquals(PREFIX + ".timer1", metricDataBatch3.get(1).getMetricName());
+ assertEquals(timer1.getCount(), metricDataBatch3.get(1).getValue().longValue());
+ assertDimensions(metricDataBatch3.get(1).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ reporter.stop();
+ Mockito.verify(cloudWatchAsync).shutdown();
+ }
+
+ private void assertDimensions(List<Dimension> actualDimensions, String metricTypeDimensionVal) {
+ assertEquals(2, actualDimensions.size());
+
+ Dimension expectedTableNameDimension = new Dimension()
+ .withName(DIMENSION_TABLE_NAME_KEY)
+ .withValue(TABLE_NAME);
+ Dimension expectedMetricTypeDimension = new Dimension()
+ .withName(DIMENSION_METRIC_TYPE_KEY)
+ .withValue(metricTypeDimensionVal);
+
+ assertEquals(expectedTableNameDimension, actualDimensions.get(0));
+ assertEquals(expectedMetricTypeDimension, actualDimensions.get(1));
+ }
+}
\ No newline at end of file
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
new file mode 100644
index 000000000000..d2ab0375e050
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.integ;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.config.DynamoDbBasedLockConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * Test for {@link DynamoDBBasedLockProvider}.
+ * Marked as an integration test because it requires setting up a Docker environment.
+ */
+public class ITTestDynamoDBBasedLockProvider {
+
+ private static LockConfiguration lockConfiguration;
+ private static AmazonDynamoDB dynamoDb;
+
+ private static final String TABLE_NAME_PREFIX = "testDDBTable-";
+ private static final String REGION = "us-east-2";
+
+ @BeforeAll
+ public static void setup() throws InterruptedException {
+ Properties properties = new Properties();
+ properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
+ // properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX);
+ properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
+ properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), REGION);
+ properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+ properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
+ properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
+ lockConfiguration = new LockConfiguration(properties);
+ dynamoDb = getDynamoClientWithLocalEndpoint();
+ }
+
+ @Test
+ public void testAcquireLock() {
+ lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+ DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
+ Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+ dynamoDbBasedLockProvider.unlock();
+ }
+
+ @Test
+ public void testUnlock() {
+ lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+ DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
+ Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+ dynamoDbBasedLockProvider.unlock();
+ Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testReentrantLock() {
+ lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+ DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
+ Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+ Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+ dynamoDbBasedLockProvider.unlock();
+ }
+
+ @Test
+ public void testUnlockWithoutLock() {
+ lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+ DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
+ dynamoDbBasedLockProvider.unlock();
+ }
+
+ private static AmazonDynamoDB getDynamoClientWithLocalEndpoint() {
+ String endpoint = System.getProperty("dynamodb-local.endpoint");
+ if (endpoint == null || endpoint.isEmpty()) {
+ throw new IllegalStateException("dynamodb-local.endpoint system property not set");
+ }
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, REGION))
+ .withCredentials(getCredentials())
+ .build();
+ }
+
+ private static AWSCredentialsProvider getCredentials() {
+ return new AWSStaticCredentialsProvider(new BasicAWSCredentials("random-access-key", "random-secret-key"));
+ }
+}
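
Running this suite locally requires a DynamoDB Local instance and the `dynamodb-local.endpoint` system property. A minimal sketch of exercising the provider outside JUnit, under the same assumptions as the test above (the endpoint, table name, and key names are placeholders; imports as in the test):

```java
// Hypothetical local run; DynamoDB Local is assumed to listen on localhost:8000.
System.setProperty("dynamodb-local.endpoint", "http://localhost:8000");

Properties props = new Properties();
props.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
props.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), "testDDBTable-demo");
props.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
props.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), "us-east-2");
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");

AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.standard()
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "us-east-2"))
    .withCredentials(new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("random-access-key", "random-secret-key")))
    .build();

DynamoDBBasedLockProvider provider =
    new DynamoDBBasedLockProvider(new LockConfiguration(props), null, dynamoDb);
if (provider.tryLock(1000, TimeUnit.MILLISECONDS)) { // not reentrant: a second tryLock returns false
  try {
    // ... protected section ...
  } finally {
    provider.unlock();
  }
}
```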
diff --git a/hudi-aws/src/test/resources/log4j-surefire.properties b/hudi-aws/src/test/resources/log4j-surefire.properties
new file mode 100644
index 000000000000..a59d4ebe2b19
--- /dev/null
+++ b/hudi-aws/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,25 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, A1
+log4j.category.org.apache=INFO
+log4j.category.org.apache.parquet.hadoop=WARN
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 26c0e685a931..29bdf85ab08c 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -19,7 +19,7 @@
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.10.0-SNAPSHOT</version>
+ <version>0.11.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -234,6 +234,12 @@
<groupId>org.springframework.shell</groupId>
<artifactId>spring-shell</artifactId>
<version>${spring.shell.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 092f9270b964..9adae1daa533 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -40,14 +40,21 @@ public class ClusteringCommand implements CommandMarker {
private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);
+ /**
+ * Schedule clustering table service.
+ *
+ * Example:
+ * > connect --path {path to hudi table}
+ * > clustering schedule --sparkMaster local --sparkMemory 2g
+ */
@CliCommand(value = "clustering schedule", help = "Schedule Clustering")
public String scheduleClustering(
- @CliOption(key = "sparkMemory", help = "Spark executor memory",
- unspecifiedDefaultValue = "1G") final String sparkMemory,
- @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
- unspecifiedDefaultValue = "") final String propsFilePath,
- @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
- unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
+ @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1g", help = "Spark executor memory") final String sparkMemory,
+ @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations "
+ + "for hoodie client for clustering", unspecifiedDefaultValue = "") final String propsFilePath,
+ @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can "
+ + "be passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
@@ -59,8 +66,8 @@ public String scheduleClustering(
// First get a clustering instant time and pass it to spark launcher for scheduling clustering
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
- sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
- client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
+ sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), master, sparkMemory,
+ client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -71,21 +78,25 @@ public String scheduleClustering(
return "Succeeded to schedule clustering for " + clusteringInstantTime;
}
+ /**
+ * Run clustering table service.
+ *
+ * Example:
+ * > connect --path {path to hudi table}
+ * > clustering schedule --sparkMaster local --sparkMemory 2g
+ * > clustering run --sparkMaster local --sparkMemory 2g --clusteringInstant 20211124005208
+ */
@CliCommand(value = "clustering run", help = "Run Clustering")
public String runClustering(
- @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering",
- unspecifiedDefaultValue = "1") final String parallelism,
- @CliOption(key = "sparkMemory", help = "Spark executor memory",
- unspecifiedDefaultValue = "4G") final String sparkMemory,
- @CliOption(key = "retry", help = "Number of retries",
- unspecifiedDefaultValue = "1") final String retry,
- @CliOption(key = "clusteringInstant", help = "Clustering instant time",
- mandatory = true) final String clusteringInstantTime,
- @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
- unspecifiedDefaultValue = "") final String propsFilePath,
- @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
- unspecifiedDefaultValue = "") final String[] configs
- ) throws Exception {
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
+ @CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
+ @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
+ @CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
+ @CliOption(key = "clusteringInstant", help = "Clustering instant time", mandatory = true) final String clusteringInstantTime,
+ @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+ + "hoodie client for compacting", unspecifiedDefaultValue = "") final String propsFilePath,
+ @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+ + "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
@@ -93,8 +104,9 @@ public String runClustering(
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
- sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
- client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
+ sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), master, sparkMemory,
+ client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime,
+ parallelism, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index ac898a8e644f..94e56102dbd5 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -18,20 +18,25 @@
package org.apache.hudi.cli.commands;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
@@ -40,25 +45,44 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* CLI commands to operate on the Metadata Table.
+ *
+ *
+ * Example:
+ * The default spark.master conf is set to yarn. If you are running on a local deployment,
+ * set the spark master to local using the set conf command:
+ * > set --conf SPARK_MASTER=local[2]
+ *
+ * Connect to the table
+ * > connect --path {path to hudi table}
+ *
+ * Run metadata commands
+ * > metadata list-partitions
*/
@Component
public class MetadataCommand implements CommandMarker {
- private JavaSparkContext jsc;
+ private static final Logger LOG = LogManager.getLogger(MetadataCommand.class);
private static String metadataBaseDirectory;
+ private JavaSparkContext jsc;
/**
* Sets the directory to store/read Metadata Table.
- *
+ *
* This can be used to store the metadata table away from the dataset directory.
- * - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
- * - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
+ * - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
+ * - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
*/
public static void setMetadataBaseDirectory(String metadataDir) {
ValidationUtils.checkState(metadataBaseDirectory == null,
@@ -75,8 +99,7 @@ public static String getMetadataTableBasePath(String tableBasePath) {
@CliCommand(value = "metadata set", help = "Set options for Metadata Table")
public String set(@CliOption(key = {"metadataDir"},
- help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "")
- final String metadataDir) {
+ help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "") final String metadataDir) {
if (!metadataDir.isEmpty()) {
setMetadataBaseDirectory(metadataDir);
}
@@ -152,16 +175,22 @@ public String stats() throws IOException {
config, HoodieCLI.basePath, "/tmp");
Map<String, String> stats = metadata.stats();
- StringBuffer out = new StringBuffer("\n");
- out.append(String.format("Base path: %s\n", getMetadataTableBasePath(HoodieCLI.basePath)));
+ final List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, String> entry : stats.entrySet()) {
- out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+ Comparable[] row = new Comparable[2];
+ row[0] = entry.getKey();
+ row[1] = entry.getValue();
+ rows.add(row);
}
- return out.toString();
+ TableHeader header = new TableHeader()
+ .addTableHeaderField("stat key")
+ .addTableHeaderField("stat value");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
- @CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata")
+ @CliCommand(value = "metadata list-partitions", help = "List all partitions from metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
initJavaSparkContext();
@@ -169,55 +198,158 @@ public String listPartitions() throws IOException {
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metadata.enabled()) {
- out.append("=== Metadata Table not initilized. Using file listing to get list of partitions. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
List<String> partitions = metadata.getAllPartitionPaths();
- long t2 = System.currentTimeMillis();
-
- int[] count = {0};
- partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
- out.append(p);
- if (++count[0] % 15 == 0) {
- out.append("\n");
- } else {
- out.append(", ");
- }
- });
+ LOG.debug("Took " + timer.endTimer() + " ms");
- out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
+ final List<Comparable[]> rows = new ArrayList<>();
+ partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = p;
+ rows.add(row);
+ });
- return out.toString();
+ TableHeader header = new TableHeader().addTableHeaderField("partition");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
@CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
public String listFiles(
- @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true)
- final String partition) throws IOException {
+ @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+ HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metaReader.enabled()) {
- out.append("=== Metadata Table not initialized. Using file listing to get list of files in partition. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
- long t2 = System.currentTimeMillis();
+ LOG.debug("Took " + timer.endTimer() + " ms");
- Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
- out.append("\t" + p.getPath().getName());
- out.append("\n");
+ final List<Comparable[]> rows = new ArrayList<>();
+ Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = f;
+ rows.add(row);
});
- out.append(String.format("\n=== Files in partition retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
+ TableHeader header = new TableHeader().addTableHeaderField("file path");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
+ }
+
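+ /**
+ * Validate the file listing in the metadata table against a direct file system listing.
+ *
+ * Example (assumed usage, mirroring the commands above):
+ * > connect --path {path to hudi table}
+ * > metadata validate-files --verbose
+ */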
+ @CliCommand(value = "metadata validate-files", help = "Validate all files in all partitions from the metadata")
+ public String validateFiles(
+ @CliOption(key = {"verbose"}, help = "Print all file details", unspecifiedDefaultValue = "false") final boolean verbose) throws IOException {
+ HoodieCLI.getTableMetaClient();
+ HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
+ HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+
+ if (!metadataReader.enabled()) {
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
+ }
+
+ HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
+ HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<String> metadataPartitions = metadataReader.getAllPartitionPaths();
+ LOG.debug("Listing partitions took " + timer.endTimer() + " ms");
+ List<String> fsPartitions = fsMetaReader.getAllPartitionPaths();
+ Collections.sort(fsPartitions);
+ Collections.sort(metadataPartitions);
+
+ Set<String> allPartitions = new HashSet<>();
+ allPartitions.addAll(fsPartitions);
+ allPartitions.addAll(metadataPartitions);
+
+ if (!fsPartitions.equals(metadataPartitions)) {
+ LOG.error("FS partition listing is not matching with metadata partition listing!");
+ LOG.error("All FS partitions: " + Arrays.toString(fsPartitions.toArray()));
+ LOG.error("All Metadata partitions: " + Arrays.toString(metadataPartitions.toArray()));
+ }
+
+ final List<Comparable[]> rows = new ArrayList<>();
+ for (String partition : allPartitions) {
+ Map<String, FileStatus> fileStatusMap = new HashMap<>();
+ Map<String, FileStatus> metadataFileStatusMap = new HashMap<>();
+ FileStatus[] metadataStatuses = metadataReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ Arrays.stream(metadataStatuses).forEach(entry -> metadataFileStatusMap.put(entry.getPath().getName(), entry));
+ FileStatus[] fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ Arrays.stream(fsStatuses).forEach(entry -> fileStatusMap.put(entry.getPath().getName(), entry));
+
+ Set<String> allFiles = new HashSet<>();
+ allFiles.addAll(fileStatusMap.keySet());
+ allFiles.addAll(metadataFileStatusMap.keySet());
+
+ for (String file : allFiles) {
+ Comparable[] row = new Comparable[6];
+ row[0] = partition;
+ FileStatus fsFileStatus = fileStatusMap.get(file);
+ FileStatus metaFileStatus = metadataFileStatusMap.get(file);
+ boolean doesFsFileExists = fsFileStatus != null;
+ boolean doesMetadataFileExists = metaFileStatus != null;
+ long fsFileLength = doesFsFileExists ? fsFileStatus.getLen() : 0;
+ long metadataFileLength = doesMetadataFileExists ? metaFileStatus.getLen() : 0;
+ row[1] = file;
+ row[2] = doesFsFileExists;
+ row[3] = doesMetadataFileExists;
+ row[4] = fsFileLength;
+ row[5] = metadataFileLength;
+ if (verbose) { // if verbose print all files
+ rows.add(row);
+ } else if ((doesFsFileExists != doesMetadataFileExists) || (fsFileLength != metadataFileLength)) { // if non verbose, print only non matching files
+ rows.add(row);
+ }
+ }
+
+ if (metadataStatuses.length != fsStatuses.length) {
+ LOG.error(" FS and metadata files count not matching for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count "
+ + metadataStatuses.length);
+ }
- return out.toString();
+ for (Map.Entry<String, FileStatus> entry : fileStatusMap.entrySet()) {
+ if (!metadataFileStatusMap.containsKey(entry.getKey())) {
+ LOG.error("FS file not found in metadata " + entry.getKey());
+ } else {
+ if (entry.getValue().getLen() != metadataFileStatusMap.get(entry.getKey()).getLen()) {
+ LOG.error(" FS file size mismatch " + entry.getKey() + ", size equality "
+ + (entry.getValue().getLen() == metadataFileStatusMap.get(entry.getKey()).getLen())
+ + ". FS size " + entry.getValue().getLen() + ", metadata size "
+ + metadataFileStatusMap.get(entry.getKey()).getLen());
+ }
+ }
+ }
+ for (Map.Entry<String, FileStatus> entry : metadataFileStatusMap.entrySet()) {
+ if (!fileStatusMap.containsKey(entry.getKey())) {
+ LOG.error("Metadata file not found in FS " + entry.getKey());
+ } else {
+ if (entry.getValue().getLen() != fileStatusMap.get(entry.getKey()).getLen()) {
+ LOG.error(" Metadata file size mismatch " + entry.getKey() + ", size equality "
+ + (entry.getValue().getLen() == fileStatusMap.get(entry.getKey()).getLen())
+ + ". Metadata size " + entry.getValue().getLen() + ", FS size "
+ + fileStatusMap.get(entry.getKey()).getLen());
+ }
+ }
+ }
+ }
+ TableHeader header = new TableHeader().addTableHeaderField("Partition")
+ .addTableHeaderField("File Name")
+ .addTableHeaderField(" Is Present in FS ")
+ .addTableHeaderField(" Is Present in Metadata")
+ .addTableHeaderField(" FS size")
+ .addTableHeaderField(" Metadata size");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
}
private HoodieWriteConfig getWriteConfig() {
@@ -227,7 +359,7 @@ private HoodieWriteConfig getWriteConfig() {
private void initJavaSparkContext() {
if (jsc == null) {
- jsc = SparkUtil.initJavaSparkConf("HoodieClI");
+ jsc = SparkUtil.initJavaSparkConf(SparkUtil.getDefaultConf("HoodieCLI", Option.empty()));
}
}
-}
+}
\ No newline at end of file
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 127cb28ad010..2533562d8206 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -156,7 +156,7 @@ public String overwriteHoodieProperties(
newProps.load(new FileInputStream(new File(overwriteFilePath)));
Map<String, String> oldProps = client.getTableConfig().propsMap();
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
- HoodieTableConfig.createHoodieProperties(client.getFs(), metaPathDir, newProps);
+ HoodieTableConfig.create(client.getFs(), metaPathDir, newProps);
TreeSet<String> allPropKeys = new TreeSet<>();
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
index 7969808e2983..a3d78d126096 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
@@ -43,6 +43,7 @@ public void setEnv(@CliOption(key = {"conf"}, help = "Env config to be set") fin
throw new IllegalArgumentException("Illegal set parameter, please use like [set --conf SPARK_HOME=/usr/etc/spark]");
}
env.put(map[0].trim(), map[1].trim());
+ System.setProperty(map[0].trim(), map[1].trim());
}
@CliCommand(value = "show envs all", help = "Show spark launcher envs")
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 82688fecc366..d1ee109f5904 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -31,6 +31,7 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -79,12 +80,14 @@ enum SparkCommand {
}
public static void main(String[] args) throws Exception {
- String command = args[0];
- LOG.info("Invoking SparkMain:" + command);
+ ValidationUtils.checkArgument(args.length >= 4);
+ final String commandString = args[0];
+ LOG.info("Invoking SparkMain: " + commandString);
+ final SparkCommand cmd = SparkCommand.valueOf(commandString);
- SparkCommand cmd = SparkCommand.valueOf(command);
+ JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + commandString,
+ Option.of(args[1]), Option.of(args[2]));
- JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]));
int returnCode = 0;
try {
switch (cmd) {
@@ -111,8 +114,8 @@ public static void main(String[] args) throws Exception {
if (args.length > 13) {
configs.addAll(Arrays.asList(args).subList(13, args.length));
}
- returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
- Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
+ returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5], args[6], args[7], args[8],
+ Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
break;
case COMPACT_RUN:
assert (args.length >= 10);
@@ -159,33 +162,34 @@ public static void main(String[] args) throws Exception {
case COMPACT_UNSCHEDULE_PLAN:
assert (args.length == 9);
doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
- Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
+ Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case CLUSTERING_RUN:
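+ // Expected arg layout (as assembled by ClusteringCommand#runClustering):
+ // 0: command, 1: master, 2: sparkMemory, 3: basePath, 4: tableName,
+ // 5: clusteringInstant, 6: parallelism, 7: retry, 8: propsFilePath, 9+: hoodieConfigs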
- assert (args.length >= 8);
+ assert (args.length >= 9);
propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[7])) {
- propsFilePath = args[7];
+ if (!StringUtils.isNullOrEmpty(args[8])) {
+ propsFilePath = args[8];
}
configs = new ArrayList<>();
- if (args.length > 8) {
- configs.addAll(Arrays.asList(args).subList(8, args.length));
+ if (args.length > 9) {
+ configs.addAll(Arrays.asList(args).subList(9, args.length));
}
- returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
- Integer.parseInt(args[6]), false, propsFilePath, configs);
+ returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
+ Integer.parseInt(args[7]), false, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
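+ // Expected arg layout (as assembled by ClusteringCommand#scheduleClustering):
+ // 0: command, 1: master, 2: sparkMemory, 3: basePath, 4: tableName,
+ // 5: clusteringInstant, 6: propsFilePath, 7+: hoodieConfigs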
- assert (args.length >= 6);
+ assert (args.length >= 7);
propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[5])) {
- propsFilePath = args[5];
+ if (!StringUtils.isNullOrEmpty(args[6])) {
+ propsFilePath = args[6];
}
configs = new ArrayList<>();
- if (args.length > 6) {
- configs.addAll(Arrays.asList(args).subList(6, args.length));
+ if (args.length > 7) {
+ configs.addAll(Arrays.asList(args).subList(7, args.length));
}
- returnCode = cluster(jsc, args[1], args[2], args[3], 1, args[4], 0, true, propsFilePath, configs);
+ returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
+ 0, true, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -229,7 +233,7 @@ public static void main(String[] args) throws Exception {
break;
}
} catch (Throwable throwable) {
- LOG.error("Fail to execute command", throwable);
+ LOG.error("Fail to execute commandString", throwable);
returnCode = -1;
} finally {
jsc.stop();
@@ -360,7 +364,7 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
String payloadClassName, String enableHiveSync, String propsFilePath, List configs) throws IOException {
TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
- : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
+ : UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);
properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 03c76375e599..41cdfb88f844 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -20,13 +20,16 @@
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.shell.core.CommandMarker;
@@ -35,12 +38,21 @@
import org.springframework.stereotype.Component;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
/**
* CLI command to display hudi table options.
@@ -170,6 +182,67 @@ public String fetchTableSchema(
}
}
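+ /**
+ * Example (assumed usage):
+ * > connect --path {path to hudi table}
+ * > table recover-configs
+ */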
+ @CliCommand(value = "table recover-configs", help = "Recover table configs, from update/delete that failed midway.")
+ public String recoverTableConfig() throws IOException {
+ HoodieCLI.refreshTableMetadata();
+ HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+ Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
+ HoodieTableConfig.recover(client.getFs(), metaPathDir);
+ return descTable();
+ }
+
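+ /**
+ * Example (assumed usage; the properties file path is a placeholder):
+ * > connect --path {path to hudi table}
+ * > table update-configs --props-file /tmp/updated.properties
+ */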
+ @CliCommand(value = "table update-configs", help = "Update the table configs with configs with provided file.")
+ public String updateTableConfig(
+ @CliOption(key = {"props-file"}, mandatory = true, help = "Path to a properties file on local filesystem")
+ final String updatePropsFilePath) throws IOException {
+ HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+ Map<String, String> oldProps = client.getTableConfig().propsMap();
+
+ Properties updatedProps = new Properties();
+ updatedProps.load(new FileInputStream(updatePropsFilePath));
+ Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
+ HoodieTableConfig.update(client.getFs(), metaPathDir, updatedProps);
+
+ HoodieCLI.refreshTableMetadata();
+ Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
+ return renderOldNewProps(newProps, oldProps);
+ }
+
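+ /**
+ * Example (assumed usage; the config names are placeholders):
+ * > connect --path {path to hudi table}
+ * > table delete-configs --comma-separated-configs hoodie.config.one,hoodie.config.two
+ */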
+ @CliCommand(value = "table delete-configs", help = "Delete the supplied table configs from the table.")
+ public String deleteTableConfig(
+ @CliOption(key = {"comma-separated-configs"}, mandatory = true, help = "Comma separated list of configs to delete.")
+ final String csConfigs) {
+ HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+ Map<String, String> oldProps = client.getTableConfig().propsMap();
+
+ Set<String> deleteConfigs = Arrays.stream(csConfigs.split(",")).collect(Collectors.toSet());
+ Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
+ HoodieTableConfig.delete(client.getFs(), metaPathDir, deleteConfigs);
+
+ HoodieCLI.refreshTableMetadata();
+ Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
+ return renderOldNewProps(newProps, oldProps);
+ }
+
+ private static String renderOldNewProps(Map<String, String> newProps, Map<String, String> oldProps) {
+ TreeSet<String> allPropKeys = new TreeSet<>();
+ allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
+ allPropKeys.addAll(oldProps.keySet());
+
+ String[][] rows = new String[allPropKeys.size()][];
+ int ind = 0;
+ for (String propKey : allPropKeys) {
+ String[] row = new String[]{
+ propKey,
+ oldProps.getOrDefault(propKey, "null"),
+ newProps.getOrDefault(propKey, "null")
+ };
+ rows[ind++] = row;
+ }
+ return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+ HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+ }
+
/**
* Use Streams when you are dealing with raw data.
* @param filePath output file path.
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index 5f08f0097a45..a95cc53df329 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -51,7 +51,7 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co
public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
- return HoodieActiveTimeline.formatInstantTime(date);
+ return HoodieActiveTimeline.formatDate(date);
}
/**
@@ -61,8 +61,8 @@ public static String getTimeDaysAgo(int numberOfDays) {
* b) hours: -1, returns 20200202010000
*/
public static String addHours(String compactionCommitTime, int hours) throws ParseException {
- Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant();
+ Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
- return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant()));
+ return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
}
}
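
Concretely, assuming a compaction commit time of 20200202020000 in the `yyyyMMddHHmmss` instant format implied by the javadoc example above (note that `addHours` declares `throws ParseException`):

```java
String earlier = CommitUtil.addHours("20200202020000", -1); // "20200202010000"
String later = CommitUtil.addHours("20200202020000", 1);    // "20200202030000"
```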
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index 7127631775de..45e048755166 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -34,13 +34,14 @@
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
/**
* Utility functions dealing with Spark.
*/
public class SparkUtil {
- private static final String DEFAULT_SPARK_MASTER = "yarn";
+ public static final String DEFAULT_SPARK_MASTER = "yarn";
/**
* TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -62,28 +63,30 @@ public static SparkLauncher initLauncher(String propertiesFile) throws URISyntax
return sparkLauncher;
}
- public static JavaSparkContext initJavaSparkConf(String name) {
- return initJavaSparkConf(name, Option.empty(), Option.empty());
- }
-
- public static JavaSparkContext initJavaSparkConf(String name, Option<String> master,
- Option<String> executorMemory) {
- SparkConf sparkConf = new SparkConf().setAppName(name);
+ /**
+ * Get the default spark configuration.
+ *
+ * @param appName - Spark application name
+ * @param sparkMaster - Spark master node name
+ * @return Spark configuration
+ */
+ public static SparkConf getDefaultConf(final String appName, final Option<String> sparkMaster) {
+ final Properties properties = System.getProperties();
+ SparkConf sparkConf = new SparkConf().setAppName(appName);
- String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
- if ((null == defMaster) || (defMaster.isEmpty())) {
- sparkConf.setMaster(DEFAULT_SPARK_MASTER);
- } else {
- sparkConf.setMaster(defMaster);
+ // Configure the spark master: an explicit option wins over the CLI_SPARK_MASTER system property, which wins over the default ("yarn")
+ String sparkMasterNode = DEFAULT_SPARK_MASTER;
+ if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
+ sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
}
+ sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
+ sparkConf.setMaster(sparkMasterNode);
- sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
+ // Configure driver
sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
- sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
- if (executorMemory.isPresent()) {
- sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
- }
+ sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
+ sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
// Configure hadoop conf
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
@@ -91,10 +94,28 @@ public static JavaSparkContext initJavaSparkConf(String name, Option<String> mas
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
+ return sparkConf;
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name) {
+ return initJavaSparkConf(name, Option.empty(), Option.empty());
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name, Option<String> master, Option<String> executorMemory) {
+ SparkConf sparkConf = getDefaultConf(name, master);
+ if (executorMemory.isPresent()) {
+ sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
+ }
+
+ return initJavaSparkConf(sparkConf);
+ }
+
+ public static JavaSparkContext initJavaSparkConf(SparkConf sparkConf) {
SparkRDDWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
return jsc;
}
+
}
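
A short usage sketch of the split-out entry points (assumed usage; the app name, master, and memory values are placeholders):

```java
// Build the default CLI conf, override what the caller needs, then create the context.
SparkConf sparkConf = SparkUtil.getDefaultConf("hoodie-cli-demo", Option.of("local[2]"));
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, "1g");
JavaSparkContext jsc = SparkUtil.initJavaSparkConf(sparkConf);
```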
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
index 04c5f93e7218..e471ed925865 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
@@ -233,7 +233,7 @@ class DedupeSparkJob(basePath: String,
}
println("No duplicates found & counts are in check!!!! ")
- // 4. Prepare to copy the fixed files back.
+ // 5. Prepare to copy the fixed files back.
fileNameToPathMap.foreach { case (_, filePath) =>
val srcPath = new Path(s"$repairOutputPath/${filePath.getName}")
val dstPath = new Path(s"$basePath/$duplicatedPartitionPath/${filePath.getName}")
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 9732ce72b913..091c4b4173ae 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -29,6 +29,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
@@ -72,6 +73,8 @@ public void init() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index d71e7ec8d987..959346e15007 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -32,6 +32,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
@@ -209,6 +210,8 @@ public void testShowArchivedCommits() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
index de305f404455..21841a576945 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -33,6 +33,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -159,6 +160,8 @@ private void generateArchive() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
// archive
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
index ceaabddec7d2..17bc48f66f0c 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
@@ -90,6 +90,7 @@ public void init() throws Exception {
.withBaseFilesInPartitions(partitionAndFileId);
// generate two rollback
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
try (AbstractHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
index a39329e018bf..cba6d901b956 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
@@ -28,8 +28,10 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -62,6 +64,8 @@ public void init() throws Exception {
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+ timelineService = HoodieClientTestUtils.initTimelineService(
+ context, basePath(), FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue());
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
//Create some commits files and base files
HoodieTestTable.of(metaClient)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
index b2b9f86b0207..7a12a6692a2b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
@@ -22,7 +22,10 @@
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.providers.SparkProvider;
+import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
@@ -39,10 +42,13 @@
public class CLIFunctionalTestHarness implements SparkProvider {
+  protected static int timelineServicePort =
+ FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue();
+ protected static transient TimelineService timelineService;
+ protected static transient HoodieSparkEngineContext context;
private static transient SparkSession spark;
private static transient SQLContext sqlContext;
private static transient JavaSparkContext jsc;
- private static transient HoodieSparkEngineContext context;
private static transient JLineShellComponent shell;
/**
* An indicator of the initialization status.
@@ -107,6 +113,9 @@ public synchronized void runBeforeEach() {
jsc = new JavaSparkContext(spark.sparkContext());
context = new HoodieSparkEngineContext(jsc);
shell = new Bootstrap().getJLineShellComponent();
+ timelineService = HoodieClientTestUtils.initTimelineService(
+ context, basePath(), incrementTimelineServicePortToUse());
+ timelineServicePort = timelineService.getServerPort();
}
}
@@ -120,14 +129,25 @@ public static synchronized void cleanUpAfterAll() {
shell.stop();
shell = null;
}
+ if (timelineService != null) {
+ timelineService.close();
+ }
}
/**
* Helper to prepare string for matching.
+ *
* @param str Input string.
* @return pruned string with non word characters removed.
*/
protected static String removeNonWordAndStripSpace(String str) {
return str.replaceAll("[\\s]+", ",").replaceAll("[\\W]+", ",");
}
+
+ protected int incrementTimelineServicePortToUse() {
+ // Increment the timeline service port for each individual test
+ // to avoid port reuse causing failures
+ timelineServicePort = (timelineServicePort + 1 - 1024) % (65536 - 1024) + 1024;
+ return timelineServicePort;
+ }
}
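
The wrap-around expression in `incrementTimelineServicePortToUse` above deserves a second look: subtracting 1024 shifts the range to zero-based, the modulus wraps it, and adding 1024 shifts it back, so successive tests always get a fresh port inside the unprivileged range [1024, 65535]. A minimal standalone sketch of the same arithmetic (the `PortWrapDemo` class is illustrative only, not part of this patch):

```java
public class PortWrapDemo {

  // Same expression as incrementTimelineServicePortToUse: advance the port
  // by one while staying inside [1024, 65535].
  static int next(int port) {
    return (port + 1 - 1024) % (65536 - 1024) + 1024;
  }

  public static void main(String[] args) {
    System.out.println(next(26754)); // 26755 -- plain increment
    System.out.println(next(65535)); // 1024  -- wraps instead of overflowing
    System.out.println(next(1024));  // 1025  -- never dips below 1024
  }
}
```
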
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index c67621fbb9a3..a9209f5534df 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -19,12 +19,12 @@
     <artifactId>hudi-client</artifactId>
     <groupId>org.apache.hudi</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>hudi-client-common</artifactId>
-  <version>0.10.0-SNAPSHOT</version>
+  <version>0.11.0-SNAPSHOT</version>

   <name>hudi-client-common</name>
   <packaging>jar</packaging>
@@ -36,6 +36,11 @@
       <artifactId>hudi-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-aws</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-timeline-service</artifactId>
@@ -59,6 +64,13 @@
       <artifactId>parquet-avro</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.github.davidmoten</groupId>
+      <artifactId>hilbert-curve</artifactId>
+      <version>0.2.2</version>
+    </dependency>
+
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
@@ -94,7 +106,6 @@
       <groupId>io.prometheus</groupId>
       <artifactId>simpleclient_pushgateway</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.hudi</groupId>
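
The new `hilbert-curve` dependency appears to back Hudi's space-filling-curve layout optimization, mapping several column values onto a single curve index that preserves locality. A hedged sketch of the upstream library's builder API (class and method names taken from the davidmoten/hilbert-curve project's documentation, not from this patch):

```java
import java.math.BigInteger;

import org.davidmoten.hilbert.HilbertCurve;

public class HilbertDemo {
  public static void main(String[] args) {
    // A 2-dimensional curve with 5 bits per dimension: coordinates in
    // [0, 31] map to a single index in [0, 2^10 - 1].
    HilbertCurve curve = HilbertCurve.bits(5).dimensions(2);

    // Nearby points get nearby indexes, which is what makes the index a
    // useful single sort key standing in for two columns.
    BigInteger index = curve.index(3, 4);
    long[] point = curve.point(index); // round-trips back to {3, 4}
    System.out.println(index + " -> {" + point[0] + ", " + point[1] + "}");
  }
}
```
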
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index 85e008199adc..f57484d886c9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -29,7 +29,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -130,7 +129,7 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
future = res.getKey();
executor = res.getValue();
started = true;
- monitorThreads(onShutdownCallback);
+ shutdownCallback(onShutdownCallback);
}
/**
@@ -141,34 +140,15 @@ public void start(Function onShutdownCallback) {
  protected abstract Pair<CompletableFuture, ExecutorService> startService();
/**
- * A monitor thread is started which would trigger a callback if the service is shutdown.
+ * Add shutdown callback for the completable future.
*
- * @param onShutdownCallback
+ * @param callback The callback
*/
- private void monitorThreads(Function onShutdownCallback) {
- LOG.info("Submitting monitor thread !!");
- Executors.newSingleThreadExecutor(r -> {
- Thread t = new Thread(r, "Monitor Thread");
- t.setDaemon(isRunInDaemonMode());
- return t;
- }).submit(() -> {
- boolean error = false;
- try {
- LOG.info("Monitoring thread(s) !!");
- future.get();
- } catch (ExecutionException ex) {
- LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
- error = true;
- } catch (InterruptedException ie) {
- LOG.error("Got interrupted Monitoring threads", ie);
- error = true;
- } finally {
- // Mark as shutdown
- shutdown = true;
- if (null != onShutdownCallback) {
- onShutdownCallback.apply(error);
- }
- shutdown(false);
+ @SuppressWarnings("unchecked")
+  private void shutdownCallback(Function<Boolean, Boolean> callback) {
+ future.whenComplete((resp, error) -> {
+ if (null != callback) {
+ callback.apply(null != error);
}
});
}
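
The rewrite above removes a dedicated monitor thread blocking on `future.get()`: `CompletableFuture.whenComplete` fires exactly once when the future finishes, whether normally or exceptionally, so the shutdown callback needs no thread of its own. A self-contained sketch of the same idiom (names are illustrative, not taken from `HoodieAsyncService`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ShutdownCallbackDemo {
  public static void main(String[] args) {
    Function<Boolean, Boolean> onShutdown = error -> {
      System.out.println("service stopped, failed=" + error);
      return true;
    };

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
      // the async service body would run here
    });

    // Registered once, invoked on completion; the Boolean handed to the
    // callback is true only if the service ended with an exception.
    future.whenComplete((resp, error) -> {
      if (null != onShutdown) {
        onShutdown.apply(null != error);
      }
    }).join();
  }
}
```
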
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 444eae62b2ec..76b10fddd438 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -32,6 +32,7 @@
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -67,6 +68,7 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -80,6 +82,8 @@
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -197,7 +201,7 @@ public boolean commitStats(String instantTime, List stats, Opti
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
} finally {
- this.txnManager.endTransaction();
+ this.txnManager.endTransaction(Option.of(inflightInstant));
}
// do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);
@@ -218,6 +222,8 @@ protected void commit(HoodieTable table, String commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
+ // update Metadata table
+ writeTableMetadata(table, instantTime, commitActionType, metadata);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
@@ -233,7 +239,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
- metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(instantTime).getTime(), durationInMs,
+ metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
@@ -244,16 +250,25 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}
/**
- * Any pre-commit actions like conflict resolution or updating metadata table goes here.
+   * Any pre-commit actions, like conflict resolution, go here.
* @param inflightInstant instant of inflight operation.
* @param metadata commit metadata for which pre commit is being invoked.
*/
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
- // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible.
- // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
- HoodieTable table = createTable(config, hadoopConf);
- table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
- table.isTableServiceAction(inflightInstant.getAction())));
+ // To be overridden by specific engines to perform conflict resolution if any.
+ }
+
+ /**
+ * Write the HoodieCommitMetadata to metadata table if available.
+ * @param table {@link HoodieTable} of interest.
+ * @param instantTime instant time of the commit.
+ * @param actionType action type of the commit.
+ * @param metadata instance of {@link HoodieCommitMetadata}.
+ */
+ protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
+ table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
+ table.isTableServiceAction(actionType)));
}
/**
@@ -410,7 +425,11 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ if (null == this.asyncCleanerService) {
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ } else {
+ this.asyncCleanerService.start(null);
+ }
}
/**
@@ -436,11 +455,9 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata me
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
autoCleanOnCommit();
- // We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
- archiveLog.archiveIfRequired(context);
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
+ if (config.isAutoArchive()) {
+ archive(table);
+ }
} finally {
this.heartbeatClient.stop(instantTime);
}
@@ -508,6 +525,13 @@ protected void autoCleanOnCommit() {
}
}
+ /**
+ * Run any pending compactions.
+ */
+ public void runAnyPendingCompactions() {
+ runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
+ }
+
/**
* Create a savepoint based on the latest commit action on the timeline.
*
@@ -574,22 +598,39 @@ public void restoreToSavepoint(String savepointTime) {
@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
- return rollback(commitInstantTime, false);
+ HoodieTable table = createTable(config, hadoopConf);
+    Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ return rollback(commitInstantTime, pendingRollbackInfo, false);
}
/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
- *
+   * Adding this API for backwards compatibility.
* @param commitInstantTime Instant time of the commit
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
@Deprecated
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
+ return rollback(commitInstantTime, Option.empty(), skipLocking);
+ }
+
+ /**
+ * @Deprecated
+ * Rollback the inflight record changes with the given commit time. This
+ * will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
+ *
+ * @param commitInstantTime Instant time of the commit
+ * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
+ * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+ * @throws HoodieRollbackException if rollback cannot be performed successfully
+ */
+ @Deprecated
+  public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
- final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, hadoopConf);
@@ -598,8 +639,8 @@ public boolean rollback(final String commitInstantTime, boolean skipLocking) thr
.findFirst());
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
-      Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
-          commitInstantOpt.get(), false);
+      Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())).orElse(table.scheduleRollback(context, rollbackInstantTime,
+          commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
if (rollbackPlanOption.isPresent()) {
// execute rollback
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
@@ -713,6 +754,31 @@ public HoodieCleanMetadata clean(boolean skipLocking) {
return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
}
+ /**
+   * Trigger archival for the table. This ensures that the number of commits does not grow
+   * unbounded over time.
+   * @param table table to run archival on.
+ */
+ protected void archive(HoodieTable table) {
+ try {
+ // We cannot have unbounded commit files. Archive commits if we have to archive
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
+ archiveLog.archiveIfRequired(context);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to archive", ioe);
+ }
+ }
+
+ /**
+   * Trigger archival for the table. This ensures that the number of commits does not grow
+   * unbounded over time.
+ */
+ public void archive() {
+ // Create a Hoodie table which encapsulated the commits and files visible
+ HoodieTable table = createTable(config, hadoopConf);
+ archive(table);
+ }
+
/**
* Provides a new commit time for a write operation (insert/update/delete).
*/
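
Because `postCommit` above now archives only when `config.isAutoArchive()` holds, a writer can disable automatic archival and drive the new public `archive()` itself. A hedged sketch of that flow; the `hoodie.archive.automatic` key is inferred from the `isAutoArchive()` accessor and should be verified against `HoodieCompactionConfig` before relying on it:

```java
import java.util.Collections;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

public class ManualArchiveDemo {

  public static void archiveExplicitly(HoodieSparkEngineContext context, String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        // assumed config key; turns off archival inside postCommit()
        .withProps(Collections.singletonMap("hoodie.archive.automatic", "false"))
        .forTable("demo-table")
        .build();

    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, config)) {
      // ... issue writes with the client ...
      client.archive(); // archive eligible commits at a time of our choosing
    }
  }
}
```
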
@@ -828,6 +894,33 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT
return inflightTimelineExcludeClusteringCommit;
}
+  private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
+ return getPendingRollbackInfos(metaClient).getOrDefault(commitToRollback, Option.empty());
+ }
+
+ /**
+ * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}.
+ * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+ * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
+ */
+  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+    List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
+    Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
+ HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+ for (HoodieInstant instant : instants) {
+ try {
+ HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
+ String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
+ if (!pendingCompactionTimeline.containsInstant(instantToRollback)) {
+ infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+ }
+ } catch (IOException e) {
+ LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
+ }
+ }
+ return infoMap;
+ }
+
/**
* Rollback all failed writes.
*/
@@ -841,29 +934,30 @@ public Boolean rollbackFailedWrites() {
*/
public Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable table = createTable(config, hadoopConf);
-    List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(),
-        Option.empty());
-    rollbackFailedWrites(instantsToRollback, skipLocking);
+    List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
+    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
+    instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+    rollbackFailedWrites(pendingRollbacks, skipLocking);
return true;
}
-  protected void rollbackFailedWrites(List<String> instantsToRollback, boolean skipLocking) {
-    for (String instant : instantsToRollback) {
-      if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
+  protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
+    // sort in reverse order of commit times
+    LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
+        .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
+    for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
+ if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+        // TODO: determine whether a failed rollback of a bootstrap commit needs handling here
rollbackFailedBootstrap();
+ HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
break;
} else {
- rollback(instant, skipLocking);
+ rollback(entry.getKey(), entry.getValue(), skipLocking);
+ HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
}
}
- // Delete any heartbeat files for already rolled back commits
- try {
- HeartbeatUtils.cleanExpiredHeartbeats(this.heartbeatClient.getAllExistingHeartbeatInstants(),
- createMetaClient(true), basePath);
- } catch (IOException io) {
- LOG.error("Unable to delete heartbeat files", io);
- }
}
  protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
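
The reverse sort above uses the standard recipe for an order-preserving map: sort the entries, then collect into a `LinkedHashMap`, whose iteration order is insertion order (the merge function `(e1, e2) -> e1` is only there because the four-argument `Collectors.toMap` overload requires one; sorted keys cannot collide). A small standalone illustration of the idiom:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ReverseSortDemo {
  public static void main(String[] args) {
    Map<String, String> instants = new HashMap<>();
    instants.put("20211001", "plan-a");
    instants.put("20211103", "plan-c");
    instants.put("20211017", "plan-b");

    // Sort keys descending, then keep that order in a LinkedHashMap;
    // collecting into a plain HashMap would discard it.
    LinkedHashMap<String, String> reverseSorted = instants.entrySet().stream()
        .sorted((e1, e2) -> e2.getKey().compareTo(e1.getKey()))
        .collect(Collectors.toMap(
            Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, LinkedHashMap::new));

    // Prints 20211103, 20211017, 20211001: the newest instant rolls back first.
    reverseSorted.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}
```
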
@@ -977,13 +1071,14 @@ public Option scheduleTableService(Option