forked from MLstate/opalang
/
badop_server.ml
136 lines (121 loc) · 5.45 KB
/
badop_server.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
(*
Copyright © 2011 MLstate
This file is part of OPA.
OPA is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License, version 3, as published by
the Free Software Foundation.
OPA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
more details.
You should have received a copy of the GNU Affero General Public License
along with OPA. If not, see <http://www.gnu.org/licenses/>.
*)
(*
@author Louis Gesbert
**)
module D = Badop_lib
open Cps.Ops
module N = Hlnet
module F = functor (Backend: Badop.S) ->
struct
(* Instantiate [Badop_protocol.F] for this side of the connection:
   [spoken] is bound to responses and [understood] to queries (the callbacks
   below receive queries and pass responses to their continuation), and
   revisions are those of the wrapped [Backend]. *)
include Badop_protocol.F
(struct
type spoken = D.Dialog.response
type understood = D.Dialog.query
type revision = Backend.revision
end)
(* Handle on a running server: built by [start], consumed by [stop]. *)
type t = {
backend: Backend.database; (* database returned by [Backend.open_database] in [start]; closed by [stop] *)
endpoint: N.endpoint; (* endpoint given to [N.listen] / [N.accept] in [start]; given to [N.refuse] in [stop] *)
scheduler: Scheduler.t; (* scheduler handed to those same [N] calls *)
}
(* [transaction_callback transmap channel] is the responder installed on a
   transaction channel. It receives a query [request] and, for the operations
   that expect an answer, passes the response to the continuation [k].

   [transmap] maps integer transaction versions to the corresponding backend
   transactions. Operations that produce a new transaction state ([Write],
   [WriteList], [Prepare]) carry the version of the *resulting* transaction
   ([tr_next_version]): they look up the transaction stored under
   [pred tr_next_version], apply the operation, and store the result under
   [tr_next_version]. The other operations look up [tr_version] directly.

   NOTE(review): every lookup uses [IntMap.find]; presumably this raises
   [Not_found] when the client sends a version that is not in the map —
   confirm how such an exception is handled by the caller. *)
let rec transaction_callback (transmap: Backend.transaction IntMap.t ref) channel
: (D.Dialog.query transaction_op
-> (D.Dialog.response transaction_op -> unit) -> unit)
=
fun request k ->
match request with
(* Read: run the backend read on the transaction at [tr_version] and reply. *)
| Read (path, (D.Query (tr_version, op) as query)) ->
Backend.read (IntMap.find tr_version !transmap) path op
@> fun resp -> Read (path, D.Dialog_aux.respond query resp) |> k
(* Write: convert the operation, apply it to the previous version, and record
   the resulting transaction. [k] is never called: no response is sent. *)
| Write (path, tr_next_version, op) ->
Badop.Aux.map_write_op (* From Protocol.transaction to Backend.transaction *)
~revision:(fun r k -> r |> k)
(* NOTE(review): the transaction mapper asserts false — presumably write
   operations received here never embed a transaction; confirm. *)
~transaction:(fun () k -> assert false |> k)
op
@> fun op ->
Backend.write (IntMap.find (pred tr_next_version) !transmap) path op
@> fun backend_response ->
let tr = Badop.Aux.result_transaction backend_response in
transmap := IntMap.add tr_next_version tr !transmap (* no continuation needed *)
(* WriteList: same scheme as [Write], for a list of (path, operation) pairs.
   The paths are split off, the operations converted, then recombined. *)
| WriteList (tr_next_version, D.Query l_q) ->
let l_paths,l_op = List.split l_q in
Badop.Aux.map_write_list_op (* From Protocol.transaction to Backend.transaction *)
~revision:(fun r k -> r |> k)
~transaction:(fun _ k -> assert false |> k)
l_op
@> fun l_op ->
Backend.write_list (IntMap.find (pred tr_next_version) !transmap) (List.combine l_paths l_op)
@> fun tr ->
transmap := IntMap.add tr_next_version tr !transmap (* no continuation needed *)
(* Prepare: prepare the previous version, record the transaction returned by
   the backend under [tr_next_version], and reply with the success flag. *)
