Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchPropose会造成状态机恢复时重复Execute或者丢失吗 #56

Closed
niukuo opened this issue May 2, 2017 · 23 comments
Closed

BatchPropose会造成状态机恢复时重复Execute或者丢失吗 #56

niukuo opened this issue May 2, 2017 · 23 comments

Comments

@niukuo
Copy link

niukuo commented May 2, 2017

例如状态机InstanceID=1,BatchPropose返回的InstanceID是2。包含5条log。当ExecuteForCheckpoint到中间某一条时进程挂掉,重启时状态机InstanceID是2。会有3条log没有Execute。
此时应该如何保证状态机的一致性?

@chenneal
Copy link
Contributor

chenneal commented May 7, 2017

我刚刚理解错你的意思了,sorry,我重新回复你一下:phxpaxos的batch propose是多个值绑定在同一个instance里的,其实就是多个client request是成组处理的,多个状态机也是成组执行的,具体请参见SMFac :: BatchExecute(),如果状态机不全部执行完成是不会更新InstanceID的。

我还有个疑问,你为什么要特意提到ExecuteForCheckpoint函数,这个函数是用户自定义的,说Execute函数更合理吧。

@chenneal
Copy link
Contributor

chenneal commented May 7, 2017

@niukuo 补充一下:我看到了你提到了5条log,paxos log与instance是一一对应的,所以只有一条log。

@niukuo
Copy link
Author

niukuo commented May 7, 2017

可能是表述的有问题 里面的“5条log”是想表达5个值的意思
考虑如下场景:
InstanceID=2中包含5个值,对应同一个状态机(由用户实现)。当前三个值应用(Execute)到状态机后,进程挂掉了。
用户应该何时更新状态机保存的CheckpointInstanceID(当phxpaxos调用GetCheckpointInstanceID时返回给phxpaxos)才能保证状态机的一致性?

@niukuo
Copy link
Author

niukuo commented May 7, 2017

上面的场景是说用户应该如何保存Checkpoint
想到一种方法:每当Execute(或ExecuteForCheckpoint)发现InstanceID变了,将上一个InstanceID对应的一个或多个值应用到Checkpoint
但是这样对用户来说是否太复杂了?

@chenneal
Copy link
Contributor

chenneal commented May 7, 2017

@niukuo 你说的没错,checkpoint是与上层应用挂钩的,所以phxpaxos很多地方只是提供抽象接口了事。你说的batch propose的ExecuteForCheckpoint其实就是phxpaxos里的SMFac :: BatchExecuteForCheckpoint函数,可以发现,这个函数是执行完所有的客户端请求的状态机之后才返回的,所以不存在你说的多个值执行到一半的情况,肯定是原子的动作。

@niukuo
Copy link
Author

niukuo commented May 7, 2017

我再具体一点:
InstanceID=2中包含5个值
所以一定会调用5次用户状态机实现的
bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sPaxosValue);
分别是
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第1个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第2个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第3个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第4个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第5个值)
(此处若有问题请指出)
正常情况下是没有问题的。
问题是:
用户实现的ExecuteForCheckpoint中,如果在收到第1个值时就更新InstanceID(此时无法知道后面是否还会有相同InstanceID的值)。如果此时用户把InstanceID持久化后,进程挂掉,下次进程启动并恢复状态机时,第2个~第5个值没有调用ExecuteForCheckpoint。
而是直接ExecuteForCheckpoint(groupId, InstanceID = 3或更大, 对应的值)

一种场景是进程挂掉这种极端情况。
不考虑极端情况的另一种场景是Execute或ExecuteForCheckpoint执行到中间某个值时(例如第3个)返回了false,后面是不是会把已经应用到状态机的第1个值和第2个值再Execute一遍

@chenneal
Copy link
Contributor

chenneal commented May 7, 2017

@niukuo 我觉得你还是没理解我的意思,batch propose只会调用SMFac :: BatchExecuteForCheckpoint,然后这个函数会一次性调用每个值的SMFac的ExecuteForCheckpoint,不会是每个值单独调用ExecuteForCheckpoint,只有所有ExecuteForCheckpoint执行完毕之后才会更新InstanceID。

