Skip to content

Commit

Permalink
dhtproxy: inject push notifications in opendht
Browse files Browse the repository at this point in the history
+ add the ability to inject push notifications in opendht to call
correct callbacks
+ if a timeout occurs and no cancelListen was called, relaunch the
listen operation
+ Pass devicekey to enable push notifications for DhtProxyClient
+ Update sockaddr to fix the build on all platforms

Note: update dhtnode
  • Loading branch information
AmarOk1412 committed Dec 12, 2017
1 parent 75cca66 commit 37a9a3d
Show file tree
Hide file tree
Showing 13 changed files with 416 additions and 95 deletions.
2 changes: 1 addition & 1 deletion configure.ac
Expand Up @@ -140,7 +140,7 @@ AM_CONDITIONAL(ENABLE_PROXY_CLIENT, test x$proxy_client == xyes)


AM_COND_IF([ENABLE_PROXY_SERVER], [ AM_COND_IF([ENABLE_PROXY_SERVER], [
AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files])) AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files]))
PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.4]) PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.2])
CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=true -ljsoncpp -lrestbed" CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=true -ljsoncpp -lrestbed"
], [ ], [
CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=false" CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=false"
Expand Down
13 changes: 12 additions & 1 deletion include/opendht/dht.h
Expand Up @@ -73,7 +73,7 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {




#if OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_CLIENT
void startProxy(const std::string&) {}; void startProxy(const std::string&, const std::string&) {};
#endif #endif


/** /**
Expand Down Expand Up @@ -295,6 +295,17 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {


std::vector<SockAddr> getPublicAddress(sa_family_t family = 0); std::vector<SockAddr> getPublicAddress(sa_family_t family = 0);


#if OPENDHT_PUSH_NOTIFICATIONS
/**
* Call linked callback with a push notification
* @param notification to process
*/
void pushNotificationReceived(const Json::Value&) {
// Ignore this
}
void resubscribe(const unsigned) {}
#endif // OPENDHT_PUSH_NOTIFICATIONS

private: private:


/* When performing a search, we search for up to SEARCH_NODES closest nodes /* When performing a search, we search for up to SEARCH_NODES closest nodes
Expand Down
15 changes: 14 additions & 1 deletion include/opendht/dht_interface.h
Expand Up @@ -29,7 +29,7 @@ class OPENDHT_PUBLIC DhtInterface {
virtual ~DhtInterface() = default; virtual ~DhtInterface() = default;


#if OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_CLIENT
virtual void startProxy(const std::string& host) = 0; virtual void startProxy(const std::string& host, const std::string& deviceKey = "") = 0;
#endif #endif


// [[deprecated]] // [[deprecated]]
Expand Down Expand Up @@ -231,6 +231,19 @@ class OPENDHT_PUBLIC DhtInterface {
virtual void setLogFilter(const InfoHash& f) { virtual void setLogFilter(const InfoHash& f) {
DHT_LOG.setFilter(f); DHT_LOG.setFilter(f);
} }

#if OPENDHT_PUSH_NOTIFICATIONS
/**
* Call linked callback with a push notification
* @param notification to process
*/
virtual void pushNotificationReceived(const Json::Value& notification) = 0;
/**
* Refresh a listen via a token
* @param token
*/
virtual void resubscribe(const unsigned token) = 0;
#endif // OPENDHT_PUSH_NOTIFICATIONS
protected: protected:
bool logFilerEnable_ {}; bool logFilerEnable_ {};
InfoHash logFiler_ {}; InfoHash logFiler_ {};
Expand Down
40 changes: 33 additions & 7 deletions include/opendht/dht_proxy_client.h
Expand Up @@ -44,13 +44,15 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
/** /**
* Initialise the DhtProxyClient with two open sockets (for IPv4 and IP6) * Initialise the DhtProxyClient with two open sockets (for IPv4 and IP6)
* and an ID for the node. * and an ID for the node.
* @param serverHost the proxy address
*/ */
explicit DhtProxyClient(const std::string& serverHost); explicit DhtProxyClient(const std::string& serverHost);
/** /**
* Start the connection with a server. * Start the connection with a server.
* @param serverHost the server address * @param serverHost the server address
* @param deviceKey if we use push notifications
*/ */
void startProxy(const std::string& serverHost); void startProxy(const std::string& serverHost, const std::string& deviceKey = "");


virtual ~DhtProxyClient(); virtual ~DhtProxyClient();


Expand Down Expand Up @@ -101,6 +103,8 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
} }


