Skip to content

Commit

Permalink
[HUDI-3669] Add a remote request retry mechanism for 'Remotehoodietab…
Browse files Browse the repository at this point in the history
…lefiles… (apache#5884)

- Adds request retry to RemoteHoodieTableFileSystemView. Users can enable it through the newly added configs.
  • Loading branch information
LinMingQiang committed Aug 7, 2022
1 parent 95d7489 commit 660177b
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 36 deletions.
Expand Up @@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
.withRemoteServerHost(hostAddr)
.withRemoteServerPort(serverPort)
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
.withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
.withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
.withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
.withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
.withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
.build();
}

Expand Down
Expand Up @@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial
LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
+ viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
+ viewConf.getRemoteTimelineClientTimeoutSecs());
return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
}

public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
Expand Down
Expand Up @@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
.defaultValue(5 * 60) // 5 min
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");

/**
 * Switch for retrying API requests sent to a remote file system view.
 * The value is held as a string and defaults to {@code "false"}.
 */
public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.enable")
.defaultValue("false")
.sinceVersion("0.12.0")
.withDocumentation("Whether to enable API request retry for remote file system view.");

/**
 * Maximum number of retries for an API request against a remote file system view.
 * Defaults to 3.
 */
public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.max_numbers")
.defaultValue(3) // 3 times
.sinceVersion("0.12.0")
.withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");

/**
 * Initial wait, in milliseconds, before a failed API request against a remote file system
 * view is retried. Defaults to 100 ms.
 *
 * <p>The documentation string previously spoke of "operations on storage"; it now describes
 * the remote file system view requests that this key actually configures.
 */
public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
    .key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
    .defaultValue(100L)
    .sinceVersion("0.12.0")
    .withDocumentation("Initial amount of time (in ms) to wait before retrying API requests against a remote file system view. e.g timeline server.");

/**
 * Upper bound, in milliseconds, on the wait before the next retry of a remote request.
 * Defaults to 2000 ms.
 */
public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.max_interval_ms")
.defaultValue(2000L)
.sinceVersion("0.12.0")
.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");

/**
 * Comma-separated exception class names that should trigger a retry of a remote request.
 * Defaults to the empty string, which per the documentation string below means that all
 * IOException and RuntimeException failures are retried.
 */
public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
.key("hoodie.filesystem.view.remote.retry.exceptions")
.defaultValue("")
.sinceVersion("0.12.0")
.withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
+ "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");

public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
.key("hoodie.filesystem.remote.backup.view.enable")
.defaultValue("true") // Need to be disabled only for tests.
Expand Down Expand Up @@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() {
return getInt(REMOTE_TIMEOUT_SECS);
}

/**
 * Reports whether retry of remote file system view requests is enabled.
 *
 * @return the boolean value of {@link #REMOTE_RETRY_ENABLE}
 */
public boolean isRemoteTimelineClientRetryEnabled() {
return getBoolean(REMOTE_RETRY_ENABLE);
}

/**
 * Reads the maximum number of retries for remote file system view requests.
 *
 * @return the integer value of {@link #REMOTE_MAX_RETRY_NUMBERS}
 */
public Integer getRemoteTimelineClientMaxRetryNumbers() {
return getInt(REMOTE_MAX_RETRY_NUMBERS);
}

/**
 * Reads the initial retry interval for remote file system view requests.
 *
 * @return the long value of {@link #REMOTE_INITIAL_RETRY_INTERVAL_MS}, in milliseconds
 */
public Long getRemoteTimelineInitialRetryIntervalMs() {
return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
}

/**
 * Reads the maximum retry interval for remote file system view requests.
 *
 * @return the long value of {@link #REMOTE_MAX_RETRY_INTERVAL_MS}, in milliseconds
 */
public Long getRemoteTimelineClientMaxRetryIntervalMs() {
return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
}

/**
 * Reads the configured list of exception class names that should trigger a retry.
 *
 * @return the string value of {@link #RETRY_EXCEPTIONS}
 */
public String getRemoteTimelineClientRetryExceptions() {
return getString(RETRY_EXCEPTIONS);
}

public long getMaxMemoryForFileGroupMap() {
long totalMemory = getLong(SPILLABLE_MEMORY);
return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
Expand Down Expand Up @@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout
return this;
}