具体代码:

bool SMFac :: BatchExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, 
        const std::string & sBodyValue)
{
    BatchPaxosValues oBatchValues;
    bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
    if (!bSucc)
    {
        PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
        return false;
    }

    for (int i = 0; i < oBatchValues.values_size(); i++)
    {
        const PaxosValue & oValue = oBatchValues.values(i);
        bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
        if (!bExecuteSucc)
        {
            return false;
        }
    }

    return true;
}

因为上层应用耦合问题,phxpaxos只是帮用户实现了这个接口,怎么调用还是由用户自己决定,所以在使用phxpaxos库时还是需要对整个流程有个整体的把握才行。

@niukuo
Copy link
Author

niukuo commented May 7, 2017

CheckpointInstanceID不是用户通过实现GetCheckpointInstanceID返回给phxpaxos的吗
在进程重启时会不会遇到我上面说的"InstanceID=2的5个值执行了1个时进程挂掉下次重启时会直接从InstanceID=3开始恢复"还是说phxpaxos会保存当前每个状态机对应的checkpoint版本?

还有代码中

    for (int i = 0; i < oBatchValues.values_size(); i++)
    {
        const PaxosValue & oValue = oBatchValues.values(i);
        bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
        if (!bExecuteSucc)
        {
            return false;
        }
    }

看了一下DoExecuteForCheckpoint里面并没有保存"当前执行到该Instance中第几个值"这个状态,是不是可能存在我上面说的“执行到第3个值时用户return false会导致1,2两个值被重复Execute”。

@niukuo
Copy link
Author

niukuo commented May 7, 2017

sm.h中没有BatchExecute(ForCheckpoint)接口,所以batch propose最终到达用户的状态机时一定是对同一个instance中的5个值执行多次ExecuteForCheckpoint我的理解是否有问题?

@chenneal
Copy link
Contributor

chenneal commented May 7, 2017

@niukuo 你理解的没错,假设5个值执行了一半但是挂了,但是并没有什么关系啊,1、2重复执行就重复执行呗,并没有影响数据一致性,因为只要保证instance执行单元是原子的就行,假如你说这么做有问题,那么就算是普通的propose也会有同样的问题,你自己类比一下吧。

@niukuo
Copy link
Author

niukuo commented May 7, 2017

大部分场景下重复执行可能问题不大 可以先略过 但是上面说的使用ExecuteForCheckpoint生成快照时 是不是可能会漏掉batch propose中的后面几个值呢 执行了第一个值后已经更新了InstanceID 后面几个值没来得及执行时进程挂掉后是不是下次启动会直接从下一个InstanceID开始执行了呢

@niukuo
Copy link
Author

niukuo commented May 7, 2017

例如进程启动后GetCheckpointInstanceID返回是2 那么下次是会从2开始执行还是3呢

@chenneal
Copy link
Contributor

chenneal commented May 7, 2017

@niukuo 所有的值的状态机执行完毕之前不会更新InstanceID,部分执行的值宕机恢复时会重复去执行。具体看代码去吧,不再回复。

@niukuo
Copy link
Author

niukuo commented May 7, 2017

好的多谢讨论

@lynncui00
Copy link
Collaborator

lynncui00 commented May 7, 2017

针对单个propose来说,执行状态机和更新instance操作并不是一个事务,也就是说这两者并不是一个原子操作,所以必定导致最后一个instance会重复执行状态机。对于batch也是一样。要解决这个问题,必定是要求paxos和状态机紧密结合,协同设计,而phxpaxos是一个状态机完全开放性的通用库,暂时无法解决这个重复执行问题。但是在大部分场景,状态机自身解决重复执行最后一个instance的问题,还是比较简单的。

@niukuo
Copy link
Author

niukuo commented May 8, 2017

以下的代码可以重现这个问题

#include <string>
#include <functional>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <phxpaxos/node.h>
#include <phxpaxos/sm.h>

