Skip to content

Commit

Permalink
New package: websocketaf-lwt
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas committed Jan 5, 2019
1 parent 65185b0 commit db0c615
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 0 deletions.
5 changes: 5 additions & 0 deletions lwt/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
(library
(name websocketaf_lwt)
(public_name websocketaf-lwt)
(libraries faraday-lwt-unix websocketaf lwt.unix digestif.ocaml base64)
(flags (:standard -safe-string)))
274 changes: 274 additions & 0 deletions lwt/websocketaf_lwt.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
open Lwt.Infix

let sha1 s =
s
|> Digestif.SHA1.digest_string
|> Digestif.SHA1.to_raw_string
|> B64.encode ~pad:true

module Buffer : sig
type t

val create : int -> t

val get : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int) -> int
val put : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int Lwt.t) -> int Lwt.t
end = struct
type t =
{ buffer : Lwt_bytes.t
; mutable off : int
; mutable len : int }

let create size =
let buffer = Lwt_bytes.create size in
{ buffer; off = 0; len = 0 }

let compress t =
if t.len = 0
then begin
t.off <- 0;
t.len <- 0;
end else if t.off > 0
then begin
Lwt_bytes.blit t.buffer t.off t.buffer 0 t.len;
t.off <- 0;
end

let get t ~f =
let n = f t.buffer ~off:t.off ~len:t.len in
t.off <- t.off + n;
t.len <- t.len - n;
if t.len = 0
then t.off <- 0;
n

let put t ~f =
compress t;
f t.buffer ~off:(t.off + t.len) ~len:(Lwt_bytes.length t.buffer - t.len)
>>= fun n ->
t.len <- t.len + n;
Lwt.return n
end


let read fd buffer =
Lwt.catch
(fun () ->
Buffer.put buffer ~f:(fun bigstring ~off ~len ->
Lwt_bytes.read fd bigstring off len))
(function
| Unix.Unix_error (Unix.EBADF, _, _) as exn ->
Lwt.fail exn
| exn ->
Lwt.async (fun () ->
Lwt_unix.close fd);
Lwt.fail exn)

>>= fun bytes_read ->
if bytes_read = 0 then
Lwt.return `Eof
else
Lwt.return (`Ok bytes_read)

let shutdown socket command =
try Lwt_unix.shutdown socket command
with Unix.Unix_error (Unix.ENOTCONN, _, _) -> ()



module Server = struct
let create_connection_handler ?config:_ ~websocket_handler ~error_handler:_ =
fun client_addr socket ->
let module Server_connection = Websocketaf.Server_connection in
let connection =
Server_connection.create
~sha1
~websocket_handler:(websocket_handler client_addr)
in


let read_buffer = Buffer.create 0x1000 in
let read_loop_exited, notify_read_loop_exited = Lwt.wait () in

let rec read_loop () =
let rec read_loop_step () =
match Server_connection.next_read_operation connection with
| `Read ->
read socket read_buffer >>= begin function
| `Eof ->
Buffer.get read_buffer ~f:(fun bigstring ~off ~len ->
Server_connection.read_eof connection bigstring ~off ~len)
|> ignore;
read_loop_step ()
| `Ok _ ->
Buffer.get read_buffer ~f:(fun bigstring ~off ~len ->
Server_connection.read connection bigstring ~off ~len)
|> ignore;
read_loop_step ()
end

| `Yield ->
Server_connection.yield_reader connection read_loop;
Lwt.return_unit

| `Close ->
Lwt.wakeup_later notify_read_loop_exited ();
if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin
shutdown socket Unix.SHUTDOWN_RECEIVE
end;
Lwt.return_unit
in

Lwt.async (fun () ->
Lwt.catch
read_loop_step
(fun exn ->
(* XXX(andreas): missing error reporting *)
(* Server_connection.report_exn connection exn;*)
Printexc.print_backtrace stdout;
ignore(raise exn);
Lwt.return_unit))
in


let writev = Faraday_lwt_unix.writev_of_fd socket in
let write_loop_exited, notify_write_loop_exited = Lwt.wait () in

let rec write_loop () =
let rec write_loop_step () =
match Server_connection.next_write_operation connection with
| `Write io_vectors ->
writev io_vectors >>= fun result ->
Server_connection.report_write_result connection result;
write_loop_step ()

| `Yield ->
Server_connection.yield_writer connection write_loop;
Lwt.return_unit

| `Close _ ->
Lwt.wakeup_later notify_write_loop_exited ();
if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin
shutdown socket Unix.SHUTDOWN_SEND
end;
Lwt.return_unit
in

Lwt.async (fun () ->
Lwt.catch
write_loop_step
(fun exn ->
(* XXX(andreas): missing error reporting *)
(*Server_connection.report_exn connection exn;*)
Printexc.print_backtrace stdout;
ignore(raise exn);
Lwt.return_unit))
in


