Skip to content

Commit

Permalink
full-producer: gather constructor args into Options struct
Browse files Browse the repository at this point in the history
refs #5069

Change-Id: I168efffe739a19273b596a7b1cee2f4ade147d75
  • Loading branch information
yoursunny committed Dec 30, 2023
1 parent eecd84f commit 69bcaef
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 43 deletions.
33 changes: 21 additions & 12 deletions PSync/full-producer.cpp
Expand Up @@ -31,6 +31,24 @@ namespace psync {

NDN_LOG_INIT(psync.FullProducer);

FullProducer::FullProducer(ndn::Face& face,
ndn::KeyChain& keyChain,
const ndn::Name& syncPrefix,
const Options& opts)
: ProducerBase(face, keyChain, opts.ibfCount, syncPrefix, opts.syncDataFreshness,
opts.ibfCompression, opts.contentCompression)
, m_syncInterestLifetime(opts.syncInterestLifetime)
, m_onUpdate(opts.onUpdate)
{
m_registeredPrefix = m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
[this] (auto&&... args) { onSyncInterest(std::forward<decltype(args)>(args)...); },
[] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });

// Should we do this after setInterestFilter success call back
// (Currently following ChronoSync's way)
sendSyncInterest();
}

FullProducer::FullProducer(ndn::Face& face,
ndn::KeyChain& keyChain,
size_t expectedNumEntries,
Expand All @@ -41,20 +59,11 @@ FullProducer::FullProducer(ndn::Face& face,
ndn::time::milliseconds syncReplyFreshness,
CompressionScheme ibltCompression,
CompressionScheme contentCompression)
: ProducerBase(face, keyChain, expectedNumEntries, syncPrefix,
syncReplyFreshness, ibltCompression, contentCompression)
, m_syncInterestLifetime(syncInterestLifetime)
, m_onUpdate(std::move(onUpdateCb))
: FullProducer(face, keyChain, syncPrefix,
Options{std::move(onUpdateCb), static_cast<uint32_t>(expectedNumEntries), ibltCompression,
syncInterestLifetime, syncReplyFreshness, contentCompression})
{
addUserNode(userPrefix);

m_registeredPrefix = m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
[this] (auto&&... args) { onSyncInterest(std::forward<decltype(args)>(args)...); },
[] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });

// Should we do this after setInterestFilter success call back
// (Currently following ChronoSync's way)
sendSyncInterest();
}

