Skip to content

Commit

Permalink
Merge branch 'master' into threadpool-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
saeta committed May 23, 2020
2 parents 2708ea6 + 4c41ec8 commit 43f726b
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 106 deletions.
@@ -0,0 +1,76 @@
// Copyright 2020 Penguin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO: once some version of atomics lands in Swift, refactor this to make it much nicer!

/// A helper that packs the spinning state of the `NonBlockingThreadPool` into 64 bits.
///
/// The low `spinningCountBits` bits of `underlying` hold the spinning count; the bits selected by
/// `noNotifyCountMask` (the high 32 bits) hold the non-notifying submission count.
internal struct NonBlockingSpinningState {
  /// The packed representation of both counters.
  var underlying: UInt64

  /// Wraps an already-packed 64-bit value.
  init(_ underlying: UInt64) { self.underlying = underlying }

  /// The number of spinning worker threads.
  var spinningCount: UInt64 {
    get {
      underlying & Self.spinningCountMask
    }
    set {
      // `spinningCountMask` is itself the largest value that fits in the field, so it must be
      // accepted; anything larger would spill into the no-notify bits.
      assert(newValue <= Self.spinningCountMask, "new value: \(newValue)")
      underlying = (underlying & ~Self.spinningCountMask) | newValue
    }
  }

  /// Number of non-notifying submissions into the pool.
  var noNotifyCount: UInt64 {
    (underlying & Self.noNotifyCountMask) >> Self.noNotifyCountShift
  }

  /// True iff a task has been submitted to the pool without notifying the thread pool's `condition`.
  var hasNoNotifyTask: Bool {
    (underlying & Self.noNotifyCountMask) != 0
  }

  /// Returns a new state with the non-notifying count incremented by one.
  ///
  /// Traps if the addition overflows the 64-bit packed value.
  func incrementingNoNotifyCount() -> Self {
    Self(underlying + Self.noNotifyCountIncrement)
  }

  /// Decrements the non-notifying count by one.
  ///
  /// Traps if the non-notifying count is already zero (the subtraction would underflow).
  mutating func decrementNoNotifyCount() {
    underlying -= Self.noNotifyCountIncrement
  }

  /// Returns a new state with the spinning count incremented by one.
  ///
  /// The increment is applied to the packed value directly, so a spinning count that is already
  /// at its maximum carries into the non-notifying count.
  func incrementingSpinningCount() -> Self {
    Self(underlying + 1)
  }

  /// Returns a new state with the spinning count decremented by one.
  ///
  /// The decrement is applied to the packed value directly, so a spinning count of zero borrows
  /// from the non-notifying count (and traps if the whole packed value is zero).
  func decrementingSpinningCount() -> Self {
    Self(underlying - 1)
  }

  /// Width, in bits, of the spinning-count field.
  static let spinningCountBits: UInt64 = 32
  /// Selects the spinning-count field (the low bits of `underlying`).
  static let spinningCountMask: UInt64 = (1 << spinningCountBits) - 1
  /// Width, in bits, of the non-notifying-count field.
  static let noNotifyCountBits: UInt64 = 32
  /// Bit offset of the non-notifying-count field within `underlying`.
  static let noNotifyCountShift: UInt64 = 32
  /// Selects the non-notifying-count field.
  static let noNotifyCountMask: UInt64 = ((1 << noNotifyCountBits) - 1) << noNotifyCountShift
  /// The packed-value delta corresponding to one non-notifying submission.
  static let noNotifyCountIncrement: UInt64 = (1 << noNotifyCountShift)
}

