em-pg-client

Author

Rafał Michalski (rafal@yeondir.com)

DESCRIPTION

em-pg-client is the Ruby and EventMachine driver interface to the PostgreSQL RDBMS. It is based on ruby-pg.

FEATURES

  • Non-blocking / asynchronous processing with EventMachine,

  • fully async auto re-connects on connection losses (e.g.: RDBMS restarts),

  • minimal changes to PG::Connection API,

  • configurable timeouts (connect or execute) of asynchronous processing,

  • additional Fiber-aware version supporting EM-Synchrony.

BUGS/LIMITATIONS

  • no async support for COPY commands (get_copy_data, put_copy_data), wait_for_notify and transaction,

  • currently no ActiveRecord or Sequel support (you are welcome to contribute),

  • doesn't work on Windows (issue #7).

API Changes between versions

0.1.x -> 0.2.x

  • on_reconnect renamed to the more accurate on_autoreconnect (the hook is not called by PG::EM::Client#reset),

  • async_autoreconnect is false by default if on_autoreconnect is not specified as an initialization option.

TODO:

  • implement EM adapted version of get_copy_data, put_copy_data, wait_for_notify and transaction

  • add some fd/socket hackery to get it working on Windows (issue #7)

  • em-synchrony ORM (ActiveRecord, Sequel and maybe Datamapper) support as separate projects

  • present more benchmarks

REQUIREMENTS

INSTALL

$ [sudo] gem install em-pg-client

Gemfile

# eventmachine
gem "em-pg-client", "~> 0.2.0", :require => 'pg/em'
# em-synchrony
gem "em-pg-client", "~> 0.2.0", :require => ['pg/em', 'em-synchrony/pg']

Github

git clone git://github.com/royaltm/ruby-em-pg-client.git

WHY?

Because I didn't find any EM implementation of ruby-pg that fit my needs. I've found at least 3 other implementations of an EM postgres client:

and (except the EM-bundled one, which uses the no longer maintained postgres-pr library) all of them have similar flaws:

  • 2 of them are designed to support some ORM (ActiveRecord or Sequel), so they are EM-Synchrony only,

  • non-standard API method names,

  • no (nonexistent or non-working) autoreconnect implementation,

  • poor error handling,

  • not fully supporting asynchronous PG::Connection API.

The last one is worth some comment:

They all use blocking methods to retrieve the whole result from the server (PGConn#block, or PGConn#get_result, which also blocks when there is not enough buffered data on the socket).

This implementation uses the non-blocking PGConn#is_busy and PGConn#consume_input methods instead. Depending on the size of the query results and the level of concurrency, the gain in overall speed and responsiveness of your application can be substantial. I've done some tests already.
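The non-blocking pattern described above can be sketched without a database. This is only an illustration, not the library's code: FakeConn is a hypothetical stand-in that mimics the contract of the real consume_input, is_busy and get_result methods, and poll_results is a hypothetical helper that would be called each time the socket becomes readable.

```ruby
# Hypothetical stand-in for a connection with a query in flight.
# It only mimics the contract of the real ruby-pg methods of the same names.
class FakeConn
  def initialize(reads_needed, results)
    @reads_left = reads_needed # socket reads still required before results are complete
    @results = results.dup
  end

  # Real counterpart: reads whatever is available on the socket, never blocks.
  def consume_input
    @reads_left -= 1 if @reads_left > 0
  end

  # Real counterpart: true while get_result would have to block.
  def is_busy
    @reads_left > 0
  end

  # Real counterpart: the next result, or nil once the command has finished.
  def get_result
    @results.shift
  end
end

# Call whenever the socket is readable. Returns nil while the server is
# still sending data, or the collected results once the command is complete.
def poll_results(conn, collected)
  conn.consume_input
  until conn.is_busy
    result = conn.get_result
    return collected if result.nil?
    collected << result
  end
  nil
end

conn = FakeConn.new(3, [:result_set])
collected = []
3.times { |i| puts "readable ##{i + 1}: #{poll_results(conn, collected).inspect}" }
```

Because get_result is only called when is_busy reports false, the event loop is never stalled waiting for the rest of a large result.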

Thanks

The greetz go to:

USAGE

em-pg-client provides the PG::EM::Client class, which inherits from PG::Connection. You can work with PG::EM::Client almost the same way you would with PG::Connection.

The real difference begins when you turn EventMachine reactor on.

BASIC

require 'pg/em'

# no async
pg = PG::EM::Client.new dbname: 'test'
pg.query('select * from foo') do |result|
  puts Array(result).inspect
end

# asynchronous
EM.run do
  df = pg.query('select * from foo')
  df.callback { |result|
    puts Array(result).inspect
    EM.stop
  }
  df.errback {|ex|
    raise ex
  }
  puts "sent"
end

# alternatively
EM.run do
  pg.query('select * from foo') do |result|
    raise result if result.is_a? ::Exception
    puts Array(result).inspect
    EM.stop
  end
  puts "sent"
end

PG::Connection methods adapted to EventMachine

The list of PG::EM::Client async methods for processing with EventMachine.

1. Async methods (always returning Deferrable object):

  • Client.async_connect (singleton)

  • async_reset

  • async_exec (alias: async_query)

  • async_prepare

  • async_exec_prepared

  • async_describe_prepared

  • async_describe_portal

For the arguments of these methods, consult their original blocking counterparts (without the async_ prefix) in the PG::Connection manual.

Use callback on the returned Deferrable to receive the result. The result is a PG::EM::Client for PG::EM::Client.async_connect and async_reset, and a PG::Result for the rest of the methods. The received PG::EM::Client is in a connected state and ready for queries. You need to clear the obtained PG::Result object yourself or leave it to the GC.

To detect a failure in an executed method, use errback on the returned Deferrable. Expect an instance of Exception (usually PG::Error) as the errback argument. You may check its backtrace to find the origin of the error.

2. Async / blocking methods (returning Deferrable only when EM is running):

  • exec (alias: query)

  • prepare

  • exec_prepared

  • describe_prepared

  • describe_portal

Outside EventMachine's event loop these methods are regular, blocking PG::Connection methods.

All the methods (1 & 2) accept a block argument, which they attach to both the callback and errback hooks of the returned Deferrable.
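Since one block serves both hooks, it receives either a result or an exception, which is why the examples in this README start with a check for ::Exception. A minimal sketch of that behaviour follows. TinyDeferrable and attach_block are hypothetical stand-ins written for this illustration. They are not EventMachine or em-pg-client code.

```ruby
# Hypothetical, minimal stand-in for a Deferrable (only what the sketch needs).
class TinyDeferrable
  def initialize
    @callbacks = []
    @errbacks = []
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def errback(&blk)
    @errbacks << blk
    self
  end

  # Deliver a successful result to every callback.
  def succeed(value)
    @callbacks.each { |cb| cb.call(value) }
  end

  # Deliver an error to every errback.
  def fail(error)
    @errbacks.each { |eb| eb.call(error) }
  end
end

# Attach the same block to both hooks, as described in the text above.
def attach_block(df, &blk)
  df.callback(&blk)
  df.errback(&blk)
  df
end

handler = proc do |result|
  if result.is_a?(::Exception)
    puts "failed: #{result.message}"
  else
    puts "rows: #{result.inspect}"
  end
end

ok_df  = attach_block(TinyDeferrable.new, &handler)
bad_df = attach_block(TinyDeferrable.new, &handler)
ok_df.succeed([{ 'id' => '1' }])
bad_df.fail(RuntimeError.new('connection lost'))
```

If you prefer separate success and failure paths, skip the block and use callback and errback on the returned Deferrable, as in the first asynchronous example in BASIC.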

You may also mix async and blocking methods without closing the connection. You only need to start/stop EventMachine in between async calls.

Special options

There are 3 additional connection options and 1 standard pg option used by async methods. You may add them as one of the hash options to PG::EM::Client.new or PG::EM::Client.async_connect or simply use accessor methods to change them on the fly. The additional options are not passed to libpq.

The options are:

  • async_autoreconnect (true / false with default false unless on_autoreconnect is specified) allows automatic re-connection when there was a problem with connection to the server,

  • on_autoreconnect (nil / Proc with default nil) a hook which is called after auto-reconnecting,

  • query_timeout (Float / Fixnum with default 0) sets the timeout for query execution,

  • connect_timeout (Float / Fixnum with default 0) sets the timeout for establishing and resetting a connection.

Only connect_timeout is a standard libpq option, although changing it by accessor method only affects asynchronous functions.
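To make the split between the two kinds of options concrete, here is a rough sketch of how an options hash could be divided. It is an assumption-based illustration and does not show the library's internals: split_options and CLIENT_ONLY_OPTIONS are hypothetical names, and only the option names themselves come from the list above.

```ruby
# The three additional options from the list above; they never reach libpq.
CLIENT_ONLY_OPTIONS = [:async_autoreconnect, :on_autoreconnect, :query_timeout].freeze

# Hypothetical helper: returns [options used by the async client, options for libpq].
# connect_timeout lands in both hashes because it is a standard libpq option
# that the async methods also use.
def split_options(options)
  client = {}
  libpq = {}
  options.each do |key, value|
    name = key.to_sym
    client[name] = value if CLIENT_ONLY_OPTIONS.include?(name) || name == :connect_timeout
    libpq[name] = value unless CLIENT_ONLY_OPTIONS.include?(name)
  end
  [client, libpq]
end

client_opts, libpq_opts = split_options({
  dbname: 'test',
  connect_timeout: 5,
  query_timeout: 50,
  async_autoreconnect: true
})
puts "client: #{client_opts.inspect}"
puts "libpq:  #{libpq_opts.inspect}"
```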

AUTORECONNECTING IN ASYNC MODE

Autoreconnecting is done in a non-blocking manner, using async_reset internally.

EM.run do
  pg = PG::EM::Client.new dbname: 'test',
        connect_timeout: 5, query_timeout: 50,
        async_autoreconnect: true
  try_query = lambda do |&blk|
    pg.query('select * from foo') do |result|
      raise result if result.is_a? ::Exception
      puts Array(result).inspect
      blk.call
    end
  end
  try_query.call {
    system 'pg_ctl stop -m fast'
    system 'pg_ctl start -w'
    try_query.call { EM.stop }
  }
end

To enable this feature call:

pg.async_autoreconnect = true

or

pg = PG::EM::Client.new dbname: 'test',
  async_autoreconnect: true

It's also possible to define an on_autoreconnect callback to be invoked after the connection has been reset. It's called just before the send query command is executed:

EM.run do
  pg = PG::EM::Client.new dbname: 'test',
        async_autoreconnect: true
  pg.prepare('bar', 'select * from foo order by cdate desc') do
    pg.on_autoreconnect = proc { |c, e|
      c.prepare('bar', 'select * from foo order by cdate desc')
    }
    try_query = lambda do |&blk|
      pg.exec_prepared('bar') do |result|
        raise result if result.is_a? ::Exception
        puts Array(result).inspect
        blk.call
      end
    end
    try_query.call {
      system 'pg_ctl stop -m fast'
      system 'pg_ctl start -w'
      try_query.call { EM.stop }
    }
  end
end

As you can see, it's possible to send an async query from inside the on_autoreconnect proc. However, you have to pass the Deferrable from the async callback to the caller. See the PG::EM::Client#on_autoreconnect docs for details.

TRUE ASYNC

For a non-blocking connect use PG::EM::Client.async_connect, and for an asynchronous re-connect use PG::EM::Client#async_reset. Like other async methods, they return a Deferrable object. Use the Deferrable's #callback to obtain an already connected PG::EM::Client.

EM.run do
  pool = (1..10).map {
    PG::EM::Client.async_connect dbname: 'test',
        connect_timeout: 5, query_timeout: 50 }

  togo = pool.length

  pool.each_with_index do |df, i|
    df.callback do |pg|
      pg.query("select * from foo") do |result|
        puts "recv: #{i}"
        EM.stop if (togo-=1).zero?
      end
      puts "sent: #{i}"
    end
    df.errback { |ex| raise ex }
  end
end

Fibers / EM-Synchrony

There is a special version of the PG::EM::Client library with fiber-aware methods for EM-Synchrony or other implementations of fiber-untangled EventMachine.

The require string is “em-synchrony/pg” instead of “pg/em”.

The em-synchrony/pg version of PG::EM::Client.new is fully asynchronous and blocks only the current fiber. This also applies to PG::EM::Client#reset.

require 'em-synchrony'
require 'em-synchrony/pg'

EM.synchrony do
  pg = PG::EM::Client.new dbname: 'test'
  pg.query('select * from foo') do |result|
    puts Array(result).inspect
  end
  EM.stop
end

Although em-synchrony provides a very nice set of tools for untangled EventMachine, you don't really need it to fully benefit from this version of PG::EM::Client.

PG::Connection methods adapted to EM-Synchrony

The list of PG::EM::Client fiber-aware methods for processing with EM-Synchrony / EventMachine.

All async_* methods are exactly the same as in the pure EventMachine version of PG::EM::Client.

The fiber aware methods are:

  • Client.connect (singleton, alias: new, open, setdb, setdblogin)

  • reset

  • exec (alias: query)

  • prepare

  • exec_prepared

  • describe_prepared

  • describe_portal

Under the hood, these methods call their async counterparts and yield from the current fiber while awaiting the result. The PG::Result (or PG::EM::Client for connect and reset) is then returned to the caller. If a code block was given, it is executed with the result as its argument. In that case the value of the block is returned instead, and the PG::Result is cleared (or, in the case of connect or reset, the PG::EM::Client is closed) after the block has executed. From the point of view of a single fiber, they behave like regular blocking PG::Connection methods.

Each of them also detects automatically whether EventMachine is running. If called outside the EM event loop, they are exactly the original methods of PG::Connection.

As in the pure EventMachine version, you can mix async, fiber-aware and blocking methods without finishing the connection. You only need to start/stop EventMachine in between async calls.
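The "call the async counterpart and yield from the current fiber" idea can be shown with plain Ruby fibers. The sketch below is an illustration under stated assumptions, not the library's implementation: OneShotDeferrable and await are hypothetical names, and the "reactor" is simulated by calling succeed by hand.

```ruby
require 'fiber' # Fiber.current needs this on Ruby older than 3.1

# Hypothetical, minimal stand-in for a Deferrable with a single hook of each kind.
class OneShotDeferrable
  def callback(&blk)
    @callback = blk
    self
  end

  def errback(&blk)
    @errback = blk
    self
  end

  def succeed(value)
    @callback.call(value) if @callback
  end

  def fail(error)
    @errback.call(error) if @errback
  end
end

# Park the current fiber until the deferrable completes.
# Returns the result, or raises the error inside the waiting fiber.
def await(df)
  fiber = Fiber.current
  df.callback { |result| fiber.resume(result) }
  df.errback  { |error| fiber.resume(error) }
  outcome = Fiber.yield
  raise outcome if outcome.is_a?(::Exception)
  outcome
end

pending = OneShotDeferrable.new
log = []

Fiber.new do
  log << :sent
  log << await(pending) # only this fiber is suspended here
  log << :done
end.resume

log << :reactor_free     # the rest of the program keeps running meanwhile
pending.succeed(:rows)   # simulated reactor delivers the result; the fiber resumes
puts log.inspect
```

Inside the fiber the code reads like a blocking call, and a failure surfaces as a normal exception that can be handled with begin/rescue, which matches the style of the "Handling errors" example below.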

Handling errors

EM.synchrony do
  begin
    pg.query('select * from foo') do |result|
      puts result
    end
  rescue PG::Error => e
    puts "PSQL error: #{e.inspect}"
  end
  EM.stop
end

Parallel async queries

EM.synchrony do
  pg = EM::Synchrony::ConnectionPool.new(size: 2) do  
    PG::EM::Client.new :dbname => 'test'
  end
  multi = EventMachine::Synchrony::Multi.new
  multi.add :foo, pg.aquery('select * from foo') # or #async_query()
  multi.add :bar, pg.aquery('select * from bar') # #aquery() is just an alias
  res = multi.perform
  p res
  EM.stop
end

Fiber Concurrency

EM.synchrony do
  # use ConnectionPool when more Fibers will be querying at the same time!
  pg = EM::Synchrony::ConnectionPool.new(size: 5) do  
    PG::EM::Client.new :dbname => 'test'
  end
  counter = 0
  EM::Synchrony::FiberIterator.new(['select * from foo']*10, 5) do |query|
    i = counter
    pg.query(query) do |result|
      puts "recv: #{i}"
    end
    puts "sent: #{i}"
    counter += 1
  end
  EM.stop
end

Async reconnect with on_autoreconnect callback

EM.synchrony do
  on_autoreconnect = proc do |c, e|
    c.prepare('bar', 'select * from foo order by cdate desc')
  end
  pg = EM::Synchrony::ConnectionPool.new(size: 5) do
    p = PG::EM::Client.new dbname: 'test', on_autoreconnect: on_autoreconnect
    on_autoreconnect.call p
    p
  end
  try_query = lambda do
    pg.exec_prepared('bar') do |result|
      raise result if result.is_a? ::Exception
      puts Array(result).inspect
    end
  end
  try_query.call
  system 'pg_ctl stop -m fast'
  system 'pg_ctl start -w'
  try_query.call
  EM.stop
end

Specifying on_autoreconnect as a PG::EM::Client.new initialization option implicitly enables async_autoreconnect.

LICENCE

The MIT License - Copyright © 2012 Rafał Michalski