read_loop ();
write_loop ();
Lwt.join [read_loop_exited; write_loop_exited] >>= fun () ->

if Lwt_unix.state socket <> Lwt_unix.Closed then
Lwt.catch
(fun () -> Lwt_unix.close socket)
(fun _exn -> Lwt.return_unit)
else
Lwt.return_unit
end



module Client = struct
let connect socket ~nonce ~host ~port ~resource ~error_handler ~websocket_handler =
let module Client_connection = Websocketaf.Client_connection in
let connection =
Client_connection.create ~nonce ~host ~port ~resource ~sha1 ~error_handler ~websocket_handler in

let read_buffer = Buffer.create 0x1000 in
let read_loop_exited, notify_read_loop_exited = Lwt.wait () in

let read_loop () =
let rec read_loop_step () =
match Client_connection.next_read_operation connection with
| `Read ->
read socket read_buffer >>= begin function
| `Ok _ ->
Buffer.get read_buffer ~f:(fun bigstring ~off ~len ->
Client_connection.read connection bigstring ~off ~len
)
|> ignore;
read_loop_step ()
| `Eof ->
Buffer.get read_buffer ~f:(fun bigstring ~off ~len ->
Client_connection.read_eof connection bigstring ~off ~len)
|> ignore;
read_loop_step ()
end

| `Close ->
Lwt.wakeup_later notify_read_loop_exited ();
if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin
shutdown socket Unix.SHUTDOWN_RECEIVE
end;
Lwt.return_unit
in

Lwt.async (fun () ->
Lwt.catch
read_loop_step
(fun exn ->
(*Client_connection.report_exn connection exn;*)
Printexc.print_backtrace stdout;
ignore(raise exn);
Lwt.return_unit))
in


let writev = Faraday_lwt_unix.writev_of_fd socket in
let write_loop_exited, notify_write_loop_exited = Lwt.wait () in

let rec write_loop () =
let rec write_loop_step () =
flush stdout;
match Client_connection.next_write_operation connection with
| `Write io_vectors ->
writev io_vectors >>= fun result ->
Client_connection.report_write_result connection result;
write_loop_step ()

| `Yield ->
Client_connection.yield_writer connection write_loop;
Lwt.return_unit

| `Close _ ->
Lwt.wakeup_later notify_write_loop_exited ();
if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin
shutdown socket Unix.SHUTDOWN_SEND
end;
Lwt.return_unit
in

Lwt.async (fun () ->
Lwt.catch
write_loop_step
(fun exn ->
(*Client_connection.report_exn connection exn;*)
ignore(raise exn);
Lwt.return_unit))
in


read_loop ();
write_loop ();

Lwt.join [read_loop_exited; write_loop_exited] >>= fun () ->

if Lwt_unix.state socket <> Lwt_unix.Closed then
Lwt.catch
(fun () -> Lwt_unix.close socket)
(fun _exn -> Lwt.return_unit)
else
Lwt.return_unit;
end
19 changes: 19 additions & 0 deletions lwt/websocketaf_lwt.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module Client : sig
val connect
: Lwt_unix.file_descr
-> nonce : string
-> host : string
-> port : int
-> resource : string
-> error_handler : (Websocketaf.Client_connection.error -> unit)
-> websocket_handler : (Websocketaf.Wsd.t -> Websocketaf.Client_connection.input_handlers)
-> unit Lwt.t
end

module Server : sig
val create_connection_handler
: ?config : Httpaf.Server_connection.Config.t
-> websocket_handler : (Unix.sockaddr -> Websocketaf.Wsd.t -> Websocketaf.Server_connection.input_handlers)
-> error_handler : (Unix.sockaddr -> Httpaf.Server_connection.error_handler)
-> (Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t)
end
22 changes: 22 additions & 0 deletions websocketaf-lwt.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
opam-version: "2.0"
name: "websocketaf-lwt"
maintainer: "Spiros Eliopoulos <spiros@inhabitedtype.com>"
authors: [ "Andreas Garnæs <andreas.garnaes@gmail.com>" ]
license: "BSD-3-clause"
homepage: "https://github.com/inhabitedtype/websocketaf"
bug-reports: "https://github.com/inhabitedtype/websocketaf/issues"
dev-repo: "git+https://github.com/inhabitedtype/websocketaf.git"
build: [
["dune" "subst" "-p" name] {pinned}
["dune" "build" "-p" name "-j" jobs]
]
depends: [
"ocaml" {>= "4.03.0"}
"faraday-lwt-unix"
"websocketaf"
"dune" {build}
"lwt"
"digestif"
"base64"
]
synopsis: "Lwt support for websocket/af"

0 comments on commit db0c615

Please sign in to comment.