Skip to content

Commit

Permalink
Use simple async tasks rather than fixed number of worker tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
safesparrow committed Dec 12, 2022
1 parent 9deabfa commit c9c4adc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 28 deletions.
69 changes: 43 additions & 26 deletions tests/ParallelTypeCheckingTests/Code/GraphProcessing.fs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
/// Parallel processing of graph of work items with dependencies
module ParallelTypeCheckingTests.GraphProcessing

open System.Collections.Concurrent
open System.Collections.Generic
open System.Threading
open ParallelTypeCheckingTests.Parallel

/// Used for processing
type NodeInfo<'Item> =
Expand All @@ -13,17 +15,32 @@ type NodeInfo<'Item> =
Dependants: 'Item[]
}

// TODO Do not expose this type to other files
type Node<'Item, 'Result> =
{
Info: NodeInfo<'Item>
mutable ProcessedDepsCount: int
mutable Result: 'Result option
}

/// Basic concurrent set implemented using ConcurrentDictionary
type private ConcurrentSet<'a>() =
let dict = ConcurrentDictionary<'a, unit>()

member this.Add(item: 'a): bool =
dict.TryAdd(item, ())

/// <summary>
/// A generic method to generate results for a graph of work items in parallel.
/// Processes leaves first, and after each node has been processed, schedules any now unblocked dependants.
/// Returns a list of results, per item.
/// </summary>
/// <param name="graph">Graph of work items</param>
/// <param name="doWork">A function to generate results for a single item</param>
let processGraphSimple<'Item, 'Result when 'Item: equality and 'Item: comparison>
(graph: Graph<'Item>)
// TODO Avoid exposing mutable nodes to the caller
(doWork: IReadOnlyDictionary<'Item, Node<'Item, 'Result>> -> Node<'Item, 'Result> -> 'Result)
(parallelism: int)
: 'Result[] // Results in order defined in 'graph'
=
let transitiveDeps = graph |> Graph.transitiveOpt
Expand Down Expand Up @@ -57,43 +74,43 @@ let processGraphSimple<'Item, 'Result when 'Item: equality and 'Item: comparison
graph.Keys
|> Seq.map (fun item -> item, makeNode item)
|> readOnlyDict
let lookup item = nodes[item]
let lookupMany items = items |> Array.map lookup
let lookupMany items = items |> Array.map (fun item -> nodes[item])
let leaves =
nodes.Values
|> Seq.filter (fun n -> n.Info.Deps.Length = 0)
|> Seq.toArray

printfn $"Node count: {nodes.Count}"
use cts = new CancellationTokenSource()

let work
let mutable processedCount = 0
let waitHandle = new AutoResetEvent(false)
let rec post node =
Async.Start(async {work node}, cts.Token)
and work
(node: Node<'Item, 'Result>)
: Node<'Item, 'Result>[] =
: unit =
let singleRes = doWork nodes node
node.Result <- Some singleRes
// Need to double-check that only one dependency schedules this dependant
let unblocked =
let unblockedDependants =
node.Info.Dependants
|> lookupMany
|> Array.filter (fun x ->
let pdc =
// TODO Not ideal, better ways most likely exist
lock x (fun () ->
x.ProcessedDepsCount <- x.ProcessedDepsCount + 1
x.ProcessedDepsCount)
pdc = x.Info.Deps.Length)
unblocked

use cts = new CancellationTokenSource()

Parallel.processInParallel
leaves
work
parallelism
(fun processedCount -> processedCount = nodes.Count)
cts.Token
(fun x -> x.Info.Item.ToString())

// For every dependant, increment its number of processed dependencies,
// and filter ones which now have all dependencies processed.
|> Array.filter (fun dependant ->
// This counter can be incremented by multiple workers on different threads.
let pdc = Interlocked.Increment(&dependant.ProcessedDepsCount)
// Note: We cannot read 'dependant.ProcessedDepsCount' again to avoid returning the same item multiple times.
pdc = dependant.Info.Deps.Length)
unblockedDependants |> Array.iter post
if Interlocked.Increment(&processedCount) = nodes.Count then
waitHandle.Set() |> ignore

leaves |> Array.iter post
// TODO Handle async exceptions
// q.Error += ...
waitHandle.WaitOne() |> ignore

nodes.Values
|> Seq.map (fun node ->
node.Result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ let processGraph<'Item, 'State, 'Result, 'FinalFileResult when 'Item: equality a
(doWork: 'Item -> 'State -> 'Result)
(folder: bool -> 'State -> 'Result -> 'FinalFileResult * 'State)
(emptyState: 'State)
(parallelism: int)
(_parallelism: int)
: 'FinalFileResult[] * 'State =

let work
Expand All @@ -79,7 +79,6 @@ let processGraph<'Item, 'State, 'Result, 'FinalFileResult when 'Item: equality a
processGraphSimple
graph
work
parallelism

let finals, state: 'FinalFileResult[] * 'State =
results
Expand Down

0 comments on commit c9c4adc

Please sign in to comment.