using namespace std;

static const int gSMID = 123;
static const int gGroupId = 0;

class MockSM : public phxpaxos::StateMachine {
public:
    static const int mSMID = gSMID;
    uint64_t mCptVersion;
    MockSM(uint64_t cptVersion = phxpaxos::NoCheckpoint)
        : mCptVersion{cptVersion}
    {
    }
    bool Execute(const int iGroupIdx, const uint64_t llInstanceID, 
            const std::string & sPaxosValue, phxpaxos::SMCtx* poSMCtx) override
    {
        return true;
    }

    const int SMID() const override { return mSMID; }

    bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
            const std::string & sPaxosValue) override
    {
        bool ret = mCptVersion == phxpaxos::NoCheckpoint || llInstanceID > mCptVersion;
        printf("update checkpoint id = %lu, value = %s, ret = %s\n",
            llInstanceID, sPaxosValue.c_str(), ret ? "true" : "false");
        if (ret) {
            mCptVersion = llInstanceID;
        }
        return ret;
    }

    const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const override
    { return mCptVersion; }
};

class ManualResetEvent {
private:
    std::mutex mMutex;
    std::condition_variable mCond;
    bool mSet{false};
public:
    void Set()
    {
        std::unique_lock<std::mutex> locker(mMutex);
        mSet = true;
        mCond.notify_all();
    }
    void Reset()
    {
        std::unique_lock<std::mutex> locker(mMutex);
        mSet = false;
    }
    void Wait()
    {
        std::unique_lock<std::mutex> locker(mMutex);
        mCond.wait(locker, [this]() { return mSet; });
    }
};

void BatchPropose(phxpaxos::Node *node, ManualResetEvent *e, std::string value)
{
    std::mutex m;
    std::unique_lock<std::mutex> locker(m);
    e->Wait();
    uint64_t id = (uint64_t)-1;
    uint32_t idx = (uint32_t)-1;
    phxpaxos::SMCtx ctx;
    ctx.m_iSMID = gSMID;
    int ret = node->BatchPropose(gGroupId, value, id, idx, &ctx);
    printf("batch propose id = %lu, idx = %u, value = %s, ret = %d\n", id, idx, value.c_str(), ret);
    ASSERT_EQ(0, ret);
}

