Skip to content

Commit

Permalink
Synchronize cancellations using locks.
Browse files Browse the repository at this point in the history
  • Loading branch information
caladri committed Jun 5, 2015
1 parent e99e24a commit 6cf2245
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 48 deletions.
32 changes: 22 additions & 10 deletions event/action.h
Expand Up @@ -26,6 +26,8 @@
#ifndef EVENT_ACTION_H
#define EVENT_ACTION_H

#include <common/thread/lock.h>

class Action {
bool cancelled_;
protected:
Expand Down Expand Up @@ -73,21 +75,26 @@ template<class C>
class Cancellation : public Cancellable {
typedef void (C::*const method_t)(void);

Lock *lock_;
C *const obj_;
method_t method_;
public:
template<typename T>
Cancellation(C *obj, T method)
: obj_(obj),
Cancellation(Lock *lock, C *obj, T method)
: lock_(lock),
obj_(obj),
method_(method)
{ }
{
ASSERT_LOCK_OWNED("/cancellation", lock_);
}

~Cancellation()
{ }

private:
void cancel(void)
{
ScopedLock _(lock_);
(obj_->*method_)();
}
};
Expand All @@ -96,38 +103,43 @@ template<class C, typename A>
class CancellationArg : public Cancellable {
typedef void (C::*const method_t)(A);

Lock *lock_;
C *const obj_;
method_t method_;
A arg_;
public:
template<typename T>
CancellationArg(C *obj, T method, A arg)
: obj_(obj),
CancellationArg(Lock *lock, C *obj, T method, A arg)
: lock_(lock),
obj_(obj),
method_(method),
arg_(arg)
{ }
{
ASSERT_LOCK_OWNED("/cancellation", lock_);
}

~CancellationArg()
{ }

private:
void cancel(void)
{
ScopedLock _(lock_);
(obj_->*method_)(arg_);
}
};

template<class C, typename T>
Action *cancellation(C *obj, T method)
Action *cancellation(Lock *lock, C *obj, T method)
{
Action *a = new Cancellation<C>(obj, method);
Action *a = new Cancellation<C>(lock, obj, method);
return (a);
}

template<class C, typename T, typename A>
Action *cancellation(C *obj, T method, A arg)
Action *cancellation(Lock *lock, C *obj, T method, A arg)
{
Action *a = new CancellationArg<C, A>(obj, method, arg);
Action *a = new CancellationArg<C, A>(lock, obj, method, arg);
return (a);
}

Expand Down
13 changes: 4 additions & 9 deletions event/callback_thread.cc
Expand Up @@ -43,26 +43,22 @@ CallbackThread::CallbackThread(const std::string& name)
Action *
CallbackThread::schedule(CallbackBase *cb)
{
mtx_.lock();
ScopedLock _(&mtx_);
bool need_wakeup = queue_.empty();
queue_.push_back(cb);
if (need_wakeup && idle_)
sleepq_.signal();
mtx_.unlock();

return (cancellation(this, &CallbackThread::cancel, cb));
return (cancellation(&mtx_, this, &CallbackThread::cancel, cb));
}

void
CallbackThread::cancel(CallbackBase *cb)
{
Lock *interlock = cb->lock();
ASSERT_LOCK_OWNED(log_, interlock);
ASSERT_LOCK_OWNED(log_, cb->lock());
ASSERT_LOCK_OWNED(log_, &mtx_);

mtx_.lock();
if (inflight_ == cb) {
inflight_ = NULL;
mtx_.unlock();
delete cb;
return;
}
Expand All @@ -72,7 +68,6 @@ CallbackThread::cancel(CallbackBase *cb)
if (*it != cb)
continue;
queue_.erase(it);
mtx_.unlock();
delete cb;
return;
}
Expand Down
3 changes: 2 additions & 1 deletion http/http_server_pipe.cc
Expand Up @@ -68,7 +68,7 @@ HTTPServerPipe::request(HTTPRequestEventCallback *cb)
return (schedule_callback(cb));

callback_ = cb;
return (cancellation(this, &HTTPServerPipe::cancel));
return (cancellation(&mtx_, this, &HTTPServerPipe::cancel));
}