FullProducer::~FullProducer()
Expand Down
43 changes: 30 additions & 13 deletions PSync/full-producer.hpp
Expand Up @@ -42,21 +42,38 @@ class FullProducer : public ProducerBase
{
public:
/**
* @brief Constructor
*
* Registers syncPrefix in NFD and sends a sync interest.
* @brief Constructor options.
*/
struct Options
{
/// Callback to be invoked when there is new data.
UpdateCallback onUpdate = [] (const auto&) {};
/// Expected number of entries in IBF.
uint32_t ibfCount = 80;
/// Compression scheme to use for IBF.
CompressionScheme ibfCompression = CompressionScheme::DEFAULT;
/// Lifetime of sync Interest.
ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME;
/// FreshnessPeriod of sync Data.
ndn::time::milliseconds syncDataFreshness = SYNC_REPLY_FRESHNESS;
/// Compression scheme to use for Data content.
CompressionScheme contentCompression = CompressionScheme::DEFAULT;
};

/**
* @brief Constructor.
*
* @param face Application's face
* @param keyChain KeyChain instance to use for signing
* @param expectedNumEntries Expected number of entries in IBF
* @param syncPrefix The prefix of the sync group
* @param userPrefix The prefix of the first user in the group
* @param onUpdateCallBack The callback to be invoked when there is new data
* @param syncInterestLifetime Lifetime of the sync interest
* @param syncReplyFreshness FreshnessPeriod of sync data
* @param ibltCompression Compression scheme to use for IBF
* @param contentCompression Compression scheme to use for Data content
* @param face Application face.
* @param keyChain KeyChain instance to use for signing.
* @param syncPrefix The prefix of the sync group.
* @param opts Options.
*/
FullProducer(ndn::Face& face,
ndn::KeyChain& keyChain,
const ndn::Name& syncPrefix,
const Options& opts);

[[deprecated]]
FullProducer(ndn::Face& face,
ndn::KeyChain& keyChain,
size_t expectedNumEntries,
Expand Down
4 changes: 2 additions & 2 deletions PSync/partial-producer.hpp
Expand Up @@ -45,9 +45,9 @@ class PartialProducer : public ProducerBase
uint32_t ibfCount = 40;
/// Compression scheme to use for IBF.
CompressionScheme ibfCompression = CompressionScheme::NONE;
/// FreshnessPeriod of hello data.
/// FreshnessPeriod of hello Data.
ndn::time::milliseconds helloDataFreshness = HELLO_REPLY_FRESHNESS;
/// FreshnessPeriod of sync data.
/// FreshnessPeriod of sync Data.
ndn::time::milliseconds syncDataFreshness = SYNC_REPLY_FRESHNESS;
};

Expand Down
21 changes: 13 additions & 8 deletions examples/full-sync.cpp
Expand Up @@ -35,20 +35,25 @@ class Producer
{
public:
/**
* @brief Initialize producer and schedule updates
* @brief Initialize producer and schedule updates.
*
* Set IBF size as 80 expecting 80 updates to IBF in a sync cycle
* Set syncInterestLifetime and syncReplyFreshness to 1.6 seconds
* userPrefix is the default user prefix, no updates are published on it in this example
* Set IBF size as 80 expecting 80 updates to IBF in a sync cycle.
* Set syncInterestLifetime and syncDataFreshness to 1.6 seconds.
* userPrefix is the prefix string of user node prefixes.
*/
Producer(const ndn::Name& syncPrefix, const std::string& userPrefix,
int numDataStreams, int maxNumPublish)
: m_producer(m_face, m_keyChain, 80, syncPrefix, userPrefix,
std::bind(&Producer::processSyncUpdate, this, _1),
1600_ms, 1600_ms)
: m_producer(m_face, m_keyChain, syncPrefix, [this] {
psync::FullProducer::Options opts;
opts.onUpdate = std::bind(&Producer::processSyncUpdate, this, _1);
opts.ibfCount = 80;
opts.syncInterestLifetime = 1600_ms;
opts.syncDataFreshness = 1600_ms;
return opts;
} ())
, m_maxNumPublish(maxNumPublish)
{
// Add user prefixes and schedule updates for them in specified interval
// Add user prefixes and schedule updates for them in specified interval.
for (int i = 0; i < numDataStreams; i++) {
ndn::Name prefix(userPrefix + "-" + std::to_string(i));
m_producer.addUserNode(prefix);
Expand Down
19 changes: 13 additions & 6 deletions tests/test-full-producer.cpp
Expand Up @@ -41,8 +41,10 @@ BOOST_FIXTURE_TEST_SUITE(TestFullProducer, FullProducerFixture)

BOOST_AUTO_TEST_CASE(OnInterest)
{
Name syncPrefix("/psync"), userNode("/testUser");
FullProducer node(m_face, m_keyChain, 40, syncPrefix, userNode, nullptr);
Name syncPrefix("/psync");
FullProducer::Options opts;
opts.ibfCount = 40;
FullProducer node(m_face, m_keyChain, syncPrefix, opts);

Name syncInterestName(syncPrefix);
syncInterestName.append("malicious-IBF");
Expand All @@ -52,8 +54,11 @@ BOOST_AUTO_TEST_CASE(OnInterest)

BOOST_AUTO_TEST_CASE(ConstantTimeoutForFirstSegment)
{
Name syncPrefix("/psync"), userNode("/testUser");
FullProducer node(m_face, m_keyChain, 40, syncPrefix, userNode, nullptr, 8_s);
Name syncPrefix("/psync");
FullProducer::Options opts;
opts.ibfCount = 40;
opts.syncInterestLifetime = 8_s;
FullProducer node(m_face, m_keyChain, syncPrefix, opts);

advanceClocks(10_ms);
m_face.sentInterests.clear();
Expand All @@ -65,8 +70,10 @@ BOOST_AUTO_TEST_CASE(ConstantTimeoutForFirstSegment)

BOOST_AUTO_TEST_CASE(OnSyncDataDecodeFailure)
{
Name syncPrefix("/psync"), userNode("/testUser");
FullProducer node(m_face, m_keyChain, 40, syncPrefix, userNode, nullptr);
Name syncPrefix("/psync");
FullProducer::Options opts;
opts.ibfCount = 40;
FullProducer node(m_face, m_keyChain, syncPrefix, opts);

Name syncInterestName(syncPrefix);
node.m_iblt.appendToName(syncInterestName);
Expand Down
6 changes: 4 additions & 2 deletions tests/test-full-sync.cpp
Expand Up @@ -44,8 +44,10 @@ class FullSyncFixture : public tests::IoFixture, public tests::KeyChainFixture
userPrefixes[id] = "/userPrefix" + std::to_string(id);
faces[id] = std::make_unique<ndn::DummyClientFace>(m_io, m_keyChain,
ndn::DummyClientFace::Options{true, true});
nodes[id] = std::make_unique<FullProducer>(*faces[id], m_keyChain, 40, syncPrefix, userPrefixes[id],
[] (const auto&) {});
FullProducer::Options opts;
opts.ibfCount = 40;
nodes[id] = std::make_unique<FullProducer>(*faces[id], m_keyChain, syncPrefix, opts);
nodes[id]->addUserNode(userPrefixes[id]);
}

void
Expand Down

0 comments on commit 69bcaef

Please sign in to comment.