Skip to content

Commit

Permalink
Fix emitter race condition due to missing mutex.
Browse files Browse the repository at this point in the history
  • Loading branch information
mavam committed Apr 18, 2012
1 parent 86db0cb commit fd9a934
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 36 deletions.
41 changes: 15 additions & 26 deletions src/vast/store/emitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,50 @@ emitter::emitter(ze::component& c,
{
}

emitter::~emitter()
emitter::state emitter::start()
{
LOG(debug, store) << "removed emitter " << id();
}

void emitter::start()
{
std::lock_guard<std::mutex> lock(state_mutex_);
std::lock_guard<std::mutex> lock(mutex_);
if (state_ == finished)
return;
return state_;

LOG(debug, store) << "starting emitter " << id();
state_ = running;
state_mutex_.unlock();

io_.service().post(std::bind(&emitter::emit, shared_from_this()));
return state_;
}

void emitter::pause()
emitter::state emitter::pause()
{
std::lock_guard<std::mutex> lock(state_mutex_);
if (state_ == finished)
return;
std::lock_guard<std::mutex> lock(mutex_);
if (state_ == finished || state_ == paused)
return state_;

LOG(debug, store) << "pausing emitter " << id();

if (state_ == paused)
LOG(warn, store) << "emitter " << id() << " already paused";

state_ = paused;
return state_;
}

void emitter::emit()
{
{
std::lock_guard<std::mutex> lock(state_mutex_);
assert(state_ != finished);
if (state_ == paused)
return;
}
std::lock_guard<std::mutex> lock(mutex_);
assert(state_ != finished);
if (state_ == paused)
return;

try
{
std::shared_ptr<isegment> segment = cache_->retrieve(*current_);
auto remaining = segment->get_chunk([&](ze::event_ptr&& e) { send(e); });

LOG(debug, store)
<< "emitter " << id()
<< ": emmitted chunk, " << remaining << " remaining";
<< "emmitted chunk, " << remaining << " remaining (segment "
<< segment->id() << ")";

// Advance to the next segment after having processed all chunks.
if (remaining == 0 && ++current_ == ids_.end())
{
LOG(debug, store) << "emitter " << id() << ": finished";
std::lock_guard<std::mutex> lock(state_mutex_);
state_ = finished;
return;
}
Expand Down
13 changes: 3 additions & 10 deletions src/vast/store/emitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,18 @@ class emitter : public ze::publisher<>
std::shared_ptr<segment_cache> cache,
std::vector<ze::uuid> ids);

/// Destroys an emitter.
~emitter();

/// Stars the emission process by scheduling a task.
void start();
state start();

/// Temporarily stops the emission of events.
void pause();

/// Retrieves the emitter status.
/// @return The current status of the emitter.
state status() const;
state pause();

private:
// Note: emitting chunks asynchronously could lead to segment thrashing in
// the cache if the ingestion rate is very high.
void emit();

std::mutex state_mutex_;
std::mutex mutex_;
state state_ = stopped;
std::shared_ptr<segment_cache> cache_;
std::vector<ze::uuid> ids_;
Expand Down

0 comments on commit fd9a934

Please sign in to comment.