-
Notifications
You must be signed in to change notification settings - Fork 12
Bounded resource consumption refactoring #37
Changes from all commits
eea903e
5479d87
a660468
e24da1b
0a430e2
69f7a77
8b23de9
a3b2207
8171106
590d2ce
ee0dc1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,9 +17,8 @@ type LDClient struct { | |
| apiKey string | ||
| config Config | ||
| eventProcessor *eventProcessor | ||
| offline bool | ||
| streamProcessor *streamProcessor | ||
| requestor *requestor | ||
| updateProcessor updateProcessor | ||
| store FeatureStore | ||
| } | ||
|
|
||
| // Exposes advanced configuration options for the LaunchDarkly client. | ||
|
|
@@ -29,12 +28,20 @@ type Config struct { | |
| EventsUri string | ||
| Capacity int | ||
| FlushInterval time.Duration | ||
| PollInterval time.Duration | ||
| Logger *log.Logger | ||
| Timeout time.Duration | ||
| Stream bool | ||
| FeatureStore FeatureStore | ||
| UseLdd bool | ||
| SendEvents bool | ||
| Offline bool | ||
| } | ||
|
|
||
| type updateProcessor interface { | ||
| initialized() bool | ||
| close() | ||
| start(chan<- bool) | ||
| } | ||
|
|
||
| // Provides the default configuration options for the LaunchDarkly client. | ||
|
|
@@ -48,45 +55,83 @@ var DefaultConfig = Config{ | |
| EventsUri: "https://events.launchdarkly.com", | ||
| Capacity: 1000, | ||
| FlushInterval: 5 * time.Second, | ||
| PollInterval: 1 * time.Second, | ||
| Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), | ||
| Timeout: 3000 * time.Millisecond, | ||
| Stream: true, | ||
| FeatureStore: nil, | ||
| UseLdd: false, | ||
| SendEvents: true, | ||
| Offline: false, | ||
| } | ||
|
|
||
| var ErrInitializationTimeout = errors.New("Timeout encountered waiting for LaunchDarkly client initialization") | ||
| var ErrClientNotInitialized = errors.New("Toggle called before LaunchDarkly client initialization completed") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. exported var ErrClientNotInitialized should have comment or be unexported |
||
|
|
||
| // Creates a new client instance that connects to LaunchDarkly with the default configuration. In most | ||
| // cases, you should use this method to instantiate your client. | ||
| func MakeClient(apiKey string) *LDClient { | ||
| res := MakeCustomClient(apiKey, DefaultConfig) | ||
| return &res | ||
| // cases, you should use this method to instantiate your client. The optional duration parameter allows callers to | ||
| // block until the client has connected to LaunchDarkly and is properly initialized. | ||
| func MakeClient(apiKey string, waitFor time.Duration) (*LDClient, error) { | ||
| return MakeCustomClient(apiKey, DefaultConfig, waitFor) | ||
| } | ||
|
|
||
| // Creates a new client instance that connects to LaunchDarkly with a custom configuration. | ||
| func MakeCustomClient(apiKey string, config Config) LDClient { | ||
| var streamProcessor *streamProcessor | ||
| // Creates a new client instance that connects to LaunchDarkly with a custom configuration. The optional duration parameter allows callers to | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment on exported function MakeCustomClient should be of the form "MakeCustomClient ..." |
||
| // block until the client has connected to LaunchDarkly and is properly initialized. | ||
| func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDClient, error) { | ||
| var updateProcessor updateProcessor | ||
| var store FeatureStore | ||
|
|
||
| ch := make(chan bool) | ||
|
|
||
| config.BaseUri = strings.TrimRight(config.BaseUri, "/") | ||
| config.EventsUri = strings.TrimRight(config.EventsUri, "/") | ||
|
|
||
| requestor := newRequestor(apiKey, config) | ||
|
|
||
| if config.Stream { | ||
| streamProcessor = newStream(apiKey, config, requestor) | ||
| if config.FeatureStore == nil { | ||
| config.FeatureStore = NewInMemoryFeatureStore() | ||
| } | ||
|
|
||
| return LDClient{ | ||
| if config.PollInterval < (1 * time.Second) { | ||
| config.PollInterval = 1 * time.Second | ||
| } | ||
|
|
||
| store = config.FeatureStore | ||
|
|
||
| if !config.UseLdd && !config.Offline { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If LDD mode is set, then we will always timeout. |
||
| if config.Stream { | ||
| updateProcessor = newStreamProcessor(apiKey, config, requestor) | ||
| } else { | ||
| updateProcessor = newPollingProcessor(config, requestor) | ||
| } | ||
| updateProcessor.start(ch) | ||
| } | ||
|
|
||
| client := LDClient{ | ||
| apiKey: apiKey, | ||
| config: config, | ||
| requestor: requestor, | ||
| eventProcessor: newEventProcessor(apiKey, config), | ||
| offline: false, | ||
| streamProcessor: streamProcessor, | ||
| updateProcessor: updateProcessor, | ||
| store: store, | ||
| } | ||
|
|
||
| timeout := time.After(waitFor) | ||
|
|
||
| for { | ||
| select { | ||
| case <-ch: | ||
| return &client, nil | ||
| case <-timeout: | ||
| if waitFor > 0 { | ||
| return &client, ErrInitializationTimeout | ||
| } | ||
| return &client, nil | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (client *LDClient) Identify(user User) error { | ||
| if client.offline { | ||
| if client.IsOffline() { | ||
| return nil | ||
| } | ||
| evt := NewIdentifyEvent(user) | ||
|
|
@@ -96,65 +141,51 @@ func (client *LDClient) Identify(user User) error { | |
| // Tracks that a user has performed an event. Custom data can be attached to the | ||
| // event, and is serialized to JSON using the encoding/json package (http://golang.org/pkg/encoding/json/). | ||
| func (client *LDClient) Track(key string, user User, data interface{}) error { | ||
| if client.offline { | ||
| if client.IsOffline() { | ||
| return nil | ||
| } | ||
| evt := NewCustomEvent(key, user, data) | ||
| return client.eventProcessor.sendEvent(evt) | ||
| } | ||
|
|
||
| // Puts the LaunchDarkly client in offline mode. In offline mode, no network calls will be made, | ||
| // and no events will be recorded. In addition, all calls to Toggle will return the default value. | ||
| func (client *LDClient) SetOffline() { | ||
| client.offline = true | ||
| } | ||
|
|
||
| // Puts the LaunchDarkly client in online mode. | ||
| func (client *LDClient) SetOnline() { | ||
| client.offline = false | ||
| } | ||
|
|
||
| // Returns whether the LaunchDarkly client is in offline mode. | ||
| func (client *LDClient) IsOffline() bool { | ||
| return client.offline | ||
| return client.config.Offline | ||
| } | ||
|
|
||
| // Eagerly initializes the stream connection. If InitializeStream is not called, the stream will | ||
| // be initialized lazily with the first call to Toggle. | ||
| func (client *LDClient) InitializeStream() { | ||
| if client.config.Stream { | ||
| client.streamProcessor.StartOnce() | ||
| } | ||
| } | ||
|
|
||
| // Returns false if the LaunchDarkly client does not have an active connection to | ||
| // the LaunchDarkly streaming endpoint. If streaming mode is disabled in the client | ||
| // configuration, this will always return false. | ||
| func (client *LDClient) IsStreamDisconnected() bool { | ||
| return client.config.Stream == false || client.streamProcessor == nil || client.streamProcessor.ShouldFallbackUpdate() | ||
| // Returns whether the LaunchDarkly client is initialized. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment on exported method LDClient.Initialized should be of the form "Initialized ..." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment on exported method LDClient.Initialized should be of the form "Initialized ..." |
||
| func (client *LDClient) Initialized() bool { | ||
| return client.IsOffline() || client.config.UseLdd || client.updateProcessor.initialized() | ||
| } | ||
|
|
||
| // Returns whether the LaunchDarkly client has received an initial response from | ||
| // the LaunchDarkly streaming endpoint. If this is the case, the client can service | ||
| // Toggle calls from the stream. If streaming mode is disabled in the client | ||
| // configuration, this will always return false. | ||
| func (client *LDClient) IsStreamInitialized() bool { | ||
| return client.config.Stream && client.streamProcessor != nil && client.streamProcessor.Initialized() | ||
| } | ||
|
|
||
| // Stops the LaunchDarkly client from sending any additional events. | ||
| // Shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment on exported method LDClient.Close should be of the form "Close ..." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment on exported method LDClient.Close should be of the form "Close ..." |
||
| // should no longer be used. | ||
| func (client *LDClient) Close() { | ||
| if client.IsOffline() { | ||
| return | ||
| } | ||
| client.eventProcessor.close() | ||
| if !client.config.UseLdd { | ||
| client.updateProcessor.close() | ||
| } | ||
| } | ||
|
|
||
| // Immediately flushes queued events. | ||
| func (client *LDClient) Flush() { | ||
| if client.IsOffline() { | ||
| return | ||
| } | ||
| client.eventProcessor.flush() | ||
| } | ||
|
|
||
| // Returns the value of a boolean feature flag for a given user. Returns defaultVal if | ||
| // there is an error, if the flag doesn't exist, or the feature is turned off. | ||
| // there is an error, if the flag doesn't exist, the client hasn't completed initialization, | ||
| // or the feature is turned off. | ||
| func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, error) { | ||
| if client.IsOffline() { | ||
| return defaultVal, nil | ||
| } | ||
|
|
||
| value, err := client.evaluate(key, user, defaultVal) | ||
|
|
||
| if err != nil { | ||
|
|
@@ -166,7 +197,7 @@ func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, er | |
|
|
||
| if !ok { | ||
| client.sendFlagRequestEvent(key, user, defaultVal, defaultVal) | ||
| return defaultVal, errors.New("Feature flag returned non-bool value") | ||
| return defaultVal, errors.New("Feature flag returned non-boolean value") | ||
| } | ||
|
|
||
| client.sendFlagRequestEvent(key, user, value, defaultVal) | ||
|
|
@@ -176,6 +207,10 @@ func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, er | |
| // Returns the value of a feature flag (whose variations are integers) for the given user. | ||
| // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. | ||
| func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int, error) { | ||
| if client.IsOffline() { | ||
| return defaultVal, nil | ||
| } | ||
|
|
||
| value, err := client.evaluate(key, user, float64(defaultVal)) | ||
|
|
||
| if err != nil { | ||
|
|
@@ -198,6 +233,10 @@ func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int | |
| // Returns the value of a feature flag (whose variations are floats) for the given user. | ||
| // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. | ||
| func (client *LDClient) Float64Variation(key string, user User, defaultVal float64) (float64, error) { | ||
| if client.IsOffline() { | ||
| return defaultVal, nil | ||
| } | ||
|
|
||
| value, err := client.evaluate(key, user, defaultVal) | ||
|
|
||
| if err != nil { | ||
|
|
@@ -217,7 +256,7 @@ func (client *LDClient) Float64Variation(key string, user User, defaultVal float | |
| } | ||
|
|
||
| func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defaultVal interface{}) error { | ||
| if client.offline { | ||
| if client.IsOffline() { | ||
| return nil | ||
| } | ||
| evt := NewFeatureRequestEvent(key, user, value, defaultVal) | ||
|
|
@@ -226,45 +265,24 @@ func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defau | |
|
|
||
| func (client *LDClient) evaluate(key string, user User, defaultVal interface{}) (interface{}, error) { | ||
| var feature Feature | ||
| var streamErr error | ||
| var storeErr error | ||
| var featurePtr *Feature | ||
|
|
||
| if client.IsOffline() { | ||
| return defaultVal, nil | ||
| if !client.Initialized() { | ||
| return defaultVal, ErrClientNotInitialized | ||
| } | ||
|
|
||
| client.InitializeStream() | ||
| if client.IsStreamInitialized() { | ||
| var featurePtr *Feature | ||
| featurePtr, streamErr = client.streamProcessor.GetFeature(key) | ||
|
|
||
| if !client.config.UseLdd && client.IsStreamDisconnected() { | ||
| go func() { | ||
| if feature, err := client.requestor.makeRequest(key, true); err != nil { | ||
| client.config.Logger.Printf("Failed to update feature in fallback mode. Flag values may be stale.") | ||
| } else { | ||
| client.streamProcessor.store.Upsert(*feature.Key, *feature) | ||
| } | ||
| }() | ||
| } | ||
| featurePtr, storeErr = client.store.Get(key) | ||
|
|
||
| if streamErr != nil { | ||
| client.config.Logger.Printf("Encountered error in stream: %+v", streamErr) | ||
| return defaultVal, streamErr | ||
| } | ||
| if storeErr != nil { | ||
| client.config.Logger.Printf("Encountered error fetching feature from store: %+v", storeErr) | ||
| return defaultVal, storeErr | ||
| } | ||
|
|
||
| if featurePtr != nil { | ||
| feature = *featurePtr | ||
| } else { | ||
| return defaultVal, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") | ||
| } | ||
| if featurePtr != nil { | ||
| feature = *featurePtr | ||
| } else { | ||
| // If streaming mode is enabled, get the latest version of the feature | ||
| // Otherwise, respect the TTL | ||
| if featurePtr, reqErr := client.requestor.makeRequest(key, client.config.Stream); reqErr != nil { | ||
| return defaultVal, reqErr | ||
| } else { | ||
| feature = *featurePtr | ||
| } | ||
| return defaultVal, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") | ||
| } | ||
|
|
||
| value, pass := feature.Evaluate(user) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported var ErrInitializationTimeout should have comment or be unexported