Feature/segment storage improvement #399

Merged · 23 commits · Mar 4, 2020
Commits
414d0be
(fix) RocksDBSegmentLogStorage can't be read when sync is false
killme2008 Feb 18, 2020
1a79a60
(feat) Impl concurrent writing for segment log storage
killme2008 Feb 18, 2020
d6f08f3
(feat) use segment log storage as default log storage
killme2008 Feb 18, 2020
97f8286
(fix) forgot to reset diskId after loading snapshot
killme2008 Feb 18, 2020
938c808
(feat) tweak RocksDBSegmentLogStorage
killme2008 Feb 18, 2020
9df2668
(feat) refactor RocksDBSegmentLogStorage: use a new data format and p…
killme2008 Feb 21, 2020
8f750b9
(feat) Adds LogManagerWithSegmentLogStorageTest
killme2008 Feb 21, 2020
b4649ca
(feat) Named SegmentAllocator thread
killme2008 Feb 21, 2020
7774b0d
(fix) example log
killme2008 Feb 21, 2020
c4008f8
(feat) tweak test parameters and warn when using RocksDBSegmentLogSto…
killme2008 Feb 21, 2020
c997a46
(feat) make write executor configurable
killme2008 Feb 21, 2020
3cf3be0
(feat) warmup segments for test
killme2008 Feb 21, 2020
ae815fc
(fix) stackoverflow when fsm caller is overload
killme2008 Feb 24, 2020
3c5164c
(feat) Impl segment swap in/out
killme2008 Feb 26, 2020
7fd7ce2
(fix) panic when RocksDBSegmentLogStorage#onTruncateSuffix log not fo…
killme2008 Feb 26, 2020
3953e88
(feat) remove mlock,because it has oom risk
killme2008 Feb 26, 2020
aeff8fd
(feat) use filename in checkpoint instead of absolute path
killme2008 Feb 26, 2020
a4b0dbc
(feat) use RocksDBLogStorage as default log storage
killme2008 Feb 27, 2020
a221339
(feat) Destroy segment file out of lock
killme2008 Feb 27, 2020
5b4a04c
(feat) minor changes by CR.
killme2008 Mar 2, 2020
8fd9584
(feat) Introduce WriteContext to wrap CountDownEvents
killme2008 Mar 2, 2020
c6702f6
(fix) typo
killme2008 Mar 3, 2020
932a32f
(feat) minor changes by CR.
killme2008 Mar 4, 2020
5 changes: 5 additions & 0 deletions jraft-core/pom.xml
@@ -36,6 +36,11 @@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
<!-- jna -->
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>
<!-- jctools -->
<dependency>
<groupId>org.jctools</groupId>
@@ -233,7 +233,7 @@ private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
return false;
}
if (!this.taskQueue.tryPublishEvent(tpl)) {
onError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
"FSMCaller is overload.")));
return false;
}
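Per commit ae815fc ("(fix) stackoverflow when fsm caller is overload"), the switch from onError to setError above fixes a stack overflow under load: presumably onError publishes its own event to the same saturated queue and so re-enters this failure branch, while setError only records the error state without publishing anything.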
@@ -620,6 +620,9 @@ public void setSnapshot(final SnapshotMeta meta) {
if (this.lastSnapshotId.compareTo(this.appliedId) > 0) {
this.appliedId = this.lastSnapshotId.copy();
}
if (this.lastSnapshotId.compareTo(this.diskId) > 0) {
this.diskId = this.lastSnapshotId.copy();
}
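Per commit 97f8286 ("(fix) forgot to reset diskId after loading snapshot"), the new block mirrors the appliedId handling just above it: after a snapshot is installed, diskId is also advanced to lastSnapshotId so that it never points behind the snapshot.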

if (term == 0) {
// last_included_index is larger than last_index
@@ -84,7 +84,48 @@ public class RocksDBLogStorage implements LogStorage, Describer {
*/
private interface WriteBatchTemplate {

void execute(WriteBatch batch) throws RocksDBException, IOException;
void execute(WriteBatch batch) throws RocksDBException, IOException, InterruptedException;
}

/**
* A write context
* @author boyan(boyan@antfin.com)
*
*/
public interface WriteContext {
/**
* Start a sub job.
*/
default void startJob() {
}

/**
* Finish a sub job
*/
default void finishJob() {
}

/**
* Set an exception to context.
* @param e
*/
default void setError(final Exception e) {
}

/**
* Wait for all sub jobs finish.
*/
default void joinAll() throws InterruptedException, IOException {
}
}

/**
* An empty write context
* @author boyan(boyan@antfin.com)
*
*/
protected static class EmptyWriteContext implements WriteContext {
static EmptyWriteContext INSTANCE = new EmptyWriteContext();
}

private final String path;
@@ -122,11 +163,11 @@ public static DBOptions createDBOptions() {

public static ColumnFamilyOptions createColumnFamilyOptions() {
final BlockBasedTableConfig tConfig = StorageOptionsFactory
.getRocksDBTableFormatConfig(RocksDBLogStorage.class);
.getRocksDBTableFormatConfig(RocksDBLogStorage.class);
return StorageOptionsFactory.getRocksDBColumnFamilyOptions(RocksDBLogStorage.class) //
.useFixedLengthPrefixExtractor(8) //
.setTableFormatConfig(tConfig) //
.setMergeOperator(new StringAppendOperator());
.useFixedLengthPrefixExtractor(8) //
.setTableFormatConfig(tConfig) //
.setMergeOperator(new StringAppendOperator());
}

@Override
@@ -289,6 +330,10 @@ private boolean executeBatch(final WriteBatchTemplate template) {
} catch (final IOException e) {
LOG.error("Execute batch failed with io exception.", e);
return false;
} catch (final InterruptedException e) {
LOG.error("Execute batch failed with interrupt.", e);
Thread.currentThread().interrupt();
return false;
} finally {
this.readLock.unlock();
}
@@ -430,10 +475,11 @@ private void addConfBatch(final LogEntry entry, final WriteBatch batch) throws R
batch.put(this.confHandle, ks, content);
}

private void addDataBatch(final LogEntry entry, final WriteBatch batch) throws RocksDBException, IOException {
private void addDataBatch(final LogEntry entry, final WriteBatch batch,
final WriteContext ctx) throws RocksDBException, IOException, InterruptedException {
final long logIndex = entry.getId().getIndex();
final byte[] content = this.logEntryEncoder.encode(entry);
batch.put(this.defaultHandle, getKeyBytes(logIndex), onDataAppend(logIndex, content));
batch.put(this.defaultHandle, getKeyBytes(logIndex), onDataAppend(logIndex, content, ctx));
}

@Override
@@ -447,27 +493,31 @@ public boolean appendEntry(final LogEntry entry) {
LOG.warn("DB not initialized or destroyed.");
return false;
}
final WriteContext writeCtx = newWriteContext();
final long logIndex = entry.getId().getIndex();
final byte[] valueBytes = this.logEntryEncoder.encode(entry);
final byte[] newValueBytes = onDataAppend(logIndex, valueBytes);
final byte[] newValueBytes = onDataAppend(logIndex, valueBytes, writeCtx);
writeCtx.startJob();
this.db.put(this.defaultHandle, this.writeOptions, getKeyBytes(logIndex), newValueBytes);
writeCtx.joinAll();
if (newValueBytes != valueBytes) {
doSync();
}
return true;
} catch (final RocksDBException | IOException e) {
LOG.error("Fail to append entry.", e);
return false;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} finally {
this.readLock.unlock();
}
}
}

private void doSync() throws IOException {
if (this.sync) {
onSync();
}
private void doSync() throws IOException, InterruptedException {
onSync();
}
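Note that doSync() no longer checks the sync flag itself: it always delegates to onSync(), and the flag is instead exposed through the new isSync() accessor below, presumably so that a subclass such as RocksDBSegmentLogStorage can apply its own sync policy inside its onSync() override.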

@Override
@@ -477,14 +527,17 @@ public int appendEntries(final List<LogEntry> entries) {
}
final int entriesCount = entries.size();
final boolean ret = executeBatch(batch -> {
final WriteContext writeCtx = newWriteContext();
for (int i = 0; i < entriesCount; i++) {
final LogEntry entry = entries.get(i);
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
addConfBatch(entry, batch);
} else {
addDataBatch(entry, batch);
writeCtx.startJob();
addDataBatch(entry, batch, writeCtx);
}
}
writeCtx.joinAll();
doSync();
});

@@ -614,13 +667,17 @@ protected void onReset(final long nextLogIndex) {
* @param firstIndexKept the first index to kept
*/
protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException,
IOException {
IOException {
}

/**
* Called when sync data into file system.
*/
protected void onSync() throws IOException {
protected void onSync() throws IOException, InterruptedException {
}

protected boolean isSync() {
return this.sync;
}

/**
Expand All @@ -631,14 +688,20 @@ protected void onSync() throws IOException {
protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException {
}

protected WriteContext newWriteContext() {
return EmptyWriteContext.INSTANCE;
}

/**
* Called before appending data entry.
*
* @param logIndex the log index
* @param value the data value in log entry.
* @return the new value
*/
protected byte[] onDataAppend(final long logIndex, final byte[] value) throws IOException {
protected byte[] onDataAppend(final long logIndex, final byte[] value,
final WriteContext ctx) throws IOException, InterruptedException {
ctx.finishJob();
return value;
}

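The hook pair newWriteContext()/onDataAppend(..., ctx) above is what lets a subclass overlap its own value writes with the RocksDB index write: appendEntries starts a job per data entry and joins them all before doSync(). As a rough illustration of the contract, here is a minimal, hypothetical WriteContext built on plain JDK monitors; per commit 8fd9584 the PR's real implementation wraps a CountDownEvent instead.

import java.io.IOException;

// Illustrative only, not the PR's actual class: counts in-flight sub jobs so
// that joinAll() blocks until every startJob() has a matching finishJob(),
// or fails fast once any sub job reports an error via setError().
class CountingWriteContext implements RocksDBLogStorage.WriteContext {

    private int       pending; // jobs started but not yet finished
    private Exception error;   // first error reported by any sub job

    @Override
    public synchronized void startJob() {
        this.pending++;
    }

    @Override
    public synchronized void finishJob() {
        // <= 0 tolerates appendEntry's finish-before-start ordering,
        // where the base onDataAppend calls finishJob() before startJob().
        if (--this.pending <= 0) {
            notifyAll();
        }
    }

    @Override
    public synchronized void setError(final Exception e) {
        if (this.error == null) {
            this.error = e;
        }
        notifyAll();
    }

    @Override
    public synchronized void joinAll() throws InterruptedException, IOException {
        while (this.pending > 0 && this.error == null) {
            wait();
        }
        if (this.error != null) {
            throw new IOException("Fail to complete write jobs", this.error);
        }
    }
}

The default EmptyWriteContext keeps all four methods as no-ops, so the plain RocksDBLogStorage path pays nothing for the new indirection.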
@@ -17,9 +17,9 @@
package com.alipay.sofa.jraft.storage.log;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import com.alipay.sofa.jraft.util.Utils;
import java.util.Date;

/**
* Abort file
@@ -40,13 +40,25 @@ public AbortFile(final String path) {
}

public boolean create() throws IOException {
return new File(this.path) //
.createNewFile();
return writeDate();
}

public boolean touch() {
return new File(this.path) //
.setLastModified(Utils.nowMs());
@SuppressWarnings("deprecation")
private boolean writeDate() throws IOException {
final File file = new File(this.path);
if (file.createNewFile()) {
try (FileWriter writer = new FileWriter(file, false)) {
writer.write(new Date().toGMTString());
writer.write(System.lineSeparator());
}
return true;
} else {
return false;
}
}

public boolean touch() throws IOException {
return writeDate();
}

public boolean exists() {
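One behavioral consequence of routing both create() and touch() through writeDate() is that a second write against an existing file is a no-op. A small sketch (the path is made up):

import java.io.IOException;

import com.alipay.sofa.jraft.storage.log.AbortFile;

public class AbortFileDemo {

    public static void main(final String[] args) throws IOException {
        // Hypothetical location; real callers point at the segment log directory.
        final AbortFile abortFile = new AbortFile("/tmp/jraft-demo/abort");

        // Creates the file and writes one timestamp line such as
        // "4 Mar 2020 07:00:00 GMT" (Date#toGMTString output).
        System.out.println(abortFile.create()); // true

        // writeDate() only writes when createNewFile() succeeds, so touching
        // an existing file returns false and keeps the original timestamp.
        System.out.println(abortFile.touch()); // false
    }
}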
@@ -23,6 +23,7 @@

import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
import com.alipay.sofa.jraft.util.AsciiStringUtil;
import com.alipay.sofa.jraft.util.Bits;
import com.google.protobuf.ZeroByteStringHelper;

@@ -32,32 +33,48 @@
* @author boyan(boyan@antfin.com)
*/
public class CheckpointFile {

/**
* firstLogIndex(8 B) + commitPos (4 B)
*/
private static final int CHECKPOINT_METADATA_SIZE = 12;

/**
* Checkpoint metadata info.
*
* @author boyan(boyan@antfin.com)
*/
public static final class Checkpoint {
// Segment file start offset
public final long firstLogIndex;
// Segment file name
public String segFilename;
// Segment file current commit position.
public final int committedPos;
public int committedPos;

public Checkpoint(final long firstLogIndex, final int committedPos) {
public Checkpoint(final String segFilename, final int committedPos) {
super();
this.firstLogIndex = firstLogIndex;
this.segFilename = segFilename;
this.committedPos = committedPos;
}

/**
* commitPos (4 bytes) + path(4 byte len + string bytes)
*/
byte[] encode() {
byte[] ps = AsciiStringUtil.unsafeEncode(this.segFilename);
byte[] bs = new byte[8 + ps.length];
Bits.putInt(bs, 0, this.committedPos);
Bits.putInt(bs, 4, ps.length);
System.arraycopy(ps, 0, bs, 8, ps.length);
return bs;
}

boolean decode(final byte[] bs) {
if (bs.length < 8) {
return false;
}
this.committedPos = Bits.getInt(bs, 0);
int len = Bits.getInt(bs, 4);
this.segFilename = AsciiStringUtil.unsafeDecode(bs, 8, len);
return this.committedPos >= 0 && !this.segFilename.isEmpty();
}

@Override
public String toString() {
return "Checkpoint [firstLogIndex=" + this.firstLogIndex + ", committedPos=" + this.committedPos + "]";
return "Checkpoint [segFilename=" + this.segFilename + ", committedPos=" + this.committedPos + "]";
}
}

@@ -78,9 +95,7 @@ public CheckpointFile(final String path) {

public synchronized boolean save(final Checkpoint checkpoint) throws IOException {
final ProtoBufFile file = new ProtoBufFile(this.path);
final byte[] data = new byte[CHECKPOINT_METADATA_SIZE];
Bits.putLong(data, 0, checkpoint.firstLogIndex);
Bits.putInt(data, 8, checkpoint.committedPos);
final byte[] data = checkpoint.encode();

final LocalFileMeta meta = LocalFileMeta.newBuilder() //
.setUserMeta(ZeroByteStringHelper.wrap(data)) //
@@ -94,8 +109,10 @@ public Checkpoint load() throws IOException {
final LocalFileMeta meta = file.load();
if (meta != null) {
final byte[] data = meta.getUserMeta().toByteArray();
assert (data.length == CHECKPOINT_METADATA_SIZE);
return new Checkpoint(Bits.getLong(data, 0), Bits.getInt(data, 8));
Checkpoint checkpoint = new Checkpoint(null, -1);
if (checkpoint.decode(data)) {
return checkpoint;
}
}
return null;
}
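For reference, the new user-meta layout is committedPos (4 bytes), then the filename length (4 bytes), then the ASCII filename bytes. A round-trip sketch, assuming the same com.alipay.sofa.jraft.storage.log package (encode() and decode() are package-private) and a made-up segment file name:

package com.alipay.sofa.jraft.storage.log;

public class CheckpointLayoutDemo {

    public static void main(final String[] args) {
        // "00001.data" is a hypothetical 10-character segment file name.
        final CheckpointFile.Checkpoint saved = new CheckpointFile.Checkpoint("00001.data", 8192);
        final byte[] bs = saved.encode();
        // bs[0..3] holds committedPos (8192), bs[4..7] the name length (10),
        // bs[8..17] the name bytes: 18 bytes in total.

        final CheckpointFile.Checkpoint loaded = new CheckpointFile.Checkpoint(null, -1);
        System.out.println(loaded.decode(bs));    // true
        System.out.println(loaded.segFilename);   // 00001.data
        System.out.println(loaded.committedPos);  // 8192
    }
}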