riverqueue/riverqueue-ruby

River client for Ruby

An insert-only Ruby client for River, packaged in the riverqueue gem. It allows jobs to be inserted in Ruby and run by a Go worker, but doesn't support working jobs in Ruby.

Basic usage

Your project's Gemfile should contain the riverqueue gem and a driver like riverqueue-sequel (see drivers):

gem "riverqueue"
gem "riverqueue-sequel"

Initialize a client with:

DB = Sequel.connect("postgres://...")
client = River::Client.new(River::Driver::Sequel.new(DB))

Define a job and insert it:

require "json"

class SortArgs
  attr_accessor :strings

  def initialize(strings:)
    self.strings = strings
  end

  def kind = "sort"

  def to_json = JSON.dump({strings: strings})
end

insert_res = client.insert(SortArgs.new(strings: ["whale", "tiger", "bear"]))
insert_res.job # inserted job row

Job args should:

  • Respond to #kind with a unique string that identifies them in the database, and which a Go worker will recognize.
  • Respond to #to_json with a JSON serialization that'll be parseable as an object in Go.

They may also respond to #insert_opts with an instance of InsertOpts to define insertion options that'll be used for all jobs of the kind.
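The two required methods can be checked before anything is sent to the database. The following is a minimal sketch that uses only Ruby's standard library; job_args_problems and ArrayArgs are hypothetical names introduced for illustration and are not part of the riverqueue gem.

```ruby
require "json"

# Job args class from the example above.
class SortArgs
  attr_accessor :strings

  def initialize(strings:)
    self.strings = strings
  end

  def kind = "sort"

  def to_json = JSON.dump({strings: strings})
end

# Hypothetical counterexample: serializes to a JSON array, not an object.
class ArrayArgs
  def kind = "array"

  def to_json = JSON.dump([1, 2, 3])
end

# Hypothetical helper (not a River API): returns a list of ways in which
# `args` fails the job args contract, or an empty list if it passes.
def job_args_problems(args)
  problems = []

  unless args.respond_to?(:kind) && args.kind.is_a?(String) && !args.kind.empty?
    problems << "#kind must return a non-empty string"
  end

  begin
    parsed = JSON.parse(args.to_json)
    problems << "#to_json must serialize to a JSON object" unless parsed.is_a?(Hash)
  rescue JSON::ParserError
    problems << "#to_json must return valid JSON"
  end

  problems
end

job_args_problems(SortArgs.new(strings: ["whale", "tiger", "bear"])) # no problems
job_args_problems(ArrayArgs.new) # flags the array serialization
```

A check like this could live in a test suite so that a job args class that a Go worker cannot decode is caught early.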

Insertion options

Inserts take an insert_opts parameter to customize features of the inserted job:

insert_res = client.insert(
  SortArgs.new(strings: ["whale", "tiger", "bear"]),
  insert_opts: River::InsertOpts.new(
    max_attempts: 17,
    priority: 3,
    queue: "my_queue",
    tags: ["custom"]
  )
)

Inserting unique jobs

Unique jobs are supported through InsertOpts#unique_opts, and can be made unique by args, period, queue, and state. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned.

insert_res = client.insert(args, insert_opts: River::InsertOpts.new(
  unique_opts: River::UniqueOpts.new(
    by_args: true,
    by_period: 15 * 60,
    by_queue: true,
    by_state: [River::JOB_STATE_AVAILABLE]
  )
))

# contains either a newly inserted job, or an existing one if insertion was skipped
insert_res.job

# true if insertion was skipped
insert_res.unique_skipped_as_duplicated
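This section doesn't spell out how by_period is evaluated. One plausible reading, treated here as an assumption rather than a description of River's internals, is that the insert time is rounded down to the nearest multiple of the period and a job counts as a duplicate if it falls in the same window. The hypothetical helper period_window below shows the arithmetic for the 15-minute period used above.

```ruby
# Illustration only, not a River API. Assumes uniqueness windows are aligned
# to multiples of the period, measured in seconds since the Unix epoch.
def period_window(time, period_seconds)
  seconds = time.to_i
  lower = Time.at(seconds - (seconds % period_seconds)).utc
  [lower, lower + period_seconds]
end

lower, upper = period_window(Time.utc(2024, 1, 1, 12, 7, 30), 15 * 60)
puts "window: #{lower} to #{upper}"
```

Under this assumption, two inserts at 12:07 and 12:14 would share a window, while one at 12:15 would start a new one.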

Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict. To guarantee that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

client = River::Client.new(River::Driver::Sequel.new(DB), advisory_lock_prefix: 123456)

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
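The bit budget described above can be illustrated with a short sketch. It assumes that the prefix occupies the upper 32 bits of the lock key and a 32-bit hash of the job's unique properties occupies the lower 32 bits. The helper prefixed_lock_key, the input string, and the use of Zlib.crc32 as the hash are all stand-ins for illustration and may not match what River does internally.

```ruby
require "zlib"

# Illustration only, not a River API. Combines a 32-bit prefix with a 32-bit
# hash into a single 64-bit key. Zlib.crc32 is a stand-in hash function.
def prefixed_lock_key(prefix, unique_string)
  unless (0..0xFFFF_FFFF).cover?(prefix)
    raise ArgumentError, "prefix must fit in 32 bits"
  end

  (prefix << 32) | Zlib.crc32(unique_string)
end

key = prefixed_lock_key(123456, "kind=sort&queue=default")
key >> 32          # upper 32 bits: the configured prefix
key & 0xFFFF_FFFF  # lower 32 bits: the hash of the unique properties
```

Because only the lower 32 bits vary between jobs, two different sets of unique properties are more likely to hash to the same key than they would be with the full 64 bits.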

Inserting jobs in bulk

Use #insert_many to bulk insert jobs as a single operation for improved efficiency:

num_inserted = client.insert_many([
  SimpleArgs.new(job_num: 1),
  SimpleArgs.new(job_num: 2)
])

Or with InsertManyParams, which may include insertion options:

num_inserted = client.insert_many([
  River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(max_attempts: 5)),
  River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: River::InsertOpts.new(queue: "high_priority"))
])

Inserting in a transaction

No extra code is needed to insert jobs from inside a transaction. Just make sure that one is open from your ORM of choice, call the normal #insert or #insert_many methods, and insertions will take part in it.

# With ActiveRecord
ActiveRecord::Base.transaction do
  client.insert(SortArgs.new(strings: ["whale", "tiger", "bear"]))
end

# With Sequel
DB.transaction do
  client.insert(SortArgs.new(strings: ["whale", "tiger", "bear"]))
end

Inserting with a Ruby hash

JobArgsHash can be used to insert with a kind and JSON hash so that it's not necessary to define a class:

insert_res = client.insert(River::JobArgsHash.new("hash_kind", {
    job_num: 1
}))

RBS and type checking

The gem bundles RBS files containing type annotations for its API to support type checking in Ruby through a tool like Sorbet or Steep.

Drivers

ActiveRecord

Use River with ActiveRecord by putting the riverqueue-activerecord driver in your Gemfile:

gem "riverqueue"
gem "riverqueue-activerecord"

Then initialize driver and client:

ActiveRecord::Base.establish_connection("postgres://...")
client = River::Client.new(River::Driver::ActiveRecord.new)

Sequel

Use River with Sequel by putting the riverqueue-sequel driver in your Gemfile:

gem "riverqueue"
gem "riverqueue-sequel"

Then initialize driver and client:

DB = Sequel.connect("postgres://...")
client = River::Client.new(River::Driver::Sequel.new(DB))

Development

See development.