Permalink
Browse files

Add Lwt_bounded_queue implementation, needed by mirage interface

  • Loading branch information...
David Scott
David Scott committed Sep 20, 2012
1 parent 5aaa7c1 commit d1f6654e75f07cc97ca14a3da8798ab54aaf6ce4
Showing with 123 additions and 18 deletions.
  1. +1 −1 Makefile
  2. +1 −1 _oasis
  3. +63 −0 mirage/lwt_bounded_stream.ml
  4. +33 −0 mirage/lwt_bounded_stream.mli
  5. +18 −9 mirage/pcap_mirage.ml
  6. +7 −7 myocamlbuild.ml
View
@@ -5,7 +5,7 @@ J=4
UNIX ?= $(shell if ocamlfind query lwt.unix >/dev/null 2>&1; then echo --enable-unix; fi)
MIRAGE ?= $(shell if ocamlfind query mirage-net >/dev/null 2>&1; then echo --enable-mirage; fi)
-TESTS ?= $(shell if ocamlfind query oUnit >/dev/null 2>&1; then echo --enable-tests; fi)
+TESTS ?= $(shell if ocamlfind query oUnit >/dev/null && ocamlfind query lwt.unix >/dev/null 2>&1; then echo --enable-tests; fi)
setup.ml: _oasis
oasis setup
View
2 _oasis
@@ -28,7 +28,7 @@ Library mirage
Path: mirage
Findlibname: mirage
Findlibparent: pcap
- Modules: Pcap_mirage
+ Modules: Pcap_mirage, Lwt_bounded_stream
BuildDepends: mirage-net, pcap, lwt, lwt.syntax
Executable print
@@ -0,0 +1,63 @@
+(*
+ * Copyright (c) 2012 Citrix Systems
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *)
+
+open Lwt
+
+type 'a t = {
+ stream: 'a Lwt_stream.t;
+ max_elements: int ref;
+ nr_elements: int ref;
+ nr_dropped: int ref;
+}
+
+let create max_elements =
+ let stream, stream_push = Lwt_stream.create () in
+ let t = {
+ stream = stream;
+ max_elements = ref max_elements;
+ nr_elements = ref 0;
+ nr_dropped = ref 0;
+ } in
+ let push = function
+ | None -> stream_push None
+ | Some x ->
+ if !(t.nr_elements) > !(t.max_elements)
+ then begin
+ incr t.nr_dropped;
+ end else begin
+ stream_push (Some x);
+ incr t.nr_elements
+ end in
+ t, push
+
+let get_available t =
+ let all = Lwt_stream.get_available t.stream in
+ t.nr_elements := !(t.nr_elements) - (List.length all);
+ all
+
+let nget n t =
+ lwt all = Lwt_stream.nget n t.stream in
+ t.nr_elements := !(t.nr_elements) - (List.length all);
+ return all
+
+let set_max_elements max_elements t =
+ t.max_elements := max_elements;
+ (* drop elements if we have too many *)
+ let excess_elements = max 0 (!(t.nr_elements) - max_elements) in
+ if excess_elements > 0 then begin
+ let (_: 'a list) = Lwt_stream.get_available_up_to excess_elements t.stream in
+ t.nr_elements := max_elements
+ end
@@ -0,0 +1,33 @@
+(*
+ * Copyright (c) 2012 Citrix Systems
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *)
+
+type 'a t
+(** Similar to Lwt_stream.bounded_push except threads never block in push() *)
+
+val create: int -> 'a t * ('a option -> unit)
+(** [create max_elements] creates a stream which can contain at most
+ [max_elements] *)
+
+val get_available: 'a t -> 'a list
+(** [get_available t] returns all available elements from [t] without blocking *)
+
+val nget: int -> 'a t -> 'a list Lwt.t
+(** [nget n t] returns [n] elements from [t] *)
+
+val set_max_elements: int -> 'a t -> unit
+(** [set_max_elements n t] resets the maximum number of elements stored within
+ [t] to [n]. If more than [n] elements are stored then the surplus elements
+ will be immediately dropped. *)
View
@@ -16,7 +16,6 @@
open Lwt
open OS
-(*open Net.Ethif *)
open Pcap
open Pcap.LE (* write in little-endian format *)
@@ -28,7 +27,9 @@ let capture_limit = 64
type fd = Cstruct.buf list option Lwt_mvar.t
-let open_blkif blkif : fd =
+let write fd bufs = Lwt_mvar.put fd (Some bufs)
+
+let open_device blkif : fd =
let m : fd = Lwt_mvar.create_empty () in
let page_offset = ref 0L in
let buf_offset = ref 0 in
@@ -67,7 +68,19 @@ let open_blkif blkif : fd =
done in
m
-let capture input fd =
+let start_capture (input: Net.Ethif.t) fd =
+ let stream, push = Lwt_bounded_stream.create capture_limit in
+
+ Net.Ethif.set_promiscuous input (function
+ | Net.Ethif.Input buf ->
+ push (Some (OS.Clock.time (), [ buf ]));
+ (* since this was an input frame, we want to process it as normal *)
+ Net.Ethif.default_process input buf
+ | Net.Ethif.Output bufs ->
+ push (Some (OS.Clock.time (), bufs));
+ return ()
+ );
+
let buf = OS.Io_page.get () in
set_pcap_header_magic_number buf magic_number;
set_pcap_header_version_major buf major_version;
@@ -76,12 +89,8 @@ let capture input fd =
set_pcap_header_sigfigs buf 0l;
set_pcap_header_snaplen buf 4096l;
set_pcap_header_network buf (Network.(to_int32 Ethernet));
- lwt () = Lwt_mvar.put fd (Some [Cstruct.sub buf 0 sizeof_pcap_header] ) in
-
- set_capture_limit capture_limit input;
- OS.Console.log (Printf.sprintf "pcap: set capture limit to %d" capture_limit);
+ lwt () = write fd [Cstruct.sub buf 0 sizeof_pcap_header] in
- let stream = get_captured_packets input in
try_lwt
while_lwt true do
lwt packets = Lwt_bounded_stream.nget 1 stream in
@@ -93,7 +102,7 @@ let capture input fd =
set_pcap_packet_ts_usec buf (Int32.rem (Int32.of_float ( time *. 1000000.)) 1000000l);
set_pcap_packet_incl_len buf (Int32.of_int len);
set_pcap_packet_orig_len buf (Int32.of_int len);
- Lwt_mvar.put fd (Some (Cstruct.sub buf 0 sizeof_pcap_packet :: frags))
+ write fd (Cstruct.sub buf 0 sizeof_pcap_packet :: frags)
) packets
done
with Lwt_stream.Closed ->
View
@@ -1,7 +1,7 @@
(* OASIS_START *)
-(* DO NOT EDIT (digest: 5b1cec25a6d4341f2d4dc4b90846fd71) *)
+(* DO NOT EDIT (digest: 3943cf9142ac2a7be0828ddd196035b2) *)
module OASISGettext = struct
-# 21 "/Users/avsm/.opam/3.12.1+mirage-unix-direct/build/oasis.0.3.0/src/oasis/OASISGettext.ml"
+# 21 "/home/djs/.opam/3.12.1/build/oasis.0.3.0/src/oasis/OASISGettext.ml"
let ns_ str =
str
@@ -24,7 +24,7 @@ module OASISGettext = struct
end
module OASISExpr = struct
-# 21 "/Users/avsm/.opam/3.12.1+mirage-unix-direct/build/oasis.0.3.0/src/oasis/OASISExpr.ml"
+# 21 "/home/djs/.opam/3.12.1/build/oasis.0.3.0/src/oasis/OASISExpr.ml"
@@ -116,7 +116,7 @@ end
# 117 "myocamlbuild.ml"
module BaseEnvLight = struct
-# 21 "/Users/avsm/.opam/3.12.1+mirage-unix-direct/build/oasis.0.3.0/src/base/BaseEnvLight.ml"
+# 21 "/home/djs/.opam/3.12.1/build/oasis.0.3.0/src/base/BaseEnvLight.ml"
module MapString = Map.Make(String)
@@ -214,7 +214,7 @@ end
# 215 "myocamlbuild.ml"
module MyOCamlbuildFindlib = struct
-# 21 "/Users/avsm/.opam/3.12.1+mirage-unix-direct/build/oasis.0.3.0/src/plugins/ocamlbuild/MyOCamlbuildFindlib.ml"
+# 21 "/home/djs/.opam/3.12.1/build/oasis.0.3.0/src/plugins/ocamlbuild/MyOCamlbuildFindlib.ml"
(** OCamlbuild extension, copied from
* http://brion.inria.fr/gallium/index.php/Using_ocamlfind_with_ocamlbuild
@@ -323,7 +323,7 @@ module MyOCamlbuildFindlib = struct
end
module MyOCamlbuildBase = struct
-# 21 "/Users/avsm/.opam/3.12.1+mirage-unix-direct/build/oasis.0.3.0/src/plugins/ocamlbuild/MyOCamlbuildBase.ml"
+# 21 "/home/djs/.opam/3.12.1/build/oasis.0.3.0/src/plugins/ocamlbuild/MyOCamlbuildBase.ml"
(** Base functions for writing myocamlbuild.ml
@author Sylvain Le Gall
@@ -339,7 +339,7 @@ module MyOCamlbuildBase = struct
type name = string
type tag = string
-# 56 "/Users/avsm/.opam/3.12.1+mirage-unix-direct/build/oasis.0.3.0/src/plugins/ocamlbuild/MyOCamlbuildBase.ml"
+# 56 "/home/djs/.opam/3.12.1/build/oasis.0.3.0/src/plugins/ocamlbuild/MyOCamlbuildBase.ml"
type t =
{

0 comments on commit d1f6654

Please sign in to comment.