-
Notifications
You must be signed in to change notification settings - Fork 0
/
actor.rb
75 lines (63 loc) · 1.84 KB
/
actor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# frozen_string_literal: true
require "concurrent/array"
require "concurrent/mvar"
require "concurrent/immutable_struct"
require_relative "promises"
module StandardProcedure
module Async
module Actor
# Hook run by Ruby when this module is included: gives the including class an `async` macro.
#
# `async name, &implementation` defines two instance methods on the class:
# * `name`  - enqueues a message via `_add_message_to_queue` and returns whatever that returns
# * `_name` - runs the implementation block against the instance; a StandardError raised by
#   the block is returned as a value instead of propagating
def self.included(base)
  base.define_singleton_method(:async) do |name, &implementation|
    public_name = name.to_sym
    queued_name = "_#{public_name}".to_sym

    # Caller-facing method: it only enqueues, it never runs the implementation itself.
    define_method(public_name) do |*args, **params|
      _add_message_to_queue(queued_name, *args, **params)
    end

    # Implementation method, looked up by name when the queued message is called.
    define_method(queued_name) do |*args, **params|
      begin
        instance_exec(*args, **params, &implementation)
      rescue => error
        error
      end
    end
  end
end
private
# Lazily created per-instance queue of pending messages (a Concurrent::Array).
def _messages
  return @_messages if defined?(@_messages) && @_messages

  @_messages = Concurrent::Array.new
end
# Lazily created per-instance StandardProcedure::Async::Promises, used to run the message queue.
def _promises
  return @_promises if defined?(@_promises) && @_promises

  @_promises = StandardProcedure::Async::Promises.new
end
# Wraps a call to `name` in a Message, appends it to the queue and returns the Message.
#
# Processing is started only when the queue holds exactly one message after the append.
#
# NOTE(review): the append and the size check are two separate steps. Confirm whether
# concurrent callers (or a drain already in progress) can make the `== 1` condition
# misfire, and what ordering guarantee is actually intended.
def _add_message_to_queue(name, *args, **params, &block)
  Message.new(self, name, args, params, block, Concurrent::MVar.new).tap do |queued|
    _messages.push(queued)
    _perform_messages if _messages.size == 1
  end
end
# Asks `_promises` for a future that drains the queue: messages are removed from the
# front one at a time and called, until the queue yields nothing.
def _perform_messages
  _promises.future do
    pending = _messages.shift
    while pending
      pending.call
      pending = _messages.shift
    end
  end
end
# A queued call to an actor method, plus the MVar (`result`) that receives its outcome.
# nodoc:
class Message < Concurrent::ImmutableStruct.new(:target, :name, :args, :params, :block, :result)
  # Waits up to `timeout` seconds for the outcome and returns it.
  #
  # The outcome is read with MVar#borrow, which leaves it in the MVar, so `value`, `get`,
  # `await` and `then` can be called any number of times on the same message. (Reading with
  # MVar#take would empty the MVar, making every later read wait out the whole timeout.)
  #
  # @param timeout [Numeric] seconds to wait for the outcome
  # @return [Object] the outcome, or Concurrent::MVar::TIMEOUT if none arrived in time
  # @raise [Exception] the outcome itself, when the outcome is an exception
  def value(timeout: 30)
    outcome = result.borrow(timeout) { |stored| stored }
    raise outcome if outcome.is_a? Exception

    outcome
  end
  alias_method :get, :value
  alias_method :await, :value

  # Waits for the outcome (default timeout) and passes it to the block.
  # Without a block nothing is waited for and nil is returned.
  def then &block
    block&.call value
  end

  # Sends the message to the target and stores whatever it returns in `result`.
  def call
    result.put target.send(name, *args, **params, &block)
  end
end
end
end
end