Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
simpler flush implementation, for now
Browse files Browse the repository at this point in the history
  • Loading branch information
erikfrey committed Mar 18, 2013
1 parent 06e3e1d commit 70f93b2
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 31 deletions.
3 changes: 0 additions & 3 deletions include/darner/queue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ class queue : public boost::enable_shared_from_this<queue>
// delete the journal upon destruction
void destroy();

// flushes all items from the queue
void flush();

// returns the number of items in the queue
size_type count() const;

Expand Down
16 changes: 8 additions & 8 deletions include/darner/util/queue_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <map>

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/filesystem/operations.hpp>

#include "darner/queue/queue.h"
Expand All @@ -32,8 +32,7 @@ class queue_map
{
std::string queue_name =
boost::filesystem::path(it->path().filename()).string(); // useless recast for boost backwards compat
boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
queues_.insert(container_type::value_type(queue_name, p));
queues_[queue_name] = boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string());
}
}

Expand All @@ -42,15 +41,13 @@ class queue_map
iterator it = queues_.find(queue_name);

if (it == queues_.end())
{
boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
it = queues_.insert(container_type::value_type(queue_name, p)).first;
}
it = queues_.insert(container_type::value_type(queue_name,
boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string()))).first;

return it->second;
}

void erase(const std::string& queue_name)
void erase(const std::string& queue_name, bool recreate = false)
{
iterator it = queues_.find(queue_name);

Expand All @@ -60,6 +57,9 @@ class queue_map
it->second->destroy();

queues_.erase(it);

if (recreate)
queues_[queue_name] = boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string());
}

iterator begin() { return queues_.begin(); }
Expand Down
8 changes: 5 additions & 3 deletions src/net/handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,22 @@ void handler::write_version()

void handler::destroy()
{
queues_.erase(req_.queue);
queues_.erase(req_.queue, false);
return end("DELETED");
}

void handler::flush()
{
queues_[req_.queue]->flush();
// TODO: flush should guarantee that an item that's halfway pushed should still appear after
// the flush. right now, item will only appear to a client that was waiting to pop before the flush
queues_.erase(req_.queue, true);
return end();
}

void handler::flush_all()
{
for (queue_map::iterator it = queues_.begin(); it != queues_.end(); ++it)
it->second->flush();
queues_.erase(it->first, true);
return end("Flushed all queues.");
}

Expand Down
8 changes: 2 additions & 6 deletions src/queue/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ queue::queue(asio::io_service& ios, const string& path)
queue::~queue()
{
journal_.reset();
// TODO: most non-crap filesystems should be able to drop large files quickly, but this will block painfully on ext3.
// one ugly solution is a separate delete thread. or we can wait out everyone upgrading to ext4 :)
if (destroy_)
boost::filesystem::remove_all(path_);
}
Expand Down Expand Up @@ -92,12 +94,6 @@ void queue::destroy()
destroy_ = true;
}

void queue::flush()
{
queue_tail_ = queue_head_;
returned_.clear();
}

queue::size_type queue::count() const
{
return (queue_head_.id - queue_tail_.id) + returned_.size();
Expand Down
11 changes: 0 additions & 11 deletions tests/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,4 @@ BOOST_FIXTURE_TEST_CASE( test_delete_queue, fixtures::basic_queue )
BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue.0")); // finally, destroying the queue deletes the journal
}

// test that we can flush a queue
BOOST_FIXTURE_TEST_CASE( test_flush, fixtures::basic_queue )
{
string value = "Sometimes I get emotional over fonts";
oqs_.open(queue_, 1);
oqs_.write(value);
queue_->flush();
BOOST_REQUIRE_EQUAL(queue_->count(), 0);
BOOST_REQUIRE(!iqs_.open(queue_));
}

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 70f93b2

Please sign in to comment.