void
Expand Down Expand Up @@ -164,6 +164,7 @@ HTTPServerPipe::send_response(HTTPProtocol::Status status, const std::string& bo
void
HTTPServerPipe::cancel(void)
{
ASSERT_LOCK_OWNED(log_, &mtx_);
if (action_ != NULL) {
action_->cancel();
action_ = NULL;
Expand Down
4 changes: 2 additions & 2 deletions io/io_system.cc
Expand Up @@ -179,7 +179,7 @@ IOSystem::read(int fd, Channel *owner, off_t offset, size_t amount, EventCallbac
ASSERT(log_, h->read_callback_ != NULL);
h->read_action_ = h->read_schedule();
ASSERT(log_, h->read_action_ != NULL);
return (cancellation(h, &IOSystem::Handle::read_cancel));
return (cancellation(&h->mtx_, h, &IOSystem::Handle::read_cancel));
}
ASSERT(log_, h->read_callback_ == NULL);
return (a);
Expand Down Expand Up @@ -212,7 +212,7 @@ IOSystem::write(int fd, Channel *owner, off_t offset, Buffer *buffer, EventCallb
ASSERT(log_, h->write_callback_ != NULL);
h->write_action_ = h->write_schedule();
ASSERT(log_, h->write_action_ != NULL);
return (cancellation(h, &IOSystem::Handle::write_cancel));
return (cancellation(&h->mtx_, h, &IOSystem::Handle::write_cancel));
}
ASSERT(log_, h->write_callback_ == NULL);
return (a);
Expand Down
4 changes: 2 additions & 2 deletions io/io_system_handle.cc
Expand Up @@ -124,7 +124,7 @@ IOSystem::Handle::read_callback(Event e)
void
IOSystem::Handle::read_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
ASSERT(log_, read_action_ != NULL);
read_action_->cancel();
read_action_ = NULL;
Expand Down Expand Up @@ -303,7 +303,7 @@ IOSystem::Handle::write_callback(Event e)
void
IOSystem::Handle::write_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
ASSERT(log_, write_action_ != NULL);
write_action_->cancel();
write_action_ = NULL;
Expand Down
4 changes: 2 additions & 2 deletions io/net/tcp_client.cc
Expand Up @@ -100,13 +100,13 @@ TCPClient::connect(const std::string& iface, const std::string& name, SocketEven
connect_action_ = socket_->connect(name, cb);
connect_callback_ = ccb;

return (cancellation(this, &TCPClient::connect_cancel));
return (cancellation(&mtx_, this, &TCPClient::connect_cancel));
}

void
TCPClient::connect_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
ASSERT(log_, close_action_ == NULL);
ASSERT(log_, connect_action_ != NULL);

Expand Down
4 changes: 2 additions & 2 deletions io/net/udp_client.cc
Expand Up @@ -94,13 +94,13 @@ UDPClient::connect(const std::string& iface, const std::string& name, SocketEven
connect_action_ = socket_->connect(name, cb);
connect_callback_ = ccb;

return (cancellation(this, &UDPClient::connect_cancel));
return (cancellation(&mtx_, this, &UDPClient::connect_cancel));
}

void
UDPClient::connect_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
ASSERT(log_, close_action_ == NULL);
ASSERT(log_, connect_action_ != NULL);

Expand Down
4 changes: 2 additions & 2 deletions io/pipe/pipe_producer.cc
Expand Up @@ -91,13 +91,13 @@ PipeProducer::output(EventCallback *cb)

output_callback_ = cb;

return (cancellation(this, &PipeProducer::output_cancel));
return (cancellation(lock_, this, &PipeProducer::output_cancel));
}

void
PipeProducer::output_cancel(void)
{
ScopedLock _(lock_);
ASSERT_LOCK_OWNED(log_, lock_);
if (output_action_ != NULL) {
ASSERT(log_, output_callback_ == NULL);

Expand Down
4 changes: 2 additions & 2 deletions io/pipe/pipe_splice.cc
Expand Up @@ -63,13 +63,13 @@ PipeSplice::start(EventCallback *scb)

callback_ = scb;

return (cancellation(this, &PipeSplice::cancel));
return (cancellation(&mtx_, this, &PipeSplice::cancel));
}

void
PipeSplice::cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
if (action_ != NULL) {
action_->cancel();
action_ = NULL;
Expand Down
4 changes: 2 additions & 2 deletions io/pipe/splice.cc
Expand Up @@ -84,13 +84,13 @@ Splice::start(EventCallback *cb)
output_action_ = pipe_->output(pcb);
}

return (cancellation(this, &Splice::cancel));
return (cancellation(&mtx_, this, &Splice::cancel));
}

void
Splice::cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
if (callback_ != NULL) {
delete callback_;
callback_ = NULL;
Expand Down
4 changes: 2 additions & 2 deletions io/pipe/splice_pair.cc
Expand Up @@ -69,13 +69,13 @@ SplicePair::start(EventCallback *cb)
EventCallback *rcb = callback(&mtx_, this, &SplicePair::splice_complete, right_);
right_action_ = right_->start(rcb);

return (cancellation(this, &SplicePair::cancel));
return (cancellation(&mtx_, this, &SplicePair::cancel));
}

void
SplicePair::cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
if (callback_ != NULL) {
delete callback_;
callback_ = NULL;
Expand Down
8 changes: 4 additions & 4 deletions io/socket/socket_handle.cc
Expand Up @@ -73,7 +73,7 @@ SocketHandle::accept(SocketEventCallback *cb)

accept_callback_ = cb;
accept_action_ = accept_schedule();
return (cancellation(this, &SocketHandle::accept_cancel));
return (cancellation(&mtx_, this, &SocketHandle::accept_cancel));
}

bool
Expand Down Expand Up @@ -155,7 +155,7 @@ SocketHandle::connect(const std::string& name, EventCallback *cb)
default:
HALT(log_) << "Connect returned unexpected value: " << rv;
}
return (cancellation(this, &SocketHandle::connect_cancel));
return (cancellation(&mtx_, this, &SocketHandle::connect_cancel));
}