void get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain);

/** /**
* Announce a value on all available protocols (IPv4, IPv6). * Announce a value on all available protocols (IPv4, IPv6).
* *
Expand Down Expand Up @@ -162,18 +166,26 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) {
return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
} }
virtual bool cancelListen(const InfoHash&, size_t token);

#if OPENDHT_PUSH_NOTIFICATIONS
/**
* Call linked callback with a push notification
* @param notification to process
*/
void pushNotificationReceived(const Json::Value& notification);
/**
* Refresh a listen via a token
* @param token
*/
void resubscribe(const unsigned token);
#endif // OPENDHT_PUSH_NOTIFICATIONS


time_point periodic(const uint8_t*, size_t, const SockAddr&); time_point periodic(const uint8_t*, size_t, const SockAddr&);
time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) {
return periodic(buf, buflen, SockAddr(from, fromlen)); return periodic(buf, buflen, SockAddr(from, fromlen));
} }


/**
* TODO
* NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods.
* It will come in another version. (with push_notifications support)
*/
virtual bool cancelListen(const InfoHash&, size_t token);


/** /**
* Similar to Dht::get, but sends a Query to filter data remotely. * Similar to Dht::get, but sends a Query to filter data remotely.
Expand Down Expand Up @@ -270,6 +282,7 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
GetCallback cb; GetCallback cb;
Value::Filter filterChain; Value::Filter filterChain;
std::thread thread; std::thread thread;
std::shared_ptr<unsigned> pushNotifToken; // NOTE: unused if not using push notifications
}; };
std::vector<Listener> listeners_; std::vector<Listener> listeners_;
size_t listener_token_ {0}; size_t listener_token_ {0};
Expand Down Expand Up @@ -313,6 +326,19 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
* Store the current proxy status * Store the current proxy status
*/ */
std::unique_ptr<Json::Value> currentProxyInfos_; std::unique_ptr<Json::Value> currentProxyInfos_;

/**
* If we want to use push notifications by default.
* NOTE: empty by default to avoid to use services like FCM or APN.
*/
std::string deviceKey_ {};
unsigned callbackId_ {0};
std::mutex lockCallback_;

#if OPENDHT_PUSH_NOTIFICATIONS
void fillBodyToGetToken(std::shared_ptr<restbed::Request> request);
#endif // OPENDHT_PUSH_NOTIFICATIONS

}; };


} }
Expand Down
20 changes: 19 additions & 1 deletion include/opendht/dhtrunner.h
Expand Up @@ -381,12 +381,30 @@ class OPENDHT_PUBLIC DhtRunner {
void setProxyServer(const std::string& url = "127.0.0.1:8000") { void setProxyServer(const std::string& url = "127.0.0.1:8000") {
config_.proxy_server = url; config_.proxy_server = url;
} }
void enableProxy(bool proxify); /**
* Start or stop the proxy
* @param proxify if we want to use the proxy
* @param deviceKey non empty to enable push notifications
*/
void enableProxy(bool proxify, const std::string& deviceKey = "");
#endif // OPENDHT_PROXY_CLIENT #endif // OPENDHT_PROXY_CLIENT
#if OPENDHT_PROXY_SERVER #if OPENDHT_PROXY_SERVER
void forwardAllMessages(bool forward); void forwardAllMessages(bool forward);
#endif // OPENDHT_PROXY_SERVER #endif // OPENDHT_PROXY_SERVER


#if OPENDHT_PUSH_NOTIFICATIONS
/**
* Insert a push notification to process for OpenDHT
*/
void pushNotificationReceived(const std::string& notification) const;
void pushNotificationReceived(const Json::Value& notification) const;
/**
* Refresh a listen via a token
* @param token
*/
void resubscribe(const unsigned token);
#endif // OPENDHT_PUSH_NOTIFICATIONS

private: private:
static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10}; static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};


