Skip to content

Commit

Permalink
formatting, logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Feb 2, 2024
1 parent 250cc1e commit d09b882
Showing 1 changed file with 41 additions and 38 deletions.
79 changes: 41 additions & 38 deletions bin/multi_queue_bench
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,12 @@ class Loader
start = Time.now
@iter.times do
arr = Array.new(@count) { |idx| [idx] }
#always prepends by queue:: that's why we pass 'q1, q2 etc' instead of `queue::q1`
# Sidekiq always prepends "queue:" to the queue name,
# that's why we pass 'q1', 'q2', etc instead of 'queue:q1'
Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => arr, "queue" => queue)
$stdout.write "."
end
puts "Done"
end

def monitor_single(queue)
Expand Down Expand Up @@ -176,7 +179,7 @@ class Loader
end

def run(queues, queue, monitor_all_queues)
#Sidekiq.logger.warn("Consuming from #{queue}")
Sidekiq.logger.warn("Consuming from #{queue}")
if monitor_all_queues
monitor_all(queues)
else
Expand Down Expand Up @@ -216,50 +219,50 @@ end

# We assign one queue to each sidekiq process
def run(number_of_processes, total_queues)
read_stream, write_stream = IO.pipe
read_stream, write_stream = IO.pipe

queues = []
(0..total_queues-1).each do |idx|
queues.push("queue:q#{idx}")
end
queues = []
(0..total_queues-1).each do |idx|
queues.push("queue:q#{idx}")
end

Sidekiq.logger.info("Queues are: #{queues}")
Sidekiq.logger.info("Queues are: #{queues}")

# Produce
start = Time.now
(0..total_queues-1).each do |idx|
Process.fork do
queue_num = "q#{idx}"
setup(queue_num)
end
# Produce
start = Time.now
(0..total_queues-1).each do |idx|
Process.fork do
queue_num = "q#{idx}"
setup(queue_num)
end
end

queue_sz = $iterations * $elements * total_queues
Process.waitall

ending = Time.now - start
#Sidekiq.logger.info("Pushed #{queue_sz} in #{ending} secs")

# Consume
(0..number_of_processes-1).each do |idx|
Process.fork do
# First process only consumes from it's own queue but monitors all queues.
# It works as a synchronization point. Once all processes finish
# (that is, when all queues are emptied) it prints the the stats.
if idx == 0
queue = "q#{idx}"
consume(queues, queue, true)
else
queue = "q#{idx % total_queues}"
consume(queues, queue, false)
end
queue_sz = $iterations * $elements * total_queues
Process.waitall

ending = Time.now - start
#Sidekiq.logger.info("Pushed #{queue_sz} in #{ending} secs")

# Consume
(0..number_of_processes-1).each do |idx|
Process.fork do
# First process only consumes from it's own queue but monitors all queues.
# It works as a synchronization point. Once all processes finish
# (that is, when all queues are emptied) it prints the the stats.
if idx == 0
queue = "q#{idx}"
consume(queues, queue, true)
else
queue = "q#{idx % total_queues}"
consume(queues, queue, false)
end
end
end

Process.waitall
write_stream.close
results = read_stream.read
read_stream.close
Process.waitall
write_stream.close
results = read_stream.read
read_stream.close
end

$total_processes = ENV["PROCESSES"] ? Integer(ENV["PROCESSES"]) : 8;
Expand Down

0 comments on commit d09b882

Please sign in to comment.