Skip to content

Commit

Permalink
fix: revert 2.0.2 as it was found unstable (customerio#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
levibostian committed Jan 11, 2023
1 parent 5c6e375 commit 51b5831
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 126 deletions.
176 changes: 85 additions & 91 deletions Sources/Common/Background Queue/QueueRunRequest.swift
Expand Up @@ -38,112 +38,106 @@ public class CioQueueRunRequest: QueueRunRequest {
}

private func startNewRequestRun() {
runTasks()
runTasks(lastRanTask: nil)
}

// Disable swiftlint because function at this time isn't too complex to need to make it smaller.
// Many of the lines of this function are logging related.
// swiftlint:disable:next function_body_length
private func runTasks() {
var lastRanTask: QueueTaskMetadata?
var lastFailedTask: QueueTaskMetadata?
var continueRunnning = true
private func runTasks(
lastRanTask: QueueTaskMetadata?,
lastFailedTask: QueueTaskMetadata? = nil
) {
// get the inventory before running each task. If a task was added to the queue while the last task was being
// executed, we can assert that new task will execute during this run.
let queueInventory = storage.getInventory()

guard let nextTaskToRunInventoryItem = queryRunner.getNextTask(
queueInventory,
lastRanTask: lastRanTask,
lastFailedTask: lastFailedTask
)
else {
// we hit the end of the current inventory. Done!
return doneRunning()
}

// call when you're done with task
func goToNextTask(didTaskFail: Bool, taskJustExecuted: QueueTaskMetadata) {
lastFailedTask = didTaskFail ? taskJustExecuted : nil
lastRanTask = taskJustExecuted
}
func goToNextTask(didTaskFail: Bool) {
let lastFailedTask = didTaskFail ? nextTaskToRunInventoryItem : nil

func doneRunning() {
continueRunnning = false
logger.debug("queue out of tasks to run.")
runTasks(lastRanTask: nextTaskToRunInventoryItem, lastFailedTask: lastFailedTask)
}

queryRunner.reset()
let nextTaskStorageId = nextTaskToRunInventoryItem.taskPersistedId
guard let nextTaskToRun = storage.get(storageId: nextTaskStorageId) else {
// log error. this scenario shouldn't happen where task can't be found.
logger.error("Tried to get queue task with storage id: \(nextTaskStorageId), but storage couldn't find it.")

return requestManager.requestComplete()
// The task failed to execute like a HTTP failure. Update `lastFailedTask`.
return goToNextTask(didTaskFail: true)
}

while continueRunnning {
// get the inventory before running each task. If a task was added to the queue while the last task was being
// executed, we can assert that new task will execute during this run.
let queueInventory = storage.getInventory()

guard let nextTaskToRunInventoryItem = queryRunner.getNextTask(
queueInventory,
lastRanTask: lastRanTask,
lastFailedTask: lastFailedTask
)
else {
// we hit the end of the current inventory. Done!
doneRunning()
break
logger.debug("queue tasks left to run: \(queueInventory.count)")
logger.debug("""
queue next task to run: \(shortTaskId(nextTaskStorageId)),
\(nextTaskToRun.type), \(nextTaskToRun.data.string ?? ""), \(nextTaskToRun.runResults)
""")

// we are not using [weak self] because if the task is currently running,
// we dont want the result handler to get garbage collected which could
// make the task run again when it shouldn't.
//
// if we wanted to use [weak self] then we should allow running a task to cancel
// while executing which would then allow this to use [weak self].
runner.runTask(nextTaskToRun) { result in
switch result {
case .success:
self.logger.debug("queue task \(self.shortTaskId(nextTaskStorageId)) ran successfully")

self.logger.debug("queue deleting task \(self.shortTaskId(nextTaskStorageId))")
_ = self.storage.delete(storageId: nextTaskToRunInventoryItem.taskPersistedId)

return goToNextTask(didTaskFail: false)
case .failure(let error):
self.logger
.debug("queue task \(self.shortTaskId(nextTaskStorageId)) run failed \(error.localizedDescription)")

let previousRunResults = nextTaskToRun.runResults

// When a HTTP request isn't made, dont update the run history to give us inaccurate data.
if case .requestsPaused = error {
self.logger.debug("""
queue task \(self.shortTaskId(nextTaskStorageId)) didn't run because all HTTP requests paused.
""")

self.logger.info("queue is quitting early because all HTTP requests are paused.")

return self.doneRunning()
} else {
let newRunResults = previousRunResults.totalRunsSet(previousRunResults.totalRuns + 1)

self.logger.debug("""
queue task \(self.shortTaskId(nextTaskStorageId)) updating run history
from: \(nextTaskToRun.runResults) to: \(newRunResults)
""")

_ = self.storage.update(
storageId: nextTaskToRunInventoryItem.taskPersistedId,
runResults: newRunResults
)
}

return goToNextTask(didTaskFail: true)
}
}
}

let nextTaskStorageId = nextTaskToRunInventoryItem.taskPersistedId
guard let nextTaskToRun = storage.get(storageId: nextTaskStorageId) else {
// log error. this scenario shouldn't happen where task can't be found.
logger.error("Tried to get queue task with storage id: \(nextTaskStorageId), but storage couldn't find it.")
private func doneRunning() {
logger.debug("queue out of tasks to run.")

// The task failed to execute like a HTTP failure. Update `lastFailedTask`.
goToNextTask(didTaskFail: true, taskJustExecuted: nextTaskToRunInventoryItem)
break
}
queryRunner.reset()

logger.debug("queue tasks left to run: \(queueInventory.count)")
logger.debug("""
queue next task to run: \(shortTaskId(nextTaskStorageId)),
\(nextTaskToRun.type), \(nextTaskToRun.data.string ?? ""), \(nextTaskToRun.runResults)
""")

// we are not using [weak self] because if the task is currently running,
// we dont want the result handler to get garbage collected which could
// make the task run again when it shouldn't.
//
// if we wanted to use [weak self] then we should allow running a task to cancel
// while executing which would then allow this to use [weak self].
runner.runTask(nextTaskToRun) { result in
switch result {
case .success:
self.logger.debug("queue task \(self.shortTaskId(nextTaskStorageId)) ran successfully")

self.logger.debug("queue deleting task \(self.shortTaskId(nextTaskStorageId))")
_ = self.storage.delete(storageId: nextTaskToRunInventoryItem.taskPersistedId)

goToNextTask(didTaskFail: false, taskJustExecuted: nextTaskToRunInventoryItem)
case .failure(let error):
self.logger
.debug("queue task \(self.shortTaskId(nextTaskStorageId)) run failed \(error.localizedDescription)")

let previousRunResults = nextTaskToRun.runResults

// When a HTTP request isn't made, dont update the run history to give us inaccurate data.
if case .requestsPaused = error {
self.logger.debug("""
queue task \(self.shortTaskId(nextTaskStorageId)) didn't run because all HTTP requests paused.
""")

self.logger.info("queue is quitting early because all HTTP requests are paused.")

doneRunning()
break
} else {
let newRunResults = previousRunResults.totalRunsSet(previousRunResults.totalRuns + 1)

self.logger.debug("""
queue task \(self.shortTaskId(nextTaskStorageId)) updating run history
from: \(nextTaskToRun.runResults) to: \(newRunResults)
""")

_ = self.storage.update(
storageId: nextTaskToRunInventoryItem.taskPersistedId,
runResults: newRunResults
)
}

goToNextTask(didTaskFail: true, taskJustExecuted: nextTaskToRunInventoryItem)
}
}
}
return requestManager.requestComplete()
}
}
5 changes: 2 additions & 3 deletions Tests/Shared/IntegrationTest.swift
Expand Up @@ -15,9 +15,8 @@ open class IntegrationTest: UnitTest {
// Date util stub is available in UnitTest
public private(set) var sampleDataFilesUtil: SampleDataFilesUtil!

// setup should match that of UnitTest so that when a subclass calls setUp, it should call this function instead UnitTest's version.
override open func setUp(enableLogs: Bool = false, modifySdkConfig: ((inout SdkConfig) -> Void)? = nil) {
super.setUp(enableLogs: enableLogs, modifySdkConfig: modifySdkConfig)
override open func setUp() {
super.setUp()

sampleDataFilesUtil = SampleDataFilesUtil(fileStore: diGraph.fileStorage)

Expand Down
4 changes: 0 additions & 4 deletions Tests/Shared/Stub/HttpRequestRunnerStub.swift
Expand Up @@ -33,10 +33,6 @@ public class HttpRequestRunnerStub {
responseToAlwaysReturn = getHttpResponse(code: 200, data: "".data)
}

public func alwaysReturnResponse(code: Int, data: Data) {
responseToAlwaysReturn = getHttpResponse(code: code, data: data)
}

private func getHttpResponse(code: Int, data: Data) -> HttpResponse {
HttpResponse(
data: data,
Expand Down
5 changes: 2 additions & 3 deletions Tests/Shared/UnitTest.swift
Expand Up @@ -66,7 +66,7 @@ open class UnitTest: XCTestCase {
setUp(enableLogs: false)
}

open func setUp(enableLogs: Bool = false, modifySdkConfig: ((inout SdkConfig) -> Void)? = nil) {
public func setUp(enableLogs: Bool = false, modifySdkConfig: ((inout SdkConfig) -> Void)? = nil) {
var newSdkConfig = SdkConfig.Factory.create(region: Region.US)
if enableLogs {
newSdkConfig.logLevel = CioLogLevel.debug
Expand Down Expand Up @@ -156,7 +156,6 @@ open class UnitTest: XCTestCase {
public extension UnitTest {
func waitForQueueToFinishRunningTasks(
_ queue: Queue,
timeout: TimeInterval = 0.5,
file: StaticString = #file,
line: UInt = #line
) {
Expand All @@ -165,6 +164,6 @@ public extension UnitTest {
queueExpectation.fulfill()
}

waitForExpectations(for: [queueExpectation], timeout: timeout, file: file, line: line)
waitForExpectations(for: [queueExpectation], file: file, line: line)
}
}
3 changes: 1 addition & 2 deletions Tests/Shared/XCTestCaseExtensions.swift
Expand Up @@ -8,12 +8,11 @@ public extension XCTestCase {

func waitForExpectations(
for expectations: [XCTestExpectation],
timeout: TimeInterval = 0.5,
enforceOrder: Bool = false,
file _: StaticString = #file,
line _: UInt = #line
) {
wait(for: expectations, timeout: timeout, enforceOrder: enforceOrder)
wait(for: expectations, timeout: 0.5, enforceOrder: enforceOrder)
}

func getEnvironmentVariable(_ key: String) -> String? {
Expand Down
23 changes: 0 additions & 23 deletions Tests/Tracking/CustomerIOIntegrationTests.swift
Expand Up @@ -116,27 +116,4 @@ class CustomerIOIntegrationTests: IntegrationTest {
XCTAssertGreaterThan(httpRequestRunnerStub.requestCallsCount, 0)
XCTAssertEqual(diGraph.queueStorage.getInventory().count, 0)
}

// MARK: Misc tests

// Test BQ can process lots of tasks inside of it.
// Issues reported in the past where BQ caused a stackoverflow it too many tasks inside of it: https://github.com/customerio/issues/issues/8917
func test_backgroundQueueCanHandleLotsOfTasksInQueue() {
let numberOfTasksToAddToQueue = 1000
setUp { config in
config.backgroundQueueMinNumberOfTasks = numberOfTasksToAddToQueue
}
httpRequestRunnerStub.alwaysReturnResponse(code: 403, data: "".data)

CustomerIO.shared.identify(identifier: .random) // to allow us to add other tasks to the BQ

for _ in 0 ... numberOfTasksToAddToQueue {
CustomerIO.shared.track(name: .random)
}

// 30 second timeout because this test takes a while to execute.
waitForQueueToFinishRunningTasks(queue, timeout: 30.0)

XCTAssertGreaterThan(httpRequestRunnerStub.requestCallsCount, numberOfTasksToAddToQueue)
}
}

0 comments on commit 51b5831

Please sign in to comment.