TEST(phxpaxos, BatchTest)
{
    std::string ip = "127.0.0.1";
    int port = 12121;
    MockSM sm;
    phxpaxos::Node *node = nullptr;
    phxpaxos::Options options;

    {
        options.sLogStoragePath = "data/log_test";
        mkdir(options.sLogStoragePath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
        options.oMyNode = phxpaxos::NodeInfo(ip, port);
        options.vecNodeInfoList.emplace_back(options.oMyNode);
        options.bUseBatchPropose = true;
        options.bUseCheckpointReplayer = true;

        phxpaxos::GroupSMInfo smInfo;
        smInfo.iGroupIdx = gGroupId;
        smInfo.vecSMList.push_back(&sm);
        smInfo.bIsUseMaster = false;

        options.vecGroupSMInfoList.push_back(smInfo);

    }

    // start a single paxos node
    int ret = phxpaxos::Node::RunNode(options, node);
    ASSERT_EQ(0, ret);

    // batch propose
    {
        ManualResetEvent e;

        std::vector<std::thread> workers;
        workers.emplace_back(&BatchPropose, node, &e, "a");
        workers.emplace_back(&BatchPropose, node, &e, "b");
        workers.emplace_back(&BatchPropose, node, &e, "c");
        workers.emplace_back(&BatchPropose, node, &e, "d");
        workers.emplace_back(&BatchPropose, node, &e, "e");

        e.Set();

        for (auto &worker : workers) {
            worker.join();
        }
    }

    node->ContinueCheckpointReplayer();
    puts("cpt replayer started");

    // propose
    {
        phxpaxos::SMCtx ctx;
        ctx.m_iSMID = gSMID;
        uint64_t id = (uint64_t)-1;
        std::string value = "single1";
        int ret = node->Propose(gGroupId, value, id, &ctx);
        printf("propose id = %lu, value = %s, ret = %d\n", id, value.c_str(), ret);
        ASSERT_EQ(0, ret);
        value = "single2";
        ret = node->Propose(gGroupId, value, id, &ctx);
        printf("propose id = %lu, value = %s, ret = %d\n", id, value.c_str(), ret);
        ASSERT_EQ(0, ret);
    }

    while (sm.mCptVersion == phxpaxos::NoCheckpoint) {
        sleep(1);
    }

    printf("now cpt version = %lu\n", sm.mCptVersion);

    delete node;
    node = nullptr;

    printf("starting new node\n");
    ret = phxpaxos::Node::RunNode(options, node);
    ASSERT_EQ(0, ret);

    node->ContinueCheckpointReplayer();
    puts("cpt replayer started");

    while (true) {
        sleep(1);
    }
}

构造了如下场景:

  1. 使用ExecuteForCheckpoint辅助生成snapshot
  2. 使用BatchPropose
  3. BatchPropose中的多个值对应同一个状态机
  4. 多个值ExecuteForCheckpoint时,中间某个值执行时返回了false
  5. 节点或进程重启

执行结果

[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from phxpaxos
[ RUN      ] phxpaxos.BatchTest
batch propose id = 0, idx = 2, value = b, ret = 0
batch propose id = 0, idx = 4, value = d, ret = 0
batch propose id = 0, idx = 3, value = c, ret = 0
batch propose id = 0, idx = 1, value = a, ret = 0
batch propose id = 0, idx = 0, value = e, ret = 0
cpt replayer started
propose id = 1, value = single1, ret = 0
propose id = 2, value = single2, ret = 0
update checkpoint id = 0, value = e, ret = true
update checkpoint id = 0, value = a, ret = false
now cpt version = 0
update checkpoint id = 0, value = e, ret = false
update checkpoint id = 0, value = e, ret = false
update checkpoint id = 0, value = e, ret = false
starting new node
cpt replayer started
update checkpoint id = 1, value = single1, ret = true
^C

instanceID=0的5个值按顺序分别是 e, a, b, c, d
可以看到:
e在已经被执行的情况下又被执行(这个问题状态机自身可以解决)
a执行时return false后Node重启导致bcd被跳过,状态机没有机会执行。 <---- 这是问题的重点

@lynncui00
Copy link
Collaborator

当你excute返回了false,决不能更新你的checkpoint instanceid,因为下次重启会首先依据你给予的checkpoint instanceid来作为execute的起始点。具体可以参考wiki里面的 checkpoint详解。

@niukuo
Copy link
Author

niukuo commented May 8, 2017

这里没有影响的。
由于是同一个instanceid对应的多个值,checkpoint instanceid在这里实际上没有改变。(从0变成0)
第一次return true时instanceid 由 -1变成了0

@lynncui00
Copy link
Collaborator

我指的execute返回false是针对整个instance来说的。很明显你应该在整个instance的所有value都execute后,才能更新checkpointid,而第一次的-1变成0已经是错误了。在batch模式下,如何更新chexkpoint是一个要仔细思考的事情。

@niukuo
Copy link
Author

niukuo commented May 8, 2017

状态机在执行ExecuteForCheckpoint时没有机会知道这是batch中的一个值还是一个单独的instance

@lynncui00
Copy link
Collaborator

lynncui00 commented May 8, 2017

能知道是最后一个固然更好,但目前为了API能向下兼容,暂时不会更改。对于一个有数据的状态机。这里有非常多的简单的办法来生成checkpoint,比如可以执行i的时候生成i-1的checkpoint,这里就不继续讨论了。

@niukuo
Copy link
Author

niukuo commented May 8, 2017

感谢讨论,希望能够更新一下wiki

@lynncui00
Copy link
Collaborator

关于batch使用注意事项,确实要更新一些文档。您有兴趣的话也可以写一下,我们会进行采纳:)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants