Test Publishers.CollectByTime with TestScheduler #25

Closed
iwheelbuy opened this issue Jul 7, 2020 · 17 comments · Fixed by #26

Comments

@iwheelbuy

iwheelbuy commented Jul 7, 2020

I've started to test various Combine Publishers and got stuck with Publishers.CollectByTime. I'm using TestScheduler and VirtualTimeInterval to test it. I might have gotten it all wrong, but I expect the publisher to buffer signals and periodically publish them as arrays. The first array is published correctly and respects the VirtualTimeInterval. But then ALL other signals are published in the second array, and the VirtualTime of each signal is ignored.

With VirtualTimeInterval = 2 I publish

(1, .input("a")),
(2, .input("b")),
(4, .input("c")),
(5, .input("d")),
(7, .input("e")),
(8, .input("f"))

I expect Publishers.CollectByTime to group signals this way: ["ab", "cd", "ef"]. But I get ["ab", "cdef"].
Can you please tell me what is wrong with my code, or maybe with my understanding of the CollectByTime publisher?

import Combine
import Entwine
import EntwineTest
import XCTest

final class CollectByTime: XCTestCase {

   func test_common_behavior() {
      let configuration = TestScheduler.Configuration.default
      let scheduler = TestScheduler()
      let upstream: TestablePublisher<String, Never> = scheduler.createRelativeTestablePublisher([
         (1, .input("a")),
         (2, .input("b")),
         (4, .input("c")),
         (5, .input("d")),
         (7, .input("e")),
         (8, .input("f")),
         (9, .completion(.finished))
      ])
      let window: VirtualTimeInterval = 2
      let strategy = Publishers.TimeGroupingStrategy<TestScheduler>
         .byTime(scheduler, window)
      let publisher = Publishers.CollectByTime(
         upstream: upstream,
         strategy: strategy,
         options: nil
      )
      let subscriber = scheduler.start(configuration: configuration, create: { publisher })
      let values = subscriber
         .recordedOutput
         .compactMap({ _, signal -> String? in
            switch signal {
            case .input(let array):
               return array.joined()
            default:
               return nil
            }
         })
      XCTAssertEqual(values, ["ab", "cd", "ef"])
   }
}
@tcldr
Owner

tcldr commented Jul 8, 2020

Hi – thanks for the report. Yes, looks like there was an issue with the TestScheduler that meant it failed to schedule any repeating actions – which is required by Combine's CollectByTime publisher. I've posted a fix on a branch called test-scheduler-intervals attached to PR #26. Let me know if that resolves your issue.
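
For readers unfamiliar with the term, a "repeating action" is one the scheduler must fire again every interval, not just once. The sketch below is a toy virtual-time queue in plain Swift that shows one way this could work: each firing re-enqueues the next one. `ToyVirtualScheduler`, its methods and `demoTicks` are hypothetical names made up for this illustration; this is not Entwine's TestScheduler and not the fix in PR #26.

```swift
// Hypothetical toy scheduler for illustration only; not Entwine's implementation.
final class ToyVirtualScheduler {
    private struct Entry {
        let time: Int            // virtual time at which the action should run
        let id: Int              // insertion order, used to break ties
        let action: () -> Void
    }

    private var queue: [Entry] = []
    private var nextID = 0
    private(set) var now = 0
    let maxClock: Int

    init(maxClock: Int) { self.maxClock = maxClock }

    /// Enqueue a one-off action at an absolute virtual time.
    func schedule(at time: Int, _ action: @escaping () -> Void) {
        queue.append(Entry(time: time, id: nextID, action: action))
        nextID += 1
    }

    /// Enqueue a repeating action: every firing schedules the following one.
    func schedule(after delay: Int, interval: Int, _ action: @escaping () -> Void) {
        precondition(interval > 0, "interval must be positive")
        schedule(at: now + delay) { [weak self] in
            action()
            self?.schedule(after: interval, interval: interval, action)
        }
    }

    /// Run actions in time order until the queue is empty or maxClock is passed.
    func resume() {
        while let next = queue.min(by: { ($0.time, $0.id) < ($1.time, $1.id) }),
              next.time <= maxClock {
            queue.removeAll { $0.id == next.id }
            now = next.time
            next.action()
        }
    }
}

/// Records the virtual time of every tick of a repeating action.
func demoTicks() -> [Int] {
    let scheduler = ToyVirtualScheduler(maxClock: 10)
    var ticks: [Int] = []
    scheduler.schedule(after: 2, interval: 2) { ticks.append(scheduler.now) }
    scheduler.resume()
    return ticks   // [2, 4, 6, 8, 10]
}
```

If the repeating overload only fired once, a publisher that relies on a periodic timer would flush its first window and then never again on a timer, which is consistent with the ["ab", "cdef"] symptom reported above.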

@iwheelbuy
Author

It works, thanks a lot!

@iwheelbuy
Author

Or maybe it doesn't...
I decided to test Debounce

let upstream: TestablePublisher<String, TestError> = scheduler.createRelativeTestablePublisher([
   (1, .input("a")),
   (2, .input("b")),
   (7, .input("c")),
   (99, .completion(.finished))
])
let publisher = Publishers.Debounce(
   upstream: upstream,
   dueTime: 5,
   scheduler: scheduler,
   options: nil
)
let subscriber = scheduler.start(configuration: configuration, create: { publisher })

And got ["c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c"] events.
Please don't waste your time if it's not an easy feature to implement.

@iwheelbuy iwheelbuy reopened this Jul 9, 2020
@tcldr
Owner

tcldr commented Jul 9, 2020

Not at all. Good to get the issues ironed out. Try the latest commit to PR #26

@iwheelbuy
Author

There is still something wrong with it...

interval = 12
count = 5
values = [1, 5, 7, 11, 13, 27, 35, 43, 51, 53, 55, 61, 67, 69, 73, 77, 79, 83, 87, 89, 91, 93, 95]

With test scheduler I got:

[[1, 5, 7, 11, 13], [27, 35, 43, 51, 53], [55, 61], [67, 69, 73, 77, 79]]

As you can see [27, 35, 43, 51, 53] is not correct because 53 - 27 > 12. Also some values are lost.

I've written the function which produces the expected answer and the result should be:

[[1, 5, 7, 11], [13], [27, 35], [43], [51, 53, 55], [61, 67, 69], [73, 77, 79, 83], [87, 89, 91, 93, 95]]

I've tested the "expected answer" function and CollectByTime publisher with ordinary DispatchQueue.global(qos: .userInteractive) and got the result which matches with "expected answer" function result.

P.S. I think there is something wrong with the approach to repeating actions. I'm going to continue without the test scheduler for the CollectByTime publisher for now. Maybe I'll come back later.

P.S. The Debounce publisher works correctly in master.

P.S. I have created a test-scheduler-intervals branch in my test repository in case you want to try those tests yourself.

@tcldr
Owner

tcldr commented Jul 13, 2020

Hi @iwheelbuy, thanks for writing this up – I really appreciate it! However, when I run your example numbers, it matches the control.

As you can see [27, 35, 43, 51, 53] is not correct because 53 - 27 > 12. Also some values are lost.

I'm not sure our understanding of CollectByTime matches up.

My understanding of CollectByTime is that: when using the 'time' strategy, it will collect all elements from the point the publisher started to the point the interval has elapsed. So if using an interval of 5, it will group all elements between 0..<5, 5..<10, 10..<15 and so on. (With 'zero' being the time at which the publisher was subscribed to.)

When also specifying count as part of the strategy, the behaviour is the same, except that if the count is reached within a time interval (0..<5 or 5..<10), the publisher will immediately output the elements it has already collected for that time interval, but otherwise continue as normal.

As an example, if we schedule [0,1,2,3,4,5,6,7,8,9] to be published with an interval of 5, and a count of 5, we will receive [[0,1,2,3,4],[5,6,7,8,9]] and if we ask for an interval of 5 and count of 2 we will receive [[0,1],[2,3],[4],[5,6],[7,8],[9]].

As the interval is determined by the subscribe time, if we offset everything by 3 ([3,4,5,6,7,8,9,10,11,12]), with an interval of 5 and a count of 5, we will receive [[3,4],[5,6,7,8,9],[10,11,12]] and if we ask for an interval of 5 and count of 2 we will receive [[3,4],[5,6],[7,8],[9],[10,11],[12]].
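
The grouping rule described above can be modelled in plain Swift, without Combine or a scheduler, which makes it easy to check the worked examples by hand. This is only a sketch of the rule as stated in this comment, not of Combine's internals. `groupByTimeOrCount` is a hypothetical helper name. It assumes each value is published at the virtual time equal to the value itself, that time zero is the subscription time, and that any elements still buffered at the end are flushed.

```swift
/// Hypothetical model of the byTimeOrCount rule described above.
/// `times` are publish times measured from the subscription time.
func groupByTimeOrCount(_ times: [Int], interval: Int, count: Int) -> [[Int]] {
    precondition(interval > 0 && count > 0, "interval and count must be positive")
    var groups: [[Int]] = []
    var buffer: [Int] = []
    var windowEnd = interval            // current window is (windowEnd - interval)..<windowEnd

    for time in times.sorted() {
        // Close every window that has elapsed before this element arrives.
        while time >= windowEnd {
            if !buffer.isEmpty {
                groups.append(buffer)
                buffer = []
            }
            windowEnd += interval
        }
        buffer.append(time)
        // Reaching the count flushes early; the window boundaries do not move.
        if buffer.count == count {
            groups.append(buffer)
            buffer = []
        }
    }
    // Assumption: whatever is left is emitted at the end.
    if !buffer.isEmpty { groups.append(buffer) }
    return groups
}

let offsetByThree = groupByTimeOrCount(Array(3...12), interval: 5, count: 5)
// [[3, 4], [5, 6, 7, 8, 9], [10, 11, 12]]
```

Running the same helper on the values from this issue with an interval of 12 and a count of 5 gives the eight groups listed as the expected answer earlier in the thread.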

Using your own example, we can test without Entwine using a control by:

    func testControlCollectByTimeSequence() {
        let interval = 12
        let count = 5
        let expected = [
            [1, 5, 7, 11],
            [13],
            [27, 35],
            [43],
            [51, 53, 55],
            [61, 67, 69],
            [73, 77, 79, 83],
            [87, 89, 91, 93, 95],
        ]
        let sut = DispatchQueue(label: "serial-queue")
        let subject = PassthroughSubject<Int, Never>()
        let strategy = Publishers.TimeGroupingStrategy<DispatchQueue>
            .byTimeOrCount(sut, .init(.milliseconds(interval)), count)
        let publisher = Publishers.CollectByTime(upstream: subject, strategy: strategy, options: nil)
        var cancellables = Set<AnyCancellable>()
        var results = [[Int]]()
        
        publisher.sink(receiveValue: { results.append($0) }).store(in: &cancellables)
        
        let testIntervals = expected.flatMap { $0 }
        
        let now = DispatchTime.now()
        let group = DispatchGroup()
        group.enter()
        sut.schedule(after: .init(now + .milliseconds(1000))) { group.leave() }
        for interval in testIntervals {
            sut.schedule(after: .init(now + .milliseconds(interval))) { subject.send(interval) }
        }
        _ = group.wait(timeout: now + .milliseconds(2000))
        
        XCTAssertEqual(expected, results)
    }

And with Entwine, we have:

    func testCollectByTimeSequence() {
        let interval = 12
        let count = 5
        let expected = [
            [1, 5, 7, 11],
            [13],
            [27, 35],
            [43],
            [51, 53, 55],
            [61, 67, 69],
            [73, 77, 79, 83],
            [87, 89, 91, 93, 95],
        ]
        
        let sut = TestScheduler(initialClock: 0, maxClock: 2000)
        let strategy = Publishers.TimeGroupingStrategy<TestScheduler>
            .byTimeOrCount(sut, .init(interval), count)
        let subject = PassthroughSubject<Int, Never>()
        let publisher = Publishers.CollectByTime(upstream: subject, strategy: strategy, options: nil)
        var cancellables = Set<AnyCancellable>()
        var results = [[Int]]()
        
        publisher.sink(receiveValue: { results.append($0) }).store(in: &cancellables)
        
        let testIntervals = expected.flatMap { $0 }
        
        for interval in testIntervals {
            sut.schedule(after: .init(interval)) { subject.send(interval) }
        }
        sut.resume()
        
        XCTAssertEqual(expected, results)
    }

Were you expecting something different?

@iwheelbuy
Author

Our understanding matches up! It was me who made a mistake. The values 1, 5, 7, 11, 13... are not just values, they are also the times at which each value should be published. I accidentally used each value's index in the array as its publish time instead...

I'll check again (^

@iwheelbuy
Author

As I've mentioned, each of 1, 5, 7, 11, 13... represents both a value and the virtual time at which to publish it. It means I publish Int(13) at VirtualTime(13). So I've corrected the time at which each value is published. The new result I got is:

[[1, 5, 7, 11], [13, 27, 35, 43, 51], [53, 55, 61, 67, 69]]

The first emitted array [1, 5, 7, 11] is okay because it matches the 0 ..< 12 time interval and the array size of 5 was not reached.
We expect the second array to fall in the 12 ..< 24 time interval, and only one item from values matches that range: 13. So I expect the second emitted array to be [13]. But it is not...

@tcldr
Owner

tcldr commented Jul 13, 2020

Thanks for checking! If you run this test:

    func testCollectByTimeSequence() {
        let interval = 12
        let count = 5
        let expected = [
            [1, 5, 7, 11],
            [13],
            [27, 35],
            [43],
            [51, 53, 55],
            [61, 67, 69],
            [73, 77, 79, 83],
            [87, 89, 91, 93, 95],
        ]
        
        let sut = TestScheduler(initialClock: 0, maxClock: 2000)
        let strategy = Publishers.TimeGroupingStrategy<TestScheduler>
            .byTimeOrCount(sut, .init(interval), count)
        let subject = PassthroughSubject<Int, Never>()
        let publisher = Publishers.CollectByTime(upstream: subject, strategy: strategy, options: nil)
        var cancellables = Set<AnyCancellable>()
        var results = [[Int]]()
        
        publisher.sink(receiveValue: { results.append($0) }).store(in: &cancellables)
        
        let testIntervals = expected.flatMap { $0 }
        
        for interval in testIntervals {
            sut.schedule(after: .init(interval)) { subject.send(interval) }
        }
        sut.resume()
        
        XCTAssertEqual(expected, results)
    }

What do you get?

@iwheelbuy
Author

iwheelbuy commented Jul 13, 2020

package(url: "git@github.com:iwheelbuy/Entwine.git", .branch("test-scheduler-intervals"))

XCTAssertEqual failed: ("[[1, 5, 7, 11], [13], [27, 35], [43], [51, 53, 55], [61, 67, 69], [73, 77, 79, 83], [87, 89, 91, 93, 95]]") is not equal to ("[[1, 5, 7, 11], [13, 27, 35, 43, 51], [53, 55, 61, 67, 69], [73, 77, 79, 83, 87]]")

@tcldr
Owner

tcldr commented Jul 13, 2020

Curious – that passes for me. Are you testing against the latest test-scheduler-intervals commit?

@iwheelbuy
Author

Checked also on .package(url: "git@github.com:tcldr/Entwine.git", from: "0.9.0"), same result

XCTAssertEqual failed: ("[[1, 5, 7, 11], [13], [27, 35], [43], [51, 53, 55], [61, 67, 69], [73, 77, 79, 83], [87, 89, 91, 93, 95]]") is not equal to ("[[1, 5, 7, 11], [13, 27, 35, 43, 51], [53, 55, 61, 67, 69], [73, 77, 79, 83, 87]]")

@tcldr
Owner

tcldr commented Jul 13, 2020

Yeah, it needs to be the branch of the PR for this issue:
.package(url: "git@github.com:tcldr/Entwine.git", .branch("test-scheduler-intervals"))
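
In case it helps, here is roughly where that line would sit in a test package's manifest. This is a minimal sketch, not a file from either repository: the package and target name `CombineOperatorTests` is hypothetical, and the product names `Entwine` and `EntwineTest` are assumed from the module names imported in the tests above. It uses tools version 5.1, where plain string dependencies and `.branch(...)` are accepted; newer tools versions may need a different spelling.

```swift
// swift-tools-version:5.1
import PackageDescription

let package = Package(
    name: "CombineOperatorTests", // hypothetical package name
    platforms: [.macOS(.v10_15), .iOS(.v13)], // Combine needs these OS versions or later
    dependencies: [
        // Track the PR branch rather than the 0.9.0 tag.
        .package(url: "git@github.com:tcldr/Entwine.git", .branch("test-scheduler-intervals")),
    ],
    targets: [
        .testTarget(
            name: "CombineOperatorTests", // hypothetical target name
            dependencies: ["Entwine", "EntwineTest"] // assumed product names
        ),
    ]
)
```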

@iwheelbuy
Author

iwheelbuy commented Jul 13, 2020

After switching from the PR branch to the 0.9.0 tag and back, it started to pass.
I wonder if it was an SPM bug or a stale cache.

I'll do some more research, thank you for your reply.

@tcldr
Owner

tcldr commented Jul 13, 2020

No problem, let me know if it works out and I'll merge this into master. Thanks!

@iwheelbuy
Author

Everything works!

@tcldr
Owner

tcldr commented Jul 14, 2020

Great news! Glad it worked out.
