From e9b1fe80cc2b126bb76ece4fef2d09d8d2cfb022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Tue, 17 Mar 2026 11:54:56 +0100 Subject: [PATCH 1/3] Fix disconnect not closing TCP connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit disconnect called close on the stream from a child fiber, which got interrupted by session_task.stop in the Server ensure block before the FD close completed — no TCP FIN was ever sent. Use close_write (shutdown SHUT_WR) instead, which is an atomic kernel syscall that sends the FIN immediately. The read_loop then sees EOF and exits naturally, letting the Server ensure close the stream from the owning fiber. This matches the pattern used by protocol-http1. Also passes options through Server to listener constructors and adds integration tests using a fake FreeSWITCH over real TCP sockets. --- lib/librevox/listener/base.rb | 2 +- lib/librevox/listener/outbound.rb | 2 +- lib/librevox/protocol/connection.rb | 6 ++ lib/librevox/server.rb | 5 +- test/integration/outbound_disconnect_test.rb | 103 +++++++++++++++++++ 5 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 test/integration/outbound_disconnect_test.rb diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index 887b3ad..d2016d8 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -85,7 +85,7 @@ def run_session end def disconnect - @connection&.close + @connection&.close_write end private diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index 8f585fb..0f62287 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -39,7 +39,7 @@ def application(app, args = nil, **params) def session_initiated end - def initialize(connection) + def initialize(connection, options = {}) super(connection) @session = nil @app_complete_queue = Async::Queue.new diff --git a/lib/librevox/protocol/connection.rb b/lib/librevox/protocol/connection.rb index df32931..bf72275 100644 --- a/lib/librevox/protocol/connection.rb +++ b/lib/librevox/protocol/connection.rb @@ -35,6 +35,12 @@ def send_message(msg) @stream.flush end + def close_write + @stream.close_write + rescue IOError, Errno::ENOTCONN + # Already closed or not connected + end + def close return if @stream.closed? diff --git a/lib/librevox/server.rb b/lib/librevox/server.rb index 3d24161..5c469a4 100644 --- a/lib/librevox/server.rb +++ b/lib/librevox/server.rb @@ -4,9 +4,10 @@ module Librevox class Server - def initialize(handler, endpoint) + def initialize(handler, endpoint, **options) @handler = handler @endpoint = endpoint + @options = options end attr :endpoint @@ -15,7 +16,7 @@ def accept(socket, _address) stream = IO::Stream(socket) connection = Protocol::Connection.new(stream) - listener = @handler.new(connection) + listener = @handler.new(connection, @options) session_task = Async { listener.run_session } connection.read_loop { |msg| listener.receive_message(msg) } diff --git a/test/integration/outbound_disconnect_test.rb b/test/integration/outbound_disconnect_test.rb new file mode 100644 index 0000000..f8806f3 --- /dev/null +++ b/test/integration/outbound_disconnect_test.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +require_relative '../test_helper' + +require 'librevox/listener/outbound' +require 'librevox/server' +require 'async' +require 'io/endpoint' +require 'io/stream' + +class DisconnectFromSessionListener < Librevox::Listener::Outbound + def session_initiated + disconnect + end +end + +class DisconnectFromEventListener < Librevox::Listener::Outbound + event(:channel_hangup) { disconnect } +end + +module DisconnectTestHelpers + def start_server(listener_class) + tcp_server = TCPServer.new("127.0.0.1", 0) + port = tcp_server.local_address.ip_port + tcp_server.close + + thread = Thread.new do + Sync do + endpoint = IO::Endpoint.tcp("127.0.0.1", port) + server = Librevox::Server.new(listener_class, endpoint) + server.run + end + end + + sleep 0.1 + [port, thread] + end + + def fake_fs_connect(port) + socket = TCPSocket.new("127.0.0.1", port) + + # connect + msg = socket.gets("\n\n") + assert_equal "connect", msg&.strip + socket.write("Content-Type: command/reply\nCaller-Caller-Id-Number: 8675309\nUnique-ID: 1234\n\n") + + # myevents + msg = socket.gets("\n\n") + assert_equal "myevents", msg&.strip + socket.write("Content-Type: command/reply\nReply-Text: +OK Events Enabled\n\n") + + # linger + msg = socket.gets("\n\n") + assert_equal "linger", msg&.strip + socket.write("Content-Type: command/reply\nReply-Text: +OK will linger\n\n") + + socket + end + + def assert_connection_closed(socket) + ready = IO.select([socket], nil, nil, 3) + if ready + data = socket.read_nonblock(1024, exception: false) + assert(data.nil? || data == "", + "Expected EOF but got: #{data.inspect}") + else + flunk "Timed out waiting for connection to close — disconnect did not work" + end + end +end + +class TestDisconnectFromSession < Minitest::Test + include DisconnectTestHelpers + + def test_disconnect_from_session_initiated + port, server_thread = start_server(DisconnectFromSessionListener) + socket = fake_fs_connect(port) + assert_connection_closed(socket) + ensure + socket&.close + server_thread&.kill + server_thread&.join(1) + end +end + +class TestDisconnectFromEvent < Minitest::Test + include DisconnectTestHelpers + + def test_disconnect_from_event_hook + port, server_thread = start_server(DisconnectFromEventListener) + socket = fake_fs_connect(port) + + # Send a CHANNEL_HANGUP event to trigger the hook that calls disconnect + body = "Event-Name: CHANNEL_HANGUP" + socket.write("Content-Type: text/event-plain\nContent-Length: #{body.size}\n\n#{body}") + + assert_connection_closed(socket) + ensure + socket&.close + server_thread&.kill + server_thread&.join(1) + end +end From 71dc5ebcb3a7c2683dab661257bbfb24943ae2c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Tue, 17 Mar 2026 14:24:16 +0100 Subject: [PATCH 2/3] Handle error replies and harden ESL response parsing - Add Response#error? predicate for -ERR replies - Raise ResponseError in send_message so callers get error handling - Rescue unhandled errors in run_session to log and end cleanly - URL-decode all content values, not just events (connect replies too) - Sanitize header names to valid symbol chars (gsub non-alphanumeric) - Suppress Ruby experimental IO::Buffer warning in tests --- lib/librevox.rb | 2 + lib/librevox/listener/base.rb | 4 +- lib/librevox/listener/outbound.rb | 2 + lib/librevox/protocol/response.rb | 8 +- .../librevox/listener/outbound_error_test.rb | 91 +++++++++++++++++++ test/test_helper.rb | 1 + test/unit/librevox/response_test.rb | 29 +++++- 7 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 test/functional/librevox/listener/outbound_error_test.rb diff --git a/lib/librevox.rb b/lib/librevox.rb index 9cdf102..dade947 100644 --- a/lib/librevox.rb +++ b/lib/librevox.rb @@ -4,6 +4,8 @@ require 'librevox/version' module Librevox + class ResponseError < StandardError; end + autoload :Client, 'librevox/client' autoload :CommandSocket, 'librevox/command_socket' autoload :Commands, 'librevox/commands' diff --git a/lib/librevox/listener/base.rb b/lib/librevox/listener/base.rb index d2016d8..cf45ac1 100644 --- a/lib/librevox/listener/base.rb +++ b/lib/librevox/listener/base.rb @@ -51,7 +51,9 @@ def api def send_message(msg) @command_mutex.acquire do @connection.send_message(msg) - @reply_queue.dequeue + reply = @reply_queue.dequeue + raise Librevox::ResponseError, reply.headers[:reply_text] if reply.error? + reply end end diff --git a/lib/librevox/listener/outbound.rb b/lib/librevox/listener/outbound.rb index 0f62287..b6f8cd0 100644 --- a/lib/librevox/listener/outbound.rb +++ b/lib/librevox/listener/outbound.rb @@ -50,6 +50,8 @@ def run_session send_message "myevents" send_message "linger" session_initiated + rescue Librevox::ResponseError => e + Librevox.logger.error "Session error: #{e.message}" end def handle_response diff --git a/lib/librevox/protocol/response.rb b/lib/librevox/protocol/response.rb index b6944ce..85ec3ee 100644 --- a/lib/librevox/protocol/response.rb +++ b/lib/librevox/protocol/response.rb @@ -32,6 +32,10 @@ def reply? api_response? || command_reply? end + def error? + reply? && headers[:reply_text]&.start_with?("-ERR") + end + private def parse_headers(headers) @@ -42,7 +46,7 @@ def parse_content(content) return content unless content.include?(":") headers, body = content.split("\n\n", 2) - parse_kv(headers, decode: event?).merge(body: body || "") + parse_kv(headers, decode: true).merge(body: body || "") end def parse_kv(string, decode: false) @@ -51,7 +55,7 @@ def parse_kv(string, decode: false) name, value = line.split(':', 2) next unless value value = URI::RFC2396_PARSER.unescape(value) if decode - hash[name.downcase.tr('-', '_').to_sym] = value.strip + hash[name.downcase.gsub(/[^a-z0-9_]/, '_').to_sym] = value.strip end hash end diff --git a/test/functional/librevox/listener/outbound_error_test.rb b/test/functional/librevox/listener/outbound_error_test.rb new file mode 100644 index 0000000..fe0ece6 --- /dev/null +++ b/test/functional/librevox/listener/outbound_error_test.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +require_relative '../../../test_helper' + +require 'librevox/listener/outbound' + +class OutboundListenerWithUnhandledErrorApp < Librevox::Listener::Outbound + def session_initiated + sample_app "fail" + end +end + +class OutboundListenerWithErrorApp < Librevox::Listener::Outbound + attr_reader :error + + def session_initiated + sample_app "fail" + rescue Librevox::ResponseError => e + @error = e + end +end + +class TestOutboundApplicationError < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + + def setup + @listener = OutboundListenerWithErrorApp.new(MockConnection.new) + @session_task = Async { @listener.run_session } + + command_reply "Establish-Session" => "OK", + "Unique-ID" => "1234" + event_and_linger_replies + 3.times { @listener.outgoing_data.shift } + end + + def teardown + @session_task&.stop + super + end + + def test_application_raises_on_error_reply + assert_send_application @listener, "fail" + + # sendmsg ack with error — raises instead of blocking on app_complete_queue + command_reply "Reply-Text" => "-ERR invalid command" + + assert_instance_of Librevox::ResponseError, @listener.error + assert_equal "-ERR invalid command", @listener.error.message + end +end + +class TestOutboundUnhandledApplicationError < Minitest::Test + prepend Librevox::Test::AsyncTest + include OutboundSetupHelpers + include Librevox::Test::Matchers + + def setup + @listener = OutboundListenerWithUnhandledErrorApp.new(MockConnection.new) + @session_task = Async { @listener.run_session } + + command_reply "Establish-Session" => "OK", + "Unique-ID" => "1234" + event_and_linger_replies + 3.times { @listener.outgoing_data.shift } + end + + def teardown + @session_task&.stop + super + end + + def test_unhandled_error_ends_session_cleanly + assert_send_application @listener, "fail" + + log = StringIO.new + original_logger = Librevox.logger + Librevox.logger = Logger.new(log) + + # sendmsg ack with error — run_session rescues and logs, no crash + command_reply "Reply-Text" => "-ERR invalid command" + + # session task completes without raising + @session_task.wait + + assert_match(/-ERR invalid command/, log.string) + ensure + Librevox.logger = original_logger + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index d2e71ee..b146af4 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +Warning[:experimental] = false require 'minitest/autorun' require 'async' require 'librevox' diff --git a/test/unit/librevox/response_test.rb b/test/unit/librevox/response_test.rb index d18a3ec..94b4a23 100644 --- a/test/unit/librevox/response_test.rb +++ b/test/unit/librevox/response_test.rb @@ -73,13 +73,38 @@ def test_url_decode_preserves_literal_plus assert_equal "hello+world", response.content[:some_header] end - def test_does_not_url_decode_non_event_content + def test_url_decode_non_event_content response = Librevox::Protocol::Response.new("Content-Type: command/reply", "Reply-Text: %2BOK") - assert_equal "%2BOK", response.content[:reply_text] + assert_equal "+OK", response.content[:reply_text] end def test_does_not_url_decode_headers response = Librevox::Protocol::Response.new("Content-Type: text%2Fevent-plain", "") assert_equal "text%2Fevent-plain", response.headers[:content_type] end + + def test_error_on_command_reply_with_err + response = Librevox::Protocol::Response.new("Content-Type: command/reply\nReply-Text: -ERR invalid command", "") + assert response.error? + end + + def test_error_on_api_response_with_err + response = Librevox::Protocol::Response.new("Content-Type: api/response\nReply-Text: -ERR no such command", "") + assert response.error? + end + + def test_not_error_on_ok_reply + response = Librevox::Protocol::Response.new("Content-Type: command/reply\nReply-Text: +OK", "") + refute response.error? + end + + def test_not_error_on_event + response = Librevox::Protocol::Response.new("Content-Type: text/event-plain", "Event-Name: Hangup") + refute response.error? + end + + def test_not_error_on_reply_without_reply_text + response = Librevox::Protocol::Response.new("Content-Type: command/reply", "Foo: Bar") + refute response.error? + end end From f4dc90fb04e8331b347be4f144ef1de41bda1a05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Hauge=20Bj=C3=B8rnskov?= <19725+henrikbjorn@users.noreply.github.com> Date: Wed, 18 Mar 2026 09:55:55 +0100 Subject: [PATCH 3/3] Add note about CUSTOM event subclass requirement --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index ddc9320..c377f6b 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,8 @@ class MyInbound < Librevox::Listener::Inbound end ``` +**Note on CUSTOM events:** FreeSWITCH custom events have a subclass name (e.g. `CUSTOM conference::maintenance`). You must include both the event name and subclass — `events ['CUSTOM conference::maintenance']`. Using just `events ['CUSTOM']` will not match any custom events. + ## Outbound Listener Subclass `Librevox::Listener::Outbound` to create an outbound listener. FreeSWITCH connects to it when a call hits a socket application in the dialplan.