Skip to content
Permalink
ractor
Go to file
 
 
Cannot retrieve contributors at this time
810 lines (666 sloc) 21.9 KB

まとめ

  • 1つのRactorは並行実行単位として実行される
    • 1つのRactorは1つ以上のスレッドをもつ
    • スレッドは各 Ractor に属するグローバルロックで共有される
  • Ractor へメッセージを送受信(message passing)しながら同期して実行をすすめる
    • push 型(sender knows receiver)(actor model)の Ractor#send + Ractor.recv
    • pull 型(receiver knows sender)の Ractor.yield + Ractor#take
  • メッセージは Ruby のオブジェクト
    • オブジェクトは共有可能・不可能オブジェクトに二分され、共有不可能オブジェクトはたかだか一つの Ractor からしか参照されない
    • 共有可能オブジェクトをメッセージとして転送すると、単に参照が送られる
    • 共有不可能オブジェクトをメッセージとして転送すると、原則コピーされる
    • 共有不可能オブジェクトは、(ほぼ)コピーしない move ができるものがあるが、転送元からは参照できなくなる
  • 共有可能オブジェクトへのアクセスは必ず排他制御される
    • C レベルでの SEGV は起こらない
    • ただし、トランザクションが足りないことがあるため、レースは発生しうる。例えば、トランザクションをまたいだ変更など。

Ractor の生成と終了

  • Ractor.new do ... end で Ractor を生成する
  • 渡したブロックが新しい Ractor 上で実行される
    • ブロックは外側の環境と隔離される
    • Ractor.new に渡した引数は incoming message としてブロックパラメータに到達する
    • ブロックの返値が、outgoing message となる

Ractor の生成

  • Ractor.new メソッドで Ractor 作成
  • 渡したブロックが生成された Ractor で並行実行される
Ractor.new do
  # このブロックが並行に実行される
end
  • name: で名前を与えられる
  r = Ractor.new name: 'test-name' do
  end
  r.name #=> 'test-name'

Ractor に渡したブロックは、生成側の環境からは隔離される

  • Ractor 間でオブジェクトが共有されないように、「ブロックの外側のローカル変数(など)」と「self」は隔離される
    • 与えられたブロックは、Proc#isolate によって外側の環境にアクセスできない
    • エラーは Proc#isolate が実行された瞬間に起こる。つまり Ractor.new したときに起こる
  begin
    a = true
    r = Ractor.new do
      a #=> ArgumentError
    end
    r.take # Ractor の実行を待つ。後述
  rescue ArgumentError
  end
  • 与えられたブロックの self は、その Ractor オブジェクト自身になる(外側の self とは別になる)
  r = Ractor.new do
    self.object_id
  end
  r.take == self.object_id #=> false
  • Ractor.new に渡された(キーワード引数以外の)引数は、ブロックの引数になる。ただし、参照を渡すのでは無く、その Ractor へのincoming messageとなる(詳細は後述)
  r = Ractor.new 'ok' do |msg|
    msg #=> 'ok'
  end
  r.take #=> 'ok'
  # 上のコードとほぼ同じ意味
  r = Ractor.new do
    msg = Ractor.recv
    msg
  end
  r.send 'ok'
  r.take #=> 'ok'
  • ブロックの返値は、その Ractor からの outgoing message となる(詳細は後述)
  r = Ractor.new do
    'ok'
  end
  r.take #=> `ok`
  # 上のコードとほぼ同じ意味
  r = Ractor.new do
    Ractor.yield 'ok'
  end
  r.take #=> 'ok'
  • ブロックのエラー値は、outgoing message を受信した Ractor 上でエラーが伝搬する
  r = Ractor.new do
    raise 'ok' # exception will be transferred receiver
  end
  begin
    r.take
  rescue Ractor::RemoteError => e
    e.cause.class   #=> RuntimeError
    e.cause.message #=> 'ok'
    e.ractor        #=> r
  end

