Skip to content

Commit

Permalink
ThreadPool cleanup (3/n): Switch to vectorized API & remove unused/co…
Browse files Browse the repository at this point in the history
…nfusing extensions and implementations.

This change takes a first^H^H^H^H^H^Hthird whack at a bunch of tech debt:
 1. Removes the Naive thread pool implementation from `PJoin`.
 2. Removes the unnecessary `TypedComputeThreadPool` protocol refinement.
 3. Removes the badly implemented extensions that implemented `parallelFor`
    in terms of `join`.
 4. Removes use of `rethrows`, as the rethrows language feature is not
    expressive enough to allow the performance optimizations for the non-throwing
    case.
  • Loading branch information
saeta committed May 22, 2020
1 parent 81d80b8 commit 2708ea6
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 534 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,40 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
if let e = err { throw e }
}

/// Executes `fn` over half-open sub-ranges that together cover `0..<n`, splitting the work with
/// `join`.
///
/// The range is bisected recursively until a piece is no larger than the grain size
/// (`n / parallelism`, but never less than 1); each such piece is passed to `fn` as
/// `fn(start, end, n)`. When `n` is 0, `fn` is still invoked once with the empty range `0..<0`.
///
/// - Parameter n: The size of the range `0..<n` to cover.
/// - Parameter fn: The vectorized body, called with a sub-range's start, its end, and `n`.
public func parallelFor(n: Int, _ fn: VectorizedParallelForFunction) {
  // Clamp to at least 1: when `n < parallelism` the integer division yields 0, and a one-element
  // range would then bisect into an empty range plus itself, recursing without end.
  let grainSize = max(n / parallelism, 1)  // TODO: Make adaptive!

  func executeParallelFor(_ start: Int, _ end: Int) {
    if start + grainSize >= end {
      fn(start, end, n)
    } else {
      // Divide into 2 & recurse. Reaching here implies `end - start >= 2`, so both halves are
      // non-empty and strictly smaller than the current range.
      let rangeSize = end - start
      let midPoint = start + (rangeSize / 2)
      self.join({ executeParallelFor(start, midPoint) }, { executeParallelFor(midPoint, end) })
    }
  }

  executeParallelFor(0, n)
}

/// Executes the throwing `fn` over half-open sub-ranges that together cover `0..<n`, splitting the
/// work with the throwing `join`.
///
/// The range is bisected recursively until a piece is no larger than the grain size
/// (`n / parallelism`, but never less than 1); each such piece is passed to `fn` as
/// `fn(start, end, n)`. When `n` is 0, `fn` is still invoked once with the empty range `0..<0`.
///
/// - Parameter n: The size of the range `0..<n` to cover.
/// - Parameter fn: The vectorized body, called with a sub-range's start, its end, and `n`.
/// - Throws: Any error propagated out of `fn` or the throwing `join`.
public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws {
  // Clamp to at least 1: when `n < parallelism` the integer division yields 0, and a one-element
  // range would then bisect into an empty range plus itself, recursing without end.
  let grainSize = max(n / parallelism, 1)  // TODO: Make adaptive!

  func executeParallelFor(_ start: Int, _ end: Int) throws {
    if start + grainSize >= end {
      try fn(start, end, n)
    } else {
      // Divide into 2 & recurse. Reaching here implies `end - start >= 2`, so both halves are
      // non-empty and strictly smaller than the current range.
      let rangeSize = end - start
      let midPoint = start + (rangeSize / 2)
      try self.join(
        { try executeParallelFor(start, midPoint) },
        { try executeParallelFor(midPoint, end) })
    }
  }

  try executeParallelFor(0, n)
}

