diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index 5860109..813d312 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -46,6 +46,7 @@ class Controller def initialize(startup_duration: nil) @container = nil + # If there is no `NOTIFY_SOCKET` this will be nil: @notify = Notify::Client.open @signals = {} diff --git a/lib/async/container/forked/group.rb b/lib/async/container/forked/group.rb index 5717be6..ed88449 100644 --- a/lib/async/container/forked/group.rb +++ b/lib/async/container/forked/group.rb @@ -25,7 +25,7 @@ module Async module Container module Forked class Group < Async::Container::Group - def initialize + def initialize(**options) @pgid = nil super @@ -34,10 +34,13 @@ def initialize def spawn(*arguments, **options) self.yield - arguments = self.prepare_for_spawn(arguments) + # Arguments are modified in place. + Notify.before_spawn(@notify, arguments) pid = ::Process.spawn(*arguments, **options) + @context&.add(pid) + return Process.new(self, pid) end @@ -45,11 +48,13 @@ def fork(&block) self.yield pid = ::Process.fork do - self.after_fork + Notify.after_fork(@notify) yield end + @context&.add(pid) + return Process.new(self, pid) end @@ -110,6 +115,7 @@ def wait_one(blocking = true) return if !blocking && pid == nil + @context&.delete(pid) fiber = @running.delete(pid) if @running.empty? diff --git a/lib/async/container/forked/instance.rb b/lib/async/container/forked/instance.rb index 42c2ab7..95371d5 100644 --- a/lib/async/container/forked/instance.rb +++ b/lib/async/container/forked/instance.rb @@ -32,9 +32,9 @@ def name= name ::Process.setproctitle(@name) end - def exec(*arguments) - ::Process.exec(*arguments) - end + # def exec(*arguments) + # ::Process.exec(*arguments) + # end def to_s "\#<#{self.class}: #{@name}>" diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index 62a676d..c3c32c0 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -23,14 +23,21 @@ module Async module Container class Group - def initialize + def initialize(notify: false) @running = {} # This queue allows us to wait for processes to complete, without spawning new processes as a result. @queue = nil - @notify = Notify::Server.open - @context = @notify.bind + if notify == true + @notify = Notify::Server.open + elsif notify + @notify = notify + else + @notify = nil + end + + @context = @notify&.context end def any? @@ -41,16 +48,12 @@ def empty? @running.empty? end - def pids - @running.keys - end - # This method sleeps for the specified duration, then def sleep(duration) self.resume self.suspend - self.wait_for_children(self.pids, duration || 1) + self.wait_for_children(duration) # This waits for any process to exit. while self.wait_one(false) @@ -89,24 +92,16 @@ def close protected - def after_fork - ENV.update(@notify.export) - end - - def prepare_for_spawn(arguments) - if arguments.first.is_a?(Hash) - arguments[0] = arguments[0].merge(@notify.export) - else - arguments.unshift(@notify.export) + def wait_for_children(duration) + if @notify + self.wait_until_ready(duration) + elsif duration + Kernel::sleep(duration) end - - return arguments end - def wait_for_children(pids, duration) - @context.clear - - puts "Waiting on #{self.pids}" + def wait_until_ready(duration) + puts "Waiting on #{@context.pids}" Sync do |task| waiting_task = nil @@ -117,7 +112,7 @@ def wait_for_children(pids, duration) yield message - break if @context.ready?(self.pids) + break if @context.ready? end waiting_task&.stop diff --git a/lib/async/container/notify.rb b/lib/async/container/notify.rb index 36a2abd..d74d51a 100644 --- a/lib/async/container/notify.rb +++ b/lib/async/container/notify.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true +# # Copyright, 2020, by Samuel G. D. Williams. # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -28,6 +30,7 @@ module Async module Container module Notify + NOTIFY_SOCKET = 'NOTIFY_SOCKET' MAXIMUM_MESSAGE_SIZE = 4096 def self.load(message) @@ -38,8 +41,35 @@ def self.load(message) ] end + # Sets or clears NOTIFY_SOCKET environment variable depending on whether the server exists (and in theory bound). + def self.after_fork(server, environment = ENV) + if server + # Set the environment variable: + environment[NOTIFY_SOCKET] = server.path + else + # Unset the environment variable (this doesn't actually set it to nil): + environment[NOTIFY_SOCKET] = nil + end + + return environment + end + + # Inserts or duplicates the environment given an argument array. + # Sets or clears it in a way that is suitable for {::Process.spawn}. + def self.before_spawn(server, arguments) + if arguments.first.is_a?(Hash) + environment = arguments.first = arguments.first.dup + else + arguments.unshift(environment = Hash.new) + end + + after_fork(server, arguments.first) + + return arguments + end + class Client - def self.open(path = ENV['NOTIFY_SOCKET']) + def self.open(path = ENV[NOTIFY_SOCKET]) if path self.new( IO::Endpoint.unix(path, Socket::SOCK_DGRAM) @@ -107,10 +137,6 @@ def initialize(path) attr :path - def export - {'NOTIFY_SOCKET' => @path} - end - def bind Context.new(@path) end @@ -133,6 +159,24 @@ def clear @status.clear end + def pids + @state.keys + end + + def add(pid) + @state[pid] = :preparing + end + + def fail(pid, reason = nil) + @state[pid] = :failed + @status[pid] = reason + end + + def remove(pid) + @state.delete(pid) + @status.delete(pid) + end + attr :state attr :status @@ -142,7 +186,7 @@ def update(pid, message) end if message['RELOADING'] == '1' - @state[pid] = :reloading + @state[pid] = :preparing end if message['READY'] == '1' @@ -155,7 +199,7 @@ def update(pid, message) end def ready?(pids) - pids.all?{|pid| @state[pid] == :ready} + pids.all?{|pid| @state[pid] != :preparing} end def close diff --git a/lib/async/container/threaded/group.rb b/lib/async/container/threaded/group.rb index 0dce368..8c253bb 100644 --- a/lib/async/container/threaded/group.rb +++ b/lib/async/container/threaded/group.rb @@ -28,32 +28,28 @@ module Async module Container module Threaded class Group < Async::Container::Group - def initialize + def initialize(**options) @finished = Thread::Queue.new @pids = Set.new super end - def self.pids - @pids - end - def finished(*arguments) @finished.push(arguments) end def spawn(*arguments, **options) - arguments = prepare_for_spawn(arguments) + # Arguments are modified in place: + Notify.before_spawn(@notify, arguments) + + pid = ::Process.spawn(*arguments) + @context&.add(pid) self.fork do begin - pid = ::Process.spawn(*arguments) - @pids.add(pid) - ::Process.waitpid(pid) ensure - @pids.delete(pid) ::Process.kill(:TERM, pid) end end diff --git a/lib/async/container/threaded/instance.rb b/lib/async/container/threaded/instance.rb index ab4417b..25ffe9d 100644 --- a/lib/async/container/threaded/instance.rb +++ b/lib/async/container/threaded/instance.rb @@ -30,13 +30,13 @@ def name= value @thread.name = value end - def exec(*arguments) - pid = ::Process.spawn(*arguments) - - ::Process.waitpid(pid) - ensure - ::Process.kill(:TERM, pid) - end + # def exec(*arguments) + # pid = ::Process.spawn(*arguments) + # + # ::Process.waitpid(pid) + # ensure + # ::Process.kill(:TERM, pid) + # end def to_s "\#<#{self.class}: #{@thread.name}>"