Better handle etcd compaction in multi-apiserver #28202

Merged: 2 commits, Jul 15, 2016
111 changes: 84 additions & 27 deletions pkg/storage/etcd3/compact.go
@@ -17,6 +17,7 @@ limitations under the License.
package etcd3

import (
"strconv"
"sync"
"time"

@@ -25,7 +26,10 @@ import (
"golang.org/x/net/context"
)

const compactInterval = 10 * time.Minute
const (
compactInterval = 10 * time.Minute
Member: What's the reasoning behind the 10 minute interval?

Member: Also, how would compaction affect etcd performance when under pressure?

Contributor (author):
> What's the reasoning behind the 10 minute interval?

There is already a comment in there:
"We save the most recent 10 minutes of data which should be enough for slow watchers and to tolerate bursts. Ideally, when the storage layer can take advantage of mvcc, we might increase the interval to 12h."

> How would compaction affect etcd performance when under pressure?

It will affect it slightly, but it won't pause etcd.
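To make the effect of the retention window concrete, here is a minimal sketch (not part of this PR; it assumes a locally reachable etcd at localhost:2379 and uses the same error-only Compact signature as this change): once a revision is compacted, reads at older revisions fail, which is exactly what a watcher that has fallen more than the window behind would hit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed local etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	old, err := client.Put(ctx, "/demo", "v1")
	if err != nil {
		panic(err)
	}
	if _, err := client.Put(ctx, "/demo", "v2"); err != nil {
		panic(err)
	}

	// Compact away everything older than the latest revision (old rev + 1).
	if err := client.Compact(ctx, old.Header.Revision+1); err != nil {
		panic(err)
	}

	// Reading at the compacted revision now fails with a "compacted" error,
	// which is what a slow watcher would see after the retention window passes.
	_, err = client.Get(ctx, "/demo", clientv3.WithRev(old.Header.Revision))
	fmt.Println("read at compacted revision:", err)
}
```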

compactRevKey = "compact_rev_key"
)

var (
endpointsMapMu sync.Mutex
@@ -36,38 +40,76 @@ func init() {
endpointsMap = make(map[string]struct{})
}

// StartCompactor starts a compactor in the background in order to compact keys
// older than fixed time.
// We need to compact keys because we can't let on disk data grow forever.
// We save the most recent 10 minutes data. It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take
// advantage of multi-version key.
// StartCompactor starts a compactor in the background to compact old versions of keys that are no longer needed.
// By default, we save the most recent 10 minutes of data and compact versions older than 10 minutes.
// This should be enough for slow watchers and to tolerate bursts.
// TODO: We might keep a longer history (12h) in the future once the storage API can take advantage of past versions of keys.
func StartCompactor(ctx context.Context, client *clientv3.Client) {
endpointsMapMu.Lock()
defer endpointsMapMu.Unlock()

// We can't have multiple compaction jobs for the same cluster.
// In one process, we can have only one compactor per cluster.
// Currently we rely on endpoints to differentiate clusters.
var emptyStruct struct{}
for _, ep := range client.Endpoints() {
if _, ok := endpointsMap[ep]; ok {
glog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
return
}
}
for _, ep := range client.Endpoints() {
endpointsMap[ep] = emptyStruct
endpointsMap[ep] = struct{}{}
}

go compactor(ctx, client, compactInterval)
}

// compactor periodically compacts historical versions of keys in etcd.
// After compaction, old versions of keys set before given interval will be gone.
// Any API call for the old versions of keys will return error.
// interval: the interval between each compaction. The first compaction happens after "interval".
// It compacts key versions older than the given interval.
// In other words, after compaction etcd will only contain key versions set during the last interval.
// Any API call for older versions of keys will return an error.
// "interval" is the time between compactions; the first compaction happens after one interval.
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
var curRev int64
// Technical definitions:
// We have a special key in etcd defined as *compactRevKey*.
// compactRevKey's value will be set to the string of the last compacted revision.
// compactRevKey's version will be used as logical time for comparison. The version is referred to as the compact time.
// Initially, because the key doesn't exist, the compact time (version) is 0.
//
// Algorithm:
// - Compare to see if (local compact_time) = (remote compact_time).
// - If yes, increment both local and remote compact_time, and do a compaction.
// - If not, set local to remote compact_time.
//
// Technical details/insights:
//
// The protocol here is lease based. If one compactor CASes successfully, the others will learn of it when their own
// CAS fails later and will try again in 10 minutes. If an apiserver crashes, another one will "take over" the lease.
//
// For example, in the following diagram, compactor C1 does the compaction at t1 and t2. Another compactor C2
// at t1' (t1 < t1' < t2) would fail its CAS, set its known oldRev to the rev at t1, and try again at t2' (t2' > t2).
// If C1 crashed and didn't compact at t2, C2 would CAS successfully at t2'.
//
// oldRev(t2) curRev(t2)
// +
// oldRev curRev |
// + + |
// | | |
// | | t1' | t2'
// +---v-------------v----^---------v------^---->
// t0 t1 t2
//
// We have the following guarantees:
// - in normal cases, the compaction interval is 10 minutes.
// - during failover, the interval is >10m and <20m.
//
// FAQ:
// - What if time is not accurate? We don't care, as long as someone does the compaction. Atomicity is ensured using
// the etcd API.
// - What happens under heavy load? Initially, each apiserver will do only one compaction every 10 minutes,
// which is very unlikely to affect, or be affected by, server load.

var compactTime int64
var rev int64
var err error
for {
select {
@@ -76,29 +118,44 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
return
}

curRev, err = compact(ctx, client, curRev)
compactTime, rev, err = compact(ctx, client, compactTime, rev)
if err != nil {
glog.Error(err)
glog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
continue
}
}
}

// compact compacts etcd store and returns current rev.
// If it couldn't get current revision, the old rev will be returned.
func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64, error) {
resp, err := client.Get(ctx, "/")
// It returns the updated compact time and the current global revision if no error occurred.
// Note that a failed CAS does not count as an error.
func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
resp, err := client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
).Then(
clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
Member: Just so I'm clear, in the case where you had 10 apiservers and 2 somehow raced on this put because the compare matched, only 1 would succeed because the put is atomic.

).Else(
clientv3.OpGet(compactRevKey),
).Commit()
if err != nil {
return oldRev, err
return t, rev, err
}

curRev := resp.Header.Revision
if oldRev == 0 {
return curRev, nil

if !resp.Succeeded {
curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
return curTime, curRev, nil
}
err = client.Compact(ctx, oldRev)
if err != nil {
return curRev, err
curTime := t + 1

if rev == 0 {
// We don't compact on bootstrap.
return curTime, curRev, nil
}
if err = client.Compact(ctx, rev); err != nil {
Member: I still worry that we could be doing random compaction under heavy load. Could we outline a TODO for an ideal case where this becomes a low-priority background task that runs when load is low?

return curTime, curRev, err
}
glog.Infof("etcd: Compacted rev %d, endpoints %v", oldRev, client.Endpoints())
return curRev, nil
glog.Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
return curTime, curRev, nil
}
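For orientation, here is a rough sketch (not part of this PR; the endpoint address is an assumption) of how a caller would start the compactor. Because of the endpointsMap bookkeeping above, a second StartCompactor call for the same endpoints is a no-op, so each process runs at most one compactor per etcd cluster.

```go
package main

import (
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"

	"k8s.io/kubernetes/pkg/storage/etcd3"
)

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Starts one background compactor goroutine; it wakes up every compactInterval (10m).
	etcd3.StartCompactor(ctx, client)

	// A second call with the same endpoints is ignored, so wiring multiple storage
	// destinations to the same cluster does not start multiple compactors.
	etcd3.StartCompactor(ctx, client)

	// ... run the server; cancelling ctx stops the compactor loop.
	time.Sleep(time.Minute)
}
```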
36 changes: 35 additions & 1 deletion pkg/storage/etcd3/compact_test.go
@@ -36,7 +36,7 @@ func TestCompact(t *testing.T) {
t.Fatalf("Put failed: %v", err)
}

_, err = compact(ctx, client, putResp.Header.Revision)
_, _, err = compact(ctx, client, 0, putResp.Header.Revision)
if err != nil {
t.Fatalf("compact failed: %v", err)
}
@@ -46,3 +46,37 @@ func TestCompact(t *testing.T) {
t.Errorf("Expecting ErrCompacted, but get=%v", err)
}
}

// TestCompactConflict tests two compactors (call them C1 and C2) trying to compact the etcd cluster at the same
// logical time.
// - C1 compacts first. It will succeed.
// - C2 compacts afterwards. It will fail, but it will get the latest logical time, which should be larger by one.
func TestCompactConflict(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
ctx := context.Background()

putResp, err := client.Put(ctx, "/somekey", "data")
if err != nil {
t.Fatalf("Put failed: %v", err)
}

// Compact first. It does the compaction and returns the compact time, incremented by 1.
curTime, _, err := compact(ctx, client, 0, putResp.Header.Revision)
if err != nil {
t.Fatalf("compact failed: %v", err)
}
if curTime != 1 {
t.Errorf("Expect current logical time = 1, get = %v", curTime)
}

// Compact again with the same parameters. It won't compact again, but it will return the latest compact time.
curTime2, _, err := compact(ctx, client, 0, putResp.Header.Revision)
if err != nil {
t.Fatalf("compact failed: %v", err)
}
if curTime != curTime2 {
t.Errorf("Unexpected curTime (%v) != curTime2 (%v)", curTime, curTime2)
}
}
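To illustrate the atomicity point raised in the review comment on the OpPut above, here is a rough sketch (not part of this PR; the endpoint and the standalone tryCompactCAS helper are assumptions) of two apiservers attempting the same compare-and-swap on compact_rev_key. Both compare against the same version, but the transaction is evaluated atomically on the server, so exactly one Put succeeds; the loser takes the Else branch and only learns the new compact time.

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

// tryCompactCAS mirrors the transaction in compact(): bump compact_rev_key only if its
// version still equals the caller's known compact time t.
func tryCompactCAS(ctx context.Context, client *clientv3.Client, t, rev int64) (bool, error) {
	resp, err := client.KV.Txn(ctx).If(
		clientv3.Compare(clientv3.Version("compact_rev_key"), "=", t),
	).Then(
		clientv3.OpPut("compact_rev_key", strconv.FormatInt(rev, 10)),
	).Else(
		clientv3.OpGet("compact_rev_key"),
	).Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	// Two "apiservers" both believe the compact time is 0; only one CAS can win.
	ok1, _ := tryCompactCAS(ctx, client, 0, 100)
	ok2, _ := tryCompactCAS(ctx, client, 0, 100)
	fmt.Println(ok1, ok2) // expected: true false
}
```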