Skip to content

Conversation

@Cczzzz
Copy link
Contributor

@Cczzzz Cczzzz commented Sep 25, 2021

Too much text, so I write in Chinese。
我发现rocketmq 在 dledger 模式下 性能不是很好,可以看:apache/rocketmq#3315
rocketmq, 4.9.1 ,16c, 32g ,1wtps,p99 很难到达 2ms 以下,最好的情况也需要 3ms。
我重新编译了代码发现瓶颈在 raft 同步数据上,写完pageccache 后等待同步完成平均需要 1.7ms。
我在dledger 代码中发现了几个问题,我尝试修改了dledger 的代码,之后进行了测试,3wtps 下 p99 到达了 1-2ms。
平均耗时 0.8-1ms。大约提升了1ms。

修改项:
1.
src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java 745
将waitForRunning(1); 改为 Thread.yield();
设计思想应该是 如果一直有消息进入推动 dLedgerStore.getLedgerEndIndex() 的数值变大,就不会break 循环去执行waitForRunning(1);
但经过我的测试 在 1w tps 下 waitForRunning(1); 执行的非常频繁每次都会休眠 1ms ,因为有新消息时并不会 wakeup(); 唤醒线程。我猜可能是 getLedgerEndIndex() 访问的 ledgerEndIndex 属性没有被 volatile 修饰?

481行
if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
doCommit();
doCheckAppendResponse();
break; //break 循环去执行 waitForRunning(1);
}

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java 799
增加 wakeup(); 在有新消息时唤醒 线程

src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java
提出一个新方法用于获取 消息的 pos 和 size
原来的方法从index 文件读完就已经能得知pos 和 size ,还有钱commitlog 里面读消息体加上解码

但是getCommittedIndex 只是需要 pos 和size。
这样修改完成后相比之前少了一个 check
PreConditions.check(pos == dLedgerEntry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", pos, dLedgerEntry.getPos());
是否会有问题呢

@Cczzzz
Copy link
Contributor Author

Cczzzz commented Sep 25, 2021

我在本地 执行mvn -B package jacoco:report coveralls:report 是下面这个错误
Failed tests:
BatchPushTest.testBatchPushTruncate:251 expected:<4> but was:<9>

但是 这个test 本地执行没有问题

LeaderElectorTest.testThreeServer:91 expected:<2> but was:<1>

doCompare();
}
waitForRunning(1);
Thread.yield();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with the changes here

future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
break;
}
wakeup();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t understand why wakeup called here, the thread is already working.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

       long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
                if (pair == null) {
                    checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                    waitForRunning(1);
                    return;
                }
                PushEntryRequest request = pair.getKey();
                if (dLedgerConfig.isEnableBatchPush()) {
                    handleDoBatchAppend(nextIndex, request, pair.getValue());
                } else {
                    handleDoAppend(nextIndex, request, pair.getValue());
                }

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher #1031 line

when has no new message ,writeRequestMap is empty,but when new message arrive,put in writeRequestMap . It didn't wake up the thread

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

       long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
                if (pair == null) {
                    checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                    waitForRunning(1);
                    return;
                }
                PushEntryRequest request = pair.getKey();
                if (dLedgerConfig.isEnableBatchPush()) {
                    handleDoBatchAppend(nextIndex, request, pair.getValue());
                } else {
                    handleDoAppend(nextIndex, request, pair.getValue());
                }

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher #1031 line

when has no new message ,writeRequestMap is empty,but when new message arrive,put in writeRequestMap . It didn't wake up the thread

ok, I understand

@RongtongJin
Copy link
Contributor

我在本地 执行mvn -B package jacoco:report coveralls:report 是下面这个错误
Failed tests:
BatchPushTest.testBatchPushTruncate:251 expected:<4> but was:<9>

但是 这个test 本地执行没有问题

LeaderElectorTest.testThreeServer:91 expected:<2> but was:<1>

这个应该是单测sleep时间不足导致的,我来处理一下

@Cczzzz
Copy link
Contributor Author

Cczzzz commented Sep 26, 2021

check failed ,how do i fix it

@RongtongJin
Copy link
Contributor

check failed ,how do i fix it

It seems to have nothing to do with your code

@Cczzzz
Copy link
Contributor Author

Cczzzz commented Sep 26, 2021

check failed ,how do i fix it

It seems to have nothing to do with your code

It looks like this

@Cczzzz
Copy link
Contributor Author

Cczzzz commented Sep 27, 2021

Will the changes be merged

@RongtongJin
Copy link
Contributor

Will the changes be merged

Yes

@RongtongJin RongtongJin changed the title Performance optimization [ISSUE #97] Performance optimization Sep 27, 2021
@RongtongJin RongtongJin merged commit 591a82c into openmessaging:master Sep 27, 2021
@RongtongJin
Copy link
Contributor

Hi @Cczzzz 关于第一个修改项,发现将waitForRunning(1); 改为 Thread.yield();后,即使没有消息,该线程也会将一个CPU跑满,因此考虑回退到将waitForRunning(1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants