replication: simplify parallel replication variables & expose them in config

closes #140
problame committed Mar 14, 2021
1 parent 07f2bff commit 0ceea1b
Showing 13 changed files with 246 additions and 47 deletions.
8 changes: 7 additions & 1 deletion config/config.go
@@ -99,14 +99,20 @@ type RecvOptions struct {
}

type Replication struct {
-	Protection *ReplicationOptionsProtection `yaml:"protection,optional,fromdefaults"`
+	Protection  *ReplicationOptionsProtection  `yaml:"protection,optional,fromdefaults"`
+	Concurrency *ReplicationOptionsConcurrency `yaml:"concurrency,optional,fromdefaults"`
}

type ReplicationOptionsProtection struct {
	Initial     string `yaml:"initial,optional,default=guarantee_resumability"`
	Incremental string `yaml:"incremental,optional,default=guarantee_resumability"`
}

+type ReplicationOptionsConcurrency struct {
+	Steps         int `yaml:"steps,optional,default=1"`
+	SizeEstimates int `yaml:"size_estimates,optional,default=4"`
+}

func (l *RecvOptions) SetDefault() {
	*l = RecvOptions{Properties: &PropertyRecvOptions{}}
}
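The new options surface in the per-job YAML. A sketch of the resulting config shape (keys and defaults taken from the struct tags above; the enclosing job definition is elided):

    replication:
      protection:
        initial: guarantee_resumability
        incremental: guarantee_resumability
      concurrency:
        steps: 1          # max concurrently executed replication steps
        size_estimates: 4 # max concurrent step size estimations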
36 changes: 31 additions & 5 deletions daemon/job/active.go
@@ -11,6 +11,7 @@ import (
"github.com/prometheus/common/log"

"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/envconst"

"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job/reset"
@@ -33,6 +34,8 @@ type ActiveSide struct {
	name      endpoint.JobID
	connecter transport.Connecter

+	replicationDriverConfig driver.Config

	prunerFactory *pruner.PrunerFactory

	promRepStateSecs *prometheus.HistogramVec // labels: state
@@ -159,8 +162,12 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
}

	m.plannerPolicy = &logic.PlannerPolicy{
-		EncryptedSend:     logic.TriFromBool(in.Send.Encrypted),
-		ReplicationConfig: replicationConfig,
+		EncryptedSend:             logic.TriFromBool(in.Send.Encrypted),
+		ReplicationConfig:         replicationConfig,
+		SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates,
	}
+	if err := m.plannerPolicy.Validate(); err != nil {
+		return nil, errors.Wrap(err, "cannot build planner policy")
+	}

	if m.snapper, err = snapper.FromConfig(g, m.senderConfig.FSF, in.Snapshotting); err != nil {
@@ -254,8 +261,12 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job
}

	m.plannerPolicy = &logic.PlannerPolicy{
-		EncryptedSend:     logic.DontCare,
-		ReplicationConfig: replicationConfig,
+		EncryptedSend:             logic.DontCare,
+		ReplicationConfig:         replicationConfig,
+		SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates,
	}
+	if err := m.plannerPolicy.Validate(); err != nil {
+		return nil, errors.Wrap(err, "cannot build planner policy")
+	}

m.receiverConfig, err = buildReceiverConfig(in, jobID)
@@ -266,6 +277,16 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job
return m, nil
}

+func replicationDriverConfigFromConfig(in *config.Replication) (c driver.Config, err error) {
+	c = driver.Config{
+		StepQueueConcurrency:     in.Concurrency.Steps,
+		MaxAttempts:              envconst.Int("ZREPL_REPLICATION_MAX_ATTEMPTS", 3),
+		ReconnectHardFailTimeout: envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute),
+	}
+	err = c.Validate()
+	return c, err
+}

func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) (j *ActiveSide, err error) {

j = &ActiveSide{}
@@ -326,6 +347,11 @@ func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) (
return nil, err
}

+	j.replicationDriverConfig, err = replicationDriverConfigFromConfig(in.Replication)
+	if err != nil {
+		return nil, errors.Wrap(err, "cannot build replication driver config")
+	}

return j, nil
}

@@ -459,7 +485,7 @@ func (j *ActiveSide) do(ctx context.Context) {
		*tasks = activeSideTasks{}
		tasks.replicationCancel = func() { repCancel(); endSpan() }
		tasks.replicationReport, repWait = replication.Do(
-			ctx, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
+			ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
		)
		tasks.state = ActiveSideReplicating
	})
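For reference, a minimal standalone sketch of building and validating the driver config outside the daemon. The import path and the exact validation rules are assumptions based on the diff above (the new tests suggest Validate rejects non-positive concurrency); values are illustrative:

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/zrepl/zrepl/replication/driver" // assumed import path for driver.Config
    )

    func main() {
    	c := driver.Config{
    		StepQueueConcurrency:     4,                // from replication.concurrency.steps
    		MaxAttempts:              3,                // default; env-tunable via ZREPL_REPLICATION_MAX_ATTEMPTS
    		ReconnectHardFailTimeout: 10 * time.Minute, // default; env-tunable via ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT
    	}
    	if err := c.Validate(); err != nil { // presumably rejects non-positive values, cf. the new tests
    		fmt.Println("invalid driver config:", err)
    		return
    	}
    	fmt.Println("driver config ok:", c.StepQueueConcurrency, "concurrent steps")
    }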
110 changes: 110 additions & 0 deletions daemon/job/build_jobs_test.go
@@ -144,3 +144,113 @@ func TestSampleConfigsAreBuiltWithoutErrors(t *testing.T) {
}

}

func TestReplicationOptions(t *testing.T) {
	tmpl := `
jobs:
- name: foo
  type: push
  connect:
    type: local
    listener_name: foo
    client_identity: bar
  filesystems: {"<": true}
  %s
  snapshotting:
    type: manual
  pruning:
    keep_sender:
    - type: last_n
      count: 10
    keep_receiver:
    - type: last_n
      count: 10
`

	type Test struct {
		name        string
		input       string
		expectOk    func(t *testing.T, a *ActiveSide, m *modePush)
		expectError bool
	}

	tests := []Test{
		{
			name: "defaults",
			input: `
  replication: {}
`,
			expectOk: func(t *testing.T, a *ActiveSide, m *modePush) {},
		},
		{
			name: "steps_zero",
			input: `
  replication:
    concurrency:
      steps: 0
`,
			expectError: true,
		},
		{
			name: "size_estimates_zero",
			input: `
  replication:
    concurrency:
      size_estimates: 0
`,
			expectError: true,
		},
		{
			name: "custom_values",
			input: `
  replication:
    concurrency:
      steps: 23
      size_estimates: 42
`,
			expectOk: func(t *testing.T, a *ActiveSide, m *modePush) {
				assert.Equal(t, 23, a.replicationDriverConfig.StepQueueConcurrency)
				assert.Equal(t, 42, m.plannerPolicy.SizeEstimationConcurrency)
			},
		},
		{
			name: "negative_values_forbidden",
			input: `
  replication:
    concurrency:
      steps: -23
      size_estimates: -42
`,
			expectError: true,
		},
	}

	fill := func(s string) string { return fmt.Sprintf(tmpl, s) }

	for _, ts := range tests {
		t.Run(ts.name, func(t *testing.T) {
			assert.True(t, (ts.expectError) != (ts.expectOk != nil))

			cstr := fill(ts.input)
			t.Logf("testing config:\n%s", cstr)
			c, err := config.ParseConfigBytes([]byte(cstr))
			require.NoError(t, err)
			require.NotNil(t, c)
			jobs, err := JobsFromConfig(c)
			if ts.expectOk != nil {
				require.NoError(t, err)
				require.Len(t, jobs, 1)
				a := jobs[0].(*ActiveSide)
				m := a.mode.(*modePush)
				ts.expectOk(t, a, m)
			} else if ts.expectError {
				require.Error(t, err)
			} else {
				t.Fatalf("test must define expectOk or expectError")
			}
		})
	}
}
27 changes: 27 additions & 0 deletions docs/configuration/replication.rst
@@ -14,6 +14,10 @@ Replication Options
     protection:
       initial:     guarantee_resumability # guarantee_{resumability,incremental,nothing}
       incremental: guarantee_resumability # guarantee_{resumability,incremental,nothing}
     concurrency:
       size_estimates: 4
       steps: 1

     ...

.. _replication-option-protection:
@@ -45,3 +49,26 @@ which is useful if replication happens so rarely (or fails so frequently) that t

When changing this flag, obsoleted zrepl-managed bookmarks and holds will be destroyed on the next replication step that is attempted for each filesystem.


.. _replication-option-concurrency:

``concurrency`` option
----------------------

The ``concurrency`` options control the maximum degree of concurrency during replication.
The default values allow some concurrency during size estimation but no parallelism for the actual replication.

* ``concurrency.steps`` (default = 1) controls the maximum number of concurrently executed :ref:`replication steps <overview-how-replication-works>`.
  The planning step for each filesystem is counted as a single step.
* ``concurrency.size_estimates`` (default = 4) controls the maximum number of concurrent step size estimations done by the job.

Note that initial replication cannot start replicating child filesystems before the parent filesystem's initial replication step has completed.
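For example, to allow up to four concurrently executing replication steps while keeping the default size estimation concurrency::

    replication:
      concurrency:
        steps: 4

Whether higher step concurrency pays off depends mostly on where the time per step goes; see the tuning notes below.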

Some notes on tuning these values:

* Disk: Size estimation is less I/O intensive than step execution because it does not need to access the data blocks.
* CPU: Size estimation is usually a dense CPU burst, whereas step execution stretches its CPU utilization out over time because of disk I/O.
  Faster disks, sending a compressed dataset in :ref:`plain mode <zfs-background-knowledge-plain-vs-raw-sends>`, and the zrepl transport mode all contribute to higher CPU requirements.
* Network bandwidth: Size estimation does not consume meaningful amounts of bandwidth; step execution does.
* :ref:`zrepl ZFS abstractions <zrepl-zfs-abstractions>`: for each replication step, zrepl needs to update its ZFS abstractions through the ``zfs`` command, which often waits multiple seconds for the zpool to sync.
  Thus, if the actual send & recv time of a step is small compared to the time spent on zrepl ZFS abstractions, increasing step execution concurrency will lower the overall turnaround time.
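As a rough, hypothetical illustration of the last point (invented numbers, for illustration only)::

    per-step time       = bookkeeping + send/recv = 6s + 2s = 8s
    100 steps, steps: 1 -> ~100 * 8s      = ~13 min
    100 steps, steps: 4 -> ~100 * 8s / 4  = ~3.3 min (bookkeeping waits overlap)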
26 changes: 0 additions & 26 deletions endpoint/endpoint.go
@@ -18,7 +18,6 @@ import (
"github.com/zrepl/zrepl/util/chainlock"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/nodefault"
"github.com/zrepl/zrepl/util/semaphore"
"github.com/zrepl/zrepl/zfs"
zfsprop "github.com/zrepl/zrepl/zfs/property"
)
@@ -130,9 +129,6 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst

}

-var maxConcurrentZFSSend = envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10)
-var maxConcurrentZFSSendSemaphore = semaphore.New(maxConcurrentZFSSend)

func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
if fsv == nil {
return nil
@@ -199,16 +195,6 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
return nil, nil, errors.Wrap(err, "validate send arguments")
}

-	getLogger(ctx).Debug("acquire concurrent send semaphore")
-	// TODO use try-acquire and fail with resource-exhaustion rpc status
-	// => would require handling on the client-side
-	// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
-	guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
-	if err != nil {
-		return nil, nil, err
-	}
-	defer guard.Release()

si, err := zfs.ZFSSendDry(ctx, sendArgs)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send dry failed")
@@ -682,8 +668,6 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io
return nil, nil, fmt.Errorf("receiver does not implement Send()")
}

-var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))

func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()

@@ -803,16 +787,6 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
}

log.Debug("acquire concurrent recv semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
if err != nil {
return nil, err
}
defer guard.Release()

var peek bytes.Buffer
var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20)
log.WithField("max_peek_bytes", MaxPeek).Info("peeking incoming stream")
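Migration note, and an assumption based on this commit's intent rather than a documented equivalence: the removed ZREPL_ENDPOINT_MAX_CONCURRENT_SEND / ZREPL_ENDPOINT_MAX_CONCURRENT_RECV environment variables have no direct successor; the closest knob is now the per-job config. A sketch:

    replication:
      concurrency:
        steps: 10   # roughly replaces the removed env-based send/recv caps (assumed mapping)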
5 changes: 4 additions & 1 deletion go.mod
@@ -7,12 +7,15 @@ require (
github.com/gdamore/tcell v1.2.0
github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909
github.com/go-logfmt/logfmt v0.4.0
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator v9.31.0+incompatible
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.2
github.com/jinzhu/copier v0.0.0-20170922082739-db4671f3a9b8
github.com/kisielk/gotool v1.0.0 // indirect
github.com/kr/pretty v0.1.0
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lib/pq v1.2.0
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mattn/go-isatty v0.0.8
@@ -29,7 +32,7 @@
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 // go1.12 thinks it needs this
github.com/spf13/cobra v0.0.2
github.com/spf13/pflag v1.0.5
-	github.com/stretchr/testify v1.5.1
+	github.com/stretchr/testify v1.6.1
github.com/willf/bitset v1.1.10
github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // go1.12 thinks it needs this
13 changes: 13 additions & 0 deletions go.sum
@@ -59,6 +59,12 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA=
github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig=
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 h1:0suja/iKSDbEIYLbrS/8C7iArJiWpgCNcR+zwAHu7Ig=
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -171,6 +177,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
@@ -300,6 +308,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/theckman/goconstraint v1.11.0 h1:oBUwN5wpE4dwyPhRGraEgJsFTr+JtLWiDnaJZJeeXI0=
github.com/theckman/goconstraint v1.11.0/go.mod h1:zkCR/f2kOULTk/h1ujgyB9BlCNLaqlQ6GN2Zl4mg81g=
github.com/timakin/bodyclose v0.0.0-20190407043127-4a873e97b2bb/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
@@ -380,6 +390,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/text v0.0.0-20170915090833-1cbadb444a80/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -460,6 +471,8 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