| Prepare (D.Query tr_next_version as query) ->
Backend.Tr.prepare (IntMap.find (pred tr_next_version) !transmap)
@> fun (tr,success) ->
transmap := IntMap.add tr_next_version tr !transmap;
Prepare (D.Dialog_aux.respond query success) |> k
(* Commit / Abort: forward to the backend for [tr_version] and reply with
   the backend's answer. The map is left unchanged. *)
| Commit (D.Query tr_version as query) ->
Backend.Tr.commit (IntMap.find tr_version !transmap)
@> fun resp -> Commit (D.Dialog_aux.respond query resp) |> k
| Abort (D.Query tr_version as query) ->
Backend.Tr.abort (IntMap.find tr_version !transmap)
@> fun resp -> Abort (D.Dialog_aux.respond query resp) |> k
(* Fork: duplicate the channel, give the copy its own version map and its own
   instance of this callback, and reply with the new channel.
   The predicate [(<=) tr_version] holds for keys [key] with
   [tr_version <= key]; presumably [IntMap.filter_keys] keeps the bindings
   whose key satisfies it — confirm against [IntMap]. *)
| Fork (D.Query tr_version as query) ->
let channel = N.dup channel transaction_channel_spec in
let transmap = ref (IntMap.filter_keys ((<=) tr_version) !transmap) in
N.setup_respond channel (transaction_callback transmap channel);
Fork (D.Dialog_aux.respond query channel) |> k
(* Receiving a response where a query is expected is treated as impossible. *)
| Read (_, D.Response _) | WriteList (_, D.Response _) | Prepare (D.Response _)
| Commit (D.Response _) | Abort (D.Response _) | Fork (D.Response _) ->
assert false
(** [main_callback db channel] is the responder for a database channel.

    It answers three kinds of query, passing the response to [k]:
    - [Transaction]: starts a backend transaction on [db] and replies with a
      fresh transaction channel;
    - [Transaction_at]: same, starting at the requested revision;
    - [Status]: replies with the backend status wrapped in a ["Server"] layer.

    The handler given to the backend when starting a transaction calls
    [N.panic channel]. Receiving a response instead of a query is treated as
    impossible ([assert false]). *)
let main_callback db (channel: database) :
  D.Dialog.query database_query -> D.Dialog.response database_query Cps.t
  =
  fun request k ->
    (* Handler passed to the backend when starting a transaction. *)
    let panic_on_error _exc = N.panic channel in
    (* Duplicate [channel] into a transaction channel whose version map
       initially binds version 0 to [backend_tr], install the transaction
       responder on it, and hand the new channel to [k]. *)
    let open_transaction_channel backend_tr k =
      let tr_channel = N.dup channel transaction_channel_spec in
      let versions = ref (IntMap.singleton 0 backend_tr) in
      N.setup_respond tr_channel (transaction_callback versions tr_channel);
      tr_channel |> k
    in
    match request with
    | Transaction (D.Query () as query) ->
      Backend.Tr.start db panic_on_error
      @> fun backend_tr ->
        open_transaction_channel backend_tr
        @> fun tr_channel ->
          Transaction (D.Dialog_aux.respond query tr_channel) |> k
    | Transaction_at (D.Query rev as query) ->
      Backend.Tr.start_at_revision db rev panic_on_error
      @> fun backend_tr ->
        open_transaction_channel backend_tr
        @> fun tr_channel ->
          Transaction_at (D.Dialog_aux.respond query tr_channel) |> k
    | Status (D.Query () as query) ->
      Backend.status db
      @> fun status ->
        Status (D.Dialog_aux.respond query (Badop.Layer ("Server", status))) |> k
    | Transaction (D.Response _)
    | Transaction_at (D.Response _)
    | Status (D.Response _) ->
      assert false
(** [listener db] is the per-connection handler that [start] passes to
    [N.accept]: given a database channel, it installs [main_callback db] as
    that channel's responder. *)
let listener (db: Backend.database) : database -> unit =
  fun channel ->
    let respond = main_callback db channel in
    N.setup_respond channel respond
(** [start scheduler bindto options k] starts listening on [bindto], opens
    the backend database with [options], then accepts database channels on
    [bindto] (each handled by [listener]) and passes the resulting server
    handle to [k]. *)
let start scheduler bindto options k =
  N.listen scheduler bindto;
  Backend.open_database options
  @> fun backend ->
    let on_connection = listener backend in
    N.accept scheduler bindto database_channel_spec on_connection;
    let server =
      { backend = backend; endpoint = bindto; scheduler = scheduler }
    in
    server |> k
(** [stop server k] refuses further connections on the server's endpoint,
    then closes the backend database, continuing with [k]. *)
let stop (server : t) k =
  N.refuse server.scheduler server.endpoint; (* todo: ensure to close all channels bound to local endpoint *)
  Backend.close_database server.backend
  @> k
end