From eea903e7b54bb85024ca45f24f23840b96111749 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 12 Feb 2016 20:17:53 -0800 Subject: [PATCH 01/11] Refactoring to switch to bounded resource consumption --- ldclient.go | 188 +++++++++++++++++++++++++++------------------------ polling.go | 18 +++++ streaming.go | 102 +++++++--------------------- 3 files changed, 142 insertions(+), 166 deletions(-) create mode 100644 polling.go diff --git a/ldclient.go b/ldclient.go index a0ea8b5..8c7296d 100644 --- a/ldclient.go +++ b/ldclient.go @@ -17,9 +17,8 @@ type LDClient struct { apiKey string config Config eventProcessor *eventProcessor - offline bool - streamProcessor *streamProcessor - requestor *requestor + updateProcessor updateProcessor + store FeatureStore } // Exposes advanced configuration options for the LaunchDarkly client. @@ -35,6 +34,13 @@ type Config struct { FeatureStore FeatureStore UseLdd bool SendEvents bool + Offline bool +} + +type updateProcessor interface { + initialized() bool + close() + start(chan<- bool) } // Provides the default configuration options for the LaunchDarkly client. @@ -54,39 +60,72 @@ var DefaultConfig = Config{ FeatureStore: nil, UseLdd: false, SendEvents: true, + Offline: false, } +var InitializationTimeoutError = errors.New("Timeout encountered waiting for LaunchDarkly client initialization") +var ClientNotInitializedError = errors.New("Toggle called before LaunchDarkly client initialization completed") + // Creates a new client instance that connects to LaunchDarkly with the default configuration. In most -// cases, you should use this method to instantiate your client. -func MakeClient(apiKey string) *LDClient { - res := MakeCustomClient(apiKey, DefaultConfig) - return &res +// cases, you should use this method to instantiate your client. The optional duration parameter allows callers to +// block until the client has connected to LaunchDarkly and is properly initialized. +func MakeClient(apiKey string, waitFor time.Duration) (*LDClient, error) { + return MakeCustomClient(apiKey, DefaultConfig, waitFor) } -// Creates a new client instance that connects to LaunchDarkly with a custom configuration. -func MakeCustomClient(apiKey string, config Config) LDClient { - var streamProcessor *streamProcessor +// Creates a new client instance that connects to LaunchDarkly with a custom configuration. The optional duration parameter allows callers to +// block until the client has connected to LaunchDarkly and is properly initialized. +func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDClient, error) { + var updateProcessor updateProcessor + var store FeatureStore + + ch := make(chan bool) config.BaseUri = strings.TrimRight(config.BaseUri, "/") + config.EventsUri = strings.TrimRight(config.EventsUri, "/") requestor := newRequestor(apiKey, config) - if config.Stream { - streamProcessor = newStream(apiKey, config, requestor) + if config.FeatureStore != nil { + store = config.FeatureStore + } else { + store = NewInMemoryFeatureStore() } - return LDClient{ + if !config.UseLdd && !config.Offline { + if config.Stream { + updateProcessor = newStreamProcessor(apiKey, config, store, requestor) + } else { + updateProcessor = newPollingProcessor(apiKey, config, store, requestor) + } + updateProcessor.start(ch) + } + + client := LDClient{ apiKey: apiKey, config: config, - requestor: requestor, eventProcessor: newEventProcessor(apiKey, config), - offline: false, - streamProcessor: streamProcessor, + updateProcessor: updateProcessor, + } + + timeout := time.After(waitFor) + + for { + select { + case <-ch: + return &client, nil + case <-timeout: + if waitFor > 0 { + return &client, InitializationTimeoutError + } else { + return &client, nil + } + } } } func (client *LDClient) Identify(user User) error { - if client.offline { + if client.IsOffline() { return nil } evt := NewIdentifyEvent(user) @@ -96,65 +135,51 @@ func (client *LDClient) Identify(user User) error { // Tracks that a user has performed an event. Custom data can be attached to the // event, and is serialized to JSON using the encoding/json package (http://golang.org/pkg/encoding/json/). func (client *LDClient) Track(key string, user User, data interface{}) error { - if client.offline { + if client.IsOffline() { return nil } evt := NewCustomEvent(key, user, data) return client.eventProcessor.sendEvent(evt) } -// Puts the LaunchDarkly client in offline mode. In offline mode, no network calls will be made, -// and no events will be recorded. In addition, all calls to Toggle will return the default value. -func (client *LDClient) SetOffline() { - client.offline = true -} - -// Puts the LaunchDarkly client in online mode. -func (client *LDClient) SetOnline() { - client.offline = false -} - // Returns whether the LaunchDarkly client is in offline mode. func (client *LDClient) IsOffline() bool { - return client.offline -} - -// Eagerly initializes the stream connection. If InitializeStream is not called, the stream will -// be initialized lazily with the first call to Toggle. -func (client *LDClient) InitializeStream() { - if client.config.Stream { - client.streamProcessor.StartOnce() - } -} - -// Returns false if the LaunchDarkly client does not have an active connection to -// the LaunchDarkly streaming endpoint. If streaming mode is disabled in the client -// configuration, this will always return false. -func (client *LDClient) IsStreamDisconnected() bool { - return client.config.Stream == false || client.streamProcessor == nil || client.streamProcessor.ShouldFallbackUpdate() + return client.config.Offline } -// Returns whether the LaunchDarkly client has received an initial response from -// the LaunchDarkly streaming endpoint. If this is the case, the client can service -// Toggle calls from the stream. If streaming mode is disabled in the client -// configuration, this will always return false. -func (client *LDClient) IsStreamInitialized() bool { - return client.config.Stream && client.streamProcessor != nil && client.streamProcessor.Initialized() +// Returns whether the LaunchDarkly client is initialized. +func (client *LDClient) Initialized() bool { + return client.IsOffline() || client.config.UseLdd || client.updateProcessor.initialized() } -// Stops the LaunchDarkly client from sending any additional events. +// Shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client +// should no longer be used. func (client *LDClient) Close() { + if client.IsOffline() { + return + } client.eventProcessor.close() + if !client.config.UseLdd { + client.updateProcessor.close() + } } // Immediately flushes queued events. func (client *LDClient) Flush() { + if client.IsOffline() { + return + } client.eventProcessor.flush() } // Returns the value of a boolean feature flag for a given user. Returns defaultVal if -// there is an error, if the flag doesn't exist, or the feature is turned off. +// there is an error, if the flag doesn't exist, the client hasn't completed initialization, +// or the feature is turned off. func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, error) { + if client.IsOffline() { + return defaultVal, nil + } + value, err := client.evaluate(key, user, defaultVal) if err != nil { @@ -166,7 +191,7 @@ func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, er if !ok { client.sendFlagRequestEvent(key, user, defaultVal, defaultVal) - return defaultVal, errors.New("Feature flag returned non-bool value") + return defaultVal, errors.New("Feature flag returned non-boolean value") } client.sendFlagRequestEvent(key, user, value, defaultVal) @@ -176,6 +201,10 @@ func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, er // Returns the value of a feature flag (whose variations are integers) for the given user. // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int, error) { + if client.IsOffline() { + return defaultVal, nil + } + value, err := client.evaluate(key, user, float64(defaultVal)) if err != nil { @@ -198,6 +227,10 @@ func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int // Returns the value of a feature flag (whose variations are floats) for the given user. // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. func (client *LDClient) Float64Variation(key string, user User, defaultVal float64) (float64, error) { + if client.IsOffline() { + return defaultVal, nil + } + value, err := client.evaluate(key, user, defaultVal) if err != nil { @@ -217,7 +250,7 @@ func (client *LDClient) Float64Variation(key string, user User, defaultVal float } func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defaultVal interface{}) error { - if client.offline { + if client.IsOffline() { return nil } evt := NewFeatureRequestEvent(key, user, value, defaultVal) @@ -226,45 +259,24 @@ func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defau func (client *LDClient) evaluate(key string, user User, defaultVal interface{}) (interface{}, error) { var feature Feature - var streamErr error + var storeErr error + var featurePtr *Feature - if client.IsOffline() { - return defaultVal, nil + if !client.Initialized() { + return defaultVal, ClientNotInitializedError } - client.InitializeStream() - if client.IsStreamInitialized() { - var featurePtr *Feature - featurePtr, streamErr = client.streamProcessor.GetFeature(key) - - if !client.config.UseLdd && client.IsStreamDisconnected() { - go func() { - if feature, err := client.requestor.makeRequest(key, true); err != nil { - client.config.Logger.Printf("Failed to update feature in fallback mode. Flag values may be stale.") - } else { - client.streamProcessor.store.Upsert(*feature.Key, *feature) - } - }() - } + featurePtr, storeErr = client.store.Get(key) - if streamErr != nil { - client.config.Logger.Printf("Encountered error in stream: %+v", streamErr) - return defaultVal, streamErr - } + if storeErr != nil { + client.config.Logger.Printf("Encountered error fetching feature from store: %+v", storeErr) + return defaultVal, storeErr + } - if featurePtr != nil { - feature = *featurePtr - } else { - return defaultVal, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") - } + if featurePtr != nil { + feature = *featurePtr } else { - // If streaming mode is enabled, get the latest version of the feature - // Otherwise, respect the TTL - if featurePtr, reqErr := client.requestor.makeRequest(key, client.config.Stream); reqErr != nil { - return defaultVal, reqErr - } else { - feature = *featurePtr - } + return defaultVal, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") } value, pass := feature.Evaluate(user) diff --git a/polling.go b/polling.go new file mode 100644 index 0000000..d5127a6 --- /dev/null +++ b/polling.go @@ -0,0 +1,18 @@ +package ldclient + +type pollingProcessor struct { +} + +// TODO +func newPollingProcessor(apiKey string, config Config, store FeatureStore, requestor *requestor) updateProcessor { + return nil +} + +// TODO +func (pp *pollingProcessor) start(ch <-chan bool) { +} + +// TODO +func (pp *pollingProcessor) close() { + +} diff --git a/streaming.go b/streaming.go index b5d1c4f..939ae00 100644 --- a/streaming.go +++ b/streaming.go @@ -2,7 +2,6 @@ package ldclient import ( "encoding/json" - "errors" es "github.com/launchdarkly/eventsource" "io" "net/http" @@ -20,13 +19,13 @@ const ( ) type streamProcessor struct { - store FeatureStore - requestor *requestor - stream *es.Stream - config Config - disconnected *time.Time - apiKey string - ignition sync.Once + store FeatureStore + requestor *requestor + stream *es.Stream + config Config + apiKey string + setInitializedOnce sync.Once + isInitialized bool sync.RWMutex } @@ -40,38 +39,19 @@ type featureDeleteData struct { Version int `json:"version"` } -func (sp *streamProcessor) Initialized() bool { - return sp.store.Initialized() +func (sp *streamProcessor) initialized() bool { + return sp.isInitialized } -func (sp *streamProcessor) GetFeature(key string) (*Feature, error) { - if !sp.store.Initialized() { - return nil, errors.New("Requested stream data before initialization completed") - } else { - return sp.store.Get(key) - } -} - -func (sp *streamProcessor) ShouldFallbackUpdate() bool { - sp.RLock() - defer sp.RUnlock() - return sp.disconnected != nil && sp.disconnected.Before(time.Now().Add(-2*time.Minute)) -} - -func (sp *streamProcessor) StartOnce() { - sp.ignition.Do(func() { - if !sp.config.UseLdd { - go sp.start() - go sp.errors() - } - }) +func (sp *streamProcessor) start(ch chan<- bool) { + go sp.startOnce(ch) + go sp.errors() } -func (sp *streamProcessor) start() { +func (sp *streamProcessor) startOnce(ch chan<- bool) { for { subscribed := sp.checkSubscribe() if !subscribed { - sp.setDisconnected() time.Sleep(2 * time.Second) continue } @@ -83,7 +63,10 @@ func (sp *streamProcessor) start() { sp.config.Logger.Printf("Unexpected error unmarshalling feature json: %+v", err) } else { sp.store.Init(features) - sp.setConnected() + sp.setInitializedOnce.Do(func() { + sp.isInitialized = true + ch <- true + }) } case patchEvent: var patch featurePatchData @@ -92,7 +75,6 @@ func (sp *streamProcessor) start() { } else { key := strings.TrimLeft(patch.Path, "/") sp.store.Upsert(key, patch.Data) - sp.setConnected() } case indirectPatchEvent: key := event.Data() @@ -100,14 +82,15 @@ func (sp *streamProcessor) start() { sp.config.Logger.Printf("Unexpected error requesting feature: %+v", err) } else { sp.store.Upsert(key, *feature) - sp.setConnected() } case indirectPutEvent: if features, err := sp.requestor.makeAllRequest(true); err != nil { sp.config.Logger.Printf("Unexpected error requesting all features: %+v", err) } else { sp.store.Init(features) - sp.setConnected() + sp.setInitializedOnce.Do(func() { + sp.isInitialized = true + }) } case deleteEvent: var data featureDeleteData @@ -116,24 +99,14 @@ func (sp *streamProcessor) start() { } else { key := strings.TrimLeft(data.Path, "/") sp.store.Delete(key, data.Version) - sp.setConnected() } default: sp.config.Logger.Printf("Unexpected event found in stream: %s", event.Event()) - sp.setConnected() } } } -func newStream(apiKey string, config Config, requestor *requestor) *streamProcessor { - var store FeatureStore - - if config.FeatureStore != nil { - store = config.FeatureStore - } else { - store = NewInMemoryFeatureStore() - } - +func newStreamProcessor(apiKey string, config Config, store FeatureStore, requestor *requestor) updateProcessor { sp := &streamProcessor{ store: store, config: config, @@ -178,7 +151,6 @@ func (sp *streamProcessor) errors() { for { subscribed := sp.checkSubscribe() if !subscribed { - sp.setDisconnected() time.Sleep(2 * time.Second) continue } @@ -188,37 +160,11 @@ func (sp *streamProcessor) errors() { sp.config.Logger.Printf("Error encountered processing stream: %+v", err) } if err != nil { - sp.setDisconnected() } } } -func (sp *streamProcessor) setConnected() { - sp.RLock() - if sp.disconnected != nil { - sp.RUnlock() - sp.Lock() - defer sp.Unlock() - if sp.disconnected != nil { - sp.disconnected = nil - } - } else { - sp.RUnlock() - } - -} - -func (sp *streamProcessor) setDisconnected() { - sp.RLock() - if sp.disconnected == nil { - sp.RUnlock() - sp.Lock() - defer sp.Unlock() - if sp.disconnected == nil { - now := time.Now() - sp.disconnected = &now - } - } else { - sp.RUnlock() - } +func (sp *streamProcessor) close() { + // TODO : the EventSource library doesn't support close() yet. + // when it does, call it here } From 5479d87042d32428f9cca50c22e297cb988621c1 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 12 Feb 2016 20:20:41 -0800 Subject: [PATCH 02/11] Set the store --- ldclient.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ldclient.go b/ldclient.go index 8c7296d..a3301cc 100644 --- a/ldclient.go +++ b/ldclient.go @@ -106,6 +106,7 @@ func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDC config: config, eventProcessor: newEventProcessor(apiKey, config), updateProcessor: updateProcessor, + store: store, } timeout := time.After(waitFor) From a6604680f45f5b4697377df6a9b84be1281e4137 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 12 Feb 2016 20:24:19 -0800 Subject: [PATCH 03/11] Panic for now if we try to use the polling processor --- polling.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/polling.go b/polling.go index d5127a6..40ed036 100644 --- a/polling.go +++ b/polling.go @@ -5,14 +5,15 @@ type pollingProcessor struct { // TODO func newPollingProcessor(apiKey string, config Config, store FeatureStore, requestor *requestor) updateProcessor { - return nil + panic("Can't get there from here") } // TODO func (pp *pollingProcessor) start(ch <-chan bool) { + panic("Can't get there from here") } // TODO func (pp *pollingProcessor) close() { - + panic("Can't get there from here") } From e24da1b95435a85d464087c76b61cb422acf95ea Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 12 Feb 2016 20:37:12 -0800 Subject: [PATCH 04/11] Fix tests --- ldclient_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ldclient_test.go b/ldclient_test.go index 6fb58be..39f1b80 100644 --- a/ldclient_test.go +++ b/ldclient_test.go @@ -13,16 +13,17 @@ var config = Config{ FlushInterval: 5 * time.Second, Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), Timeout: 1500 * time.Millisecond, + Stream: true, + Offline: true, } func TestOfflineModeAlwaysReturnsDefaultValue(t *testing.T) { - client := MakeCustomClient("api_key", config) - client.SetOffline() + client, _ := MakeCustomClient("api_key", config, 0) var key = "foo" res, err := client.Toggle("anything", User{Key: &key}, true) if err != nil { - t.Errorf("Unexpected error in Toggle") + t.Errorf("Unexpected error in Toggle: %+v", err) } if !res { From 0a430e29f6cb83521a342e92c7272fbfbfe86072 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Sat, 13 Feb 2016 00:12:43 -0800 Subject: [PATCH 05/11] Initial implementation of polling processor --- ldclient.go | 2 +- polling.go | 67 +++++++++++++++++++++++++++++++++++++++++++++------- requestor.go | 41 +++++++++++++++++++++++++------- streaming.go | 2 +- 4 files changed, 93 insertions(+), 19 deletions(-) diff --git a/ldclient.go b/ldclient.go index a3301cc..9f09d21 100644 --- a/ldclient.go +++ b/ldclient.go @@ -96,7 +96,7 @@ func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDC if config.Stream { updateProcessor = newStreamProcessor(apiKey, config, store, requestor) } else { - updateProcessor = newPollingProcessor(apiKey, config, store, requestor) + updateProcessor = newPollingProcessor(config, store, requestor) } updateProcessor.start(ch) } diff --git a/polling.go b/polling.go index 40ed036..a13a49a 100644 --- a/polling.go +++ b/polling.go @@ -1,19 +1,70 @@ package ldclient +import ( + "sync" + "time" +) + type pollingProcessor struct { + store FeatureStore + requestor *requestor + config Config + setInitializedOnce sync.Once + isInitialized bool + lastHeaders *cacheHeaders } -// TODO -func newPollingProcessor(apiKey string, config Config, store FeatureStore, requestor *requestor) updateProcessor { - panic("Can't get there from here") +func newPollingProcessor(config Config, store FeatureStore, requestor *requestor) updateProcessor { + pp := &pollingProcessor{ + store: store, + requestor: requestor, + config: config, + } + + return pp +} + +func (pp *pollingProcessor) start(ch chan<- bool) { + go func() { + for { + then := time.Now() + err := pp.poll() + if err == nil { + pp.setInitializedOnce.Do(func() { + pp.isInitialized = true + ch <- true + }) + } + delta := (1 * time.Second) - time.Since(then) + + if delta > 0 { + time.Sleep(delta) + } + } + }() } -// TODO -func (pp *pollingProcessor) start(ch <-chan bool) { - panic("Can't get there from here") +func (pp *pollingProcessor) poll() error { + features, nextHdrs, err := pp.requestor.makeAllRequest(pp.lastHeaders, true) + + if err != nil { + return err + } + + // We get nextHdrs only if we got a 200 response, which means we need to + // update the store. Otherwise we'll have gotten a 304 (do nothing) or an + // error + if nextHdrs != nil { + return pp.store.Init(features) + } + return nil } -// TODO +// TODO add support for canceling the goroutine func (pp *pollingProcessor) close() { - panic("Can't get there from here") + +} + +func (pp *pollingProcessor) initialized() bool { + return pp.isInitialized } diff --git a/requestor.go b/requestor.go index a7bad98..4ea95f9 100644 --- a/requestor.go +++ b/requestor.go @@ -17,6 +17,11 @@ type requestor struct { config Config } +type cacheHeaders struct { + etag string + lastModified string +} + func newRequestor(apiKey string, config Config) *requestor { baseTransport := httpcontrol.Transport{ RequestTimeout: config.Timeout, @@ -42,7 +47,7 @@ func newRequestor(apiKey string, config Config) *requestor { return &requestor } -func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, error) { +func (r *requestor) makeAllRequest(ch *cacheHeaders, latest bool) (map[string]*Feature, *cacheHeaders, error) { var features map[string]*Feature var resource string @@ -56,12 +61,20 @@ func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, error) { req, reqErr := http.NewRequest("GET", r.config.BaseUri+resource, nil) if reqErr != nil { - return nil, reqErr + return nil, nil, reqErr } req.Header.Add("Authorization", "api_key "+r.apiKey) req.Header.Add("User-Agent", "GoClient/"+Version) + if ch != nil && ch.etag != "" { + req.Header.Add("If-None-Match", ch.etag) + } + + if ch != nil && ch.lastModified != "" { + req.Header.Add("If-Modified-Since", ch.lastModified) + } + res, resErr := r.httpClient.Do(req) defer func() { @@ -72,33 +85,43 @@ func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, error) { }() if resErr != nil { - return nil, resErr + return nil, nil, resErr } if res.StatusCode == http.StatusUnauthorized { - return nil, errors.New("Invalid API key. Verify that your API key is correct. Returning default value.") + return nil, nil, errors.New("Invalid API key. Verify that your API key is correct. Returning default value.") } if res.StatusCode == http.StatusNotFound { - return nil, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") + return nil, nil, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") + } + + if res.StatusCode == http.StatusNotModified { + return nil, nil, nil } if res.StatusCode != http.StatusOK { - return nil, errors.New("Unexpected response code: " + strconv.Itoa(res.StatusCode)) + return nil, nil, errors.New("Unexpected response code: " + strconv.Itoa(res.StatusCode)) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, err + return nil, nil, err } jsonErr := json.Unmarshal(body, &features) if jsonErr != nil { - return nil, jsonErr + return nil, nil, jsonErr + } + + newHeaders := cacheHeaders{ + etag: res.Header.Get("ETag"), + lastModified: res.Header.Get("LastModified"), } - return features, nil + + return features, &newHeaders, nil } func (r *requestor) makeRequest(key string, latest bool) (*Feature, error) { diff --git a/streaming.go b/streaming.go index 939ae00..f11e77a 100644 --- a/streaming.go +++ b/streaming.go @@ -84,7 +84,7 @@ func (sp *streamProcessor) startOnce(ch chan<- bool) { sp.store.Upsert(key, *feature) } case indirectPutEvent: - if features, err := sp.requestor.makeAllRequest(true); err != nil { + if features, _, err := sp.requestor.makeAllRequest(nil, true); err != nil { sp.config.Logger.Printf("Unexpected error requesting all features: %+v", err) } else { sp.store.Init(features) From 69f7a77b50a25862fc2567c46a74f97f92a5808c Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Sat, 13 Feb 2016 00:43:28 -0800 Subject: [PATCH 06/11] Add support for closing the polling update processor --- polling.go | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/polling.go b/polling.go index a13a49a..a4659b2 100644 --- a/polling.go +++ b/polling.go @@ -12,6 +12,7 @@ type pollingProcessor struct { setInitializedOnce sync.Once isInitialized bool lastHeaders *cacheHeaders + quit chan bool } func newPollingProcessor(config Config, store FeatureStore, requestor *requestor) updateProcessor { @@ -19,6 +20,7 @@ func newPollingProcessor(config Config, store FeatureStore, requestor *requestor store: store, requestor: requestor, config: config, + quit: make(chan bool), } return pp @@ -27,18 +29,23 @@ func newPollingProcessor(config Config, store FeatureStore, requestor *requestor func (pp *pollingProcessor) start(ch chan<- bool) { go func() { for { - then := time.Now() - err := pp.poll() - if err == nil { - pp.setInitializedOnce.Do(func() { - pp.isInitialized = true - ch <- true - }) - } - delta := (1 * time.Second) - time.Since(then) + select { + case <-pp.quit: + return + default: + then := time.Now() + err := pp.poll() + if err == nil { + pp.setInitializedOnce.Do(func() { + pp.isInitialized = true + ch <- true + }) + } + delta := (1 * time.Second) - time.Since(then) - if delta > 0 { - time.Sleep(delta) + if delta > 0 { + time.Sleep(delta) + } } } }() @@ -60,9 +67,8 @@ func (pp *pollingProcessor) poll() error { return nil } -// TODO add support for canceling the goroutine func (pp *pollingProcessor) close() { - + pp.quit <- true } func (pp *pollingProcessor) initialized() bool { From 8b23de9a633ef02ed0ecb6cdec3b28e9e0a6925d Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Sat, 13 Feb 2016 00:46:13 -0800 Subject: [PATCH 07/11] Address hound comments --- ldclient.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ldclient.go b/ldclient.go index 9f09d21..21208fd 100644 --- a/ldclient.go +++ b/ldclient.go @@ -63,8 +63,8 @@ var DefaultConfig = Config{ Offline: false, } -var InitializationTimeoutError = errors.New("Timeout encountered waiting for LaunchDarkly client initialization") -var ClientNotInitializedError = errors.New("Toggle called before LaunchDarkly client initialization completed") +var ErrInitializationTimeout = errors.New("Timeout encountered waiting for LaunchDarkly client initialization") +var ErrClientNotInitialized = errors.New("Toggle called before LaunchDarkly client initialization completed") // Creates a new client instance that connects to LaunchDarkly with the default configuration. In most // cases, you should use this method to instantiate your client. The optional duration parameter allows callers to @@ -117,10 +117,9 @@ func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDC return &client, nil case <-timeout: if waitFor > 0 { - return &client, InitializationTimeoutError - } else { - return &client, nil + return &client, ErrInitializationTimeout } + return &client, nil } } } @@ -264,7 +263,7 @@ func (client *LDClient) evaluate(key string, user User, defaultVal interface{}) var featurePtr *Feature if !client.Initialized() { - return defaultVal, ClientNotInitializedError + return defaultVal, ErrClientNotInitialized } featurePtr, storeErr = client.store.Get(key) From a3b2207acc2ad93d93d77e40daa3ca159746c3d1 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 16 Feb 2016 15:43:59 -0800 Subject: [PATCH 08/11] Set the polling interval in the config, and enforce a minimum interval of 1 second --- ldclient.go | 18 ++++++++++++------ polling.go | 6 +++--- streaming.go | 4 ++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/ldclient.go b/ldclient.go index 21208fd..e2249a8 100644 --- a/ldclient.go +++ b/ldclient.go @@ -28,6 +28,7 @@ type Config struct { EventsUri string Capacity int FlushInterval time.Duration + PollInterval time.Duration Logger *log.Logger Timeout time.Duration Stream bool @@ -54,6 +55,7 @@ var DefaultConfig = Config{ EventsUri: "https://events.launchdarkly.com", Capacity: 1000, FlushInterval: 5 * time.Second, + PollInterval: 1 * time.Second, Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), Timeout: 3000 * time.Millisecond, Stream: true, @@ -86,17 +88,21 @@ func MakeCustomClient(apiKey string, config Config, waitFor time.Duration) (*LDC requestor := newRequestor(apiKey, config) - if config.FeatureStore != nil { - store = config.FeatureStore - } else { - store = NewInMemoryFeatureStore() + if config.FeatureStore == nil { + config.FeatureStore = NewInMemoryFeatureStore() + } + + if config.PollInterval < (1 * time.Second) { + config.PollInterval = 1 * time.Second } + store = config.FeatureStore + if !config.UseLdd && !config.Offline { if config.Stream { - updateProcessor = newStreamProcessor(apiKey, config, store, requestor) + updateProcessor = newStreamProcessor(apiKey, config, requestor) } else { - updateProcessor = newPollingProcessor(config, store, requestor) + updateProcessor = newPollingProcessor(config, requestor) } updateProcessor.start(ch) } diff --git a/polling.go b/polling.go index a4659b2..ea2354b 100644 --- a/polling.go +++ b/polling.go @@ -15,9 +15,9 @@ type pollingProcessor struct { quit chan bool } -func newPollingProcessor(config Config, store FeatureStore, requestor *requestor) updateProcessor { +func newPollingProcessor(config Config, requestor *requestor) updateProcessor { pp := &pollingProcessor{ - store: store, + store: config.FeatureStore, requestor: requestor, config: config, quit: make(chan bool), @@ -41,7 +41,7 @@ func (pp *pollingProcessor) start(ch chan<- bool) { ch <- true }) } - delta := (1 * time.Second) - time.Since(then) + delta := pp.config.PollInterval - time.Since(then) if delta > 0 { time.Sleep(delta) diff --git a/streaming.go b/streaming.go index f11e77a..1241085 100644 --- a/streaming.go +++ b/streaming.go @@ -106,9 +106,9 @@ func (sp *streamProcessor) startOnce(ch chan<- bool) { } } -func newStreamProcessor(apiKey string, config Config, store FeatureStore, requestor *requestor) updateProcessor { +func newStreamProcessor(apiKey string, config Config, requestor *requestor) updateProcessor { sp := &streamProcessor{ - store: store, + store: config.FeatureStore, config: config, apiKey: apiKey, requestor: requestor, From 8171106f0d62f493af5024a04f1a7cae758c24e6 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 19 Feb 2016 22:21:20 -0800 Subject: [PATCH 09/11] Set the last headers --- polling.go | 1 + 1 file changed, 1 insertion(+) diff --git a/polling.go b/polling.go index ea2354b..a300f2e 100644 --- a/polling.go +++ b/polling.go @@ -62,6 +62,7 @@ func (pp *pollingProcessor) poll() error { // update the store. Otherwise we'll have gotten a 304 (do nothing) or an // error if nextHdrs != nil { + pp.lastHeaders = nextHdrs return pp.store.Init(features) } return nil From 590d2cee9fd23cc1d2584e8ec601d8603bb7532f Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 19 Feb 2016 22:34:33 -0800 Subject: [PATCH 10/11] Let the caching library handle etags for us --- polling.go | 10 +++------- requestor.go | 41 ++++++++++++----------------------------- streaming.go | 2 +- 3 files changed, 16 insertions(+), 37 deletions(-) diff --git a/polling.go b/polling.go index a300f2e..3705945 100644 --- a/polling.go +++ b/polling.go @@ -11,7 +11,6 @@ type pollingProcessor struct { config Config setInitializedOnce sync.Once isInitialized bool - lastHeaders *cacheHeaders quit chan bool } @@ -52,17 +51,14 @@ func (pp *pollingProcessor) start(ch chan<- bool) { } func (pp *pollingProcessor) poll() error { - features, nextHdrs, err := pp.requestor.makeAllRequest(pp.lastHeaders, true) + features, cached, err := pp.requestor.makeAllRequest(true) if err != nil { return err } - // We get nextHdrs only if we got a 200 response, which means we need to - // update the store. Otherwise we'll have gotten a 304 (do nothing) or an - // error - if nextHdrs != nil { - pp.lastHeaders = nextHdrs + // We initialize the store only if the request wasn't cached + if !cached { return pp.store.Init(features) } return nil diff --git a/requestor.go b/requestor.go index 4ea95f9..17ecd7b 100644 --- a/requestor.go +++ b/requestor.go @@ -3,6 +3,7 @@ package ldclient import ( "encoding/json" "errors" + "fmt" "github.com/facebookgo/httpcontrol" "github.com/gregjones/httpcache" "io/ioutil" @@ -17,11 +18,6 @@ type requestor struct { config Config } -type cacheHeaders struct { - etag string - lastModified string -} - func newRequestor(apiKey string, config Config) *requestor { baseTransport := httpcontrol.Transport{ RequestTimeout: config.Timeout, @@ -47,7 +43,7 @@ func newRequestor(apiKey string, config Config) *requestor { return &requestor } -func (r *requestor) makeAllRequest(ch *cacheHeaders, latest bool) (map[string]*Feature, *cacheHeaders, error) { +func (r *requestor) makeAllRequest(latest bool) (map[string]*Feature, bool, error) { var features map[string]*Feature var resource string @@ -61,20 +57,12 @@ func (r *requestor) makeAllRequest(ch *cacheHeaders, latest bool) (map[string]*F req, reqErr := http.NewRequest("GET", r.config.BaseUri+resource, nil) if reqErr != nil { - return nil, nil, reqErr + return nil, false, reqErr } req.Header.Add("Authorization", "api_key "+r.apiKey) req.Header.Add("User-Agent", "GoClient/"+Version) - if ch != nil && ch.etag != "" { - req.Header.Add("If-None-Match", ch.etag) - } - - if ch != nil && ch.lastModified != "" { - req.Header.Add("If-Modified-Since", ch.lastModified) - } - res, resErr := r.httpClient.Do(req) defer func() { @@ -85,43 +73,38 @@ func (r *requestor) makeAllRequest(ch *cacheHeaders, latest bool) (map[string]*F }() if resErr != nil { - return nil, nil, resErr + return nil, false, resErr } if res.StatusCode == http.StatusUnauthorized { - return nil, nil, errors.New("Invalid API key. Verify that your API key is correct. Returning default value.") + return nil, false, errors.New("Invalid API key. Verify that your API key is correct. Returning default value.") } if res.StatusCode == http.StatusNotFound { - return nil, nil, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") + return nil, false, errors.New("Unknown feature key. Verify that this feature key exists. Returning default value.") } - if res.StatusCode == http.StatusNotModified { - return nil, nil, nil + if res.Header.Get(httpcache.XFromCache) != "" { + return nil, true, nil } if res.StatusCode != http.StatusOK { - return nil, nil, errors.New("Unexpected response code: " + strconv.Itoa(res.StatusCode)) + return nil, false, errors.New("Unexpected response code: " + strconv.Itoa(res.StatusCode)) } body, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, nil, err + return nil, false, err } jsonErr := json.Unmarshal(body, &features) if jsonErr != nil { - return nil, nil, jsonErr - } - - newHeaders := cacheHeaders{ - etag: res.Header.Get("ETag"), - lastModified: res.Header.Get("LastModified"), + return nil, false, jsonErr } - return features, &newHeaders, nil + return features, false, nil } func (r *requestor) makeRequest(key string, latest bool) (*Feature, error) { diff --git a/streaming.go b/streaming.go index 1241085..dbd5ad6 100644 --- a/streaming.go +++ b/streaming.go @@ -84,7 +84,7 @@ func (sp *streamProcessor) startOnce(ch chan<- bool) { sp.store.Upsert(key, *feature) } case indirectPutEvent: - if features, _, err := sp.requestor.makeAllRequest(nil, true); err != nil { + if features, _, err := sp.requestor.makeAllRequest(true); err != nil { sp.config.Logger.Printf("Unexpected error requesting all features: %+v", err) } else { sp.store.Init(features) From ee0dc1b201af77f4434904e1732cca6a2190df98 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Fri, 19 Feb 2016 22:36:38 -0800 Subject: [PATCH 11/11] Remove unused import --- requestor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/requestor.go b/requestor.go index 17ecd7b..65b4447 100644 --- a/requestor.go +++ b/requestor.go @@ -3,7 +3,6 @@ package ldclient import ( "encoding/json" "errors" - "fmt" "github.com/facebookgo/httpcontrol" "github.com/gregjones/httpcache" "io/ioutil"