/
runner.rb
87 lines (74 loc) · 1.82 KB
/
runner.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
require 'acfs/service/middleware'
module Acfs
# @api private
#
class Runner
include Service::Middleware
attr_reader :adapter
def initialize(adapter)
@adapter = adapter
@running = false
end
# Process an operation. Synchronous operations will be run
# and parallel operations will be queued.
#
def process(op)
::ActiveSupport::Notifications.instrument 'acfs.runner.process', operation: op do
op.synchronous? ? run(op) : enqueue(op)
end
end
# Run operation right now skipping queue.
#
def run(op)
::ActiveSupport::Notifications.instrument 'acfs.runner.run', operation: op do
op_request(op) { |req| adapter.run req }
end
end
# List of current queued operations.
#
def queue
@queue ||= []
end
# Enqueue operation to be run later.
#
def enqueue(op)
::ActiveSupport::Notifications.instrument 'acfs.runner.enqueue', operation: op do
if running?
op_request(op) { |req| adapter.queue req }
else
queue << op
end
end
end
# Return true if queued operations are currently processed.
#
def running?
@running
end
# Start processing queued operations.
#
def start
::ActiveSupport::Notifications.instrument 'acfs.runner.start' do
enqueue_operations
@running = true
adapter.start
@running = false
end
end
def clear
queue.clear
adapter.abort
@running = false
end
private
def enqueue_operations
while (op = queue.shift)
op_request(op) { |req| adapter.queue req }
end
end
def op_request(op)
return if Acfs::Stub.enabled? and Acfs::Stub.stubbed(op)
yield prepare op.service.prepare op.request
end
end
end