(* mq.ml -- 106 lines (82 loc), 3.84 KB *)
(* Copyright (c) 2009 Mauricio Fernández <mfp@acm.org> *)
(** Message queue (MQ): message and error type definitions, module types. *)
(** Record describing a message received from the message queue. *)
type received_msg =
  {
    msg_id : string;  (** Identifier of the message. *)
    msg_destination : string;  (** Destination of the message. *)
    msg_headers : (string * string) list;
      (** Headers, as a list of string pairs (presumably name/value pairs;
          to be confirmed against the implementation). *)
    msg_body : string;  (** Body (payload) of the message. *)
  }
(** {3 Errors } *)
(** Recovery strategy suggested to the caller when an error is reported. *)
type restartable =
  | Retry
  | Reconnect
  | Abort
(** Kinds of connection error. *)
type connection_error =
  | Access_refused
  | Connection_refused
  | Closed
(** Errors reported by the MQ client: either a connection-level failure or a
    protocol-level one. *)
type message_queue_error =
  | Connection_error of connection_error
      (** The connection failed; the argument tells how. *)
  | Protocol_error of (string * (string * string) list * string)
      (** Protocol-level failure carrying a single triple.
          NOTE(review): the triple presumably mirrors the offending frame
          (command, headers, body) -- confirm against the implementation. *)
(** Exception raised by MQ client operations. Its arguments are, in order: the
    suggested recovery strategy, a string, and the underlying error.
    NOTE(review): the string is presumably a human-readable description or
    the name of the failing operation -- confirm against the raising code. *)
exception Message_queue_error of restartable * string * message_queue_error
(** {3 Module types} *)
(** Base MQ signature: the operations shared by the {!GENERIC} and
    {!HIGH_LEVEL} module types, both of which include it. *)
module type BASE =
sig
  (** Abstract type wrapping the result of every operation below. *)
  type 'a thread

  (** Abstract connection handle; first argument of every operation below. *)
  type connection

  (** Abstract transaction handle, as yielded by {!transaction_begin}. *)
  type transaction

  (** [transaction_begin conn] yields a transaction handle for [conn]. *)
  val transaction_begin : connection -> transaction thread

  (** [transaction_commit conn tx] commits the transaction [tx]. *)
  val transaction_commit : connection -> transaction -> unit thread

  (** [transaction_commit_all conn] commits transactions without naming them
      individually. NOTE(review): presumably every transaction still pending
      on [conn] -- confirm against the implementation. *)
  val transaction_commit_all : connection -> unit thread

  (** [transaction_abort_all conn] aborts transactions without naming them
      individually. NOTE(review): presumably every transaction still pending
      on [conn] -- confirm against the implementation. *)
  val transaction_abort_all : connection -> unit thread

  (** [transaction_abort conn tx] aborts the transaction [tx]. *)
  val transaction_abort : connection -> transaction -> unit thread

  (** [receive_msg conn] yields a message received on [conn]. *)
  val receive_msg : connection -> received_msg thread

  (** Acknowledge the reception of a message, optionally as part of
      [transaction]. *)
  val ack_msg :
    connection -> ?transaction:transaction -> received_msg -> unit thread

  (** Acknowledge the reception of a message designated by its message-id,
      optionally as part of [transaction]. *)
  val ack : connection -> ?transaction:transaction -> string -> unit thread

  (** [disconnect conn] disconnects [conn]. *)
  val disconnect : connection -> unit thread
end
(** Generic, low-level MQ signature. Every operation it adds on top of
    {!BASE} accepts custom [headers]. *)