Expand Down
21 changes: 19 additions & 2 deletions include/opendht/securedht.h
Expand Up @@ -294,8 +294,8 @@ class OPENDHT_PUBLIC SecureDht final : public DhtInterface {
} }


#if OPENDHT_PROXY_CLIENT #if OPENDHT_PROXY_CLIENT
void startProxy(const std::string& host) { void startProxy(const std::string& host, const std::string& deviceKey = "") {
dht_->startProxy(host); dht_->startProxy(host, deviceKey);
} }
#endif #endif


Expand All @@ -305,6 +305,23 @@ class OPENDHT_PUBLIC SecureDht final : public DhtInterface {
} }
#endif //OPENDHT_PROXY_SERVER #endif //OPENDHT_PROXY_SERVER


#if OPENDHT_PUSH_NOTIFICATIONS
/**
* Call linked callback with push_notification
* @param notification to process
*/
void pushNotificationReceived(const Json::Value& notification) {
dht_->pushNotificationReceived(notification);
}
/**
* Refresh a listen via a token
* @param token
*/
void resubscribe(const unsigned token) {
dht_->resubscribe(token);
}
#endif // OPENDHT_PUSH_NOTIFICATIONS

private: private:
std::unique_ptr<DhtInterface> dht_; std::unique_ptr<DhtInterface> dht_;
// prevent copy // prevent copy
Expand Down
9 changes: 6 additions & 3 deletions include/opendht/sockaddr.h
Expand Up @@ -28,6 +28,8 @@ typedef uint16_t in_port_t;
#endif #endif
#else #else
#include <iso646.h> #include <iso646.h>
#include <stdint.h>
#include <winsock2.h>
#include <ws2def.h> #include <ws2def.h>
#include <ws2tcpip.h> #include <ws2tcpip.h>
typedef uint16_t sa_family_t; typedef uint16_t sa_family_t;
Expand All @@ -37,6 +39,7 @@ typedef uint16_t in_port_t;
#include <string> #include <string>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <stdlib.h>


#include <cstring> #include <cstring>
#include <cstddef> #include <cstddef>
Expand Down Expand Up @@ -123,7 +126,7 @@ class OPENDHT_PUBLIC SockAddr {
} }
if (new_length != len) { if (new_length != len) {
len = new_length; len = new_length;
if (len) addr.reset((sockaddr*)std::calloc(len, 1)); if (len) addr.reset((sockaddr*)::calloc(len, 1));
else addr.reset(); else addr.reset();
} }
if (len > sizeof(sa_family_t)) if (len > sizeof(sa_family_t))
Expand Down Expand Up @@ -239,13 +242,13 @@ class OPENDHT_PUBLIC SockAddr {
}; };
private: private:
socklen_t len {0}; socklen_t len {0};
struct free_delete { void operator()(void* p) { std::free(p); } }; struct free_delete { void operator()(void* p) { ::free(p); } };
std::unique_ptr<sockaddr, free_delete> addr {}; std::unique_ptr<sockaddr, free_delete> addr {};


void set(const sockaddr* sa, socklen_t length) { void set(const sockaddr* sa, socklen_t length) {
if (len != length) { if (len != length) {
len = length; len = length;
if (len) addr.reset((sockaddr*)std::malloc(len)); if (len) addr.reset((sockaddr*)::malloc(len));
else addr.reset(); else addr.reset();
} }
if (len) if (len)
Expand Down
5 changes: 5 additions & 0 deletions src/Makefile.am
Expand Up @@ -59,6 +59,11 @@ libopendht_la_SOURCES += base64.h base64.cpp dht_proxy_server.cpp
nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h
endif endif


if ENABLE_PROXY_CLIENT
libopendht_la_SOURCES += dht_proxy_client.cpp
nobase_include_HEADERS += ../include/opendht/dht_proxy_client.h ../include/opendht/dht_interface.h
endif

clean-local: clean-local:
rm -rf libargon2.la rm -rf libargon2.la


Expand Down

0 comments on commit 37a9a3d

Please sign in to comment.