/**
 * Turns request retry for the remote file system view on or off.
 *
 * @param enableRetry whether retry should be enabled
 * @return this builder, for chaining
 */
public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
  final String retryFlag = String.valueOf(enableRetry);
  fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, retryFlag);
  return this;
}

/**
 * Sets the maximum number of retries for remote file system view requests.
 *
 * @param maxRetryNumbers retry limit; must not be null, since it is unboxed for conversion
 * @return this builder, for chaining
 */
public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
  final String retryLimit = Integer.toString(maxRetryNumbers);
  fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, retryLimit);
  return this;
}

/**
 * Sets the initial retry interval for remote file system view requests.
 *
 * @param initialRetryIntervalMs interval in milliseconds; must not be null, since it is
 *                               unboxed for conversion
 * @return this builder, for chaining
 */
public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
  final String initialInterval = Long.toString(initialRetryIntervalMs);
  fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialInterval);
  return this;
}

/**
 * Sets the maximum retry interval for remote file system view requests.
 *
 * @param maxRetryIntervalMs interval in milliseconds; must not be null, since it is
 *                           unboxed for conversion
 * @return this builder, for chaining
 */
public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
  final String maxInterval = Long.toString(maxRetryIntervalMs);
  fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxInterval);
  return this;
}

/**
 * Sets the exception class names that should trigger a retry of a remote request.
 *
 * @param retryExceptions value stored under {@link #RETRY_EXCEPTIONS}; that property's
 *                        documentation describes it as comma-separated class names
 * @return this builder, for chaining
 */
public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
return this;
}

public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
return this;
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
Expand Down Expand Up @@ -132,22 +133,35 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,

// Starts out false. NOTE(review): presumably set when the view is closed; the writer is not visible in this excerpt.
private boolean closed = false;

// Retry wrapper for remote requests; stays null unless retry is enabled in the view config.
private RetryHelper<Response> retryHelper;

// Single reusable request function; its URL and method are reset before every request.
private final HttpRequestCheckedFunction urlCheckedFunc;

// HTTP methods used for requests to the remote server.
private enum RequestMethod {
GET, POST
}

public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
this(server, port, metaClient, 300);
this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
}

public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
this.basePath = metaClient.getBasePath();
this.serverHost = server;
this.serverPort = port;
this.mapper = new ObjectMapper();
this.metaClient = metaClient;
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
this.timeoutSecs = timeoutSecs;
this.serverHost = viewConf.getRemoteViewServerHost();
this.serverPort = viewConf.getRemoteViewServerPort();
this.timeoutSecs = viewConf.getRemoteTimelineClientTimeoutSecs();
this.urlCheckedFunc = new HttpRequestCheckedFunction(this.timeoutSecs * 1000);
if (viewConf.isRemoteTimelineClientRetryEnabled()) {
retryHelper = new RetryHelper(
viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
viewConf.getRemoteTimelineClientMaxRetryNumbers(),
viewConf.getRemoteTimelineInitialRetryIntervalMs(),
viewConf.getRemoteTimelineClientRetryExceptions(),
"Sending request");
}
}

private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
Expand All @@ -165,17 +179,9 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame

String url = builder.toString();
LOG.info("Sending request : (" + url + ")");
Response response;
int timeout = this.timeoutSecs * 1000; // msec
switch (method) {
case GET:
response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
break;
case POST:
default:
response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
break;
}
// Reset url and method, to avoid repeatedly instantiating objects.
urlCheckedFunc.setUrlAndMethod(url, method);
Response response = retryHelper != null ? retryHelper.tryWith(urlCheckedFunc).start() : urlCheckedFunc.get();
String content = response.returnContent().asString();
return (T) mapper.readValue(content, reference);
}
Expand Down Expand Up @@ -495,4 +501,33 @@ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fil
throw new HoodieRemoteException(e);
}
}

/**
* For remote HTTP requests, to avoid repeatedly instantiating objects.
*/
private class HttpRequestCheckedFunction implements RetryHelper.CheckedFunction<Response> {
private String url;
private RequestMethod method;
private final int timeoutMs;

public void setUrlAndMethod(String url, RequestMethod method) {
this.method = method;
this.url = url;
}

public HttpRequestCheckedFunction(int timeoutMs) {
this.timeoutMs = timeoutMs;
}

@Override
public Response get() throws IOException {
switch (method) {
case GET:
return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
case POST:
default:
return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
}
}
}
}
Expand Up @@ -18,47 +18,52 @@