Ractor 間のコミュニケーション

  • Ractor 間のコミュニケーションは、メッセージパッシングと、共有可能コンテナオブジェクトによって行う
    • (1) メッセージパッシング
      • (1-1) push 型の send/recv(send する側が宛先を知っている) aka actor model
      • (1-2) pull 型の yield/take(take する側が送信元を知っている) aka ランデブー
    • (2) 共有可能なコンテナオブジェクトを用いる(未実装)
  • 待ち合わせ
    • 待ち合わせは、基本的に (1) メッセージパッシングで行う
    • (2) は、データの送受信は行うことができるが、待ち合わせには用いない(... 多分)
  • (1-1) send/recv(push 型通信?)
    • Ractor#sendRactor#<< が alias)は、対象 Ractor の incoming port へメッセージを送信する。incoming port は無限サイズの incoming queue に接続されているので、Ractor#send はブロックしない。
    • Ractor.recv で、自 Ractor の incoming queue からメッセージを一つ取り出す。incoming queue が空ならブロックする
  • (1-2) yield/take(pull 型通信?)
    • Ractor.yield(obj) でメッセージを Ractor#take している Ractor へ送信する
    • どちらも、相手が発生するまでブロックする
  • Ractor.select() で、take, recv, yield のどれかが成功するまで待つことができる
  • port は close することができる
    • Ractor#close_incoming および Ractor#close_outgoing がある
    • incoming port を close すると、それ以降 send することができない。また、空の incoming queue を待っていた場合、例外になる
    • outgoing port を close すると、take もしくは yield ができなくなる。もし、待っているものがいた場合、例外になる
    • Ractor が終了すると、その Ractor の incoming/outgoing port はそれぞれ close される
  • Ractror 間送受信において、メッセージとしてオブジェクトを送受信する方法は、次の3種類
    • (1) 参照:共有可能オブジェクトは、参照のみ送る(速い)
    • (2) コピー:すべてコピー(ディープコピー)して送る
    • (3) 移動:送信元で、以降一切用いないことを前提に軽量なコピーを送る
    • 移動したい場合、send もしくは yieldmove: true オプションを付けて指定する

Ractor 間の送受信

  • 各 Ractor は、それぞれ incoming-portoutgoing-port を持つ
  • incoming port には無限サイズのキューである incoming queue が接続されている
                  Ractor r
                 +-------------------------------------------+
                 | incoming                         outgoing |
                 | port                                 port |
   r.send(obj) ->*->[incoming queue]     Ractor.yield(obj) ->*-> r.take
                 |                |                          |
                 |                v                          |
                 |           Ractor.recv                     |
                 +-------------------------------------------+


接続することができる(r2.send obj on r1、Ractor.recv on r2)
  +----+     +----+
  * r1 |-----* r2 *
  +----+     +----+


接続することができる(Ractor.yield(obj) on r1, r1.take on r2)
  +----+     +----+
  * r1 *------ r2 *
  +----+     +----+

