Skip to content

Commit

Permalink
face: send and receive NACK
Browse files Browse the repository at this point in the history
refs #2930

Change-Id: I70c969ac12b493d2c83fa892beffae936cc23791
  • Loading branch information
eric135 authored and cawka committed Sep 26, 2015
1 parent cd183d2 commit 83872fd
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 134 deletions.
92 changes: 72 additions & 20 deletions src/detail/face-impl.hpp
Expand Up @@ -39,6 +39,9 @@

#include "../management/nfd-controller.hpp"
#include "../management/nfd-command-options.hpp"
#include "../management/nfd-local-control-header.hpp"

#include "../lp/packet.hpp"

namespace ndn {

Expand Down Expand Up @@ -74,7 +77,7 @@ class Face::Impl : noncopyable
satisfyPendingInterests(Data& data)
{
for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
if ((*entry)->getInterest().matchesData(data)) {
if ((*entry)->getInterest()->matchesData(data)) {
shared_ptr<PendingInterest> matchedEntry = *entry;

entry = m_pendingInterestTable.erase(entry);
Expand All @@ -86,6 +89,24 @@ class Face::Impl : noncopyable
}
}

void
nackPendingInterests(const lp::Nack& nack)
{
for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
const Interest& pendingInterest = *(*entry)->getInterest();
if (pendingInterest == nack.getInterest()) {
shared_ptr<PendingInterest> matchedEntry = *entry;

entry = m_pendingInterestTable.erase(entry);

matchedEntry->invokeNackCallback(nack);
}
else {
++entry;
}
}
}

void
processInterestFilters(Interest& interest)
{
Expand All @@ -111,26 +132,32 @@ class Face::Impl : noncopyable
}

void
asyncExpressInterest(const shared_ptr<const Interest>& interest,
const OnData& onData, const OnTimeout& onTimeout)
asyncExpressInterest(shared_ptr<const Interest> interest,
const DataCallback& afterSatisfied,
const NackCallback& afterNacked,
const TimeoutCallback& afterTimeout)
{
this->ensureConnected(true);

auto entry =
m_pendingInterestTable.insert(make_shared<PendingInterest>(interest,
onData, onTimeout,
afterSatisfied,
afterNacked,
afterTimeout,
ref(m_scheduler))).first;
(*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });

if (!interest->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_NEXT_HOP)) {
// encode only NextHopFaceId towards the forwarder
m_face.m_transport->send(interest->getLocalControlHeader()
.wireEncode(*interest, nfd::LocalControlHeader::ENCODE_NEXT_HOP),
interest->wireEncode());
}
else {
m_face.m_transport->send(interest->wireEncode());
lp::Packet packet;

nfd::LocalControlHeader localControlHeader = interest->getLocalControlHeader();
if (localControlHeader.hasNextHopFaceId()) {
packet.add<lp::NextHopFaceIdField>(localControlHeader.getNextHopFaceId());
}

packet.add<lp::FragmentField>(std::make_pair(interest->wireEncode().begin(),
interest->wireEncode().end()));

m_face.m_transport->send(packet.wireEncode());
}

void
Expand All @@ -144,15 +171,40 @@ class Face::Impl : noncopyable
{
this->ensureConnected(true);

if (!data->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_CACHING_POLICY)) {
m_face.m_transport->send(
data->getLocalControlHeader().wireEncode(*data,
nfd::LocalControlHeader::ENCODE_CACHING_POLICY),
data->wireEncode());
}
else {
m_face.m_transport->send(data->wireEncode());
lp::Packet packet;

nfd::LocalControlHeader localControlHeader = data->getLocalControlHeader();
if (localControlHeader.hasCachingPolicy()) {
switch (localControlHeader.getCachingPolicy()) {
case nfd::LocalControlHeader::CachingPolicy::NO_CACHE: {
lp::CachePolicy cachePolicy;
cachePolicy.setPolicy(lp::CachePolicyType::NO_CACHE);
packet.add<lp::CachePolicyField>(cachePolicy);
break;
}
default:
break;
}
}

packet.add<lp::FragmentField>(std::make_pair(data->wireEncode().begin(),
data->wireEncode().end()));

m_face.m_transport->send(packet.wireEncode());
}

void
asyncPutNack(shared_ptr<const lp::Nack> nack)
{
this->ensureConnected(true);

lp::Packet packet;
packet.add<lp::NackField>(nack->getHeader());

Block interest = nack->getInterest().wireEncode();
packet.add<lp::FragmentField>(std::make_pair(interest.begin(), interest.end()));

m_face.m_transport->send(packet.wireEncode());
}