extension NonBlockingSpinningState: CustomStringConvertible {
  /// A textual summary of both packed counters, labelled with this type's name.
  public var description: String {
    // The label matches the type's actual spelling (`NonBlocking…`, capital "B").
    "NonBlockingSpinningState(spinningCount: \(spinningCount), noNotifyCount: \(noNotifyCount))"
  }
}
Expand Up @@ -99,7 +99,7 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
let totalThreadCount = threadCount + externalFastPathThreadCount
self.totalThreadCount = totalThreadCount
self.externalFastPathThreadCount = externalFastPathThreadCount
self.coprimes = makeCoprimes(upTo: totalThreadCount)
self.coprimes = positiveCoprimes(totalThreadCount)
self.queues = (0..<totalThreadCount).map { _ in Queue.make() }
self.cancelledStorage = AtomicUInt64()
self.blockedCountStorage = AtomicUInt64()
Expand Down Expand Up @@ -157,9 +157,6 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
}
}

// TODO: Add API to allow expressing parallelFor without requiring closure allocations & test
// to see if that improves performance or not.

public func join(_ a: Task, _ b: Task) {
// add `b` to the work queue (and execute it immediately if queue is full).
// if added to the queue, maybe wakeup worker if required.
Expand Down Expand Up @@ -369,7 +366,7 @@ extension NonBlockingThreadPool {
/// If there are threads spinning in the steal loop, there is no need to unpark a waiting thread,
/// as the task will get picked up by one of the spinners.
private func wakeupWorkerIfRequired() {
var state = NonblockingSpinningState(spinningState.valueRelaxed)
var state = NonBlockingSpinningState(spinningState.valueRelaxed)
while true {
// if the number of tasks submitted without notifying parked threads is equal to the number of
// spinning threads, we must wake up one of the parked threads
Expand All @@ -388,7 +385,7 @@ extension NonBlockingThreadPool {
fileprivate func shouldStartSpinning() -> Bool {
if activeThreadCount > Constants.minActiveThreadsToStartSpinning { return false } // ???

var state = NonblockingSpinningState(spinningState.valueRelaxed)
var state = NonBlockingSpinningState(spinningState.valueRelaxed)
while true {
if (state.spinningCount - state.noNotifyCount) >= Constants.maxSpinningThreads {
return false
Expand All @@ -404,7 +401,7 @@ extension NonBlockingThreadPool {
///
/// - Returns: `true` if there is a task to steal; false otherwise.
fileprivate func stopSpinning() -> Bool {
var state = NonblockingSpinningState(spinningState.valueRelaxed)
var state = NonBlockingSpinningState(spinningState.valueRelaxed)
while true {
var newState = state.decrementingSpinningCount()

Expand Down Expand Up @@ -438,9 +435,6 @@ extension NonBlockingThreadPool where Environment: DefaultInitializable {
/// Creates a pool by forwarding `name` and `threadCount` to
/// `init(name:threadCount:environment:)`, supplying a default-constructed `Environment()`.
public convenience init(name: String, threadCount: Int) {
self.init(name: name, threadCount: threadCount, environment: Environment())
}

// TODO: add a convenience initializer that automatically figures out the number of threads to
// use based on available processor threads.
}

fileprivate final class PerThreadState<Environment: ConcurrencyPlatform> {
Expand Down Expand Up @@ -545,102 +539,6 @@ fileprivate final class PerThreadState<Environment: ConcurrencyPlatform> {
}
}

/// Returns every integer in `1...n` that is coprime with `n`, in ascending order.
///
/// Two numbers are coprime if their greatest common divisor is 1.
///
/// - Parameter n: The value to test candidates against. Values below 1 yield an empty array.
/// - Returns: The integers `i` in `1...n` for which `GCD(i, n) == 1`.
fileprivate func makeCoprimes(upTo n: Int) -> [Int] {
  // `1...n` is not a valid range (and traps) when `n < 1`, so handle that case up front.
  guard n >= 1 else { return [] }
  var coprimes = [Int]()
  for i in 1...n {
    // Euclid's algorithm: when the loop exits, `a` holds GCD(i, n).
    var a = i
    var b = n
    while b != 0 {
      (a, b) = (b, a % b)
    }
    if a == 1 { coprimes.append(i) }
  }
  return coprimes
}

/// Maps `lhs` into `[0, size)` with a widening multiply instead of a remainder.
///
/// Both arguments are converted to `UInt32`; the result is the high 32 bits of their 64-bit
/// product. This is a faster variation than computing `x % size`. For additional context, see:
/// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction
fileprivate func fastFit(_ lhs: Int, into size: Int) -> Int {
  let (high, _) = UInt32(lhs).multipliedFullWidth(by: UInt32(size))
  return Int(high)
}

/// Fast random number generator using [permuted congruential
/// generators](https://en.wikipedia.org/wiki/Permuted_congruential_generator)
///
/// `state` is advanced by a 64-bit linear congruential step on every call to `next()`.
fileprivate struct PCGRandomNumberGenerator {
  /// The current 64-bit generator state.
  var state: UInt64
  /// The additive constant used by the state update.
  static var stream: UInt64 { 0xda3e_39cb_94b9_5bdb }

  /// Advances the state and returns 32 bits derived from the previous state.
  mutating func next() -> UInt32 {
    let previous = state
    // Linear congruential step; wrapping arithmetic is intentional.
    state = previous &* 6_364_136_223_846_793_005 &+ Self.stream
    // XSH-RS output function, computed from the old state: the top three bits pick an extra
    // right shift of 0...7 on top of the fixed 22.
    let extraShift = Int(previous >> 61)
    let mixed = previous ^ (previous >> 22)
    return UInt32(truncatingIfNeeded: mixed >> (22 + extraShift))
  }
}

/// Packs two 32-bit counters — the spinning count and the non-notifying submission count — into
/// a single 64-bit value.
///
/// The low `spinningCountBits` bits of `underlying` hold the spinning count; the bits selected by
/// `noNotifyCountMask` (the high 32 bits) hold the non-notifying count.
fileprivate struct NonblockingSpinningState {
  /// The packed representation of both counters.
  var underlying: UInt64

  /// Wraps an already-packed 64-bit value.
  init(_ underlying: UInt64) { self.underlying = underlying }

  /// The value stored in the low (spinning-count) field.
  var spinningCount: UInt64 {
    get {
      underlying & Self.spinningCountMask
    }
    set {
      // `spinningCountMask` is itself the largest value that fits in the field, so it must be
      // accepted; anything larger would spill into the no-notify bits.
      assert(newValue <= Self.spinningCountMask, "new value: \(newValue)")
      underlying = (underlying & ~Self.spinningCountMask) | newValue
    }
  }

  /// The value stored in the high (non-notifying-count) field.
  var noNotifyCount: UInt64 {
    (underlying & Self.noNotifyCountMask) >> Self.noNotifyCountShift
  }

  /// True iff the non-notifying count is non-zero.
  var hasNoNotifyTask: Bool {
    (underlying & Self.noNotifyCountMask) != 0
  }

  /// Returns a new state with the non-notifying count incremented by one.
  ///
  /// Traps if the addition overflows the 64-bit packed value.
  func incrementingNoNotifyCount() -> Self {
    Self(underlying + Self.noNotifyCountIncrement)
  }

  /// Decrements the non-notifying count by one.
  ///
  /// Traps if the non-notifying count is already zero (the subtraction would underflow).
  mutating func decrementNoNotifyCount() {
    underlying -= Self.noNotifyCountIncrement
  }

  /// Returns a new state with the spinning count incremented by one.
  ///
  /// The increment is applied to the packed value directly, so a spinning count that is already
  /// at its maximum carries into the non-notifying count.
  func incrementingSpinningCount() -> Self {
    Self(underlying + 1)
  }

  /// Returns a new state with the spinning count decremented by one.
  ///
  /// The decrement is applied to the packed value directly, so a spinning count of zero borrows
  /// from the non-notifying count (and traps if the whole packed value is zero).
  func decrementingSpinningCount() -> Self {
    Self(underlying - 1)
  }

  /// Width, in bits, of the spinning-count field.
  static let spinningCountBits: UInt64 = 32
  /// Selects the spinning-count field (the low bits of `underlying`).
  static let spinningCountMask: UInt64 = (1 << spinningCountBits) - 1
  /// Width, in bits, of the non-notifying-count field.
  static let noNotifyCountBits: UInt64 = 32
  /// Bit offset of the non-notifying-count field within `underlying`.
  static let noNotifyCountShift: UInt64 = 32
  /// Selects the non-notifying-count field.
  static let noNotifyCountMask: UInt64 = ((1 << noNotifyCountBits) - 1) << noNotifyCountShift
  /// The packed-value delta corresponding to one non-notifying submission.
  static let noNotifyCountIncrement: UInt64 = (1 << noNotifyCountShift)
}

extension NonblockingSpinningState: CustomStringConvertible {
  /// A textual summary of both packed counters.
  public var description: String {
    get {
      return "NonblockingSpinningState(spinningCount: \(spinningCount), noNotifyCount: \(noNotifyCount))"
    }
  }
}

fileprivate struct WorkItem {
let op: () -> Void
var stateStorage: AtomicUInt64
Expand Down
@@ -0,0 +1,59 @@
// Copyright 2020 Penguin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Returns the positive integers no greater than `n` that are coprime with `n`, in ascending
/// order.
///
/// Two numbers are coprime if their greatest common divisor is 1.
///
/// - Parameter n: The value to test candidates against. Values below 1 yield an empty array.
/// - Returns: The integers `i` in `1...n` for which `GCD(i, n) == 1`.
internal func positiveCoprimes(_ n: Int) -> [Int] {
  // `1...n` is not a valid range (and traps) when `n < 1`, so handle that case up front.
  guard n >= 1 else { return [] }
  return (1...n).filter { candidate in
    // Euclid's algorithm: when the loop exits, `a` holds GCD(candidate, n).
    var a = candidate
    var b = n
    while b != 0 {
      (a, b) = (b, a % b)
    }
    return a == 1
  }
}

/// Returns a value deterministically selected from `0..<size`.
///
/// Both arguments are converted to `UInt32`, multiplied at full (64-bit) width, and the high
/// 32 bits of the product are returned. This is a faster variation than computing `x % size`.
/// For additional context, please see:
/// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction
internal func fastFit(_ lhs: Int, into size: Int) -> Int {
  let wideProduct = UInt32(lhs).multipliedFullWidth(by: UInt32(size))
  return Int(wideProduct.high)
}

/// Fast pseudorandom number generator using [permuted congruential
/// generators](https://www.pcg-random.org/).
///
/// `state` is advanced by a 64-bit linear congruential step on every call to `next()`.
///
/// NOTE(review): `RandomNumberGenerator` requires `next() -> UInt64`, while this type only
/// declares a `UInt32`-returning `next()` — confirm how the requirement is satisfied before
/// using this generator through the protocol.
internal struct PCGRandomNumberGenerator: RandomNumberGenerator {
  /// The current 64-bit generator state.
  var state: UInt64
  /// The additive constant used by the state update.
  static var stream: UInt64 { 0xda3e_39cb_94b9_5bdb }

  /// Advances the state and returns 32 bits derived from the previous state.
  mutating func next() -> UInt32 {
    let previous = state
    // Linear congruential step; wrapping arithmetic is intentional.
    state = previous &* 6_364_136_223_846_793_005 &+ Self.stream
    // XSH-RS output function, computed from the old state: the top three bits pick an extra
    // right shift of 0...7 on top of the fixed 22.
    let extraShift = Int(previous >> 61)
    let mixed = previous ^ (previous >> 22)
    return UInt32(truncatingIfNeeded: mixed >> (22 + extraShift))
  }
}

0 comments on commit 43f726b

Please sign in to comment.