/// Shuts down the thread pool.
public func shutDown() {
cancelled = true
Expand Down
172 changes: 76 additions & 96 deletions Sources/PenguinParallel/ThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,61 @@ public protocol ComputeThreadPool {
/// This is the throwing overload
func join(_ a: () throws -> Void, _ b: () throws -> Void) throws

/// A function that can be executed in parallel.
///
/// The first argument is the index of the invocation, and the second argument is the total number
/// of invocations.
typealias ParallelForFunction = (Int, Int) -> Void

/// A function that can be executed in parallel.
///
/// The first argument is the index of the copy, and the second argument is the total number of
/// copies being executed.
typealias ParallelForFunc = (Int, Int) throws -> Void
typealias ThrowingParallelForFunction = (Int, Int) throws -> Void

/// A vectorized function that can be executed in parallel.
///
/// The first argument is the start index for the vectorized operation, and the second argument
/// corresponds to the end of the range. The third argument contains the total size of the range.
typealias VectorizedParallelForFunction = (Int, Int, Int) -> Void

/// A vectorized function that can be executed in parallel.
///
/// The first argument is the start index for the vectorized operation, and the second argument
/// corresponds to the end of the range. The third argument contains the total size of the range.
typealias ThrowingVectorizedParallelForFunction = (Int, Int, Int) throws -> Void

/// Returns after executing `fn` `n` times.
///
/// - Parameter n: The total times to execute `fn`.
func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows
func parallelFor(n: Int, _ fn: ParallelForFunction)

/// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been
/// called with parameters that perfectly cover the range `0..<n`.
///
/// - Parameter n: The range of numbers `0..<n` to cover.
func parallelFor(n: Int, _ fn: VectorizedParallelForFunction)

/// Returns after executing `fn` `n` times.
///
/// - Parameter n: The total times to execute `fn`.
func parallelFor(n: Int, _ fn: ThrowingParallelForFunction) throws

/// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been
/// called with parameters that perfectly cover the range `0..<n`.
///
/// - Parameter n: The range of numbers `0..<n` to cover.
func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws


// TODO: Add this & a default implementation!
// /// Returns after executing `fn` `n` times.
// ///
// /// - Parameter n: The total times to execute `fn`.
// /// - Parameter blocksPerThread: The minimum block size to subdivide. If unspecified, a good
// /// value will be chosen based on the amount of available parallelism.
// func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForFunc)
// func parallelFor(blockingUpTo n: Int, _ fn: ParallelForFunc)
// func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForFunction)
// func parallelFor(blockingUpTo n: Int, _ fn: ParallelForFunction)

/// The maximum amount of parallelism possible within this thread pool.
var parallelism: Int { get }
Expand All @@ -91,105 +127,31 @@ public protocol ComputeThreadPool {
}

extension ComputeThreadPool {
/// A default implementation of the non-throwing variation in terms of the throwing one.
///
/// - Parameter a: The first task to run.
/// - Parameter b: The second task to run.
public func join(_ a: () -> Void, _ b: () -> Void) {
// `withoutActuallyEscaping` provides `a` in a form that may be bound to the local below.
withoutActuallyEscaping(a) { a in
// NOTE(review): the explicit `throws` type presumably steers the call below to the throwing
// overload instead of recursing into this function; keep the annotation.
let throwing: () throws -> Void = a
// `try!` traps if the throwing `join` reports an error; `a` and `b` cannot throw themselves.
try! join(throwing, b)
}
}
}

/// Holds a parallel for function; this is used to avoid extra refcount overheads on the function
/// itself.
///
/// `runParallelFor` receives this holder through an `UnsafePointer` and invokes `fn` via
/// `pointee`.
fileprivate struct ParallelForFunctionHolder {
/// The wrapped per-index function, called as `fn(index, total)`.
var fn: ComputeThreadPool.ParallelForFunc
}

/// Uses `ComputeThreadPool.join` to execute `fn` in parallel.
///
/// The half-open range `start..<end` is bisected recursively; once a range holds exactly one
/// index, `fn` is called as `fn(start, total)`. An empty range (`start == end`) performs no work.
///
/// - Parameter pool: The pool whose throwing `join` runs the two halves.
/// - Parameter start: The first index of the range to process.
/// - Parameter end: One past the last index of the range to process; must not be below `start`.
/// - Parameter total: The value forwarded to `fn` as its second argument.
/// - Parameter fn: A pointer to the holder of the function to invoke.
/// - Throws: Any error propagated out of `fn` or `pool.join`.
fileprivate func runParallelFor<C: ComputeThreadPool>(
  pool: C,
  start: Int,
  end: Int,
  total: Int,
  fn: UnsafePointer<ParallelForFunctionHolder>
) throws {
  // Zero iterations requested: without this check the midpoint equals `start`, so both halves
  // would again be the same empty range and the recursion would never terminate.
  if start == end { return }

  if start + 1 == end {
    try fn.pointee.fn(start, total)
  } else {
    // A reversed range is a caller error rather than an empty workload.
    assert(end > start)
    let distance = end - start
    let midpoint = start + (distance / 2)
    try pool.join(
      { try runParallelFor(pool: pool, start: start, end: midpoint, total: total, fn: fn) },
      { try runParallelFor(pool: pool, start: midpoint, end: end, total: total, fn: fn) })
  }
}

extension ComputeThreadPool {
public func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows {
try withoutActuallyEscaping(fn) { fn in
var holder = ParallelForFunctionHolder(fn: fn)
try withUnsafePointer(to: &holder) { holder in
try runParallelFor(pool: self, start: 0, end: n, total: n, fn: holder)
/// Adapts the per-index `fn` to the vectorized `parallelFor`: for every sub-range handed back,
/// `fn` is called once per index along with the total.
public func parallelFor(n: Int, _ fn: ParallelForFunction) {
  parallelFor(n: n) { lower, upper, count in
    for index in lower..<upper { fn(index, count) }
  }
}
}

/// Typed compute threadpools support additional sophisticated operations.
public protocol TypedComputeThreadPool: ComputeThreadPool {
/// Submit a task to be executed on the threadpool.
///
/// `dispatch` will execute `task` in parallel on the threadpool and it will complete at a future
/// time. `dispatch` returns immediately.
func dispatch(_ task: (Self) -> Void)

/// Run two tasks (optionally) in parallel.
///
/// Fork-join parallelism allows for efficient work-stealing parallelism. The two non-escaping
/// functions will have finished executing before `join` returns. The first function will execute
/// on the local thread immediately, and the second function will execute on another thread if
/// resources are available, or on the local thread if no other resources are available.
func join(_ a: (Self) -> Void, _ b: (Self) -> Void)

/// Run two throwing tasks (optionally) in parallel; if one task throws, it is unspecified
/// whether the second task is even started.
///
/// This is the throwing overloaded variation.
func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws
}

extension TypedComputeThreadPool {
/// Implement the non-throwing variation in terms of the throwing one.
public func join(_ a: (Self) -> Void, _ b: (Self) -> Void) {
withoutActuallyEscaping(a) { a in
let throwing: (Self) throws -> Void = a
// Implement the non-throwing in terms of the throwing implementation.
try! join(throwing, b)
/// Adapts the throwing per-index `fn` to the throwing vectorized `parallelFor`: for every
/// sub-range handed back, `fn` is called once per index along with the total.
public func parallelFor(n: Int, _ fn: ThrowingParallelForFunction) throws {
  try parallelFor(n: n) { lower, upper, count in
    for index in lower..<upper { try fn(index, count) }
  }
}
}

extension TypedComputeThreadPool {
  /// Dispatches `fn` through the typed `dispatch`, discarding the pool argument it supplies.
  public func dispatch(_ fn: @escaping () -> Void) {
    dispatch({ (_: Self) in fn() })
  }

  /// Joins `a` and `b` through the typed `join`, discarding the pool argument it supplies.
  public func join(_ a: () -> Void, _ b: () -> Void) {
    join({ (_: Self) in a() }, { (_: Self) in b() })
  }

  /// Joins the throwing `a` and `b` through the typed throwing `join`, discarding the pool
  /// argument it supplies.
  public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
    try join({ (_: Self) in try a() }, { (_: Self) in try b() })
  }
}

/// A `ComputeThreadPool` that executes everything immediately on the current thread.
///
/// This threadpool implementation is useful for testing correctness, as well as avoiding context
/// switches when a computation is designed to be parallelized at a coarser level.
public struct InlineComputeThreadPool: TypedComputeThreadPool {
public struct InlineComputeThreadPool: ComputeThreadPool {
/// Initializes `self`.
public init() {}

Expand All @@ -202,14 +164,32 @@ public struct InlineComputeThreadPool: TypedComputeThreadPool {
/// Dispatch `fn` to be run at some point in the future (immediately).
///
/// Note: this implementation just executes `fn` immediately.
public func dispatch(_ fn: (Self) -> Void) {
fn(self)
public func dispatch(_ fn: () -> Void) {
// No queueing: `fn` is called synchronously here, so it has finished when `dispatch` returns.
fn()
}

/// Executes `a` and `b` optionally in parallel, and returns when both are complete.
///
/// Note: this implementation simply executes them serially.
///
/// - Parameter a: The task that runs first.
/// - Parameter b: The task that runs after `a` has returned.
public func join(_ a: () -> Void, _ b: () -> Void) {
a()
b()
}

/// Executes `a` and `b` optionally in parallel, and returns when both are complete.
///
/// Note: this implementation simply executes them serially. If `a` throws, `b` is never started.
///
/// - Parameter a: The task that runs first.
/// - Parameter b: The task that runs after `a` has returned without throwing.
/// - Throws: The error thrown by `a`, or otherwise the error thrown by `b`.
public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
try a()
try b()
}

/// Covers the range `0..<n` with a single call to `fn`.
///
/// Note: this implementation does no splitting; `fn` is invoked exactly once as `fn(0, n, n)`.
///
/// - Parameter n: The size of the range `0..<n` to cover.
/// - Parameter fn: The vectorized body to invoke.
public func parallelFor(n: Int, _ fn: VectorizedParallelForFunction) {
fn(0, n, n)
}

/// Executes `a` and `b` and returns when both are complete.
public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws {
try a(self)
try b(self)
/// Covers the range `0..<n` with a single call to the throwing `fn`.
///
/// Note: this implementation does no splitting; `fn` is invoked exactly once as `fn(0, n, n)`.
///
/// - Parameter n: The size of the range `0..<n` to cover.
/// - Parameter fn: The throwing vectorized body to invoke.
/// - Throws: Any error thrown by `fn`.
public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws {
try fn(0, n, n)
}
}

Expand Down

0 comments on commit 2708ea6

Please sign in to comment.