Skip to content

Commit

Permalink
Merge e6e680e into c88e3df
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Nov 30, 2016
2 parents c88e3df + e6e680e commit 4095025
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 24 deletions.
2 changes: 1 addition & 1 deletion examples/example1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ let lwt_main =
>>= fun connection -> Lwt_io.printl "Connected"
>>= fun () -> C.send_join ~connection ~channel
>>= fun () -> C.send_privmsg ~connection ~target:channel ~message
>>= fun () -> C.listen ~connection ~callback
>>= fun () -> C.listen ~connection ~callback ()
>>= fun () -> C.send_quit ~connection

let _ = Lwt_main.run lwt_main
Expand Down
29 changes: 15 additions & 14 deletions examples/example2.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ let callback connection result =
Lwt_io.printl e

let lwt_main =
Lwt_io.printl "Connecting..."
>>= fun () ->
C.connect_by_name ~server:!host ~port:!port ~nick:!nick ()
>>= function
| None -> Lwt_io.printl "could not find host"
| Some connection ->
Lwt_io.printl "Connected"
>>= fun () ->
let t = C.listen ~connection ~callback in
Lwt_io.printl "send join msg"
>>= fun () -> C.send_join ~connection ~channel:!channel
>>= fun () -> C.send_privmsg ~connection ~target:!channel ~message
>>= fun () -> t (* wait for completion of t *)
>>= fun () -> C.send_quit ~connection
C.set_log Lwt_io.printl;
C.reconnect_loop
~after:30
~connect:(fun () ->
Lwt_io.printl "Connecting..." >>= fun () ->
C.connect_by_name ~server:!host ~port:!port ~nick:!nick ()
)
~f:(fun connection ->
Lwt_io.printl "Connected" >>= fun () ->
Lwt_io.printl "send join msg" >>= fun () ->
C.send_join ~connection ~channel:!channel >>= fun () ->
C.send_privmsg ~connection ~target:!channel ~message
)
~callback
()

let options = Arg.align
[ "-host", Arg.Set_string host, " set remove server host name"
Expand Down
103 changes: 98 additions & 5 deletions lib/irc_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ module Make(Io: Irc_transport.IO) = struct
mutable terminated: bool;
}

(* logging *)

let log_ : (string -> unit Io.t) ref = ref (fun _ -> Io.return ())
let set_log f = log_ := f

let log s = !log_ (Printf.sprintf "[%.2f] %s" (Sys.time()) s)
let logf s = Printf.ksprintf log s

open Io

let rec really_write ~connection ~data ~offset ~length =
Expand Down Expand Up @@ -37,6 +45,9 @@ module Make(Io: Irc_transport.IO) = struct
let send_pass ~connection ~password =
send ~connection (M.pass password)

let send_ping ~connection ~message =
send ~connection (M.ping message)

let send_pong ~connection ~message =
send ~connection (M.pong message)

Expand Down Expand Up @@ -97,7 +108,6 @@ module Make(Io: Irc_transport.IO) = struct
send_pong ~connection ~message >>= fun () -> wait_for_welcome ~connection
| _ -> wait_for_welcome ~connection


let connect
?(username="irc-client") ?(mode=0) ?(realname="irc-client")
?password ~addr ~port ~nick () =
Expand All @@ -123,19 +133,102 @@ module Make(Io: Irc_transport.IO) = struct
connect ~addr ~port ~username ~mode ~realname ~nick ?password ()
>>= fun connection -> Io.return (Some connection))

let listen ~connection ~callback =
let rec listen' () =
(** Information on keeping the connection alive *)
type keepalive = {
mode: [`Active | `Passive];
timeout: int;
}

