/
amqp_node.rb
164 lines (136 loc) · 5.4 KB
/
amqp_node.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# RJR AMQP Endpoint
#
# Copyright (C) 2012 Mohammed Morsi <mo@morsi.org>
# Licensed under the AGPLv3+ http://www.gnu.org/licenses/agpl.txt
# establish client connection w/ specified args and invoke block w/
# newly created client, returning it after block terminates
require 'amqp'
require 'thread'
require 'rjr/node'
require 'rjr/message'
module RJR
# AMQP client node callback interface,
# send data back to client via AMQP.
class AMQPNodeCallback
def initialize(args = {})
@exchange = args[:exchange]
@exchange_lock = args[:exchange_lock]
@destination = args[:destination]
@message_headers = args[:headers]
@disconnected = false
@exchange_lock.synchronize{
# FIXME should disconnect all callbacks on_return
@exchange.on_return do |basic_return, metadata, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
@disconnected = true
end
}
end
def invoke(callback_method, *data)
msg = RequestMessage.new :method => callback_method, :args => data, :headers => @message_headers
raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected
@exchange_lock.synchronize{
@exchange.publish(msg.to_s, :routing_key => @destination, :mandatory => true)
}
end
end
# AMQP node definition, listen for and invoke json-rpc requests over AMQP
class AMQPNode < RJR::Node
RJR_NODE_TYPE = :amqp
private
def handle_message(metadata, msg)
if RequestMessage.is_request_message?(msg)
reply_to = metadata.reply_to
# TODO should delete handler threads as they complete & should handle timeout
@thread_pool << ThreadPoolJob.new { handle_request(reply_to, msg) }
elsif ResponseMessage.is_response_message?(msg)
# TODO test message, make sure it is a response message
msg = ResponseMessage.new(:message => msg, :headers => @message_headers)
lock = @message_locks[msg.msg_id]
if lock
headers = @message_headers.merge(msg.headers)
res = Dispatcher.handle_response(msg.result)
lock << res
lock[0].synchronize { lock[1].signal }
end
end
end
def handle_request(reply_to, message)
msg = RequestMessage.new(:message => message, :headers => @message_headers)
headers = @message_headers.merge(msg.headers) # append request message headers
result = Dispatcher.dispatch_request(msg.jr_method,
:method_args => msg.jr_args,
:headers => headers,
:rjr_node_id => @node_id,
:rjr_node_type => RJR_NODE_TYPE,
:rjr_callback =>
AMQPNodeCallback.new(:exchange => @exchange,
:exchange_lock => @exchange_lock,
:destination => reply_to,
:headers => headers))
response = ResponseMessage.new(:id => msg.msg_id, :result => result, :headers => headers)
@exchange_lock.synchronize{
@exchange.publish(response.to_s, :routing_key => reply_to)
}
end
public
# initialize the node w/ the specified params
def initialize(args = {})
super(args)
@broker = args[:broker]
# tuple of message ids to locks/condition variables for the responses
# of those messages with optional result response
@message_locks = {}
end
# Initialize the amqp subsystem
def init_node
@conn = AMQP.connect(:host => @broker)
@conn.on_tcp_connection_failure { puts "OTCF #{@node_id}" }
### connect to qpid broker
@channel = AMQP::Channel.new(@conn)
# qpid constructs that will be created for node
@queue_name = "#{@node_id.to_s}-queue"
@queue = @channel.queue(@queue_name, :auto_delete => true)
@exchange = @channel.default_exchange
@exchange_lock = Mutex.new
end
# Instruct Node to start listening for and dispatching rpc requests
def listen
em_run do
init_node
# start receiving messages
@queue.subscribe do |metadata, msg|
handle_message(metadata, msg)
end
end
end
# Instructs node to send rpc request, and wait for / return response
def invoke_request(routing_key, rpc_method, *args)
req_mutex = Mutex.new
req_cv = ConditionVariable.new
message = RequestMessage.new :method => rpc_method,
:args => args,
:headers => @message_headers
em_run do
init_node
@message_locks[message.msg_id] = [req_mutex, req_cv]
# begin listening for result
@queue.subscribe do |metadata, msg|
handle_message(metadata, msg)
end
@exchange_lock.synchronize{
@exchange.publish(message.to_s, :routing_key => routing_key, :reply_to => @queue_name)
}
end
## wait for result
# TODO - make this optional, eg a non-blocking operation mode
# (allowing event handler registration to be run on success / fail / etc)
req_mutex.synchronize { req_cv.wait(req_mutex) }
result = @message_locks[message.msg_id][2]
@message_locks.delete(message.msg_id)
self.stop
self.join unless self.em_running?
return result
end
end
end