Skip to content

Commit

Permalink
Fix a deadlock between background and network threads.
Browse files Browse the repository at this point in the history
We paused the background threads by holding a lock across reconfiguration.
Unfortunately, we did so by first blocking the threads and then the network
traffic.  If a packet arrived that required passing info to the background
threads (which requires acquiring their lock), we could deadlock.  Fix this by adding another
set of condition variables between the background threads and the thread that
performs reconfiguration.
  • Loading branch information
rescrv committed Jan 18, 2013
1 parent a3fb143 commit 7494c55
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 203 deletions.
20 changes: 1 addition & 19 deletions daemon/communication.cc
Expand Up @@ -105,15 +105,7 @@ communication :: teardown()
{
}

// Prepare phase of reconfiguration for the communication layer.
// All three arguments are unnamed and unused; the body does no work and
// unconditionally reports RECONFIGURE_SUCCESS.
reconfigure_returncode
communication :: prepare(const configuration&,
const configuration&,
const server_id&)
{
return RECONFIGURE_SUCCESS;
}

reconfigure_returncode
void
communication :: reconfigure(const configuration&,
const configuration& new_config,
const server_id&)
Expand All @@ -137,16 +129,6 @@ communication :: reconfigure(const configuration&,
{
m_early_messages.push(em);
}

return RECONFIGURE_SUCCESS;
}

// Cleanup phase of reconfiguration for the communication layer.
// All three arguments are unnamed and unused; the body does no work and
// unconditionally reports RECONFIGURE_SUCCESS.
reconfigure_returncode
communication :: cleanup(const configuration&,
const configuration&,
const server_id&)
{
return RECONFIGURE_SUCCESS;
}

bool
Expand Down
12 changes: 3 additions & 9 deletions daemon/communication.h
Expand Up @@ -81,15 +81,9 @@ class communication
bool setup(const po6::net::location& bind_to,
unsigned threads);
void teardown();
reconfigure_returncode prepare(const configuration& old_config,
const configuration& new_config,
const server_id& us);
reconfigure_returncode reconfigure(const configuration& old_config,
const configuration& new_config,
const server_id& us);
reconfigure_returncode cleanup(const configuration& old_config,
const configuration& new_config,
const server_id& us);
void reconfigure(const configuration& old_config,
const configuration& new_config,
const server_id& us);

public:
bool send_client(const virtual_server_id& from,
Expand Down
30 changes: 10 additions & 20 deletions daemon/daemon.cc
Expand Up @@ -286,19 +286,11 @@ daemon :: run(bool daemonize,
continue;
}

// XXX we really should check the reconfigure_returncode even though
// nothing right now fails when reconfiguring

LOG(INFO) << "received new configuration version=" << new_config.version();
// prepare
m_data.prepare(old_config, new_config, m_us);
m_comm.prepare(old_config, new_config, m_us);
m_repl.prepare(old_config, new_config, m_us);
m_stm.prepare(old_config, new_config, m_us);
m_sm.prepare(old_config, new_config, m_us);
// reconfigure
LOG(INFO) << "preparations for reconfiguration done; pausing network communication";
// this line to the "unpause" below are mutually exclusive with network workers
LOG(INFO) << "received new configuration version=" << new_config.version()
<< "; pausing all activity while we reconfigure";
m_stm.pause();
m_repl.pause();
m_data.pause();
m_comm.pause();
m_data.reconfigure(old_config, new_config, m_us);
m_comm.reconfigure(old_config, new_config, m_us);
Expand All @@ -307,13 +299,11 @@ daemon :: run(bool daemonize,
m_sm.reconfigure(old_config, new_config, m_us);
m_config = new_config;
m_comm.unpause();
LOG(INFO) << "reconfiguration complete; unpausing network communication";
// cleanup
m_sm.cleanup(old_config, new_config, m_us);
m_stm.cleanup(old_config, new_config, m_us);
m_repl.cleanup(old_config, new_config, m_us);
m_comm.cleanup(old_config, new_config, m_us);
m_data.cleanup(old_config, new_config, m_us);
m_data.unpause();
m_repl.unpause();
m_stm.unpause();
LOG(INFO) << "reconfiguration complete; resuming normal operation";

// let the coordinator know we've moved to this config
m_coord.ack_config(new_config.version());
}
Expand Down
85 changes: 48 additions & 37 deletions daemon/datalayer.cc
Expand Up @@ -73,8 +73,11 @@ datalayer :: datalayer(daemon* d)
, m_cleaner(std::tr1::bind(&datalayer::cleaner, this))
, m_block_cleaner()
, m_wakeup_cleaner(&m_block_cleaner)
, m_wakeup_reconfigurer(&m_block_cleaner)
, m_need_cleaning(false)
, m_shutdown(true)
, m_need_pause(false)
, m_paused(false)
, m_state_transfer_captures()
{
}
Expand Down Expand Up @@ -383,20 +386,38 @@ datalayer :: clear_dirty()
}
}

