Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial version of F# async extensions and samples

  • Loading branch information...
commit 05dbe9ab4f19e7f0506a908d53cd7942b31fe375 1 parent da2b1f5
@tpetricek authored
View
4 .gitignore
@@ -0,0 +1,4 @@
+*.dll
+*.pdb
+FSharp.ASyncExtensions.xml
+bin/release
View
114 README.markdown
@@ -0,0 +1,114 @@
+F# Async Extensions
+===================
+
+This library implements various extensions for asynchronous programming
+using F# asynchronous workflows and F# agents (the `MailboxProcessor` type
+in the standard F# library). It defines _asynchronous sequences_ that represent
+asynchronous operations returning multiple values (such as reading data from
+a stream in chunks), several reusable F# agents and numerous extensions.
+
+ * Samples that demonstrate how to use most of the extensions can
+ be found in the [samples directory][7]
+
+Asynchronous sequences
+----------------------
+
+Asynchronous sequences can be used to work with asynchronous computations that return
+multiple results. A value of type `AsyncSeq<'T>` can be started (just like an asynchronous
+workflow) and it eventually returns. The result is either a special value representing
+the end of the sequence or a value of type `'T` (head) together with the rest of the
+asynchronous sequence (tail) of type `AsyncSeq<'T>`.
+
+Unlike `IObservable<'T>`, asynchronous sequences are not _push-based_. The code that
+generates the next value of the asynchronous sequence starts only after previous elements
+have been processed. This makes it possible to easily write computations that return
+results as long as some component is using them.
+
+However, `IObservable<'T>` values can
+be converted to asynchronous sequences. The `AsyncSeq.ofObservable` combinator creates an
+asynchronous sequence that discards values produced by the observable while the
+asynchronous sequence was blocked. The `AsyncSeq.ofObservableBuffered` combinator stores
+all produced values in an unbounded buffer and returns the values from the buffer as soon
+as the user of asynchronous sequence requestst the next element.
+
+The library defines an F# computation expression for workfing with asynchronous sequences.
+For example, sequence that emits numbers in 1 second intervals can be defined as follows:
+
+ let rec numbers n = asyncSeq {
+ yield n
+ do! Async.Sleep(1000)
+ yield! numbers (n + 1) }
+
+Asynchronous workflows and asynchronous sequences can use the `for` construct to iterate
+over all elements of an asynchronous sequence. For example:
+
+ let rec evenNumbers = asyncSeq {
+ for n in numbers 0 do
+ if n%2=0 then yield n }
+
+The library also provides numerous combinators (similar to functions from the `Seq` module).
+The result of operations that aggregate values of an asynchronous sequence is an asynchronous
+workflow that returns a single value:
+
+ let rec sumTenEvenSquares =
+ numbers 0
+ |> AsyncSeq.filter (fun n -> n%2 = 0)
+ |> AsyncSeq.map (fun n -> n*n)
+ |> AsyncSeq.fold (+) 0
+
+ let n =
+ sumTenEvenSquares
+ |> Async.RunSynchronously
+
+For some examples that use (earlier versions) of asynchronous sequences, see also the following
+two F# snippets: [first][5] and [second][6].
+
+Reusable agents
+---------------
+
+The library implements several reusable agents for building concurrent applications:
+
+ * **Agent** is a simple type aliast for `MailboxProcessor` that is more convenient to use
+
+ * **AutoCancelAgent** wraps the standard F# agent and adds support for stopping of the
+ agent's body using the `IDisposable` interface (the type automatically creates a
+ cancellation token, uses it to start the underlying agent and cancels it when the agent
+ is disposed). For example, [see this F# snippet][1].
+
+ * **BatchProcessingAgent** can be used to implement batch processing. It creates groups of
+ messages (added using the `Enqueue` method) and emits them using the `BatchProduced`
+ event. A group is produced when it reaches the maximal size or after the timeout elapses.
+
+ * **BlockingQueueAgent** implements an asynchronous queue with blocking put and blocking
+ get operations. It can be used to implement the _producer-consumer_ concurrent pattern.
+ The constructor of the agent takes the maximal size of the buffer.
+
+
+Observable extensions
+---------------------
+
+The library implements extensions for using `IObservable<'T>` type from F# asynchronous
+workflows. An overloaded extension method `Async.AwaitObservable` can be used to wait
+for an occurrence of an event (or other observable action):
+
+ let counter n = async {
+ printfn "Counting: %d" n
+ let! _ = Async.AwaitEvent(form.MouseDown)
+ return! counter (n + 1) }
+
+Overloaded version of the method allows waiting for the first of multiple events. The
+method asynchronously returns `Choice<'T1, 'T2>` value that can be used to determine
+which of the events has occurred.
+
+For examples using this method see Chapter 16 of [Real World Functional Programming][2]
+(some examples are available in a [free excerpt from the chapter][3]). The
+`Async.AwaitObservable` method should be used instead of `Async.AwaitEvent` to avoid
+memory leaks (see also related [StackOverflow discussion][4])
+
+ [1]: http://fssnip.net/64
+ [2]: http://manning.com/petricek
+ [3]: http://dotnetslackers.com/articles/net/Programming-user-interfaces-using-f-sharp-workflows.aspx
+ [4]: http://stackoverflow.com/questions/3701861/wait-for-any-event-of-multiple-events-simultaneously-in-f
+ [5]: http://fssnip.net/1k
+ [6]: http://fssnip.net/1Y
+ [7]: https://github.com/tpetricek/FSharp.AsyncExtensions
View
BIN  bin/HtmlAgilityPack.dll
Binary file not shown
View
54 samples/AsyncSeqObservable.fsx
@@ -0,0 +1,54 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AsyncSeqObservable.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to convert IObservable<'T> to AsyncSeq<'T>
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+open FSharp.Control
+open System.Windows.Forms
+open System.Threading
+
+// Create simple winforms user interface with a button and multiline text box
+let frm = new Form(Visible=true, TopMost=true, Width=440)
+let btn = new Button(Left=10, Top=10, Width=150, Text="Async Operation")
+let out = new TextBox(Left=10, Top=40, Width=400, Height=200, Multiline=true)
+frm.Controls.Add(btn)
+frm.Controls.Add(out)
+
+// Prints message to the displayed text box
+let wprint fmt =
+ Printf.kprintf (fun s -> out.Text <- out.Text + s) fmt
+
+
+// The sample demonstrates two ways of converting IObservable<_> values to
+// asynchronous sequences. When using 'AsyncSeq.ofObservable', values that are
+// emitted when the asynchronous sequence is blocked are discarded. When you
+// click on the 'Async Operation' button, the following workflow starts
+// processing and drops all clicks until the body of the for loop completes
+let discarding =
+ async {
+ for click in btn.Click |> AsyncSeq.ofObservable do
+ wprint "Sleeping (and discarding clicks)...\r\n"
+ do! Async.Sleep(1000)
+ wprint "Done (listening again)\r\n" }
+
+let ctsd = new CancellationTokenSource()
+Async.Start(discarding, ctsd.Token)
+ctsd.Cancel()
+
+
+// When using 'AsyncSeq.ofObservableBuffered', the values emitted by the
+// observable while the asynchronous sequence is blocked are stored in a
+// buffer (and will be returned as next elements).
+let buffering =
+ async {
+ for click in btn.Click |> AsyncSeq.ofObservableBuffered do
+ wprint "Sleeping (and buffering clicks)...\r\n"
+ do! Async.Sleep(1000)
+ wprint "Done (ready for next value)\r\n" }
+
+let ctsb = new CancellationTokenSource()
+Async.Start(buffering, ctsb.Token)
+ctsb.Cancel()
View
39 samples/AutoCancel.fsx
@@ -0,0 +1,39 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AutoCancel.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to use 'AutoCancelAgent'
+// The agent automatically stops its body when disposed.
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+open FSharp.Control
+
+let op = async {
+ // Create a local agent that is disposed when the
+ // workflow completes (using the 'use' construct)
+ use agent = AutoCancelAgent.Start(fun agent -> async {
+ try
+ while true do
+ // Wait for a message - note that we use timeout
+ // to allow cancellation (when the operation completes)
+ let! msg = agent.Receive(1000)
+ match msg with
+ | (n, reply:AsyncReplyChannel<unit>) ->
+ // Print number and reply to the sender
+ printfn "%d" n
+ reply.Reply(())
+ | _ -> ()
+ finally
+ // Called when the agent is disposed
+ printfn "agent completed" })
+
+ // Do some processing using the agent...
+ for i in 0 .. 10 do
+ do! Async.Sleep(100)
+ do! agent.PostAndAsyncReply(fun r -> i, r)
+
+ do! Async.Sleep(100)
+ printfn "workflow completed" }
+
+Async.Start(op)
View
31 samples/BatchProcessing.fsx
@@ -0,0 +1,31 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (BatchProcessing.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to use 'BatchProcessingAgent'
+// The agent groups received messages in groups with a maximal
+// size and emits them with a maximal timeout.
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+open FSharp.Control
+
+open System.Drawing
+open System.Windows.Forms
+
+// Create simple winforms user interface with label
+let frm = new Form()
+let lbl = new Label(Font = new Font("Calibri", 20.0f), Dock = DockStyle.Fill)
+lbl.TextAlign <- ContentAlignment.MiddleCenter
+frm.Controls.Add(lbl)
+frm.Show()
+
+// Handle key press events but update the GUI after 5 keys
+// have been pressed or after 5 seconds (whichever happens first)
+let ag = new BatchProcessingAgent<_>(5, 5000)
+frm.KeyPress.Add(fun e -> ag.Enqueue(e.KeyChar))
+ag.BatchProduced
+ |> Event.map (fun chars -> new System.String(chars))
+ |> Event.scan (+) ""
+ |> Event.add (fun str -> lbl.Text <- str)
+
View
35 samples/BlockingQueue.fsx
@@ -0,0 +1,35 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (BlockingQueue.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to use 'BlockingAgent'
+// The agent implements producer/consumer concurrent pattern.
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+open FSharp.Control
+
+let buffer = new BlockingQueueAgent<int>(3)
+
+// The sample uses two workflows that add/take elements
+// from the buffer with the following timeouts. When the producer
+// timout is larger, consumer will be blocked. Otherwise, producer
+// will be blocked.
+let producerTimeout = 500
+let consumerTimeout = 1000
+
+async {
+ for i in 0 .. 10 do
+ // Sleep for some time and then add value
+ do! Async.Sleep(producerTimeout)
+ do! buffer.AsyncAdd(i)
+ printfn "Added %d" i }
+|> Async.Start
+
+async {
+ while true do
+ // Sleep for some time and then get value
+ do! Async.Sleep(consumerTimeout)
+ let! v = buffer.AsyncGet()
+ printfn "Got %d" v }
+|> Async.Start
View
33 samples/Caching.fsx
@@ -0,0 +1,33 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AutoCancel.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to use 'Async.Cache' and 'AsyncSeq.cache'
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+open FSharp.Control
+
+// The Async.Cache combinator makes it possible to create asynchronous
+// workflow that caches the result and performs computation only once
+// when called multiple times.
+let op =
+ async { // Will be printed just once
+ printfn "Evaluating..."
+ return 42 }
+ |> Async.Cache
+
+Async.RunSynchronously(op)
+Async.RunSynchronously(op)
+
+
+// The AsyncSeq.cache combinator has similar effect - the asynchronous
+// sequence can be used multiple times, but it will evaluate just once
+let asq =
+ asyncSeq { for i in 0 .. 10 do
+ printfn "Generating %d..." i
+ yield i }
+ |> AsyncSeq.cache
+
+AsyncSeq.iter (printfn "Consuming %d") asq
+|> Async.RunSynchronously
View
200 samples/ChatServer.fsx
@@ -0,0 +1,200 @@
+#r "System.Xml.Linq.dll"
+//#load "Server.fs"
+
+//open System.IO
+//open System.Net
+//open System.Text
+//open System.Threading
+open System.Xml.Linq
+//open FSharp.Control
+
+type Agent<'T> = MailboxProcessor<'T>
+
+// ----------------------------------------------------------------------------
+
+module First =
+ let agent = Agent.Start(fun agent -> async {
+ while true do
+ let! msg = agent.Receive()
+ printfn "Hello %s!" msg })
+
+ agent.Post("Tomas")
+
+type ChatMessage =
+ | GetContent of AsyncReplyChannel<string>
+ | SendMessage of string
+
+module Second =
+ let agent = Agent<_>.Start(fun agent ->
+ let rec loop messages = async {
+
+ // Pick next message from the mailbox
+ let! msg = agent.Receive()
+ match msg with
+ | SendMessage msg ->
+ // Add message to the list & continue
+ let msg = XElement(XName.Get("li"), msg)
+ return! loop (msg :: messages)
+
+ | GetContent reply ->
+ // Generate HTML with messages
+ let html = XElement(XName.Get("ul"), messages)
+ // Send it back as the reply
+ reply.Reply(html.ToString())
+ return! loop messages }
+ loop [] )
+
+ agent.Post(SendMessage "Welcome to F# chat implemented using agents!")
+ agent.Post(SendMessage "This is my second message to this chat room...")
+
+ agent.PostAndReply(GetContent)
+
+// --------------------------------------------------------------------------------------
+
+type internal ChatMessage =
+ | GetContent of AsyncReplyChannel<string>
+ | SendMessage of string
+
+
+type ChatRoom() =
+ let agent = Agent.Start(fun agent ->
+ let rec loop messages = async {
+ // Pick next message from the mailbox
+ let! msg = agent.Receive()
+ match msg with
+ | SendMessage msg ->
+ // Add message to the list & continue
+ let msg = XElement(XName.Get("li"), msg)
+ return! loop (msg :: messages)
+
+ | GetContent reply ->
+ // Generate HTML with messages
+ let html = XElement(XName.Get("ul"), messages)
+ // Send it back as the reply
+ reply.Reply(html.ToString())
+ return! loop messages }
+ loop [] )
+ member x.SendMessage(msg) = agent.Post(SendMessage msg)
+ member x.AsyncGetContent(?timeout) = agent.PostAndAsyncReply(GetContent, ?timeout=timeout)
+ member x.GetContent() = agent.PostAndReply(GetContent)
+
+ member x.GetContentAsync() =
+ Async.StartAsTask(agent.PostAndAsyncReply(GetContent))
+
+ member x.GetContentAsync(cancellationToken) =
+ Async.StartAsTask
+ ( agent.PostAndAsyncReply(GetContent),
+ cancellationToken = cancellationToken )
+
+let room = new ChatRoom()
+
+room.SendMessage("Welcome to F# chat implemented using agents!")
+room.SendMessage("This is my second message to this chat room...")
+
+async {
+ while true do
+ do! Async.Sleep(10000)
+ let! html = room.AsyncGetContent()
+ printfn "%A" html }
+|> Async.Start
+
+// --------------------------------------------------------------------------------------
+
+open System.Net
+open System.Threading
+
+[<AutoOpen>]
+module HttpExtensions =
+
+ type System.Net.HttpListener with
+ member x.AsyncGetContext() =
+ Async.FromBeginEnd(x.BeginGetContext, x.EndGetContext)
+
+type HttpAgent private (url, f) as this =
+ let tokenSource = new CancellationTokenSource()
+ let agent = Agent.Start((fun _ -> f this), tokenSource.Token)
+ let server = async {
+ use listener = new HttpListener()
+ listener.Prefixes.Add(url)
+ listener.Start()
+ while true do
+ let! context = listener.AsyncGetContext()
+ agent.Post(context) }
+ do Async.Start(server, cancellationToken = tokenSource.Token)
+
+ member x.Receive(?timeout) = agent.Receive(?timeout = timeout)
+ member x.Stop() = tokenSource.Cancel()
+ static member Start(url, f) =
+ new HttpAgent(url, f)
+
+
+open System.IO
+open System.Text
+
+[<AutoOpen>]
+module HttpExtensions2 =
+
+ type System.Net.HttpListenerRequest with
+ member request.InputString =
+ use sr = new StreamReader(request.InputStream)
+ sr.ReadToEnd()
+
+ type System.Net.HttpListenerResponse with
+ member response.Reply(s:string) =
+ let buffer = Encoding.UTF8.GetBytes(s)
+ response.ContentLength64 <- int64 buffer.Length
+ let output = response.OutputStream
+ output.Write(buffer,0,buffer.Length)
+ output.Close()
+ member response.Reply(typ, buffer:byte[]) =
+ response.ContentLength64 <- int64 buffer.Length
+ let output = response.OutputStream
+ response.ContentType <- typ
+ output.Write(buffer,0,buffer.Length)
+ output.Close()
+
+// --------------------------------------------------------------------------------------
+
+open System.Threading
+
+let server = HttpAgent.Start("http://localhost:8082/", fun server -> async {
+ while true do
+ let! ctx = server.Receive()
+ ctx.Response.Reply("Hello!") })
+
+server.Stop()
+
+let root = @"C:\Tomas\Writing\MSDN\code\2 Server Side\Demo.ChatServer\"
+
+let contentTypes =
+ dict [ ".gif", "binary/image"
+ ".css", "text/css"
+ ".html", "text/html"
+ ".xap", "application/x-silverlight-app" ]
+
+let server = HttpAgent.Start("http://localhost:8082/", fun mbox ->
+ let handleRequest (ctx:HttpListenerContext) = async {
+ match ctx.Request.Url.LocalPath with
+ | "/post" ->
+ // Send message to the chat room
+ room.SendMessage(ctx.Request.InputString)
+ ctx.Response.Reply("OK")
+ | "/chat" ->
+ // Get messages from the chat room (asynchronously!)
+ let! text = room.AsyncGetContent()
+ ctx.Response.Reply(text)
+ | s ->
+ // Handle an ordinary file request
+ let file =
+ root + (if s = "/" then "chat.html" else s.ToLower())
+ if File.Exists(file) then
+ let typ = contentTypes.[Path.GetExtension(file)]
+ ctx.Response.Reply(typ, File.ReadAllBytes(file))
+ else
+ ctx.Response.Reply(sprintf "File not found: %s" file) }
+ async {
+ while true do
+ let! ctx = mbox.Receive()
+ ctx |> handleRequest |> Async.Start })
+
+server.Stop()
View
164 samples/Crawler.fsx
@@ -0,0 +1,164 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (Crawler.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to use asynchronous sequences and
+// blocking agents to implement a web crawler. The sample also uses
+// various AsyncSeq combinators to process the resulting async sequence.
+//
+// The first version performs single-threaded random walk (returned
+// as an asynchronous sequence) and the second version is concurrent.
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+#r "..\\bin\\HtmlAgilityPack.dll"
+
+open System
+open System.Net
+open System.Text.RegularExpressions
+open HtmlAgilityPack
+
+open FSharp.Control
+
+// ----------------------------------------------------------------------------
+// Helper functions for downloading documents, extracting links etc.
+
+/// Asynchronously download the document and parse the HTML
+let downloadDocument url = async {
+ try let wc = new WebClient()
+ let! html = wc.AsyncDownloadString(Uri(url))
+ let doc = new HtmlDocument()
+ doc.LoadHtml(html)
+ return Some doc
+ with _ -> return None }
+
+/// Extract all links from the document that start with "http://"
+let extractLinks (doc:HtmlDocument) =
+ try
+ [ for a in doc.DocumentNode.SelectNodes("//a") do
+ if a.Attributes.Contains("href") then
+ let href = a.Attributes.["href"].Value
+ if href.StartsWith("http://") then
+ let endl = href.IndexOf('?')
+ yield if endl > 0 then href.Substring(0, endl) else href ]
+ with _ -> []
+
+/// Extract the <title> of the web page
+let getTitle (doc:HtmlDocument) =
+ let title = doc.DocumentNode.SelectSingleNode("//title")
+ if title <> null then title.InnerText.Trim() else "Untitled"
+
+// ----------------------------------------------------------------------------
+// Basic crawling - crawl web pages and follow just one link from every page
+
+/// Crawl the internet starting from the specified page
+/// From each page follow the first not-yet-visited page
+let rec randomCrawl url =
+ let visited = new System.Collections.Generic.HashSet<_>()
+
+ // Visits page and then recursively visits all referenced pages
+ let rec loop url = asyncSeq {
+ if visited.Add(url) then
+ let! doc = downloadDocument url
+ match doc with
+ | Some doc ->
+ // Yield url and title as the next element
+ yield url, getTitle doc
+ // For every link, yield all referenced pages too
+ for link in extractLinks doc do
+ yield! loop link
+ | _ -> () }
+ loop url
+
+// Use AsyncSeq combinators to print the titles of the first 10
+// web sites that are from other domains than bing.com
+randomCrawl "http://news.bing.com"
+|> AsyncSeq.filter (fun (url, title) -> url.Contains("bing.com") |> not)
+|> AsyncSeq.map snd
+|> AsyncSeq.take 10
+|> AsyncSeq.iter (printfn "%s")
+|> Async.Start
+
+
+// ----------------------------------------------------------------------------
+// Better crawler - crawls the web concurrently using the specified number of
+// workers, stores results and pending URLS to blocking buffers and returns
+// all results as an asynchronous sequence. After caller stops taking elements
+// from the asynchronous sequence, the blocking buffers will eventually fill
+// up and crawling will stop.
+
+let concurrentWorkers = 20
+
+let rec concurrentCrawl url = asyncSeq {
+ // Number of pending requests is usually very high
+ // (when the queue fills up, the workers will stop, so set this to 10k)
+ let requests = BlockingQueueAgent<_>(10000)
+ let results = BlockingQueueAgent<_>(40)
+ let visited = ConcurrentSetAgent<_>()
+
+ /// Worker repeatedly takes URL from the queue and processes it
+ let worker() = async {
+ while true do
+ let! url = requests.AsyncGet()
+ let! doc = downloadDocument url
+ match doc with
+ | Some doc ->
+ // Yield url and title as the next element
+ do! results.AsyncAdd( (url, getTitle doc) )
+ // For every link, yield all referenced pages too
+ for link in extractLinks doc do
+ let! added = visited.AsyncAdd(link)
+ if added then
+ do! requests.AsyncAdd(link)
+ | _ -> () }
+
+ // Return an asynchronous sequence that sends intial request
+ // to the crawler, starts workers and then repeatedly takes
+ // results from the results queue.
+ do! requests.AsyncAdd(url)
+ for i in 0 .. concurrentWorkers do
+ worker () |> Async.Start
+ while true do
+ let! res = results.AsyncGet()
+ yield res }
+
+// ----------------------------------------------------------------------------
+// Visualize the results of crawling - show the most common words in titles
+
+// Create user interface with text box for displaying words
+open System.Windows.Forms
+let frm = new Form(TopMost=true, Visible=true, Width=400, Height=600)
+let txt = new TextBox( Multiline = true, Dock = DockStyle.Fill,
+ Font = new System.Drawing.Font("Cambria", 12.0f),
+ ScrollBars = ScrollBars.Vertical )
+frm.Controls.Add(txt)
+
+// Creates an asynchronous sequence that produces values of type
+// Map<string, int> representing words together with their count
+// (new version returned after every processing step)
+let tables =
+ concurrentCrawl "http://news.bing.com"
+ // Split title into lowercase words
+ |> AsyncSeq.map (fun (_, title) ->
+ title.Split( [|' '; '.'; '-'; '|'; ','; ';' |],
+ StringSplitOptions.RemoveEmptyEntries )
+ |> Array.map (fun s -> s.ToLower()) )
+ // Create sequence that aggregates words and returns immediate results
+ |> AsyncSeq.scan (fun table words ->
+ words |> Seq.fold (fun table word ->
+ match Map.tryFind word table with
+ | Some v -> Map.add word (v + 1) table
+ | _ -> Map.add word 1 table) table) Map.empty
+
+// Asynchronous workflow that iterates over the sequence
+// and displays the results in the textbox
+async {
+ let counter = ref 0
+ for table in tables |> AsyncSeq.take 200 do
+ frm.Text <- sprintf "Processed %d" (counter := !counter + 1; !counter)
+ txt.Text <-
+ table
+ |> Seq.sortBy (fun (KeyValue(k, v)) -> -v)
+ |> Seq.map (fun (KeyValue(k, v)) -> sprintf "%s (%d)" k v)
+ |> String.concat "\r\n" }
+|> Async.StartImmediate
View
143 samples/WebProxy.fsx
@@ -0,0 +1,143 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AutoCancel.fsx)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+// This example demonstrates how to implement simple HTTP proxy
+
+#r "..\\bin\\FSharp.AsyncExtensions.dll"
+open FSharp.Control
+open FSharp.IO
+open FSharp.Net
+
+open System
+open System.Net
+open System.Threading
+
+let root = "http://msdn.microsoft.com"
+let proxy = "http://localhost:8082/"
+
+// ----------------------------------------------------------------------------
+// Simple web proxy implemented using 'HttpListener'. This version downloads
+// the entire web page as a string and then writes it to the response stream.
+
+let cts1 = new CancellationTokenSource()
+HttpListener.Start(proxy, (fun (req, resp) -> async {
+ // Download the web page
+ let url = root + req.Url.PathAndQuery
+ let wc = new WebClient()
+ let! html = wc.AsyncDownloadString(Uri(url))
+
+ // Replace URLs and send to the response stream
+ let html = html.Replace(root, proxy)
+ do! resp.AsyncReply(html) }), cancellationToken = cts1.Token)
+
+// Now go to: http://localhost:8082/en-us/fsharp
+cts1.Cancel()
+
+// ----------------------------------------------------------------------------
+// Better version of a proxy - this time, we read data from the input stream
+// in chunks and write them to the response stream as they arive.
+
+let cts2 = new CancellationTokenSource()
+HttpListener.Start(proxy, (fun (req, resp) -> async {
+ // Initialize the download
+ let url = root + req.Url.PathAndQuery
+ let targetReq = HttpWebRequest.Create(url)
+ use! targetResp = targetReq.AsyncGetResponse()
+ use stream = targetResp.GetResponseStream()
+
+ // Copy data until we read the entire input
+ let count = ref 1
+ let buffer = Array.zeroCreate 4096
+ while !count > 0 do
+ let! read = stream.AsyncRead(buffer, 0, buffer.Length)
+ do! resp.OutputStream.AsyncWrite(buffer, 0, read)
+ count := read
+ resp.Close() }), cancellationToken = cts2.Token)
+
+cts2.Cancel()
+
+// ----------------------------------------------------------------------------
+// Proxy that copies data in chunks can be easily implemented using
+// asynchronous sequences. We read all data as asynchronous sequence and
+// write them to the output (Even simpler version could use 'AsyncWriteSeq'
+// to write all input buffers to the output stream).
+
+let cts3 = new CancellationTokenSource()
+HttpListener.Start(proxy, (fun (req, resp) -> async {
+ // Initialize the download
+ let url = root + req.Url.PathAndQuery
+ let targetReq = HttpWebRequest.Create(url)
+ use! targetResp = targetReq.AsyncGetResponse()
+ use stream = targetResp.GetResponseStream()
+
+ // Iterate over chunks read as an asynchronous sequence
+ // and write them to the output stream
+ for buffer in stream.AsyncReadSeq(4096) do
+ do! resp.OutputStream.AsyncWrite(buffer, 0, buffer.Length)
+ resp.Close() }), cancellationToken = cts3.Token)
+
+cts3.Cancel()
+
+// ----------------------------------------------------------------------------
+// A more sophisticated version of proxy that caches web
+// pages using a simple agent.
+
+type CacheMessage =
+ | TryGet of string * AsyncReplyChannel<option<byte[]>>
+ | Add of string * byte[]
+
+// Creates an agent that handles 'CacheMessage' and implements the cache
+let cache = Agent.Start(fun agent -> async {
+ let pages = new System.Collections.Generic.Dictionary<_, _>()
+ while true do
+ let! msg = agent.Receive()
+ match msg with
+ | TryGet(url, repl) ->
+ // Try to get a value from the dictionary
+ match pages.TryGetValue(url) with
+ | true, data -> repl.Reply(Some(data))
+ | _ -> repl.Reply(None)
+ | Add(url, data) ->
+ // Add byte array to the cache
+ pages.[url] <- data })
+
+
+let cts4 = new CancellationTokenSource()
+HttpListener.Start(proxy, (fun (req, resp) -> async {
+ // Generate URL and check data from the cache
+ let url = root + req.Url.PathAndQuery
+ let! cached = cache.PostAndAsyncReply(fun repl -> TryGet(url, repl))
+ match cached with
+ | Some data ->
+ // Reply using data from the cache
+ do! resp.OutputStream.AsyncWrite(data)
+ resp.Close()
+ | None ->
+ // Initialize the download
+ let targetReq = HttpWebRequest.Create(url)
+ use! targetResp = targetReq.AsyncGetResponse()
+ use stream = targetResp.GetResponseStream()
+
+ // Create a cached asynchronous sequence
+ // (that reads the stream only once)
+ let cachedData = stream.AsyncReadSeq(4096) |> AsyncSeq.cache
+
+ // Start workflow that reads all data in memory (for caching)
+ let! allBytes =
+ cachedData
+ |> AsyncSeq.fold (fun st data -> data::st) []
+ |> Async.StartChild
+ // Write all data from the async sequence to the output
+ for buffer in cachedData do
+ do! resp.OutputStream.AsyncWrite(buffer, 0, buffer.Length)
+ resp.Close()
+
+ // Get all data accumulated in background and save
+ // them to the cache (for later use)
+ let! allData = allBytes
+ let data = allData |> List.rev |> Array.concat
+ cache.Post(Add(url, data)) }), cts4.Token)
+
+cts4.Cancel()
View
8 src/Agents/Agent.fs
@@ -0,0 +1,8 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (Agent.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+/// Type alias for F# mailbox processor type
+type Agent<'T> = MailboxProcessor<'T>
View
59 src/Agents/AutoCancelAgent.fs
@@ -0,0 +1,59 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AutoCancelAgent.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+open System
+open System.Threading
+
+// ----------------------------------------------------------------------------
+// See also: http://fssnip.net/64
+
+/// Wrapper for the standard F# agent (MailboxProcessor) that
+/// supports stopping of the agent's body using the IDisposable
+/// interface (the type automatically creates a cancellation token)
+type AutoCancelAgent<'T> private (mbox:Agent<'T>, cts:CancellationTokenSource) =
+
+ /// Start a new disposable agent using the specified body function
+ /// (the method creates a new cancellation token for the agent)
+ static member Start(f) =
+ let cts = new CancellationTokenSource()
+ new AutoCancelAgent<'T>(Agent<'T>.Start(f, cancellationToken = cts.Token), cts)
+
+ /// Returns the number of unprocessed messages in the message queue of the agent.
+ member x.CurrentQueueLength = mbox.CurrentQueueLength
+ /// Occurs when the execution of the agent results in an exception.
+ [<CLIEvent>]
+ member x.Error = mbox.Error
+ /// Waits for a message. This will consume the first message in arrival order.
+ member x.Receive(?timeout) = mbox.Receive(?timeout = timeout)
+ /// Scans for a message by looking through messages in arrival order until <c>scanner</c>
+ /// returns a Some value. Other messages remain in the queue.
+ member x.Scan(scanner, ?timeout) = mbox.Scan(scanner, ?timeout = timeout)
+ /// Like PostAndReply, but returns None if no reply within the timeout period.
+ member x.TryPostAndReply(buildMessage, ?timeout) =
+ mbox.TryPostAndReply(buildMessage, ?timeout = timeout)
+ /// Waits for a message. This will consume the first message in arrival order.
+ member x.TryReceive(?timeout) =
+ mbox.TryReceive(?timeout = timeout)
+ /// Scans for a message by looking through messages in arrival order until <c>scanner</c>
+ /// returns a Some value. Other messages remain in the queue.
+ member x.TryScan(scanner, ?timeout) =
+ mbox.TryScan(scanner, ?timeout = timeout)
+ /// Posts a message to the message queue of the MailboxProcessor, asynchronously.
+ member x.Post(m) = mbox.Post(m)
+ /// Posts a message to an agent and await a reply on the channel, synchronously.
+ member x.PostAndReply(buildMessage, ?timeout) =
+ mbox.PostAndReply(buildMessage, ?timeout = timeout)
+ /// Like PostAndAsyncReply, but returns None if no reply within the timeout period.
+ member x.PostAndTryAsyncReply(buildMessage, ?timeout) =
+ mbox.PostAndTryAsyncReply(buildMessage, ?timeout = timeout)
+ /// Posts a message to an agent and await a reply on the channel, asynchronously.
+ member x.PostAndAsyncReply(buildMessage, ?timeout) =
+ mbox.PostAndAsyncReply(buildMessage, ?timeout=timeout)
+
+ interface IDisposable with
+ member x.Dispose() =
+ (mbox :> IDisposable).Dispose()
+ cts.Cancel()
View
45 src/Agents/BatchProcessingAgent.fs
@@ -0,0 +1,45 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (BatchProcessingAgent.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+open System
+
+// ----------------------------------------------------------------------------
+
+/// Agent that can be used to implement batch processing. It creates groups
+/// of messages (added using the Enqueue method) and emits them using the
+/// BatchProduced event. A group is produced when it reaches the maximal
+/// size or after the timeout elapses.
+type BatchProcessingAgent<'T>(bulkSize, timeout) =
+
+ let bulkEvent = new Event<'T[]>()
+ let agent : Agent<'T> = Agent.Start(fun agent ->
+ let rec loop remainingTime messages = async {
+ let start = DateTime.Now
+ let! msg = agent.TryReceive(timeout = max 0 remainingTime)
+ let elapsed = int (DateTime.Now - start).TotalMilliseconds
+ match msg with
+ | Some(msg) when
+ List.length messages = bulkSize - 1 ->
+ bulkEvent.Trigger(msg :: messages |> List.rev |> Array.ofList)
+ return! loop timeout []
+ | Some(msg) ->
+ return! loop (remainingTime - elapsed) (msg::messages)
+ | None when List.length messages <> 0 ->
+ bulkEvent.Trigger(messages |> List.rev |> Array.ofList)
+ return! loop timeout []
+ | None ->
+ return! loop timeout [] }
+ loop timeout [] )
+
+ /// The event is triggered when a group of messages is collected. The
+ /// group is not empty, but may not be of the specified maximal size
+ /// (when the timeout elapses before enough messages is collected)
+ [<CLIEvent>]
+ member x.BatchProduced = bulkEvent.Publish
+
+ /// Sends new message to the agent
+ member x.Enqueue v = agent.Post(v)
+
View
78 src/Agents/BlockingQueueAgent.fs
@@ -0,0 +1,78 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (BlockingQueueAgent.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+open System
+open System.Collections.Generic
+
+// ----------------------------------------------------------------------------
+
+type internal BlockingAgentMessage<'T> =
+ | Add of 'T * AsyncReplyChannel<unit>
+ | Get of AsyncReplyChannel<'T>
+
+/// Agent that implements an asynchronous queue with blocking put
+/// and blocking get operation (this implements the producer-consumer
+/// concurrent programming pattern). The constructor takes the maximal
+/// size of the buffer.
+type BlockingQueueAgent<'T>(maxLength) =
+ [<VolatileField>]
+ let mutable count = 0
+ let agent = Agent.Start(fun agent ->
+
+ let queue = new Queue<_>()
+
+ let rec emptyQueue() =
+ agent.Scan(fun msg ->
+ match msg with
+ | Add(value, reply) -> Some(enqueueAndContinue(value, reply))
+ | _ -> None )
+ and fullQueue() =
+ agent.Scan(fun msg ->
+ match msg with
+ | Get(reply) -> Some(dequeueAndContinue(reply))
+ | _ -> None )
+ and runningQueue() = async {
+ let! msg = agent.Receive()
+ match msg with
+ | Add(value, reply) -> return! enqueueAndContinue(value, reply)
+ | Get(reply) -> return! dequeueAndContinue(reply) }
+
+ and enqueueAndContinue (value, reply) = async {
+ reply.Reply()
+ queue.Enqueue(value)
+ count <- queue.Count
+ return! chooseState() }
+ and dequeueAndContinue (reply) = async {
+ reply.Reply(queue.Dequeue())
+ count <- queue.Count
+ return! chooseState() }
+ and chooseState() =
+ if queue.Count = 0 then emptyQueue()
+ elif queue.Count < maxLength then runningQueue()
+ else fullQueue()
+
+ // Start with an empty queue
+ emptyQueue() )
+
+ /// Asynchronously adds item to the queue. The operation ends when
+ /// there is a place for the item. If the queue is full, the operation
+ /// will block until some items are removed.
+ member x.AsyncAdd(v:'T, ?timeout) =
+ agent.PostAndAsyncReply((fun ch -> Add(v, ch)), ?timeout=timeout)
+
+ /// Asynchronously gets item from the queue. If there are no items
+ /// in the queue, the operation will block unitl items are added.
+ member x.AsyncGet(?timeout) =
+ agent.PostAndAsyncReply(Get, ?timeout=timeout)
+
+ /// Synchronously gets item from the queue. If there are no items
+ /// in the queue, the operation will block unitl items are added.
+ /// This method blocks until value is available!
+ member x.Get(?timeout) =
+ agent.PostAndReply(Get, ?timeout=timeout)
+
+ /// Gets the number of elements currently waiting in the queue.
+ member x.Count = count
View
23 src/Agents/ConcurrentSetAgent.fs
@@ -0,0 +1,23 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (ConcurrentSetAgent.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+open System
+
+// ----------------------------------------------------------------------------
+
+/// Agent that implements a simple concurrent set. The agent exposes a
+/// member that adds value to the set and returns whether the value
+/// was already present.
+type ConcurrentSetAgent<'T>() =
+ let agent = Agent.Start(fun agent -> async {
+ let hashSet = new System.Collections.Generic.HashSet<_>(HashIdentity.Structural)
+ while true do
+ let! value, (repl:AsyncReplyChannel<_>) = agent.Receive()
+ repl.Reply(hashSet.Add(value)) })
+
+ /// Adds the specified element to the set and returns
+ /// 'false' when it was already present in the set
+ member x.AsyncAdd(v) = agent.PostAndAsyncReply(fun repl -> v, repl)
View
26 src/Async.fs
@@ -0,0 +1,26 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AsyncSeq.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+// ----------------------------------------------------------------------------
+
+[<AutoOpen>]
+module AsyncExtensions =
+
+ type Microsoft.FSharp.Control.Async with
+
+ /// Creates an asynchronous workflow that runs the asynchronous workflow
+ /// given as an argument at most once. When the returned workflow is
+ /// started for the second time, it reuses the result of the
+ /// previous execution.
+ static member Cache (input:Async<'T>) =
+ let agent = Agent<AsyncReplyChannel<_>>.Start(fun agent -> async {
+ let! repl = agent.Receive()
+ let! res = input
+ repl.Reply(res)
+ while true do
+ let! repl = agent.Receive()
+ repl.Reply(res) })
+ async { return! agent.PostAndAsyncReply(id) }
View
473 src/AsyncSeq.fs
@@ -0,0 +1,473 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (AsyncSeq.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+namespace FSharp.Control
+
+open System
+open System.Threading
+open System.IO
+
+// ----------------------------------------------------------------------------
+
+/// An asynchronous sequence represents a delayed computation that can be
+/// started to produce either Cons value consisting of the next element of the
+/// sequence (head) together with the next asynchronous sequence (tail) or a
+/// special value representing the end of the sequence (Nil)
+type AsyncSeq<'T> = Async<AsyncSeqInner<'T>>
+
+/// The interanl type that represents a value returned as a result of
+/// evaluating a step of an asynchronous sequence
+and AsyncSeqInner<'T> =
+ | Nil
+ | Cons of 'T * AsyncSeq<'T>
+
+
+/// Module with helper functions for working with asynchronous sequences
+module AsyncSeq =
+
+ /// Creates an empty asynchronou sequence that immediately ends
+ [<GeneralizableValue>]
+ let empty<'T> : AsyncSeq<'T> =
+ async { return Nil }
+
+ /// Creates an asynchronous sequence that generates a single element and then ends
+ let singleton (v:'T) : AsyncSeq<'T> =
+ async { return Cons(v, empty) }
+
+ /// Yields all elements of the first asynchronous sequence and then
+ /// all elements of the second asynchronous sequence.
+ let rec append (seq1: AsyncSeq<'T>) (seq2: AsyncSeq<'T>) : AsyncSeq<'T> =
+ async { let! v1 = seq1
+ match v1 with
+ | Nil -> return! seq2
+ | Cons (h,t) -> return Cons(h,append t seq2) }
+
+
+ /// Computation builder that allows creating of asynchronous
+ /// sequences using the 'asyncSeq { ... }' syntax
+ type AsyncSeqBuilder() =
+ member x.Yield(v) = singleton v
+ member x.YieldFrom(s) = s
+ member x.Zero () = empty
+ member x.Bind (inp:Async<'T>, body : 'T -> AsyncSeq<'U>) : AsyncSeq<'U> =
+ async.Bind(inp, body)
+ member x.Combine (seq1:AsyncSeq<'T>,seq2:AsyncSeq<'T>) =
+ append seq1 seq2
+ member x.While (gd, seq:AsyncSeq<'T>) =
+ if gd() then x.Combine(seq,x.Delay(fun () -> x.While (gd, seq))) else x.Zero()
+ member x.Delay (f:unit -> AsyncSeq<'T>) =
+ async.Delay(f)
+
+
+ /// Builds an asynchronou sequence using the computation builder syntax
+ let asyncSeq = new AsyncSeqBuilder()
+
+ /// Tries to get the next element of an asynchronous sequence
+ /// and returns either the value or an exception
+ let internal tryNext (input:AsyncSeq<_>) = async {
+ try
+ let! v = input
+ return Choice1Of2 v
+ with e ->
+ return Choice2Of2 e }
+
+ /// Implements the 'TryWith' functionality for computation builder
+ let rec internal tryWith (input : AsyncSeq<'T>) handler = asyncSeq {
+ let! v = tryNext input
+ match v with
+ | Choice1Of2 Nil -> ()
+ | Choice1Of2 (Cons (h, t)) ->
+ yield h
+ yield! tryWith t handler
+ | Choice2Of2 rest ->
+ yield! handler rest }
+
+ /// Implements the 'TryFinally' functionality for computation builder
+ let rec internal tryFinally (input : AsyncSeq<'T>) compensation = asyncSeq {
+ let! v = tryNext input
+ match v with
+ | Choice1Of2 Nil ->
+ compensation()
+ | Choice1Of2 (Cons (h, t)) ->
+ yield h
+ yield! tryFinally t compensation
+ | Choice2Of2 e ->
+ compensation()
+ yield! raise e }
+
+ /// Creates an asynchronou sequence that iterates over the given input sequence.
+ /// For every input element, it calls the the specified function and iterates
+ /// over all elements generated by that asynchronous sequence.
+ /// This is the 'bind' operation of the computation expression (exposed using
+ /// the 'for' keyword in asyncSeq computation).
+ let rec collect f (input : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq {
+ let! v = input
+ match v with
+ | Nil -> ()
+ | Cons(h, t) ->
+ yield! f h
+ yield! collect f t }
+
+
+ // Add additional methods to the 'asyncSeq' computation builder
+ type AsyncSeqBuilder with
+ member x.TryFinally (body: AsyncSeq<'T>, compensation) =
+ tryFinally body compensation
+ member x.TryWith (body: AsyncSeq<_>, handler: (exn -> AsyncSeq<_>)) =
+ tryWith body handler
+ member x.Using (resource:#IDisposable, binder) =
+ tryFinally (binder resource) (fun () ->
+ if box resource <> null then resource.Dispose())
+
+ /// For loop that iterates over a synchronous sequence (and generates
+ /// all elements generated by the asynchronous body)
+ member x.For(seq:seq<'T>, action:'T -> AsyncSeq<'TResult>) =
+ let enum = seq.GetEnumerator()
+ x.TryFinally(x.While((fun () -> enum.MoveNext()), x.Delay(fun () ->
+ action enum.Current)), (fun () ->
+ if enum <> null then enum.Dispose() ))
+
+ /// Asynchronous for loop - for all elements from the input sequence,
+ /// generate all elements produced by the body (asynchronously). See
+ /// also the AsyncSeq.collect function.
+ member x.For (seq:AsyncSeq<'T>, action:'T -> AsyncSeq<'TResult>) =
+ collect action seq
+
+
+ // Add asynchronous for loop to the 'async' computation builder
+ type Microsoft.FSharp.Control.AsyncBuilder with
+ member x.For (seq:AsyncSeq<'T>, action:'T -> Async<unit>) =
+ async.Bind(seq, function
+ | Nil -> async.Zero()
+ | Cons(h, t) -> async.Combine(action h, x.For(t, action)))
+
+ // --------------------------------------------------------------------------
+ // Additional combinators (implemented as async/asyncSeq computations)
+
+ /// Builds a new asynchronous sequence whose elements are generated by
+ /// applying the specified function to all elements of the input sequence.
+ ///
+ /// The specified function is asynchronous (and the input sequence will
+ /// be asked for the next element after the processing of an element completes).
+ let mapAsync f (input : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq {
+ for itm in input do
+ let! v = f itm
+ yield v }
+
+ /// Asynchronously iterates over the input sequence and generates 'x' for
+ /// every input element for which the specified asynchronous function
+ /// returned 'Some(x)'
+ ///
+ /// The specified function is asynchronous (and the input sequence will
+ /// be asked for the next element after the processing of an element completes).
+ let chooseAsync f (input : AsyncSeq<'T>) : AsyncSeq<'R> = asyncSeq {
+ for itm in input do
+ let! v = f itm
+ match v with
+ | Some v -> yield v
+ | _ -> () }
+
+ /// Builds a new asynchronous sequence whose elements are those from the
+ /// input sequence for which the specified function returned true.
+ ///
+ /// The specified function is asynchronous (and the input sequence will
+ /// be asked for the next element after the processing of an element completes).
+ let filterAsync f (input : AsyncSeq<'T>) = asyncSeq {
+ for v in input do
+ let! b = f v
+ if b then yield v }
+
+ /// Asynchronously returns the last element that was generated by the
+ /// given asynchronous sequence (or the specified default value).
+ let rec lastOrDefault def (input : AsyncSeq<'T>) = async {
+ let! v = input
+ match v with
+ | Nil -> return def
+ | Cons(h, t) -> return! lastOrDefault h t }
+
+ /// Asynchronously returns the first element that was generated by the
+ /// given asynchronous sequence (or the specified default value).
+ let firstOrDefault def (input : AsyncSeq<'T>) = async {
+ let! v = input
+ match v with
+ | Nil -> return def
+ | Cons(h, _) -> return h }
+
+ /// Aggregates the elements of the input asynchronous sequence using the
+ /// specified 'aggregation' function. The result is an asynchronous
+ /// sequence of intermediate aggregation result.
+ ///
+ /// The aggregation function is asynchronous (and the input sequence will
+ /// be asked for the next element after the processing of an element completes).
+ let rec scanAsync f (state:'TState) (input : AsyncSeq<'T>) = asyncSeq {
+ let! v = input
+ match v with
+ | Nil -> ()
+ | Cons(h, t) ->
+ let! v = f state h
+ yield v
+ yield! t |> scanAsync f v }
+
+ /// Iterates over the input sequence and calls the specified function for
+ /// every value (to perform some side-effect asynchronously).
+ ///
+ /// The specified function is asynchronous (and the input sequence will
+ /// be asked for the next element after the processing of an element completes).
+ let rec iterAsync f (input : AsyncSeq<'T>) = async {
+ for itm in input do
+ do! f itm }
+
+ /// Returns an asynchronous sequence that returns pairs containing an element
+ /// from the input sequence and its predecessor. Empty sequence is returned for
+ /// singleton input sequence.
+ let rec pairwise (input : AsyncSeq<'T>) = asyncSeq {
+ let! v = input
+ match v with
+ | Nil -> ()
+ | Cons(h, t) ->
+ let prev = ref h
+ for v in t do
+ yield (!prev, v)
+ prev := v }
+
+ /// Aggregates the elements of the input asynchronous sequence using the
+ /// specified 'aggregation' function. The result is an asynchronous
+ /// workflow that returns the final result.
+ ///
+ /// The aggregation function is asynchronous (and the input sequence will
+ /// be asked for the next element after the processing of an element completes).
+ let rec foldAsync f (state:'TState) (input : AsyncSeq<'T>) =
+ input |> scanAsync f state |> lastOrDefault state
+
+ /// Same as AsyncSeq.foldAsync, but the specified function is synchronous
+ /// and returns the result of aggregation immediately.
+ let rec fold f (state:'TState) (input : AsyncSeq<'T>) =
+ foldAsync (fun st v -> f st v |> async.Return) state input
+
+ /// Same as AsyncSeq.scanAsync, but the specified function is synchronous
+ /// and returns the result of aggregation immediately.
+ let rec scan f (state:'TState) (input : AsyncSeq<'T>) =
+ scanAsync (fun st v -> f st v |> async.Return) state input
+
+ /// Same as AsyncSeq.mapAsync, but the specified function is synchronous
+ /// and returns the result of projection immediately.
+ let map f (input : AsyncSeq<'T>) =
+ mapAsync (f >> async.Return) input
+
+ /// Same as AsyncSeq.iterAsync, but the specified function is synchronous
+ /// and performs the side-effect immediately.
+ let iter f (input : AsyncSeq<'T>) =
+ iterAsync (f >> async.Return) input
+
+ /// Same as AsyncSeq.chooseAsync, but the specified function is synchronous
+ /// and processes the input element immediately.
+ let choose f (input : AsyncSeq<'T>) =
+ chooseAsync (f >> async.Return) input
+
+ /// Same as AsyncSeq.filterAsync, but the specified predicate is synchronous
+ /// and processes the input element immediately.
+ let filter f (input : AsyncSeq<'T>) =
+ filterAsync (f >> async.Return) input
+
+ // --------------------------------------------------------------------------
+ // Converting from/to synchronous sequences or IObservables
+
+ /// Creates an asynchronous sequence that lazily takes element from an
+ /// input synchronous sequence and returns them one-by-one.
+ let ofSeq (input : seq<'T>) = asyncSeq {
+ for el in input do
+ yield el }
+
+ /// A helper type for implementation of buffering when converting
+ /// observable to an asynchronous sequence
+ type internal BufferMessage<'T> =
+ | Get of AsyncReplyChannel<'T>
+ | Put of 'T
+
+ /// Converts observable to an asynchronous sequence using an agent with
+ /// a body specified as the argument. The returnd async sequence repeatedly
+ /// sends 'Get' message to the agent to get the next element. The observable
+ /// sends 'Put' message to the agent (as new inputs are generated).
+ let internal ofObservableUsingAgent (input : System.IObservable<_>) f =
+ asyncSeq {
+ use agent = AutoCancelAgent.Start(f)
+ use d = input |> Observable.asUpdates
+ |> Observable.subscribe (Put >> agent.Post)
+
+ let rec loop() = asyncSeq {
+ let! msg = agent.PostAndAsyncReply(Get)
+ match msg with
+ | ObservableUpdate.Error e -> raise e
+ | ObservableUpdate.Completed -> ()
+ | ObservableUpdate.Next v ->
+ yield v
+ yield! loop() }
+ yield! loop() }
+
+ /// Converts observable to an asynchronous sequence. Values that are produced
+ /// by the observable while the asynchronous sequence is blocked are stored to
+ /// an unbounded buffer and are returned as next elements of the async sequence.
+ let ofObservableBuffered (input : System.IObservable<_>) =
+ ofObservableUsingAgent input (fun mbox -> async {
+ let buffer = new System.Collections.Generic.Queue<_>()
+ let repls = new System.Collections.Generic.Queue<_>()
+ while true do
+ // Receive next message (when observable ends, caller will
+ // cancel the agent, so we need timeout to allow cancleation)
+ let! msg = mbox.TryReceive(200)
+ match msg with
+ | Some(Put(v)) -> buffer.Enqueue(v)
+ | Some(Get(repl)) -> repls.Enqueue(repl)
+ | _ -> ()
+ // Process matching calls from buffers
+ while buffer.Count > 0 && repls.Count > 0 do
+ repls.Dequeue().Reply(buffer.Dequeue()) })
+
+
+ /// Converts observable to an asynchronous sequence. Values that are produced
+ /// by the observable while the asynchronous sequence is blocked are discarded
+ /// (this function doesn't guarantee that asynchronou ssequence will return
+ /// all values produced by the observable)
+ let ofObservable (input : System.IObservable<_>) =
+ ofObservableUsingAgent input (fun mbox -> async {
+ while true do
+ // Allow timeout (when the observable ends, caller will
+ // cancel the agent, so we need timeout to allow cancellation)
+ let! msg = mbox.TryReceive(200)
+ match msg with
+ | Some(Put _) | None ->
+ () // Ignore put or no message
+ | Some(Get repl) ->
+ // Reader is blocked, so next will be Put
+ // (caller will not stop the agent at this point,
+ // so timeout is not necessary)
+ let! v = mbox.Receive()
+ match v with
+ | Put v -> repl.Reply(v)
+ | _ -> failwith "Unexpected Get" })
+
+ /// Converts asynchronous sequence to a synchronous blocking sequence.
+ /// The elements of the asynchronous sequence are consumed lazily.
+ let toBlockingSeq (input : AsyncSeq<'T>) =
+ // Write all elements to a blocking buffer and then add None to denote end
+ let buf = new BlockingQueueAgent<_>(1)
+ async {
+ do! iterAsync (Some >> buf.AsyncAdd) input
+ do! buf.AsyncAdd(None) } |> Async.Start
+
+ // Read elements from the blocking buffer & return a sequences
+ let rec loop () = seq {
+ match buf.Get() with
+ | None -> ()
+ | Some v ->
+ yield v
+ yield! loop() }
+ loop ()
+
+ /// Create a new asynchronous sequence that caches all elements of the
+ /// sequence specified as the input. When accessing the resulting sequence
+ /// multiple times, the input will still be evaluated only once
+ let rec cache (input : AsyncSeq<'T>) =
+ let agent = Agent<AsyncReplyChannel<_>>.Start(fun agent -> async {
+ let! repl = agent.Receive()
+ let! next = input
+ let res =
+ match next with
+ | Nil -> Nil
+ | Cons(h, t) -> Cons(h, cache t)
+ repl.Reply(res)
+ while true do
+ let! repl = agent.Receive()
+ repl.Reply(res) })
+ async { return! agent.PostAndAsyncReply(id) }
+
+ // --------------------------------------------------------------------------
+
+ /// Combines two asynchronous sequences into a sequence of pairs.
+ /// The values from sequences are retrieved in parallel.
+ let rec zip (input1 : AsyncSeq<'T1>) (input2 : AsyncSeq<'T2>) : AsyncSeq<_> = async {
+ let! ft = input1 |> Async.StartChild
+ let! s = input2
+ let! f = ft
+ match f, s with
+ | Cons(hf, tf), Cons(hs, ts) ->
+ return Cons( (hf, hs), zip tf ts)
+ | _ -> return Nil }
+
+ /// Returns elements from an asynchronous sequence while the specified
+ /// predicate holds. The predicate is evaluated asynchronously.
+ let rec takeWhileAsync p (input : AsyncSeq<'T>) : AsyncSeq<_> = async {
+ let! v = input
+ match v with
+ | Cons(h, t) ->
+ let! res = p h
+ if res then
+ return Cons(h, takeWhileAsync p t)
+ else return Nil
+ | Nil -> return Nil }
+
+ /// Skips elements from an asynchronous sequence while the specified
+ /// predicate holds and then returns the rest of the sequence. The
+ /// predicate is evaluated asynchronously.
+ let rec skipWhileAsync p (input : AsyncSeq<'T>) : AsyncSeq<_> = async {
+ let! v = input
+ match v with
+ | Cons(h, t) ->
+ let! res = p h
+ if res then return! skipWhileAsync p t
+ else return! t
+ | Nil -> return Nil }
+
+ /// Returns elements from an asynchronous sequence while the specified
+ /// predicate holds. The predicate is evaluated synchronously.
+ let rec takeWhile p (input : AsyncSeq<'T>) =
+ takeWhileAsync (p >> async.Return) input
+
+ /// Skips elements from an asynchronous sequence while the specified
+ /// predicate holds and then returns the rest of the sequence. The
+ /// predicate is evaluated asynchronously.
+ let rec skipWhile p (input : AsyncSeq<'T>) =
+ skipWhileAsync (p >> async.Return) input
+
+ /// Returns the first N elements of an asynchronous sequence
+ let rec take count (input : AsyncSeq<'T>) : AsyncSeq<_> = async {
+ if count > 0 then
+ let! v = input
+ match v with
+ | Cons(h, t) ->
+ return Cons(h, take (count - 1) t)
+ | Nil -> return Nil
+ else return Nil }
+
+ /// Skips the first N elements of an asynchronous sequence and
+ /// then returns the rest of the sequence unmodified.
+ let rec skip count (input : AsyncSeq<'T>) : AsyncSeq<_> = async {
+ if count > 0 then
+ let! v = input
+ match v with
+ | Cons(h, t) ->
+ return Cons(h, skip (count - 1) t)
+ | Nil -> return Nil
+ else return! input }
+
+
+module Seq =
+ /// Converts asynchronous sequence to a synchronous blocking sequence.
+ /// The elements of the asynchronous sequence are consumed lazily.
+ let ofAsyncSeq (input : AsyncSeq<'T>) =
+ AsyncSeq.toBlockingSeq input
+
+
+[<AutoOpen>]
+module AsyncSeqExtensions =
+ /// Builds an asynchronou sequence using the computation builder syntax
+ let asyncSeq = new AsyncSeq.AsyncSeqBuilder()
+
+ // Add asynchronous for loop to the 'async' computation builder
+ type Microsoft.FSharp.Control.AsyncBuilder with
+ member x.For (seq:AsyncSeq<'T>, action:'T -> Async<unit>) =
+ async.Bind(seq, function
+ | Nil -> async.Zero()
+ | Cons(h, t) -> async.Combine(action h, x.For(t, action)))
+
View
70 src/FSharp.AsyncExtensions.fsproj
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{ede1812b-5a62-410a-9553-02499cf29317}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <RootNamespace>FSharp.AsyncExtensions</RootNamespace>
+ <AssemblyName>FSharp.AsyncExtensions</AssemblyName>
+ <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <Name>FSharp.AsyncExtensions</Name>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <Tailcalls>false</Tailcalls>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <WarningLevel>3</WarningLevel>
+ <DocumentationFile>bin\Debug\FSharp.AsyncExtensions.XML</DocumentationFile>
+ <OtherFlags>--sig:sig.fsi</OtherFlags>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <Tailcalls>true</Tailcalls>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <WarningLevel>3</WarningLevel>
+ <DocumentationFile>bin\Release\FSharp.AsyncExtensions.XML</DocumentationFile>
+ </PropertyGroup>
+ <Import Project="$(MSBuildExtensionsPath32)\FSharp\1.0\Microsoft.FSharp.Targets" Condition="!Exists('$(MSBuildBinPath)\Microsoft.Build.Tasks.v4.0.dll')" />
+ <Import Project="$(MSBuildExtensionsPath32)\..\Microsoft F#\v4.0\Microsoft.FSharp.Targets" Condition=" Exists('$(MSBuildBinPath)\Microsoft.Build.Tasks.v4.0.dll')" />
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PostBuildEvent>copy $(ProjectDir)$(OutDir)\FSharp.AsyncExtensions.* $(ProjectDir)..\bin</PostBuildEvent>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PostBuildEvent>copy $(ProjectDir)$(OutDir)\FSharp.AsyncExtensions.* $(ProjectDir)..\bin\release
+nuget pack $(ProjectDir)..\bin\release\FSharp.AsyncExtensions.dll.nuspec
+</PostBuildEvent>
+ </PropertyGroup>
+ <ItemGroup>
+ <Compile Include="Agents\Agent.fs" />
+ <Compile Include="Agents\AutoCancelAgent.fs" />
+ <Compile Include="Agents\ConcurrentSetAgent.fs" />
+ <Compile Include="Agents\BatchProcessingAgent.fs" />
+ <Compile Include="Agents\BlockingQueueAgent.fs" />
+ <Compile Include="Observable.fs" />
+ <Compile Include="Async.fs" />
+ <Compile Include="AsyncSeq.fs" />
+ <Compile Include="IO.fs" />
+ </ItemGroup>
+ <ItemGroup>
+ <Reference Include="mscorlib" />
+ <Reference Include="FSharp.Core" />
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Numerics" />
+ </ItemGroup>
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
View
47 src/FSharp.AsyncExtensions.sln
@@ -0,0 +1,47 @@
+
+Microsoft Visual Studio Solution File, Format Version 11.00
+# Visual Studio 2010
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.AsyncExtensions", "FSharp.AsyncExtensions.fsproj", "{EDE1812B-5A62-410A-9553-02499CF29317}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Misc", "Misc", "{8406B0C7-14A3-42F1-AC7A-EE26C0A9F15E}"
+ ProjectSection(SolutionItems) = preProject
+ ..\License.markdown = ..\License.markdown
+ ..\README.markdown = ..\README.markdown
+ EndProjectSection
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{DD79E523-7072-4A67-89A0-257ECEF766D1}"
+ ProjectSection(SolutionItems) = preProject
+ ..\samples\AsyncSeqObservable.fsx = ..\samples\AsyncSeqObservable.fsx
+ ..\samples\AutoCancel.fsx = ..\samples\AutoCancel.fsx
+ ..\samples\BatchProcessing.fsx = ..\samples\BatchProcessing.fsx
+ ..\samples\BlockingQueue.fsx = ..\samples\BlockingQueue.fsx
+ ..\samples\Caching.fsx = ..\samples\Caching.fsx
+ ..\samples\Crawler.fsx = ..\samples\Crawler.fsx
+ ..\samples\WebProxy.fsx = ..\samples\WebProxy.fsx
+ EndProjectSection
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Debug|Mixed Platforms = Debug|Mixed Platforms
+ Debug|x86 = Debug|x86
+ Release|Any CPU = Release|Any CPU
+ Release|Mixed Platforms = Release|Mixed Platforms
+ Release|x86 = Release|x86
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Release|Any CPU.Build.0 = Release|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {EDE1812B-5A62-410A-9553-02499CF29317}.Release|x86.ActiveCfg = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
View
92 src/IO.fs
@@ -0,0 +1,92 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (IO.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+
+namespace FSharp.IO
+open FSharp.Control
+// ----------------------------------------------------------------------------
+// Extensions that simplify working with Stream using async sequences
+
+[<AutoOpen>]
+module IOExtensions =
+ type System.IO.Stream with
+ /// Asynchronously reads the stream in chunks of a specified size
+ /// and returns the result as an asynchronous sequence.
+ member x.AsyncReadSeq(?bufferSize) =
+ let bufferSize = defaultArg bufferSize 1024
+ let buffer = Array.zeroCreate bufferSize
+ let rec loop () = asyncSeq {
+ let! count = x.AsyncRead(buffer, 0, bufferSize)
+ if count > 0 then
+ yield Array.sub buffer 0 count
+ yield! loop() }
+ loop ()
+
+ /// Asynchronously writes all data specified by the
+ /// given asynchronous sequence to the stream.
+ member x.AsyncWriteSeq(input : AsyncSeq<byte[]>) = async {
+ for data in input do
+ do! x.AsyncWrite(data) }
+
+// ----------------------------------------------------------------------------
+// Extensions that simplify working with HttpListener and related types
+
+namespace FSharp.Net
+open System.IO
+open System.Net
+open System.Text
+open System.Threading
+
+open FSharp.IO
+open FSharp.Control
+
+[<AutoOpen>]
+module HttpExtensions =
+
+ type System.Net.HttpListener with
+ /// Asynchronously waits for an incoming request and returns it.
+ member x.AsyncGetContext() =
+ Async.FromBeginEnd(x.BeginGetContext, x.EndGetContext)
+
+ /// Starts HttpListener on the specified URL. The 'handler' function is
+ /// called (in a new thread pool thread) each time an HTTP request is received.
+ static member Start(url, handler, ?cancellationToken) =
+ let server = async {
+ use listener = new HttpListener()
+ listener.Prefixes.Add(url)
+ listener.Start()
+ while true do
+ let! context = listener.AsyncGetContext()
+ Async.Start
+ ( handler (context.Request, context.Response),
+ ?cancellationToken = cancellationToken) }
+ Async.Start(server, ?cancellationToken = cancellationToken)
+
+ type System.Net.HttpListenerRequest with
+ /// Asynchronously reads the 'InputStream' of the request and converts it to a string
+ member request.AsyncInputString = async {
+ use tmp = new MemoryStream()
+ for data in request.InputStream.AsyncReadSeq(16 * 1024) do
+ tmp.Write(data, 0, data.Length)
+ tmp.Seek(0L, SeekOrigin.Begin) |> ignore
+ use sr = new StreamReader(tmp)
+ return sr.ReadToEnd() }
+
+
+ type System.Net.HttpListenerResponse with
+ /// Sends the specified string as a reply in UTF 8 encoding
+ member response.AsyncReply(s:string) = async {
+ let buffer = Encoding.UTF8.GetBytes(s)
+ response.ContentLength64 <- int64 buffer.Length
+ let output = response.OutputStream
+ do! output.AsyncWrite(buffer,0,buffer.Length)
+ output.Close() }
+
+ /// Sends the specified data as a reply with the specified content type
+ member response.AsyncReply(typ, buffer:byte[]) = async {
+ response.ContentLength64 <- int64 buffer.Length
+ let output = response.OutputStream
+ response.ContentType <- typ
+ do! output.AsyncWrite(buffer,0,buffer.Length)
+ output.Close() }
View
128 src/Observable.fs
@@ -0,0 +1,128 @@
+// ----------------------------------------------------------------------------
+// F# async extensions (Observable.fs)
+// (c) Tomas Petricek, 2011, Available under Apache 2.0 license.
+// ----------------------------------------------------------------------------
+#nowarn "40"
+namespace FSharp.Control
+
+open System
+
+// ----------------------------------------------------------------------------
+
+/// Union type that represents different messages that can be sent to the
+/// IObserver interface. The IObserver type is equivalent to a type that has
+/// just OnNext method that gets 'ObservableUpdate' as an argument.
+type ObservableUpdate<'T> =
+ | Next of 'T
+ | Error of exn
+ | Completed
+
+module Observable =
+
+ /// Creates an observable that calls the specified function (each time)
+ /// after an observer is attached to the observable. This is useful to
+ /// make sure that events triggered by the function are handled.
+ let guard f (e:IObservable<'Args>) =
+ { new IObservable<'Args> with
+ member x.Subscribe(observer) =
+ let rm = e.Subscribe(observer) in f(); rm }
+
+ /// Turns observable into an observable that only calls OnNext method of the
+ /// observer, but gives it a discriminated union that represents different
+ /// kinds of events (error, next, completed)
+ let asUpdates (input:IObservable<'T>) =
+ { new IObservable<_> with
+ member x.Subscribe(observer) =
+ input.Subscribe
+ ({ new IObserver<_> with
+ member x.OnNext(v) = observer.OnNext(Next v)
+ member x.OnCompleted() = observer.OnNext(Completed)
+ member x.OnError(e) = observer.OnNext(Error e) }) }
+
+// ----------------------------------------------------------------------------
+
+[<AutoOpen>]
+module ObservableExtensions =
+
+ /// Helper that can be used for writing CPS-style code that resumes
+ /// on the same thread where the operation was started.
+ let internal synchronize f =
+ let ctx = System.Threading.SynchronizationContext.Current
+ f (fun g ->
+ let nctx = System.Threading.SynchronizationContext.Current
+ if ctx <> null && ctx <> nctx then ctx.Post((fun _ -> g()), null)
+ else g() )
+
+ type Microsoft.FSharp.Control.Async with
+
+ /// Behaves like AwaitObservable, but calls the specified guarding function
+ /// after a subscriber is registered with the observable.
+ static member GuardedAwaitObservable (ev1:IObservable<'T1>) guardFunction =
+ synchronize (fun f ->
+ Async.FromContinuations((fun (cont,econt,ccont) ->
+ let rec finish cont value =
+ remover.Dispose()
+ f (fun () -> cont value)
+ and remover : IDisposable =
+ ev1.Subscribe
+ ({ new IObserver<_> with
+ member x.OnNext(v) = finish cont v
+ member x.OnError(e) = finish econt e
+ member x.OnCompleted() =
+ let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
+ finish ccont (new System.OperationCanceledException(msg)) })
+ guardFunction() )))
+
+ /// Creates an asynchronous workflow that will be resumed when the
+ /// specified observables produces a value. The workflow will return
+ /// the value produced by the observable.
+ static member AwaitObservable(ev1:IObservable<'T1>) =
+ synchronize (fun f ->
+ Async.FromContinuations((fun (cont,econt,ccont) ->
+ let rec finish cont value =
+ remover.Dispose()
+ f (fun () -> cont value)
+ and remover : IDisposable =
+ ev1.Subscribe
+ ({ new IObserver<_> with
+ member x.OnNext(v) = finish cont v
+ member x.OnError(e) = finish econt e
+ member x.OnCompleted() =
+ let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
+ finish ccont (new System.OperationCanceledException(msg)) })
+ () )))
+
+ /// Creates an asynchronous workflow that will be resumed when the
+ /// first of the specified two observables produces a value. The
+ /// workflow will return a Choice value that can be used to identify
+ /// the observable that produced the value.
+ static member AwaitObservable(ev1:IObservable<'T1>, ev2:IObservable<'T2>) =
+ List.reduce Observable.merge
+ [ ev1 |> Observable.map Choice1Of2
+ ev2 |> Observable.map Choice2Of2 ]
+ |> Async.AwaitObservable
+
+ /// Creates an asynchronous workflow that will be resumed when the
+ /// first of the specified three observables produces a value. The
+ /// workflow will return a Choice value that can be used to identify
+ /// the observable that produced the value.
+ static member AwaitObservable
+ ( ev1:IObservable<'T1>, ev2:IObservable<'T2>, ev3:IObservable<'T3> ) =
+ List.reduce Observable.merge
+ [ ev1 |> Observable.map Choice1Of3
+ ev2 |> Observable.map Choice2Of3
+ ev3 |> Observable.map Choice3Of3 ]
+ |> Async.AwaitObservable
+
+ /// Creates an asynchronous workflow that will be resumed when the
+ /// first of the specified four observables produces a value. The
+ /// workflow will return a Choice value that can be used to identify
+ /// the observable that produced the value.
+ static member AwaitObservable( ev1:IObservable<'T1>, ev2:IObservable<'T2>,
+ ev3:IObservable<'T3>, ev4:IObservable<'T4> ) =
+ List.reduce Observable.merge
+ [ ev1 |> Observable.map Choice1Of4
+ ev2 |> Observable.map Choice2Of4
+ ev3 |> Observable.map Choice3Of4
+ ev4 |> Observable.map Choice4Of4 ]
+ |> Async.AwaitObservable
Please sign in to comment.
Something went wrong with that request. Please try again.