Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi job & multi-machine improvements & docs #153

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions daemon/control.go
Expand Up @@ -5,16 +5,18 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/nethelpers"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"io"
"net"
"net/http"
"time"
"github.com/zrepl/zrepl/zfs"
)

type controlJob struct {
Expand All @@ -38,6 +40,8 @@ func (j *controlJob) Name() string { return jobNameControl }

func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }

func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }

var promControl struct {
requestBegin *prometheus.CounterVec
requestFinished *prometheus.HistogramVec
Expand Down
9 changes: 9 additions & 0 deletions daemon/job/active.go
Expand Up @@ -299,6 +299,15 @@ func (j *ActiveSide) Status() *Status {
return &Status{Type: t, JobSpecific: s}
}

func (j *ActiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
pull, ok := j.mode.(*modePull)
if !ok {
_ = j.mode.(*modePush) // make sure we didn't introduce a new job type
return nil, false
}
return pull.rootFS.Copy(), true
}

func (j *ActiveSide) Run(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
Expand Down
43 changes: 43 additions & 0 deletions daemon/job/build_jobs.go
Expand Up @@ -2,6 +2,9 @@ package job

import (
"fmt"
"sort"
"strings"

"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
)
Expand All @@ -18,6 +21,22 @@ func JobsFromConfig(c *config.Config) ([]Job, error) {
}
js[i] = j
}

// receiving-side root filesystems must not overlap
{
rfss := make([]string, len(js))
for i, j := range js {
jrfs, ok := j.OwnedDatasetSubtreeRoot()
if !ok {
continue
}
rfss[i] = jrfs.ToString()
}
if err := validateReceivingSidesDoNotOverlap(rfss); err != nil {
return nil, err
}
}

return js, nil
}

Expand Down Expand Up @@ -74,3 +93,27 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
return j, nil

}

func validateReceivingSidesDoNotOverlap(receivingRootFSs []string) error {
if len(receivingRootFSs) == 0 {
return nil
}
rfss := make([]string, len(receivingRootFSs))
copy(rfss, receivingRootFSs)
sort.Slice(rfss, func(i, j int) bool {
return strings.Compare(rfss[i], rfss[j]) == -1
})
// idea:
// no path in rfss must be prefix of another
//
// rfss is now lexicographically sorted, which means that
// if i is prefix of j, i < j (in lexicographical order)
// thus,
// if any i is prefix of i+n (n >= 1), there is overlap
for i := 0; i < len(rfss)-1; i++ {
if strings.HasPrefix(rfss[i+1], rfss[i]) {
return fmt.Errorf("receiving jobs with overlapping root filesystems are forbidden")
}
}
return nil
}
42 changes: 42 additions & 0 deletions daemon/job/build_jobs_test.go
@@ -0,0 +1,42 @@
package job

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidateReceivingSidesDoNotOverlap(t *testing.T) {
type testCase struct {
err bool
input []string
}
tcs := []testCase{
{false, nil},
{false, []string{}},
{false, []string{""}}, // not our job to determine valid paths
{false, []string{"a"}},
{false, []string{"some/path"}},
{false, []string{"zroot/sink1", "zroot/sink2", "zroot/sink3"}},
{true, []string{"zroot/b", "zroot/b"}},
{true, []string{"zroot/foo", "zroot/foo/bar", "zroot/baz"}},
{false, []string{"a/x", "b/x"}},
{false, []string{"a", "b"}},
{true, []string{"a", "a"}},
{true, []string{"a/x/y", "a/x"}},
{true, []string{"a/x", "a/x/y"}},
{true, []string{"a/x", "b/x", "a/x/y"}},
{true, []string{"a", "a/b", "a/c", "a/b"}},
{true, []string{"a/b", "a/c", "a/b", "a/d", "a/c"}},
}

for _, tc := range tcs {
t.Logf("input: %v", tc.input)
err := validateReceivingSidesDoNotOverlap(tc.input)
if tc.err {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}
}
5 changes: 5 additions & 0 deletions daemon/job/job.go
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
)

type Logger = logger.Logger
Expand Down Expand Up @@ -33,6 +35,9 @@ type Job interface {
Run(ctx context.Context)
Status() *Status
RegisterMetrics(registerer prometheus.Registerer)
// Jobs that return a subtree of the dataset hierarchy
// must return the root of that subtree as rfs and ok = true
OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool)
}

type Type string
Expand Down
9 changes: 9 additions & 0 deletions daemon/job/passive.go
Expand Up @@ -103,6 +103,15 @@ func (s *PassiveSide) Status() *Status {
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
}

