Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of Fibers in honour of EventMachine since it’s already reached version 1 milestone #1

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 82 additions & 45 deletions lib/vines/agent/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ def initialize(options)
*options.values_at(:domain, :password, :host, :port, :download, :conf)

certs = File.expand_path('certs', conf)
@permissions, @services, @sessions, @component = {}, {}, {}, nil
@permissions, @services, @sessions, @components, @component = {}, {}, {}, [], nil
@ready = false
@task_responses = {}
@mtx = Mutex.new

jid = Blather::JID.new(fqdn, domain, 'vines')
@stream = Blather::Client.setup(jid, password, host, port, certs)
Expand All @@ -37,13 +39,16 @@ def initialize(options)
end

@stream.register_handler(:ready) do
# prevent handler called twice
unless @ready
log.info("Connected #{@stream.jid} agent to #{host}:#{port}")
log.warn("Agent must run as root user to allow user switching") unless root?
@ready = true
startup
end
# [AM] making sure we are to init once
# unless @ready is not enough for an obvious reason
@mtx.synchronize {
unless @ready
log.info("Connected #{@stream.jid} agent to #{host}:#{port}")
log.warn("Agent must run as root user to allow user switching") unless root?
@ready = true
startup
end
}
end

@stream.register_handler(:subscription, :request?) do |node|
Expand Down Expand Up @@ -77,23 +82,14 @@ def start
@stream.run
end

# —————————————————————————————————————————————————————————————————————————
private
# —————————————————————————————————————————————————————————————————————————

# After the bot connects to the chat server, discover the component, send
# our ohai system description data, and initialize permissions.
def startup
cb = proc do |component|
if component
log.info("Found vines component at #{component}")
@component = component
send_system_info
request_permissions
else
log.info("Vines component not found, rediscovering . . .")
EM::Timer.new(10) { discover_component(&cb) }
end
end
discover_component(&cb)
discover_component
end

def version(node)
Expand Down Expand Up @@ -173,33 +169,48 @@ def fqdn
# The component broadcasting the http://getvines.com/protocol feature is our
# Vines service.
def discover_component
@components = []
disco = Blather::Stanza::DiscoItems.new
disco.to = @stream.jid.domain
@stream.write_with_handler(disco) do |result|
items = result.error? ? [] : result.items
Fiber.new do
# use fiber instead of EM::Iterator until EM 1.0.0 release
found = items.find {|item| component?(item.jid) }
yield found ? found.jid : nil
end.resume
end
end

# Return true if this JID is the Vines component with which we need to
# communicate. This method suspends the Fiber that calls it in order to
# turn the disco#info requests synchronous.
def component?(jid)
fiber = Fiber.current
info = Blather::Stanza::DiscoInfo.new
info.to = jid
@stream.write_with_handler(info) do |reply|
features = reply.error? ? [] : reply.features
found = !!features.find {|f| f.var == NS }
fiber.resume(found)
unless result.error?
info = Blather::Stanza::DiscoInfo.new
# iterate through disco result and collect ’em all
EM::Iterator.new(result.items).map proc{ |comp, it_disco|
info.to = comp.jid.domain
@stream.write_with_handler(info) do |reply|
unless reply.error?
# iterate through info results and collect ’em all
EM::Iterator.new(reply.features).map proc{ |f, it_info|
it_info.return f.var == NS ? comp : nil
}, proc{ |comps|
# we have collected all the info replies for the
# disco given, let’s proceed with the next
it_disco.return comps - [nil]
}
end
end
}, proc{ |compss|
# Well, we yielded all the discos, let's request perms etc
@components = compss.flatten.uniq

if !@components || @components.length < 1
log.info("Vines component not found, rediscovering…")
EM::Timer.new(30) { discover_component }
else
@component = @components[0].jid
log.info("Vines component found #{@component}")
if @components.length > 1
log.warn("Using one #{@component} out of #{@components.length} found")
end
send_system_info
request_permissions
end
}
end
end
Fiber.yield
end