bool
Expand Down Expand Up @@ -274,7 +274,7 @@ SocketHandle::accept_callback(Event e)
void
SocketHandle::accept_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
ASSERT(log_, accept_action_ != NULL);
accept_action_->cancel();
accept_action_ = NULL;
Expand Down Expand Up @@ -319,7 +319,7 @@ SocketHandle::connect_callback(Event e)
void
SocketHandle::connect_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
ASSERT(log_, connect_action_ != NULL);
connect_action_->cancel();
connect_action_ = NULL;
Expand Down
12 changes: 6 additions & 6 deletions programs/wanproxy/ssh_stream.cc
Expand Up @@ -108,7 +108,7 @@ SSHStream::start(Socket *socket, SimpleCallback *cb)
EventCallback *scb = callback(&mtx_, this, &SSHStream::splice_complete);
splice_action_ = splice_->start(scb);

return (cancellation(this, &SSHStream::start_cancel));
return (cancellation(&mtx_, this, &SSHStream::start_cancel));
}

Action *
Expand Down Expand Up @@ -149,7 +149,7 @@ SSHStream::read(size_t amt, EventCallback *cb)
EventCallback *rcb = callback(&mtx_, this, &SSHStream::receive_complete);
read_action_ = pipe_->receive(rcb);

return (cancellation(this, &SSHStream::read_cancel));
return (cancellation(&mtx_, this, &SSHStream::read_cancel));
}

Action *
Expand All @@ -165,7 +165,7 @@ SSHStream::write(Buffer *buf, EventCallback *cb)

if (!ready_) {
write_callback_ = cb;
return (cancellation(this, &SSHStream::write_cancel));
return (cancellation(&mtx_, this, &SSHStream::write_cancel));
}

write_do();
Expand All @@ -185,7 +185,7 @@ SSHStream::shutdown(bool, bool, EventCallback *cb)
void
SSHStream::start_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
if (start_callback_ != NULL) {
delete start_callback_;
start_callback_ = NULL;
Expand All @@ -210,7 +210,7 @@ SSHStream::start_cancel(void)
void
SSHStream::read_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
if (read_callback_ != NULL) {
delete read_callback_;
read_callback_ = NULL;
Expand Down Expand Up @@ -318,7 +318,7 @@ SSHStream::receive_complete(Event e)
void
SSHStream::write_cancel(void)
{
ScopedLock _(&mtx_);
ASSERT_LOCK_OWNED(log_, &mtx_);
if (write_action_ != NULL) {
write_action_->cancel();
write_action_ = NULL;
Expand Down

0 comments on commit 6cf2245

Please sign in to comment.