Skip to content

Commit

Permalink
Better support for readiness protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jan 26, 2020
1 parent 0445cfe commit 906f99b
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 54 deletions.
1 change: 1 addition & 0 deletions lib/async/container/controller.rb
Expand Up @@ -46,6 +46,7 @@ class Controller
def initialize(startup_duration: nil)
@container = nil

# If there is no `NOTIFY_SOCKET` this will be nil:
@notify = Notify::Client.open

@signals = {}
Expand Down
12 changes: 9 additions & 3 deletions lib/async/container/forked/group.rb
Expand Up @@ -25,7 +25,7 @@ module Async
module Container
module Forked
class Group < Async::Container::Group
def initialize
def initialize(**options)
@pgid = nil

super
Expand All @@ -34,22 +34,27 @@ def initialize
def spawn(*arguments, **options)
self.yield

arguments = self.prepare_for_spawn(arguments)
# Arguments are modified in place.
Notify.before_spawn(@notify, arguments)

pid = ::Process.spawn(*arguments, **options)

@context&.add(pid)

return Process.new(self, pid)
end

def fork(&block)
self.yield

pid = ::Process.fork do
self.after_fork
Notify.after_fork(@notify)

yield
end

@context&.add(pid)

return Process.new(self, pid)
end

Expand Down Expand Up @@ -110,6 +115,7 @@ def wait_one(blocking = true)

return if !blocking && pid == nil

@context&.delete(pid)
fiber = @running.delete(pid)

if @running.empty?
Expand Down
6 changes: 3 additions & 3 deletions lib/async/container/forked/instance.rb
Expand Up @@ -32,9 +32,9 @@ def name= name
::Process.setproctitle(@name)
end

def exec(*arguments)
::Process.exec(*arguments)
end
# def exec(*arguments)
# ::Process.exec(*arguments)
# end

def to_s
"\#<#{self.class}: #{@name}>"
Expand Down
43 changes: 19 additions & 24 deletions lib/async/container/group.rb
Expand Up @@ -23,14 +23,21 @@
module Async
module Container
class Group
def initialize
def initialize(notify: false)
@running = {}

# This queue allows us to wait for processes to complete, without spawning new processes as a result.
@queue = nil

@notify = Notify::Server.open
@context = @notify.bind
if notify == true
@notify = Notify::Server.open
elsif notify
@notify = notify
else
@notify = nil
end

@context = @notify&.context
end

def any?
Expand All @@ -41,16 +48,12 @@ def empty?
@running.empty?
end

def pids
@running.keys
end

# This method sleeps for the specified duration, then
def sleep(duration)
self.resume
self.suspend

self.wait_for_children(self.pids, duration || 1)
self.wait_for_children(duration)

# This waits for any process to exit.
while self.wait_one(false)
Expand Down Expand Up @@ -89,24 +92,16 @@ def close

protected

def after_fork
ENV.update(@notify.export)
end

def prepare_for_spawn(arguments)
if arguments.first.is_a?(Hash)
arguments[0] = arguments[0].merge(@notify.export)
else
arguments.unshift(@notify.export)
def wait_for_children(duration)
if @notify
self.wait_until_ready(duration)
elsif duration
Kernel::sleep(duration)
end

return arguments
end

def wait_for_children(pids, duration)
@context.clear

puts "Waiting on #{self.pids}"
def wait_until_ready(duration)
puts "Waiting on #{@context.pids}"

Sync do |task|
waiting_task = nil
Expand All @@ -117,7 +112,7 @@ def wait_for_children(pids, duration)

yield message

break if @context.ready?(self.pids)
break if @context.ready?
end

waiting_task&.stop
Expand Down
58 changes: 51 additions & 7 deletions lib/async/container/notify.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true
#
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down Expand Up @@ -28,6 +30,7 @@
module Async
module Container
module Notify
NOTIFY_SOCKET = 'NOTIFY_SOCKET'
MAXIMUM_MESSAGE_SIZE = 4096

def self.load(message)
Expand All @@ -38,8 +41,35 @@ def self.load(message)
]
end

# Sets or clears NOTIFY_SOCKET environment variable depending on whether the server exists (and in theory bound).
def self.after_fork(server, environment = ENV)
if server
# Set the environment variable:
environment[NOTIFY_SOCKET] = server.path
else
# Unset the environment variable (this doesn't actually set it to nil):
environment[NOTIFY_SOCKET] = nil
end

return environment
end

# Inserts or duplicates the environment given an argument array.
# Sets or clears it in a way that is suitable for {::Process.spawn}.
def self.before_spawn(server, arguments)
if arguments.first.is_a?(Hash)
environment = arguments.first = arguments.first.dup
else
arguments.unshift(environment = Hash.new)
end

after_fork(server, arguments.first)

return arguments
end

class Client
def self.open(path = ENV['NOTIFY_SOCKET'])
def self.open(path = ENV[NOTIFY_SOCKET])
if path
self.new(
IO::Endpoint.unix(path, Socket::SOCK_DGRAM)
Expand Down Expand Up @@ -107,10 +137,6 @@ def initialize(path)

attr :path

def export
{'NOTIFY_SOCKET' => @path}
end

def bind
Context.new(@path)
end
Expand All @@ -133,6 +159,24 @@ def clear
@status.clear
end

def pids
@state.keys
end

def add(pid)
@state[pid] = :preparing
end

def fail(pid, reason = nil)
@state[pid] = :failed
@status[pid] = reason
end

def remove(pid)
@state.delete(pid)
@status.delete(pid)
end

attr :state
attr :status

Expand All @@ -142,7 +186,7 @@ def update(pid, message)
end

if message['RELOADING'] == '1'
@state[pid] = :reloading
@state[pid] = :preparing
end

if message['READY'] == '1'
Expand All @@ -155,7 +199,7 @@ def update(pid, message)
end

def ready?(pids)
pids.all?{|pid| @state[pid] == :ready}
pids.all?{|pid| @state[pid] != :preparing}
end

def close
Expand Down
16 changes: 6 additions & 10 deletions lib/async/container/threaded/group.rb
Expand Up @@ -28,32 +28,28 @@ module Async
module Container
module Threaded
class Group < Async::Container::Group
def initialize
def initialize(**options)
@finished = Thread::Queue.new
@pids = Set.new

super
end

def self.pids
@pids
end

def finished(*arguments)
@finished.push(arguments)
end

def spawn(*arguments, **options)
arguments = prepare_for_spawn(arguments)
# Arguments are modified in place:
Notify.before_spawn(@notify, arguments)

pid = ::Process.spawn(*arguments)
@context&.add(pid)

self.fork do
begin
pid = ::Process.spawn(*arguments)
@pids.add(pid)

::Process.waitpid(pid)
ensure
@pids.delete(pid)
::Process.kill(:TERM, pid)
end
end
Expand Down
14 changes: 7 additions & 7 deletions lib/async/container/threaded/instance.rb
Expand Up @@ -30,13 +30,13 @@ def name= value
@thread.name = value
end

def exec(*arguments)
pid = ::Process.spawn(*arguments)
::Process.waitpid(pid)
ensure
::Process.kill(:TERM, pid)
end
# def exec(*arguments)
# pid = ::Process.spawn(*arguments)
#
# ::Process.waitpid(pid)
# ensure
# ::Process.kill(:TERM, pid)
# end

def to_s
"\#<#{self.class}: #{@thread.name}>"
Expand Down

0 comments on commit 906f99b

Please sign in to comment.