quick compact: compact only evicted range when it reaches 32MB #11

Merged
merged 1 commit

3 participants

Erik Frey mat kelcey Norman Casagrande
Erik Frey

When darner pops an item, it writes a deletion marker to its journal. Occasionally, it should sweep through the journal and delete the old items and the deletion markers.
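
As a rough sketch of the mechanism (illustrative only: the key layout and function names here are not darner's actual code), a pop writes a leveldb deletion marker for the item's key, and the occasional sweep asks leveldb to physically reclaim a whole key range:

```cpp
#include <string>
#include <leveldb/db.h>

// Illustrative sketch: a pop leaves a tombstone behind; a later sweep compacts
// the range covering the popped items so leveldb reclaims them for real.
void pop_item(leveldb::DB* journal, const std::string& item_key)
{
   // cheap to write, but the old value and the marker linger in the journal
   journal->Delete(leveldb::WriteOptions(), item_key);
}

void sweep(leveldb::DB* journal, const std::string& first_key, const std::string& last_key)
{
   // physically discard the dead items and their tombstones in [first_key, last_key]
   leveldb::Slice begin(first_key), end(last_key);
   journal->CompactRange(&begin, &end);
}
```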

The old approach had two problems:

  • Old Darner compacted every 1024 pops, so the amount of evicted data covered by each compaction varied widely with item size (see the rough numbers after this list). With really large items (hundreds of kilobytes to megabytes) going through the queue, compaction ran infrequently, allowed unbounded memory bloat in the meantime, and froze the world for tens of seconds when it finally did run.

  • Old Darner swept the whole journal instead of just the evicted range. Compacting only the known evicted range is much cheaper than compacting the entire journal.
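
To make the first point concrete (the figures below are illustrative, not measurements): with a fixed trigger of 1024 pops, 1KB items mean each compaction covers only about 1MB of evicted data, while 1MB items mean it covers about 1GB, so both the garbage that piles up between compactions and the cost of each compaction scale directly with item size. Triggering on bytes evicted instead pins that amount at 32MB no matter how large the items are.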

The new approach amortizes compaction more carefully into regular operation: the cost of each compaction is bounded by how quickly the machine can sweep through 32MB of RAM, which in tests is typically hundreds of milliseconds.
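
As a rough sanity check on that bound (the throughput figure is an assumption, not a measurement from this change): if leveldb can rewrite the evicted range at somewhere around 100-300MB/s, compacting 32MB takes on the order of 100-300ms, consistent with the hundreds of milliseconds seen in tests.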

Many other queues do compaction in a separate thread. I've amortized it into regular operation because I don't want to deal with the extra complexity, but I may consider a separate thread in the future.

mat kelcey

nice!

Norman Casagrande

Looks fine. Just one question: is the compact operation run in a multi-threaded environment? LevelDB has different levels of safety for concurrency, and I don't think CompactRange is among the thread-safe ones.

Erik Frey

@nova77 darner does all its work in a single thread.

Erik Frey merged commit a989904
Showing with 60 additions and 23 deletions.
  1. +17 −1 include/darner/queue/queue.h
  2. +43 −22 src/queue/queue.cpp
18 include/darner/queue/queue.h
@@ -151,7 +151,18 @@ class queue
leveldb::Slice slice() const;
- int compare(const key_type& other) const;
+ int compare(const key_type& other) const
+ {
+ if (type < other.type)
+ return -1;
+ else if (type > other.type)
+ return 1;
+ else if (id < other.id)
+ return -1;
+ else if (id > other.id)
+ return 1;
+ return 0;
+ }
unsigned char type;
id_type id;
@@ -193,6 +204,9 @@ class queue
// fires either if timer times out or is canceled
void waiter_wakeup(const boost::system::error_code& e, boost::ptr_list<waiter>::iterator waiter_it);
+ // compact the underlying journal, discarding deleted items
+ void compact();
+
// some leveldb sugar:
void put(const key_type& key, const std::string& value)
@@ -230,6 +244,7 @@ class queue
key_type chunks_head_;
size_type items_open_; // an open item is < TAIL but not in returned_
+ size_type bytes_evicted_; // after we've evicted 32MB from the journal, compress that evicted range
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
@@ -237,6 +252,7 @@ class queue
boost::ptr_list<waiter>::iterator wake_up_it_;
boost::asio::io_service& ios_;
+ const std::string path_;
};
} // darner
65 src/queue/queue.cpp
@@ -5,6 +5,8 @@
#include <leveldb/iterator.h>
#include <leveldb/write_batch.h>
+#include "darner/util/log.h"
+
using namespace std;
using namespace boost;
using namespace darner;
@@ -15,8 +17,10 @@ queue::queue(asio::io_service& ios, const string& path)
queue_tail_(key_type::KT_QUEUE, 0),
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
+ bytes_evicted_(0),
wake_up_it_(waiters_.begin()),
- ios_(ios)
+ ios_(ios),
+ path_(path)
{
leveldb::Options options;
options.create_if_missing = true;
@@ -141,10 +145,16 @@ void queue::pop_end(bool erase, id_type id, const header_type& header)
write(batch);
- // leveldb is conservative about reclaiming deleted keys, of which there will be many when a queue grows and
- // later shrinks. let's explicitly force it to compact every 1024 deletes
- if (id % 1024 == 0)
- journal_->CompactRange(NULL, NULL);
+ bytes_evicted_ += header.size;
+
+ // leveldb is conservative about reclaiming deleted keys from its underlying journal. let's amortize this
+ // reclamation cost by compacting the evicted range when it reaches 32MB in size. note that this size may be
+ // different than what's on disk, because of snappy compression
+ if (bytes_evicted_ > 33554432)
+ {
+ compact();
+ bytes_evicted_ = 0;
+ }
}
else
{
@@ -206,6 +216,34 @@ void queue::waiter_wakeup(const system::error_code& e, ptr_list<queue::waiter>::
waiter->cb(asio::error::timed_out);
}
+void queue::compact()
+{
+ scoped_ptr<leveldb::Iterator> it(journal_->NewIterator(leveldb::ReadOptions()));
+
+ // compact queue range first
+ key_type kq_beg(key_type::KT_QUEUE, 0);
+ it->Seek(kq_beg.slice());
+ if (!it->Valid())
+ return;
+
+ key_type kq_end(it->key());
+ --kq_end.id; // leveldb::CompactRange is inclusive [beg, end]
+ leveldb::Slice sq_beg = kq_beg.slice(), sq_end = kq_end.slice();
+ journal_->CompactRange(&sq_beg, &sq_end);
+
+ // now compact chunk range
+ key_type kc_beg(key_type::KT_CHUNK, 0);
+ it->Seek(kc_beg.slice());
+ if (!it->Valid())
+ return log::INFO("queue<%1%>: compacted queue range to %2%", path_, kq_end.id);
+
+ key_type kc_end(it->key());
+ --kc_end.id;
+ leveldb::Slice sc_beg = kc_beg.slice(), sc_end = kc_end.slice();
+ journal_->CompactRange(&sc_beg, &sc_end);
+ log::INFO("queue<%1%>: compacted queue range to %2%, chunk range to %3%", path_, kq_end.id, kc_end.id);
+}
+
// child classes
const string& queue::header_type::str() const
{
@@ -220,20 +258,3 @@ leveldb::Slice queue::key_type::slice() const
return leveldb::Slice(&buf_[0], sizeof(buf_));
}
-int queue::key_type::compare(const key_type& other) const
-{
- if (type < other.type)
- return -1;
- else if (type > other.type)
- return 1;
- else
- {
- if (id < other.id)
- return -1;
- else if (id > other.id)
- return 1;
- else
- return 0;
- }
-}
-