Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions architecture/event_processor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Analytic Event Processor

The Event Processor is responsible for consuming, batching, and delivering events generated
by the server and client-side LaunchDarkly SDKs.

```mermaid
classDiagram
IEventProcessor <|-- NullEventProcessor
IEventProcessor <|-- AsioEventProcessor

AsioEventProcessor *-- LRUCache
AsioEventProcessor *-- Outbox
AsioEventProcessor *-- WorkerPool
AsioEventProcessor *-- Summarizer

RequestWorker *-- EventBatch
WorkerPool *-- "5" RequestWorker

TrackEvent -- TrackEventParams: (alias)
InputEvent *-- IdentifyEventParams
InputEvent *-- FeatureEventParams
InputEvent *-- TrackEventParams


OutputEvent *-- IndexEvent
OutputEvent *-- FeatureEvent
OutputEvent *-- DebugEvent
OutputEvent *-- IdentifyEvent
OutputEvent *-- TrackEvent

EventBatch --> Outbox: Pulls individual events from..
EventBatch --> Summarizer: Pulls summary events from..

IEventProcessor --> InputEvent
Outbox --> OutputEvent

Summarizer --> FeatureEventParams


class IEventProcessor {
<<interface>>
+SendAsync(InputEvent event) void
+FlushAsync() void
+ShutdownAsync() void
}

class NullEventProcessor {

}

class AsioEventProcessor {

}

class EventBatch {
+const Count() size_t
+const Request() network:: HttpRequest
+const Target() std:: string
}

class LRUCache {
+Notice(std:: string value) bool
+const Size() size_t
+Clear() void
}

class Outbox {
+PushDiscardingOverflow(std:: vector~OutputEvent~ events) bool
+Consume() std:: vector~OutputEvent~
+const Empty() bool
}

class RequestWorker {
+const Available() bool
+AsyncDeliver(EventBatch, delivery_callback)
}

class WorkerPool {
+Get(worker_callback) void
}

class Summarizer {
+Update(FeatureEventParams) void
+Finish()
+const StartTime() Time
+const EndTime() Time
}

%% note: the 'namespace' feature isn't supported on Github yet
%% namespace events {
class InputEvent {
+std:: variant
}


class OutputEvent {
+std:: variant
}

class FeatureEventParams {

}

class IdentifyEventParams {

}

class TrackEventParams {

}

class FeatureEvent {

}

class DebugEvent {

}

class IdentifyEvent {

}

class IndexEvent {

}

class TrackEvent {

}

%% }
```

### Notes

SDKs may be configured to disable events, so `NullEventProcessor` is made available. This component accepts
events generated
by the SDK and discards them.

If events are enabled, SDKs use the `AsioEventProcessor` implementation, which is an asynchronous processor
utilizing `boost::asio`.

Most event definitions are shared between the server and client-side SDKs. Unique to the server-side SDK
is `IndexEvent`.
6 changes: 3 additions & 3 deletions libs/internal/include/launchdarkly/events/detail/outbox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ class Outbox {
* @return True if all events were accepted; false if >= 1 events were
* dropped.
*/
bool PushDiscardingOverflow(std::vector<OutputEvent> events);
[[nodiscard]] bool PushDiscardingOverflow(std::vector<OutputEvent> events);

/**
* Consumes all events in the outbox.
* @return All events in the outbox, in the order they were pushed.
*/
std::vector<OutputEvent> Consume();
[[nodiscard]] std::vector<OutputEvent> Consume();

/**
* True if the outbox is empty.
*/
bool Empty();
[[nodiscard]] bool Empty() const;

private:
std::queue<OutputEvent> items_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ class Summarizer {
/**
* Returns the summary's start time as given in the constructor.
*/
[[nodiscard]] Time start_time() const;
[[nodiscard]] Time StartTime() const;

/**
* Returns the summary's end time as specified using Finish.
*/
[[nodiscard]] Time end_time() const;
[[nodiscard]] Time EndTime() const;

struct VariationSummary {
public:
Expand Down
2 changes: 1 addition & 1 deletion libs/internal/src/events/outbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ std::vector<OutputEvent> Outbox::Consume() {
return out;
}

bool Outbox::Empty() {
bool Outbox::Empty() const {
return items_.empty();
}

Expand Down
4 changes: 2 additions & 2 deletions libs/internal/src/events/summarizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ Summarizer& Summarizer::Finish(Time end_time) {
return *this;
}

Summarizer::Time Summarizer::start_time() const {
Summarizer::Time Summarizer::StartTime() const {
return start_time_;
}

Summarizer::Time Summarizer::end_time() const {
Summarizer::Time Summarizer::EndTime() const {
return end_time_;
}

Expand Down
4 changes: 2 additions & 2 deletions libs/internal/src/serialization/events/json_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ void tag_invoke(boost::json::value_from_tag const& tag,
auto& obj = json_value.emplace_object();
obj.emplace("kind", "summary");
obj.emplace("startDate",
boost::json::value_from(Date{summary.start_time()}));
obj.emplace("endDate", boost::json::value_from(Date{summary.end_time()}));
boost::json::value_from(Date{summary.StartTime()}));
obj.emplace("endDate", boost::json::value_from(Date{summary.EndTime()}));
obj.emplace("features", boost::json::value_from(summary.Features()));
}
} // namespace launchdarkly::events::detail
4 changes: 2 additions & 2 deletions libs/internal/tests/event_summarizer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ TEST(SummarizerTests, IsEmptyOnConstruction) {

TEST(SummarizerTests, DefaultConstructionUsesZeroStartTime) {
Summarizer summarizer;
ASSERT_EQ(summarizer.start_time(), TimeZero());
ASSERT_EQ(summarizer.StartTime(), TimeZero());
}

TEST(SummarizerTests, ExplicitStartTimeIsCorrect) {
auto start = std::chrono::system_clock::from_time_t(12345);
Summarizer summarizer(start);
ASSERT_EQ(summarizer.start_time(), start);
ASSERT_EQ(summarizer.StartTime(), start);
}

struct EvaluationParams {
Expand Down