Skip to content

Commit

Permalink
query: cleanup store statuses as they come and go
Browse files Browse the repository at this point in the history
Signed-off-by: Adrien Fillon <adrien.fillon@cdiscount.com>
  • Loading branch information
adrien-f committed Mar 12, 2019
1 parent 432785e commit 01972a8
Showing 1 changed file with 46 additions and 15 deletions.
61 changes: 46 additions & 15 deletions pkg/query/storeset.go
Expand Up @@ -22,6 +22,7 @@ import (
const (
unhealthyStoreMessage = "removing store because it's unhealthy or does not exist"
droppingStoreMessage = "dropping store, external labels are not unique"
storeUnhealthyTimeout = time.Minute
)

type StoreSpec interface {
Expand All @@ -36,13 +37,14 @@ type StoreSpec interface {
}

type StoreStatus struct {
Name string
LastCheck time.Time
LastError error
Labels []storepb.Label
StoreType component.StoreAPI
MinTime int64
MaxTime int64
Name string
LastCheck time.Time
LastHealthyCheck *time.Time
LastError error
Labels []storepb.Label
StoreType component.StoreAPI
MinTime int64
MaxTime int64
}

type grpcStoreSpec struct {
Expand Down Expand Up @@ -251,6 +253,7 @@ func (s *StoreSet) Update(ctx context.Context) {
}
s.externalLabelStores = externalLabelStores
s.storeNodeConnections.Set(float64(len(s.stores)))
s.cleanUpStoreStatuses()
}

func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
Expand Down Expand Up @@ -341,16 +344,30 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
s.storesStatusesMtx.Lock()
defer s.storesStatusesMtx.Unlock()

var status StoreStatus
now := time.Now()
s.storeStatuses[store.addr] = &StoreStatus{
Name: store.addr,
LastError: err,
LastCheck: now,
Labels: store.labels,
StoreType: store.storeType,
MinTime: store.minTime,
MaxTime: store.maxTime,

if previousStatus, ok := s.storeStatuses[store.addr]; ok {
status = *previousStatus
status.LastError = err
status.LastCheck = now
} else {
status = StoreStatus{
Name: store.addr,
LastError: err,
LastCheck: now,
}
}

if err == nil {
status.LastHealthyCheck = &now
status.Labels = store.labels
status.StoreType = store.storeType
status.MinTime = store.minTime
status.MaxTime = store.maxTime
}

s.storeStatuses[store.addr] = &status
}

func (s *StoreSet) GetStoreStatus() []StoreStatus {
Expand Down Expand Up @@ -397,3 +414,17 @@ func (s *StoreSet) Close() {
st.close()
}
}

func (s *StoreSet) cleanUpStoreStatuses() {
s.storesStatusesMtx.RLock()
defer s.storesStatusesMtx.RUnlock()

now := time.Now()
for k, status := range s.storeStatuses {
if _, ok := s.stores[k]; !ok {
if status.LastHealthyCheck != nil && now.Sub(*status.LastHealthyCheck) >= storeUnhealthyTimeout {
delete(s.storeStatuses, k)
}
}
}
}

0 comments on commit 01972a8

Please sign in to comment.