同時に待つことができる(Ractor.select(r1, r2))
  +----+
  * r1 *------+
  +----+      |
              +----- Ractor.select(r1, r2)
  +----+      |
  * r2 *------|
  +----+

  r = Ractor.new do
    msg = Ractor.recv # r の incoming queue からの受信
  end
  r.send 'ok' # r の incoming port -> incoming queue へ送信
  r.take      # r の outgoing port から受信
  # 実引数は incoming queue への送信
  r = Ractor.new 'ok' do |msg|
    # 仮引数は incoming queue からの受信

    msg # ブロックの返値は outgoing port への送信
  end

  # g の outgoing port からの受信
  r.take #=> `ok`
  • 複数の Ractor が一つの Ractor に対して待ち合わせが可能(Ractor.select
  pipe = Ractor.new do
    loop do
      Ractor.yield Ractor.recv
    end
  end

  RN = 10
  rs = RN.times.map{|i|
    Ractor.new pipe, i do |pipe, i|
      msg = pipe.take
      msg # ping-pong
    end
  }
  RN.times{|i|
    pipe << i
  }
  RN.times.map{
    r, n = Ractor.select(*rs)
    rs.delete r
    n
  }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  • 複数の Ractor が一つの Ractor に送信可能
  pipe = Ractor.new do
    loop do
      Ractor.yield Ractor.recv
    end
  end

  RN = 10
  rs = RN.times.map{|i|
    Ractor.new pipe, i do |pipe, i|
      pipe << i
    end
  }
  RN.times.map{
    pipe.take
  }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Ractor.select で複数の Ractor から recv する

  • Ractor.select(*ractors) を用いて複数の Ractor からの take を待つことができる
  • 返値は、「どの Ractor からメッセージが届いたのか」、「送信されたオブジェクト」の2つ
  r1 = Ractor.new{'r1'}
  r, obj = Ractor.select(r1)
  r == r1 and obj == 'r1' #=> true
  # select 2
  r1 = Ractor.new{'r1'}
  r2 = Ractor.new{'r2'}
  rs = [r1, r2]
  as = []
  r, obj = Ractor.select(*rs)
  rs.delete(r)
  as << obj
  r, obj = Ractor.select(*rs)
  rs.delete(r)
  as << obj
  as.sort == ['r1', 'r2'] #=> true
  • select(2) と同じ C10K problem があるので、その辺なんとかしたい(良い感じの API)
  • go-lang の select syntax は、同時に受信可能なチャンネルがある場合、ランダム(ラウンドロビン?)に選択するするらしく、こちらもそのようにしたほうが良いと思われる(現在は、引数の順序通りに見ていく)

Ractor の port を close

  • Ractor#close_incoming/outgoing で incoming/outgoing port を close することができる(Queue#close と同じ)
  • close_incoming された Ractor に Ractor#sendすると例外。incoming queue が空のとき(ブロックしようとするとき) Ractor.recv すると例外
  • close_outgoing された Ractor で Ractor.yield する、もしくは Ractor#take すると例外
  • Ractor が終了すると、incoming/outgoing port が自動的に close される
  r = Ractor.new do
    'finish'
  end
  r.take
  begin
    o = r.take
  rescue Ractor::ClosedError
    'ok'
  else
    "ng: #{o}"
  end
  r = Ractor.new do
  end

  r.take # wait terminate

  begin
    r.send(1)
  rescue Ractor::ClosedError
    'ok'
  else
    'ng'
  end
  • 複数の Ractor が一つの Ractor の yield を待っているとき、Ractor#close_outgoing すると yield 待ちがすべてキャンセルされる(ClosedError)

コピーによるオブジェクトの転送

  • Ractor#send(obj) および Ractor.yield(obj) は、obj が共有不可能オブジェクトであれば、(deep) コピーする
  obj = 'str'.dup
  r = Ractor.new obj do |msg|
    msg.object_id
  end
  
  obj.object_id == r.recv #=> false
  • 現状は Marshal#dump し、recv 側で load する(dRuby と同じ)。なので、対応しないオブジェクトは送ることができない。
  • 専用のコピーコードを書き下す必要がある
  obj = Thread.new{}
  begin
    r = Ractor.new obj do |msg|
      msg
    end
  rescue TypeError => e
    e.message #=> no _dump_data is defined for class Thread
  else
    'ng'
  end

move によるオブジェクトの転送

  • Ractor#send(obj, move: true) および Ractor.yield(obj, move: true) は、objが共有不可能オブジェクトであれば、move する
  • move されたオブジェクトは、送信元 Ractor で参照しようとすると(例えば、メソッド呼び出し)、エラーになる
  # move with Ractor#send
  r = Ractor.new do
    obj = Ractor.recv
    obj << ' world'
  end

  str = 'hello'
  r.send str, move: true
  modified = r.take #=> 'hello world'

  begin
    # move した文字列を触ろうとするのでエラー
    str << ' exception' # raise Ractor::MovedError
  rescue Ractor::MovedError
    modified #=> 'hello world'
  else
    raise 'unreachable'
  end
  # move with Ractor.yield
  r = Ractor.new do
    obj = 'hello'
    Ractor.yield obj, move: true
    obj << 'world'
  end

  str = r.take
  begin
    r.take 
  rescue Ractor::RemoteError
    p str #=> "hello"
  end
  • 現状では、T_FILET_STRINGT_ARRAY にのみ対応する
    • T_FILE はソケットなどを念頭に(サーバアプリ)
    • T_STRING は、コピーではないのでバッファの確保が不要になって速い(でかい場合)
    • T_ARRAY もバッファの確保が不要になる。ただし、全要素をなめる必要があるので、速くはない(多分、あまり使われない)
  • アクセス禁止の実装は、禁断のクラスの差し替えによる

共有可能オブジェクト

  • 次のオブジェクトが Ractor 間で(現状)共有可能
    • SPECIAL_CONST_P()
    • native に frozen な Numeric と Symbol
      • T_FLOATT_COMPLEXT_RATIONAL, T_BIGNUM
      • T_SYMBOL
    • frozen な T_STRINGT_REGEXP
      • ただし、ivar がない(FL_EXIVAR がない)
    • クラス、モジュール:T_CLASST_MODULET_ICLASS
    • Ractor などの共有を前提としたデータ構造
  • 将来的には、immutable なコンテナなども対象に
    • deep frozen な Array, Hash など → FL_IMMUTABLE 作る?
  • 共有可能な !special const オブジェクトは FL_SHAREABLE がつく
    • frozen な String など、あとで調査したときに付ける
  r = Ractor.new do
    while v = Ractor.recv
      Ractor.yield v
    end
  end

  class C
  end

  sharable_objects = [1, :sym, 'xyzzy'.to_sym, 'frozen'.freeze, 1+2r, 3+4i, /regexp/, C]

  sr = sharable_objects.map{|o|
    r << o
    o2 = r.take
    [o, o.object_id == o2.object_id]
  }
  #=> [[1, true], [:sym, true], [:xyzzy, true], [\"frozen\", true], [(3/1), true], [(3+4i), true], [/regexp/, true], [C, true]]

  ur = unsharable_objects = ['mutable str'.dup, [:array], {hash: true}].map{|o|
    r << o
    o2 = r.take
    [o, o.object_id == o2.object_id]
  }
  #+> "[[\"mutable str\", false], [[:array], false], [{:hash=>true}, false]]]"

共有不可オブジェクトを共有させないために

  • グローバル変数は main Ractor でのみ利用可能
  $gv = 1
  r = Ractor.new do
    $gv
  end

  begin
    r.take
  rescue Ractor::RemoteError => e
    e.cause.message #=> 'can not access global variables from non-main Ractors'
  end
  • outer-local variable は参照不可(Proc#isolate
  begin
    a = true
    r = Ractor.new do
      a
    end
  rescue => e
    e.class #=> ArgumentError
  end
  • 共有可能オブジェクトのインスタンス変数は、main Ractor(最初に生成されたオブジェクト)からのみアクセス可
  class C
    @iv = 'str'
  end

  r = Ractor.new do
    class C
      p @iv
    end
  end


  begin
    r.take
  rescue => e
    e.class #=> RuntimeError
  end
  shared = Ractor.new{}
  shared.instance_variable_set(:@iv, 'str')

  r = Ractor.new shared do |shared|
    p shared.instance_variable_get(:@iv)
  end

  begin
    r.take
  rescue Ractor::RemoteError => e
    e.cause.message
  end
  • クラス変数も main Ractor からのみアクセス可
  • 利用しているライブラリは対応が必要
  class C
    @@cv = 'str'
  end

  r = Ractor.new do
    class C
      p @@cv
    end
  end


  begin
    r.take
  rescue => e
    e.class #=> RuntimeError
  end
  • 共有不可オブジェクトが格納されている定数の参照は、main ractor からのみ可
  class C
    CONST = 'str'
  end
  r = Ractor.new do
    C::CONST
  end
  begin
    r.take
  rescue => e
    e.class #=> NameError
  end
  • 共有不可オブジェクトを定数にセットするのは、main Ractor からのみ可
  class C
  end
  r = Ractor.new do
    C::CONST = 'str'
  end
  begin
    r.take
  rescue => e
    e.class
  end

検討

  • channel で通信しないのは、エラー伝搬を確実に行うため
    • Close した(死亡した)Ractor に send
    • Close した(死亡した)Ractor から take
    • Close した(おそらく外部から close された)incoming port から recv
    • Close した(このケースはあるんだろうか...?)outgoing port へ yield
    • これで、多分 take で結果を受け取るのであれば、間違いに気づくことができる
  • エラー伝搬が起こらないケース
    • 誰も待っていないのに yield ... これはなんとかなるんだろうか? 無視すればいい?
    • 誰も送ってくれないのに recv(send する側が死亡した場合)→ 結果は take で待つという文化になるか?
  • take は、Erlang における link を Ruby でどうするといいかなと検討した結果(能動的な監視)、その発展

その他実装

  • まだ並列化していない(実は全部従来の GVL 使っている)
  • デバッグモード
    • 生成時に Ractor ID(uint32_t、連番)を振り、VM push 時に現 Ractor ID と異なれば rb_bug()

Examples

ring in actor model

RN = 10000
CR = Ractor.current

last_r = r = Ractor.new do
  p Ractor.recv
  CR << :fin
end

RN.times{
  r = Ractor.new r do |next_r|
    next_r << Ractor.recv
  end
}

p :setup_ok
r << 1
p Ractor.recv

fork-join

def fib n
  if n < 2
    1
  else
    fib(n-2) + fib(n-1)
  end
end

RN = 10
rs = (1..RN).map do |i|
  Ractor.new i do |i|
    [i, fib(i)]
  end
end

until rs.empty?
  r, v = Ractor.select(*rs)
  rs.delete r
  p answer: v
end

worker pool

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.recv
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}

Pipeline

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end

r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end

r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end

p r3.take #=> 'r1r2r3'
# pipeline with send/recv

r3 = Ractor.new Ractor.current do |cr|
  cr.send Ractor.recv + 'r3'
end

r2 = Ractor.new r3 do |r3|
  r3.send Ractor.recv + 'r2'
end

r1 = Ractor.new r2 do |r2|
  r2.send Ractor.recv + 'r1'
end

r1 << 'r0'
p Ractor.recv #=> "r0r1r2r3"

Supervise

# ring example again

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    r.send Ractor.recv + "r#{i}"
  end
}

r.send "r0"
p Ractor.recv #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.recv
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.recv #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current)
[:recv, "r0r10r9r8r7r6r5r4r3r2r1"]
r.send "e0"
p Ractor.select(*rs, Ractor.current)
#=>
#<Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
        2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
        1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
/home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
Traceback (most recent call last):
        2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
        1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
/home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
        1: from /home/ko1/src/ruby/trunk/test.rb:21:in `<main>'
<internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.recv
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.recv #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current)
[:recv, "r0r10r9r8r7r6r5r4r3r2r1"]
msg = 'e0'
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  msg = 'r0'
  retry
end

#=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError)
# because r == r[-1] is terminated.
# ring example with supervisor and re-start

def make_ractor r, i
  Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.recv
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
end

r = Ractor.current
rs = (1..10).map{|i|
  r = make_ractor(r, i)
}

msg = 'e0' # error causing message
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  r = rs[-1] = make_ractor(rs[-2], rs.size-1)
  msg = 'x0'
  retry
end

#=> [:recv, "x0r9r9r8r7r6r5r4r3r2r1"]
You can’t perform that action at this time.