Skip to content

Commit

Permalink
Merge pull request #549 from github/auto-populate-kv
Browse files Browse the repository at this point in the history
Auto populate kv
  • Loading branch information
Shlomi Noach committed Nov 29, 2018
2 parents fffc10f + 6e5c0f5 commit d755e1e
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 39 deletions.
10 changes: 6 additions & 4 deletions docs/kv.md
Expand Up @@ -26,11 +26,11 @@ The most common scenario is to update a Proxy to direct cluster's write traffic

Clusters' master entries are populated on:

- Encountering a new cluster, or encountering a master for which there is no existing KV entry. This check runs automatically and periodically.
- The periodic check first consults with `orchestrator`'s internal KV store. It will only attempt to populate external stores (`Consul`, `Zookeeper`) if the internal store does not already have the master entries.
It follows that the periodic checks will only inject external KV _once_.
- An actual failover: `orchestrator` overwrites existing entry with identity of new master
- A manual request for entry population

For each cluster, you will want to make one manual request for entry population. KV stores have no initial knowledge of your setup. `orchestrator` does, but does not routinely update the stores. Use:

- A manual request for entry population:
- `orchestrator-client -c submit-masters-to-kv-stores` to submit all clusters' masters to KV, or
- `orchestrator-client -c submit-masters-to-kv-stores -alias mycluster` to submit the master of `mycluster` to KV

Expand All @@ -44,6 +44,8 @@ Clusters' master entries are populated on:

respectively.

Both actual failover and manual request will override any existing KV entries, internal and external.

### KV and orchestrator/raft

On an [orchestrator/raft](raft.md) setup, all KV writes go through the `raft` protocol. Thus, once the leader determines a write needs to be made to KV stores, it publishes the request to all `raft` nodes. Each of the nodes will apply the write independently, based on its own configuration.
Expand Down
7 changes: 1 addition & 6 deletions go/app/cli.go
Expand Up @@ -1222,15 +1222,10 @@ func Cli(command string, strict bool, instance string, destination string, owner
clusterName := getClusterName(clusterAlias, instanceKey)
log.Debugf("cluster name is <%s>", clusterName)

kvPairs, err := inst.GetMastersKVPairs(clusterName)
kvPairs, _, err := logic.SubmitMastersToKvStores(clusterName, true)
if err != nil {
log.Fatale(err)
}
for _, kvPair := range kvPairs {
if err := kv.PutKVPair(kvPair); err != nil {
log.Fatale(err)
}
}
for _, kvPair := range kvPairs {
fmt.Println(fmt.Sprintf("%s:%s", kvPair.Key, kvPair.Value))
}
Expand Down
16 changes: 2 additions & 14 deletions go/http/api.go
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/github/orchestrator/go/config"
"github.com/github/orchestrator/go/discovery"
"github.com/github/orchestrator/go/inst"
"github.com/github/orchestrator/go/kv"
"github.com/github/orchestrator/go/logic"
"github.com/github/orchestrator/go/metrics/query"
"github.com/github/orchestrator/go/process"
Expand Down Expand Up @@ -1779,23 +1778,12 @@ func (this *HttpAPI) SubmitMastersToKvStores(params martini.Params, r render.Ren
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
kvPairs, err := inst.GetMastersKVPairs(clusterName)
kvPairs, submittedCount, err := logic.SubmitMastersToKvStores(clusterName, true)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
for _, kvPair := range kvPairs {
if orcraft.IsRaftEnabled() {
_, err = orcraft.PublishCommand("put-key-value", kvPair)
} else {
err = kv.PutKVPair(kvPair)
}
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: fmt.Sprintf("%+v", err)})
return
}
}
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Submitted %d masters", len(kvPairs)), Details: kvPairs})
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Submitted %d masters", submittedCount), Details: kvPairs})
}

// Clusters provides list of known masters
Expand Down
13 changes: 9 additions & 4 deletions go/kv/consul.go
Expand Up @@ -55,13 +55,18 @@ func (this *consulStore) PutKeyValue(key string, value string) (err error) {
return err
}

