LISTEN for NOTIFY events from PostgreSQL and Asynchronously Do Something with the payload

celluloid-io-pg-listener

Simple way to NOTIFY and LISTEN to channels in PostgreSQL

Used standalone, the listener client works great in an irb session. Real-world, practical use requires a long-running process; for an example of that, see bin/supervisor.

Project CelluloidIOPGListener
gem name celluloid-io-pg-listener
license License: MIT
expert support Get help on Codementor
download rank Total Downloads
version Gem Version
dependencies Dependency Status
code quality Code Climate
inline documentation Inline docs
continuous integration Build Status
test coverage Coverage Status
homepage on Github.com
documentation on Rdoc.info
live chat Join the chat at https://gitter.im/pboling/celluloid-io-pg-listener
Spread ♡ⓛⓞⓥⓔ♡ on Coderbits, on Coderwall

Prerequisites

Compatible with Ruby 2.1+, and tested on Travis. Might also be compatible with Ruby 2.0, but haven't had time or inclination to find out. That's why we have a fork button!

Requires Celluloid 0.17.2 or greater, because that release fixed an important memory leak in Celluloid.

gem "celluloid", ">= 0.17.2"

Currently requires pg gem v0.18.3 or greater, but really that's only because I haven't had time to test with older versions. Feel free to do that and send a PR.

gem "pg", ">= 0.18.3"

Installation

Add this line to your application's Gemfile:

gem 'celluloid-io-pg-listener'

And then execute:

$ bundle

Or install it yourself as:

$ gem install celluloid-io-pg-listener

Upgrading 0.2.x to 0.3.x

Change:

include CelluloidIOPGListener::Client

To:

include CelluloidIOPGListener.client(:unlisten_wrapper_with_termination)

You now have a choice of unlisten wrappers, with or without termination as part of the unlisten. The one without terminate is used when you are running a client inside a celluloid supervisor container, which handles termination. See bin/supervisor and bin/supervision in this project for two examples of this.

Usage

Find an existing database that you want to run notifications through. The notifications won't affect anything else in the database, so it doesn't matter which one you pick. Then pick an arbitrary name for the channel. The only requirement is that the server and the client use the same database name and channel name; otherwise they won't be communicating.

Run the following in an irb session, taking care to:

  • replace the database names with your own
  • replace the channel name, if you want; channel names are arbitrary, and don't need to be "created" in the DB.

Turn on debug mode to see what is happening:

>> $CELLULOID_DEBUG=true
=> true

Then create a server:

>> server = CelluloidIOPGListener::Examples::Server.new(dbname: "celluloid_io_pg_listener_test", channel: "users_insert")

D, [2015-10-14T12:59:31.840206 #23209] DEBUG -- : Server will send notifications to celluloid_io_pg_listener_test:users_insert

=> #<Celluloid::Proxy::Cell(CelluloidIOPGListener::Examples::Server:0x3ff71a6f6db8) @client_extracted_signature=#<CelluloidIOPGListener::Initialization::ClientExtractedSignature:0x007fee34dec310 @channel="users_insert", @conninfo_hash={:dbname=>"celluloid_io_pg_listener_test"}, @super_signature=[]>>

Then start the server sending notifications. There is no client listening to the notifications yet, so nothing will hear them:

>> server.start
=> #<Celluloid::Proxy::Async(CelluloidIOPGListener::Examples::Server)>

D, [2015-10-14T12:59:36.115639 #23209] DEBUG -- : Notified users_insert
D, [2015-10-14T12:59:37.107589 #23209] DEBUG -- : Notified users_insert
D, [2015-10-14T12:59:38.112089 #23209] DEBUG -- : Notified users_insert
D, [2015-10-14T12:59:39.112341 #23209] DEBUG -- : Notified users_insert

The notifications will just keep flowing, 1 per second as the example server is configured by default. Now in another irb session:

>> CelluloidIOPGListener::Examples::ListenerClientByInheritance.new(dbname: "celluloid_io_pg_listener_test", channel: "users_insert", callback_method: :foo_bar)

=> #<Celluloid::Proxy::Cell(CelluloidIOPGListener::Examples::ListenerClientByInheritance:0x3fe50d93f15c) @client_extracted_signature=#<CelluloidIOPGListener::Initialization::ClientExtractedSignature:0x007fca1b27c738 @channel="users_insert", @conninfo_hash={:dbname=>"celluloid_io_pg_listener_test"}, @super_signature=[{}]> @callback_method=:foo_bar @listening=true @pg_connection=#<PG::Connection:0x007fca1b287b38> @actions={"users_insert"=>:foo_bar}>

I, [2015-10-14T12:59:46.127120 #23223]  INFO -- : Received notification: ["users_insert", 23220, "1444852786"]
I, [2015-10-14T12:59:47.127021 #23223]  INFO -- : Received notification: ["users_insert", 23220, "1444852787"]
I, [2015-10-14T12:59:48.127152 #23223]  INFO -- : Received notification: ["users_insert", 23220, "1444852788"]
I, [2015-10-14T12:59:49.127509 #23223]  INFO -- : Received notification: ["users_insert", 23220, "1444852789"]

Simply exit the sessions to end the test.

Or keep the client running, exit only the server, and do a real test with real DB triggers.

If you have downloaded the gem source, cd to the gem's directory, and run bin/setup to create the test database.

Open psql and \c celluloid_io_pg_listener_test

pboling=# \c celluloid_io_pg_listener_test
You are now connected to database "celluloid_io_pg_listener_test" as user "pboling".
celluloid_io_pg_listener_test=# insert into users (name) values ('jack');
NOTICE:  INSERT TRIGGER called on users
INSERT 0 1
celluloid_io_pg_listener_test=# insert into users (name) values ('jill');
NOTICE:  INSERT TRIGGER called on users
INSERT 0 1
celluloid_io_pg_listener_test=# \q

In the irb session with the Client still running you will see new notifications:

I, [2015-10-14T13:42:55.511294 #25295]  INFO -- : Received notification: ["users_insert", 25304, "{\"table\" : \"users\", \"id\" : 1, \"name\" : \"jack\", \"type\" : \"INSERT\"}"]
I, [2015-10-14T13:43:03.841323 #25295]  INFO -- : Received notification: ["users_insert", 25304, "{\"table\" : \"users\", \"id\" : 2, \"name\" : \"jill\", \"type\" : \"INSERT\"}"]

A more advanced client could be made to deserialize that JSON and get real work done. That's where you come in! ;)
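
As a starting point, here is a minimal sketch of that deserialization step using only Ruby's standard json library. The helper name parse_notification_payload is hypothetical and not part of the gem; it assumes the payload arrives as the third element of the notification, as in the log lines above.

```ruby
require "json"

# Hypothetical helper (not part of the gem): turn a raw NOTIFY payload
# string into a Hash. Returns nil when the payload is not a JSON object,
# e.g. the bare timestamp strings sent by the example server.
def parse_notification_payload(payload)
  return nil unless payload.is_a?(String)

  data = JSON.parse(payload)
  data.is_a?(Hash) ? data : nil
rescue JSON::ParserError
  nil
end

# Payload copied from the trigger-generated notification shown above.
payload = "{\"table\" : \"users\", \"id\" : 1, \"name\" : \"jack\", \"type\" : \"INSERT\"}"
record = parse_notification_payload(payload)

puts "#{record["type"]} on #{record["table"]}: id=#{record["id"]} name=#{record["name"]}"
# prints "INSERT on users: id=1 name=jack"
```

Returning nil for anything that is not a JSON object lets the same callback tolerate both the example server's timestamp payloads and the trigger's JSON payloads.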

The example Client (Listener) class included with the gem is just a proof of concept. It shows you how to use the CelluloidIOPGListener::Client module to make your own listener class that does what you need done. You could, for example, push the payload of the notification to Redis, to be worked by Sidekiq or Resque.

module CelluloidIOPGListener
  module Examples
    class Client

      include CelluloidIOPGListener.client(:unlisten_wrapper_with_termination)

      # Defining initialize is optional,
      #   unless you have custom args you need to handle
      #   aside from those used by the CelluloidIOPGListener::Client
      # But if you do define it, use a splat,
      #   hash or array splat should work,
      #   depending on your signature needs.
      # With either splat, only pass the splat params to super,
      #   and handle all other params locally.
      #
      # def initialize(optional_arg = nil, *options)
      #   @optional_arg = optional_arg # handle it here, don't pass it on!
      #   super(*options)
      # end

      def insert_callback(channel, payload)
        # <-- within the unlisten_wrapper's block if :insert_callback is the callback_method
        debug "#{self.class} channel is #{channel}"
        debug "#{self.class} payload is #{payload}"
      end

    end
  end
end
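
To illustrate the "push the payload somewhere else to be worked" idea, here is a rough sketch of a callback that hands each notification off to a queue. Everything in it is an assumption for illustration: the QueueingListener class and the JOBS constant are hypothetical, and an in-process Queue stands in for Redis, Sidekiq, or Resque so the snippet runs without the gem or a database. In a real listener you would include the client module as in the example class above.

```ruby
require "json"
require "thread"

# Stand-in for Redis/Sidekiq/Resque: an in-process, thread-safe queue.
JOBS = Queue.new

# Hypothetical listener class, not part of the gem.
class QueueingListener
  # In a real listener you would add, as in the example class above:
  #   include CelluloidIOPGListener.client(:unlisten_wrapper_with_termination)

  # Same (channel, payload) signature as the example's insert_callback.
  def insert_callback(channel, payload)
    JOBS << { channel: channel, data: JSON.parse(payload) }
  rescue JSON::ParserError
    # Keep the raw string so a malformed payload is not silently lost.
    JOBS << { channel: channel, raw: payload }
  end
end

listener = QueueingListener.new
listener.insert_callback(
  "users_insert",
  '{"table":"users","id":2,"name":"jill","type":"INSERT"}'
)

job = JOBS.pop
puts "#{job[:channel]}: #{job[:data]["name"]}"
# prints "users_insert: jill"
```

Keeping the callback limited to parsing and enqueueing means slow work happens elsewhere, which is the same reason you would hand off to a background job system in production.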

Credits

Written by Peter Boling and Rohit Gupta for Trumaker & Co

Inspired by:

Development

After checking out the repo, run bin/setup to install dependencies and set up the test environment, including creating a role and a database. Then, run appraisal rake to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Running the tests

Setup has been implemented with bin/setup, so review the file to see what it will do before you run it:

bin/setup

Run the specs with rake:

appraisal rake

Or, run the specs without rake:

appraisal rspec

NOTE: If you need to recreate db/structure.sql from the contents of the test database:

cd spec/apps
SKIP_RAILS_ROOT_OVERRIDE=true bundle exec rake db:structure:dump

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/celluloid-io-pg-listener. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Make sure to add tests for it. This is important so I don't break it in a future version unintentionally.
  6. Create new Pull Request

Versioning

This library aims to adhere to Semantic Versioning 2.0.0. Violations of this scheme should be reported as bugs. Specifically, if a minor or patch version is released that breaks backward compatibility, a new version should be immediately released that restores compatibility. Breaking changes to the public API will only be introduced with new major versions.

As a result of this policy, you can (and should) specify a dependency on this gem using the Pessimistic Version Constraint with two digits of precision.

For example:

spec.add_dependency 'celluloid-io-pg-listener', '~> 0.2'
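
To see which versions a pessimistic constraint admits, you can ask RubyGems directly. The sketch below uses Gem::Requirement and Gem::Version from RubyGems; the allowed? helper is a hypothetical name introduced only for this illustration.

```ruby
require "rubygems"

# Hypothetical helper: does `version` satisfy the given constraint string?
def allowed?(constraint, version)
  Gem::Requirement.new(constraint).satisfied_by?(Gem::Version.new(version))
end

%w[0.2.0 0.3.1 1.0.0].each do |version|
  puts "#{version}: '~> 0.2' => #{allowed?("~> 0.2", version)}, " \
       "'~> 0.2.0' => #{allowed?("~> 0.2.0", version)}"
end
# prints:
# 0.2.0: '~> 0.2' => true, '~> 0.2.0' => true
# 0.3.1: '~> 0.2' => true, '~> 0.2.0' => false
# 1.0.0: '~> 0.2' => false, '~> 0.2.0' => false
```

With two digits of precision the constraint accepts any release from 0.2 up to, but not including, 1.0; adding a third digit narrows it to the 0.2.x series.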

License

The gem is available as open source under the terms of the MIT License.
