Skip to content

Commit

Permalink
Use ndn-cxx's Segmenter in SegmentPublisher
Browse files Browse the repository at this point in the history
This commit also makes lld the preferred linker on Linux

Change-Id: I3a66f2e79a5378d005046c61a8ec2d49345508d6
  • Loading branch information
Pesa committed Nov 18, 2022
1 parent 03426ef commit b68f284
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 107 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ jobs:
include: include:
- os: macos-12 - os: macos-12
xcode: '13.4' xcode: '13.4'
- os: macos-12
xcode: '14.1'
steps: steps:
- name: Set up Xcode - name: Set up Xcode
uses: maxim-lobanov/setup-xcode@v1 uses: maxim-lobanov/setup-xcode@v1
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/docs.yml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
os: [macos-12, ubuntu-20.04] os: [macos-latest, ubuntu-latest]
env: env:
JOB_NAME: Docs JOB_NAME: Docs
steps: steps:
Expand All @@ -27,5 +27,7 @@ jobs:
./.jenkins ./.jenkins
- name: Build documentation - name: Build documentation
run: | run: |
pybindir=$(python3 -c 'import sysconfig; print(sysconfig.get_path("scripts", "posix_user"))')
export PATH="${pybindir}${PATH:+:}${PATH}"
./waf --color=yes configure ./waf --color=yes configure
./waf --color=yes docs ./waf --color=yes docs
1 change: 1 addition & 0 deletions .jenkins
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ case $(uname) in
# Emulate a subset of os-release(5) # Emulate a subset of os-release(5)
export ID=macos export ID=macos
export VERSION_ID=$(sw_vers -productVersion) export VERSION_ID=$(sw_vers -productVersion)
export PATH="/usr/local/bin${PATH:+:}${PATH}"
if [[ -x /opt/homebrew/bin/brew ]]; then if [[ -x /opt/homebrew/bin/brew ]]; then
eval "$(/opt/homebrew/bin/brew shellenv)" eval "$(/opt/homebrew/bin/brew shellenv)"
elif [[ -x /usr/local/bin/brew ]]; then elif [[ -x /usr/local/bin/brew ]]; then
Expand Down
16 changes: 5 additions & 11 deletions .jenkins.d/00-deps.sh
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -25,20 +25,14 @@ if [[ $ID == macos ]]; then
fi fi
brew update brew update
brew install --formula "${FORMULAE[@]}" brew install --formula "${FORMULAE[@]}"

