Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

quick compact: compact only evicted range when it reaches 32MB

  • Loading branch information...
commit 31994bc7b9b206dc61ec2086dad652bc4c4b9e2d 1 parent 3d2e8a9
@erikfrey erikfrey authored
Showing with 60 additions and 23 deletions.
  1. +17 −1 include/darner/queue/queue.h
  2. +43 −22 src/queue/queue.cpp
View
18 include/darner/queue/queue.h
@@ -151,7 +151,18 @@ class queue
leveldb::Slice slice() const;
- int compare(const key_type& other) const;
+ int compare(const key_type& other) const
+ {
+ if (type < other.type)
+ return -1;
+ else if (type > other.type)
+ return 1;
+ else if (id < other.id)
+ return -1;
+ else if (id > other.id)
+ return 1;
+ return 0;
+ }
unsigned char type;
id_type id;
@@ -193,6 +204,9 @@ class queue
// fires either if timer times out or is canceled
void waiter_wakeup(const boost::system::error_code& e, boost::ptr_list<waiter>::iterator waiter_it);
+ // compact the underlying journal, discarding deleted items
+ void compact();
+
// some leveldb sugar:
void put(const key_type& key, const std::string& value)
@@ -230,6 +244,7 @@ class queue
key_type chunks_head_;
size_type items_open_; // an open item is < TAIL but not in returned_
+ size_type bytes_evicted_; // after we've evicted 32MB from the journal, compress that evicted range
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
@@ -237,6 +252,7 @@ class queue
boost::ptr_list<waiter>::iterator wake_up_it_;
boost::asio::io_service& ios_;
+ const std::string path_;
};
} // darner
View
65 src/queue/queue.cpp
@@ -5,6 +5,8 @@
#include <leveldb/iterator.h>
#include <leveldb/write_batch.h>
+#include "darner/util/log.h"
+
using namespace std;
using namespace boost;
using namespace darner;
@@ -15,8 +17,10 @@ queue::queue(asio::io_service& ios, const string& path)
queue_tail_(key_type::KT_QUEUE, 0),
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
+ bytes_evicted_(0),
wake_up_it_(waiters_.begin()),
- ios_(ios)
+ ios_(ios),
+ path_(path)
{
leveldb::Options options;
options.create_if_missing = true;
@@ -141,10 +145,16 @@ void queue::pop_end(bool erase, id_type id, const header_type& header)
write(batch);
- // leveldb is conservative about reclaiming deleted keys, of which there will be many when a queue grows and
- // later shrinks. let's explicitly force it to compact every 1024 deletes
- if (id % 1024 == 0)
- journal_->CompactRange(NULL, NULL);
+ bytes_evicted_ += header.size;
+
+ // leveldb is conservative about reclaiming deleted keys from its underlying journal. let's amortize this
+ // reclamation cost by compacting the evicted range when it reaches 32MB in size. note that this size may be
+ // different than what's on disk, because of snappy compression
+ if (bytes_evicted_ > 33554432)
+ {
+ compact();
+ bytes_evicted_ = 0;
+ }
}
else
{
@@ -206,6 +216,34 @@ void queue::waiter_wakeup(const system::error_code& e, ptr_list<queue::waiter>::
waiter->cb(asio::error::timed_out);
}
+void queue::compact()
+{
+ scoped_ptr<leveldb::Iterator> it(journal_->NewIterator(leveldb::ReadOptions()));
+
+ // compact queue range first
+ key_type kq_beg(key_type::KT_QUEUE, 0);
+ it->Seek(kq_beg.slice());
+ if (!it->Valid())
+ return;
+
+ key_type kq_end(it->key());
+ --kq_end.id; // leveldb::CompactRange is inclusive [beg, end]
+ leveldb::Slice sq_beg = kq_beg.slice(), sq_end = kq_end.slice();
+ journal_->CompactRange(&sq_beg, &sq_end);
+
+ // now compact chunk range
+ key_type kc_beg(key_type::KT_CHUNK, 0);
+ it->Seek(kc_beg.slice());
+ if (!it->Valid())
+ return log::INFO("queue<%1%>: compacted queue range to %2%", path_, kq_end.id);
+
+ key_type kc_end(it->key());
+ --kc_end.id;
+ leveldb::Slice sc_beg = kc_beg.slice(), sc_end = kc_end.slice();
+ journal_->CompactRange(&sc_beg, &sc_end);
+ log::INFO("queue<%1%>: compacted queue range to %2%, chunk range to %3%", path_, kq_end.id, kc_end.id);
+}
+
// child classes
const string& queue::header_type::str() const
{
@@ -220,20 +258,3 @@ leveldb::Slice queue::key_type::slice() const
return leveldb::Slice(&buf_[0], sizeof(buf_));
}
-int queue::key_type::compare(const key_type& other) const
-{
- if (type < other.type)
- return -1;
- else if (type > other.type)
- return 1;
- else
- {
- if (id < other.id)
- return -1;
- else if (id > other.id)
- return 1;
- else
- return 0;
- }
-}
-
Please sign in to comment.
Something went wrong with that request. Please try again.