Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
towards a more perfect class layout: introduce a central handler
Browse files Browse the repository at this point in the history
  • Loading branch information
erikfrey committed Aug 3, 2012
1 parent 53e9654 commit 8324c88
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 82 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Expand Up @@ -11,6 +11,7 @@ ADD_DEFINITIONS(-Wall -Wno-return-type -O2 -g -DNDEBUG -DDARNER_VERSION="${DARNE
INCLUDE_DIRECTORIES(include /usr/local/include/thrift)

ADD_EXECUTABLE(darner
src/handler
src/net/request
src/log
src/main
Expand Down
64 changes: 64 additions & 0 deletions include/darner/handler.h
@@ -0,0 +1,64 @@
#ifndef __DARNER_HANDLER_H__
#define __DARNER_HANDLER_H__

#include <string>

#include <boost/cstdint.hpp>
#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/asio.hpp>

namespace darner {

/*
* represents all the core application state for a darner server
*
* connections pass themselves to the handler, which handles the request, then ultimately passes control back to the
* connection via a common callback
*/
class request_handler
{
public:

struct stats
{
stats()
: alive_since(boost::posix_time::microsec_clock::local_time())
{
}

boost::posix_time::ptime alive_since; // time we started up the server

// these stats are accumulated during the life of a process, but don't persist across processes
boost::uint64_t items_enqueued; // enqueued across all queues
boost::uint64_t items_dequeued; // dequeued across all queues
boost::uint64_t conns_opened;
boost::uint64_t conns_closed; // closed - opened gives you current # conns
boost::uint64_t gets; // counts even empty gets
boost::uint64_t sets;

boost::mutex mutex;
};

typedef boost::asio::ip::tcp::socket socket_type;
typedef boost::function<void (const boost::system::error_code& error)> response_callback;

request_handler(const std::string& data_path);

void handle_stats(socket_type& socket, const response_callback& cb);

stats& get_stats()
{
return stats_;
}

private:

std::string data_path_;
stats stats_;
};

} // darner

#endif // __DARNER_HANDLER_H__
52 changes: 28 additions & 24 deletions include/darner/net/connection.hpp
Expand Up @@ -7,8 +7,8 @@
#include <boost/asio.hpp>

#include "darner/util/log.h"
#include "darner/util/stats.hpp"
#include "darner/net/request.h"
#include "darner/handler.h"

namespace darner {

Expand All @@ -21,22 +21,21 @@ class connection : public boost::enable_shared_from_this<connection>

connection(boost::asio::io_service& ios,
request_parser& parser,
stats& stats,
request_handler& handler,
size_t max_frame_size = 4096)
: socket_(ios),
parser_(parser),
stats_(stats),
in_buf_(max_frame_size),
out_buf_(max_frame_size)
handler_(handler),
in_buf_(max_frame_size)
{
boost::mutex::scoped_lock lock(stats_.mutex);
++stats_.curr_connections;
boost::mutex::scoped_lock lock(handler_.get_stats().mutex);
++handler_.get_stats().conns_opened;
}

~connection()
{
boost::mutex::scoped_lock lock(stats_.mutex);
--stats_.curr_connections;
boost::mutex::scoped_lock lock(handler_.get_stats().mutex);
++handler_.get_stats().conns_closed;
}

socket_type& socket()
Expand Down Expand Up @@ -81,33 +80,33 @@ class connection : public boost::enable_shared_from_this<connection>

if (!good_)
{
std::ostream o(&out_buf_);
o << "ERROR\r\n";
out_buf_ = "ERROR\r\n";
async_write(
socket_,
out_buf_,
boost::asio::buffer(out_buf_),
boost::bind(&connection::handle_write_result,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
return;
}

request_handler::response_callback cb(
boost::bind(&connection::handle_response,
shared_from_this(),
boost::asio::placeholders::error));

switch (req_.type)
{
case request::RT_STATS:
handle_stats_request();
break;
case request::RT_VERSION:
break;
case request::RT_STATS: handler_.handle_stats(socket_, cb); break;
case request::RT_VERSION: break;
case request::RT_FLUSH: break;
case request::RT_FLUSH_ALL: break;
case request::RT_SET: break;
case request::RT_GET: break;
}
}

void handle_stats_request()
{

}

void handle_write_result(const boost::system::error_code& e,
size_t bytes_transferred)
{
Expand All @@ -124,11 +123,16 @@ class connection : public boost::enable_shared_from_this<connection>
boost::asio::placeholders::bytes_transferred));
}

void handle_response(const boost::system::error_code& e)
{

}

socket_type socket_;
request_parser& parser_;
stats& stats_;
request_handler& handler_;
boost::asio::streambuf in_buf_;
boost::asio::streambuf out_buf_;
std::string out_buf_;
request req_;
bool good_;
};
Expand Down
5 changes: 2 additions & 3 deletions include/darner/net/request.h
Expand Up @@ -18,9 +18,8 @@ struct request
RT_VERSION = 2,
RT_FLUSH = 3,
RT_FLUSH_ALL = 4,
RT_DELETE = 5,
RT_SET = 6,
RT_GET = 7
RT_SET = 5,
RT_GET = 6
};