# Download the list of unix user accounts and the JID's that are allowed
# to use them. This is used to determine if a change user command like
# +v user root+ is allowed.
Expand Down Expand Up @@ -238,8 +249,34 @@ def process_message(message)

return unless valid_user?(bare)
session = @sessions[full] ||= Shell.new(bare, @permissions)
session.run(message.body.strip) do |output|
@stream.write(reply(message, output, forward_to))

# [AM] Create a TickLoop to collect shell output and send
# back to recipient by portions. This is needed to
# implement non-blocking, but not annoying on the other hand
# service. E. g. “ls” im most cases will return immediately,
# while “ping google.com” will send back stanzas every second

session.on_output = lambda {|output| @task_responses[message.id] += output }
session.on_error = lambda {|error| @task_responses[message.id] += "⇒ #{error}" }

@task_responses[message.id] = ""
task_response_tickloop = EM.tick_loop do
unless @task_responses[message.id].empty?
@stream.write(reply(message, @task_responses[message.id], forward_to))
@task_responses[message.id] = ""
end
sleep 1
end
session.run(message.body.strip) do |output, exitstatus|
task_response_tickloop.stop
task_response_tickloop = nil
unless @task_responses[message.id].empty?
@stream.write(reply(message, @task_responses[message.id], forward_to))
end
@task_responses.delete message.id
if exitstatus && exitstatus != 0
@stream.write(reply(message, "#{exitstatus} ↵ #{message.body}", forward_to))
end
end
end

Expand All @@ -250,7 +287,7 @@ def reply(message, body, forward_to)
Blather::Stanza::Message.new(message.from, body).tap do |node|
node << node.document.create_element('jid', forward_to, xmlns: NS) if forward_to
node.thread = message.thread if message.thread
node.xhtml = '<span style="font-family:Menlo,Courier,monospace;"></span>'
node.xhtml = '<span style="font-family:Menlo,\'Ubuntu Mono\',Courier,monospace;"></span>'
span = node.xhtml_node.elements.first
body.each_line do |line|
span.add_child(Nokogiri::XML::Text.new(line.chomp, span.document))
Expand Down
22 changes: 16 additions & 6 deletions lib/vines/agent/shell.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,25 @@ module Agent
class Shell
include Vines::Log

attr_writer :permissions

attr_writer :permissions, :on_output, :on_error
# Create a new shell session to asynchronously execute commands for this
# JID. The JID is validated in the permissions Hash before executing
# commands.
def initialize(jid, permissions)
@jid, @permissions = jid, permissions
@user = allowed_users.first if allowed_users.size == 1
@on_output = nil
@on_error = nil
@commands = EM::Queue.new
process_command_queue
end

# Queue the shell command to run as soon as the currently executing tasks
# complete. Yields the shell output to the callback block.
# [AM] “v reset” command is supposed to be executed immediately
# to give an ability of interrupting
# (in general, interferring) the current shell task queue
def run(command, &callback)
if reset?(command)
callback.call(run_built_in(command))
Expand All @@ -46,8 +51,8 @@ def process_command_queue
run_in_slave(command[:command])
end
end
cb = proc do |output|
command[:callback].call(output)
cb = proc do |output, exitstatus|
command[:callback].call(output, exitstatus)
process_command_queue
end
EM.defer(op, cb)
Expand All @@ -58,13 +63,18 @@ def run_in_slave(command)
return "-> no user selected, run 'v user'" unless @user
log.info("Running #{command} as #{@user}")

spawn(@user) unless @shell
unless @shell
spawn(@user)
end
@shell.outproc = @on_output if @on_output
@shell.errproc = @on_error if @on_error

out, err = @shell.execute(command)
output = [].tap do |arr|
arr << out if out && !out.empty?
arr << err if err && !err.empty?
end.join("\n")
output.empty? ? '-> command completed' : output
return [output.empty? ? '-> command completed' : output, @shell.exitstatus]
rescue
close
'-> restarted shell'
Expand Down