reconfigure_returncode
datalayer :: prepare(const configuration&,
const configuration&,
const server_id&)
void
datalayer :: pause()
{
m_block_cleaner.lock();
return RECONFIGURE_SUCCESS;
po6::threads::mutex::hold hold(&m_block_cleaner);
assert(!m_need_pause);
m_need_pause = true;
}

reconfigure_returncode
// Withdraws an outstanding pause request: wakes every waiter on
// m_wakeup_cleaner and clears m_need_pause.  Must only be called while a
// pause is requested (asserted below).
void
datalayer :: unpause()
{
// NOTE(review): `hold` presumably keeps m_block_cleaner locked until the end
// of this scope -- confirm against po6::threads::mutex::hold.
po6::threads::mutex::hold hold(&m_block_cleaner);
// A pause request must currently be in effect.
assert(m_need_pause);
// The broadcast is issued before the flag is cleared; if `hold` does keep
// the mutex locked for this scope, woken waiters cannot re-check
// m_need_pause until the function returns.
m_wakeup_cleaner.broadcast();
m_need_pause = false;
}

void
datalayer :: reconfigure(const configuration&,
const configuration& new_config,
const server_id& us)
{
{
po6::threads::mutex::hold hold(&m_block_cleaner);
assert(m_need_pause);

while (!m_paused)
{
m_wakeup_reconfigurer.wait();
}
}

std::vector<capture> captures;
new_config.captures(&captures);
std::vector<region_id> regions;
Expand All @@ -412,18 +433,6 @@ datalayer :: reconfigure(const configuration&,

std::sort(regions.begin(), regions.end());
m_counters.adopt(regions);
return RECONFIGURE_SUCCESS;
}

// Cleanup phase of reconfiguration for the data layer.  All three arguments
// are unnamed and unused.  Wakes waiters on m_wakeup_cleaner, flags that
// cleaning is needed, releases m_block_cleaner, and always reports
// RECONFIGURE_SUCCESS.
reconfigure_returncode
datalayer :: cleanup(const configuration&,
const configuration&,
const server_id&)
{
m_wakeup_cleaner.broadcast();
m_need_cleaning = true;
// This function unlocks m_block_cleaner without locking it itself.
// NOTE(review): presumably this pairs with a lock() taken earlier in
// datalayer::prepare(), so the mutex stays held across the whole
// reconfiguration -- confirm against the caller's sequencing.
m_block_cleaner.unlock();
return RECONFIGURE_SUCCESS;
}

datalayer::returncode
Expand Down Expand Up @@ -1777,11 +1786,19 @@ datalayer :: cleaner()
{
po6::threads::mutex::hold hold(&m_block_cleaner);

while (!m_need_cleaning &&
m_state_transfer_captures.empty() &&
!m_shutdown)
while ((!m_need_cleaning &&
m_state_transfer_captures.empty() &&
!m_shutdown) || m_need_pause)
{
m_paused = true;

if (m_need_pause)
{
m_wakeup_reconfigurer.signal();
}

m_wakeup_cleaner.wait();
m_paused = false;
}

if (m_shutdown)
Expand Down Expand Up @@ -1845,23 +1862,17 @@ datalayer :: cleaner()

m_daemon->m_stm.report_wiped(cached_cid);

// If this is not a region we need to keep, we need to iterate and
// delete
if (!m_daemon->m_config.is_captured_region(capture_id(cid)))
{
po6::threads::mutex::hold hold(&m_block_cleaner);

if (!m_daemon->m_config.is_captured_region(capture_id(cid)))
{
cached_cid = capture_id(cid);
continue;
}
cached_cid = capture_id(cid);
continue;
}

if (state_transfer_captures.find(capture_id(cid)) != state_transfer_captures.end())
{
cached_cid = capture_id(cid);
state_transfer_captures.erase(cached_cid);
continue;
}
if (state_transfer_captures.find(capture_id(cid)) != state_transfer_captures.end())
{
cached_cid = capture_id(cid);
state_transfer_captures.erase(cached_cid);
continue;
}

std::vector<char> backing;
Expand Down
17 changes: 8 additions & 9 deletions daemon/datalayer.h
Expand Up @@ -97,15 +97,11 @@ class datalayer
const po6::net::hostname& coordinator);
// clears the "dirty" bit
bool clear_dirty();
reconfigure_returncode prepare(const configuration& old_config,
const configuration& new_config,
const server_id& us);
reconfigure_returncode reconfigure(const configuration& old_config,
const configuration& new_config,
const server_id& us);
reconfigure_returncode cleanup(const configuration& old_config,
const configuration& new_config,
const server_id& us);
void pause();
void unpause();
void reconfigure(const configuration& old_config,
const configuration& new_config,
const server_id& us);

public:
returncode get(const region_id& ri,
Expand Down Expand Up @@ -246,8 +242,11 @@ class datalayer
po6::threads::thread m_cleaner;
po6::threads::mutex m_block_cleaner;
po6::threads::cond m_wakeup_cleaner;
po6::threads::cond m_wakeup_reconfigurer;
bool m_need_cleaning;
bool m_shutdown;
bool m_need_pause;
bool m_paused;
std::set<capture_id> m_state_transfer_captures;
};

Expand Down

0 comments on commit 7494c55

Please sign in to comment.