Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended supervisor with internal shell access. #178

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/beer/falcon.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env falcon-host
# frozen_string_literal: true

load :rack, :self_signed_tls, :supervisor

rack 'beer.localhost', :self_signed_tls
load :rack, :self_signed_tls, :supervisor, :supervised

supervisor

rack 'beer.localhost', :self_signed_tls, :supervised
1 change: 1 addition & 0 deletions falcon.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "async-http", "~> 0.56.0"
spec.add_dependency "async-http-cache", "~> 0.4.0"
spec.add_dependency "async-io", "~> 1.22"
spec.add_dependency "async-bus"
spec.add_dependency "build-environment", "~> 1.13"
spec.add_dependency "bundler"
spec.add_dependency "localhost", "~> 1.1"
Expand Down
1 change: 1 addition & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# gem "async-container", path: "../async-container"
# gem "async-websocket", path: "../async-websocket"
gem "async-bus", path: "../async-bus"
# gem "async-http", path: "../async-http"
# gem "protocol-http1", path: "../protocol-http1"
# gem "utopia-project", path: "../utopia-project"
Expand Down
25 changes: 19 additions & 6 deletions lib/falcon/command/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
require 'async'
require 'json'

require 'irb'
require 'delegate'

require 'async/io/stream'
require 'async/io/unix_endpoint'

Expand Down Expand Up @@ -64,12 +67,24 @@ def call(stream)
end
end

class Shell < Samovar::Command
self.description = "Connect a shell to the currently running supervisor."

# Send the metrics message to the supervisor and print the results.
def call(client)
client.connect do |connection|
binding.irb
end
end
end

# The nested command to execute.
# @name nested
# @attribute [Command]
nested :command, {
'restart' => Restart,
'metrics' => Metrics,
'shell' => Shell,
}, default: 'metrics'

# The endpoint the supervisor is bound to.
Expand All @@ -79,12 +94,10 @@ def endpoint

# Connect to the supervisor and execute the requested command.
def call
Async do
endpoint.connect do |socket|
stream = Async::IO::Stream.new(socket)

@command.call(stream)
end
Sync do
client = Async::Bus::Client.new(endpoint)

@command.call(client)
end
end
end
Expand Down
36 changes: 36 additions & 0 deletions lib/falcon/environments/supervised.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

# Copyright, 2021, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative '../service/supervisor'

# A application process monitor environment.
#
# @scope Falcon Environments
# @name supervisor
environment(:supervised) do
# The endpoint the supervisor will bind to.
# @attribute [Async::IO::Endpoint]
supervisor_endpoint do
Async::IO::Endpoint.unix("supervisor.ipc")
end
end

7 changes: 6 additions & 1 deletion lib/falcon/service/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def middleware
# Number of instances to start.
# @returns [Integer | nil]
def count
@environment.evaluator.count
@environment.evaluator.count
end

# Preload any resources specified by the environment.
Expand Down Expand Up @@ -92,6 +92,11 @@ def setup(container)

server = Server.new(self.middleware, @bound_endpoint, protocol: protocol, scheme: scheme)

self.supervise do |connection|
connection.bind(:service, self)
connection.bind(:server, server)
end

server.run

instance.ready!
Expand Down
21 changes: 21 additions & 0 deletions lib/falcon/service/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'async/bus/client'

module Falcon
module Service
# Captures the stateful behaviour of a specific service.
Expand Down Expand Up @@ -61,6 +63,25 @@ def logger
return Console.logger # .with(name: name)
end

def supervise(reconnect_timeout: 1.0, &block)
if endpoint = @evaluator.supervisor_endpoint
Console.logger.debug(self, "supervise: #{endpoint}")
client = Async::Bus::Client.new(endpoint)

Async do |task|
while true
begin
client.connect(true, &block)
rescue => error
Console.logger.error(self, error)
end

task.sleep(reconnect_timeout)
end
end
end
end

# Start the service.
def start
end
Expand Down
61 changes: 29 additions & 32 deletions lib/falcon/service/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

require 'async/io/endpoint'
require 'async/io/shared_endpoint'
require 'async/bus/server'

require 'delegate'

module Falcon
module Service
Expand All @@ -44,31 +47,27 @@ def endpoint
@evaluator.endpoint
end

# Restart the process group that the supervisor belongs to.
def do_restart(message)
# Tell the parent of this process group to spin up a new process group/container.
# Wait for that to start accepting new connections.
# Stop accepting connections.
# Wait for existing connnections to drain.
# Terminate this process group.
signal = message[:signal] || :INT
class Interface
def initialize
@services = {}
end

Process.kill(signal, Process.ppid)
end

# Capture process metrics relating to the process group that the supervisor belongs to.
def do_metrics(message)
Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid)
end

# Handle an incoming request.
# @parameter message [Hash] The decoded message.
def handle(message)
case message[:please]
when 'restart'
self.do_restart(message)
when 'metrics'
self.do_metrics(message)
attr :services
# Restart the process group that the supervisor belongs to.
def restart(signal = :INT)
# Tell the parent of this process group to spin up a new process group/container.
# Wait for that to start accepting new connections.
# Stop accepting connections.
# Wait for existing connnections to drain.
# Terminate this process group.
Process.kill(signal, Process.ppid)
end

# Capture process metrics relating to the process group that the supervisor belongs to.
def metrics
Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid)
end
end

Expand All @@ -88,16 +87,14 @@ def start
def setup(container)
container.run(name: self.name, restart: true, count: 1) do |instance|
Async do
@bound_endpoint.accept do |peer|
stream = Async::IO::Stream.new(peer)

while message = stream.gets("\0")
response = handle(JSON.parse(message, symbolize_names: true))
stream.puts(response.to_json, separator: "\0")
end
end
server = Async::Bus::Server.new(@bound_endpoint)
interface = Interface.new

instance.ready!

server.accept do |connection|
connection.bind(:supervisor, interface)
end
end
end

Expand Down