/////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
60 changes: 36 additions & 24 deletions src/detail/pending-interest.hpp
Expand Up @@ -28,31 +28,32 @@
#include "../util/time.hpp"
#include "../util/scheduler.hpp"
#include "../util/scheduler-scoped-event-id.hpp"
#include "../lp/nack.hpp"

namespace ndn {

class PendingInterest : noncopyable
{
public:
typedef function<void(const Interest&, Data&)> OnData;
typedef function<void(const Interest&)> OnTimeout;

/**
* @brief Create a new PitEntry and set the timeout based on the current time and
* the interest lifetime.
*
* @param interest A shared_ptr for the interest
* @param onData A function object to call when a matching data packet is received.
* @param onTimeout A function object to call if the interest times out.
* If onTimeout is an empty OnTimeout(), this does not use it.
* @param scheduler Scheduler instance to use to schedule a timeout event. The scheduled
* event will be automatically cancelled when pending interest is destroyed.
* the Interest lifetime.
* @param interest shared_ptr for the Interest
* @param dataCallback function to call when matching Data packet is received
* @param nackCallback function to call when Nack matching Interest is received
* @param timeoutCallback function to call if Interest times out
* @param scheduler Scheduler instance to use to schedule a timeout event. The scheduled
* event will be automatically cancelled when pending Interest is destroyed.
*/
PendingInterest(shared_ptr<const Interest> interest, const OnData& onData,
const OnTimeout& onTimeout, Scheduler& scheduler)
PendingInterest(shared_ptr<const Interest> interest,
const DataCallback& dataCallback,
const NackCallback& nackCallback,
const TimeoutCallback& timeoutCallback,
Scheduler& scheduler)
: m_interest(interest)
, m_onData(onData)
, m_onTimeout(onTimeout)
, m_dataCallback(dataCallback)
, m_nackCallback(nackCallback)
, m_timeoutCallback(timeoutCallback)
, m_timeoutEvent(scheduler)
{
m_timeoutEvent =
Expand All @@ -65,20 +66,30 @@ class PendingInterest : noncopyable
/**
* @return the Interest
*/
const Interest&
shared_ptr<const Interest>
getInterest() const
{
return *m_interest;
return m_interest;
}

/**
* @brief invokes the DataCallback
* @note If the DataCallback is an empty function, this method does nothing.
*/
void
invokeDataCallback(Data& data)
invokeDataCallback(const Data& data)
{
m_dataCallback(*m_interest, data);
}

/**
* @brief invokes the NackCallback
* @note If the NackCallback is an empty function, this method does nothing.
*/
void
invokeNackCallback(const lp::Nack& nack)
{
m_onData(*m_interest, data);
m_nackCallback(*m_interest, nack);
}

/**
Expand All @@ -98,8 +109,8 @@ class PendingInterest : noncopyable
void
invokeTimeoutCallback()
{
if (m_onTimeout) {
m_onTimeout(*m_interest);
if (m_timeoutCallback) {
m_timeoutCallback(*m_interest);
}

BOOST_ASSERT(m_deleter);
Expand All @@ -108,8 +119,9 @@ class PendingInterest : noncopyable

private:
shared_ptr<const Interest> m_interest;
const OnData m_onData;
const OnTimeout m_onTimeout;
DataCallback m_dataCallback;
NackCallback m_nackCallback;
TimeoutCallback m_timeoutCallback;
util::scheduler::ScopedEventId m_timeoutEvent;
std::function<void()> m_deleter;
};
Expand All @@ -133,7 +145,7 @@ class MatchPendingInterestId
operator()(const shared_ptr<const PendingInterest>& pendingInterest) const
{
return (reinterpret_cast<const PendingInterestId*>(
&pendingInterest->getInterest()) == m_id);
pendingInterest->getInterest().get()) == m_id);
}
private:
const PendingInterestId* m_id;
Expand Down
94 changes: 74 additions & 20 deletions src/face.cpp
Expand Up @@ -134,24 +134,50 @@ Face::construct(shared_ptr<Transport> transport, KeyChain& keyChain)
Face::~Face() = default;

const PendingInterestId*
Face::expressInterest(const Interest& interest, const OnData& onData, const OnTimeout& onTimeout)
Face::expressInterest(const Interest& interest,
const DataCallback& afterSatisfied,
const NackCallback& afterNacked,
const TimeoutCallback& afterTimeout)
{
shared_ptr<Interest> interestToExpress = make_shared<Interest>(interest);

// Use `interestToExpress` to avoid wire format creation for the original Interest
if (interestToExpress->wireEncode().size() > MAX_NDN_PACKET_SIZE)
if (interestToExpress->wireEncode().size() > MAX_NDN_PACKET_SIZE) {
BOOST_THROW_EXCEPTION(Error("Interest size exceeds maximum limit"));
}

// If the same ioService thread, dispatch directly calls the method
m_ioService.dispatch([=] { m_impl->asyncExpressInterest(interestToExpress, onData, onTimeout); });
m_ioService.dispatch([=] { m_impl->asyncExpressInterest(interestToExpress, afterSatisfied,
afterNacked, afterTimeout); });

return reinterpret_cast<const PendingInterestId*>(interestToExpress.get());
}

const PendingInterestId*
Face::expressInterest(const Interest& interest,
const OnData& onData,
const OnTimeout& onTimeout)
{
return this->expressInterest(
interest,
[onData] (const Interest& interest, const Data& data) {
if (onData != nullptr) {
onData(interest, const_cast<Data&>(data));
}
},
[onTimeout] (const Interest& interest, const lp::Nack& nack) {
if (onTimeout != nullptr) {
onTimeout(interest);
}
},
onTimeout
);
}

const PendingInterestId*
Face::expressInterest(const Name& name,
const Interest& tmpl,
const OnData& onData, const OnTimeout& onTimeout/* = OnTimeout()*/)
const OnData& onData, const OnTimeout& onTimeout/* = nullptr*/)
{
return expressInterest(Interest(tmpl)
.setName(name)
Expand Down Expand Up @@ -180,6 +206,12 @@ Face::put(const Data& data)
m_ioService.dispatch([=] { m_impl->asyncPutData(dataPtr); });
}

void
Face::put(const lp::Nack& nack)
{
m_ioService.dispatch([=] { m_impl->asyncPutNack(make_shared<lp::Nack>(nack)); });
}

void
Face::removePendingInterest(const PendingInterestId* pendingInterestId)
{
Expand Down Expand Up @@ -427,28 +459,50 @@ Face::asyncShutdown()
m_impl->m_ioServiceWork.reset();
}

/**
* @brief extract local fields from NDNLPv2 packet and tag onto a network layer packet
*/
template<typename NETPKT>
static void
extractLpLocalFields(NETPKT& netPacket, const lp::Packet& lpPacket)
{
if (lpPacket.has<lp::IncomingFaceIdField>()) {
netPacket.getLocalControlHeader().
setIncomingFaceId(lpPacket.get<lp::IncomingFaceIdField>());
}
}

void
Face::onReceiveElement(const Block& blockFromDaemon)
{
const Block& block = nfd::LocalControlHeader::getPayload(blockFromDaemon);

if (block.type() == tlv::Interest)
{
shared_ptr<Interest> interest = make_shared<Interest>(block);
if (&block != &blockFromDaemon)
interest->getLocalControlHeader().wireDecode(blockFromDaemon);

m_impl->processInterestFilters(*interest);
lp::Packet lpPacket(blockFromDaemon); // bare Interest/Data is a valid lp::Packet,
// no need to distinguish

Buffer::const_iterator begin, end;
std::tie(begin, end) = lpPacket.get<lp::FragmentField>();
Block netPacket(&*begin, std::distance(begin, end));
switch (netPacket.type()) {
case tlv::Interest: {
shared_ptr<Interest> interest = make_shared<Interest>(netPacket);
if (lpPacket.has<lp::NackField>()) {
auto nack = make_shared<lp::Nack>(std::move(*interest));
nack->setHeader(lpPacket.get<lp::NackField>());
extractLpLocalFields(*nack, lpPacket);
m_impl->nackPendingInterests(*nack);
}
else {
extractLpLocalFields(*interest, lpPacket);
m_impl->processInterestFilters(*interest);
}
break;
}
else if (block.type() == tlv::Data)
{
shared_ptr<Data> data = make_shared<Data>(block);
if (&block != &blockFromDaemon)
data->getLocalControlHeader().wireDecode(blockFromDaemon);

case tlv::Data: {
shared_ptr<Data> data = make_shared<Data>(netPacket);
extractLpLocalFields(*data, lpPacket);
m_impl->satisfyPendingInterests(*data);
break;
}
// ignore any other type
}
}

} // namespace ndn

0 comments on commit 83872fd

Please sign in to comment.