request_type type;
Expand Down
39 changes: 20 additions & 19 deletions include/darner/net/server.hpp
Expand Up @@ -8,7 +8,7 @@
#include "darner/util/log.h"
#include "darner/net/request.h"
#include "darner/net/connection.hpp"
#include "darner/util/stats.hpp"
#include "darner/handler.h"

namespace darner {

Expand All @@ -19,19 +19,14 @@ class server
{
public:

server(const std::string& data_path,
server(request_handler& handler,
unsigned short listen_port,
size_t num_workers,
stats& stats)
: data_path_(data_path),
size_t num_workers)
: handler_(handler),
listen_port_(listen_port),
num_workers_(num_workers),
stats_(stats),
acceptor_(ios_)
{
}

void start()
acceptor_(ios_),
strand_(ios_)
{
// open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), listen_port_);
Expand All @@ -41,7 +36,7 @@ class server
acceptor_.listen();

// get our first conn ready
session_ = connection::ptr_type(new connection(ios_, parser_, stats_));
session_ = connection::ptr_type(new connection(ios_, parser_, handler_));

// pump the first async accept into the loop
acceptor_.async_accept(session_->socket(),
Expand All @@ -54,8 +49,8 @@ class server

void stop()
{
acceptor_.close();
// TODO: also close any idling clients
strand_.post(boost::bind(&server::handle_close, this));
// TODO: also close any idling clients, delete the work
}

void join()
Expand All @@ -76,20 +71,26 @@ class server

session_->start();

session_ = connection::ptr_type(new connection(ios_, parser_, stats_));
session_ = connection::ptr_type(new connection(ios_, parser_, handler_));
acceptor_.async_accept(session_->socket(),
boost::bind(&server::handle_accept, this, boost::asio::placeholders::error));
strand_.wrap(
boost::bind(&server::handle_accept, this, boost::asio::placeholders::error)));
}

const std::string data_path_;
void handle_close()
{
acceptor_.close();
}

request_handler& handler_;
unsigned short listen_port_;
size_t num_workers_;
stats& stats_;

boost::asio::io_service ios_;
connection::ptr_type session_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::strand strand_; // to avoid close() and async_accept firing at the same time
request_parser parser_;
connection::ptr_type session_;
boost::thread_group workers_;
};

Expand Down
28 changes: 0 additions & 28 deletions include/darner/util/stats.hpp

This file was deleted.

16 changes: 16 additions & 0 deletions src/handler.cpp
@@ -0,0 +1,16 @@
#include "darner/handler.h"

using namespace std;
using namespace boost;
using namespace darner;

request_handler::request_handler(const string& data_path)
: data_path_(data_path)
{

}

void request_handler::handle_stats(request_handler::socket_type& socket, const request_handler::response_callback& cb)
{

}
14 changes: 6 additions & 8 deletions src/main.cpp
Expand Up @@ -3,11 +3,12 @@

#include <pthread.h>
#include <signal.h>
#include <boost/thread.hpp>
#include <boost/program_options.hpp>
#include <boost/filesystem/operations.hpp>

#include "darner/util/log.h"
#include "darner/util/stats.hpp"
#include "darner/handler.h"
#include "darner/net/server.hpp"

using namespace std;
Expand Down Expand Up @@ -119,10 +120,8 @@ int main(int argc, char * argv[])

log::INFO("starting up");

stats _stats;
server s(data_path, port, workers, _stats);

s.start();
request_handler handler(data_path);
server srv(handler, port, workers);

// Restore previous signals.
pthread_sigmask(SIG_SETMASK, &old_mask, 0);
Expand All @@ -140,10 +139,9 @@ int main(int argc, char * argv[])
// Stop the server.
log::INFO("received signal. stopping server and finishing work.");

s.stop();
s.join();
srv.stop(); // stop accepting, wait for connections to all close out

log::INFO("shutting down");
log::INFO("shut down. ciao.");

return 0;
}

0 comments on commit 8324c88

Please sign in to comment.