diff --git a/Sources/Common/Background Queue/QueueRunRequest.swift b/Sources/Common/Background Queue/QueueRunRequest.swift index 7383434e4..996d1b804 100644 --- a/Sources/Common/Background Queue/QueueRunRequest.swift +++ b/Sources/Common/Background Queue/QueueRunRequest.swift @@ -38,106 +38,112 @@ public class CioQueueRunRequest: QueueRunRequest { } private func startNewRequestRun() { - runTasks(lastRanTask: nil) + runTasks() } // Disable swiftlint because function at this time isn't too complex to need to make it smaller. // Many of the lines of this function are logging related. // swiftlint:disable:next function_body_length - private func runTasks( - lastRanTask: QueueTaskMetadata?, - lastFailedTask: QueueTaskMetadata? = nil - ) { - // get the inventory before running each task. If a task was added to the queue while the last task was being - // executed, we can assert that new task will execute during this run. - let queueInventory = storage.getInventory() - - guard let nextTaskToRunInventoryItem = queryRunner.getNextTask( - queueInventory, - lastRanTask: lastRanTask, - lastFailedTask: lastFailedTask - ) - else { - // we hit the end of the current inventory. Done! - return doneRunning() - } + private func runTasks() { + var lastRanTask: QueueTaskMetadata? + var lastFailedTask: QueueTaskMetadata? + var continueRunnning = true // call when you're done with task - func goToNextTask(didTaskFail: Bool) { - let lastFailedTask = didTaskFail ? nextTaskToRunInventoryItem : nil - - runTasks(lastRanTask: nextTaskToRunInventoryItem, lastFailedTask: lastFailedTask) + func goToNextTask(didTaskFail: Bool, taskJustExecuted: QueueTaskMetadata) { + lastFailedTask = didTaskFail ? taskJustExecuted : nil + lastRanTask = taskJustExecuted } - let nextTaskStorageId = nextTaskToRunInventoryItem.taskPersistedId - guard let nextTaskToRun = storage.get(storageId: nextTaskStorageId) else { - // log error. this scenario shouldn't happen where task can't be found. - logger.error("Tried to get queue task with storage id: \(nextTaskStorageId), but storage couldn't find it.") + func doneRunning() { + continueRunnning = false + logger.debug("queue out of tasks to run.") - // The task failed to execute like a HTTP failure. Update `lastFailedTask`. - return goToNextTask(didTaskFail: true) - } + queryRunner.reset() - logger.debug("queue tasks left to run: \(queueInventory.count)") - logger.debug(""" - queue next task to run: \(shortTaskId(nextTaskStorageId)), - \(nextTaskToRun.type), \(nextTaskToRun.data.string ?? ""), \(nextTaskToRun.runResults) - """) - - // we are not using [weak self] because if the task is currently running, - // we dont want the result handler to get garbage collected which could - // make the task run again when it shouldn't. - // - // if we wanted to use [weak self] then we should allow running a task to cancel - // while executing which would then allow this to use [weak self]. - runner.runTask(nextTaskToRun) { result in - switch result { - case .success: - self.logger.debug("queue task \(self.shortTaskId(nextTaskStorageId)) ran successfully") - - self.logger.debug("queue deleting task \(self.shortTaskId(nextTaskStorageId))") - _ = self.storage.delete(storageId: nextTaskToRunInventoryItem.taskPersistedId) - - return goToNextTask(didTaskFail: false) - case .failure(let error): - self.logger - .debug("queue task \(self.shortTaskId(nextTaskStorageId)) run failed \(error.localizedDescription)") - - let previousRunResults = nextTaskToRun.runResults - - // When a HTTP request isn't made, dont update the run history to give us inaccurate data. - if case .requestsPaused = error { - self.logger.debug(""" - queue task \(self.shortTaskId(nextTaskStorageId)) didn't run because all HTTP requests paused. - """) - - self.logger.info("queue is quitting early because all HTTP requests are paused.") - - return self.doneRunning() - } else { - let newRunResults = previousRunResults.totalRunsSet(previousRunResults.totalRuns + 1) - - self.logger.debug(""" - queue task \(self.shortTaskId(nextTaskStorageId)) updating run history - from: \(nextTaskToRun.runResults) to: \(newRunResults) - """) - - _ = self.storage.update( - storageId: nextTaskToRunInventoryItem.taskPersistedId, - runResults: newRunResults - ) - } + return requestManager.requestComplete() + } - return goToNextTask(didTaskFail: true) + while continueRunnning { + // get the inventory before running each task. If a task was added to the queue while the last task was being + // executed, we can assert that new task will execute during this run. + let queueInventory = storage.getInventory() + + guard let nextTaskToRunInventoryItem = queryRunner.getNextTask( + queueInventory, + lastRanTask: lastRanTask, + lastFailedTask: lastFailedTask + ) + else { + // we hit the end of the current inventory. Done! + doneRunning() + break } - } - } - private func doneRunning() { - logger.debug("queue out of tasks to run.") + let nextTaskStorageId = nextTaskToRunInventoryItem.taskPersistedId + guard let nextTaskToRun = storage.get(storageId: nextTaskStorageId) else { + // log error. this scenario shouldn't happen where task can't be found. + logger.error("Tried to get queue task with storage id: \(nextTaskStorageId), but storage couldn't find it.") - queryRunner.reset() + // The task failed to execute like a HTTP failure. Update `lastFailedTask`. + goToNextTask(didTaskFail: true, taskJustExecuted: nextTaskToRunInventoryItem) + break + } - return requestManager.requestComplete() + logger.debug("queue tasks left to run: \(queueInventory.count)") + logger.debug(""" + queue next task to run: \(shortTaskId(nextTaskStorageId)), + \(nextTaskToRun.type), \(nextTaskToRun.data.string ?? ""), \(nextTaskToRun.runResults) + """) + + // we are not using [weak self] because if the task is currently running, + // we dont want the result handler to get garbage collected which could + // make the task run again when it shouldn't. + // + // if we wanted to use [weak self] then we should allow running a task to cancel + // while executing which would then allow this to use [weak self]. + runner.runTask(nextTaskToRun) { result in + switch result { + case .success: + self.logger.debug("queue task \(self.shortTaskId(nextTaskStorageId)) ran successfully") + + self.logger.debug("queue deleting task \(self.shortTaskId(nextTaskStorageId))") + _ = self.storage.delete(storageId: nextTaskToRunInventoryItem.taskPersistedId) + + goToNextTask(didTaskFail: false, taskJustExecuted: nextTaskToRunInventoryItem) + case .failure(let error): + self.logger + .debug("queue task \(self.shortTaskId(nextTaskStorageId)) run failed \(error.localizedDescription)") + + let previousRunResults = nextTaskToRun.runResults + + // When a HTTP request isn't made, dont update the run history to give us inaccurate data. + if case .requestsPaused = error { + self.logger.debug(""" + queue task \(self.shortTaskId(nextTaskStorageId)) didn't run because all HTTP requests paused. + """) + + self.logger.info("queue is quitting early because all HTTP requests are paused.") + + doneRunning() + break + } else { + let newRunResults = previousRunResults.totalRunsSet(previousRunResults.totalRuns + 1) + + self.logger.debug(""" + queue task \(self.shortTaskId(nextTaskStorageId)) updating run history + from: \(nextTaskToRun.runResults) to: \(newRunResults) + """) + + _ = self.storage.update( + storageId: nextTaskToRunInventoryItem.taskPersistedId, + runResults: newRunResults + ) + } + + goToNextTask(didTaskFail: true, taskJustExecuted: nextTaskToRunInventoryItem) + } + } + } } } diff --git a/Tests/Shared/IntegrationTest.swift b/Tests/Shared/IntegrationTest.swift index 2f886cfda..6db0057ae 100644 --- a/Tests/Shared/IntegrationTest.swift +++ b/Tests/Shared/IntegrationTest.swift @@ -15,8 +15,9 @@ open class IntegrationTest: UnitTest { // Date util stub is available in UnitTest public private(set) var sampleDataFilesUtil: SampleDataFilesUtil! - override open func setUp() { - super.setUp() + // setup should match that of UnitTest so that when a subclass calls setUp, it should call this function instead UnitTest's version. + override open func setUp(enableLogs: Bool = false, modifySdkConfig: ((inout SdkConfig) -> Void)? = nil) { + super.setUp(enableLogs: enableLogs, modifySdkConfig: modifySdkConfig) sampleDataFilesUtil = SampleDataFilesUtil(fileStore: diGraph.fileStorage) diff --git a/Tests/Shared/Stub/HttpRequestRunnerStub.swift b/Tests/Shared/Stub/HttpRequestRunnerStub.swift index c2bc2f809..1e1e17d01 100644 --- a/Tests/Shared/Stub/HttpRequestRunnerStub.swift +++ b/Tests/Shared/Stub/HttpRequestRunnerStub.swift @@ -33,6 +33,10 @@ public class HttpRequestRunnerStub { responseToAlwaysReturn = getHttpResponse(code: 200, data: "".data) } + public func alwaysReturnResponse(code: Int, data: Data) { + responseToAlwaysReturn = getHttpResponse(code: code, data: data) + } + private func getHttpResponse(code: Int, data: Data) -> HttpResponse { HttpResponse( data: data, diff --git a/Tests/Shared/UnitTest.swift b/Tests/Shared/UnitTest.swift index daff30a33..2e1125b79 100644 --- a/Tests/Shared/UnitTest.swift +++ b/Tests/Shared/UnitTest.swift @@ -66,7 +66,7 @@ open class UnitTest: XCTestCase { setUp(enableLogs: false) } - public func setUp(enableLogs: Bool = false, modifySdkConfig: ((inout SdkConfig) -> Void)? = nil) { + open func setUp(enableLogs: Bool = false, modifySdkConfig: ((inout SdkConfig) -> Void)? = nil) { var newSdkConfig = SdkConfig.Factory.create(region: Region.US) if enableLogs { newSdkConfig.logLevel = CioLogLevel.debug @@ -156,6 +156,7 @@ open class UnitTest: XCTestCase { public extension UnitTest { func waitForQueueToFinishRunningTasks( _ queue: Queue, + timeout: TimeInterval = 0.5, file: StaticString = #file, line: UInt = #line ) { @@ -164,6 +165,6 @@ public extension UnitTest { queueExpectation.fulfill() } - waitForExpectations(for: [queueExpectation], file: file, line: line) + waitForExpectations(for: [queueExpectation], timeout: timeout, file: file, line: line) } } diff --git a/Tests/Shared/XCTestCaseExtensions.swift b/Tests/Shared/XCTestCaseExtensions.swift index 818915e12..ae078a4b1 100644 --- a/Tests/Shared/XCTestCaseExtensions.swift +++ b/Tests/Shared/XCTestCaseExtensions.swift @@ -8,11 +8,12 @@ public extension XCTestCase { func waitForExpectations( for expectations: [XCTestExpectation], + timeout: TimeInterval = 0.5, enforceOrder: Bool = false, file _: StaticString = #file, line _: UInt = #line ) { - wait(for: expectations, timeout: 0.5, enforceOrder: enforceOrder) + wait(for: expectations, timeout: timeout, enforceOrder: enforceOrder) } func getEnvironmentVariable(_ key: String) -> String? { diff --git a/Tests/Tracking/CustomerIOIntegrationTests.swift b/Tests/Tracking/CustomerIOIntegrationTests.swift index 015bfc6df..afbb5ad00 100644 --- a/Tests/Tracking/CustomerIOIntegrationTests.swift +++ b/Tests/Tracking/CustomerIOIntegrationTests.swift @@ -116,4 +116,27 @@ class CustomerIOIntegrationTests: IntegrationTest { XCTAssertGreaterThan(httpRequestRunnerStub.requestCallsCount, 0) XCTAssertEqual(diGraph.queueStorage.getInventory().count, 0) } + + // MARK: Misc tests + + // Test BQ can process lots of tasks inside of it. + // Issues reported in the past where BQ caused a stackoverflow it too many tasks inside of it: https://github.com/customerio/issues/issues/8917 + func test_backgroundQueueCanHandleLotsOfTasksInQueue() { + let numberOfTasksToAddToQueue = 1000 + setUp { config in + config.backgroundQueueMinNumberOfTasks = numberOfTasksToAddToQueue + } + httpRequestRunnerStub.alwaysReturnResponse(code: 403, data: "".data) + + CustomerIO.shared.identify(identifier: .random) // to allow us to add other tasks to the BQ + + for _ in 0 ... numberOfTasksToAddToQueue { + CustomerIO.shared.track(name: .random) + } + + // 30 second timeout because this test takes a while to execute. + waitForQueueToFinishRunningTasks(queue, timeout: 30.0) + + XCTAssertGreaterThan(httpRequestRunnerStub.requestCallsCount, numberOfTasksToAddToQueue) + } }