module type GENERIC =
sig
  include BASE

  (** [connect ?login ?passcode ?eof_nl ?headers addr] yields a connection
      for the socket address [addr].
      NOTE(review): the meaning of [eof_nl] is not visible from this
      signature -- see the implementation. *)
  val connect :
    ?login:string ->
    ?passcode:string ->
    ?eof_nl:bool ->
    ?headers:(string * string) list ->
    Unix.sockaddr ->
    connection thread

  (** [send conn ?transaction ?persistent ~destination ?headers body] sends
      [body] to [destination].
      NOTE(review): going by the names, this variant presumably waits for a
      confirmation whereas {!send_no_ack} does not -- confirm. *)
  val send :
    connection ->
    ?transaction:transaction ->
    ?persistent:bool ->
    destination:string ->
    ?headers:(string * string) list ->
    string ->
    unit thread

  (** Same arguments as {!send}.
      NOTE(review): presumably does not wait for a confirmation -- confirm
      against the implementation. *)
  val send_no_ack :
    connection ->
    ?transaction:transaction ->
    ?persistent:bool ->
    destination:string ->
    ?headers:(string * string) list ->
    string ->
    unit thread

  (** [subscribe conn ?headers dest] subscribes [conn] to the destination
      named by the string [dest]. *)
  val subscribe :
    connection -> ?headers:(string * string) list -> string -> unit thread

  (** [unsubscribe conn ?headers dest] cancels the subscription of [conn] to
      the destination named by the string [dest]. *)
  val unsubscribe :
    connection -> ?headers:(string * string) list -> string -> unit thread
end
(** Higher-level MQ signature, built on {!BASE}, offering queue and topic
    abstractions. *)
module type HIGH_LEVEL =
sig
  include BASE

  (** [connect ?prefetch ~login ~passcode addr] yields a connection for the
      socket address [addr], using the given credentials.
      NOTE(review): [prefetch] presumably bounds the number of messages
      delivered ahead of acknowledgement -- confirm. *)
  val connect :
    ?prefetch:int ->
    login:string ->
    passcode:string ->
    Unix.sockaddr ->
    connection thread

  (** Send and wait for ACK. *)
  val send :
    connection ->
    ?transaction:transaction ->
    ?ack_timeout:float ->
    destination:string ->
    string ->
    unit thread

  (** Send without waiting for confirmation. *)
  val send_no_ack :
    connection ->
    ?transaction:transaction ->
    ?ack_timeout:float ->
    destination:string ->
    string ->
    unit thread

  (** Send to a topic and wait for ACK. *)
  val topic_send :
    connection ->
    ?transaction:transaction ->
    destination:string ->
    string ->
    unit thread

  (** Send to a topic without waiting for confirmation. *)
  val topic_send_no_ack :
    connection ->
    ?transaction:transaction ->
    destination:string ->
    string ->
    unit thread

  (** [create_queue conn name] creates a persistent queue named [name].
      Messages sent to it will persist. *)
  val create_queue : connection -> string -> unit thread

  (** [subscribe_queue conn ?auto_delete name] subscribes to the queue
      [name].
      NOTE(review): the effect of [auto_delete] is not visible from this
      signature -- see the implementation. *)
  val subscribe_queue :
    connection -> ?auto_delete:bool -> string -> unit thread

  (** [unsubscribe_queue conn name] cancels the subscription to the queue
      [name]. *)
  val unsubscribe_queue : connection -> string -> unit thread

  (** [subscribe_topic conn name] subscribes to the topic [name]. *)
  val subscribe_topic : connection -> string -> unit thread

  (** [unsubscribe_topic conn name] cancels the subscription to the topic
      [name]. *)
  val unsubscribe_topic : connection -> string -> unit thread

  (** [queue_size conn name] yields the size of the queue [name], as an
      option. NOTE(review): [None] presumably means that the size is not
      available -- confirm. *)
  val queue_size : connection -> string -> Int64.t option thread

  (** [queue_subscribers conn name] yields the number of subscribers of the
      queue [name], as an option. NOTE(review): [None] presumably means
      that the count is not available -- confirm. *)
  val queue_subscribers : connection -> string -> int option thread

  (** [topic_subscribers conn name] yields the number of subscribers of the
      topic [name], as an option. NOTE(review): [None] presumably means
      that the count is not available -- confirm. *)
  val topic_subscribers : connection -> string -> int option thread
end