Permalink
Browse files

cache: improve value expiration handling

  • Loading branch information...
aberaud committed Mar 28, 2018
1 parent 8c85f84 commit 0edee443f553f627055efa31be6cee05ccb5ee7d
View
@@ -6,12 +6,17 @@
A lightweight C++11 Distributed Hash Table implementation.
* Light and fast C++11 Kademlia DHT library
* Distributed shared key->value data-store
* Clean and powerful distributed map API with storage of arbitrary binary values of up to 56 KB
OpenDHT provides an easy to use distributed in-memory data store.
Every node in the network can read and write values to the store.
Values are distributed over the network, with redundancy.
* Lightweight and scalable, designed for large networks and small devices
* High resilience to network disruption
* Public key cryptography layer providing optional data signature and encryption (using GnuTLS)
* IPv4 and IPv6 support
* Python binding
* Clean and powerful C++11 map API
* Python 3 bindings
* REST API
## Documentation
See the wiki: <https://github.com/savoirfairelinux/opendht/wiki>
View
@@ -414,7 +414,7 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
void expireStorage(InfoHash h);
void expireStore(decltype(store)::iterator);
void storageChanged(const InfoHash& id, Storage& st, ValueStorage&);
void storageChanged(const InfoHash& id, Storage& st, ValueStorage&, bool newValue);
std::string printStorageLog(const decltype(store)::value_type&) const;
/**
@@ -535,7 +535,9 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
*
* @param sr The search to execute its operations.
*/
void searchStep(Sp<Search> sr);
void searchStep(Sp<Search>);
void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
void dumpSearch(const Search& sr, std::ostream& out) const;
bool neighbourhoodMaintenance(RoutingTable&);
@@ -205,18 +205,18 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
/**
* Get data currently being put at the given hash.
*/
std::vector<Sp<Value>> getPut(const InfoHash&) { return {}; }
std::vector<Sp<Value>> getPut(const InfoHash&);
/**
* Get data currently being put at the given hash with the given id.
*/
Sp<Value> getPut(const InfoHash&, const Value::Id&) { return {}; }
Sp<Value> getPut(const InfoHash&, const Value::Id&);
/**
* Stop any put/announce operation at the given location,
* for the value with the given id.
*/
bool cancelPut(const InfoHash&, const Value::Id&) { return false; }
bool cancelPut(const InfoHash&, const Value::Id&);
void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { }
@@ -271,6 +271,8 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
size_t doListen(const InfoHash& key, ValueCallback, Value::Filter);
bool doCancelListen(const InfoHash& key, size_t token);
void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent);
/**
* Initialize statusIpvX_
*/
@@ -304,7 +306,7 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
*/
struct Listener;
struct ProxySearch;
std::map<InfoHash, ProxySearch> listeners_;
std::map<InfoHash, ProxySearch> searches_;
size_t listener_token_ {0};
std::mutex lockListener_;
@@ -23,6 +23,8 @@
#include "def.h"
#include "sockaddr.h"
#include "infohash.h"
#include "scheduler.h"
#include "value.h"
#include <thread>
#include <memory>
@@ -59,6 +61,27 @@ class OPENDHT_PUBLIC DhtProxyServer
DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
struct ServerStats {
/** Current number of listen operations */
size_t listenCount;
/** Current number of permanent put operations */
size_t putCount;
/** Current number of push tokens with at least one listen operation */
size_t pushListenersCount;
/** Average requests per second */
double requestRate;
std::string toString() const {
std::ostringstream ss;
ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
ss << "Requests: " << requestRate << " per second." << std::endl;
return ss.str();
}
};
ServerStats getStats() const;
std::shared_ptr<DhtRunner> getNode() const { return dht_; }
/**
* Stop the DhtProxyServer
*/
@@ -96,7 +119,7 @@ class OPENDHT_PUBLIC DhtProxyServer
* On error: HTTP 503, body: {"err":"xxxx"}
* @param session
*/
void listen(const std::shared_ptr<restbed::Session>& session) const;
void listen(const std::shared_ptr<restbed::Session>& session);
/**
* Put a value on the DHT
@@ -107,7 +130,9 @@ class OPENDHT_PUBLIC DhtProxyServer
* HTTP 400, body: {"err":"xxxx"} if bad json or HTTP 502 if put fails
* @param session
*/
void put(const std::shared_ptr<restbed::Session>& session) const;
void put(const std::shared_ptr<restbed::Session>& session);
void cancelPut(const InfoHash& key, Value::Id vid);
#if OPENDHT_PROXY_SERVER_IDENTITY
/**
@@ -167,7 +192,7 @@ class OPENDHT_PUBLIC DhtProxyServer
* on same hash for same device and must be > 0
* @param session
*/
void subscribe(const std::shared_ptr<restbed::Session>& session) const;
void subscribe(const std::shared_ptr<restbed::Session>& session);
/**
* Unsubscribe to push notifications for an iOS or Android device.
* Method: UNSUBSCRIBE "/{InfoHash: .*}"
@@ -178,7 +203,7 @@ class OPENDHT_PUBLIC DhtProxyServer
* on same hash for same device
* @param session
*/
void unsubscribe(const std::shared_ptr<restbed::Session>& session) const;
void unsubscribe(const std::shared_ptr<restbed::Session>& session);
/**
* Send a push notification via a gorush push gateway
* @param key of the device
@@ -187,10 +212,18 @@ class OPENDHT_PUBLIC DhtProxyServer
void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const;
#endif //OPENDHT_PUSH_NOTIFICATIONS
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
std::thread server_thread {};
std::unique_ptr<restbed::Service> service_;
std::shared_ptr<DhtRunner> dht_;
std::mutex schedulerLock_;
std::condition_variable schedulerCv_;
Scheduler scheduler_;
std::thread schedulerThread_;
// Handle client quit for listen.
// NOTE: can be simplified when we will supports restbed 5.0
std::thread listenThread_;
@@ -199,37 +232,33 @@ class OPENDHT_PUBLIC DhtProxyServer
InfoHash hash;
std::future<size_t> token;
};
mutable std::vector<SessionToHashToken> currentListeners_;
mutable std::mutex lockListener_;
std::vector<SessionToHashToken> currentListeners_;
std::mutex lockListener_;
std::atomic_bool stopListeners {false};
struct PermanentPut;
struct SearchPuts;
std::map<InfoHash, SearchPuts> puts_;
#if OPENDHT_PUSH_NOTIFICATIONS
struct PushListener {
std::string pushToken;
InfoHash hash;
unsigned token;
std::future<size_t> internalToken;
std::chrono::steady_clock::time_point deadline;
bool started {false};
unsigned callbackId {0};
std::string clientId {};
bool isAndroid {true};
};
mutable std::mutex lockPushListeners_;
mutable std::vector<PushListener> pushListeners_;
mutable unsigned tokenPushNotif_ {0};
struct Listener;
struct PushListener;
std::mutex lockPushListeners_;
std::map<std::string, PushListener> pushListeners_;
unsigned tokenPushNotif_ {0};
void cancelPushListen(const std::string& pushToken, const InfoHash& key, unsigned token, unsigned callbackId);
#endif //OPENDHT_PUSH_NOTIFICATIONS
const std::string pushServer_;
mutable std::atomic<size_t> requestNum_ {0};
mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
/**
* Remove finished listeners
* @param testSession if we remove the listener only if the session is closed
*/
void removeClosedListeners(bool testSession = true);
/**
* Launch or remove push listeners if needed
*/
void handlePushListeners();
};
}
@@ -36,11 +36,10 @@ namespace dht {
*/
class Scheduler {
public:
Scheduler(const Logger& l) : DHT_LOG(l) {}
struct Job {
Job(std::function<void()>&& f) : do_(std::move(f)) {}
std::function<void()> do_;
void cancel() { do_ = {}; }
};
/**
@@ -58,17 +57,19 @@ class Scheduler {
return job;
}
void add(const Sp<Scheduler::Job>& job, time_point t) {
if (t != time_point::max())
timers.emplace(std::move(t), job);
}
/**
* Reschedules a job.
*
* @param time The time at which the job shall be rescheduled.
* @param job The job to edit.
*
* @return pointer to the newly scheduled job.
* @param t The time at which the job shall be rescheduled.
*/
void edit(Sp<Scheduler::Job>& job, time_point t) {
if (not job) {
DHT_LOG.ERR("editing an empty job");
return;
}
// std::function move doesn't garantee to leave the object empty.
@@ -118,7 +119,6 @@ class Scheduler {
private:
time_point now {clock::now()};
std::multimap<time_point, Sp<Job>> timers {}; /* the jobs ordered by time */
const Logger& DHT_LOG;
};
}
Oops, something went wrong.

0 comments on commit 0edee44

Please sign in to comment.