/
CwlSignal.swift
3302 lines (2952 loc) · 167 KB
/
CwlSignal.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//
// CwlSignal.swift
// CwlSignal
//
// Created by Matt Gallagher on 2016/06/05.
// Copyright © 2016 Matt Gallagher ( https://www.cocoawithlove.com ). All rights reserved.
//
// Permission to use, copy, modify, and/or distribute this software for any purpose with or without
// fee is hereby granted, provided that the above copyright notice and this permission notice
// appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS
// SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
// AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
// NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
// OF THIS SOFTWARE.
//
import CwlUtils
import Foundation
/// This protocol allows transformations that apply to `Signal` types to be applied to a type that exposes a signal.
public protocol SignalInterface {
associatedtype OutputValue
var signal: Signal<OutputValue> { get }
}
/// This protocol allows transformations that apply to `Signal` types to be applied to a type that exposes a signal.
public protocol SignalInputInterface {
associatedtype InputValue
var input: SignalInput<InputValue> { get }
}
#if DEBUG_LOGGING
// NOTE: This is thread unsafe. There should really be a lock around access and logging.
var globalCount: Int = 0
#endif
/// A composable, one-way, potentially asynchronous, FIFO communication channel that delivers a sequence of `Result<OutputValue, SignalEnd>`.
///
/// In conjunction with various transformation functions, this class forms the core of a reactive programming system. Try the playgrounds for a better walkthrough of concepts and usage.
///
/// # Terminology
///
/// The word "signal" may be used in a number of ways, so keep in mind:
/// - `Signal`: refers to this class
/// - signal graph: one or more `Signal` instances, connected together, from `SignalInput` to `SignalOutput`
/// - signal: the sequence of `Result` instances, from activation to close, that pass through a signal graph
public class Signal<OutputValue>: SignalInterface {
public typealias Result = Swift.Result<OutputValue, SignalEnd>
public enum Next {
case none
case single(Result)
case array(Array<Result>)
}
// # GOALS
//
// The primary design goals for this implementation are:
// 1. All possible actions on `Signal` itself are threadsafe (no possible action results in undefined or corrupt memory behavior for internal data)
// 2. Deadlocks on internally created mutexes will never occur.
// 3. Values will never be delivered out-of-order.
// 4. After a disconnection and reconnection, only values from the latest connection will be delivered.
// 5. Loopback (sending to an antecedent input from a subsequent signal handler) and attempts at re-entrancy to any closure in the graph are permitted. Attempted re-entrancy delivery is simply queued to be delivered after any in-flight behavior completes.
//
// That's quite a list of goals but it's largely covered by two ideas:
// 1. No user code is ever invoked inside a `Signal` internal mutex
// 2. Delivery to a `Signal` includes the "predecessor" and the "activationCount". If either fail to match the internal state of the `Signal`, then the delivery is out-of-date and can be discarded.
//
// The first of these points is ensured through the use of `itemProcessing`, `holdCount` and `DeferredWork`. The `itemProcessing` and `holdCount` block a queue while out-of-mutex work is performed. The `DeferredWork` defers work to be performed later, once the stack has unwound and no mutexes are held.
// This ensures that no problematic work is performed inside a mutex but it means that we often have "in-flight" work occurring outside a mutex that might no longer be valid. So we need to combine this work identifiers that allow us to reject out-of-date work. That's where the second point becomes important.
// The "activationCount" for an `Signal` changes any time a manual input control is generated (`SignalInput`/`SignalMergedInput`), any time a first predecessor is added or any time there are predecessors connected and the `delivery` state changes to or from `.disabled`. Combined with the fact that it is not possible to disconnect and re-add the same predecessor to a multi-input Signal (SignalMergedInput or SignalCombiner) this guarantees any messages from out-of-date but still in-flight deliveries are ignored.
//
// # LIMITS TO THREADSAFETY
//
// While all actions on `Signal` are threadsafe, there are some points to keep in mind:
// 1. Threadsafe means that the internal members of the `Signal` class will remain threadsafe and your own closures will always be serially and non-reentrantly invoked on the provided `Exec` context. However, this doesn't mean that work you perform in processing closures is always threadsafe; shared references or mutable captures in your closures will still require mutual exclusion.
// 2. Delivery of signal values is guaranteed to be in-order and within appropriate mutexes but is not guaranteed to be executed on the sending thread. If subsequent results are sent to a `Signal` from a second thread while the `Signal` is processing a previous result from a first thread the subsequent result will be *queued* and handled on the *first* thread once it completes processing the earlier values.
// 3. Handlers, captured values and state values will be released *outside* all contexts or mutexes. If you capture an object with `deinit` behavior in a processing closure, you must apply any synchronization context yourself.
// MARK: - Signal static construction functions
/// Create a manual input/output pair where values sent to the `SignalInput` are passed through the `Signal` output.
///
/// - returns: a (`SignalInput`, `Signal`) tuple being the input and output for this stage in the signal pipeline.
public static func create() -> (input: SignalInput<OutputValue>, signal: Signal<OutputValue>) {
let s = Signal<OutputValue>()
s.activationCount = 1
return (SignalInput(signal: s, activationCount: s.activationCount), s)
}
/// A version of created that creates a `SignalMultiInput` instead of a `SignalInput`.
///
/// - Returns: the (input, signal)
public static func createMultiInput() -> (input: SignalMultiInput<OutputValue>, signal: Signal<OutputValue>) {
let s = Signal<OutputValue>()
var dw = DeferredWork()
s.sync { s.updateActivationInternal(andInvalidateAllPrevious: true, dw: &dw) }
dw.runWork()
return (SignalMultiInput(signal: s), s)
}
/// A version of created that creates a `SignalMergedInput` instead of a `SignalInput`.
///
/// - Returns: the (input, signal)
public static func createMergedInput(onLastInputClosed: SignalEnd? = nil, onDeinit: SignalEnd = .cancelled) -> (input: SignalMergedInput<OutputValue>, signal: Signal<OutputValue>) {
let s = Signal<OutputValue>()
var dw = DeferredWork()
s.sync { s.updateActivationInternal(andInvalidateAllPrevious: true, dw: &dw) }
dw.runWork()
return (SignalMergedInput(signal: s, onLastInputClosed: onLastInputClosed, onDeinit: onDeinit), s)
}
/// Similar to `create`, in that it creates a "head" for the graph but rather than immediately providing a `SignalInput`, this function calls the `activationChange` function when the signal graph is activated and provides the newly created `SignalInput` at that time. When the graph deactivates, `nil` is sent to the `activationChange` function. If a subsequent reactivation occurs, the new `SignalInput` for the re-activation is provided.
///
/// - Parameters:
/// - context: the `activationChange` will be invoked in this context
/// - activationChange: receives inputs on activation and nil on each deactivation
/// - Returns: the constructed `Signal`
public static func generate(context: Exec = .direct, _ activationChange: @escaping (_ input: SignalInput<OutputValue>?) -> Void) -> Signal<OutputValue> {
let s = Signal<OutputValue>()
let nis = Signal<Any?>()
s.newInputSignal = (nis, nis.subscribe(context: context) { r in
if case .success(let v) = r {
activationChange(v as? SignalInput<OutputValue>)
}
})
return s
}
/// Constructs a `SignalMulti` with an array of "activation" values and a closing error.
///
/// - Parameters:
/// - values: an array of values
/// - end: the closing condition for the `Signal`
/// - Returns: a `SignalMulti`
public static func preclosed<S: Sequence>(sequence: S, end: SignalEnd = .complete) -> SignalMulti<OutputValue> where S.Iterator.Element == OutputValue {
return SignalMulti<OutputValue>(processor: Signal<OutputValue>().attach { (s, dw) in
SignalMultiProcessor(source: s, values: (Array(sequence), end), userUpdated: false, activeWithoutOutputs: .always, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
})
}
/// Constructs a `SignalMulti` with a single activation value and a closing error.
///
/// - Parameters:
/// - value: a single value
/// - end: the closing condition for the `Signal`
/// - Returns: a `SignalMulti`
public static func preclosed(_ values: OutputValue..., end: SignalEnd = .complete) -> SignalMulti<OutputValue> {
return preclosed(sequence: values, end: end)
}
/// Constructs a `SignalMulti` that is already closed with an error.
///
/// - Parameter end: the closing condition for the `Signal`
/// - Returns: a `SignalMulti`
public static func preclosed(end: SignalEnd = .complete) -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: Signal<OutputValue>().attach { (s, dw) in
SignalMultiProcessor(source: s, values: ([], end), userUpdated: false, activeWithoutOutputs: .always, dw: &dw, context: .direct, updater: { a, p, r in ([], nil) })
})
}
// MARK: - Signal public transformation functions
public var signal: Signal<OutputValue> { return self }
/// Appends a `SignalOutput` listener to the value emitted from this `Signal`. The output will "activate" this `Signal` and all direct antecedents in the graph (which may start lazy operations deferred until activation).
///
/// - Parameters:
/// - context: context: the `Exec` context used to invoke the `handler`
/// - handler: the function invoked for each received `Result`
/// - Returns: the created `SignalOutput` (if released, the subscription will be cancelled).
public final func subscribe(context: Exec = .direct, _ handler: @escaping (Result) -> Void) -> SignalOutput<OutputValue> {
return attach { (s, dw) in
SignalOutput<OutputValue>(source: s, dw: &dw, context: context, handler: handler)
}
}
/// A version of `subscribe` that retains the `SignalOutput` internally, keeping the signal graph alive. The `SignalOutput` is cancelled and released if the signal closes or if the handler returns `false` after any signal.
///
/// NOTE: this subscriber deliberately creates a reference counted loop. If the signal is never closed and the handler never returns false, it will result in a memory leak. This function should be used only when `self` is guaranteed to close or the handler `false` condition is guaranteed.
///
/// - Parameters:
/// - context: the execution context where the `processor` will be invoked
/// - handler: will be invoked with each value received and if returns `false`, the output will be cancelled and released
public final func subscribeWhile(context: Exec = .direct, _ handler: @escaping (Result) -> Bool) {
_ = attach { (s, dw) in
var handlerRetainedOutput: SignalOutput<OutputValue>? = nil
let output = SignalOutput<OutputValue>(source: s, dw: &dw, context: context, handler: { r in
withExtendedLifetime(handlerRetainedOutput) {}
if !handler(r) || r.isFailure {
handlerRetainedOutput?.cancel()
handlerRetainedOutput = nil
}
})
handlerRetainedOutput = output
return output
}
}
/// Appends a disconnected `SignalJunction` to this `Signal` so outputs can be repeatedly joined and disconnected from this graph in the future.
///
/// - Returns: the `SignalJunction<OutputValue>`
public final func junction() -> SignalJunction<OutputValue> {
return attach { (s, dw) -> SignalJunction<OutputValue> in
return SignalJunction<OutputValue>(source: s, dw: &dw)
}
}
/// Appends an immediately activated handler that captures any activation values from this `Signal`. The captured values can be accessed from the `SignalCapture<OutputValue>` using the `activation()` function. The `SignalCapture<OutputValue>` can then be joined to further `Signal`s using the `bind(to:)` function on the `SignalCapture<OutputValue>`.
///
/// - Returns: the handler than can be used to obtain activation values and bind to subsequent nodes.
public final func capture() -> SignalCapture<OutputValue> {
return attach { (s, dw) -> SignalCapture<OutputValue> in
SignalCapture<OutputValue>(source: s, dw: &dw)
}
}
/// Appends a handler function that transforms the value emitted from this `Signal` into a new `Signal`.
///
/// - Parameters:
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: the function invoked for each received `Result`
/// - Returns: the created `Signal`
public final func transform<U>(context: Exec = .direct, _ processor: @escaping (Result) -> Signal<U>.Next) -> Signal<U> {
return Signal<U>(processor: attach { (s, dw) in
SignalTransformer<OutputValue, U>(source: s, dw: &dw, context: context, processor)
}).returnToGlobalIfNeeded(context: context)
}
/// Appends a handler function that transforms the value emitted from this `Signal` into a new `Signal`.
///
/// - Parameters:
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: the function invoked for each received `Result`
/// - Returns: the created `Signal`
public final func transformActivation<U>(context: Exec = .direct, activation: @escaping (Result) -> Signal<U>.Next, _ processor: @escaping (Result) -> Signal<U>.Next) -> Signal<U> {
return Signal<U>(processor: attach { (s, dw) in
SignalActivationTransformer<OutputValue, U>(source: s, dw: &dw, context: context, activationProcessor: activation, processor)
}).returnToGlobalIfNeeded(context: context)
}
/// Appends a handler function that transforms the value emitted from this `Signal` into a new `Signal`.
///
/// - Parameters:
/// - initialState: the initial value for a state value associated with the handler. This value is retained and if the signal graph is deactivated, the state value is reset to this value.
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: the function invoked for each received `Result`
/// - Returns: the transformed output `Signal`
public final func transform<S, U>(initialState: S, context: Exec = .direct, _ processor: @escaping (inout S, Result) -> Signal<U>.Next) -> Signal<U> {
return Signal<U>(processor: attach { (s, dw) in
SignalTransformerWithState<OutputValue, U, S>(source: s, initialState: initialState, dw: &dw, context: context, processor)
}).returnToGlobalIfNeeded(context: context)
}
/// Appends a handler function that receives inputs from this and another `Signal<U>`. The `handler` function applies any transformation it wishes an emits a (potentially) third `Signal` type.
///
/// - Parameters:
/// - second: the other `Signal` that is, along with `self` used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self` or `second` as `EitherResult2<OutputValue, U>` (an enum which may contain either `.result1` or `.result2` corresponding to `self` or `second`) and sends results to an `SignalNext<V>`.
/// - Returns: an `Signal<V>` which is the result stream from the `SignalNext<V>` passed to the `handler`.
public final func combine<U: SignalInterface, V>(_ second: U, context: Exec = .direct, _ processor: @escaping (EitherResult2<OutputValue, U.OutputValue>) -> Signal<V>.Next) -> Signal<V> {
return Signal<EitherResult2<OutputValue, U.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult2<OutputValue, U.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult2<OutputValue, U.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult2<OutputValue, U.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult2<OutputValue, U.OutputValue>.result2)
}).transform(context: context, Signal.successProcessor(processor)).returnToGlobalIfNeeded(context: context)
}
/// Appends a handler function that receives inputs from this and two other `Signal`s. The `handler` function applies any transformation it wishes an emits a (potentially) fourth `Signal` type.
///
/// - Parameters:
/// - second: the second `Signal`, after `self` used as input to the `handler`
/// - third: the third `Signal`, after `self` and `second`, used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self`, `second` or `third` as `EitherResult3<OutputValue, U, V>` (an enum which may contain either `.result1`, `.result2` or `.result3` corresponding to `self`, `second` or `third`) and sends results to an `SignalNext<W>`.
/// - Returns: an `Signal<W>` which is the result stream from the `SignalNext<W>` passed to the `handler`.
public final func combine<U: SignalInterface, V: SignalInterface, W>(_ second: U, _ third: V, context: Exec = .direct, _ processor: @escaping (EitherResult3<OutputValue, U.OutputValue, V.OutputValue>) -> Signal<W>.Next) -> Signal<W> {
return Signal<EitherResult3<OutputValue, U.OutputValue, V.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult3<OutputValue, U.OutputValue, V.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult3<OutputValue, U.OutputValue, V.OutputValue>.result2)
}).addPreceeding(processor: third.signal.attach { (s3, dw) -> SignalCombiner<V.OutputValue, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>> in
SignalCombiner(source: s3, dw: &dw, context: .direct, processor: EitherResult3<OutputValue, U.OutputValue, V.OutputValue>.result3)
}).transform(context: context, Signal.successProcessor(processor)).returnToGlobalIfNeeded(context: context)
}
/// Appends a handler function that receives inputs from this and three other `Signal`s. The `handler` function applies any transformation it wishes an emits a (potentially) fifth `Signal` type.
///
/// - Parameters:
/// - second: the second `Signal`, after `self` used as input to the `handler`
/// - third: the third `Signal`, after `self` and `second`, used as input to the `handler`
/// - fourth: the fourth `Signal`, after `self`, `second` and `third`, used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self`, `second`, `third` or `fourth` as `EitherResult4<OutputValue, U, V, W>` (an enum which may contain either `.result1`, `.result2`, `.result3` or `.result4` corresponding to `self`, `second`, `third` or `fourth`) and sends results to an `SignalNext<X>`.
/// - Returns: an `Signal<X>` which is the result stream from the `SignalNext<X>` passed to the `handler`.
public final func combine<U: SignalInterface, V: SignalInterface, W: SignalInterface, X>(_ second: U, _ third: V, _ fourth: W, context: Exec = .direct, _ processor: @escaping (EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>) -> Signal<X>.Next) -> Signal<X> {
return Signal<EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result2)
}).addPreceeding(processor: third.signal.attach { (s3, dw) -> SignalCombiner<V.OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s3, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result3)
}).addPreceeding(processor: fourth.signal.attach { (s4, dw) -> SignalCombiner<W.OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s4, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result4)
}).transform(context: context, Signal.successProcessor(processor)).returnToGlobalIfNeeded(context: context)
}
/// Appends a handler function that receives inputs from this and four other `Signal`s. The `handler` function applies any transformation it wishes an emits a (potentially) sixth `Signal` type.
///
/// - Parameters:
/// - second: the second `Signal`, after `self` used as input to the `handler`
/// - third: the third `Signal`, after `self` and `second`, used as input to the `handler`
/// - fourth: the fourth `Signal`, after `self`, `second` and `third`, used as input to the `handler`
/// - fifth: the fifth `Signal`, after `self`, `second`, `third` and `fourth`, used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self`, `second`, `third`, `fourth` or `fifth` as `EitherResult5<OutputValue, U, V, W, X>` (an enum which may contain either `.result1`, `.result2`, `.result3`, `.result4` or `.result5` corresponding to `self`, `second`, `third`, `fourth` or `fifth`) and sends results to an `SignalNext<Y>`.
/// - Returns: an `Signal<Y>` which is the result stream from the `SignalNext<Y>` passed to the `handler`.
public final func combine<U: SignalInterface, V: SignalInterface, W: SignalInterface, X: SignalInterface, Y>(_ second: U, _ third: V, _ fourth: W, _ fifth: X, context: Exec = .direct, _ processor: @escaping (EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>) -> Signal<Y>.Next) -> Signal<Y> {
return Signal<EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result2)
}).addPreceeding(processor: third.signal.attach { (s3, dw) -> SignalCombiner<V.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s3, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result3)
}).addPreceeding(processor: fourth.signal.attach { (s4, dw) -> SignalCombiner<W.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s4, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result4)
}).addPreceeding(processor: fifth.signal.attach { (s5, dw) -> SignalCombiner<X.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s5, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result5)
}).transform(context: context, Signal.successProcessor(processor)).returnToGlobalIfNeeded(context: context)
}
/// Similar to `combine(second:context:handler:)` with an additional "state" value.
///
/// - Parameters:
/// - initialState: the initial value of a "state" value passed into the closure on each invocation. The "state" will be reset to this value if the `Signal` deactivates.
/// - second: the other `Signal` that is, along with `self` used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self` or `second` as `EitherResult2<OutputValue, U>` (an enum which may contain either `.result1` or `.result2` corresponding to `self` or `second`) and sends results to an `SignalNext<V>`.
/// - Returns: an `Signal<V>` which is the result stream from the `SignalNext<V>` passed to the `handler`.
public final func combine<S, U: SignalInterface, V>(_ second: U, initialState: S, context: Exec = .direct, _ processor: @escaping (inout S, EitherResult2<OutputValue, U.OutputValue>) -> Signal<V>.Next) -> Signal<V> {
return Signal<EitherResult2<OutputValue, U.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult2<OutputValue, U.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult2<OutputValue, U.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult2<OutputValue, U.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult2<OutputValue, U.OutputValue>.result2)
}).transform(initialState: initialState, context: context, Signal.successProcessorWithState(processor)).returnToGlobalIfNeeded(context: context)
}
/// Similar to `combine(second:third:context:handler:)` with an additional "state" value.
///
/// - Parameters:
/// - initialState: the initial value of a "state" value passed into the closure on each invocation. The "state" will be reset to this value if the `Signal` deactivates.
/// - second: the second `Signal`, after `self` used as input to the `handler`
/// - third: the third `Signal`, after `self` and `second`, used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self`, `second` or `third` as `EitherResult3<OutputValue, U, V>` (an enum which may contain either `.result1`, `.result2` or `.result3` corresponding to `self`, `second` or `third`) and sends results to an `SignalNext<W>`.
/// - Returns: an `Signal<W>` which is the result stream from the `SignalNext<W>` passed to the `handler`.
public final func combine<S, U: SignalInterface, V: SignalInterface, W>(_ second: U, _ third: V, initialState: S, context: Exec = .direct, _ processor: @escaping (inout S, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>) -> Signal<W>.Next) -> Signal<W> {
return Signal<EitherResult3<OutputValue, U.OutputValue, V.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult3<OutputValue, U.OutputValue, V.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult3<OutputValue, U.OutputValue, V.OutputValue>.result2)
}).addPreceeding(processor: third.signal.attach { (s3, dw) -> SignalCombiner<V.OutputValue, EitherResult3<OutputValue, U.OutputValue, V.OutputValue>> in
SignalCombiner(source: s3, dw: &dw, context: .direct, processor: EitherResult3<OutputValue, U.OutputValue, V.OutputValue>.result3)
}).transform(initialState: initialState, context: context, Signal.successProcessorWithState(processor)).returnToGlobalIfNeeded(context: context)
}
/// Similar to `combine(second:third:fourth:context:handler:)` with an additional "state" value.
///
/// - Parameters:
/// - initialState: the initial value of a "state" value passed into the closure on each invocation. The "state" will be reset to this value if the `Signal` deactivates.
/// - second: the second `Signal`, after `self` used as input to the `handler`
/// - third: the third `Signal`, after `self` and `second`, used as input to the `handler`
/// - fourth: the fourth `Signal`, after `self`, `second` and `third`, used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self`, `second`, `third` or `fourth` as `EitherResult4<OutputValue, U, V, W>` (an enum which may contain either `.result1`, `.result2`, `.result3` or `.result4` corresponding to `self`, `second`, `third` or `fourth`) and sends results to an `SignalNext<X>`.
/// - Returns: an `Signal<X>` which is the result stream from the `SignalNext<X>` passed to the `handler`.
public final func combine<S, U: SignalInterface, V: SignalInterface, W: SignalInterface, X>(_ second: U, _ third: V, _ fourth: W, initialState: S, context: Exec = .direct, _ processor: @escaping (inout S, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>) -> Signal<X>.Next) -> Signal<X> {
return Signal<EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result2)
}).addPreceeding(processor: third.signal.attach { (s3, dw) -> SignalCombiner<V.OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s3, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result3)
}).addPreceeding(processor: fourth.signal.attach { (s4, dw) -> SignalCombiner<W.OutputValue, EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>> in
SignalCombiner(source: s4, dw: &dw, context: .direct, processor: EitherResult4<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue>.result4)
}).transform(initialState: initialState, context: context, Signal.successProcessorWithState(processor)).returnToGlobalIfNeeded(context: context)
}
/// Similar to `combine(second:third:fourth:fifthcontext:handler:)` with an additional "state" value.
///
/// - Parameters:
/// - initialState: the initial value of a "state" value passed into the closure on each invocation. The "state" will be reset to this value if the `Signal` deactivates.
/// - second: the second `Signal`, after `self` used as input to the `handler`
/// - third: the third `Signal`, after `self` and `second`, used as input to the `handler`
/// - fourth: the fourth `Signal`, after `self`, `second` and `third`, used as input to the `handler`
/// - fifth: the fifth `Signal`, after `self`, `second`, `third` and `fourth`, used as input to the `handler`
/// - context: the `Exec` context used to invoke the `handler`
/// - processor: processes inputs from either `self`, `second`, `third`, `fourth` or `fifth` as `EitherResult5<OutputValue, U, V, W, X>` (an enum which may contain either `.result1`, `.result2`, `.result3`, `.result4` or `.result5` corresponding to `self`, `second`, `third`, `fourth` or `fifth`) and sends results to an `SignalNext<Y>`.
/// - Returns: an `Signal<Y>` which is the result stream from the `SignalNext<Y>` passed to the `handler`.
public final func combine<S, U: SignalInterface, V: SignalInterface, W: SignalInterface, X: SignalInterface, Y>(_ second: U, _ third: V, _ fourth: W, _ fifth: X, initialState: S, context: Exec = .direct, _ processor: @escaping (inout S, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>) -> Signal<Y>.Next) -> Signal<Y> {
return Signal<EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>>(processor: self.attach { (s1, dw) -> SignalCombiner<OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s1, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result1)
}).addPreceeding(processor: second.signal.attach { (s2, dw) -> SignalCombiner<U.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s2, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result2)
}).addPreceeding(processor: third.signal.attach { (s3, dw) -> SignalCombiner<V.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s3, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result3)
}).addPreceeding(processor: fourth.signal.attach { (s4, dw) -> SignalCombiner<W.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s4, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result4)
}).addPreceeding(processor: fifth.signal.attach { (s5, dw) -> SignalCombiner<X.OutputValue, EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>> in
SignalCombiner(source: s5, dw: &dw, context: .direct, processor: EitherResult5<OutputValue, U.OutputValue, V.OutputValue, W.OutputValue, X.OutputValue>.result5)
}).transform(initialState: initialState, context: context, Signal.successProcessorWithState(processor)).returnToGlobalIfNeeded(context: context)
}
/// Appends a new `SignalMulti` to this `Signal`. The new `SignalMulti` immediately activates its antecedents and is "continuous" (multiple listeners can be attached to the `SignalMulti` and each new listener immediately receives the most recently sent value on "activation").
///
/// NOTE: this is the canonical "shared value" signal
///
/// - parameter initialValues: the immediate value sent to any listeners that connect *before* the first value is sent through this `Signal`
/// - returns: a continuous `SignalMulti`
public final func continuous(initialValue: OutputValue) -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: attach { (s, dw) in
SignalMultiProcessor(source: s, values: ([initialValue], nil), userUpdated: false, activeWithoutOutputs: .always, dw: &dw, context: .direct, updater: { a, p, r -> (Array<OutputValue>, SignalEnd?) in
let previous: (Array<OutputValue>, SignalEnd?) = (a, p)
switch r {
case .success(let v): a = [v]
case .failure(let e): a = []; p = e
}
return previous
})
})
}
/// Appends a new `SignalMulti` to this `Signal`. The new `SignalMulti` immediately activates its antecedents and is "continuous" (multiple listeners can be attached to the `SignalMulti` and each new listener immediately receives the most recently sent value on "activation"). Any listeners that connect before the first signal is received will receive no value on "activation".
///
/// NOTE: this is the canonical "shared results" signal
///
/// - returns: a continuous `SignalMulti`
public final func continuous() -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: attach { (s, dw) in
SignalMultiProcessor(source: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: .always, dw: &dw, context: .direct, updater: { a, p, r -> (Array<OutputValue>, SignalEnd?) in
let previous: (Array<OutputValue>, SignalEnd?) = (a, p)
switch r {
case .success(let v): a = [v]; p = nil
case .failure(let e): a = []; p = e
}
return previous
})
})
}
/// Appends a new `SignalMulti` to this `Signal`. The new `SignalMulti` does not immediately activate (it waits until an output activates it normally). The first activator receives no cached values but does start the signal. If a value is received, subsequent activators will receive the most recent value. Depending on the `discardOnDeactivate` behavior, the cached value may be discarded (resetting the entire signal to its deactivated state) or the cached value might be retained for delivery to any future listeners.
///
/// NOTE: this signal is intended for lazily loaded, shared resources.
///
/// - returns: a continuous `SignalMulti`
public final func continuousWhileActive(discardOnDeactivate: Bool = true) -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: attach { (s, dw) in
SignalMultiProcessor(source: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: discardOnDeactivate ? .never : .ifNonEmpty, dw: &dw, context: .direct, updater: { a, p, r -> (Array<OutputValue>, SignalEnd?) in
let previous: (Array<OutputValue>, SignalEnd?) = (a, p)
switch r {
case .success(let v): a = [v]; p = nil
case .failure(let e): a = []; p = e
}
return previous
})
})
}
/// Appends a new `SignalMulti` to this `Signal`. The new `SignalMulti` immediately activates its antecedents and offers full "playback" (multiple listeners can be attached to the `SignalMulti` and each new listener receives the entire history of values previously sent through this `Signal` upon "activation").
///
/// - returns: a playback `SignalMulti`
public final func playback() -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: attach { (s, dw) in
SignalMultiProcessor(source: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: .always, dw: &dw, context: .direct, updater: { a, p, r -> (Array<OutputValue>, SignalEnd?) in
switch r {
case .success(let v): a.append(v)
case .failure(let e): p = e
}
return ([], nil)
})
})
}
/// Appends a new `Signal` to this `Signal`. The new `Signal` immediately activates its antecedents and caches any values it receives until this the new `Signal` itself is activated – at which point it sends all prior values upon "activation" and subsequently reverts to passthough.
///
/// NOTE: this is intended for greedily started signals that might start emitting before the listeners connect.
///
/// - Parameter precached: start the cache with some initial values to which subsequent values will be added (default: nil)
/// - Returns: a "cache until active" `Signal`.
public final func cacheUntilActive(precached: [OutputValue]? = nil) -> Signal<OutputValue> {
return Signal<OutputValue>(processor: attach { (s, dw) in
SignalCacheUntilActive(source: s, precached: precached, dw: &dw)
})
}
/// Appends a new `SignalMulti` to this `Signal`. While multiple listeners are permitted, there is no caching, activation signal or other changes inherent in this new `Signal` – newly connected listeners will receive only those values sent after they connect.
///
/// NOTE: this is intended for shared signals where new values are important but previous values are not
///
/// - returns: a "multicast" `SignalMulti`.
public final func multicast() -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: attach { (s, dw) in
SignalMultiProcessor(source: s, values: ([], nil), userUpdated: false, activeWithoutOutputs: .never, dw: &dw, context: .direct, updater: nil)
})
}
/// Appends a new `SignalMulti` to this `Signal`. The new `SignalMulti` immediately activates its antecedents. Every time a value is received, it is passed to an "updater" which creates an array of activation values and an error that will be used for any new listeners.
/// Consider this as an operator that allows the creation of a custom "bring-up-to-speed" value for new listeners.
///
/// - Parameters:
/// - initialValues: activation values used when *before* any incoming value is received (if you wan't to specify closed as well, use `preclosed` instead)
/// - context: the execution context where the `updater` will run
/// - updater: run for each incoming `Result` to update the buffered activation values
/// - Returns: a `SignalMulti` with custom activation
public final func customActivation(initialValues: Array<OutputValue> = [], context: Exec = .direct, _ updater: @escaping (_ cachedValues: inout Array<OutputValue>, _ cachedError: inout SignalEnd?, _ incoming: Result) -> Void) -> SignalMulti<OutputValue> {
return SignalMulti<OutputValue>(processor: attach { (s, dw) in
SignalMultiProcessor(source: s, values: (initialValues, nil), userUpdated: true, activeWithoutOutputs: .always, dw: &dw, context: context) { (bufferedValues: inout Array<OutputValue>, bufferedError: inout SignalEnd?, incoming: Result) -> (Array<OutputValue>, SignalEnd?) in
let oldActivationValues = bufferedValues
let oldError = bufferedError
updater(&bufferedValues, &bufferedError, incoming)
return (oldActivationValues, oldError)
}
})
}
/// This operator applies a reducing function to the stream of incoming values, reducing down to a single, internal `State` value.
///
/// A value of the same `State` type is emitted on each iteration, although it is not required to be the same value. Having the return value be potentially different to the internal state isn't standard "reduction semantics" but it enables differential notifications, rather than whole state notifications.
///
/// This operator combines aspects of `transform` and `customActivation` into a single operation, transforming the incoming message into state values by combining with a cached state value (that also serves as the activation value).
///
/// - Parameters:
/// - initialState: initial activation value for the stream and internal state for the reducer
/// - context: execution context where `reducer` will run
/// - reducer: the function that combines the state with incoming values and emits differential updates
/// - Returns: a `SignalMulti<State>`
public final func reduce<State>(initialState: State, context: Exec = .direct, _ reducer: @escaping (_ state: State, _ message: OutputValue) throws -> State) -> SignalMulti<State> {
return SignalMulti<State>(processor: attach { (s, dw) in
return SignalReducer<OutputValue, State>(source: s, state: initialState, end: nil, dw: &dw, context: context) { (state: State, message: Signal<OutputValue>.Result) -> Signal<State>.Result in
switch message {
case .success(let m): return Swift.Result { try reducer(state, m) }.mapError(SignalEnd.other)
case .failure(let e): return .failure(e)
}
}
})
}
/// This operator applies a reducing function to the stream of incoming values, reducing down to a single, internal `State` value.
///
/// A value of the same `State` type is emitted on each iteration, although it is not required to be the same value. Having the return value be potentially different to the internal state isn't standard "reduction semantics" but it enables differential notifications, rather than whole state notifications.
///
/// This operator combines aspects of `transform` and `customActivation` into a single operation, transforming the incoming message into state values by combining with a cached state value (that also serves as the activation value).
///
/// - Parameters:
/// - initialState: initial activation value for the stream and internal state for the reducer
/// - context: execution context where `reducer` will run
/// - reducer: the function that combines the state with incoming values and emits differential updates
/// - Returns: a `SignalMulti<State>`
public final func reduce<State>(context: Exec = .direct, initializer: @escaping (_ message: OutputValue) throws -> State?, _ reducer: @escaping (_ state: State, _ message: OutputValue) throws -> State) -> SignalMulti<State> {
return SignalMulti<State>(processor: attach { (s, dw) in
let ini: SignalReducer<OutputValue, State>.Initializer = { message in
switch message {
case .success(let m): return Swift.Result { try initializer(m) }.mapError(SignalEnd.other)
case .failure(let e): return .failure(e)
}
}
return SignalReducer<OutputValue, State>(source: s, initializer: ini, end: nil, dw: &dw, context: context) { (state: State, message: Signal<OutputValue>.Result) -> Signal<State>.Result in
switch message {
case .success(let m): return Swift.Result { try reducer(state, m) }.mapError(SignalEnd.other)
case .failure(let e): return .failure(e)
}
}
})
}
// MARK: - Signal private properties
// A struct that stores data associated with the current handler. Under the `Signal` mutex, if the `itemProcessing` flag is acquired, the fields of this struct are filled in using `Signal` and `SignalHandler` data and the contents of the struct can be used by the current thread *outside* the
private struct ItemContext<OutputValue> {
let context: Exec
let direct: Bool
let synchronous: Bool
let handler: (Result) -> Void
let activationCount: Int
init(context: Exec, synchronous: Bool, handler: @escaping (Result) -> Void, activationCount: Int) {
self.activationCount = activationCount
self.context = context
if case .direct = context {
self.direct = true
} else {
self.direct = false
}
self.synchronous = synchronous
self.handler = handler
}
}
#if DEBUG_LOGGING
var count: Int = {
globalCount += 1
return globalCount
}()
#endif
// Protection for all mutable members on this class and any attached `signalHandler`.
// NOTE 1: This mutex may be shared between synchronous serially connected `Signal`s (for memory and performance efficiency).
// NOTE 2: It is noted that a `DispatchQueue` mutex would be preferrable since it respects libdispatch's QoS, however, it is not possible (as of Swift 4) to use `DispatchQueue` as a mutex without incurring a heap allocated closure capture so `PThreadMutex` is used instead to avoid a factor of 10 performance loss.
private final var mutex = os_unfair_lock()
fileprivate final func unbalancedLock() {
os_unfair_lock_lock(&mutex)
}
fileprivate final func unbalancedTryLock() -> Bool {
return os_unfair_lock_trylock(&mutex)
}
fileprivate final func unbalancedUnlock() {
os_unfair_lock_unlock(&mutex)
}
fileprivate func sync<R>(execute work: () throws -> R) rethrows -> R {
unbalancedLock()
defer { unbalancedUnlock() }
return try work()
}
// The graph can be disconnected and reconnected and various actions may occur outside locks, it's helpful to determine which actions are no longer relevant. The `Signal` controls this through `delivery` and `activationCount`. The `delivery` controls the basic lifecycle of a simple connected graph through 4 phases: `.disabled` (pre-connection) -> `.sychronous` (connecting) -> `.normal` (connected) -> `.disabled` (disconnected).
fileprivate final var delivery = SignalDelivery.disabled { didSet { handlerContextNeedsRefresh = true } }
// The graph can be disconnected and reconnected and various actions may occur outside locks, it's helpful to determine which actions are no longer relevant because they are associated with a phase of a previous connection.
// When connected to a preceeding `SignalPredecessor`, `activationCount` is incremented on each connection and disconnection to ensure that actions associated with a previous phase of a previous connection are rejected.
// When connected to a preceeding `SignalInput`, `activationCount` is incremented solely when a new `SignalInput` is attached or the current input is invalidated (joined using an `SignalJunction`).
fileprivate final var activationCount: Int = 0 { didSet { handlerContextNeedsRefresh = true } }
// If there is a preceeding `Signal` in the graph, its `SignalProcessor` is stored in this variable. Note that `SignalPredecessor` is always an instance of `SignalProcessor`.
/// If Swift gains an `OrderedSet` type, it should be used here in place of this `Set` and the `sortedPreceeding` accessor, below.
fileprivate final var preceeding: Set<OrderedSignalPredecessor>
// The destination of this `Signal`. This value is `nil` on construction.
fileprivate final weak var signalHandler: SignalHandler<OutputValue>? = nil { didSet { handlerContextNeedsRefresh = true } }
fileprivate final var handlerContextNeedsRefresh = true
// Queue of values pending dispatch (NOTE: the current `item` is not stored in the queue)
// Normally the queue is FIFO but when an `Signal` has multiple inputs, the "activation" from each input will be considered before any post-activation inputs.
private final var queue = Deque<Result>()
// A `holdCount` may indefinitely block the queue for one of two reasons:
// 1. a `SignalNext` is retained outside its handler function for asynchronous processing of an item
// 2. a `SignalCapture` handler has captured the activation but a `Signal` to receive the remainder is not currently connected
// Accordingly, the `holdCount` should only have a value in the range [0, 2]
private final var holdCount: UInt8 = 0
// When a `Result` is popped from the queue and the handler is being invoked, the `itemProcessing` is set to `true`. The effect is equivalent to `holdCount`.
private final var itemProcessing: Bool = false
// Notifications for the inverse of `delivery == .disabled`, accessed exclusively through the `generate` constructor. Can be used for lazy construction/commencement, resetting to initial state on graph disconnect and reconnect or cleanup after graph deletion.
// A signal is used here instead of a simple function callback since re-entrancy-safe queueing and context delivery are needed.
// WARNING: this is actually a (Signal<SignalInput<OutputValue>?>, SignalEndpont<SignalInput<OutputValue>?>)? but we use `Any` to avoid huge optimization overheads.
private final var newInputSignal: (Signal<Any?>, SignalOutput<Any?>)? = nil
// A monotonically increasing counter that is incremented every time the set of connected, preceeding handlers changes. This value is used to reject predecessors that are not up-to-date with the latest graph structure (i.e. have been asynchronously removed or invalidated).
private final var preceedingCount: Int = 0
// This is a cache of values that can be read outside the lock by the current owner of the `itemProcessing` flag.
private final var handlerContext = ItemContext<OutputValue>(context: .direct, synchronous: false, handler: { _ in }, activationCount: 0)
// MARK: - Signal private functions
// Invokes `removeAllPreceedingInternal` if and only if the `forDisconnector` matches the current `preceeding.first`
//
// - Parameter forDisconnector: the disconnector requesting this change
// - Returns: if the predecessor matched, then a new `SignalInput<OutputValue>` for this `Signal`, otherwise `nil`.
fileprivate final func newInput(forDisconnector: SignalProcessor<OutputValue, OutputValue>) -> SignalInput<OutputValue>? {
var dw = DeferredWork()
let result = sync { () -> SignalInput<OutputValue>? in
if preceeding.count == 1, let p = preceeding.first?.base, p === forDisconnector {
removeAllPreceedingInternal(dw: &dw)
return SignalInput(signal: self, activationCount: activationCount)
} else {
return nil
}
}
dw.runWork()
return result
}
/// If this `Signal` can attach a new handler, this function runs the provided closure (which is expected to construct and set the new handler) and returns the handler. If this `Signal` can't attach a new handler, returns the result of running the closure inside the mutex of a separate preclosed `Signal`.
///
/// This method serves three purposes:
/// 1) It enforces the idea that the `signalHandler` should be constructed under this `Signal`'s mutex, providing the `DeferredWork` required by the `signalHandler` constructor interface.
/// 2) It enforces the rule that multiple listen attempts should be immediately closed with a `.duplicate` error
/// 3) It allows abstraction over the actual `Signal` used for attachment (self for single listener and a newly created `Signal` for multi listener).
///
/// - Parameter constructor: the handler constructor function
/// - Returns: the result from the constructor (typically an SignalHandler)
fileprivate func attach<R>(constructor: (Signal<OutputValue>, inout DeferredWork) -> R) -> R where R: SignalHandler<OutputValue> {
var dw = DeferredWork()
let result: R? = sync {
self.signalHandler == nil ? constructor(self, &dw) : nil
}
dw.runWork()
if let r = result {
return r
} else {
preconditionFailure("Multiple outputs added to single listener Signal.")
}
}
/// Avoids complications with non-reentrant
///
/// - Parameter context: the context upon which `asyncRelativeContext` will be called
/// - Returns: possibly `self`, possibly `self` a transform that shifts to the `asyncRelativeContext`.
fileprivate func returnToGlobalIfNeeded(context: Exec) -> Signal<OutputValue> {
if context.type.isImmediateAlways || context.type.isReentrant {
return self
} else {
return self.transform(context: context.relativeAsync(), { .single($0) })
}
}
/// Constructor for a `Signal` that is the output for a `SignalProcessor`.
///
/// - Parameter processor: input source for this `Signal`
fileprivate init<U>(processor: SignalProcessor<U, OutputValue>) {
preceedingCount += 1
preceeding = [processor.wrappedWithOrder(preceedingCount)]
#if DEBUG_LOGGING
print("\(type(of: self)): \(self.count) created")
#endif
if !(self is SignalMulti<OutputValue>) {
var dw = DeferredWork()
sync {
// Since this function must be used only in cases where the processor is *also* new, this can't be `duplicate` or `loop`
try! processor.outputAddedSuccessorInternal(self, param: nil, activationCount: nil, dw: &dw)
}
dw.runWork()
}
}
// Connects this `Signal` to a preceeding SignalPredecessor. Other connection functions must go through this.
//
// - Parameters:
// - newPreceeding: the preceeding SignalPredecessor to add
// - param: this function may invoke `outputAddedSuccessorInternal` internally. If it does this `param` will be passed as the `param` for that function.
// - dw: required
// - Throws: any error from `outputAddedSuccessorInternal` invoked on `newPreceeding`
fileprivate final func addPreceedingInternal(_ newPreceeding: SignalPredecessor, param: Any?, dw: inout DeferredWork) throws {
preceedingCount += 1
let wrapped = newPreceeding.wrappedWithOrder(preceedingCount)
preceeding.insert(wrapped)
do {
try newPreceeding.outputAddedSuccessorInternal(self, param: param, activationCount: (delivery.isDisabled || preceeding.count == 1) ? Optional<Int>.none : Optional<Int>(activationCount), dw: &dw)
if !delivery.isDisabled, preceeding.count == 1 {
updateActivationInternal(andInvalidateAllPrevious: true, dw: &dw)
if !delivery.isSynchronous {
let ac = activationCount
dw.append {
var dw = DeferredWork()
self.sync {
if ac == self.activationCount {
newPreceeding.outputCompletedActivationSuccessorInternal(self, dw: &dw)
}
}
dw.runWork()
}
}
}
} catch {
preceeding.remove(wrapped)
throw error
}
}
// Removes a (potentially) non-unique predecessor. Used only from `SignalMergeSet` and `SignalMergeProcessor`. This is one of two, independent, functions for removing preceeding. The other being `removeAllPreceedingInternal`.
//
// - Parameters:
// - oldPreceeding: the predecessor to remove
// - dw: required
fileprivate final func removePreceedingWithoutInterruptionInternal(_ oldPreceeding: SignalPredecessor, dw: inout DeferredWork) -> Bool {
if preceeding.remove(oldPreceeding.wrappedWithOrder(0)) != nil {
oldPreceeding.outputRemovedSuccessorInternal(self, dw: &dw)
return true
}
return false
}
// Removes all predecessors and invalidate all previous inputs. This is one of two, independent, functions for removing preceeding. The other being `removePreceedingWithoutInterruptionInternal`.
//
// - Parameters:
// - oldPreceeding: the predecessor to remove
// - dw: required
fileprivate final func removeAllPreceedingInternal(dw: inout DeferredWork) {
if preceeding.count > 0 {
dw.append { [preceeding] in withExtendedLifetime(preceeding) {} }
// Careful to use *sorted* preceeding to propagate graph changes deterministically
sortedPreceedingInternal.forEach { $0.base.outputRemovedSuccessorInternal(self, dw: &dw) }
preceeding = []
}
updateActivationInternal(andInvalidateAllPrevious: true, dw: &dw)
}
// The primary `send` function (although the `push` functions do also send).
// Sends `result`, assuming `fromInput` matches the current `self.input` and `self.delivery` is enabled
//
// - Parameters:
// - result: the value or error to pass to any attached handler
// - predecessor: the `SignalInput` or `SignalNext` delivering the handler
// - activationCount: the activation count from the predecessor to match against internal value
// - activated: whether the predecessor is already in `normal` delivery mode
// - Returns: `nil` on success. Non-`nil` values include `SignalSendError.disconnected` if the `predecessor` or `activationCount` fail to match, `SignalSendError.inactive` if the current `delivery` state is `.disabled`.
@discardableResult @usableFromInline
final func send(result: Result, predecessor: Unmanaged<AnyObject>?, activationCount: Int, activated: Bool) -> SignalSendError? {
unbalancedLock()
guard isCurrent(predecessor, activationCount) else {
unbalancedUnlock()
// Retain the result past the end of the lock
withExtendedLifetime(result) {}
return SignalSendError.disconnected
}
switch delivery {
case .normal:
if holdCount == 0 && itemProcessing == false {
assert(queue.isEmpty)
break
} else {
queue.append(result)
unbalancedUnlock()
return nil
}
case .synchronous(let count):
if activated {
queue.append(result)
unbalancedUnlock()
return nil
} else if count == 0, holdCount == 0, itemProcessing == false {
break
} else {
queue.insert(result, at: count)
delivery = .synchronous(count + 1)
unbalancedUnlock()
return nil
}
case .disabled:
unbalancedUnlock()
// Retain the result past the end of the lock
withExtendedLifetime(result) {}
return SignalSendError.inactive
}
assert(holdCount == 0 && itemProcessing == false)
if handlerContextNeedsRefresh {
var dw = DeferredWork()
let hasHandler = refreshItemContextInternal(&dw)
if hasHandler {
itemProcessing = true
}
unbalancedUnlock()
// We need to be extremely careful that any previous handlers, replaced in the `refreshItemContextInternal` function are released *here* if we're going to re-enter the lock and that we've *already* acquired the `itemProcessing` Bool. There's a little bit of dancing around in this `if handlerContextNeedsRefresh` block to ensure these two things are true.
dw.runWork()
if !hasHandler {
return SignalSendError.inactive
}
} else {
itemProcessing = true
unbalancedUnlock()
}
#if DEBUG_LOGGING
print("\(type(of: self)): \(self.count) emitted \(result))")
#endif
if handlerContext.direct, case .success = result {
handlerContext.handler(result)
unbalancedLock()
if handlerContextNeedsRefresh || !queue.isEmpty {
unbalancedUnlock()
specializedSyncPop()
} else {
itemProcessing = false
unbalancedUnlock()
}
} else {
dispatch(result)
}
return nil
}
// A secondary send function used to push values and possibly and end-of-stream error onto the `newInputSignal`. The push is not handled immediately but is deferred until the `DeferredWork` runs. Since values are *always* queued, this is less efficient than `send` but it avoids re-entrancy into self if the `newInputSignal` immediately tries to send values back to us.
//
// - Parameters:
// - values: pushed onto this `Signal`'s queue
// - end: pushed onto this `Signal`'s queue
// - activationCount: activationCount of the sender (must match the internal value)
// - dw: used to dispatch the signal safely outside the parent's mutex
fileprivate final func push(values: Array<OutputValue>, end: SignalEnd?, activationCount: Int, activated: Bool, dw: inout DeferredWork) {
sync {
guard self.activationCount == activationCount else { return }
pushInternal(values: values, end: end, activated: activated, dw: &dw)
}
}
// A secondary send function used to push activation values and activation errors. Since values are *always* queued, this is less efficient than `send` but it can safely be invoked inside mutexes.
//
// - Parameters:
// - values: pushed onto this `Signal`'s queue
// - end: pushed onto this `Signal`'s queue
// - dw: used to dispatch the signal safely outside the parent's mutex
fileprivate final func pushInternal(values: Array<OutputValue>, end: SignalEnd?, activated: Bool, dw: inout DeferredWork) {
assert(unbalancedTryLock() == false)
guard values.count > 0 || end != nil else {
dw.append {
withExtendedLifetime(values) {}
withExtendedLifetime(end) {}
}
return
}
if !activated, case .synchronous(let count) = delivery {
assert(count == 0)
delivery = .synchronous(values.count + (end != nil ? 1 : 0))
}
for v in values {
queue.append(.success(v))
}
if let e = end {
queue.append(.failure(e))
}
resumeIfPossibleInternal(dw: &dw)
}
// Used in SignalCapture.handleSynchronousToNormalInternal to handle a situation where a deactivation and reactivation occurs *while* `itemProcessing` so the next capture is in the queue instead of being captured. This function extracts the queued value for capture before transition to normal.
//
// - Returns: the queued items under the synchronous count.
fileprivate final func pullQueuedSynchronousInternal() -> (values: Array<OutputValue>, end: SignalEnd?) {
if case .synchronous(let count) = delivery, count > 0 {
var values = Array<OutputValue>()
var end: SignalEnd? = nil
for _ in 0..<count {
switch queue.removeFirst() {
case .success(let v): values.append(v)
case .failure(let e): end = e
}
}
delivery = .synchronous(0)
return (values, end)
}
return ([], nil)
}
// Increment the `holdCount`
fileprivate final func blockInternal() {
assert(unbalancedTryLock() == false)
assert(holdCount <= 1)
holdCount += 1
}
// Decrement the `holdCount`, if the `activationCountAtBlock` provided matches `self.activationCount`
//
// NOTE: the caller must resume processing if holdCount reaches zero and there are queued items.
///
/// - Parameter activationCountAtBlock: must match the internal value or the block request will be ignored
fileprivate final func unblockInternal(activationCountAtBlock: Int) {
guard self.activationCount == activationCountAtBlock else { return }
assert(unbalancedTryLock() == false)
assert(holdCount >= 1 && holdCount <= 2)
holdCount -= 1
}
// If the holdCount is zero and there are queued items, increments the hold count immediately and starts processing in the deferred work.
///
/// - Parameter dw: required
fileprivate final func resumeIfPossibleInternal(dw: inout DeferredWork) {
if holdCount == 0, itemProcessing == false, !queue.isEmpty {
if !refreshItemContextInternal(&dw) {
// The weakly held handler has asynchronously released.
return
}
itemProcessing = true
dw.append {
if let r = self.pop() {