// ReplicaForStateTransfer.cpp
// Concord
//
// Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in
// compliance with the Apache 2.0 License.
//
// This product may include a number of subcomponents with separate copyright notices and license terms. Your use of
// these subcomponents is subject to the terms and conditions of the sub-component's license, as noted in the LICENSE
// file.
#include "ReplicaForStateTransfer.hpp"
#include "assertUtils.hpp"
#include "Logger.hpp"
#include "NullStateTransfer.hpp"
#include "TimersSingleton.hpp"
#include "MsgHandlersRegistrator.hpp"
#include "MsgsCommunicator.hpp"
#include "ReplicasInfo.hpp"
#include "messages/StateTransferMsg.hpp"
namespace bftEngine::impl {
using namespace std::chrono_literals;
// Constructs the state-transfer layer of a replica.
//
// config         - replica configuration, forwarded to ReplicaBase.
// stateTransfer  - state transfer module to use; when null, a NullStateTransfer is allocated and used instead.
// msgComm        - message communicator, forwarded to ReplicaBase.
// msgHandlerReg  - message handlers registry, forwarded to ReplicaBase.
// firstTime      - when true, the state transfer module is always initialized here; otherwise it is initialized
//                  only if config_.debugPersistentStorageEnabled is false.
ReplicaForStateTransfer::ReplicaForStateTransfer(const ReplicaConfig &config,
                                                 IStateTransfer *stateTransfer,
                                                 std::shared_ptr<MsgsCommunicator> msgComm,
                                                 std::shared_ptr<MsgHandlersRegistrator> msgHandlerReg,
                                                 bool firstTime)
    : ReplicaBase(config, msgComm, msgHandlerReg),
      stateTransfer{(stateTransfer != nullptr ? stateTransfer : new NullStateTransfer())},
      metric_received_state_transfers_{metrics_.RegisterCounter("receivedStateTransferMsgs")} {
  // Route incoming StateTransfer messages to messageHandler<StateTransferMsg>.
  msgHandlers_->registerMsgHandler(
      MsgCode::StateTransfer,
      std::bind(&ReplicaForStateTransfer::messageHandler<StateTransferMsg>, this, std::placeholders::_1));

  if (config_.debugStatisticsEnabled) DebugStatistics::initDebugStatisticsData();

  if (firstTime || !config_.debugPersistentStorageEnabled) {
    // The constructor parameter `stateTransfer` shadows the member of the same name and may be null. Go through
    // `this->` so that the member (which falls back to NullStateTransfer) is the one being initialized.
    // NOTE(review): the second argument uses config_.sizeOfReservedPage while the third one reads the page size
    // from ReplicaConfigSingleton - confirm both always agree.
    this->stateTransfer->init(
        kWorkWindowSize / checkpointWindowSize + 1,
        config_.numOfClientProxies * config_.maxReplyMessageSize / config_.sizeOfReservedPage,
        ReplicaConfigSingleton::GetInstance().GetSizeOfReservedPage());
  }
}
// Starts this layer: registers the state transfer timer, starts the state transfer module, and finally starts the
// base replica.
void ReplicaForStateTransfer::start() {
  // Invoked on every timer expiry; the timer handle itself is not needed.
  auto onStateTransferTick = [this](Timers::Handle) { stateTransfer->onTimer(); };

  // Recurring timer with an initial period of 5 seconds; the handle is kept so the timer can be reset or cancelled.
  stateTranTimer_ = TimersSingleton::getInstance().add(5s, Timers::Timer::RECURRING, onStateTransferTick);

  // The state transfer module receives this replica as its callback object.
  stateTransfer->startRunning(this);

  // msg communicator should be last in the starting chain
  ReplicaBase::start();
}
// Stops this layer. The steps mirror start() in reverse: base replica first, then the state transfer module, and
// lastly the state transfer timer.
void ReplicaForStateTransfer::stop() {
  ReplicaBase::stop();
  stateTransfer->stopRunning();

  auto &&timers = TimersSingleton::getInstance();
  timers.cancel(stateTranTimer_);
}
// Handler for an incoming StateTransferMsg: bumps the received-messages counter and hands the bytes that follow the
// MessageBase::Header to the state transfer module, together with the sender's id.
template <>
void ReplicaForStateTransfer::onMessage(StateTransferMsg *m) {
  metric_received_state_transfers_.Get().Inc();

  constexpr size_t headerSize = sizeof(MessageBase::Header);
  // NOTE(review): assumes m->size() >= headerSize; a shorter message would make the length wrap around - confirm
  // that this is validated before the handler is invoked.
  auto *payload = m->body() + headerSize;
  const auto payloadSize = m->size() - headerSize;

  stateTransfer->handleStateTransferMessage(payload, payloadSize, m->senderId());
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IStateTransfer
//
// Releases a state transfer message buffer. `m` is expected to point sizeof(MessageBase::Header) bytes past the
// start of the allocation; the start address is what gets passed to std::free.
// NOTE(review): presumably `m` is the pointer previously handed out by onMessage() - confirm with the caller.
void ReplicaForStateTransfer::freeStateTransferMsg(char *m) {
  // This method may be called by external threads
  constexpr auto headerSize = sizeof(MessageBase::Header);
  std::free(m - headerSize);
}
void ReplicaForStateTransfer::sendStateTransferMessage(char *m, uint32_t size, uint16_t replicaId) {
// TODO(GG): if this method is invoked by an external thread, then send an "internal message" to the commands
// processing thread
MessageBase *p = new MessageBase(config_.replicaId, MsgCode::StateTransfer, size + sizeof(MessageBase::Header));
char *x = p->body() + sizeof(MessageBase::Header);
memcpy(x, m, size);
send(p, replicaId);
delete p;
}
// Called when a state transfer has completed. Forwards to onTransferringCompleteImp() with the checkpoint number
// multiplied by checkpointWindowSize (presumably the matching sequence number - confirm against the implementation).
void ReplicaForStateTransfer::onTransferringComplete(int64_t checkpointNumberOfNewState) {
  // TODO(GG): if this method is invoked by an external thread, then send an "internal message" to the commands
  // processing thread
  const auto scaledCheckpoint = checkpointNumberOfNewState * checkpointWindowSize;
  onTransferringCompleteImp(scaledCheckpoint);
}
// Resets the state transfer timer so that it uses a period of `timerPeriodMilli` milliseconds.
void ReplicaForStateTransfer::changeStateTransferTimerPeriod(uint32_t timerPeriodMilli) {
  // TODO(GG): if this method is invoked by an external thread, then send an "internal message" to the commands
  // processing thread
  const std::chrono::milliseconds newPeriod(timerPeriodMilli);
  TimersSingleton::getInstance().reset(stateTranTimer_, newPeriod);
}
} // namespace bftEngine::impl