Skip to content

Commit

Permalink
Fixed an inconsistency where "alwaysActive" handlers would continue t…
Browse files Browse the repository at this point in the history
…o receive errors after the first error.
  • Loading branch information
mattgallagher committed Aug 26, 2017
2 parents 40cf515 + 8d3bbc4 commit 6b080bc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 47 deletions.
79 changes: 33 additions & 46 deletions Sources/CwlSignal/CwlSignal.swift
Expand Up @@ -411,7 +411,7 @@ public class Signal<Value> {
/// - returns: a continuous `SignalMulti`
public final func continuous(initialValue: Value) -> SignalMulti<Value> {
return SignalMulti<Value>(processor: attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([initialValue], nil), userUpdated: false, alwaysActive: true, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
SignalMultiProcessor(signal: s, values: ([initialValue], nil), userUpdated: false, activeWithoutOutputs: true, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
let previous: (Array<Value>, Error?) = (a, p)
switch r {
case .success(let v): a = [v]
Expand All @@ -427,7 +427,7 @@ public class Signal<Value> {
/// - returns: a continuous `SignalMulti`
public final func continuous() -> SignalMulti<Value> {
return SignalMulti<Value>(processor: attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, alwaysActive: true, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: true, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
let previous: (Array<Value>, Error?) = (a, p)
switch r {
case .success(let v): a = [v]; p = nil
Expand All @@ -443,7 +443,7 @@ public class Signal<Value> {
/// - returns: a continuous `SignalMulti`
public final func continuousWhileActive() -> SignalMulti<Value> {
return SignalMulti<Value>(processor: attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, alwaysActive: false, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: false, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
let previous: (Array<Value>, Error?) = (a, p)
switch r {
case .success(let v): a = [v]; p = nil
Expand All @@ -459,7 +459,7 @@ public class Signal<Value> {
/// - returns: a playback `SignalMulti`
public final func playback() -> SignalMulti<Value> {
return SignalMulti<Value>(processor: attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, alwaysActive: true, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: true, dw: &dw, context: .direct, updater: { a, p, r -> (Array<Value>, Error?) in
switch r {
case .success(let v): a.append(v)
case .failure(let e): p = e
Expand All @@ -483,7 +483,7 @@ public class Signal<Value> {
/// - returns: a "multicast" `SignalMulti`.
public final func multicast() -> SignalMulti<Value> {
return SignalMulti<Value>(processor: attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, alwaysActive: false, dw: &dw, context: .direct, updater: nil)
SignalMultiProcessor(signal: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: false, dw: &dw, context: .direct, updater: nil)
})
}

Expand All @@ -497,7 +497,7 @@ public class Signal<Value> {
/// - Returns: a `SignalMulti` with custom activation
public final func customActivation(initialValues: Array<Value> = [], context: Exec = .direct, updater: @escaping (_ cachedValues: inout Array<Value>, _ cachedError: inout Error?, _ incoming: Result<Value>) -> Void) -> SignalMulti<Value> {
return SignalMulti<Value>(processor: attach { (s, dw) in
SignalMultiProcessor(signal: s, values: (initialValues, nil), userUpdated: true, alwaysActive: true, dw: &dw, context: context) { (bufferedValues: inout Array<Value>, bufferedError: inout Error?, incoming: Result<Value>) -> (Array<Value>, Error?) in
SignalMultiProcessor(signal: s, values: (initialValues, nil), userUpdated: true, activeWithoutOutputs: true, dw: &dw, context: context) { (bufferedValues: inout Array<Value>, bufferedError: inout Error?, incoming: Result<Value>) -> (Array<Value>, Error?) in
let oldActivationValues = bufferedValues
let oldError = bufferedError
updater(&bufferedValues, &bufferedError, incoming)
Expand All @@ -514,7 +514,7 @@ public class Signal<Value> {
/// - Returns: a `SignalMulti`
public static func preclosed<S: Sequence>(values: S, error: Error = SignalError.closed) -> SignalMulti<Value> where S.Iterator.Element == Value {
return SignalMulti<Value>(processor: Signal<Value>().attach { (s, dw) in
SignalMultiProcessor(signal: s, values: (Array(values), error), userUpdated: false, alwaysActive: true, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
SignalMultiProcessor(signal: s, values: (Array(values), error), userUpdated: false, activeWithoutOutputs: true, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
})
}

Expand All @@ -526,7 +526,7 @@ public class Signal<Value> {
/// - Returns: a `SignalMulti`
public static func preclosed(_ value: Value, error: Error = SignalError.closed) -> SignalMulti<Value> {
return SignalMulti<Value>(processor: Signal<Value>().attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([value], error), userUpdated: false, alwaysActive: true, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
SignalMultiProcessor(signal: s, values: ([value], error), userUpdated: false, activeWithoutOutputs: true, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
})
}

Expand All @@ -536,7 +536,7 @@ public class Signal<Value> {
/// - Returns: a `SignalMulti`
public static func preclosed(error: Error = SignalError.closed) -> SignalMulti<Value> {
return SignalMulti<Value>(processor: Signal<Value>().attach { (s, dw) in
SignalMultiProcessor(signal: s, values: ([], error), userUpdated: false, alwaysActive: true, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
SignalMultiProcessor(signal: s, values: ([], error), userUpdated: false, activeWithoutOutputs: true, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
})
}

Expand Down Expand Up @@ -972,7 +972,7 @@ public class Signal<Value> {
var dw = DeferredWork()
mutex.sync {
if itemContext.activationCount == activationCount, !delivery.isDisabled {
signalHandler?.deactivateInternal(dw: &dw)
signalHandler?.deactivateInternal(dueToLackOfOutputs: false, dw: &dw)
}
}
dw.runWork()
Expand Down Expand Up @@ -1290,7 +1290,7 @@ fileprivate class SignalHandler<Value> {
self.handler = initialHandlerInternal()

// Propagate immediately
if alwaysActiveInternal {
if activeWithoutOutputsInternal {
if activateInternal(dw: &dw) {
let count = self.signal.activationCount
dw.append { self.endActivation(activationCount: count) }
Expand All @@ -1316,7 +1316,7 @@ fileprivate class SignalHandler<Value> {
}

// True if this node activates predecessors even when it has no active successors
fileprivate var alwaysActiveInternal: Bool {
fileprivate var activeWithoutOutputsInternal: Bool {
assert(signal.mutex.unbalancedTryLock() == false)
return false
}
Expand Down Expand Up @@ -1383,25 +1383,23 @@ fileprivate class SignalHandler<Value> {
fileprivate func handleSynchronousToNormalInternal(dw: inout DeferredWork) {
}

// Override point invoked from `deactivateInternal` used in `SignalEndpoint`
// - Parameter dw: required
fileprivate func handleDeactivationInternal(dw: inout DeferredWork) {
}

// Changes delivery to disabled *and* resets the handler to the initial handler.
// - Parameter dw: required
fileprivate final func deactivateInternal(dw: inout DeferredWork) {
fileprivate final func deactivateInternal(dueToLackOfOutputs: Bool, dw: inout DeferredWork) {
assert(signal.mutex.unbalancedTryLock() == false)
handleDeactivationInternal(dw: &dw)
if !alwaysActiveInternal {
if !activeWithoutOutputsInternal || !dueToLackOfOutputs {
signal.changeDeliveryInternal(newDelivery: .disabled, dw: &dw)
dw.append { [handler] in
withExtendedLifetime(handler) {}

// Endpoints may release themselves on deactivation so we need to keep ourselves alive until outside the lock
withExtendedLifetime(self) {}
}
handler = initialHandlerInternal()
if !activeWithoutOutputsInternal {
handler = initialHandlerInternal()
} else {
handler = { r in }
}
}
}
}
Expand Down Expand Up @@ -1531,7 +1529,7 @@ fileprivate class SignalProcessor<Value, U>: SignalHandler<Value>, SignalPredece
if activationCount != nil {
sendActivationToOutputInternal(index: index, dw: &dw)
result = activateInternal(dw: &dw)
} else if activationCount == nil && !signal.delivery.isDisabled && !alwaysActiveInternal {
} else if activationCount == nil && !signal.delivery.isDisabled && !activeWithoutOutputsInternal {
var anyStillActive = false
for o in outputs {
if o.activationCount != nil {
Expand All @@ -1540,7 +1538,7 @@ fileprivate class SignalProcessor<Value, U>: SignalHandler<Value>, SignalPredece
}
}
if !anyStillActive {
deactivateInternal(dw: &dw)
deactivateInternal(dueToLackOfOutputs: true, dw: &dw)
}
}

Expand Down Expand Up @@ -1735,32 +1733,32 @@ fileprivate class SignalMultiProcessor<Value>: SignalProcessor<Value, Value> {
var activationValues: Array<Value>
var preclosed: Error?
let userUpdated: Bool
let alwaysActive: Bool
let activeWithoutOutputs: Bool

// Rather than using different subclasses for each of the "multi" `Signal`s, this one subclass is used for all. However, that requires a few different parameters to enable different behaviors.
//
// - Parameters:
// - signal: the predecessor signal
// - values: the initial activation values and error
// - userUpdated: whether the `updater` is user-supplied and needs value-copying to ensure thread-safety
// - alwaysActive: whether the handler should immediately activate
// - activeWithoutOutputs: whether the handler should immediately activate
// - dw: required
// - context: where the `updater` will be run
// - updater: when a new signal is received, updates the cached activation values and error
init(signal: Signal<Value>, values: (Array<Value>, Error?), userUpdated: Bool, alwaysActive: Bool, dw: inout DeferredWork, context: Exec, updater: Updater?) {
init(signal: Signal<Value>, values: (Array<Value>, Error?), userUpdated: Bool, activeWithoutOutputs: Bool, dw: inout DeferredWork, context: Exec, updater: Updater?) {
precondition((values.1 == nil && values.0.isEmpty) || updater != nil, "Non empty activation values requires always active.")
self.updater = updater
self.activationValues = values.0
self.preclosed = values.1
self.userUpdated = userUpdated
self.alwaysActive = alwaysActive
self.activeWithoutOutputs = activeWithoutOutputs
super.init(signal: signal, dw: &dw, context: context)
}

// Multicast and continuousWhileActive are not preactivated but all others are not.
fileprivate override var alwaysActiveInternal: Bool {
fileprivate override var activeWithoutOutputsInternal: Bool {
assert(signal.mutex.unbalancedTryLock() == false)
return alwaysActive
return activeWithoutOutputs
}

// Multiprocessor can handle multiple outputs
Expand Down Expand Up @@ -1873,7 +1871,7 @@ fileprivate class SignalCacheUntilActive<Value>: SignalProcessor<Value, Value> {
}

// Is always active
fileprivate override var alwaysActiveInternal: Bool {
fileprivate override var activeWithoutOutputsInternal: Bool {
assert(signal.mutex.unbalancedTryLock() == false)
return true
}
Expand Down Expand Up @@ -2307,7 +2305,7 @@ public final class SignalCapture<Value>: SignalProcessor<Value, Value>, Cancella
}

// Once an output is connected, `SignalCapture` becomes a no-special-behaviors passthrough handler.
fileprivate override var alwaysActiveInternal: Bool {
fileprivate override var activeWithoutOutputsInternal: Bool {
assert(signal.mutex.unbalancedTryLock() == false)
return outputs.count > 0 ? false : true
}
Expand Down Expand Up @@ -2748,7 +2746,6 @@ public class SignalMergedInput<Value>: SignalMultiInput<Value> {
/// This class is instantiated by calling `subscribe` on any `Signal`.
public final class SignalEndpoint<Value>: SignalHandler<Value>, Cancellable {
private let userHandler: (Result<Value>) -> Void
private var closed = false

/// Constructor called from `subscribe`
///
Expand All @@ -2769,31 +2766,21 @@ public final class SignalEndpoint<Value>: SignalHandler<Value>, Cancellable {
return { [userHandler] r in userHandler(r) }
}

// Endpoints are "always active" until they deactivate, after which they never reactivate.
// - Parameter dw: required
fileprivate override func handleDeactivationInternal(dw: inout DeferredWork) {
closed = true
}

/// A `SignalEndpoint` is active until closed (receives a `failure` signal)
fileprivate override var alwaysActiveInternal: Bool {
fileprivate override var activeWithoutOutputsInternal: Bool {
assert(signal.mutex.unbalancedTryLock() == false)
if closed {
return false
} else {
return true
}
return true
}

/// A simple test for whether this endpoint has received an error, yet. Not generally needed (responding to state changes is best done through the handler function itself).
public var isClosed: Bool {
return sync { closed }
return sync { signal.delivery.isDisabled }
}

/// Implementatation of `Cancellable` forces deactivation
public func cancel() {
var dw = DeferredWork()
sync { if !closed { deactivateInternal(dw: &dw) } }
sync { if !signal.delivery.isDisabled { deactivateInternal(dueToLackOfOutputs: false, dw: &dw) } }
dw.runWork()
}

Expand Down
21 changes: 20 additions & 1 deletion Tests/CwlSignalTests/CwlSignalReactiveTests.swift
Expand Up @@ -1547,7 +1547,26 @@ class SignalReactiveTests: XCTestCase {

withExtendedLifetime(ep3) {}
}


func testPlaygroundMerge() {
let smileysArray = ["😀", "🙃", "😉", "🤣"]
let spookeysArray = ["👻", "🎃", "👹", "😈"]
let animalsArray = ["🐶", "🐱", "🐭", "🐨"]
let smileys = Signal<String>.from(values: smileysArray, error: nil).playback()
let spookeys = Signal<String>.from(values: spookeysArray, error: SignalError.closed).playback()
let animals = Signal<String>.from(values: animalsArray, error: SignalError.cancelled).playback()

var result = [String]()
let ep = Signal<String>.merge(smileys, spookeys, animals).subscribeValues {
result.append($0)
}
var expected = smileysArray
expected.append(contentsOf: spookeysArray)
expected.append(contentsOf: animalsArray)
XCTAssert(result == expected)
withExtendedLifetime(ep) {}
}

func testMerge() {
let merge2 = Signal<Int>.merge([Signal<Int>.from(values: 0..<10), Signal<Int>.from(values: 10..<20)])
var results2 = [Result<Int>]()
Expand Down

0 comments on commit 6b080bc

Please sign in to comment.