This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Implements flush and flush_all #21

Closed
wants to merge 3 commits
7 changes: 7 additions & 0 deletions include/darner/queue/queue.h
@@ -50,6 +50,9 @@ class queue
// writes out stats (stuff like queue count) to a stream
void write_stats(const std::string& name, std::ostringstream& out) const;

// flush the db by deleting it and creating a new one
void flush_db();

protected:

friend class iqstream;
@@ -194,6 +197,9 @@ class queue
void FindShortSuccessor(std::string*) const {}
};

// initialize leveldb
void init_db();

// any operation that adds to the queue should crank a wakeup
void wake_up();

@@ -241,6 +247,7 @@ class queue

size_type items_open_; // an open item is < TAIL but not in returned_
size_type bytes_evicted_; // after we've evicted 32MB from the journal, compress that evicted range
size_type total_flushes_; // total number of times this queue has been flushed

std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)

9 changes: 9 additions & 0 deletions include/darner/util/queue_map.hpp
@@ -50,6 +50,15 @@ class queue_map
return it->second;
}

bool flush_queue(const std::string& queue_name)
{
iterator it = queues_.find(queue_name);
if (it == queues_.end())
return false; // nothing to do

it->second->flush_db();
return true;
}

iterator begin() { return queues_.begin(); }
iterator end() { return queues_.end(); }
const_iterator begin() const { return queues_.begin(); }
11 changes: 9 additions & 2 deletions src/net/handler.cpp
@@ -92,12 +92,19 @@ void handler::write_version()

void handler::flush()
{
// TODO: implement
queues_.flush_queue(req_.queue);

buf_ = "OK\r\n";
async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}

void handler::flush_all()
{
// TODO: implement
for (queue_map::const_iterator it = queues_.begin(); it != queues_.end(); ++it)
queues_.flush_queue(it->first);

buf_ = "OK\r\n";
async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}

void handler::set()
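For context, these handlers back the flush and flush_all commands of darner's kestrel-compatible text protocol. Assuming a hypothetical queue named jobs, and assuming the wire commands are spelled like the handler names (neither is shown in this diff), a client session would look roughly like this, with each command answered by the OK reply built into buf_ above:

flush jobs
OK
flush_all
OK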
35 changes: 33 additions & 2 deletions src/queue/queue.cpp
@@ -1,6 +1,7 @@
#include "darner/queue/queue.h"

#include <boost/bind.hpp>
#include <boost/filesystem.hpp>

#include <leveldb/iterator.h>
#include <leveldb/write_batch.h>
@@ -18,16 +19,28 @@ queue::queue(asio::io_service& ios, const string& path)
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
bytes_evicted_(0),
total_flushes_(0),
wake_up_it_(waiters_.begin()),
ios_(ios),
path_(path)
{
init_db();
}

void queue::init_db()
{
queue_head_.id = 0;
queue_tail_.id = 0;
chunks_head_.id = 0;
//wake_up_it_ = waiters_.begin();

leveldb::Options options;
options.create_if_missing = true;
options.comparator = cmp_.get();

leveldb::DB* pdb;
if (!leveldb::DB::Open(options, path_, &pdb).ok())
throw runtime_error("can't open journal: " + path_);
journal_.reset(pdb);
// get head and tail of queue
scoped_ptr<leveldb::Iterator> it(journal_->NewIterator(leveldb::ReadOptions()));
@@ -69,6 +82,7 @@ void queue::write_stats(const string& name, ostringstream& out) const
out << "STAT queue_" << name << "_items " << count() << "\r\n";
out << "STAT queue_" << name << "_waiters " << waiters_.size() << "\r\n";
out << "STAT queue_" << name << "_open_transactions " << items_open_ << "\r\n";
out << "STAT queue_" << name << "_total_flushes " << total_flushes_ << "\r\n";
}

// protected:
@@ -197,6 +211,23 @@ void queue::erase_chunks(const header_type& header)
write(batch);
}

void queue::flush_db()
{
log::INFO("queue <%1%>: flushing db", path_);

wake_up(); // in case there's a waiter waiting

waiters_.clear();
Contributor:
hmm, why are you clearing out the waiters? you're invalidating iterators that are sitting out in asio wait events.

Contributor Author:
Well, the idea was to get around the issue you lay out below, but it seems I did not know enough about the code.

journal_.reset(); // close leveldb
boost::filesystem::remove_all(path_);

items_open_ = 0;
bytes_evicted_ = 0;

init_db(); // now create a new one
++total_flushes_;
}
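The reviewer's concern above is about iterator invalidation: blocking reads park asio wait handlers that hold iterators into waiters_, and clearing the list while those handlers are still pending leaves them dangling. A minimal standalone sketch of that hazard, using plain std::list and std::function stand-ins rather than darner's actual waiter machinery:

#include <functional>
#include <iostream>
#include <list>
#include <string>

struct waiter { std::string name; };

int main()
{
    std::list<waiter> waiters;
    waiters.push_back(waiter{"blocked pop"});

    // stands in for an asio wait handler that captured its iterator when the
    // wait was scheduled; it only runs later, when the event loop reaches it
    std::list<waiter>::iterator it = waiters.begin();
    std::function<void()> pending_handler = [it]() {
        std::cout << "waking " << it->name << std::endl; // dereferences the captured iterator
    };

    waiters.clear(); // what flush_db() does: every outstanding iterator is now invalid

    pending_handler(); // undefined behavior: the iterator dangles

    return 0;
}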

// private:

void queue::wake_up()
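As a quick illustration of the write_stats change above: for a hypothetical queue named jobs that has been flushed twice, the per-queue stats block would now include lines like the following (values illustrative):

STAT queue_jobs_items 0
STAT queue_jobs_waiters 0
STAT queue_jobs_open_transactions 0
STAT queue_jobs_total_flushes 2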
15 changes: 15 additions & 0 deletions tests/queue.cpp
@@ -127,6 +127,21 @@ BOOST_FIXTURE_TEST_CASE( test_queue_count, fixtures::basic_queue )
BOOST_REQUIRE_EQUAL(queue_->count(), 1);
}

// test that we can delete and reopen queue
BOOST_FIXTURE_TEST_CASE( test_queue_delete_reopen, fixtures::basic_queue )
{
string value = "More than once, I've wished my real life had a delete key";

oqs_.open(queue_, 1);
oqs_.write(value);
queue_.reset(new darner::queue(ios_, (tmp_ / "queue").string()));
BOOST_REQUIRE_EQUAL(queue_->count(), 1);

// delete and start fresh
queue_->flush_db();
BOOST_REQUIRE_EQUAL(queue_->count(), 0);
}

// test overrunning an oqstream raises
BOOST_FIXTURE_TEST_CASE( test_oqstream_overflow, fixtures::basic_queue )
{