func (this *consulStore) GetKeyValue(key string) (value string, err error) {
func (this *consulStore) GetKeyValue(key string) (value string, found bool, err error) {
if this.client == nil {
return value, nil
return value, found, nil
}
pair, _, err := this.client.KV().Get(key, nil)
if err != nil {
return value, err
return value, found, err
}
return string(pair.Value), nil
return string(pair.Value), (pair != nil), nil
}

func (this *consulStore) AddKeyValue(key string, value string) (added bool, err error) {
err = this.PutKeyValue(key, value)
return (err != nil), err
}
27 changes: 24 additions & 3 deletions go/kv/internal.go
Expand Up @@ -43,20 +43,41 @@ func (this *internalKVStore) PutKeyValue(key string, value string) (err error) {
return log.Errore(err)
}

func (this *internalKVStore) GetKeyValue(key string) (value string, err error) {
func (this *internalKVStore) GetKeyValue(key string) (value string, found bool, err error) {
query := `
select
store_value
from
kv_store
where
key = ?
store_key = ?
`

err = db.QueryOrchestrator(query, sqlutils.Args(key), func(m sqlutils.RowMap) error {
value = m.GetString("store_value")
found = true
return nil
})

return value, log.Errore(err)
return value, found, log.Errore(err)
}

func (this *internalKVStore) AddKeyValue(key string, value string) (added bool, err error) {
sqlResult, err := db.ExecOrchestrator(`
insert ignore
into kv_store (
store_key, store_value, last_updated
) values (
?, ?, now()
)
`, key, value,
)
if err != nil {
return false, log.Errore(err)
}
rowsAffected, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
return (rowsAffected > 0), nil
}
28 changes: 25 additions & 3 deletions go/kv/kv.go
Expand Up @@ -36,7 +36,9 @@ func (this *KVPair) String() string {

type KVStore interface {
PutKeyValue(key string, value string) (err error)
GetKeyValue(key string) (value string, err error)
GetKeyValue(key string) (value string, found bool, err error)

AddKeyValue(key string, value string) (added bool, err error)
}

var kvMutex sync.Mutex
Expand Down Expand Up @@ -66,12 +68,12 @@ func getKVStores() (stores []KVStore) {
return stores
}

func GetValue(key string) (value string, err error) {
func GetValue(key string) (value string, found bool, err error) {
for _, store := range getKVStores() {
// It's really only the first (internal) that matters here
return store.GetKeyValue(key)
}
return value, err
return value, found, err
}

func PutValue(key string, value string) (err error) {
Expand All @@ -89,3 +91,23 @@ func PutKVPair(kvPair *KVPair) (err error) {
}
return PutValue(kvPair.Key, kvPair.Value)
}

func AddValue(key string, value string) (err error) {
for _, store := range getKVStores() {
added, err := store.AddKeyValue(key, value)
if err != nil {
return err
}
if !added {
return nil
}
}
return nil
}

func AddKVPair(kvPair *KVPair) (err error) {
if kvPair == nil {
return nil
}
return AddValue(kvPair.Key, kvPair.Value)
}
13 changes: 9 additions & 4 deletions go/kv/zk.go
Expand Up @@ -64,13 +64,18 @@ func (this *zkStore) PutKeyValue(key string, value string) (err error) {
return err
}

func (this *zkStore) GetKeyValue(key string) (value string, err error) {
func (this *zkStore) GetKeyValue(key string) (value string, found bool, err error) {
if this.zook == nil {
return value, nil
return value, false, nil
}
result, err := this.zook.Get(normalizeKey(key))
if err != nil {
return value, err
return value, false, err
}
return string(result), nil
return string(result), true, nil
}

func (this *zkStore) AddKeyValue(key string, value string) (added bool, err error) {
err = this.PutKeyValue(key, value)
return (err != nil), err
}
11 changes: 11 additions & 0 deletions go/logic/command_applier.go
Expand Up @@ -77,6 +77,8 @@ func (applier *CommandApplier) ApplyCommand(op string, value []byte) interface{}
return applier.enableGlobalRecoveries(value)
case "put-key-value":
return applier.putKeyValue(value)
case "add-key-value":
return applier.addKeyValue(value)
case "leader-uri":
return applier.leaderURI(value)
case "request-health-report":
Expand Down Expand Up @@ -258,6 +260,15 @@ func (applier *CommandApplier) putKeyValue(value []byte) interface{} {
return err
}

func (applier *CommandApplier) addKeyValue(value []byte) interface{} {
kvPair := kv.KVPair{}
if err := json.Unmarshal(value, &kvPair); err != nil {
return log.Errore(err)
}
err := kv.AddKVPair(&kvPair)
return err
}

func (applier *CommandApplier) leaderURI(value []byte) interface{} {
var uri string
if err := json.Unmarshal(value, &uri); err != nil {
Expand Down
54 changes: 53 additions & 1 deletion go/logic/orchestrator.go
Expand Up @@ -70,6 +70,7 @@ var isElectedNode int64 = 0

var recentDiscoveryOperationKeys *cache.Cache
var pseudoGTIDPublishCache = cache.New(time.Minute, time.Second)
var kvFoundCache = cache.New(10*time.Minute, time.Minute)

func init() {
snapshotDiscoveryKeys = make(chan inst.InstanceKey, 10)
Expand Down Expand Up @@ -405,6 +406,49 @@ func InjectPseudoGTIDOnWriters() error {
return nil
}

// Write a cluster's master (or all clusters masters) to kv stores.
// This should generally only happen once in a lifetime of a cluster. Otherwise KV
// stores are updated via failovers.
func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVPair), submittedCount int, err error) {
kvPairs, err = inst.GetMastersKVPairs(clusterName)
if err != nil {
return kvPairs, submittedCount, log.Errore(err)
}
command := "add-key-value"
applyFunc := kv.AddKVPair
if force {
command = "put-key-value"
applyFunc = kv.PutKVPair
}
var selectedError error
for _, kvPair := range kvPairs {
if !force {
// !force: Called periodically to auto-populate KV
// We'd like to avoid some overhead.
if _, found := kvFoundCache.Get(kvPair.Key); found {
// Let's not overload database with queries. Let's not overload raft with events.
continue
}
if v, found, err := kv.GetValue(kvPair.Key); err == nil && found && v == kvPair.Value {
// Already has the right value.
kvFoundCache.Set(kvPair.Key, true, cache.DefaultExpiration)
continue
}
}
if orcraft.IsRaftEnabled() {
_, err = orcraft.PublishCommand(command, kvPair)
} else {
err = applyFunc(kvPair)
}
if err == nil {
submittedCount++
} else {
selectedError = err
}
}
return kvPairs, submittedCount, log.Errore(selectedError)
}

// ContinuousDiscovery starts an asynchronuous infinite discovery process where instances are
// periodically investigated and their status captured, and long since unseen instances are
// purged and forgotten.
Expand All @@ -429,6 +473,10 @@ func ContinuousDiscovery() {
snapshotTopologiesTick = time.Tick(time.Duration(config.Config.SnapshotTopologiesIntervalHours) * time.Hour)
}

runCheckAndRecoverOperationsTimeRipe := func() bool {
return time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod
}

go ometrics.InitMetrics()
go ometrics.InitGraphiteMetrics()
go acceptSignals()
Expand Down Expand Up @@ -495,6 +543,10 @@ func ContinuousDiscovery() {
go ExpireFailureDetectionHistory()
go ExpireTopologyRecoveryHistory()
go ExpireTopologyRecoveryStepsHistory()

if runCheckAndRecoverOperationsTimeRipe() && IsLeader() {
go SubmitMastersToKvStores("", false)
}
} else {
// Take this opportunity to refresh yourself
go inst.LoadHostnameResolveCache()
Expand All @@ -520,7 +572,7 @@ func ContinuousDiscovery() {
} else {
return
}
if time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod {
if runCheckAndRecoverOperationsTimeRipe() {
CheckAndRecover(nil, nil, false)
} else {
log.Debugf("Waiting for %+v seconds to pass before running failure detection/recovery", checkAndRecoverWaitPeriod.Seconds())
Expand Down

0 comments on commit d755e1e

Please sign in to comment.