Skip to content

Commit

Permalink
protect @CountS against race conditions
Browse files Browse the repository at this point in the history
It seems @mutex is used to protect @CountS.  If that is the case the
mutex must be used everywhere you touch @CountS, otherwise you introduce
race conditions.
  • Loading branch information
shyouhei committed Mar 23, 2015
1 parent 7901058 commit f5654bd
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions lib/fluent/plugin/out_datacounter.rb
Expand Up @@ -201,12 +201,18 @@ def generate_output_per_tags(counts, step)
end

def flush(step) # returns one message
flushed,@counts = @counts,count_initialized(@counts.keys.dup.select{|k| @counts[k][-1] > 0})
flushed = nil
@mutex.synchronize {
flushed,@counts = @counts,count_initialized(@counts.keys.dup.select{|k| @counts[k][-1] > 0})
}
generate_output(flushed, step)
end

def flush_per_tags(step) # returns map of tag - message
flushed,@counts = @counts,count_initialized(@counts.keys.dup.select{|k| @counts[k][-1] > 0})
flushed = nil
@mutex.synchronize {
flushed,@counts = @counts,count_initialized(@counts.keys.dup.select{|k| @counts[k][-1] > 0})
}
generate_output_per_tags(flushed, step)
end

Expand Down Expand Up @@ -302,12 +308,14 @@ def load_status(file_path, tick)
stored[:patterns] == @patterns

if Fluent::Engine.now <= stored[:saved_at] + tick
@counts = stored[:counts]
@saved_at = stored[:saved_at]
@saved_duration = stored[:saved_duration]

# skip the saved duration to continue counting
@last_checked = Fluent::Engine.now - @saved_duration
@mutex.synchronize {
@counts = stored[:counts]
@saved_at = stored[:saved_at]
@saved_duration = stored[:saved_duration]

# skip the saved duration to continue counting
@last_checked = Fluent::Engine.now - @saved_duration
}
else
log.warn "out_datacounter: stored data is outdated. ignore stored data"
end
Expand Down

0 comments on commit f5654bd

Please sign in to comment.