if (( ${#PIP_PKGS[@]} )); then
pip3 install --upgrade --upgrade-strategy=eager "${PIP_PKGS[@]}"
fi

elif [[ $ID_LIKE == *debian* ]]; then elif [[ $ID_LIKE == *debian* ]]; then
sudo apt-get -qq update sudo apt-get -qq update
sudo apt-get -qy install "${APT_PKGS[@]}" sudo apt-get -qy install "${APT_PKGS[@]}"

if (( ${#PIP_PKGS[@]} )); then
pip3 install --user --upgrade --upgrade-strategy=eager "${PIP_PKGS[@]}"
fi

elif [[ $ID_LIKE == *fedora* ]]; then elif [[ $ID_LIKE == *fedora* ]]; then
sudo dnf -y install gcc-c++ libasan pkgconf-pkg-config python3 \ sudo dnf -y install gcc-c++ libasan lld pkgconf-pkg-config python3 \
boost-devel openssl-devel sqlite-devel boost-devel openssl-devel sqlite-devel
fi fi

if (( ${#PIP_PKGS[@]} )); then
pip3 install --user --upgrade --upgrade-strategy=eager "${PIP_PKGS[@]}"
fi
4 changes: 1 addition & 3 deletions .waf-tools/default-compiler-flags.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ class GccBasicFlags(CompilerFlags):
def getGeneralFlags(self, conf): def getGeneralFlags(self, conf):
flags = super(GccBasicFlags, self).getGeneralFlags(conf) flags = super(GccBasicFlags, self).getGeneralFlags(conf)
flags['CXXFLAGS'] += ['-std=c++17'] flags['CXXFLAGS'] += ['-std=c++17']
if Utils.unversioned_sys_platform() == 'linux': if Utils.unversioned_sys_platform() != 'darwin':
flags['LINKFLAGS'] += ['-fuse-ld=gold']
elif Utils.unversioned_sys_platform() == 'freebsd':
flags['LINKFLAGS'] += ['-fuse-ld=lld'] flags['LINKFLAGS'] += ['-fuse-ld=lld']
return flags return flags


Expand Down
4 changes: 2 additions & 2 deletions PSync/full-producer.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -266,14 +266,14 @@ FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
NDN_LOG_DEBUG("Sending sync Data"); NDN_LOG_DEBUG("Sending sync Data");


// Send data after removing pending sync interest on face // Send data after removing pending sync interest on face
m_segmentPublisher.publish(name, dataName, content, m_syncReplyFreshness); m_segmentPublisher.publish(name, dataName, *content, m_syncReplyFreshness);


NDN_LOG_TRACE("Renewing sync interest"); NDN_LOG_TRACE("Renewing sync interest");
sendSyncInterest(); sendSyncInterest();
} }
else { else {
NDN_LOG_DEBUG("Sending sync Data"); NDN_LOG_DEBUG("Sending sync Data");
m_segmentPublisher.publish(name, dataName, content, m_syncReplyFreshness); m_segmentPublisher.publish(name, dataName, *content, m_syncReplyFreshness);
} }
} }


Expand Down
72 changes: 15 additions & 57 deletions PSync/segment-publisher.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,79 +19,37 @@


#include "PSync/segment-publisher.hpp" #include "PSync/segment-publisher.hpp"


#include <ndn-cxx/name-component.hpp>

namespace psync { namespace psync {


SegmentPublisher::SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain, size_t imsLimit) SegmentPublisher::SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
const ndn::security::SigningInfo& signingInfo, size_t imsLimit)
: m_face(face) : m_face(face)
, m_scheduler(m_face.getIoService()) , m_scheduler(m_face.getIoService())
, m_keyChain(keyChain) , m_segmenter(keyChain, signingInfo)
, m_ims(imsLimit) , m_ims(imsLimit)
{ {
} }


void void
SegmentPublisher::publish(const ndn::Name& interestName, const ndn::Name& dataName, SegmentPublisher::publish(const ndn::Name& interestName, const ndn::Name& dataName,
const ndn::Block& block, ndn::time::milliseconds freshness, ndn::span<const uint8_t> buffer, ndn::time::milliseconds freshness)
const ndn::security::SigningInfo& signingInfo)
{ {
auto buf = std::make_shared<const ndn::Buffer>(block.begin(), block.end()); auto segments = m_segmenter.segment(buffer, ndn::Name(dataName).appendVersion(),
publish(interestName, dataName, buf, freshness, signingInfo); ndn::MAX_NDN_PACKET_SIZE >> 1, freshness);
} for (const auto& data : segments) {
m_ims.insert(*data, freshness);
m_scheduler.schedule(freshness, [this, name = data->getName()] { m_ims.erase(name); });
}


void // Put on face only the segment which has a pending interest,
SegmentPublisher::publish(const ndn::Name& interestName, const ndn::Name& dataName, // otherwise the segment is unsolicited
const ndn::ConstBufferPtr& buffer,
ndn::time::milliseconds freshness,
const ndn::security::SigningInfo& signingInfo)
{
uint64_t interestSegment = 0; uint64_t interestSegment = 0;
if (interestName[-1].isSegment()) { if (interestName[-1].isSegment()) {
interestSegment = interestName[-1].toSegment(); interestSegment = interestName[-1].toSegment();
} }

if (interestSegment < segments.size()) {
const uint8_t* rawBuffer = buffer->data(); m_face.put(*segments[interestSegment]);
const uint8_t* segmentBegin = rawBuffer; }
const uint8_t* end = rawBuffer + buffer->size();

const size_t maxPacketSize = ndn::MAX_NDN_PACKET_SIZE >> 1;
uint64_t totalSegments = buffer->size() / maxPacketSize;

ndn::Name segmentPrefix(dataName);
segmentPrefix.appendVersion();

uint64_t segmentNo = 0;
do {
const uint8_t* segmentEnd = segmentBegin + maxPacketSize;
if (segmentEnd > end) {
segmentEnd = end;
}

ndn::Name segmentName(segmentPrefix);
segmentName.appendSegment(segmentNo);

// We get a std::exception: bad_weak_ptr from m_ims if we don't use shared_ptr for data
auto data = std::make_shared<ndn::Data>(segmentName);
data->setContent(ndn::span<const uint8_t>(segmentBegin, segmentEnd));
data->setFreshnessPeriod(freshness);
data->setFinalBlock(ndn::name::Component::fromSegment(totalSegments));

m_keyChain.sign(*data, signingInfo);

segmentBegin = segmentEnd;

// Put on face only the segment which has a pending interest
// otherwise the segment is unsolicited
if (interestSegment == segmentNo) {
m_face.put(*data);
}

m_ims.insert(*data, freshness);
m_scheduler.schedule(freshness, [this, segmentName] { m_ims.erase(segmentName); });

++segmentNo;
} while (segmentBegin < end);
} }


bool bool
Expand Down
33 changes: 8 additions & 25 deletions PSync/segment-publisher.hpp
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/* /*
* Copyright (c) 2014-2020, The University of Memphis * Copyright (c) 2014-2022, The University of Memphis
* *
* This file is part of PSync. * This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors. * See AUTHORS.md for complete list of PSync authors and contributors.
Expand All @@ -23,63 +23,46 @@
#include "PSync/detail/access-specifiers.hpp" #include "PSync/detail/access-specifiers.hpp"


#include <ndn-cxx/face.hpp> #include <ndn-cxx/face.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp> #include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/util/scheduler.hpp> #include <ndn-cxx/util/scheduler.hpp>
#include <ndn-cxx/util/time.hpp> #include <ndn-cxx/util/segmenter.hpp>


namespace psync { namespace psync {


/** /**
* @brief Segment Publisher to publish segmented data * @brief Helper class to publish segmented data.
*/ */
class SegmentPublisher class SegmentPublisher
{ {
public: public:
SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain, SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
const ndn::security::SigningInfo& signingInfo = ndn::security::SigningInfo(),
size_t imsLimit = 100); size_t imsLimit = 100);


/**
* @brief Put all the segments in memory.
*
* @param interestName the interest name, to determine the sequence to be answered immediately
* @param dataName the data name, has components after interest name
* @param block the content of the data
* @param freshness freshness of the segments
* @param signingInfo signing info to sign the data with
*/
void
publish(const ndn::Name& interestName, const ndn::Name& dataName,
const ndn::Block& block, ndn::time::milliseconds freshness,
const ndn::security::SigningInfo& signingInfo = ndn::security::SigningInfo());

/** /**
* @brief Put all the segments in memory. * @brief Put all the segments in memory.
* *
* @param interestName the interest name, to determine the sequence to be answered immediately * @param interestName the interest name, to determine the sequence to be answered immediately
* @param dataName the data name, has components after interest name * @param dataName the data name, has components after interest name
* @param buffer the content of the data * @param buffer the content of the data
* @param freshness freshness of the segments * @param freshness freshness period of the segments
* @param signingInfo signing info to sign the data with
*/ */
void void
publish(const ndn::Name& interestName, const ndn::Name& dataName, publish(const ndn::Name& interestName, const ndn::Name& dataName,
const ndn::ConstBufferPtr& buffer, ndn::time::milliseconds freshness, ndn::span<const uint8_t> buffer, ndn::time::milliseconds freshness);
const ndn::security::SigningInfo& signingInfo = ndn::security::SigningInfo());


/** /**
* @brief Try to reply from memory, return false if we cannot find the segment. * @brief Try to reply from memory, return false if we cannot find the segment.
* *
* The caller is then expected to use publish if this returns false. * The caller is then expected to use publish() if this returns false.
*/ */
bool bool
replyFromStore(const ndn::Name& interestName); replyFromStore(const ndn::Name& interestName);


private: private:
ndn::Face& m_face; ndn::Face& m_face;
ndn::Scheduler m_scheduler; ndn::Scheduler m_scheduler;
ndn::KeyChain& m_keyChain; ndn::util::Segmenter m_segmenter;


PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE: PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
ndn::InMemoryStorageFifo m_ims; ndn::InMemoryStorageFifo m_ims;
Expand Down
33 changes: 25 additions & 8 deletions tests/test-segment-publisher.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SegmentPublisherFixture : public tests::IoFixture, public tests::KeyChainF
SegmentPublisherFixture() SegmentPublisherFixture()
{ {
m_face.setInterestFilter(InterestFilter("/hello/world"), m_face.setInterestFilter(InterestFilter("/hello/world"),
bind(&SegmentPublisherFixture::onInterest, this, _1, _2), bind(&SegmentPublisherFixture::onInterest, this, _2),
[] (auto&&...) { BOOST_CHECK(false); }); [] (auto&&...) { BOOST_CHECK(false); });
advanceClocks(10_ms); advanceClocks(10_ms);


Expand All @@ -63,7 +63,7 @@ class SegmentPublisherFixture : public tests::IoFixture, public tests::KeyChainF
} }


void void
onInterest(const Name& prefix, const Interest& interest) onInterest(const Interest& interest)
{ {
if (publisher.replyFromStore(interest.getName())) { if (publisher.replyFromStore(interest.getName())) {
numRepliesFromStore++; numRepliesFromStore++;
Expand All @@ -82,7 +82,7 @@ class SegmentPublisherFixture : public tests::IoFixture, public tests::KeyChainF
protected: protected:
util::DummyClientFace m_face{m_io, m_keyChain, {true, true}}; util::DummyClientFace m_face{m_io, m_keyChain, {true, true}};
SegmentPublisher publisher{m_face, m_keyChain}; SegmentPublisher publisher{m_face, m_keyChain};
shared_ptr<util::SegmentFetcher> fetcher; std::shared_ptr<util::SegmentFetcher> fetcher;
Name dataName; Name dataName;
detail::State state; detail::State state;


Expand All @@ -97,19 +97,28 @@ BOOST_FIXTURE_TEST_SUITE(TestSegmentPublisher, SegmentPublisherFixture)
BOOST_AUTO_TEST_CASE(Basic) BOOST_AUTO_TEST_CASE(Basic)
{ {
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0); BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);

expressInterest(Interest("/hello/world")); expressInterest(Interest("/hello/world"));
BOOST_CHECK_EQUAL(numComplete, 1); BOOST_CHECK_EQUAL(numComplete, 1);
// First segment is answered directly in publish, // First segment is answered directly in publish,
// Rest two are satisfied by the store // Rest two are satisfied by the store
BOOST_CHECK_EQUAL(numRepliesFromStore, 2); BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3); BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3);


for (const auto& data : publisher.m_ims) {
BOOST_TEST_CONTEXT(data.getName()) {
BOOST_REQUIRE_EQUAL(data.getName().size(), 4);
BOOST_CHECK(data.getName()[-1].isSegment());
BOOST_CHECK(data.getName()[-2].isVersion());
}
}

numRepliesFromStore = 0; numRepliesFromStore = 0;
expressInterest(Interest("/hello/world")); expressInterest(Interest("/hello/world"));
BOOST_CHECK_EQUAL(numComplete, 2); BOOST_CHECK_EQUAL(numComplete, 2);
BOOST_CHECK_EQUAL(numRepliesFromStore, 3); BOOST_CHECK_EQUAL(numRepliesFromStore, 3);


advanceClocks(time::milliseconds(freshness)); advanceClocks(freshness);
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0); BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);


numRepliesFromStore = 0; numRepliesFromStore = 0;
Expand All @@ -118,7 +127,7 @@ BOOST_AUTO_TEST_CASE(Basic)
BOOST_CHECK_EQUAL(numRepliesFromStore, 2); BOOST_CHECK_EQUAL(numRepliesFromStore, 2);


numRepliesFromStore = 0; numRepliesFromStore = 0;
m_face.expressInterest(Interest("/hello/world/").setCanBePrefix(true), m_face.expressInterest(Interest("/hello/world").setCanBePrefix(true),
[this] (auto&&...) { this->numComplete++; }, [this] (auto&&...) { this->numComplete++; },
[] (auto&&...) { BOOST_CHECK(false); }, [] (auto&&...) { BOOST_CHECK(false); },
[] (auto&&...) { BOOST_CHECK(false); }); [] (auto&&...) { BOOST_CHECK(false); });
Expand All @@ -127,10 +136,10 @@ BOOST_AUTO_TEST_CASE(Basic)
BOOST_CHECK_EQUAL(numRepliesFromStore, 1); BOOST_CHECK_EQUAL(numRepliesFromStore, 1);
} }


BOOST_AUTO_TEST_CASE(LargerDataName) BOOST_AUTO_TEST_CASE(LongerDataName)
{ {
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
dataName = Name("/hello/world/IBF"); dataName = Name("/hello/world/IBF");
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);


expressInterest(Interest("/hello/world")); expressInterest(Interest("/hello/world"));
BOOST_CHECK_EQUAL(numComplete, 1); BOOST_CHECK_EQUAL(numComplete, 1);
Expand All @@ -139,7 +148,15 @@ BOOST_AUTO_TEST_CASE(LargerDataName)
BOOST_CHECK_EQUAL(numRepliesFromStore, 2); BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3); BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3);


advanceClocks(time::milliseconds(freshness)); for (const auto& data : publisher.m_ims) {
BOOST_TEST_CONTEXT(data.getName()) {
BOOST_REQUIRE_EQUAL(data.getName().size(), 5);
BOOST_CHECK(data.getName()[-1].isSegment());
BOOST_CHECK(data.getName()[-2].isVersion());
}
}

advanceClocks(freshness);
BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0); BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
} }


Expand Down

0 comments on commit b68f284

Please sign in to comment.