-
Notifications
You must be signed in to change notification settings - Fork 196
/
dispatcher.rb
139 lines (113 loc) · 4.21 KB
/
dispatcher.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# require 'ruby-prof'
require 'volt/utils/logging/task_logger'
require 'drb'
require 'concurrent'
require 'timeout'
module Volt
  # The task dispatcher is responsible for taking incoming messages
  # from the socket channel and dispatching them to the proper handler.
  class Dispatcher
    # When we pass the dispatcher over DRb, don't send a copy, just proxy.
    include DRb::DRbUndumped

    # Seconds a task may run when neither the task class (via __timeout) nor
    # Volt.config.worker_timeout provides a limit.
    DEFAULT_WORKER_TIMEOUT = 60

    attr_reader :volt_app

    # Builds the dispatcher and its worker pool.
    #
    # volt_app - the app object handed to every task instance and to
    #            QueryTasks when a channel is closed.
    def initialize(volt_app)
      @volt_app = volt_app

      @worker_pool =
        if Volt.env.test?
          # When testing, we want to run immediately so it blocks and doesn't
          # start the next thread.
          Concurrent::ImmediateExecutor.new
        else
          Concurrent::ThreadPoolExecutor.new(
            min_threads: Volt.config.min_worker_threads,
            max_threads: Volt.config.max_worker_threads
          )
        end

      @worker_timeout = Volt.config.worker_timeout || DEFAULT_WORKER_TIMEOUT
    end

    # Dispatch takes an incoming Task from the client and runs it on the
    # server, returning the result to the client.
    # Tasks returning a promise will wait to return.
    #
    # channel - object the response is written to via #send_message.
    # message - array of [callback_id, class_name, method_name, meta_data, *args].
    #
    # Returns whatever the worker pool's #post returns. Any StandardError that
    # escapes the worker block is logged through Volt.logger and not re-raised.
    def dispatch(channel, message)
      # Dispatch the task in the worker pool. Pass in the meta data.
      @worker_pool.post do
        begin
          dispatch_in_thread(channel, message)
        rescue => e
          Volt.logger.error(worker_error_report(message, e))
        end
      end
    end

    # Check if it is safe to call method_name on klass.
    #
    # klass       - the object resolved from the client-supplied class name.
    # method_name - Symbol name of the method the client wants to invoke.
    #
    # Returns true only when klass is a class/module descending from Task and
    # the method is defined directly on klass or on one of its ancestors up to
    # and including Task. Returns false otherwise, including when klass is not
    # a Module at all (a constant name can resolve to any object).
    def safe_method?(klass, method_name)
      # Make sure the thing being called is a Task class.
      return false unless klass.is_a?(Module) && klass.ancestors.include?(Task)

      # Make sure the method is defined on the klass we're using and not
      # further up the hierarchy than Task.
      # ^ This check prevents methods like #send, #eval, #instance_eval,
      #   #class_eval, etc...
      klass.ancestors.each do |ancestor_klass|
        return true if ancestor_klass.instance_methods(false).include?(method_name)

        # We made it to Task and didn't find the method, that means it
        # was defined above Task, so we reject the call.
        return false if ancestor_klass == Task
      end

      false
    end

    # Closes the query tasks associated with channel.
    def close_channel(channel)
      QueryTasks.new(@volt_app, channel).close!
    end

    private

    # Do the actual dispatching, should be running inside of a worker thread at
    # this point.
    #
    # Sends exactly one 'response' message on channel: the task's result on
    # success, or the error when the method is unsafe or the task fails. Every
    # outcome is also reported through Volt.logger.log_dispatch together with
    # the elapsed time in milliseconds.
    def dispatch_in_thread(channel, message)
      callback_id, class_name, method_name, meta_data, *args = message
      method_name = method_name.to_sym

      # Get the class.
      # NOTE(review): class_name comes straight from the message and is
      # resolved before safe_method? runs. A name that does not resolve raises
      # here; #dispatch then only logs it, so no response is sent for that
      # callback_id. Confirm that is intended.
      klass = Object.const_get(class_name)

      promise = Promise.new
      started_at = monotonic_now

      # The limit actually applied to the task. Updated once the task class is
      # consulted so the timeout log message below reports the right number.
      timeout = @worker_timeout

      # Check that we are calling on a Task class and a method defined at
      # Task or below in the ancestor chain. (so no :send or anything)
      if safe_method?(klass, method_name)
        promise.resolve(nil)

        # Init and send the method
        promise = promise.then do
          timeout = klass.__timeout || @worker_timeout

          Timeout.timeout(timeout) do
            Thread.current['meta'] = meta_data
            begin
              klass.new(@volt_app, channel, self).send(method_name, *args)
            ensure
              # Never leak one task's meta data into the next task that runs
              # on this thread.
              Thread.current['meta'] = nil
            end
          end
        end
      else
        # Unsafe method
        promise.reject(RuntimeError.new("unsafe method: #{method_name}"))
      end

      # Called after task runs or fails
      finish = lambda do |error = nil|
        if error.is_a?(Timeout::Error)
          # Log a timeout error that carries a descriptive message. Only the
          # logged error is replaced; the client still receives the original.
          error = Timeout::Error.new("Task Timed Out after #{timeout} seconds: #{message}")
        end

        run_time = ((monotonic_now - started_at) * 1000).round(3)
        Volt.logger.log_dispatch(class_name, method_name, run_time, args, error)
      end

      # Run the promise and pass the return value/error back to the client
      promise.then do |result|
        channel.send_message('response', callback_id, result, nil)
        finish.call
      end.fail do |error|
        finish.call(error)
        channel.send_message('response', callback_id, nil, error)
      end
    end

    # Builds the log text for an exception that escaped a worker: a header
    # naming the message, the exception's inspect output, then one backtrace
    # frame per line (omitted when the exception has no backtrace).
    def worker_error_report(message, error)
      report = ["Worker Thread Exception for #{message}", error.inspect]
      report.concat(Array(error.backtrace))
      report.join("\n")
    end

    # Current monotonic clock reading in seconds; unlike Time.now it is not
    # affected by wall-clock adjustments, so differences are true durations.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end