Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial working code

  • Loading branch information...
commit 656218da9576a5018daac4b547a40bcfa77ecea9 0 parents
@takai authored
19 .gitignore
@@ -0,0 +1,19 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
+.idea/
+.DS_Store
8 .yardopts
@@ -0,0 +1,8 @@
+yardoc
+--title ruote-nats
+--no-private
+--protected lib/**/*.rb
+--markup-provider=redcarpet
+--markup=markdown
+--asset image
+- README.md
3  Gemfile
@@ -0,0 +1,3 @@
+source :rubygems
+
+gemspec
22 LICENSE
@@ -0,0 +1,22 @@
+Copyright (C) 2012 Naoto Takai. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.RE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
62 README.md
@@ -0,0 +1,62 @@
+# ruote-nats
+
+ruote-nats is an implementation of the ruote participant and receivers to process workitem on remote host using NATS.
+
+![System Diagram](image/system-diagram.png)
+
+## Usage
+
+Participant registration:
+
+```ruby
+engine.register_participant :remote_shell, RuoteNATS::Participant
+```
+
+Process definition:
+
+```ruby
+remote_shell :command => '/bin/date', :env => { 'LANG' => 'C' }
+```
+
+### Options
+
+* ```queue```: subject to publish command name, the default value is "remote.command".
+* ```executor```: executor to dispatch on remote host, the default value is "RuoteNATS::ShellExecutor".
+
+### Options for ShellExecutor
+
+* ```command```: (required) shell script.
+* ```env```: environments variables for command.
+
+## Single File Example
+
+```ruby
+require 'bundler/setup'
+
+require 'ruote'
+require 'ruote-nats'
+
+RuoteNATS.logger.level = Logger::DEBUG
+
+NATS.start do
+ begin
+ pdef = Ruote.define do
+ remote_shell :command => '/bin/date', :env => { 'LANG' => 'C' }
+ end
+
+ engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
+ engine.register_participant :remote_shell, RuoteNATS::Participant
+
+ RuoteNATS::CommandReceiver.new.start
+ RuoteNATS::ReplyReceiver.new(engine).start
+
+ engine.launch(pdef)
+
+ EM.add_timer(1) do
+ NATS.stop
+ end
+ rescue
+ Logger.new(STDOUT).error($!.message)
+ end
+end
+```
8 Rakefile
@@ -0,0 +1,8 @@
+# -*- mode: ruby; coding: utf-8 -*-
+
+require 'bundler/gem_tasks'
+require 'rspec/core/rake_task'
+
+RSpec::Core::RakeTask.new(:spec)
+
+task :default => :spec
30 example/single_file.rb
@@ -0,0 +1,30 @@
+# -*- mode: ruby; coding: utf-8 -*-
+
+require 'bundler/setup'
+
+require 'ruote'
+require 'ruote-nats'
+
+RuoteNATS.logger.level = Logger::DEBUG
+
+NATS.start do
+ begin
+ pdef = Ruote.define do
+ remote_shell :command => '/bin/date', :env => { 'LANG' => 'C' }
+ end
+
+ engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
+ engine.register_participant :remote_shell, RuoteNATS::Participant
+
+ RuoteNATS::CommandReceiver.new.start
+ RuoteNATS::ReplyReceiver.new(engine).start
+
+ engine.launch(pdef)
+
+ EM.add_timer(1) do
+ NATS.stop
+ end
+ rescue
+ Logger.new(STDOUT).error($!.message)
+ end
+end
BIN  image/system-diagram.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
29 lib/ruote-nats.rb
@@ -0,0 +1,29 @@
+require 'logger'
+require 'msgpack'
+require 'nats/client'
+require 'ruote'
+require 'open3'
+
+require 'ruote-nats/command_receiver'
+require 'ruote-nats/participant'
+require 'ruote-nats/reply_receiver'
+require 'ruote-nats/shell_executor'
+require 'ruote-nats/version'
+
+module RuoteNATS
+ class << self
+ # Get the logger.
+ #
+ # @return [Logger] logger
+ def logger
+ @logger ||= Logger.new(STDOUT).tap { |log| log.level = Logger::INFO }
+ end
+
+ # Sets the logger.
+ #
+ # @param [Logger] logger
+ def logger=(logger)
+ @logger = logger
+ end
+ end
+end
69 lib/ruote-nats/command_receiver.rb
@@ -0,0 +1,69 @@
+module RuoteNATS
+ class CommandReceiver
+
+ # Starts to subscribe command queue.
+ #
+ # @param [String] queue_name
+ def start(queue_name = 'remote.command')
+ NATS.subscribe(queue_name, queue: queue_name, max: 1) do |message, reply|
+ NATS.publish(reply, 'ACCEPT') do
+ unpacked = MessagePack.unpack(message)
+ workitem = Ruote::Workitem.new(unpacked)
+
+ RuoteNATS.logger.info do
+ "(#{workitem.sid}) receive command: #{lookup_executor(workitem)} (#{workitem.lookup('params')})"
+ end
+
+ dispatch(workitem)
+ publish_reply(workitem)
+ end
+ end
+ end
+
+ private
+ def dispatch(workitem)
+ executor = lookup_executor(workitem)
+ results = constantize(executor).new.execute(workitem)
+ rescue
+ workitem.result = 'error'
+ results = { message: $!.message, backtrace: $!.backtrace }
+ ensure
+ store_results(workitem, results)
+ end
+
+ def lookup_executor(workitem)
+ workitem.lookup('params.executor') || 'RuoteNATS::ShellExecutor'
+ end
+
+ def constantize(executor_name)
+ names = executor_name.split('::')
+ names.shift if names.empty? || names.first.empty?
+
+ constant = Object
+ names.each do |name|
+ constant = constant.const_defined?(name, false) ? constant.const_get(name) : constant.const_missing(name)
+ end
+ constant
+ end
+
+ def store_results(workitem, results)
+ params = workitem.lookup('params')
+
+ workitem.set_field('results', Hash.new) unless workitem.lookup('results')
+ workitem.set_field("results.#{workitem.sid}", params.merge(results))
+ end
+
+ def publish_reply(workitem)
+ queue_name = workitem.lookup('reply_to') || 'remote.command.reply'
+
+ packed = MessagePack.pack(workitem.to_h)
+ NATS.publish(queue_name, packed) do
+ RuoteNATS.logger.info do
+ results = workitem.lookup("results.#{workitem.sid}")
+ "(#{workitem.sid}) reply: #{lookup_executor(workitem)} (#{results})"
+ end
+ end
+ end
+ end
+
+end
50 lib/ruote-nats/participant.rb
@@ -0,0 +1,50 @@
+module RuoteNATS
+
+ class TimeoutError < StandardError
+ end
+
+ # # RuoteNATS::Participant
+ #
+ class Participant
+ include Ruote::LocalParticipant
+
+ DEFALUT_TIMEOUT = 1
+
+ # @param [Ruote::Workitem] workitem
+ def consume(workitem)
+ queue_name = workitem.lookup('params.queue') || 'remote.command'
+ message = MessagePack.pack(workitem.to_h)
+
+ sid = NATS.request(queue_name, message) do |reply|
+ RuoteNATS.logger.info do
+ executor = workitem.lookup('params.executor') || 'RuoteNATS::ShellExecutor'
+ "(#{workitem.sid}) request: #{executor} (#{workitem.lookup('params')})"
+ end
+ end
+
+ timeout = (workitem.lookup('params.timeout') || DEFALUT_TIMEOUT).to_i
+ NATS.timeout(sid, timeout) do
+ handle_error(workitem)
+ end
+ rescue
+ RuoteNATS.logger.error($!.message)
+ raise $!
+ end
+
+ def cancel
+ end
+
+ private
+ def handle_error(workitem)
+ executor = workitem.lookup('params.executor') || 'RuoteNATS::ShellExecutor'
+ RuoteNATS.logger.error do
+ "(#{workitem.sid}) timeout: #{executor} (#{workitem.lookup('params')})"
+ end
+
+ error = TimeoutError.new("Request timeout: workitem could not be processed.")
+
+ error_handler = context.error_handler
+ error_handler.action_handle('error', workitem.to_h['fei'], error)
+ end
+ end
+end
44 lib/ruote-nats/reply_receiver.rb
@@ -0,0 +1,44 @@
+module RuoteNATS
+ class ReplyReceiver
+
+ # @param [Ruote::Engine] engine
+ def initialize(engine)
+ @engine = engine
+ end
+
+ # Start to subscribe reply queue.
+ #
+ # @param [String] queue name
+ def start(queue_name = 'remote.command.reply')
+ NATS.subscribe(queue_name) do |message, reply|
+ unpacked = MessagePack.unpack(message)
+ workitem = Ruote::Workitem.new(unpacked)
+
+ RuoteNATS.logger.info do
+ executor = workitem.lookup('params.executor') || 'RuoteNATS::ShellExecutor'
+ result = workitem.lookup("results.#{workitem.sid}")
+
+ "(#{workitem.sid}) receive reply: #{executor} #{workitem.result} (#{result})"
+ end
+
+ if workitem.result == 'success'
+ @engine.reply_to_engine(workitem)
+ else
+ handle_error(workitem)
+ end
+ end
+ end
+
+ private
+ def handle_error(workitem)
+ message = workitem.lookup("results.#{workitem.sid}.message")
+ backtrace = workitem.lookup("results.#{workitem.sid}.backtrace")
+
+ error = RuntimeError.new(message)
+ error.set_backtrace(backtrace)
+
+ error_handler = @engine.context.error_handler
+ error_handler.action_handle('error', workitem.to_h['fei'], error)
+ end
+ end
+end
55 lib/ruote-nats/shell_executor.rb
@@ -0,0 +1,55 @@
+module RuoteNATS
+ class ShellExecutor
+
+ # Execute shell command
+ #
+ # @param [Ruote::Workitem] workitem
+ # @return [Hash] the result of command execution
+ def execute(workitem)
+ if workitem.lookup('params.command')
+ out, status = invoke(workitem)
+
+ if status.success?
+ { out: out, status: status.exitstatus, finished_at: Ruote.now_to_utc_s }
+ else
+ raise "out: #{out}, status: #{status.exitstatus}, finished_at: #{Ruote.now_to_utc_s}"
+ end
+ else
+ workitem.result = 'failure'
+
+ message = 'command is not specified, check your process definition'
+ RuoteNATS.logger.error do
+ "(#{workitem.sid}) shell: #{message}"
+ end
+ raise message
+ end
+ end
+
+ private
+ def invoke(workitem)
+ params = workitem.lookup('params')
+ env = params['env'] || { }
+ command = params['command']
+
+ RuoteNATS.logger.info do
+ message = "(#{workitem.sid}) shell: `#{command}`"
+ message << " with env #{env.inspect}" if env
+ message
+ end
+
+ out, status = Open3.capture2e(env, command)
+
+ RuoteNATS.logger.info do
+ "(#{workitem.sid}) shell: `#{command}` returns #{status.exitstatus}"
+ end
+ if RuoteNATS.logger.debug?
+ out.each_line do |line|
+ RuoteNATS.logger.debug "(#{workitem.sid}) shell: #{line.chomp}"
+ end
+ end
+
+ return out, status
+ end
+
+ end
+end
3  lib/ruote-nats/version.rb
@@ -0,0 +1,3 @@
+module RuoteNATS
+ VERSION = "0.0.1"
+end
32 ruote-nats.gemspec
@@ -0,0 +1,32 @@
+# -*- mode: ruby; coding: utf-8 -*-
+
+require File.expand_path('../lib/ruote-nats/version', __FILE__)
+
+Gem::Specification.new do |gem|
+ gem.authors = ['Naoto Takai']
+ gem.email = ['takai@recompile.net']
+ gem.description = 'NATS participant and receivers for ruote'
+ gem.summary = 'ruote-nats is an implementation of the ruote participant and receivers ' \
+ 'to process workitem on remote host using NATS'
+ gem.homepage = 'https://github.com/takai/ruote-nats'
+
+ gem.files = `git ls-files`.split($\)
+ gem.executables = gem.files.grep(%r{^bin/}).map { |f| File.basename(f) }
+ gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
+ gem.name = 'ruote-nats'
+ gem.require_paths = ['lib']
+ gem.version = RuoteNATS::VERSION
+
+ gem.add_dependency('ruote', '= 2.2.0')
+ gem.add_dependency('nats', '>= 0.4.22')
+ gem.add_dependency('msgpack', '>= 0.4.6')
+
+ gem.add_development_dependency('rspec')
+ gem.add_development_dependency('rake')
+ gem.add_development_dependency('simplecov')
+ gem.add_development_dependency('pry')
+ gem.add_development_dependency('pry-nav')
+ gem.add_development_dependency('yard')
+ gem.add_development_dependency('redcarpet')
+ gem.add_development_dependency('github-markup')
+end
57 spec/command_receiver_spec.rb
@@ -0,0 +1,57 @@
+require 'spec_helper'
+
+module RuoteNATS
+
+ class NoopExecutor
+ def execute(workitem)
+ { noop: true }
+ end
+ end
+
+ describe CommandReceiver do
+ let(:dispatcher) { CommandReceiver.new }
+ let(:workitem) do
+ Ruote::Workitem.new("fields" =>
+ { "params" =>
+ { "command" => "/bin/date",
+ "env" => {
+ "LANG" => "C" },
+ "ref" => "noop",
+ "executor" => "RuoteNATS::NoopExecutor" },
+ "dispatched_at" => "2000-01-01 11:11:11.111111 UTC" },
+ "fei" =>
+ { "engine_id" => "engine",
+ "wfid" => "20000101-abcdefg",
+ "subid" => "abcdefghijklmnopqrstu",
+ "expid" => "0_0" },
+ "participant_name" => "noop")
+ end
+
+ around :each do |example|
+ NATS.start(autostart: true) { example.run }
+ end
+
+ describe '#start' do
+ context do
+ it 'starts to subscribe queue' do
+ sid = NATS.subscribe('remote.command.reply') do |message|
+ unpacked = MessagePack.unpack(message)
+ workitem = Ruote::Workitem.new(unpacked)
+ workitem.lookup("results.#{workitem.sid}").should include("noop" => true)
+
+ NATS.stop
+ end
+ NATS.timeout(sid, 1, expected: 1) do
+ NATS.stop
+ fail "reply message is not sent"
+ end
+
+ subject.start
+ message = MessagePack.pack(workitem.to_h)
+ NATS.request('remote.command', message) do |reply|
+ end
+ end
+ end
+ end
+ end
+end
57 spec/participant_spec.rb
@@ -0,0 +1,57 @@
+require 'spec_helper'
+
+module RuoteNATS
+ describe Participant do
+ around :each do |example|
+ NATS.start(autostart: true) { example.run }
+ end
+
+ let(:workitem) do
+ Ruote::Workitem.new("fields" =>
+ { "params" =>
+ { "command" => "/bin/date",
+ "env" => {
+ "LANG" => "C" },
+ "ref" => "shell" },
+ "dispatched_at" => "2000-01-01 11:11:11.111111 UTC" },
+ "fei" =>
+ { "engine_id" => "engine",
+ "wfid" => "20000101-abcdefg",
+ "subid" => "abcdefghijklmnopqrstu",
+ "expid" => "0_0" },
+ "participant_name" => "shell")
+ end
+
+ describe '#consume' do
+ context 'send successfully' do
+ it 'sends command message' do
+ sid = NATS.subscribe('remote.command', queue: 'remote.command', max: 1) do |message, reply|
+ unpacked = MessagePack.unpack(message)
+ unpacked.should eq workitem.to_h
+
+ NATS.publish(reply, 'ACCEPT') do
+ NATS.stop
+ end
+ end
+ NATS.timeout(sid, 1, expected: 1) do
+ NATS.stop
+ fail "command message is not sent"
+ end
+
+ subject.consume(workitem)
+ end
+ end
+ end
+ context 'send successfully' do
+ before { subject.context = MockContext.new }
+
+ it 'sends command message' do
+ EM.add_timer(2) do
+ NATS.stop
+ fail "#handle_error must be called"
+ end
+ subject.consume(workitem)
+ end
+ end
+ end
+end
70 spec/reply_receiver_spec.rb
@@ -0,0 +1,70 @@
+require 'spec_helper'
+
+module RuoteNATS
+
+ describe ReplyReceiver do
+ around :each do |example|
+ NATS.start(autostart: true) { example.run }
+ end
+
+ let(:engine) { MockEngine.new }
+ let(:receiver) { ReplyReceiver.new(engine) }
+ let(:message) { MessagePack.pack(workitem.to_h) }
+
+ describe '#start' do
+ context 'success' do
+ let(:workitem) do
+ Ruote::Workitem.new("fields" =>
+ { "params" =>
+ { "executor" => "ReplyReceiverSpec" },
+ "dispatched_at" => "2000-01-01 11:11:11.111111 UTC",
+ "__result__" => "success" },
+ "fei" =>
+ { "engine_id" => "engine",
+ "wfid" => "20000101-abcdefg",
+ "subid" => "abcdefghijklmnopqrstu",
+ "expid" => "0_0" },
+ "participant_name" => "shell")
+ end
+
+ it 'replies to engine' do
+ receiver.start
+ NATS.publish('remote.command.reply', message)
+
+ EM.add_timer(1) do
+ fail "#reply_to_engine must be called"
+ end
+ end
+ end
+
+ context 'failure' do
+ let(:workitem) do
+ Ruote::Workitem.new("fields" =>
+ { "params" =>
+ { "executor" => "ReplyReceiverSpec" },
+ "dispatched_at" => "2000-01-01 11:11:11.111111 UTC",
+ "__result__" => "failure" },
+ "fei" =>
+ { "engine_id" => "engine",
+ "wfid" => "20000101-abcdefg",
+ "subid" => "abcdefghijklmnopqrstu",
+ "expid" => "0_0" },
+ "participant_name" => "shell")
+ end
+ it 'replies to engine' do
+ handler = double.as_null_object
+ engine.stub_chain(:context, :error_handler => handler)
+
+ receiver.start
+ NATS.publish('remote.command.reply', message)
+
+ EM.add_timer(1) do
+ fail "#handle_error must be called"
+ NATS.stop
+ end
+ end
+ end
+
+ end
+ end
+end
44 spec/shell_executor_spec.rb
@@ -0,0 +1,44 @@
+require 'spec_helper'
+
+module RuoteNATS
+ describe ShellExecutor do
+ describe '#execute' do
+ subject { ShellExecutor.new.execute(workitem) }
+
+ context 'with command and env fields' do
+ let(:workitem) do
+ Ruote::Workitem.new("fields" =>
+ { "params" =>
+ { "command" => "ruby -e 'print ENV[\"LANG\"]'",
+ "env" => {
+ "LANG" => "ja_JP.UTF-8" },
+ "ref" => "shell" },
+ "dispatched_at" => "2000-01-01 11:11:11.111111 UTC" },
+ "fei" =>
+ { "engine_id" => "engine",
+ "wfid" => "20000101-abcdefg",
+ "subid" => "abcdefghijklmnopqrstu",
+ "expid" => "0_0" },
+ "participant_name" => "shell")
+ end
+ it { should include(out: "ja_JP.UTF-8") }
+ end
+
+ context 'without command fields' do
+ let(:workitem) do
+ Ruote::Workitem.new("fields" =>
+ { "params" =>
+ { "ref" => "shell" },
+ "dispatched_at" => "2000-01-01 11:11:11.111111 UTC" },
+ "fei" =>
+ { "engine_id" => "engine",
+ "wfid" => "20000101-abcdefg",
+ "subid" => "abcdefghijklmnopqrstu",
+ "expid" => "0_0" },
+ "participant_name" => "send_command")
+ end
+ it { expect{ subject }.to raise_error(RuntimeError, 'command is not specified, check your process definition') }
+ end
+ end
+ end
+end
46 spec/spec_helper.rb
@@ -0,0 +1,46 @@
+# -*- mode:ruby; coding: utf-8 -*-
+
+require 'eventmachine'
+require 'simplecov'
+SimpleCov.start { add_filter 'spec' }
+
+require 'ruote-nats'
+
+RSpec.configure do |config|
+ config.before(:all) do
+ RuoteNATS.logger.level = Logger::DEBUG
+ end
+
+ config.after(:all) do
+ begin
+ pid = IO.read('/tmp/nats-server.pid')
+ `kill -TERM #{pid}`
+ rescue Errno::ENOENT
+ end
+ end
+end
+
+
+module RuoteNATS
+ class MockErrorHandler
+ def action_handle(action, fei, exception)
+ NATS.stop
+ end
+ end
+
+ class MockContext
+ def error_handler
+ MockErrorHandler.new
+ end
+ end
+
+ class MockEngine
+ def reply_to_engine(workitem)
+ NATS.stop
+ end
+
+ def context
+ MockContext.new
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.