Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

msgpack integrated

  • Loading branch information...
commit ac35fc8be926cd8d87a9d4146ff7216a349a70bc 1 parent e241821
@vshymanskyy authored
View
2  kad_sim.cpp
@@ -123,7 +123,7 @@ int TestRT(int argc, char* argv[])
KadRtNode rt(0);
for (int i=0; i<100000; i++) {
- rt.AddNode(KadContact(KadId::Random(), XSockAddr::Random()));
+ rt.AddNode(KadId::Random(), XSockAddr::Random());
}
/*for (int i=0; i<=0xFFFF; i++) {
View
4 src/KadContact.h
@@ -3,16 +3,16 @@
#include "KadConfig.h"
#include "KadNodeId.h"
+#include "KadOverIP.h"
#include <XList.h>
-#include <net/XSockAddr.h>
#include <tr1/memory>
///template <unsigned ID_SIZE>
struct KadContact
{
KadId mId;
- XSockAddr mAddrExt;
+ KadNet::Address mAddrExt;
//XSockAddr mAddrInt;
//XSockAddr mAddrSrv;
View
162 src/KadOpMgr.h
@@ -23,29 +23,17 @@
#include <map>
using namespace std;
-inline
-const XLog::Stream& operator <<(const XLog::Stream& str, const KadMsg& v)
-{
- switch (v.MsgType()) {
- case KadMsg::KAD_MSG_PING: return str << (const KadMsgPing&)v;
- case KadMsg::KAD_MSG_PONG: return str << (const KadMsgPong&)v;
- case KadMsg::KAD_MSG_FIND_REQ: return str << (const KadMsgFindReq&)v;
- case KadMsg::KAD_MSG_FIND_RSP: return str << (const KadMsgFindRsp&)v;
- default: return str << "KAD_MSG_UNKNOWN";
- }
-}
-
class KadOpMgr {
friend class KadOpMgrTS;
private:
class ReqTracker {
public:
- typedef XDelegate< void (const KadMsgRsp* rsp, KadContactPtr& contact) > Handler;
+ typedef XDelegate< void (const msgpack::object* rsp, KadContactPtr& contact) > Handler;
ReqTracker (KadContactPtr& contact, Handler handler)
: mDst (contact)
- , mMsgId (KadMsgId::Random())
+ , mMsgId (random()) // Todo: whole range
, mHandler (handler)
{
}
@@ -55,22 +43,22 @@ class KadOpMgr {
mHandler(NULL, mDst);
}
- void OnResponse(const KadMsgRsp* rsp) {
+ void OnResponse(const msgpack::object* rsp) {
mDst->mFailQty=0;
mHandler(rsp, mDst);
}
class SelectById {
public:
- SelectById(const KadMsgId& id) : mMsgId (id) {};
+ SelectById(const uint64_t& id) : mMsgId (id) {};
bool operator()(const ReqTracker& t) { return mMsgId == t.mMsgId; }
bool operator()(const ReqTracker* t) { return mMsgId == t->mMsgId; }
private:
- const KadMsgId mMsgId;
+ const uint64_t mMsgId;
};
KadContactPtr mDst;
- const KadMsgId mMsgId;
+ const uint64_t mMsgId;
protected:
Handler mHandler;
//time mStarted;
@@ -97,7 +85,7 @@ class KadOpMgr {
LOG(mMgr->mLog, FMT("%d sent", sentQty));
}
- void OnResponce(const KadMsgRsp* rsp, KadContactPtr& contact) {
+ void OnResponce(const msgpack::object* rsp, KadContactPtr& contact) {
KadContactList::It pending = mBlock.FindAfter(mBlock.First(), contact);
if (pending == mBlock.End()) {
LOG_WARN(mMgr->mLog, "Wrong pending");
@@ -129,16 +117,17 @@ class KadOpMgr {
return;
} else {
- const KadMsgFindRsp* findRsp = (const KadMsgFindRsp*)rsp;
+ const KadMsgFindRsp findRsp = rsp->convert();
// Merge lists
int insertQty = 0;
- for (int i=0; i<KADEMLIA_BUCKET_SIZE; i++) {
- const KadId& id = findRsp->mContacts[i].id;
+ for (std::vector<KadMsgContact>::const_iterator i=findRsp.mContacts.begin();
+ i != findRsp.mContacts.end(); i++)
+ {
+ const KadId& id = i->mId;
if (!id.IsZero() && id != mMgr->LocalId()) {
if (!FindById(id, mBlock) && !FindById(id, mList)) {
- KadContact c(findRsp->mContacts[i].id, XSockAddr(findRsp->mContacts[i].addr));
- if (Insert(mMgr->FindAddContact(c), mList)) {
+ if (Insert(mMgr->FindAddContact(id, i->mAddr), mList)) {
insertQty++;
}
}
@@ -248,34 +237,54 @@ class KadOpMgr {
};
private:
- void RecieveCbk(const void* buff, unsigned len, const KadNet::Address& from) {
- const KadMsg* req = (const KadMsg*)buff;
+ void RecieveCbk(const void* buff, size_t len, const KadNet::Address& from) {
+ msgpack::unpacked msg;
+ msgpack::unpack(&msg, (const char*)buff, len);
+
+ msgpack::object obj = msg.get();
+
+ // Sanity check
+ if (obj.type != msgpack::type::ARRAY || // Object
+ obj.via.array.size == 0 || // With items
+ obj.via.array.ptr[0].type !=
+ msgpack::type::ARRAY || // Header
+ obj.via.array.ptr[0].via.array.size !=
+ 3 || // 3 fields in header
+ obj.via.array.ptr[0].via.array.ptr[0].type !=
+ msgpack::type::POSITIVE_INTEGER || // first field is Type number
+ obj.via.array.ptr[0].via.array.ptr[0].via.u64
+ >= KadMsg::KAD_MSG_QTY) // proper range
+ {
+ LOG_WARN(mLog, "Invalid format");
+ return;
+ }
//XThread::SleepMs(RandRange(0,8));
- LOG_DEEP(mLog, "<< " << *req);
+ //LOG_DEEP(mLog, "<< " << *req);
+
+ KadMsg req = msg.get().via.array.ptr[0].convert();
- if (req->NodeId() == LocalId()) {
+ if (KadId(req.mSrcId) == LocalId()) {
LOG_WARN(mLog, "Message from self -> dropped");
return;
}
mLock.Lock();
- KadContact newCont(req->NodeId(), from);
- KadContactPtr contact = FindAddContact(newCont);
+ KadContactPtr contact = FindAddContact(req.mSrcId, from);
mLock.Unlock();
- switch (req->MsgType()) {
+ switch (req.mMsgType) {
case KadMsg::KAD_MSG_PING: {
KadMsgPong pong;
SendResponse(pong, from, req);
} break;
case KadMsg::KAD_MSG_FIND_REQ: {
- const KadMsgFindReq* findReq = (const KadMsgFindReq*)req;
+ const KadMsgFindReq findReq = msg.get().convert();
mLock.Lock();
- KadContactList lst = mRoutingTable.FindClosest(findReq->FindId(), KADEMLIA_BUCKET_SIZE);
+ KadContactList lst = mRoutingTable.FindClosest(findReq.mTargetId, KADEMLIA_BUCKET_SIZE);
// TODO: Remove the originating id
- KadMsgFindRsp findRsp(KadMsgRsp::KAD_MSG_STATUS_OK, lst);
+ KadMsgFindRsp findRsp(lst);
SendResponse(findRsp, from, req);
mLock.Unlock();
} break;
@@ -290,10 +299,10 @@ class KadOpMgr {
case KadMsg::KAD_MSG_FIND_RSP:
case KadMsg::KAD_MSG_STORE_RSP:
case KadMsg::KAD_MSG_REMOVE_RSP: {
- HandleResponse((const KadMsgRsp*)buff, from);
+ HandleResponse(req, &obj, from);
} break;
default:
- LOG_WARN(mLog, "Message not handled (type: " << req->MsgType() << ")");
+ LOG_WARN(mLog, "Message not handled (type: " << req.mMsgType << ")");
break;
}
}
@@ -303,11 +312,14 @@ class KadOpMgr {
void SendRequest(T& s, ReqTracker* handler) {
//LOG_DEEP(mLog, "[> " << s);
- s.SetNodeId(LocalId());
- s.SetMsgId(handler->mMsgId);
+ s.mSrcId = LocalId();
+ s.mMsgId = handler->mMsgId;
mOps.Append(handler);
- mListener.SendTo(&s, sizeof(s), handler->mDst->mAddrExt);
+
+ msgpack::sbuffer sbuf;
+ msgpack::pack(sbuf, s);
+ mListener.SendTo(sbuf.data(), sbuf.size(), handler->mDst->mAddrExt);
XTimerContext::Global().SetTimer(XTimerContext::Handler(this, &KadOpMgr::HandleTimeout), KADEMLIA_TIMEOUT_RESPONSE, 0, handler);
}
@@ -315,35 +327,40 @@ class KadOpMgr {
void SendRequest(T& s, const KadNet::Address& addr) {
//LOG_DEEP(mLog, "[> " << s);
- s.SetNodeId(LocalId());
- s.SetMsgId(0);
+ s.mSrcId = LocalId();
+ s.mMsgId = 0;
- mListener.SendTo(&s, sizeof(s), addr);
+ msgpack::sbuffer sbuf;
+ msgpack::pack(sbuf, s);
+ mListener.SendTo(sbuf.data(), sbuf.size(), addr);
}
template <typename T>
- void SendResponse(T& s, const KadNet::Address& addr, const KadMsg* req) const {
+ void SendResponse(T& s, const KadNet::Address& addr, const KadMsg& req) const {
//LOG_DEEP(mLog, ">> " << s);
- s.SetNodeId(LocalId());
- s.SetMsgId(req->MsgId());
- mListener.SendTo(&s, sizeof(s), addr);
+ s.mSrcId = LocalId();
+ s.mMsgId = req.mMsgId;
+
+ msgpack::sbuffer sbuf;
+ msgpack::pack(sbuf, s);
+ mListener.SendTo(sbuf.data(), sbuf.size(), addr);
}
- void HandleResponse(const KadMsgRsp* rsp, const KadNet::Address& addr) {
+ void HandleResponse(const KadMsg& hdr, const msgpack::object* obj, const KadNet::Address& addr) {
X_UNUSED(addr);
- if (rsp->MsgId().IsZero()) {
+ if (hdr.mMsgId == 0) {
return;
}
mLock.Lock(); // TODO: Blocks
- XList<ReqTracker*>::It t = mOps.FindAfter(mOps.First(), ReqTracker::SelectById(rsp->MsgId()));
+ XList<ReqTracker*>::It t = mOps.FindAfter(mOps.First(), ReqTracker::SelectById(hdr.mMsgId));
if (t != mOps.End()) {
ReqTracker* handler = mOps[t];
mOps.Remove(t);
XTimerContext::Global().CancelTimer(XTimerContext::Handler(this, &KadOpMgr::HandleTimeout), handler);
- handler->OnResponse(rsp);
+ handler->OnResponse(obj);
delete handler;
} else {
LOG_WARN(mLog, "Transaction not found (response)");
@@ -383,12 +400,22 @@ class KadOpMgr {
return mRoutingTable.FindClosest(id, qty);
}
- KadContactPtr FindAddContact(KadContact& cont) {
- if (cont.mId == LocalId()) {
+ KadContactPtr FindAddContact(const KadId& id, const KadNet::Address& addr) {
+ if (id == LocalId()) {
return KadContactPtr();
}
- return mRoutingTable.AddNode(cont);
+ // look in RT
+ if (KadContactPtr rt = mRoutingTable.AddNode(id, addr)) {
+ return rt;
+ }
+
+ // look in the cache
+ if (KadContactPtr cc = CheckInCache(id, addr)) {
+ return cc;
+ }
+
+ return KadContactPtr();
}
void Find(const KadId& key) {
@@ -430,6 +457,32 @@ class KadOpMgr {
SendRequest(ping, addr);
}
+ KadContactPtr CheckInCache(const KadId& id, const KadNet::Address& addr) {
+
+ for (KadContactList::It it = mContactCache.First(); it != mContactCache.End(); ++it) {
+ if (id == mContactCache[it]->mId) { // TODO: Also check IP?
+ // Hit -> move to back and return it
+ KadContactPtr result = mContactCache[it];
+ if (it != mContactCache.Last()) {
+ mContactCache.Remove(it);
+ mContactCache.Append(result);
+ }
+ return result;
+ }
+ }
+
+ {
+ // Miss -> append a new contact
+ KadContactPtr result(new KadContact(id, addr));
+ mContactCache.Append(result);
+ // If cache too large -> forget oldest
+ while (mContactCache.Count() > 64) {
+ mContactCache.PopFront();
+ }
+ return result;
+ }
+ }
+
public:
void DumpTableDot(ostream& s) const {
@@ -443,6 +496,9 @@ class KadOpMgr {
//XTimerContext mTimeoutTimers;
KadRtNode mRoutingTable;
+
+ KadContactList mContactCache;
+
XMutexRecursive mLock;
XLog mLog;
View
4 src/KadOverIP.h
@@ -37,7 +37,7 @@ namespace KadNet {
: private XThread
{
public:
- typedef XDelegate< void (const void* buff, unsigned len, const Address& addr) > Handler;
+ typedef XDelegate< void (const void* buff, size_t len, const Address& addr) > Handler;
public:
Listener(const XSockAddr& addr, Handler cbk)
@@ -59,7 +59,7 @@ namespace KadNet {
XThread::Wait();
}
- void SendTo(const void* buff, unsigned len, const Address& addr) const {
+ void SendTo(const void* buff, size_t len, const Address& addr) const {
mSocket.SendTo(buff, len, addr);
}
View
37 src/KadRtNode.h
@@ -1,6 +1,7 @@
#ifndef ROUTING_TABLE_H_
#define ROUTING_TABLE_H_
+
#include "KadContact.h"
#include "XThread.h"
@@ -65,11 +66,11 @@ class TKadRtNode
return true;
}
- KadContactPtr AddNode(const KadContact& newNode, const KadId& d) {
- if (!IsBucket()) return Closest(d)->AddNode(newNode, d);
+ KadContactPtr AddNode(const KadId& id, const KadNet::Address& addr, const KadId& dist) {
+ if (!IsBucket()) return Closest(dist)->AddNode(id, addr, dist);
// Bucket found
- KadContactList::It c = FindNodeById(mContacts, newNode.mId);
+ KadContactList::It c = FindNodeById(mContacts, id);
if (c != mContacts.End()) {
KadContactPtr node = mContacts[c];
// Node is already present, update the info
@@ -81,39 +82,39 @@ class TKadRtNode
//LOG(NULL, "New node: " << newNode.mId);
// Make a copy
- KadContactPtr node(new KadContact(newNode));
+ KadContactPtr node(new KadContact(id, addr));
// Bucket is not full, insert the node
mContacts.Append(node);
// TODO: KAD_STATS.mAddNewQty++;
return node;
} else if (Split()) {
- return AddNode(newNode, d);
+ return AddNode(id, addr, dist);
} else {
// TODO: Check cache order
- KadContactList::It c2 = FindNodeById(mCache, newNode.mId);
+ KadContactList::It c2 = FindNodeById(mCache, id);
if (c2 != mCache.End()) {
KadContactPtr node = mCache[c2];
// TODO: Update data
mCache.Append(mCache.Remove(c2));
return node;
} else if (mCache.Count() < KADEMLIA_CACHE_SIZE) {
- KadContactPtr node(new KadContact(newNode));
+ KadContactPtr node(new KadContact(id, addr));
mCache.Append(node);
return node;
} else {
mCache.PopFront();
- KadContactPtr node(new KadContact(newNode));
+ KadContactPtr node(new KadContact(id, addr));
mCache.Append(node);
return node;
}
}
- return KadContactPtr(new KadContact(newNode));
+ return KadContactPtr();
}
- bool RemoveNode(const KadId& id, const KadId& d) {
- if (!IsBucket()) return Closest(d)->RemoveNode(id, d);
+ bool RemoveNode(const KadId& id, const KadId& dist) {
+ if (!IsBucket()) return Closest(dist)->RemoveNode(id, dist);
// Bucket found, check if node is present
KadContactList::It it = FindNodeById(mContacts, id);
@@ -133,9 +134,9 @@ class TKadRtNode
return false;
}
- KadContactPtr FindNode(const KadId& id, const KadId& d) const {
+ KadContactPtr FindNode(const KadId& id, const KadId& dist) const {
if (!IsBucket()) {
- return Closest(d)->FindNode(id, d);
+ return Closest(dist)->FindNode(id, dist);
} else {
for (KadContactList::It it = mContacts.First(); it != mContacts.End(); ++it) {
if (mContacts[it]->mId == id) {
@@ -146,14 +147,14 @@ class TKadRtNode
}
}
- KadContactList FindClosest(const KadId& id, const KadId& d, unsigned qty) const {
+ KadContactList FindClosest(const KadId& id, const KadId& dist, unsigned qty) const {
if (qty <= 0) {
return KadContactList();
}
if (!IsBucket()) {
- KadContactList result = Closest(d)->FindClosest(id, d, qty);
+ KadContactList result = Closest(dist)->FindClosest(id, dist, qty);
if (result.Count() < qty) {
- result.Append(Farthest(d)->FindClosest(id, d, qty - result.Count()));
+ result.Append(Farthest(dist)->FindClosest(id, dist, qty - result.Count()));
}
return result;
} else {
@@ -210,8 +211,8 @@ class TKadRtNode
return refreshList;
}
- KadContactPtr AddNode(const KadContact& newNode) {
- return AddNode(newNode, newNode.mId ^ mLocalId);
+ KadContactPtr AddNode(const KadId& id, const KadNet::Address& addr) {
+ return AddNode(id, addr, id ^ mLocalId);
}
bool RemoveNode(const KadId& id) {
View
3  src/operations/KadFind.h
@@ -1,5 +1,5 @@
#include "KadMsg.h"
-
+/*
class KadMsgAddr
{
enum Family {
@@ -119,3 +119,4 @@ const XLog::Stream& operator <<(const XLog::Stream& str, const KadMsgFindRsp& v)
}
return str << "KAD_MSG_FIND_RSP { " << qty << " }";
}
+*/
View
229 src/operations/KadMsg.h
@@ -2,89 +2,16 @@
#define KAD_MSG_H_
#include "KadConfig.h"
+#include "KadContact.h"
#include "KadNodeId.h"
-typedef KadId KadMsgId;
-
-class KadMsg
-{
-public:
- enum KadMsgType
- {
- KAD_MSG_PING,
- KAD_MSG_PONG,
- KAD_MSG_JOIN_REQ,
- KAD_MSG_JOIN_RSP,
- KAD_MSG_FIND_REQ,
- KAD_MSG_FIND_RSP,
- KAD_MSG_STORE_REQ,
- KAD_MSG_STORE_RSP,
- KAD_MSG_REMOVE_REQ,
- KAD_MSG_REMOVE_RSP,
- KAD_MSG_LEAVE_REQ,
- KAD_MSG_LEAVE_RSP,
-
- KAD_MSG_RFIND_REQ,
- KAD_MSG_RFIND_RSP,
-
- KAD_MSG_QTY
- };
-
-public:
- KadMsg(KadMsgType type)
- : mVersion (1)
- , mMsgType (type)
- {}
-
- //suint16_t Version() const { return ntohs(mVersion); }
- const KadMsgId& MsgId() const { return mMsgId; }
- KadMsgType MsgType() const { return KadMsgType(mMsgType); }
- const KadId& NodeId() const { return mNodeId; }
-
- void SetMsgId(const KadMsgId& id) { mMsgId = id; }
- void SetNodeId(const KadId& id) { mNodeId = id; }
-
-private:
- uint8_t mVersion;
- uint8_t mMsgType;
- KadMsgId mMsgId;
- KadId mNodeId;
-
-};
-
-class KadMsgRsp : public KadMsg
-{
-public:
- enum KadMsgStatus
- {
- KAD_MSG_STATUS_OK,
- KAD_MSG_STATUS_FAILURE,
- KAD_MSG_STATUS_TIMEOUT,
- KAD_MSG_STATUS_NOT_FOUND,
- KAD_MSG_STATUS_EXISTS,
- };
-
-public:
- KadMsgRsp(KadMsgType type, KadMsgStatus status)
- : KadMsg (type)
- , mStatus (status)
- {}
-
- KadMsgStatus Status() const { return KadMsgStatus(mStatus); }
-
-private:
- uint8_t mStatus;
-
-} GCC_SPECIFIC(__attribute__((packed)));
-
-
-
#include <msgpack.hpp>
#include <vector>
#include <string>
template<unsigned SIZE>
-struct MsgPackRaw {
+struct MsgPackRaw
+{
uint8_t mData[SIZE];
void msgpack_pack(msgpack::packer<msgpack::sbuffer>& pk) const
@@ -101,50 +28,15 @@ struct MsgPackRaw {
}
};
-template<unsigned SIZE>
-struct MsgPackHeader {
-
- enum Type
- {
- KAD_MSG_PING,
- KAD_MSG_PONG,
- KAD_MSG_JOIN_REQ,
- KAD_MSG_JOIN_RSP,
- KAD_MSG_FIND_REQ,
- KAD_MSG_FIND_RSP,
- KAD_MSG_STORE_REQ,
- KAD_MSG_STORE_RSP,
- KAD_MSG_REMOVE_REQ,
- KAD_MSG_REMOVE_RSP,
- KAD_MSG_LEAVE_REQ,
- KAD_MSG_LEAVE_RSP,
-
- KAD_MSG_RFIND_REQ,
- KAD_MSG_RFIND_RSP,
-
- KAD_MSG_QTY
- };
-
- uint16_t mMsgType;
- uint64_t mMsgId;
- MsgPackRaw<SIZE> mNodeId;
-
- MSGPACK_DEFINE(mMsgType, mMsgId, mNodeId);
-};
-struct MsgPackAddr
+struct KadMsgAddr
{
enum Family {
UNSPEC, IPv4, IPv6
};
- uint8_t mFamily;
- uint16_t mPort;
- MsgPackRaw<16> mAddr;
-
-public:
- MsgPackAddr() : mFamily(UNSPEC) {}
- MsgPackAddr(const XSockAddr& a) {
+ KadMsgAddr() : mFamily(UNSPEC) {}
+ KadMsgAddr(const XSockAddr& a) {
switch (a.SA()->sa_family) {
case AF_INET:
mFamily = IPv4;
@@ -181,31 +73,116 @@ struct MsgPackAddr
return result;
}
+ uint8_t mFamily;
+ uint16_t mPort;
+ MsgPackRaw<16> mAddr;
+
MSGPACK_DEFINE(mFamily, mPort, mAddr);
};
-template<unsigned SIZE>
-struct MsgPackContact {
- MsgPackRaw<SIZE> mId;
- MsgPackAddr mAddr;
+struct KadMsgId
+{
+ KadMsgId() {}
+ KadMsgId(const KadId& id) { memcpy(mData, id.Data(), KADEMLIA_ID_SIZE); }
+ operator KadId() const { KadId id; memcpy(id.Data(), mData, KADEMLIA_ID_SIZE); return id; }
+
+ uint8_t mData[KADEMLIA_ID_SIZE];
+
+ void msgpack_pack(msgpack::packer<msgpack::sbuffer>& pk) const
+ {
+ pk.pack_raw(KADEMLIA_ID_SIZE);
+ pk.pack_raw_body((const char*)&mData, KADEMLIA_ID_SIZE);
+ }
+
+ void msgpack_unpack(msgpack::object o)
+ {
+ if(o.type != msgpack::type::RAW) { throw msgpack::type_error(); }
+ if(o.via.raw.size != KADEMLIA_ID_SIZE) { return; }
+ memcpy(&mData, o.via.raw.ptr, KADEMLIA_ID_SIZE);
+ }
+};
+
+struct KadMsgContact
+{
+ KadMsgContact() {}
+ KadMsgContact(const KadContact& a) : mId (a.mId), mAddr (a.mAddrExt) { }
+ operator KadContact() const { return KadContact(mId, mAddr); }
+
+ KadMsgId mId;
+ KadMsgAddr mAddr;
MSGPACK_DEFINE(mId, mAddr);
};
-template<unsigned SIZE>
-struct MsgPackFindReq: public MsgPackHeader<SIZE>
+struct KadMsg
{
- MsgPackRaw<SIZE> mFindId;
+ enum Type
+ {
+ KAD_MSG_PING,
+ KAD_MSG_PONG,
+ KAD_MSG_JOIN_REQ,
+ KAD_MSG_JOIN_RSP,
+ KAD_MSG_FIND_REQ,
+ KAD_MSG_FIND_RSP,
+ KAD_MSG_STORE_REQ,
+ KAD_MSG_STORE_RSP,
+ KAD_MSG_REMOVE_REQ,
+ KAD_MSG_REMOVE_RSP,
+ KAD_MSG_LEAVE_REQ,
+ KAD_MSG_LEAVE_RSP,
+
+ KAD_MSG_RFIND_REQ,
+ KAD_MSG_RFIND_RSP,
- MSGPACK_DEFINE((MsgPackHeader<SIZE>&)(*this), mFindId);
+ KAD_MSG_QTY
+ };
+
+ KadMsg() {}
+ KadMsg(Type t) : mMsgType (t) {}
+
+ uint16_t mMsgType;
+ uint64_t mMsgId;
+ KadMsgId mSrcId;
+
+ MSGPACK_DEFINE(mMsgType, mMsgId, mSrcId);
};
-template<unsigned SIZE>
-struct MsgPackFindRsp: public MsgPackHeader<SIZE>
+struct KadMsgPing: public KadMsg
+{
+ KadMsgPing() : KadMsg(KadMsg::KAD_MSG_PING) {}
+
+ MSGPACK_DEFINE((KadMsg&)(*this));
+};
+
+struct KadMsgPong: public KadMsg
+{
+ KadMsgPong() : KadMsg(KadMsg::KAD_MSG_PONG) {}
+
+ MSGPACK_DEFINE((KadMsg&)(*this));
+};
+
+struct KadMsgFindReq: public KadMsg
{
- std::vector<MsgPackContact<SIZE> > mContacts;
+ KadMsgFindReq() {}
+ KadMsgFindReq(const KadMsgId& id) : KadMsg(KadMsg::KAD_MSG_FIND_REQ), mTargetId(id) {}
+
+ KadMsgId mTargetId;
+
+ MSGPACK_DEFINE((KadMsg&)(*this), mTargetId);
+};
+
+struct KadMsgFindRsp: public KadMsg
+{
+ KadMsgFindRsp() {}
+ KadMsgFindRsp(const KadContactList& lst) : KadMsg(KadMsg::KAD_MSG_FIND_RSP) {
+ for (KadContactList::It it = lst.First(); it != lst.End(); ++it) {
+ mContacts.push_back(*lst[it]);
+ }
+ }
+
+ std::vector<KadMsgContact> mContacts;
- MSGPACK_DEFINE((MsgPackHeader<SIZE>&)(*this), mContacts);
+ MSGPACK_DEFINE((KadMsg&)(*this), mContacts);
};
#endif /* KAD_MSG_H_ */
View
22 src/operations/KadPing.h
@@ -1,25 +1,6 @@
#include "KadMsg.h"
-class KadMsgPing : public KadMsg
-{
-public:
- KadMsgPing()
- : KadMsg(KadMsg::KAD_MSG_PING)
- {}
-
-} GCC_SPECIFIC(__attribute__((packed)));
-
-class KadMsgPong : public KadMsgRsp
-{
-
-public:
- KadMsgPong()
- : KadMsgRsp(KadMsg::KAD_MSG_PONG, KadMsgRsp::KAD_MSG_STATUS_OK)
- {
- }
-
-} GCC_SPECIFIC(__attribute__((packed)));
-
+/*
inline
const XLog::Stream& operator <<(const XLog::Stream& str, const KadMsgPing& v)
{
@@ -31,3 +12,4 @@ const XLog::Stream& operator <<(const XLog::Stream& str, const KadMsgPong& v)
{
return str << "KAD_MSG_PONG";
}
+*/
View
2  src/test/KadOpMgrTS.h
@@ -7,7 +7,7 @@ class KadOpMgrTS : public CxxTest::TestSuite
void testMsgSizes() {
LOG(NULL, "Req: " << sizeof(KadMsg));
- LOG(NULL, "Rsp: " << sizeof(KadMsgRsp));
+ LOG(NULL, "Rsp: " << sizeof(KadMsg));
LOG(NULL, "Ping: " << sizeof(KadMsgPing));
LOG(NULL, "Pong: " << sizeof(KadMsgPong));
View
2  src/test/KadRtNodeTS.h
@@ -24,7 +24,7 @@ class KadRtNodeTS: public CxxTest::TestSuite
for (int i=0; i<10000; i++) {
KadId cid = KadId::Random();
- rt.AddNode(KadContact(cid, XSockAddr::Random()));
+ rt.AddNode(cid, XSockAddr::Random());
TS_ASSERT_EQUALS(rt.CountSpaces(), rt.CountBuckets()-1);
}
View
22 src/test/MsgPackTS.h
@@ -34,11 +34,11 @@ class MsgPackTS : public CxxTest::TestSuite
void testCast(void)
{
- MsgPackFindRsp<16> a;
+ KadMsgFindRsp a;
memset(&a, 0, sizeof(a));
- a.mMsgType = MsgPackHeader<16>::KAD_MSG_JOIN_RSP;
- MsgPackContact<16> c1;
+ a.mMsgType = KadMsg::KAD_MSG_JOIN_RSP;
+ KadMsgContact c1;
c1.mAddr.mPort = 4096;
a.mContacts.push_back(c1);
@@ -53,7 +53,7 @@ class MsgPackTS : public CxxTest::TestSuite
//msgpack_object_print(stdout, msg.get());
- MsgPackFindRsp<16> b;
+ KadMsgFindRsp b;
memset(&b, 0, sizeof(b));
msg.get().convert(&b);
@@ -66,33 +66,33 @@ class MsgPackTS : public CxxTest::TestSuite
{
{ // Just header: Ping/Pong
msgpack::sbuffer sbuf;
- msgpack::pack(sbuf, MsgPackHeader<16>());
+ msgpack::pack(sbuf, KadMsg());
TS_ASSERT_EQUALS(sbuf.size(), 20);
}
{ // Address
msgpack::sbuffer sbuf;
- msgpack::pack(sbuf, MsgPackAddr());
+ msgpack::pack(sbuf, KadMsgAddr());
TS_ASSERT_EQUALS(sbuf.size(), 22);
}
{ // Contact
msgpack::sbuffer sbuf;
- msgpack::pack(sbuf, MsgPackContact<16>());
+ msgpack::pack(sbuf, KadMsgContact());
TS_ASSERT_EQUALS(sbuf.size(), 38);
}
{ // FindReq
msgpack::sbuffer sbuf;
- msgpack::pack(sbuf, MsgPackFindReq<16>());
+ msgpack::pack(sbuf, KadMsgFindReq());
TS_ASSERT_EQUALS(sbuf.size(), 38);
}
{ // FindRsp with 0 contacts
msgpack::sbuffer sbuf;
- msgpack::pack(sbuf, MsgPackFindRsp<16>());
+ msgpack::pack(sbuf, KadMsgFindRsp());
TS_ASSERT_EQUALS(sbuf.size(), 22);
}
{ // FindRsp with 16 contacts
- MsgPackFindRsp<16> msg;
+ KadMsgFindRsp msg;
for (int i=0; i<16; i++) {
- msg.mContacts.push_back(MsgPackContact<16>());
+ msg.mContacts.push_back(KadMsgContact());
}
msgpack::sbuffer sbuf;
msgpack::pack(sbuf, msg);
Please sign in to comment.
Something went wrong with that request. Please try again.