Skip to content
Permalink
Browse files

Report consensus phase changes in the server subscription stream

  • Loading branch information
movitto authored and manojsdoshi committed Aug 17, 2019
1 parent 726dd69 commit 15c5f9c1111eeea0743dbd9d9b0028756ff72ade
@@ -455,6 +455,12 @@ class RCLConsensus
return adaptor_.mode();
}

ConsensusPhase
phase() const
{
return consensus_.phase();
}

//! @see Consensus::getJson
Json::Value
getJson(bool full) const;
@@ -385,6 +385,7 @@ class NetworkOPsImp final
boost::optional<std::chrono::milliseconds> consensusDelay) override;
uint256 getConsensusLCL () override;
void reportFeeChange () override;
void reportConsensusStateChange(ConsensusPhase phase);

void updateLocalTx (ReadView const& view) override
{
@@ -486,6 +487,9 @@ class NetworkOPsImp final
bool unsubPeerStatus (std::uint64_t uListener) override;
void pubPeerStatus (std::function<Json::Value(void)> const&) override;

bool subConsensus (InfoSub::ref ispListener) override;
bool unsubConsensus (std::uint64_t uListener) override;

InfoSub::pointer findRpcSub (std::string const& strUrl) override;
InfoSub::pointer addRpcSub (
std::string const& strUrl, InfoSub::ref) override;
@@ -543,6 +547,7 @@ class NetworkOPsImp final
bool isAccepted);

void pubServer ();
void pubConsensus (ConsensusPhase phase);

std::string getHostId (bool forAdmin);

@@ -570,6 +575,8 @@ class NetworkOPsImp final

RCLConsensus mConsensus;

ConsensusPhase mLastConsensusPhase;

LedgerMaster& m_ledgerMaster;
std::shared_ptr<InboundLedger> mAcquiringLedger;

@@ -587,9 +594,10 @@ class NetworkOPsImp final
sRTTransactions, // All proposed and accepted transactions.
sValidations, // Received validations.
sPeerStatus, // Peer status changes.
sConsensusPhase, // Consensus phase

sLastEntry = sPeerStatus // as this name implies, any new entry must
// be ADDED ABOVE this one
sLastEntry = sConsensusPhase // as this name implies, any new entry must
// be ADDED ABOVE this one
};
std::array<SubMapType, SubTypes::sLastEntry+1> mStreamMaps;

@@ -769,6 +777,13 @@ void NetworkOPsImp::processHeartbeatTimer ()

mConsensus.timerEntry (app_.timeKeeper().closeTime());

const ConsensusPhase currPhase = mConsensus.phase();
if (mLastConsensusPhase != currPhase)
{
reportConsensusStateChange(currPhase);
mLastConsensusPhase = currPhase;
}

setHeartbeatTimer ();
}

@@ -1464,6 +1479,13 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
prevLedger,
changes.removed);

const ConsensusPhase currPhase = mConsensus.phase();
if (mLastConsensusPhase != currPhase)
{
reportConsensusStateChange(currPhase);
mLastConsensusPhase = currPhase;
}

JLOG(m_journal.debug()) << "Initiating consensus engine";
return true;
}
@@ -1711,6 +1733,33 @@ void NetworkOPsImp::pubServer ()
}
}

void NetworkOPsImp::pubConsensus (ConsensusPhase phase)
{
std::lock_guard sl (mSubLock);

auto& streamMap = mStreamMaps[sConsensusPhase];
if (!streamMap.empty ())
{
Json::Value jvObj (Json::objectValue);
jvObj [jss::type] = "consensusPhase";
jvObj [jss::consensus] = to_string(phase);

for (auto i = streamMap.begin ();
i != streamMap.end (); )
{
if (auto p = i->second.lock())
{
p->send (jvObj, true);
++i;
}
else
{
i = streamMap.erase (i);
}
}
}
}


void NetworkOPsImp::pubValidation (STValidation::ref val)
{
@@ -2517,6 +2566,13 @@ void NetworkOPsImp::reportFeeChange ()
}
}

void NetworkOPsImp::reportConsensusStateChange (ConsensusPhase phase)
{
m_job_queue.addJob (
jtCLIENT, "reportConsensusStateChange->pubConsensus",
[this, phase] (Job&) { pubConsensus(phase); });
}

// This routine should only be used to publish accepted or validated
// transactions.
Json::Value NetworkOPsImp::transJson(
@@ -2973,6 +3029,21 @@ bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq)
return mStreamMaps[sPeerStatus].erase (uSeq);
}

// <-- bool: true=added, false=already there
bool NetworkOPsImp::subConsensus (InfoSub::ref isrListener)
{
std::lock_guard sl (mSubLock);
return mStreamMaps[sConsensusPhase].emplace (
isrListener->getSeq (), isrListener).second;
}

// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubConsensus (std::uint64_t uSeq)
{
std::lock_guard sl (mSubLock);
return mStreamMaps[sConsensusPhase].erase (uSeq);
}

InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
{
std::lock_guard sl (mSubLock);
@@ -413,6 +413,12 @@ class Consensus
return prevLedgerID_;
}

ConsensusPhase
phase() const
{
return phase_;
}

/** Get the Json state of the consensus process.
Called by the consensus_info RPC.
@@ -109,6 +109,9 @@ class InfoSub
virtual bool unsubPeerStatus (std::uint64_t uListener) = 0;
virtual void pubPeerStatus (std::function<Json::Value(void)> const&) = 0;

virtual bool subConsensus (ref ispListener) = 0;
virtual bool unsubConsensus (std::uint64_t uListener) = 0;

// VFALCO TODO Remove
// This was added for one particular partner, it
// "pushes" subscription data to a particular URL.
@@ -64,6 +64,7 @@ InfoSub::~InfoSub ()
m_source.unsubServer (mSeq);
m_source.unsubValidations (mSeq);
m_source.unsubPeerStatus (mSeq);
m_source.unsubConsensus (mSeq);

// Use the internal unsubscribe so that it won't call
// back to us and modify its own parameter
@@ -152,6 +152,10 @@ Json::Value doSubscribe (RPC::Context& context)
return rpcError(rpcNO_PERMISSION);
context.netOps.subPeerStatus (ispSub);
}
else if (streamName == "consensus")
{
context.netOps.subConsensus (ispSub);
}
else
{
return rpcError(rpcSTREAM_MALFORMED);
@@ -97,6 +97,10 @@ Json::Value doUnsubscribe (RPC::Context& context)
{
context.netOps.unsubPeerStatus (ispSub->getSeq ());
}
else if (streamName == "consensus")
{
context.netOps.unsubConsensus (ispSub->getSeq());
}
else
{
return rpcError(rpcSTREAM_MALFORMED);

0 comments on commit 15c5f9c

Please sign in to comment.
You can’t perform that action at this time.