package org.apache.hudi.common.util;

import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class RetryHelper<T> {
public class RetryHelper<T> implements Serializable {
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
private CheckedFunction<T> func;
private int num;
private long maxIntervalTime;
private long initialIntervalTime = 100L;
private transient CheckedFunction<T> func;
private final int num;
private final long maxIntervalTime;
private final long initialIntervalTime;
private String taskInfo = "N/A";
private List<? extends Class<? extends Exception>> retryExceptionsClasses;

public RetryHelper() {
}

public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
this.num = maxRetryNumbers;
this.initialIntervalTime = initialRetryIntervalMs;
this.maxIntervalTime = maxRetryIntervalMs;
if (StringUtils.isNullOrEmpty(retryExceptions)) {
this.retryExceptionsClasses = new ArrayList<>();
} else {
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
.map(Exception::getClass)
.collect(Collectors.toList());
try {
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
.map(Exception::getClass)
.collect(Collectors.toList());
} catch (HoodieException e) {
LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
this.retryExceptionsClasses = new ArrayList<>();
}
}
}

public RetryHelper(String taskInfo) {
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
this.taskInfo = taskInfo;
}

public RetryHelper tryWith(CheckedFunction<T> func) {
public RetryHelper<T> tryWith(CheckedFunction<T> func) {
this.func = func;
return this;
}
Expand All @@ -77,21 +82,26 @@ public T start() throws IOException {
throw e;
}
if (retries++ >= num) {
LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
LOG.error(message, e);
if (e instanceof IOException) {
throw new IOException(message, e);
}
throw e;
}
LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
try {
Thread.sleep(waitTime);
} catch (InterruptedException ex) {
// ignore InterruptedException here
// ignore InterruptedException here
}
}
}

if (retries > 0) {
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
}

return functionResult;
}

Expand Down Expand Up @@ -123,7 +133,7 @@ private long getWaitTimeExp(int retryCount) {
}

@FunctionalInterface
public interface CheckedFunction<T> {
public interface CheckedFunction<T> extends Serializable {
T get() throws IOException;
}
}
Expand Up @@ -431,6 +431,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
.withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
.withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
.withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
.build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
return writeClient;
Expand Down
Expand Up @@ -28,12 +28,14 @@
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;

/**
* Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
Expand Down Expand Up @@ -64,4 +66,31 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
return view;
}

/**
 * Verifies request behaviour against a stopped timeline server, first without retry and then
 * with retry enabled.
 *
 * <p>Uses JUnit assertions instead of try/catch with the {@code assert} keyword, so the test
 * fails when no exception is thrown and does not depend on JVM assertions being enabled.
 */
@Test
public void testRemoteHoodieTableFileSystemViewWithRetry() {
  // Sanity check: the request succeeds while the service is available.
  view.getLatestBaseFiles();
  // Shut down the service so that every following request fails.
  server.close();

  // Without retry the request must fail immediately with a connection refused exception.
  HoodieRemoteException failureWithoutRetry = org.junit.jupiter.api.Assertions.assertThrows(
      HoodieRemoteException.class, () -> view.getLatestBaseFiles());
  org.junit.jupiter.api.Assertions.assertTrue(
      failureWithoutRetry.getMessage().contains("Connection refused (Connection refused)"));

  // Enable API request retry for remote file system view.
  view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
      .newBuilder()
      .withRemoteServerHost("localhost")
      .withRemoteServerPort(server.getServerPort())
      .withRemoteTimelineClientRetry(true)
      .withRemoteTimelineClientMaxRetryNumbers(4)
      .build());

  // With retry enabled the request must still fail, but only after the configured retries.
  HoodieRemoteException failureAfterRetries = org.junit.jupiter.api.Assertions.assertThrows(
      HoodieRemoteException.class, () -> view.getLatestBaseFiles());
  org.junit.jupiter.api.Assertions.assertTrue(
      failureAfterRetries.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times."));
}
}

0 comments on commit 660177b

Please sign in to comment.