New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
再帰ロックのコード例にデータ競合が含まれる? #61
Comments
ありがとうございます! |
確認しました。 C11からは_Atomicが追加されて、メモリモデルが詳細に仕様化されて、これらを使うと良いのですが、本書はC99でも動作するコードを掲載しています。 いくつか未検討なパターンもあるので、引き続き検証してみます。 |
確認ありがとうございます!
あっなるほど、であれば C11 のメモリモデルを持ち出すのは良くないですね…失礼しました。
if 文の else 節の 一方で、 |
ありがとうございます! C11の動作モデルについて本書執筆時は、C11かC99かで迷ったのですが、C11で正確に記載するには、メモリバリアの説明が必要となります。しかし、メモリバリアはアドバンスなトピックであるため、中盤以降に持ってきたかったので、GCC拡張を利用しています。 C11の動作モデルで規定するundefinedですが、たしかに、x86では正しく動作するがAArch64では動作しないという挙動になるundefinedなコードは書けるとは思います。一方で、弱いメモリモデルにあわせたアルゴリズムなら、規格上はundefinedでも実用上は動作するのではと思います。実際C99はアトミック処理に関しては、どの動作もundefinedでコンパイラ実装とCPUの実装を把握して書くしかありませんでした。 ご指摘通り、C11で導入されたatomicを利用すればこれは綺麗に書けるのですが、本書ではその部分がRustに置き換わってしまっています。
|
spinlock_acquire(&lock->lock); |
- 初期状態:
lock->id == 0
- ロック獲得:
lock->lock = true
lock->id = 0x11
lock->cnt++
ロック獲得
以下の行を実行と仮定。
lock->cnt--; |
lock->cnt--
lock->id = 0
lock->lock = false
0x10
スレッド
上記のどのステップでlock->id
を読み込んでも、0x10
にはならない。ありあえるのは、0x00
か0x11
のみ。
自分でロック獲得後でないと、lock->lock == true
かつlock->id == 0x11
にはならない。
モデル検査
本当にこのアルゴリズムで大丈夫かという疑問は残るので、モデル検査ツールのTLA+で検証してみました。
検証項目は、「デッドロックしない」と「クリティカルセクション実行のスレッドが高々1」という不変条件を検証しました。
結論から言うと、これらに対する反例は見つからず、現時点ではアルゴリズムの誤りは発見できませんでした。
以下が検証のコードとなります。
定数はMAX_REC = 3
、PIDS = {1, 2, 3}
としています。
また、以下の箇所がout-of-order実行されると想定し、メモリ読み込みをコードの逆順としています。
if (lock->lock && lock->id == id) { |
------------------------------ MODULE reclock ------------------------------
EXTENDS Integers, TLC, Sequences, FiniteSets
CONSTANTS MAX_REC, PIDS
(*--algorithm lock
variables
locking = {}, \* ロック中のプロセス集合。検証用の変数
\* アルゴリズム用の変数
lock = FALSE,
id = 0,
cnt = 0;
\* ロック獲得
procedure acquire()
variables
regA, regB, regC, regD, regE;
begin
Acquire:
\* out-of-order実行を模擬
regA := cnt;
A0: regB := id;
A1: regC := lock;
A2: regD := regB = self;
A3: regE := regC /\ regD;
if regE then \* if lock /\ id = self
A4: regA := regA + 1;
A5: cnt := regA; \* cnt := cnt + 1
else
TAS:
await lock = FALSE;
lock := TRUE;
locking := locking \union {self};
A6: id := self;
A7: cnt := cnt + 1;
end if;
A8: return
end procedure;
\* ロック開放
procedure release()
begin
Release:
cnt := cnt - 1;
R0:
if cnt = 0 then
R1: id := 0;
R2: lock := FALSE;
locking := locking \ {self};
end if;
R4: return;
end procedure;
\* 再帰ロック
procedure recursive_lock()
begin
RecLock:
either
Rec:
if cnt < MAX_REC then \* 最大MAX_REC回まで再帰ロック
RL0:
call lock_unlock() \* 再帰ロック
end if;
or
NotRec:
skip;
end either;
RL1: return;
end procedure;
procedure lock_unlock()
begin
Lock:
call acquire(); \* ロック獲得
L0: call recursive_lock(); \* 再帰ロック
L1: call release(); \* ロック開放
L2: return;
end procedure;
fair process pid \in PIDS
begin
T1:
call lock_unlock()
end process;
end algorithm;*)
\* BEGIN TRANSLATION (chksum(pcal) = "b99e79b4" /\ chksum(tla) = "1fb7a28e")
CONSTANT defaultInitValue
VARIABLES locking, lock, id, cnt, pc, stack, regA, regB, regC, regD, regE
vars == << locking, lock, id, cnt, pc, stack, regA, regB, regC, regD, regE >>
ProcSet == (PIDS)
Init == (* Global variables *)
/\ locking = {}
/\ lock = FALSE
/\ id = 0
/\ cnt = 0
(* Procedure acquire *)
/\ regA = [ self \in ProcSet |-> defaultInitValue]
/\ regB = [ self \in ProcSet |-> defaultInitValue]
/\ regC = [ self \in ProcSet |-> defaultInitValue]
/\ regD = [ self \in ProcSet |-> defaultInitValue]
/\ regE = [ self \in ProcSet |-> defaultInitValue]
/\ stack = [self \in ProcSet |-> << >>]
/\ pc = [self \in ProcSet |-> "T1"]
Acquire(self) == /\ pc[self] = "Acquire"
/\ regA' = [regA EXCEPT ![self] = cnt]
/\ pc' = [pc EXCEPT ![self] = "A0"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regB, regC,
regD, regE >>
A0(self) == /\ pc[self] = "A0"
/\ regB' = [regB EXCEPT ![self] = id]
/\ pc' = [pc EXCEPT ![self] = "A1"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regC, regD,
regE >>
A1(self) == /\ pc[self] = "A1"
/\ regC' = [regC EXCEPT ![self] = lock]
/\ pc' = [pc EXCEPT ![self] = "A2"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB, regD,
regE >>
A2(self) == /\ pc[self] = "A2"
/\ regD' = [regD EXCEPT ![self] = regB[self] = self]
/\ pc' = [pc EXCEPT ![self] = "A3"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB, regC,
regE >>
A3(self) == /\ pc[self] = "A3"
/\ regE' = [regE EXCEPT ![self] = regC[self] /\ regD[self]]
/\ IF regE'[self]
THEN /\ pc' = [pc EXCEPT ![self] = "A4"]
ELSE /\ pc' = [pc EXCEPT ![self] = "TAS"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB, regC,
regD >>
A4(self) == /\ pc[self] = "A4"
/\ regA' = [regA EXCEPT ![self] = regA[self] + 1]
/\ pc' = [pc EXCEPT ![self] = "A5"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regB, regC, regD,
regE >>
A5(self) == /\ pc[self] = "A5"
/\ cnt' = regA[self]
/\ pc' = [pc EXCEPT ![self] = "A8"]
/\ UNCHANGED << locking, lock, id, stack, regA, regB, regC, regD,
regE >>
TAS(self) == /\ pc[self] = "TAS"
/\ lock = FALSE
/\ lock' = TRUE
/\ locking' = (locking \union {self})
/\ pc' = [pc EXCEPT ![self] = "A6"]
/\ UNCHANGED << id, cnt, stack, regA, regB, regC, regD, regE >>
A6(self) == /\ pc[self] = "A6"
/\ id' = self
/\ pc' = [pc EXCEPT ![self] = "A7"]
/\ UNCHANGED << locking, lock, cnt, stack, regA, regB, regC, regD,
regE >>
A7(self) == /\ pc[self] = "A7"
/\ cnt' = cnt + 1
/\ pc' = [pc EXCEPT ![self] = "A8"]
/\ UNCHANGED << locking, lock, id, stack, regA, regB, regC, regD,
regE >>
A8(self) == /\ pc[self] = "A8"
/\ pc' = [pc EXCEPT ![self] = Head(stack[self]).pc]
/\ regA' = [regA EXCEPT ![self] = Head(stack[self]).regA]
/\ regB' = [regB EXCEPT ![self] = Head(stack[self]).regB]
/\ regC' = [regC EXCEPT ![self] = Head(stack[self]).regC]
/\ regD' = [regD EXCEPT ![self] = Head(stack[self]).regD]
/\ regE' = [regE EXCEPT ![self] = Head(stack[self]).regE]
/\ stack' = [stack EXCEPT ![self] = Tail(stack[self])]
/\ UNCHANGED << locking, lock, id, cnt >>
acquire(self) == Acquire(self) \/ A0(self) \/ A1(self) \/ A2(self)
\/ A3(self) \/ A4(self) \/ A5(self) \/ TAS(self)
\/ A6(self) \/ A7(self) \/ A8(self)
Release(self) == /\ pc[self] = "Release"
/\ cnt' = cnt - 1
/\ pc' = [pc EXCEPT ![self] = "R0"]
/\ UNCHANGED << locking, lock, id, stack, regA, regB, regC,
regD, regE >>
R0(self) == /\ pc[self] = "R0"
/\ IF cnt = 0
THEN /\ pc' = [pc EXCEPT ![self] = "R1"]
ELSE /\ pc' = [pc EXCEPT ![self] = "R4"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB, regC,
regD, regE >>
R1(self) == /\ pc[self] = "R1"
/\ id' = 0
/\ pc' = [pc EXCEPT ![self] = "R2"]
/\ UNCHANGED << locking, lock, cnt, stack, regA, regB, regC, regD,
regE >>
R2(self) == /\ pc[self] = "R2"
/\ lock' = FALSE
/\ locking' = locking \ {self}
/\ pc' = [pc EXCEPT ![self] = "R4"]
/\ UNCHANGED << id, cnt, stack, regA, regB, regC, regD, regE >>
R4(self) == /\ pc[self] = "R4"
/\ pc' = [pc EXCEPT ![self] = Head(stack[self]).pc]
/\ stack' = [stack EXCEPT ![self] = Tail(stack[self])]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
release(self) == Release(self) \/ R0(self) \/ R1(self) \/ R2(self)
\/ R4(self)
RecLock(self) == /\ pc[self] = "RecLock"
/\ \/ /\ pc' = [pc EXCEPT ![self] = "Rec"]
\/ /\ pc' = [pc EXCEPT ![self] = "NotRec"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB,
regC, regD, regE >>
Rec(self) == /\ pc[self] = "Rec"
/\ IF cnt < MAX_REC
THEN /\ pc' = [pc EXCEPT ![self] = "RL0"]
ELSE /\ pc' = [pc EXCEPT ![self] = "RL1"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB, regC,
regD, regE >>
RL0(self) == /\ pc[self] = "RL0"
/\ stack' = [stack EXCEPT ![self] = << [ procedure |-> "lock_unlock",
pc |-> "RL1" ] >>
\o stack[self]]
/\ pc' = [pc EXCEPT ![self] = "Lock"]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
NotRec(self) == /\ pc[self] = "NotRec"
/\ TRUE
/\ pc' = [pc EXCEPT ![self] = "RL1"]
/\ UNCHANGED << locking, lock, id, cnt, stack, regA, regB,
regC, regD, regE >>
RL1(self) == /\ pc[self] = "RL1"
/\ pc' = [pc EXCEPT ![self] = Head(stack[self]).pc]
/\ stack' = [stack EXCEPT ![self] = Tail(stack[self])]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
recursive_lock(self) == RecLock(self) \/ Rec(self) \/ RL0(self)
\/ NotRec(self) \/ RL1(self)
Lock(self) == /\ pc[self] = "Lock"
/\ stack' = [stack EXCEPT ![self] = << [ procedure |-> "acquire",
pc |-> "L0",
regA |-> regA[self],
regB |-> regB[self],
regC |-> regC[self],
regD |-> regD[self],
regE |-> regE[self] ] >>
\o stack[self]]
/\ regA' = [regA EXCEPT ![self] = defaultInitValue]
/\ regB' = [regB EXCEPT ![self] = defaultInitValue]
/\ regC' = [regC EXCEPT ![self] = defaultInitValue]
/\ regD' = [regD EXCEPT ![self] = defaultInitValue]
/\ regE' = [regE EXCEPT ![self] = defaultInitValue]
/\ pc' = [pc EXCEPT ![self] = "Acquire"]
/\ UNCHANGED << locking, lock, id, cnt >>
L0(self) == /\ pc[self] = "L0"
/\ stack' = [stack EXCEPT ![self] = << [ procedure |-> "recursive_lock",
pc |-> "L1" ] >>
\o stack[self]]
/\ pc' = [pc EXCEPT ![self] = "RecLock"]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
L1(self) == /\ pc[self] = "L1"
/\ stack' = [stack EXCEPT ![self] = << [ procedure |-> "release",
pc |-> "L2" ] >>
\o stack[self]]
/\ pc' = [pc EXCEPT ![self] = "Release"]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
L2(self) == /\ pc[self] = "L2"
/\ pc' = [pc EXCEPT ![self] = Head(stack[self]).pc]
/\ stack' = [stack EXCEPT ![self] = Tail(stack[self])]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
lock_unlock(self) == Lock(self) \/ L0(self) \/ L1(self) \/ L2(self)
T1(self) == /\ pc[self] = "T1"
/\ stack' = [stack EXCEPT ![self] = << [ procedure |-> "lock_unlock",
pc |-> "Done" ] >>
\o stack[self]]
/\ pc' = [pc EXCEPT ![self] = "Lock"]
/\ UNCHANGED << locking, lock, id, cnt, regA, regB, regC, regD,
regE >>
pid(self) == T1(self)
(* Allow infinite stuttering to prevent deadlock on termination. *)
Terminating == /\ \A self \in ProcSet: pc[self] = "Done"
/\ UNCHANGED vars
Next == (\E self \in ProcSet: \/ acquire(self) \/ release(self)
\/ recursive_lock(self) \/ lock_unlock(self))
\/ (\E self \in PIDS: pid(self))
\/ Terminating
Spec == /\ Init /\ [][Next]_vars
/\ \A self \in PIDS : /\ WF_vars(pid(self))
/\ WF_vars(lock_unlock(self))
/\ WF_vars(acquire(self))
/\ WF_vars(release(self))
/\ WF_vars(recursive_lock(self))
Termination == <>(\A self \in ProcSet: pc[self] = "Done")
\* END TRANSLATION
TypeInvariant ==
/\ MAX_REC \in Int
/\ PIDS \subseteq Int
=============================================================================
\* Modification History
\* Last modified Mon Nov 22 09:29:41 JST 2021 by ytakano
\* Created Sun Nov 21 22:09:25 JST 2021 by ytakano
実行結果
C11
なるほど、そういう意図があったのですね。冒頭でメモリバリアの説明をするような怖い本だと自分は手に取れてなかった気がするので、そこのハードルを下げる構成になっていてすごくありがたいです!
|
確かに、上位4ビットと下位4ビットで操作が分割されるようなCPUアーキテクチャの場合は、おこりえそうです。
https://yohhoy.hatenadiary.jp/entry/20121016/p1 AArch64だと、たとえば、 アトミック処理を含む現代的なCPUだと問題なさそうな気もしますが、 |
はい。自分も現実的な環境では発生しないように思います。
自分もデータがアラインメントされている限りは読み込みは atomic な振る舞いになると思っているのですが、その保証がまだ見つけられていません。 命令レベルの問題のほかに気にしていたのはコンパイラの最適化です。
という理由で問題はないと思います。 |
ありがとうございます! 以下のようにコードを修正したいと思います。また、増刷があれば書籍にも反映したいと思います。 ytakano@c1c6bb3#diff-fb9cce8092ac6c6fa7da708205bc019ce70ac2ad0bec59426a0d9d8aff9ff437 並行プログラミングは難しいトピックで、間違いもまだまだあるとは思いますが、これに懲りずご指摘頂ければと思います。 |
コードの修正を確認しました。問題ないと思います。 自分の指摘に長々付き合って頂いてありがとうございます。 知り合いの間で本読みをしていますが、並行プログラミングの世界の概要が一冊で把握できる貴重な本だと感じています。 |
はい。 こちらこそ、大変詳しく読んで頂きありがとうございます。 |
4章4節の再帰ロックのコードサンプルにデータ競合がありそうです。
reentlock_acquire
関数https://github.com/oreilly-japan/conc_ytakano/blob/main/chap4/4.4/ch4_4_reent_c/reent.c#L17
conc_ytakano/chap4/4.4/ch4_4_reent_c/reent.c
Line 19 in 85a014d
conc_ytakano/chap4/4.4/ch4_4_reent_c/reent.c
Line 25 in 85a014d
それぞれの行は
lock->lock
へのロードとストアをそれぞれ行っており、spinlock_acquire
はアトミック操作で、もう片方は通常のロード操作です。C11 の規格では、
https://port70.net/~nsz/c/c11/n1570.html#5.1.2.4p25
とあるのでこの lock->lock メンバへのアクセスはデータ競合を含んでいそうです。
また、この関数中の
lock->id
へのアクセスも同様です。conc_ytakano/chap4/4.4/ch4_4_reent_c/reent.c
Line 28 in 85a014d
conc_ytakano/chap4/4.4/ch4_4_reent_c/reent.c
Line 19 in 85a014d
lock->lock
への操作とlock->id
への操作はどちらも atomic なほうが良いのではないかと思いました。The text was updated successfully, but these errors were encountered: