Skip to content

Commit

Permalink
ThreadPool cleanup (3/n): Switch to vectorized API & remove unused/co… (
Browse files Browse the repository at this point in the history
#32)

* ThreadPool cleanup (3/n): Switch to vectorized API & remove unused/confusing extensions and implementations.

This change takes a first^H^H^H^H^H^Hthird whack at a bunch of tech debt:
 1. Removes the Naive thread pool implementation from `PJoin`.
 2. Removes the unnecessary `TypedComputeThreadPool` protocol refinement.
 3. Removes the badly implemented extensions that implemented `parallelFor`
    in terms of `join`. (Note: `NonBlockingThreadPool`'s `parallelFor` is re-implemented
    in terms of `join` again, but better.)
 4. Removes use of `rethrows`, as the rethrows language feature is not
    expressive enough to allow the performance optimizations for the non-throwing
    case. Instead, methods are explicitly overloaded with throwing and non-throwing
    variants.
 5. Adds a vectorized API (which improves performance).

Performance measurements:

After:

name                                                                   time         std                   iterations  
--------------------------------------------------------------------------------------------------------------------
NonBlockingThreadPool: join, one level                                 700.0 ns     ± 70289.84998218225   127457      
NonBlockingThreadPool: join, two levels                                2107.0 ns    ± 131041.5070696377   31115       
NonBlockingThreadPool: join, three levels                              4960.0 ns    ± 178122.9562964306   15849       
NonBlockingThreadPool: join, four levels, three on thread pool thread  5893.0 ns    ± 224021.47900401088  13763       
NonBlockingThreadPool: parallel for, one level                         22420.0 ns   ± 203689.69689780468  7581        
NonBlockingThreadPool: parallel for, two levels                        500985.5 ns  ± 642136.0139757036   1390        
Before:

name                                                                   time          std                   iterations  
---------------------------------------------------------------------------------------------------------------------
NonBlockingThreadPool: join, one level                                 728.0 ns      ± 78662.43173968921   115554      
NonBlockingThreadPool: join, two levels                                2149.0 ns     ± 144611.11773139169  30425       
NonBlockingThreadPool: join, three levels                              5049.0 ns     ± 188450.6773907647   15157       
NonBlockingThreadPool: join, four levels, three on thread pool thread  5951.0 ns     ± 229270.51587738466  10255       
NonBlockingThreadPool: parallel for, one level                         4919427.5 ns  ± 887590.5386061076   302         
NonBlockingThreadPool: parallel for, two levels                        4327151.0 ns  ± 855302.611386676    313         

Co-authored-by: Dave Abrahams <dabrahams@google.com>
  • Loading branch information
saeta and Dave Abrahams committed May 24, 2020
1 parent 4c41ec8 commit 64d2335
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 553 deletions.
2 changes: 1 addition & 1 deletion Sources/PenguinGraphs/AdjacencyList.swift
Expand Up @@ -376,7 +376,7 @@ extension AdjacencyList: ParallelGraph {

// TODO: Separate them out to be on different cache lines to avoid false sharing!
// A per-thread array of global states, where each thread index gets its own.
var globalStates: [GlobalState?] = Array(repeating: nil, count: threadPool.parallelism + 1)
var globalStates: [GlobalState?] = Array(repeating: nil, count: threadPool.maxParallelism + 1)
try globalStates.withUnsafeMutableBufferPointer { globalStates in

try storage.withUnsafeMutableBufferPointer { vertices in
Expand Down
2 changes: 1 addition & 1 deletion Sources/PenguinGraphs/VertexParallelProtocols.swift
Expand Up @@ -296,7 +296,7 @@ public class PerThreadMailboxes<
///
/// This initializer helps the type inference algorithm along.
public convenience init<SequentialGraph: VertexListGraph & ParallelGraph>(for graph: __shared SequentialGraph, sending messageType: Message.Type) where SequentialGraph.ParallelProjection == Graph {
self.init(vertexCount: graph.vertexCount, threadCount: ComputeThreadPools.parallelism)
self.init(vertexCount: graph.vertexCount, threadCount: ComputeThreadPools.maxParallelism)
}
}

Expand Down
Expand Up @@ -289,7 +289,49 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
if let e = err { throw e }
}

/// Shuts down the thread pool.
/// Executes `fn`, optionally in parallel, spanning the range `0..<n`.
public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) {
let grainSize = n / maxParallelism // TODO: Make adaptive!

func executeParallelFor(_ start: Int, _ end: Int) {
if start + grainSize >= end {
fn(start, end, n)
} else {
// Divide into 2 & recurse.
let rangeSize = end - start
let midPoint = start + (rangeSize / 2)
self.join({ executeParallelFor(start, midPoint) }, { executeParallelFor(midPoint, end)})
}
}

executeParallelFor(0, n)
}

/// Executes `fn`, optionally in parallel, spanning the range `0..<n`.
public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws {
let grainSize = n / maxParallelism // TODO: Make adaptive!

func executeParallelFor(_ start: Int, _ end: Int) throws {
if start + grainSize >= end {
try fn(start, end, n)
} else {
// Divide into 2 & recurse.
let rangeSize = end - start
let midPoint = start + (rangeSize / 2)
try self.join({ try executeParallelFor(start, midPoint) }, { try executeParallelFor(midPoint, end) })
}
}

try executeParallelFor(0, n)
}

/// Requests that all threads in the threadpool exit and cleans up their associated resources.
///
/// This function returns only once all threads have exited and their resources have been
/// deallocated.
///
/// Note: if a work item was submitted to the threadpool that never completes (i.e. has an
/// infinite loop), this function will never return.
public func shutDown() {
cancelled = true
condition.notify(all: true)
Expand All @@ -300,7 +342,7 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
threads.removeAll() // Remove threads that have been shut down.
}

public var parallelism: Int { totalThreadCount }
public var maxParallelism: Int { totalThreadCount }

public var currentThreadIndex: Int? {
perThreadKey.localValue?.threadId
Expand Down
Expand Up @@ -39,7 +39,7 @@ extension Array where Element: Numeric {
return buffer_psum(
pool, // TODO: Take defaulted-arg & thread local to allow for composition!
buff,
computeRecursiveDepth(procCount: pool.parallelism) + 2) // Sub-divide into quarters-per-processor in case of uneven scheduling.
computeRecursiveDepth(procCount: pool.maxParallelism) + 2) // Sub-divide into quarters-per-processor in case of uneven scheduling.
}
}
}
Expand Down
192 changes: 88 additions & 104 deletions Sources/PenguinParallel/ThreadPool.swift
Expand Up @@ -60,28 +60,68 @@ public protocol ComputeThreadPool {
/// This is the throwing overload
func join(_ a: () throws -> Void, _ b: () throws -> Void) throws

/// A function to be invoked in parallel a specified number of times by `parallelFor`.
///
/// - Parameter currentInvocationIndex: the index of the invocation executing
/// in the current thread.
/// - Parameter requestedInvocationCount: the number of parallel invocations requested.
typealias ParallelForBody
= (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) -> Void

/// A function that can be executed in parallel.
///
/// The first argument is the index of the copy, and the second argument is the total number of
/// copies being executed.
typealias ParallelForFunc = (Int, Int) throws -> Void
/// - Parameter currentInvocationIndex: the index of the invocation executing
/// in the current thread.
/// - Parameter requestedInvocationCount: the number of parallel invocations requested.
typealias ThrowingParallelForBody
= (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) throws -> Void

/// A vectorized function that can be executed in parallel.
///
/// The first argument is the start index for the vectorized operation, and the second argument
/// corresponds to the end of the range. The third argument contains the total size of the range.
typealias VectorizedParallelForBody = (Int, Int, Int) -> Void

/// A vectorized function that can be executed in parallel.
///
/// The first argument is the start index for the vectorized operation, and the second argument
/// corresponds to the end of the range. The third argument contains the total size of the range.
typealias ThrowingVectorizedParallelForBody = (Int, Int, Int) throws -> Void

/// Returns after executing `fn` `n` times.
///
/// - Parameter n: The total times to execute `fn`.
func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows
func parallelFor(n: Int, _ fn: ParallelForBody)

/// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been
/// called with parameters that perfectly cover of the range `0..<n`.
///
/// - Parameter n: The range of numbers `0..<n` to cover.
func parallelFor(n: Int, _ fn: VectorizedParallelForBody)

/// Returns after executing `fn` `n` times.
///
/// - Parameter n: The total times to execute `fn`.
func parallelFor(n: Int, _ fn: ThrowingParallelForBody) throws

/// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been
/// called with parameters that perfectly cover of the range `0..<n`.
///
/// - Parameter n: The range of numbers `0..<n` to cover.
func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws


// TODO: Add this & a default implementation!
// /// Returns after executing `fn` `n` times.
// ///
// /// - Parameter n: The total times to execute `fn`.
// /// - Parameter blocksPerThread: The minimum block size to subdivide. If unspecified, a good
// /// value will be chosen based on the amount of available parallelism.
// func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForFunc)
// func parallelFor(blockingUpTo n: Int, _ fn: ParallelForFunc)
// func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForBody)
// func parallelFor(blockingUpTo n: Int, _ fn: ParallelForBody)

/// The maximum amount of parallelism possible within this thread pool.
var parallelism: Int { get }
/// The maximum number of concurrent threads of execution supported by this thread pool.
var maxParallelism: Int { get }

/// Returns the index of the current thread in the pool, if running on a thread-pool thread,
/// nil otherwise.
Expand All @@ -91,125 +131,69 @@ public protocol ComputeThreadPool {
}

extension ComputeThreadPool {
/// A default implementation of the non-throwing variation in terms of the throwing one.
public func join(_ a: () -> Void, _ b: () -> Void) {
withoutActuallyEscaping(a) { a in
let throwing: () throws -> Void = a
try! join(throwing, b)
}
}
}

/// Holds a parallel for function; this is used to avoid extra refcount overheads on the function
/// itself.
fileprivate struct ParallelForFunctionHolder {
var fn: ComputeThreadPool.ParallelForFunc
}

/// Uses `ComputeThreadPool.join` to execute `fn` in parallel.
fileprivate func runParallelFor<C: ComputeThreadPool>(
pool: C,
start: Int,
end: Int,
total: Int,
fn: UnsafePointer<ParallelForFunctionHolder>
) throws {
if start + 1 == end {
try fn.pointee.fn(start, total)
} else {
assert(end > start)
let distance = end - start
let midpoint = start + (distance / 2)
try pool.join(
{ try runParallelFor(pool: pool, start: start, end: midpoint, total: total, fn: fn) },
{ try runParallelFor(pool: pool, start: midpoint, end: end, total: total, fn: fn) })
}
}

extension ComputeThreadPool {
public func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows {
try withoutActuallyEscaping(fn) { fn in
var holder = ParallelForFunctionHolder(fn: fn)
try withUnsafePointer(to: &holder) { holder in
try runParallelFor(pool: self, start: 0, end: n, total: n, fn: holder)
/// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized).
public func parallelFor(n: Int, _ fn: ParallelForBody) {
parallelFor(n: n) { start, end, total in
for i in start..<end {
fn(i, total)
}
}
}
}

/// Typed compute threadpools support additional sophisticated operations.
public protocol TypedComputeThreadPool: ComputeThreadPool {
/// Submit a task to be executed on the threadpool.
///
/// `pRun` will execute task in parallel on the threadpool and it will complete at a future time.
/// `pRun` returns immediately.
func dispatch(_ task: (Self) -> Void)

/// Run two tasks (optionally) in parallel.
///
/// Fork-join parallelism allows for efficient work-stealing parallelism. The two non-escaping
/// functions will have finished executing before `pJoin` returns. The first function will execute on
/// the local thread immediately, and the second function will execute on another thread if resources
/// are available, or on the local thread if there are not available other resources.
func join(_ a: (Self) -> Void, _ b: (Self) -> Void)

/// Run two throwing tasks (optionally) in parallel; if one task throws, it is unspecified
/// whether the second task is even started.
///
/// This is the throwing overloaded variation.
func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws
}

extension TypedComputeThreadPool {
/// Implement the non-throwing variation in terms of the throwing one.
public func join(_ a: (Self) -> Void, _ b: (Self) -> Void) {
withoutActuallyEscaping(a) { a in
let throwing: (Self) throws -> Void = a
// Implement the non-throwing in terms of the throwing implementation.
try! join(throwing, b)
/// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized).
public func parallelFor(n: Int, _ fn: ThrowingParallelForBody) throws {
try parallelFor(n: n) { start, end, total in
for i in start..<end {
try fn(i, total)
}
}
}
}

extension TypedComputeThreadPool {
public func dispatch(_ fn: @escaping () -> Void) {
dispatch { _ in fn() }
}

public func join(_ a: () -> Void, _ b: () -> Void) {
join({ _ in a() }, { _ in b() })
}

public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
try join({ _ in try a() }, { _ in try b() })
}
}

/// A `ComputeThreadPool` that executes everything immediately on the current thread.
///
/// This threadpool implementation is useful for testing correctness, as well as avoiding context
/// switches when a computation is designed to be parallelized at a coarser level.
public struct InlineComputeThreadPool: TypedComputeThreadPool {
public struct InlineComputeThreadPool: ComputeThreadPool {
/// Initializes `self`.
public init() {}

/// The amount of parallelism available in this thread pool.
public var parallelism: Int { 1 }
/// The maximum number of concurrent threads of execution supported by this thread pool.
public var maxParallelism: Int { 1 }

/// The index of the current thread.
public var currentThreadIndex: Int? { 0 }

/// Dispatch `fn` to be run at some point in the future (immediately).
///
/// Note: this implementation just executes `fn` immediately.
public func dispatch(_ fn: (Self) -> Void) {
fn(self)
public func dispatch(_ fn: () -> Void) {
fn()
}

/// Executes `a` and `b` optionally in parallel, and returns when both are complete.
///
/// Note: this implementation simply executes them serially.
public func join(_ a: () -> Void, _ b: () -> Void) {
a()
b()
}

/// Executes `a` and `b` optionally in parallel, and returns when both are complete.
///
/// Note: this implementation simply executes them serially.
public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
try a()
try b()
}

public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) {
fn(0, n, n)
}

/// Executes `a` and `b` and returns when both are complete.
public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws {
try a(self)
try b(self)
public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws {
try fn(0, n, n)
}
}

Expand Down Expand Up @@ -267,8 +251,8 @@ extension ComputeThreadPools {
}

/// The amount of parallelism provided by the current thread-local compute pool.
public static var parallelism: Int {
local.parallelism
public static var maxParallelism: Int {
local.maxParallelism
}

/// Sets `pool` to `local` for the duration of `body`.
Expand Down

0 comments on commit 64d2335

Please sign in to comment.