let default_keepalive: keepalive = {
mode=`Active;
timeout=60;
}

type listen_keepalive_state = {
mutable last_seen: float;
mutable last_active_ping: float;
mutable finish: bool;
}

let listen ?(keepalive=default_keepalive) ~connection ~callback () =
(* main loop *)
let rec listen_rec state =
next_line_ ~connection
>>= function
| None -> return ()
| Some line ->
begin match M.parse line with
| Result.Ok {M.command = M.PING message; _} ->
(* update "last_seen" field *)
let now = Sys.time() in
state.last_seen <- max now state.last_seen;
(* Handle pings without calling the callback. *)
log "reply pong to server" >>= fun () ->
send_pong ~connection ~message
| result -> callback connection result
end
>>= listen'
>>= fun () ->
if state.finish
then Io.return ()
else listen_rec state

(* main loop for timeout *)
and timeout_thread state =
let now = Sys.time() in
let time_til_timeout =
state.last_seen +. float keepalive.timeout -. now
and time_til_ping =
state.last_active_ping +. float keepalive.timeout -. now
in
if time_til_timeout < 0. then (
(* done *)
log "client timeout" >>= fun () ->
state.finish <- true;
(* try to wake up the listening thread, so it can die *)
really_write ~connection ~data:"\n" ~offset:0 ~length:1
) else (
(* send "ping" if active mode and it's been long enough *)
if keepalive.mode = `Active && time_til_ping < 0. then (
state.last_active_ping <- now;
log "send ping to server..." >>= fun () ->
send_ping ~connection ~message:"ping"
) else (
Io.return ()
)
>>= fun () ->
(* sleep until the due date, then check again *)
Io.sleep (int_of_float time_til_timeout + 1) >>= fun () ->
timeout_thread state
)
in
let state = {
last_seen = Sys.time();
last_active_ping = Sys.time();
finish = false;
} in
(* connect, serve, etc. *)
begin match Io.pick with
| Some pick ->
pick
[ listen_rec state;
timeout_thread state;
]
| _ ->
listen_rec state
end

let reconnect_loop ?keepalive ~after ~connect ~f ~callback () =
let rec aux () =
connect () >>= function
| None -> log "could not connect" >>= aux
| Some connection ->
let t = listen ?keepalive ~connection ~callback () in
f connection >>= fun () ->
t >>= fun () ->
log "connection terminated." >>= fun () ->
Io.sleep after >>= fun () ->
log "try to reconnect..." >>= fun () ->
aux ()
in
listen' ()
aux ()
end
44 changes: 40 additions & 4 deletions lib/irc_client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,50 @@ module Make : functor (Io: Irc_transport.IO) ->
{!connect}. Returns [None] if no IP could be found for the given
name. *)

val listen : connection:connection_t ->
(** Information on keeping the connection alive *)
type keepalive = {
mode: [`Active | `Passive];
timeout: int;
}

val default_keepalive : keepalive
(** Default value for keepalive: active mode with auto-reconnect *)

val listen :
?keepalive:keepalive ->
connection:connection_t ->
callback:(
connection_t ->
Irc_message.parse_result ->
unit Io.t
) ->
unit Io.t) ->
unit ->
unit Io.t
(** [listen connection callback] listens for incoming messages on
[connection]. All server pings are handled internally; all other
messages are passed, along with [connection], to [callback]. *)
messages are passed, along with [connection], to [callback].
@param keepalive the behavior on disconnection (if the transport
supports {!Irc_transport.IO.pick} and {!Irc_transport.IO.sleep}) *)

val reconnect_loop :
?keepalive:keepalive ->
after:int ->
connect:(unit -> connection_t option Io.t) ->
f:(connection_t -> unit Io.t) ->
callback:(
connection_t ->
Irc_message.parse_result ->
unit Io.t) ->
unit ->
unit Io.t
(** A combination of {!connect} and {!listen} that, every time
the connection is terminated, tries to start a new one
after [after] seconds.
@param after time before trying to reconnect
@param connect how to reconnect
(a closure over {!connect} or {!connect_by_name})
@param callback the callback for {!listen}
@param f the function to call after connection *)

val set_log : (string -> unit Io.t) -> unit
(** Set logging function *)
end
3 changes: 3 additions & 0 deletions lib/irc_transport.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ module type IO = sig
val gethostbyname : string -> inet_addr list t

val iter : ('a -> unit t) -> 'a list -> unit t

val sleep : int -> unit t
val pick : ('a t list -> 'a t) option
end
8 changes: 8 additions & 0 deletions lib/irc_transport.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ module type IO = sig
list if none is found) *)

val iter : ('a -> unit t) -> 'a list -> unit t

val sleep : int -> unit t
(* [sleep t] sleeps for [t] seconds, then returns. *)

val pick : ('a t list -> 'a t) option
(** OPTIONAL
[pick l] returns the first thread of [l] that terminates (and might
cancel the others) *)
end
3 changes: 3 additions & 0 deletions lwt/irc_client_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ module Io = struct
)

let iter = Lwt_list.iter_s
let sleep d = Lwt_unix.sleep (float d)

let pick = Some Lwt.pick
end

include Irc_client.Make(Io)
3 changes: 3 additions & 0 deletions unix/irc_client_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module Io = struct
[]

let iter = List.iter
let sleep = Unix.sleep

let pick = None
end

include Irc_client.Make(Io)

0 comments on commit 4095025

Please sign in to comment.