diff --git a/ldclient.go b/ldclient.go index a0ea8b5..e2249a8 100644 --- a/ldclient.go +++ b/ldclient.go @@ -17,9 +17,8 @@ type LDClient struct { apiKey string config Config eventProcessor *eventProcessor - offline bool - streamProcessor *streamProcessor - requestor *requestor + updateProcessor updateProcessor + store FeatureStore } // Exposes advanced configuration options for the LaunchDarkly client. @@ -29,12 +28,20 @@ type Config struct { EventsUri string Capacity int FlushInterval time.Duration + PollInterval time.Duration Logger *log.Logger Timeout time.Duration Stream bool FeatureStore FeatureStore UseLdd bool SendEvents bool + Offline bool +} + +type updateProcessor interface { + initialized() bool + close() + start(chan<- bool) } // Provides the default configuration options for the LaunchDarkly client. @@ -48,45 +55,83 @@ var DefaultConfig = Config{ EventsUri: "https://events.launchdarkly.com", Capacity: 1000, FlushInterval: 5 * time.Second, + PollInterval: 1 * time.Second, Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), Timeout: 3000 * time.Millisecond, Stream: true, FeatureStore: nil, UseLdd: false, SendEvents: true, + Offline: false, } +var ErrInitializationTimeout = errors.New("Timeout encountered waiting for LaunchDarkly client initialization") +var ErrClientNotInitialized = errors.New("Toggle called before LaunchDarkly client initialization completed") + // Creates a new client instance that connects to LaunchDarkly with the default configuration. In most -// cases, you should use this method to instantiate your client. -func MakeClient(apiKey string) *LDClient { - res := MakeCustomClient(apiKey, DefaultConfig) - return &res +// cases, you should use this method to instantiate your client. The optional duration parameter allows callers to +// block until the client has connected to LaunchDarkly and is properly initialized. +func MakeClient(apiKey string, waitFor time.Duration) (*LDClient, error) { + return MakeCustomClient(apiKey, DefaultConfig, waitFor) } -// Creates a new client instance that connects to LaunchDarkly with a custom configuration. -func MakeCustomClient(apiKey string, config Config) LDClient { - var streamProcessor *streamProcessor +// Creates a new client instance that connects to LaunchDarkly with a custom configuration. The optional duration parameter allows callers to +// block until the client has connected to LaunchDarkly and is properly initialized. +func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDClient, error) { + var updateProcessor updateProcessor + var store FeatureStore + + ch := make(chan bool) config.BaseUri = strings.TrimRight(config.BaseUri, "/") + config.EventsUri = strings.TrimRight(config.EventsUri, "/") requestor := newRequestor(apiKey, config) - if config.Stream { - streamProcessor = newStream(apiKey, config, requestor) + if config.FeatureStore == nil { + config.FeatureStore = NewInMemoryFeatureStore() } - return LDClient{ + if config.PollInterval < (1 * time.Second) { + config.PollInterval = 1 * time.Second + } + + store = config.FeatureStore + + if !config.UseLdd && !config.Offline { + if config.Stream { + updateProcessor = newStreamProcessor(apiKey, config, requestor) + } else { + updateProcessor = newPollingProcessor(config, requestor) + } + updateProcessor.start(ch) + } + + client := LDClient{ apiKey: apiKey, config: config, - requestor: requestor, eventProcessor: newEventProcessor(apiKey, config), - offline: false, - streamProcessor: streamProcessor, + updateProcessor: updateProcessor, + store: store, + } + + timeout := time.After(waitFor) + + for { + select { + case <-ch: + return &client, nil + case <-timeout: + if waitFor > 0 { + return &client, ErrInitializationTimeout + } + return &client, nil + } } } func (client *LDClient) Identify(user User) error { - if client.offline { + if client.IsOffline() { return nil } evt := NewIdentifyEvent(user) @@ -96,65 +141,51 @@ func (client *LDClient) Identify(user User) error { // Tracks that a user has performed an event. Custom data can be attached to the // event, and is serialized to JSON using the encoding/json package (http://golang.org/pkg/encoding/json/). func (client *LDClient) Track(key string, user User, data interface{}) error { - if client.offline { + if client.IsOffline() { return nil } evt := NewCustomEvent(key, user, data) return client.eventProcessor.sendEvent(evt) } -// Puts the LaunchDarkly client in offline mode. In offline mode, no network calls will be made, -// and no events will be recorded. In addition, all calls to Toggle will return the default value. -func (client *LDClient) SetOffline() { - client.offline = true -} - -// Puts the LaunchDarkly client in online mode. -func (client *LDClient) SetOnline() { - client.offline = false -} - // Returns whether the LaunchDarkly client is in offline mode. func (client *LDClient) IsOffline() bool { - return client.offline + return client.config.Offline } -// Eagerly initializes the stream connection. If InitializeStream is not called, the stream will -// be initialized lazily with the first call to Toggle. -func (client *LDClient) InitializeStream() { - if client.config.Stream { - client.streamProcessor.StartOnce() - } -} - -// Returns false if the LaunchDarkly client does not have an active connection to -// the LaunchDarkly streaming endpoint. If streaming mode is disabled in the client -// configuration, this will always return false. -func (client *LDClient) IsStreamDisconnected() bool { - return client.config.Stream == false || client.streamProcessor == nil || client.streamProcessor.ShouldFallbackUpdate() +// Returns whether the LaunchDarkly client is initialized. +func (client *LDClient) Initialized() bool { + return client.IsOffline() || client.config.UseLdd || client.updateProcessor.initialized() } -// Returns whether the LaunchDarkly client has received an initial response from -// the LaunchDarkly streaming endpoint. If this is the case, the client can service -// Toggle calls from the stream. If streaming mode is disabled in the client -// configuration, this will always return false. -func (client *LDClient) IsStreamInitialized() bool { - return client.config.Stream && client.streamProcessor != nil && client.streamProcessor.Initialized() -} - -// Stops the LaunchDarkly client from sending any additional events. +// Shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client +// should no longer be used. func (client *LDClient) Close() { + if client.IsOffline() { + return + } client.eventProcessor.close() + if !client.config.UseLdd { + client.updateProcessor.close() + } } // Immediately flushes queued events. func (client *LDClient) Flush() { + if client.IsOffline() { + return + } client.eventProcessor.flush() } // Returns the value of a boolean feature flag for a given user. Returns defaultVal if -// there is an error, if the flag doesn't exist, or the feature is turned off. +// there is an error, if the flag doesn't exist, the client hasn't completed initialization, +// or the feature is turned off. func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, error) { + if client.IsOffline() { + return defaultVal, nil + } + value, err := client.evaluate(key, user, defaultVal) if err != nil { @@ -166,7 +197,7 @@ func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, er if !ok { client.sendFlagRequestEvent(key, user, defaultVal, defaultVal) - return defaultVal, errors.New("Feature flag returned non-bool value") + return defaultVal, errors.New("Feature flag returned non-boolean value") } client.sendFlagRequestEvent(key, user, value, defaultVal) @@ -176,6 +207,10 @@ func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, er // Returns the value of a feature flag (whose variations are integers) for the given user. // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int, error) { + if client.IsOffline() { + return defaultVal, nil + } + value, err := client.evaluate(key, user, float64(defaultVal)) if err != nil { @@ -198,6 +233,10 @@ func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int // Returns the value of a feature flag (whose variations are floats) for the given user. // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. func (client *LDClient) Float64Variation(key string, user User, defaultVal float64) (float64, error) { + if client.IsOffline() { + return defaultVal, nil + } + value, err := client.evaluate(key, user, defaultVal) if err != nil { @@ -217,7 +256,7 @@ func (client *LDClient) Float64Variation(key string, user User, defaultVal float } func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defaultVal interface{}) error { - if client.offline { + if client.IsOffline() { return nil } evt := NewFeatureRequestEvent(key, user, value, defaultVal) @@ -226,45 +265,24 @@ func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defau func (client *LDClient) evaluate(key string, user User, defaultVal interface{}) (interface{}, error) { var feature Feature - var streamErr error + var storeErr error + var featurePtr *Feature - if client.IsOffline() { - return defaultVal, nil + if !client.Initialized() { + return defaultVal, ErrClientNotInitialized } - client.InitializeStream() - if client.IsStreamInitialized() { - var featurePtr *Feature - featurePtr, streamErr = client.streamProcessor.GetFeature(key) - - if !client.config.UseLdd && client.IsStreamDisconnected() { - go func() { - if feature, err := client.requestor.makeRequest(key, true); err != nil { - client.config.Logger.Printf("Failed to update feature in fallback mode. Flag values may be stale.") - } else { - client.streamProcessor.store.Upsert(*feature.Key, *feature) - } - }() - } + featurePtr, storeErr = client.store.Get(key) - if streamErr != nil { - client.config.Logger.Printf("Encountered error in stream: %+v", streamErr) - return defaultVal, streamErr - } + if storeErr != nil { + client.config.Logger.Printf("Encountered error fetching feature from store: %+v", storeErr) + return defaultVal, storeErr + } - if featurePtr != nil { - feature = *featurePtr - } else { - return defaultVal, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") - } + if featurePtr != nil { + feature = *featurePtr } else { - // If streaming mode is enabled, get the latest version of the feature - // Otherwise, respect the TTL - if featurePtr, reqErr := client.requestor.makeRequest(key, client.config.Stream); reqErr != nil { - return defaultVal, reqErr - } else { - feature = *featurePtr - } + return defaultVal, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") } value, pass := feature.Evaluate(user) diff --git a/ldclient_test.go b/ldclient_test.go index 6fb58be..39f1b80 100644 --- a/ldclient_test.go +++ b/ldclient_test.go @@ -13,16 +13,17 @@ var config = Config{ FlushInterval: 5 * time.Second, Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), Timeout: 1500 * time.Millisecond, + Stream: true, + Offline: true, } func TestOfflineModeAlwaysReturnsDefaultValue(t *testing.T) { - client := MakeCustomClient("api_key", config) - client.SetOffline() + client, _ := MakeCustomClient("api_key", config, 0) var key = "foo" res, err := client.Toggle("anything", User{Key: &key}, true) if err != nil { - t.Errorf("Unexpected error in Toggle") + t.Errorf("Unexpected error in Toggle: %+v", err) } if !res { diff --git a/polling.go b/polling.go new file mode 100644 index 0000000..3705945 --- /dev/null +++ b/polling.go @@ -0,0 +1,73 @@ +package ldclient + +import ( + "sync" + "time" +) + +type pollingProcessor struct { + store FeatureStore + requestor *requestor + config Config + setInitializedOnce sync.Once + isInitialized bool + quit chan bool +} + +func newPollingProcessor(config Config, requestor *requestor) updateProcessor { + pp := &pollingProcessor{ + store: config.FeatureStore, + requestor: requestor, + config: config, + quit: make(chan bool), + } + + return pp +} + +func (pp *pollingProcessor) start(ch chan<- bool) { + go func() { + for { + select { + case <-pp.quit: + return + default: + then := time.Now() + err := pp.poll() + if err == nil { + pp.setInitializedOnce.Do(func() { + pp.isInitialized = true + ch <- true + }) + } + delta := pp.config.PollInterval - time.Since(then) + + if delta > 0 { + time.Sleep(delta) + } + } + } + }() +} + +func (pp *pollingProcessor) poll() error { + features, cached, err := pp.requestor.makeAllRequest(true) + + if err != nil { + return err + } + + // We initialize the store only if the request wasn't cached + if !cached { + return pp.store.Init(features) + } + return nil +} + +func (pp *pollingProcessor) close() { + pp.quit <- true +} + +func (pp *pollingProcessor) initialized() bool { + return pp.isInitialized +} diff --git a/requestor.go b/requestor.go index a7bad98..65b4447 100644 --- a/requestor.go +++ b/requestor.go @@ -42,7 +42,7 @@ func newRequestor(apiKey string, config Config) *requestor { return &requestor } -func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, error) { +func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, bool, error) { var features map[string]*Feature var resource string @@ -56,7 +56,7 @@ func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, error) { req, reqErr := http.NewRequest("GET", r.config.BaseUri+resource, nil) if reqErr != nil { - return nil, reqErr + return nil, false, reqErr } req.Header.Add("Authorization", "api_key "+r.apiKey) @@ -72,33 +72,38 @@ func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, error) { }() if resErr != nil { - return nil, resErr + return nil, false, resErr } if res.StatusCode == http.StatusUnauthorized { - return nil, errors.New("Invalid API key. Verify that your API key is correct. Returning default value.") + return nil, false, errors.New("Invalid API key. Verify that your API key is correct. Returning default value.") } if res.StatusCode == http.StatusNotFound { - return nil, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") + return nil, false, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") + } + + if res.Header.Get(httpcache.XFromCache) != "" { + return nil, true, nil } if res.StatusCode != http.StatusOK { - return nil, errors.New("Unexpected response code: " + strconv.Itoa(res.StatusCode)) + return nil, false, errors.New("Unexpected response code: " + strconv.Itoa(res.StatusCode)) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, err + return nil, false, err } jsonErr := json.Unmarshal(body, &features) if jsonErr != nil { - return nil, jsonErr + return nil, false, jsonErr } - return features, nil + + return features, false, nil } func (r *requestor) makeRequest(key string, latest bool) (*Feature, error) { diff --git a/streaming.go b/streaming.go index b5d1c4f..dbd5ad6 100644 --- a/streaming.go +++ b/streaming.go @@ -2,7 +2,6 @@ package ldclient import ( "encoding/json" - "errors" es "github.com/launchdarkly/eventsource" "io" "net/http" @@ -20,13 +19,13 @@ const ( ) type streamProcessor struct { - store FeatureStore - requestor *requestor - stream *es.Stream - config Config - disconnected *time.Time - apiKey string - ignition sync.Once + store FeatureStore + requestor *requestor + stream *es.Stream + config Config + apiKey string + setInitializedOnce sync.Once + isInitialized bool sync.RWMutex } @@ -40,38 +39,19 @@ type featureDeleteData struct { Version int `json:"version"` } -func (sp *streamProcessor) Initialized() bool { - return sp.store.Initialized() +func (sp *streamProcessor) initialized() bool { + return sp.isInitialized } -func (sp *streamProcessor) GetFeature(key string) (*Feature, error) { - if !sp.store.Initialized() { - return nil, errors.New("Requested stream data before initialization completed") - } else { - return sp.store.Get(key) - } -} - -func (sp *streamProcessor) ShouldFallbackUpdate() bool { - sp.RLock() - defer sp.RUnlock() - return sp.disconnected != nil && sp.disconnected.Before(time.Now().Add(-2*time.Minute)) -} - -func (sp *streamProcessor) StartOnce() { - sp.ignition.Do(func() { - if !sp.config.UseLdd { - go sp.start() - go sp.errors() - } - }) +func (sp *streamProcessor) start(ch chan<- bool) { + go sp.startOnce(ch) + go sp.errors() } -func (sp *streamProcessor) start() { +func (sp *streamProcessor) startOnce(ch chan<- bool) { for { subscribed := sp.checkSubscribe() if !subscribed { - sp.setDisconnected() time.Sleep(2 * time.Second) continue } @@ -83,7 +63,10 @@ func (sp *streamProcessor) start() { sp.config.Logger.Printf("Unexpected error unmarshalling feature json: %+v", err) } else { sp.store.Init(features) - sp.setConnected() + sp.setInitializedOnce.Do(func() { + sp.isInitialized = true + ch <- true + }) } case patchEvent: var patch featurePatchData @@ -92,7 +75,6 @@ func (sp *streamProcessor) start() { } else { key := strings.TrimLeft(patch.Path, "/") sp.store.Upsert(key, patch.Data) - sp.setConnected() } case indirectPatchEvent: key := event.Data() @@ -100,14 +82,15 @@ func (sp *streamProcessor) start() { sp.config.Logger.Printf("Unexpected error requesting feature: %+v", err) } else { sp.store.Upsert(key, *feature) - sp.setConnected() } case indirectPutEvent: - if features, err := sp.requestor.makeAllRequest(true); err != nil { + if features, _, err := sp.requestor.makeAllRequest(true); err != nil { sp.config.Logger.Printf("Unexpected error requesting all features: %+v", err) } else { sp.store.Init(features) - sp.setConnected() + sp.setInitializedOnce.Do(func() { + sp.isInitialized = true + }) } case deleteEvent: var data featureDeleteData @@ -116,26 +99,16 @@ func (sp *streamProcessor) start() { } else { key := strings.TrimLeft(data.Path, "/") sp.store.Delete(key, data.Version) - sp.setConnected() } default: sp.config.Logger.Printf("Unexpected event found in stream: %s", event.Event()) - sp.setConnected() } } } -func newStream(apiKey string, config Config, requestor *requestor) *streamProcessor { - var store FeatureStore - - if config.FeatureStore != nil { - store = config.FeatureStore - } else { - store = NewInMemoryFeatureStore() - } - +func newStreamProcessor(apiKey string, config Config, requestor *requestor) updateProcessor { sp := &streamProcessor{ - store: store, + store: config.FeatureStore, config: config, apiKey: apiKey, requestor: requestor, @@ -178,7 +151,6 @@ func (sp *streamProcessor) errors() { for { subscribed := sp.checkSubscribe() if !subscribed { - sp.setDisconnected() time.Sleep(2 * time.Second) continue } @@ -188,37 +160,11 @@ func (sp *streamProcessor) errors() { sp.config.Logger.Printf("Error encountered processing stream: %+v", err) } if err != nil { - sp.setDisconnected() } } } -func (sp *streamProcessor) setConnected() { - sp.RLock() - if sp.disconnected != nil { - sp.RUnlock() - sp.Lock() - defer sp.Unlock() - if sp.disconnected != nil { - sp.disconnected = nil - } - } else { - sp.RUnlock() - } - -} - -func (sp *streamProcessor) setDisconnected() { - sp.RLock() - if sp.disconnected == nil { - sp.RUnlock() - sp.Lock() - defer sp.Unlock() - if sp.disconnected == nil { - now := time.Now() - sp.disconnected = &now - } - } else { - sp.RUnlock() - } +func (sp *streamProcessor) close() { + // TODO : the EventSource library doesn't support close() yet. + // when it does, call it here }