func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
sink, ok := j.mode.(*modeSink)
if !ok {
_ = j.mode.(*modeSource) // make sure we didn't introduce a new job type
return nil, false
}
return sink.rootDataset.Copy(), true
}

func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}

func (j *PassiveSide) Run(ctx context.Context) {
Expand Down
4 changes: 4 additions & 0 deletions daemon/job/snapjob.go
Expand Up @@ -77,6 +77,10 @@ func (j *SnapJob) Status() *Status {
return &Status{Type: t, JobSpecific: s}
}

func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
return nil, false
}

func (j *SnapJob) Run(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
Expand Down
2 changes: 2 additions & 0 deletions daemon/prometheus.go
Expand Up @@ -42,6 +42,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus }

func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }

func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }

func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}

func (j *prometheusJob) Run(ctx context.Context) {
Expand Down
53 changes: 49 additions & 4 deletions docs/configuration/jobs.rst
Expand Up @@ -108,6 +108,7 @@ The **replication cursor bookmark** ``#zrepl_replication_cursor`` is kept per fi
It is a bookmark of the most recent successfully replicated snapshot to the receiving side.
It is is used by the :ref:`not_replicated <prune-keep-not-replicated>` keep rule to identify all snapshots that have not yet been replicated to the receiving side.
Regardless of whether that keep rule is used, the bookmark ensures that replication can always continue incrementally.
Note that there is only one cursor bookmark per filesystem, which prohibits multiple jobs to replicate the same filesystem (:ref:`see below<jobs-multiple-jobs>`).

.. _replication-placeholder-property:

Expand Down Expand Up @@ -172,6 +173,50 @@ Note that you will have to trigger replication manually using the ``zrepl signal
type: manual
...

.. _jobs-multiple-jobs:

Multiple Jobs & More than 2 Machines
------------------------------------

.. ATTENTION::

When using multiple jobs across single or multiple machines, the following rules are critical to avoid race conditions & data loss:

1. The sets of ZFS filesystems matched by the ``filesystems`` filter fields must be disjoint across all jobs configured on a machine.
2. The ZFS filesystem subtrees of jobs with ``root_fs`` must be disjoint.
3. Across all zrepl instances on all machines in the replication domain, there must be a 1:1 correspondence between active and passive jobs.

Explanations & exceptions to above rules are detailed below.

If you would like to see improvements to multi-job setups, please `open an issue on GitHub <https://github.com/zrepl/zrepl/issues/new>`_.

No Overlapping
~~~~~~~~~~~~~~

Jobs run independently of each other.
If two jobs match the same filesystem with their ``filesystems`` filter, they will operate on that filesystem independently and potentially in parallel.
For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B asssumed the snapshot to still be present.
More subtle race conditions can occur with the :ref:`replication cursor bookmark <replication-cursor-bookmark>`, which currently only exists once per filesystem.

N push jobs to 1 sink
~~~~~~~~~~~~~~~~~~~~~

The :ref:`sink job <job-sink>` namespaces by client identity.
It is thus safe to push to one sink job with different client identities.
If the push jobs have the same client identity, the filesystems matched by the push jobs must be disjoint to avoid races.

N pull jobs from 1 source
~~~~~~~~~~~~~~~~~~~~~~~~~

Multiple pull jobs pulling from the same source have potential for race conditions during pruning:
each pull job prunes the source side independently, causing replication-prune and prune-prune races.

There is currently no way for a pull job to filter which snapshots it should attempt to replicate.
Thus, it is not possibe to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races.


------------------------------------------------------------------------------


.. _job-push:

Expand Down Expand Up @@ -217,8 +262,8 @@ Job Type ``sink``
* - ``serve``
- |serve-transport|
* - ``root_fs``
- ZFS dataset path are received to
``$root_fs/$client_identity``
- ZFS filesystems are received to
``$root_fs/$client_identity/$source_path``

Example config: :sampleconf:`/sink.yml`

Expand All @@ -240,8 +285,8 @@ Job Type ``pull``
* - ``connect``
- |connect-transport|
* - ``root_fs``
- ZFS dataset path are received to
``$root_fs/$client_identity``
- ZFS filesystems are received to
``$root_fs/$source_path``
* - ``interval``
- | Interval at which to pull from the source job (e.g. ``10m``).
| ``manual`` disables periodic pulling, replication then only happens on :ref:`wakeup <cli-signal-wakeup>`.
Expand Down