From ba66daa257f1520011b8a7f149d3fb3dc8f301bb Mon Sep 17 00:00:00 2001 From: qiujiayu <153163285@qq.com> Date: Wed, 15 Mar 2023 14:50:00 +0800 Subject: [PATCH] Snapshot temporary directory support configurable (#933) --- .../sofa/jraft/JRaftServiceFactory.java | 5 +- .../core/DefaultJRaftServiceFactory.java | 7 ++- .../com/alipay/sofa/jraft/core/NodeImpl.java | 1 - .../alipay/sofa/jraft/option/NodeOptions.java | 10 ++++ .../jraft/option/SnapshotExecutorOptions.java | 10 ---- .../snapshot/SnapshotExecutorImpl.java | 16 ++++--- .../snapshot/local/LocalSnapshotStorage.java | 47 +++++++++++++------ .../jraft/storage/SnapshotExecutorTest.java | 9 ++-- .../local/LocalSnapshotStorageTest.java | 2 +- 9 files changed, 67 insertions(+), 40 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftServiceFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftServiceFactory.java index 84a78fca2..d393f017b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftServiceFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftServiceFactory.java @@ -40,11 +40,10 @@ public interface JRaftServiceFactory { /** * Creates a raft snapshot storage - * @param uri The snapshot storage uri from {@link NodeOptions#getSnapshotUri()} - * @param raftOptions the raft options. + * @param nodeOptions the node options. * @return storage to store state machine snapshot. */ - SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions); + SnapshotStorage createSnapshotStorage(final NodeOptions nodeOptions); /** * Creates a raft meta storage. diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java index 26321f49f..1db5d4450 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.jraft.core; +import com.alipay.sofa.jraft.option.NodeOptions; import org.apache.commons.lang.StringUtils; import com.alipay.sofa.jraft.JRaftServiceFactory; @@ -51,9 +52,11 @@ public LogStorage createLogStorage(final String uri, final RaftOptions raftOptio } @Override - public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) { + public SnapshotStorage createSnapshotStorage(final NodeOptions nodeOptions) { + String uri = nodeOptions.getSnapshotUri(); + String tempUri = nodeOptions.getSnapshotTempUri(); Requires.requireTrue(!StringUtils.isBlank(uri), "Blank snapshot storage uri."); - return new LocalSnapshotStorage(uri, raftOptions); + return new LocalSnapshotStorage(uri, tempUri, nodeOptions.getRaftOptions()); } @Override diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index ff87d27ca..d01977305 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -563,7 +563,6 @@ private boolean initSnapshotStorage() { } this.snapshotExecutor = new SnapshotExecutorImpl(); final SnapshotExecutorOptions opts = new SnapshotExecutorOptions(); - opts.setUri(this.options.getSnapshotUri()); opts.setFsmCaller(this.fsmCaller); opts.setNode(this); opts.setLogManager(this.logManager); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java index dbc0b90c8..2fe7b6b2a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java @@ -104,6 +104,8 @@ public class NodeOptions extends RpcOptions implements Copiable { // Describe a specific SnapshotStorage in format ${type}://${parameters} private String snapshotUri; + private String snapshotTempUri; + // If enable, we will filter duplicate files before copy remote snapshot, // to avoid useless transmission. Two files in local and remote are duplicate, // only if they has the same filename and the same checksum (stored in file meta). @@ -366,6 +368,14 @@ public void setSnapshotUri(final String snapshotUri) { this.snapshotUri = snapshotUri; } + public String getSnapshotTempUri() { + return snapshotTempUri; + } + + public void setSnapshotTempUri(String snapshotTempUri) { + this.snapshotTempUri = snapshotTempUri; + } + public boolean isFilterBeforeCopyRemote() { return this.filterBeforeCopyRemote; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotExecutorOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotExecutorOptions.java index d8ef7c843..6d2b2d7c1 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotExecutorOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotExecutorOptions.java @@ -31,8 +31,6 @@ */ public class SnapshotExecutorOptions { - // URI of SnapshotStorage - private String uri; private FSMCaller fsmCaller; private NodeImpl node; private LogManager logManager; @@ -49,14 +47,6 @@ public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle) { this.snapshotThrottle = snapshotThrottle; } - public String getUri() { - return this.uri; - } - - public void setUri(String uri) { - this.uri = uri; - } - public FSMCaller getFsmCaller() { return this.fsmCaller; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java index b429dc78a..af5218d28 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java @@ -17,11 +17,13 @@ package com.alipay.sofa.jraft.storage.snapshot; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -224,16 +226,18 @@ public SnapshotReader start() { @Override public boolean init(final SnapshotExecutorOptions opts) { - if (StringUtils.isBlank(opts.getUri())) { + this.node = opts.getNode(); + Objects.requireNonNull(this.node, "Node is null."); + NodeOptions nodeOptions = this.node.getOptions(); + String snapshotUri = nodeOptions.getSnapshotUri(); + if (StringUtils.isBlank(snapshotUri)) { LOG.error("Snapshot uri is empty."); return false; } this.logManager = opts.getLogManager(); this.fsmCaller = opts.getFsmCaller(); - this.node = opts.getNode(); this.term = opts.getInitTerm(); - this.snapshotStorage = this.node.getServiceFactory().createSnapshotStorage(opts.getUri(), - this.node.getRaftOptions()); + this.snapshotStorage = this.node.getServiceFactory().createSnapshotStorage(nodeOptions); if (opts.isFilterBeforeCopyRemote()) { this.snapshotStorage.setFilterBeforeCopyRemote(); } @@ -257,7 +261,7 @@ public boolean init(final SnapshotExecutorOptions opts) { } this.loadingSnapshotMeta = reader.load(); if (this.loadingSnapshotMeta == null) { - LOG.error("Fail to load meta from {}.", opts.getUri()); + LOG.error("Fail to load meta from {}.", snapshotUri); Utils.closeQuietly(reader); return false; } @@ -276,7 +280,7 @@ public boolean init(final SnapshotExecutorOptions opts) { Utils.closeQuietly(reader); } if (!done.status.isOk()) { - LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", opts.getUri(), done.status); + LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", snapshotUri, done.status); return false; } return true; diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorage.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorage.java index ff1c56aa1..0fcafd979 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorage.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorage.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -28,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +50,7 @@ * Snapshot storage based on local file storage. * * @author boyan (boyan@alibaba-inc.com) - * + *

* 2018-Mar-13 2:11:30 PM */ public class LocalSnapshotStorage implements SnapshotStorage { @@ -58,6 +60,7 @@ public class LocalSnapshotStorage implements SnapshotStorage { private static final String TEMP_PATH = "temp"; private final ConcurrentMap refMap = new ConcurrentHashMap<>(); private final String path; + private final String tempPath; private Endpoint addr; private boolean filterBeforeCopyRemote; private long lastSnapshotIndex; @@ -78,14 +81,33 @@ public void setServerAddr(Endpoint addr) { this.addr = addr; } - public LocalSnapshotStorage(String path, RaftOptions raftOptions) { + public LocalSnapshotStorage(String path, String tempPath, RaftOptions raftOptions) { super(); this.path = path; + if (StringUtils.isEmpty(tempPath)) { + this.tempPath = buildTempPath(this.path); + } else { + File pathFile = new File(path); + File tempPathFile = new File(tempPath); + + String pathAbsolutePath = pathFile.getAbsolutePath(); + String tempPathAbsolutePath = tempPathFile.getAbsolutePath(); + if (pathAbsolutePath.equals(tempPathAbsolutePath) || pathAbsolutePath.startsWith(tempPathAbsolutePath)) { + this.tempPath = buildTempPath(this.path); + } else { + this.tempPath = tempPath; + } + LOG.info("The snapshot temp path is {}", this.tempPath); + } this.lastSnapshotIndex = 0; this.raftOptions = raftOptions; this.lock = new ReentrantLock(); } + private String buildTempPath(String path) { + return Paths.get(path, TEMP_PATH).toString(); + } + public long getLastSnapshotIndex() { this.lock.lock(); try { @@ -108,13 +130,12 @@ public boolean init(final Void v) { // delete temp snapshot if (!this.filterBeforeCopyRemote) { - final String tempSnapshotPath = this.path + File.separator + TEMP_PATH; - final File tempFile = new File(tempSnapshotPath); + final File tempFile = new File(this.tempPath); if (tempFile.exists()) { try { FileUtils.forceDelete(tempFile); } catch (final IOException e) { - LOG.error("Fail to delete temp snapshot path {}.", tempSnapshotPath, e); + LOG.error("Fail to delete temp snapshot path {}.", this.tempPath, e); return false; } } @@ -223,7 +244,6 @@ void close(final LocalSnapshotWriter writer, final boolean keepDataOnError) thro break; } // rename temp to new - final String tempPath = this.path + File.separator + TEMP_PATH; final String newPath = getSnapshotPath(newIndex); if (!destroySnapshot(newPath)) { @@ -232,11 +252,11 @@ void close(final LocalSnapshotWriter writer, final boolean keepDataOnError) thro ioe = new IOException("Fail to delete new snapshot path: " + newPath); break; } - LOG.info("Renaming {} to {}.", tempPath, newPath); - if (!Utils.atomicMoveFile(new File(tempPath), new File(newPath), true)) { - LOG.error("Renamed temp snapshot failed, from path {} to path {}.", tempPath, newPath); + LOG.info("Renaming {} to {}.", this.tempPath, newPath); + if (!Utils.atomicMoveFile(new File(this.tempPath), new File(newPath), true)) { + LOG.error("Renamed temp snapshot failed, from path {} to path {}.", this.tempPath, newPath); ret = RaftError.EIO.getNumber(); - ioe = new IOException("Fail to rename temp snapshot from: " + tempPath + " to: " + newPath); + ioe = new IOException("Fail to rename temp snapshot from: " + this.tempPath + " to: " + newPath); break; } ref(newIndex); @@ -283,15 +303,14 @@ public SnapshotWriter create(final boolean fromEmpty) { LocalSnapshotWriter writer = null; // noinspection ConstantConditions do { - final String snapshotPath = this.path + File.separator + TEMP_PATH; // delete temp // TODO: Notify watcher before deleting - if (new File(snapshotPath).exists() && fromEmpty) { - if (!destroySnapshot(snapshotPath)) { + if (new File(this.tempPath).exists() && fromEmpty) { + if (!destroySnapshot(this.tempPath)) { break; } } - writer = new LocalSnapshotWriter(snapshotPath, this, this.raftOptions); + writer = new LocalSnapshotWriter(this.tempPath, this, this.raftOptions); if (!writer.init(null)) { LOG.error("Fail to init snapshot writer."); writer = null; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java index d81da5b54..526f1f51b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java @@ -118,19 +118,22 @@ public void setup() throws Exception { this.uri = "remote://" + this.hostPort + "/" + this.readerId; this.copyOpts = new CopyOptions(); + NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setSnapshotUri(this.path); Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID); - Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions()); - Mockito.when(this.node.getOptions()).thenReturn(new NodeOptions()); + Mockito.when(this.node.getRaftOptions()).thenReturn(this.raftOptions); + Mockito.when(this.node.getOptions()).thenReturn(nodeOptions); Mockito.when(this.node.getRpcService()).thenReturn(this.raftClientService); Mockito.when(this.node.getTimerManager()).thenReturn(this.timerManager); Mockito.when(this.node.getServiceFactory()).thenReturn(DefaultJRaftServiceFactory.newInstance()); + this.executor = new SnapshotExecutorImpl(); final SnapshotExecutorOptions opts = new SnapshotExecutorOptions(); opts.setFsmCaller(this.fSMCaller); opts.setInitTerm(0); opts.setNode(this.node); opts.setLogManager(this.logManager); - opts.setUri(this.path); + this.addr = new Endpoint("localhost", 8081); opts.setAddr(this.addr); assertTrue(this.executor.init(opts)); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorageTest.java index 5d4077fff..0fdf4f796 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotStorageTest.java @@ -51,7 +51,7 @@ public void setup() throws Exception { .setLastIncludedTerm(1).build()); this.table.saveToFile(snapshotPath + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE); - this.snapshotStorage = new LocalSnapshotStorage(path, new RaftOptions()); + this.snapshotStorage = new LocalSnapshotStorage(path, null, new RaftOptions()); assertTrue(this.snapshotStorage.init(null)); }