OpCache: keep cache alive during callback processing
aberaud committed Mar 20, 2018
1 parent 9a5ef25 commit 2031508
Showing 1 changed file with 29 additions and 19 deletions.
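
A note on what the change guards against: the get_cb callbacks fired from onValuesAdded and onValuesExpired can call back into the cache (for example to cancel their own listen), mutating `listeners` or tearing the OpCache down while dispatch is still running. The diff below therefore snapshots the listener list before invoking callbacks, and has SearchCache hold each OpCache behind a unique_ptr so it can be moved out of the map and outlive its entry. What follows is a minimal sketch of the snapshot-before-dispatch idea only, using simplified stand-in types (MiniCache, Listener) that are not part of op_cache.h:

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <vector>

    // Hypothetical stand-ins for OpCache's listener bookkeeping.
    struct Listener {
        std::function<void()> cb;
    };

    struct MiniCache {
        std::map<std::size_t, Listener> listeners;

        // Unsafe: a callback that erases itself mutates `listeners` while the
        // range-for is iterating it, invalidating the loop's iterator.
        void dispatchUnsafe() {
            for (auto& l : listeners)
                l.second.cb();
        }

        // Pattern used by the commit: copy the listeners first, then invoke
        // them, so re-entrant add/remove on `listeners` cannot break the loop.
        void dispatchSafe() {
            std::vector<Listener> snapshot;
            snapshot.reserve(listeners.size());
            for (const auto& l : listeners)
                snapshot.emplace_back(l.second);
            for (auto& l : snapshot)
                l.cb();
        }
    };

    int main() {
        MiniCache cache;
        cache.listeners[1] = Listener{[&cache] {
            cache.listeners.erase(1); // re-entrant removal, like a cancel from inside a callback
            std::cout << "callback ran and removed itself\n";
        }};
        cache.dispatchSafe(); // iterates the snapshot, so the erase above is harmless
    }

(dispatchUnsafe is shown only for contrast and is never called; running it with a self-erasing callback would be undefined behavior.)
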
src/op_cache.h (48 changes: 29 additions & 19 deletions)
@@ -102,9 +102,14 @@ class OpCache {
                 //std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl;
             }
         }
-        auto list = listeners;
-        for (auto& l : list)
-            l.second.get_cb(l.second.filter.filter(newValues), false);
+        if (not listeners.empty()) {
+            std::vector<LocalListener> list;
+            list.reserve(listeners.size());
+            for (const auto& l : listeners)
+                list.emplace_back(l.second);
+            for (auto& l : list)
+                l.get_cb(l.filter.filter(newValues), false);
+        }
     }
     void onValuesExpired(const std::vector<Sp<Value>>& vals) {
         std::vector<Sp<Value>> expiredValues;
@@ -117,9 +122,14 @@
                 values.erase(vit);
             }
         }
-        auto list = listeners;
-        for (auto& l : list)
-            l.second.get_cb(l.second.filter.filter(expiredValues), true);
+        if (not listeners.empty()) {
+            std::vector<LocalListener> list;
+            list.reserve(listeners.size());
+            for (const auto& l : listeners)
+                list.emplace_back(l.second);
+            for (auto& l : list)
+                l.get_cb(l.filter.filter(expiredValues), true);
+        }
     }

     void addListener(size_t token, ValueCallback cb, Sp<Query> q, Value::Filter filter) {
@@ -178,26 +188,26 @@ class SearchCache {
         }
         if (op == ops.end()) {
             // New query
-            op = ops.emplace(q, OpCache{}).first;
-            auto& cache = op->second;
+            op = ops.emplace(q, std::unique_ptr<OpCache>(new OpCache)).first;
+            auto& cache = *op->second;
             cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){
                 return cache.onValue(values, expired);
             });
         }
         auto token = nextToken_++;
-        if (token == 0)
-            token++;
-        op->second.addListener(token, get_cb, q, filter);
+        if (nextToken_ == 0)
+            nextToken_++;
+        op->second->addListener(token, get_cb, q, filter);
         return token;
     }

     bool cancelListen(size_t gtoken, std::function<void(size_t)> onCancel) {
         for (auto it = ops.begin(); it != ops.end(); it++) {
-            if (it->second.removeListener(gtoken)) {
-                if (it->second.isDone()) {
-                    auto ltoken = it->second.searchToken;
+            if (it->second->removeListener(gtoken)) {
+                if (it->second->isDone()) {
+                    auto cache = std::move(it->second);
                     ops.erase(it);
-                    onCancel(ltoken);
+                    onCancel(cache->searchToken);
                 }
                 return true;
             }
@@ -207,10 +217,10 @@

     std::vector<Sp<Value>> get(Value::Filter& filter) const {
         if (ops.size() == 1)
-            return ops.begin()->second.get(filter);
+            return ops.begin()->second->get(filter);
         std::map<Value::Id, Sp<Value>> c;
         for (const auto& op : ops) {
-            for (const auto& v : op.second.get(filter))
+            for (const auto& v : op.second->get(filter))
                 c.emplace(v->id, v);
         }
         std::vector<Sp<Value>> ret;
@@ -222,13 +232,13 @@

     Sp<Value> get(Value::Id id) const {
         for (const auto& op : ops)
-            if (auto v = op.second.get(id))
+            if (auto v = op.second->get(id))
                 return v;
         return {};
     }

 private:
-    std::map<Sp<Query>, OpCache> ops;
+    std::map<Sp<Query>, std::unique_ptr<OpCache>> ops {};
     size_t nextToken_ {1};
 };
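
The same lifetime concern is why cancelListen now moves ownership out of the map before erasing the entry: the OpCache (and its searchToken) stays valid until the end of the enclosing scope, even if the cancellation was triggered from inside one of the cache's own callbacks. A rough illustration of move-then-erase, with a hypothetical Cache type standing in for OpCache:

    #include <cstddef>
    #include <cstdio>
    #include <map>
    #include <memory>

    // Hypothetical stand-in for OpCache.
    struct Cache {
        std::size_t searchToken {42};
        ~Cache() { std::puts("Cache destroyed"); }
    };

    int main() {
        std::map<int, std::unique_ptr<Cache>> ops;
        ops.emplace(0, std::unique_ptr<Cache>(new Cache));

        auto it = ops.find(0);
        auto cache = std::move(it->second); // take ownership before erasing the entry
        ops.erase(it);                      // the map entry is gone, the object is not
        std::printf("searchToken %zu is still reachable\n", cache->searchToken);
    }   // the Cache is destroyed only here, after its last use
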
