philiprehberger/rb-batch

philiprehberger-batch

Batch processing toolkit with chunking, progress, and error collection

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-batch"

Or install directly:

gem install philiprehberger-batch

Usage

require "philiprehberger/batch"

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

puts result.processed  # => number of successful items
puts result.success?   # => true if no errors
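For intuition, the chunking model can be sketched in a few lines of plain Ruby. This is not the gem's implementation: chunked_process and SketchResult are hypothetical names, the sketch yields items directly rather than a batch object, and it assumes a failing item is recorded without stopping the rest of its chunk.

```ruby
# Hypothetical sketch of chunked processing; not the gem's implementation.
SketchResult = Struct.new(:processed, :errors, :total, :chunks) do
  # Success means no item raised.
  def success?
    errors.empty?
  end
end

# Splits the collection into chunks of `size` and yields each item,
# recording failures instead of aborting (an assumption of this sketch).
def chunked_process(collection, size:)
  items = collection.to_a
  processed = 0
  errors = []
  chunks = 0
  items.each_slice(size) do |chunk|
    chunks += 1
    chunk.each do |item|
      yield item
      processed += 1
    rescue StandardError => e
      errors << { item: item, error: e }
    end
  end
  SketchResult.new(processed, errors, items.size, chunks)
end

result = chunked_process(1..120, size: 50) { |n| raise "bad" if n == 7 }
puts result.processed # => 119
puts result.chunks    # => 3
puts result.success?  # => false
```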

Progress Tracking

result = Philiprehberger::Batch.process(items, size: 100) do |batch|
  batch.on_progress do |info|
    puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
  end
  batch.each { |item| process(item) }
end
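The progress hash can be illustrated with a hypothetical helper. The keys chunk_index, total_chunks and percentage come from the example above; the helper name progress_info, the chunk-based percentage and the rounding to one decimal place are assumptions, not documented behavior.

```ruby
# Hypothetical helper: builds a progress hash after a chunk finishes.
# Assumes percentage is based on completed chunks, rounded to 1 decimal.
def progress_info(chunk_index, total_chunks)
  {
    chunk_index: chunk_index,
    total_chunks: total_chunks,
    percentage: ((chunk_index + 1) * 100.0 / total_chunks).round(1)
  }
end

total_chunks = (250 / 100.0).ceil # 250 items in chunks of 100 => 3 chunks
total_chunks.times do |i|
  info = progress_info(i, total_chunks)
  puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
end
# Chunk 1/3 - 33.3%
# Chunk 2/3 - 66.7%
# Chunk 3/3 - 100.0%
```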

Top-level Progress Callback

Pass an on_progress: callable to .process to receive progress updates without registering a callback inside every chunk:

progress = ->(info) { puts "#{info[:percentage]}% (#{info[:processed]}/#{info[:total_items]})" }

Philiprehberger::Batch.process(items, size: 100, on_progress: progress) do |batch|
  batch.each { |item| process(item) }
end
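A minimal sketch of how a top-level progress keyword could be wired up, assuming the callback fires once per finished chunk. In this sketch anything that responds to call (a lambda, a proc, or a Method object) can be passed; process_with_progress is a hypothetical name, not part of the gem.

```ruby
# Hypothetical sketch: an on_progress: keyword that accepts any callable
# and is invoked once per finished chunk.
def process_with_progress(items, size:, on_progress: nil)
  processed = 0
  items.each_slice(size) do |chunk|
    chunk.each { |item| yield item }
    processed += chunk.size
    info = {
      processed: processed,
      total_items: items.size,
      percentage: (processed * 100.0 / items.size).round(1)
    }
    # Safe navigation skips the call when no callback was given.
    on_progress&.call(info)
  end
  processed
end

seen = []
progress = ->(info) { seen << "#{info[:percentage]}% (#{info[:processed]}/#{info[:total_items]})" }
process_with_progress((1..250).to_a, size: 100, on_progress: progress) { |_item| }
puts seen
# 40.0% (100/250)
# 80.0% (200/250)
# 100.0% (250/250)
```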

Error Collection

result = Philiprehberger::Batch.process(jobs, size: 25) do |batch|
  batch.on_error { |item, err| log_error(item, err) }
  batch.each { |job| job.execute! }
end

result.errors.each do |entry|
  puts "Failed: #{entry[:item]} - #{entry[:error].message}"
end
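The shape of result.errors ({ item:, error: } hashes) and the on_error callback can be sketched with a tiny stand-in class. SketchChunk is hypothetical and is not the gem's Chunk; in this sketch the handler only sees errors raised after it has been registered.

```ruby
# Hypothetical mini chunk illustrating the on_error pattern; names mirror
# the README, but this is not the gem's code.
class SketchChunk
  attr_reader :errors

  def initialize(items)
    @items = items
    @errors = []
    @on_error = nil
  end

  # Stores the handler for use by #each.
  def on_error(&handler)
    @on_error = handler
  end

  # Yields each item; a raised error is recorded and passed to the handler.
  def each
    @items.each do |item|
      yield item
    rescue StandardError => e
      @errors << { item: item, error: e }
      @on_error&.call(item, e)
    end
  end
end

log = []
chunk = SketchChunk.new(%w[a b c])
chunk.on_error { |item, err| log << "#{item}: #{err.message}" }
chunk.each { |x| raise ArgumentError, "no #{x}" if x == "b" }

chunk.errors.each { |entry| puts "Failed: #{entry[:item]} - #{entry[:error].message}" }
# Failed: b - no b
```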

Early Termination

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.on_error { |_item, _err| :halt }
  batch.each { |item| risky_operation(item) }
end

result.halted?  # => true if processing stopped early
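The :halt convention can be sketched as follows, assuming that a :halt return from the error handler stops both the current chunk and all remaining chunks. process_until_halt is a hypothetical name.

```ruby
# Hypothetical sketch of :halt semantics: once the error handler returns
# :halt, no further items or chunks are processed.
def process_until_halt(items, size:, on_error:)
  processed = 0
  halted = false
  items.each_slice(size) do |chunk|
    chunk.each do |item|
      yield item
      processed += 1
    rescue StandardError => e
      if on_error.call(item, e) == :halt
        halted = true
        break # leave the current chunk
      end
    end
    break if halted # skip the remaining chunks
  end
  { processed: processed, halted: halted }
end

outcome = process_until_halt((1..10).to_a, size: 3, on_error: ->(_item, _err) { :halt }) do |n|
  raise "boom" if n == 5
end
puts outcome[:processed] # => 4
puts outcome[:halted]    # => true
```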

Retry Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, retries: 2) do |batch|
  batch.each { |item| unreliable_api_call(item) }
end
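The README does not spell out the retry semantics. One plausible reading, sketched below purely as an assumption, is that a failed chunk is re-run up to retries extra times before its error is recorded; run_chunk_with_retries is a hypothetical helper.

```ruby
# Hypothetical sketch of per-chunk retries: the whole chunk is re-run up to
# `retries` extra times before its failure is reported. The gem's exact
# retry semantics may differ.
def run_chunk_with_retries(chunk, retries:)
  attempts = 0
  begin
    attempts += 1
    chunk.each { |item| yield item }
    { ok: true, attempts: attempts }
  rescue StandardError => e
    retry if attempts <= retries
    { ok: false, attempts: attempts, error: e }
  end
end

calls = 0
flaky = lambda do |_item|
  calls += 1
  raise "transient" if calls < 3 # fails on the first two calls only
end

outcome = run_chunk_with_retries([:a], retries: 2) { |item| flaky.call(item) }
puts outcome[:ok]       # => true
puts outcome[:attempts] # => 3
```

In this sketch a retried chunk re-runs items that already succeeded, so the per-item work should be idempotent.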

Result Aggregation

result = Philiprehberger::Batch.process(users, size: 50) do |batch|
  batch.each { |user| user.active? ? :active : :inactive }
end

result.counts                              # => { active: 42, inactive: 8 }
result.flat_map { |status| [status] }      # => [:active, :active, :inactive, ...]
result.group_by { |status| status }        # => { active: [...], inactive: [...] }
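The three aggregation helpers correspond closely to core Enumerable methods applied to the collected return values. The sketch below does not use the gem; it assumes the gem's helpers behave like tally, flat_map and group_by.

```ruby
# Sketch: the aggregation ideas expressed with core Enumerable methods
# over a plain array of per-item return values.
results = [:active, :inactive, :active, :active]

counts = results.tally               # same idea as Result#counts
flat = results.flat_map { |s| [s] }  # same idea as Result#flat_map
groups = results.group_by { |s| s }  # same idea as Result#group_by

puts counts[:active]          # => 3
puts flat.length              # => 4
puts groups[:inactive].length # => 1
```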

Success Rate

result = Philiprehberger::Batch.process(jobs, size: 50) do |batch|
  batch.each { |job| job.execute! }
end

result.success_rate  # => Float in [0.0, 1.0]: processed / total (1.0 when total is 0)
puts "#{(result.success_rate * 100).round(1)}% succeeded"
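The documented rule (processed divided by total, 1.0 for an empty collection) fits in a small function; success_rate here is a standalone sketch, not Result#success_rate itself.

```ruby
# Sketch of the documented rule: processed / total as a Float,
# with 1.0 for an empty collection (avoids dividing by zero).
def success_rate(processed, total)
  return 1.0 if total.zero?

  processed.to_f / total
end

puts success_rate(45, 50)                                  # => 0.9
puts "#{(success_rate(45, 50) * 100).round(1)}% succeeded" # => 90.0% succeeded
puts success_rate(0, 0)                                    # => 1.0
```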

Timing Statistics

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

stats = result.timing
puts stats[:total]          # => overall elapsed time in seconds
puts stats[:per_chunk]      # => average time per chunk
puts stats[:per_item]       # => average time per item
puts stats[:fastest_chunk]  # => shortest chunk duration
puts stats[:slowest_chunk]  # => longest chunk duration
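A sketch of how such statistics could be derived from per-chunk durations, using a monotonic clock so that wall-clock adjustments do not skew the numbers. timed_chunks is hypothetical, and the gem's exact definitions of per_chunk and per_item may differ.

```ruby
# Hypothetical sketch: time each chunk with a monotonic clock and
# summarise the durations into a stats hash.
def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def timed_chunks(items, size:)
  durations = []
  started = now
  items.each_slice(size) do |chunk|
    t0 = now
    chunk.each { |item| yield item }
    durations << now - t0
  end
  total = now - started
  {
    total: total,
    per_chunk: durations.empty? ? 0.0 : durations.sum / durations.size,
    per_item: items.empty? ? 0.0 : total / items.size,
    fastest_chunk: durations.min,
    slowest_chunk: durations.max
  }
end

stats = timed_chunks((1..10).to_a, size: 4) { |_n| sleep 0.001 }
puts format("total %.4fs, slowest chunk %.4fs", stats[:total], stats[:slowest_chunk])
```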

Timeout Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, timeout_per_chunk: 30) do |batch|
  batch.each { |item| slow_external_call(item) }
end

# Chunks that exceed 30 seconds are interrupted. The TimeoutError is captured
# in result.errors; items from that chunk are NOT counted in result.processed.
# Processing continues with the remaining chunks.
timeout_errors = result.errors.select { |e| e[:error].is_a?(Philiprehberger::Batch::TimeoutError) }
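The timeout behavior described above can be approximated with Ruby's standard Timeout module. In this sketch SketchTimeoutError stands in for Philiprehberger::Batch::TimeoutError, storing the whole chunk under :item is an assumption, and process_with_timeout is a hypothetical name. Timeout.timeout interrupts the block asynchronously, so code inside a chunk should tolerate being stopped mid-item.

```ruby
require "timeout"

# Hypothetical stand-in for Philiprehberger::Batch::TimeoutError.
class SketchTimeoutError < StandardError; end

# Each chunk runs under Timeout.timeout; an overrunning chunk is recorded
# as one error and none of its items are counted as processed.
def process_with_timeout(items, size:, seconds:)
  processed = 0
  errors = []
  items.each_slice(size) do |chunk|
    Timeout.timeout(seconds, SketchTimeoutError) do
      chunk.each { |item| yield item }
    end
    processed += chunk.size
  rescue SketchTimeoutError => e
    errors << { item: chunk, error: e } # processing continues with the next chunk
  end
  { processed: processed, errors: errors }
end

out = process_with_timeout([0.0, 0.0, 0.5, 0.0], size: 2, seconds: 0.1) { |delay| sleep delay }
puts out[:processed]   # => 2
puts out[:errors].size # => 1
```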

Filtering Errors by Class

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.each { |item| item.sync! }
end

timeout_errors = result.filter_errors(Philiprehberger::Batch::TimeoutError)
timeout_errors.each { |e| puts "Chunk timed out: #{e[:item].inspect}" }

arg_errors = result.filter_errors(ArgumentError)
arg_errors.each { |e| puts "Bad argument for #{e[:item]}: #{e[:error].message}" }
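Filtering by class amounts to a select over the error hashes. This sketch uses is_a?, so subclasses match too (ValidationError below is a hypothetical ArgumentError subclass); whether the gem also matches subclasses is an assumption.

```ruby
# Hypothetical error subclass used only for this illustration.
class ValidationError < ArgumentError; end

# Sketch: keep the { item:, error: } hashes whose error is_a? error_class.
def filter_errors(errors, error_class)
  errors.select { |entry| entry[:error].is_a?(error_class) }
end

errors = [
  { item: 1, error: ArgumentError.new("bad arg") },
  { item: 2, error: ValidationError.new("invalid") },
  { item: 3, error: RuntimeError.new("boom") }
]

puts filter_errors(errors, ArgumentError).map { |e| e[:item] }.inspect # => [1, 2]
```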

Errors for a Specific Item

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| record.save! }
end

result.errors_for(records.first).each do |entry|
  puts "#{entry[:item]} failed: #{entry[:error].message}"
end
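errors_for can be sketched as a select keyed on the item. Comparing with == is an assumption; the gem may compare items differently.

```ruby
# Sketch: all error entries recorded for one item, compared with ==.
def errors_for(errors, item)
  errors.select { |entry| entry[:item] == item }
end

errors = [
  { item: "rec-1", error: RuntimeError.new("timeout") },
  { item: "rec-2", error: RuntimeError.new("invalid") },
  { item: "rec-1", error: RuntimeError.new("retry failed") }
]

errors_for(errors, "rec-1").each do |entry|
  puts "#{entry[:item]} failed: #{entry[:error].message}"
end
# rec-1 failed: timeout
# rec-1 failed: retry failed
```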

Partial Success

result = Philiprehberger::Batch.process([1, 2, 3, 4]) do |batch|
  batch.each { |n| raise "even" if n.even? }
end

result.partial?      # => true (some succeeded, some failed)
result.failed_items  # => [2, 4]
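The documented rules for failed_items (unique items, first-failure order) and partial? (at least one success and at least one failure) translate directly into plain Ruby. These standalone functions are sketches, not the gem's methods.

```ruby
# Sketch: unique errored items, keeping the order of first failure.
def failed_items(errors)
  errors.map { |entry| entry[:item] }.uniq
end

# Sketch: partial means some items succeeded and some errored.
def partial?(processed, errors)
  processed.positive? && !errors.empty?
end

errors = [
  { item: 2, error: RuntimeError.new("even") },
  { item: 4, error: RuntimeError.new("even") },
  { item: 2, error: RuntimeError.new("even again") }
]

p failed_items(errors) # => [2, 4]
p partial?(2, errors)  # => true
p partial?(0, errors)  # => false
p partial?(4, [])      # => false
```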

Concurrency

result = Philiprehberger::Batch.process(records, size: 100, concurrency: 4) do |batch|
  batch.each { |record| api_call(record) }
end

result.processed  # => total successful across all threads
result.results    # => collected in chunk order
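One way to get chunk-ordered results from concurrent workers is a fixed thread pool that stores each chunk's output by index. This is a hypothetical sketch (concurrent_process is not part of the gem, which may use a different strategy), and it omits error collection: an exception in a worker is re-raised by Thread#join.

```ruby
# Hypothetical sketch: a fixed pool of threads pulls chunk indexes from a
# closed queue; results are stored by chunk index to preserve chunk order.
def concurrent_process(items, size:, concurrency:, &block)
  chunks = items.each_slice(size).to_a
  queue = Queue.new
  chunks.each_index { |i| queue << i }
  queue.close # pop returns nil once the queue is drained

  per_chunk = Array.new(chunks.size)
  processed = 0
  lock = Mutex.new

  workers = Array.new(concurrency) do
    Thread.new do
      while (index = queue.pop)
        values = chunks[index].map(&block)
        lock.synchronize do
          per_chunk[index] = values
          processed += values.size
        end
      end
    end
  end
  workers.each(&:join)

  { processed: processed, results: per_chunk.flatten(1) }
end

out = concurrent_process((1..10).to_a, size: 3, concurrency: 4) { |n| n * n }
p out[:processed] # => 10
p out[:results]   # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```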

API

.process(collection, size:, concurrency:, retries:, timeout_per_chunk:, on_progress:) { |batch| }
  Process the collection in chunks (optional top-level progress callback and per-chunk timeout)
Batch::TimeoutError
  Raised internally and captured in Result#errors when a chunk exceeds timeout_per_chunk
Chunk#each { |item| }
  Iterate over the items in the chunk
Chunk#on_progress { |info| }
  Register a progress callback
Chunk#on_error { |item, err| }
  Register an error callback (return :halt from it to stop processing)
Result#processed
  Number of successfully processed items
Result#errors
  Array of { item:, error: } hashes
Result#total
  Total number of items
Result#chunks
  Number of chunks processed
Result#elapsed
  Elapsed time in seconds
Result#success?
  True if no errors occurred
Result#halted?
  True if processing was halted early
Result#results
  Array of collected block return values, in chunk order
Result#flat_map { |r| }
  Map over results and flatten
Result#counts
  Hash counting occurrences of each result value
Result#group_by { |r| }
  Group results by the block's return value
Result#success_rate
  Ratio of processed to total as a Float in [0.0, 1.0] (1.0 when empty)
Result#timing
  Hash of timing stats: total, per_chunk, per_item, fastest_chunk, slowest_chunk
Result#filter_errors(error_class)
  Array of { item:, error: } hashes whose error is an instance of the given class
Result#errors_for(item)
  Array of { item:, error: } hashes for a specific item
Result#failed_items
  Unique items that errored, in first-failure order
Result#partial?
  True when some items succeeded and some errored (false on full success or full failure)

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

⭐ Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT
