Skip to content

Commit

Permalink
feat: introduce a writeQueue for event tracking (#124)
Browse files Browse the repository at this point in the history
* feat: introduce a writeQueue for event tracking

the write queue will consume one tracked event after the other and write them to disk and run the flush policies

* fixup! feat: introduce a writeQueue for event tracking

* fixup! feat: introduce a writeQueue for event tracking
  • Loading branch information
nicklasl committed May 27, 2024
1 parent 1b32d05 commit a49a393
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 58 deletions.
3 changes: 1 addition & 2 deletions Sources/Confidence/Confidence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,7 @@ extension Confidence {
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: clientSecret,
uploader: uploader,
storage: eventStorage,
flushPolicies: [SizeFlushPolicy(batchSize: 10)])
storage: eventStorage)
return Confidence(
clientSecret: clientSecret,
region: region,
Expand Down
50 changes: 34 additions & 16 deletions Sources/Confidence/EventSenderEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,54 @@ final class EventSenderEngineImpl: EventSenderEngine {
private let clientSecret: String
private let payloadMerger: PayloadMerger = PayloadMergerImpl()
private let semaphore = DispatchSemaphore(value: 1)
private let writeQueue: DispatchQueue

convenience init(
clientSecret: String,
uploader: ConfidenceClient,
storage: EventStorage
) {
self.init(
clientSecret: clientSecret,
uploader: uploader,
storage: storage,
flushPolicies: [SizeFlushPolicy(batchSize: 10)],
writeQueue: DispatchQueue(label: "ConfidenceWriteQueue")
)
}

init(
clientSecret: String,
uploader: ConfidenceClient,
storage: EventStorage,
flushPolicies: [FlushPolicy]
flushPolicies: [FlushPolicy],
writeQueue: DispatchQueue
) {
self.uploader = uploader
self.clientSecret = clientSecret
self.storage = storage
self.flushPolicies = flushPolicies + [ManualFlushPolicy()]
self.writeQueue = writeQueue

writeReqChannel.sink { [weak self] event in
guard let self = self else { return }
if event.name != manualFlushEvent.name { // skip storing flush events.
do {
try self.storage.writeEvent(event: event)
} catch {
writeReqChannel
.receive(on: self.writeQueue)
.sink { [weak self] event in
guard let self = self else { return }
if event.name != manualFlushEvent.name { // skip storing flush events.
do {
try self.storage.writeEvent(event: event)
} catch {
}
}
}

self.flushPolicies.forEach { policy in policy.hit(event: event) }
let shouldFlush = self.flushPolicies.contains { policy in policy.shouldFlush() }
self.flushPolicies.forEach { policy in policy.hit(event: event) }
let shouldFlush = self.flushPolicies.contains { policy in policy.shouldFlush() }

if shouldFlush {
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName)
self.flushPolicies.forEach { policy in policy.reset() }
if shouldFlush {
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName)
self.flushPolicies.forEach { policy in policy.reset() }
}
}
}
.store(in: &cancellables)
.store(in: &cancellables)

uploadReqChannel.sink { [weak self] _ in
guard let self = self else { return }
Expand Down
92 changes: 52 additions & 40 deletions Tests/ConfidenceTests/EventSenderEngineTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,29 @@ final class ImmidiateFlushPolicy: FlushPolicy {
}

final class EventSenderEngineTest: XCTestCase {
// swiftlint:disable implicitly_unwrapped_optional
var writeQueue: DispatchQueue!
var uploaderMock: EventUploaderMock!
var storageMock: EventStorageMock!
// swiftlint:enable implicitly_unwrapped_optional

override func setUp() async throws {
writeQueue = DispatchQueue(label: "ConfidenceWriteQueue")
uploaderMock = EventUploaderMock()
storageMock = EventStorageMock()
}

func testPayloadOnEmit() throws {
let flushPolicies = [MinSizeFlushPolicy(maxSize: 1)]
let uploader = EventUploaderMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploader,
storage: EventStorageMock(),
flushPolicies: flushPolicies
uploader: uploaderMock,
storage: storageMock,
flushPolicies: [MinSizeFlushPolicy(maxSize: 1)],
writeQueue: writeQueue
)

let expectation = XCTestExpectation(description: "Upload finished")
let cancellable = uploader.subject.sink { _ in
let cancellable = uploaderMock.subject.sink { _ in
expectation.fulfill()
}
eventSenderEngine.emit(
Expand All @@ -67,100 +78,102 @@ final class EventSenderEngineTest: XCTestCase {


wait(for: [expectation], timeout: 5)
XCTAssertEqual(try XCTUnwrap(uploader.calledRequest)[0].eventDefinition, "my_event")
XCTAssertEqual(try XCTUnwrap(uploader.calledRequest)[0].payload, NetworkStruct(fields: [
XCTAssertEqual(try XCTUnwrap(uploaderMock.calledRequest)[0].eventDefinition, "my_event")
XCTAssertEqual(try XCTUnwrap(uploaderMock.calledRequest)[0].payload, NetworkStruct(fields: [
"a": .number(0.0),
"message": .number(1.0)
]))
cancellable.cancel()
}

func testAddingEventsWithSizeFlushPolicyWorks() throws {
let flushPolicies = [MinSizeFlushPolicy(maxSize: 5)]
let uploader = EventUploaderMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploader,
storage: EventStorageMock(),
flushPolicies: flushPolicies
uploader: uploaderMock,
storage: storageMock,
flushPolicies: [MinSizeFlushPolicy(maxSize: 5)],
writeQueue: writeQueue
)

eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
// TODO: We need to wait for writeReqChannel to complete to make this test meaningful
XCTAssertNil(uploader.calledRequest)
XCTAssertNil(uploaderMock.calledRequest)
}

func testRemoveEventsFromStorageOnBadRequest() throws {
MockedClientURLProtocol.mockedOperation = .badRequest
let client = RemoteConfidenceClient(
let badRequestUploader = RemoteConfidenceClient(
options: ConfidenceClientOptions(credentials: ConfidenceClientCredentials.clientSecret(secret: "")),
session: MockedClientURLProtocol.mockedSession(),
metadata: ConfidenceMetadata(name: "", version: ""))

let flushPolicies = [ImmidiateFlushPolicy()]
let storage = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: client,
storage: storage,
flushPolicies: flushPolicies
uploader: badRequestUploader,
storage: storageMock,
flushPolicies: [ImmidiateFlushPolicy()],
writeQueue: writeQueue
)
eventSenderEngine.emit(eventName: "testEvent", message: ConfidenceStruct(), context: ConfidenceStruct())
let expectation = expectation(description: "events batched")
storage.eventsRemoved{
storageMock.eventsRemoved{
expectation.fulfill()
}
wait(for: [expectation], timeout: 2)

XCTAssertEqual(storage.isEmpty(), true)
XCTAssertEqual(storageMock.isEmpty(), true)
}

func testKeepEventsInStorageForRetry() throws {
MockedClientURLProtocol.mockedOperation = .needRetryLater
let client = RemoteConfidenceClient(
let retryLaterUploader = RemoteConfidenceClient(
options: ConfidenceClientOptions(credentials: ConfidenceClientCredentials.clientSecret(secret: "")),
session: MockedClientURLProtocol.mockedSession(),
metadata: ConfidenceMetadata(name: "", version: ""))

let flushPolicies = [ImmidiateFlushPolicy()]
let storage = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: client,
storage: storage,
flushPolicies: flushPolicies
uploader: retryLaterUploader,
storage: storageMock,
flushPolicies: [ImmidiateFlushPolicy()],
writeQueue: writeQueue
)

eventSenderEngine.emit(eventName: "testEvent", message: ConfidenceStruct(), context: ConfidenceStruct())

XCTAssertEqual(storage.isEmpty(), false)
writeQueue.sync {
XCTAssertEqual(storageMock.isEmpty(), false)
}
}

func testManualFlushWorks() throws {
let uploaderMock = EventUploaderMock()
let storageMock = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploaderMock,
storage: storageMock,
// no other flush policy is set which means that only manual flushes will trigger upload
flushPolicies: []
flushPolicies: [],
writeQueue: writeQueue
)

eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
XCTAssertEqual(storageMock.events.count, 4)
XCTAssertNil(uploaderMock.calledRequest)


writeQueue.sync {
XCTAssertEqual(storageMock.events.count, 4)
XCTAssertNil(uploaderMock.calledRequest)
}

eventSenderEngine.flush()

let expectation = XCTestExpectation(description: "Upload finished")
let uploadExpectation = XCTestExpectation(description: "Upload finished")
let cancellable = uploaderMock.subject.sink { _ in
expectation.fulfill()
uploadExpectation.fulfill()
}
wait(for: [expectation], timeout: 1)
wait(for: [uploadExpectation], timeout: 1)
let uploadRequest = uploaderMock.calledRequest
XCTAssertEqual(uploadRequest?.count, 4)

Expand All @@ -169,14 +182,13 @@ final class EventSenderEngineTest: XCTestCase {


func testManualFlushEventIsNotStored() throws {
let uploaderMock = EventUploaderMock()
let storageMock = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploaderMock,
storage: storageMock,
// no other flush policy is set which means that only manual flushes will trigger upload
flushPolicies: []
flushPolicies: [],
writeQueue: writeQueue
)

eventSenderEngine.flush()
Expand Down

0 comments on commit a49a393

Please sign in to comment.