Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class MyInbound < Librevox::Listener::Inbound
end
```

**Note on CUSTOM events:** FreeSWITCH custom events have a subclass name (e.g. `CUSTOM conference::maintenance`). You must include both the event name and subclass — `events ['CUSTOM conference::maintenance']`. Using just `events ['CUSTOM']` will not match any custom events.

## Outbound Listener

Subclass `Librevox::Listener::Outbound` to create an outbound listener. FreeSWITCH connects to it when a call hits a socket application in the dialplan.
Expand Down
2 changes: 2 additions & 0 deletions lib/librevox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require 'librevox/version'

module Librevox
class ResponseError < StandardError; end

autoload :Client, 'librevox/client'
autoload :CommandSocket, 'librevox/command_socket'
autoload :Commands, 'librevox/commands'
Expand Down
6 changes: 4 additions & 2 deletions lib/librevox/listener/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def api
def send_message(msg)
@command_mutex.acquire do
@connection.send_message(msg)
@reply_queue.dequeue
reply = @reply_queue.dequeue
raise Librevox::ResponseError, reply.headers[:reply_text] if reply.error?
reply
end
end

Expand Down Expand Up @@ -85,7 +87,7 @@ def run_session
end

def disconnect
@connection&.close
@connection&.close_write
end

private
Expand Down
4 changes: 3 additions & 1 deletion lib/librevox/listener/outbound.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def application(app, args = nil, **params)
def session_initiated
end

def initialize(connection)
def initialize(connection, options = {})
super(connection)
@session = nil
@app_complete_queue = Async::Queue.new
Expand All @@ -50,6 +50,8 @@ def run_session
send_message "myevents"
send_message "linger"
session_initiated
rescue Librevox::ResponseError => e
Librevox.logger.error "Session error: #{e.message}"
end

def handle_response
Expand Down
6 changes: 6 additions & 0 deletions lib/librevox/protocol/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def send_message(msg)
@stream.flush
end

def close_write
@stream.close_write
rescue IOError, Errno::ENOTCONN
# Already closed or not connected
end

def close
return if @stream.closed?

Expand Down
8 changes: 6 additions & 2 deletions lib/librevox/protocol/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def reply?
api_response? || command_reply?
end

def error?
reply? && headers[:reply_text]&.start_with?("-ERR")
end

private

def parse_headers(headers)
Expand All @@ -42,7 +46,7 @@ def parse_content(content)
return content unless content.include?(":")

headers, body = content.split("\n\n", 2)
parse_kv(headers, decode: event?).merge(body: body || "")
parse_kv(headers, decode: true).merge(body: body || "")
end

def parse_kv(string, decode: false)
Expand All @@ -51,7 +55,7 @@ def parse_kv(string, decode: false)
name, value = line.split(':', 2)
next unless value
value = URI::RFC2396_PARSER.unescape(value) if decode
hash[name.downcase.tr('-', '_').to_sym] = value.strip
hash[name.downcase.gsub(/[^a-z0-9_]/, '_').to_sym] = value.strip
end
hash
end
Expand Down
5 changes: 3 additions & 2 deletions lib/librevox/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

module Librevox
class Server
def initialize(handler, endpoint)
def initialize(handler, endpoint, **options)
@handler = handler
@endpoint = endpoint
@options = options
end

attr :endpoint
Expand All @@ -15,7 +16,7 @@ def accept(socket, _address)
stream = IO::Stream(socket)
connection = Protocol::Connection.new(stream)

listener = @handler.new(connection)
listener = @handler.new(connection, @options)

session_task = Async { listener.run_session }
connection.read_loop { |msg| listener.receive_message(msg) }
Expand Down
91 changes: 91 additions & 0 deletions test/functional/librevox/listener/outbound_error_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# frozen_string_literal: true

require_relative '../../../test_helper'

require 'librevox/listener/outbound'

class OutboundListenerWithUnhandledErrorApp < Librevox::Listener::Outbound
def session_initiated
sample_app "fail"
end
end

class OutboundListenerWithErrorApp < Librevox::Listener::Outbound
attr_reader :error

def session_initiated
sample_app "fail"
rescue Librevox::ResponseError => e
@error = e
end
end

class TestOutboundApplicationError < Minitest::Test
prepend Librevox::Test::AsyncTest
include OutboundSetupHelpers
include Librevox::Test::Matchers

def setup
@listener = OutboundListenerWithErrorApp.new(MockConnection.new)
@session_task = Async { @listener.run_session }

command_reply "Establish-Session" => "OK",
"Unique-ID" => "1234"
event_and_linger_replies
3.times { @listener.outgoing_data.shift }
end

def teardown
@session_task&.stop
super
end

def test_application_raises_on_error_reply
assert_send_application @listener, "fail"

# sendmsg ack with error — raises instead of blocking on app_complete_queue
command_reply "Reply-Text" => "-ERR invalid command"

assert_instance_of Librevox::ResponseError, @listener.error
assert_equal "-ERR invalid command", @listener.error.message
end
end

class TestOutboundUnhandledApplicationError < Minitest::Test
prepend Librevox::Test::AsyncTest
include OutboundSetupHelpers
include Librevox::Test::Matchers

def setup
@listener = OutboundListenerWithUnhandledErrorApp.new(MockConnection.new)
@session_task = Async { @listener.run_session }

command_reply "Establish-Session" => "OK",
"Unique-ID" => "1234"
event_and_linger_replies
3.times { @listener.outgoing_data.shift }
end

def teardown
@session_task&.stop
super
end

def test_unhandled_error_ends_session_cleanly
assert_send_application @listener, "fail"

log = StringIO.new
original_logger = Librevox.logger
Librevox.logger = Logger.new(log)

# sendmsg ack with error — run_session rescues and logs, no crash
command_reply "Reply-Text" => "-ERR invalid command"

# session task completes without raising
@session_task.wait

assert_match(/-ERR invalid command/, log.string)
ensure
Librevox.logger = original_logger
end
end
103 changes: 103 additions & 0 deletions test/integration/outbound_disconnect_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

require_relative '../test_helper'

require 'librevox/listener/outbound'
require 'librevox/server'
require 'async'
require 'io/endpoint'
require 'io/stream'

class DisconnectFromSessionListener < Librevox::Listener::Outbound
def session_initiated
disconnect
end
end

class DisconnectFromEventListener < Librevox::Listener::Outbound
event(:channel_hangup) { disconnect }
end

module DisconnectTestHelpers
def start_server(listener_class)
tcp_server = TCPServer.new("127.0.0.1", 0)
port = tcp_server.local_address.ip_port
tcp_server.close

thread = Thread.new do
Sync do
endpoint = IO::Endpoint.tcp("127.0.0.1", port)
server = Librevox::Server.new(listener_class, endpoint)
server.run
end
end

sleep 0.1
[port, thread]
end

def fake_fs_connect(port)
socket = TCPSocket.new("127.0.0.1", port)

# connect
msg = socket.gets("\n\n")
assert_equal "connect", msg&.strip
socket.write("Content-Type: command/reply\nCaller-Caller-Id-Number: 8675309\nUnique-ID: 1234\n\n")

# myevents
msg = socket.gets("\n\n")
assert_equal "myevents", msg&.strip
socket.write("Content-Type: command/reply\nReply-Text: +OK Events Enabled\n\n")

# linger
msg = socket.gets("\n\n")
assert_equal "linger", msg&.strip
socket.write("Content-Type: command/reply\nReply-Text: +OK will linger\n\n")

socket
end

def assert_connection_closed(socket)
ready = IO.select([socket], nil, nil, 3)
if ready
data = socket.read_nonblock(1024, exception: false)
assert(data.nil? || data == "",
"Expected EOF but got: #{data.inspect}")
else
flunk "Timed out waiting for connection to close — disconnect did not work"
end
end
end

class TestDisconnectFromSession < Minitest::Test
include DisconnectTestHelpers

def test_disconnect_from_session_initiated
port, server_thread = start_server(DisconnectFromSessionListener)
socket = fake_fs_connect(port)
assert_connection_closed(socket)
ensure
socket&.close
server_thread&.kill
server_thread&.join(1)
end
end

class TestDisconnectFromEvent < Minitest::Test
include DisconnectTestHelpers

def test_disconnect_from_event_hook
port, server_thread = start_server(DisconnectFromEventListener)
socket = fake_fs_connect(port)

# Send a CHANNEL_HANGUP event to trigger the hook that calls disconnect
body = "Event-Name: CHANNEL_HANGUP"
socket.write("Content-Type: text/event-plain\nContent-Length: #{body.size}\n\n#{body}")

assert_connection_closed(socket)
ensure
socket&.close
server_thread&.kill
server_thread&.join(1)
end
end
1 change: 1 addition & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

Warning[:experimental] = false
require 'minitest/autorun'
require 'async'
require 'librevox'
Expand Down
29 changes: 27 additions & 2 deletions test/unit/librevox/response_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,38 @@ def test_url_decode_preserves_literal_plus
assert_equal "hello+world", response.content[:some_header]
end

def test_does_not_url_decode_non_event_content
def test_url_decode_non_event_content
response = Librevox::Protocol::Response.new("Content-Type: command/reply", "Reply-Text: %2BOK")
assert_equal "%2BOK", response.content[:reply_text]
assert_equal "+OK", response.content[:reply_text]
end

def test_does_not_url_decode_headers
response = Librevox::Protocol::Response.new("Content-Type: text%2Fevent-plain", "")
assert_equal "text%2Fevent-plain", response.headers[:content_type]
end

def test_error_on_command_reply_with_err
response = Librevox::Protocol::Response.new("Content-Type: command/reply\nReply-Text: -ERR invalid command", "")
assert response.error?
end

def test_error_on_api_response_with_err
response = Librevox::Protocol::Response.new("Content-Type: api/response\nReply-Text: -ERR no such command", "")
assert response.error?
end

def test_not_error_on_ok_reply
response = Librevox::Protocol::Response.new("Content-Type: command/reply\nReply-Text: +OK", "")
refute response.error?
end

def test_not_error_on_event
response = Librevox::Protocol::Response.new("Content-Type: text/event-plain", "Event-Name: Hangup")
refute response.error?
end

def test_not_error_on_reply_without_reply_text
response = Librevox::Protocol::Response.new("Content-Type: command/reply", "Foo: Bar")
refute response.error?
end
end
Loading