diff --git a/vault/external_tests/consul_fencing_binary/consul_fencing_test.go b/vault/external_tests/consul_fencing_binary/consul_fencing_test.go
deleted file mode 100644
index 9f1b0e70c1..0000000000
--- a/vault/external_tests/consul_fencing_binary/consul_fencing_test.go
+++ /dev/null
@@ -1,295 +0,0 @@
-// Copyright (c) HashiCorp, Inc.
-// SPDX-License-Identifier: BUSL-1.1
-
-package consul_fencing
-
-import (
-	"context"
-	"fmt"
-	"sort"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
-
-	"github.com/hashicorp/go-hclog"
-	"github.com/openbao/openbao/api"
-	"github.com/openbao/openbao/helper/testhelpers/consul"
-	"github.com/openbao/openbao/sdk/helper/testcluster"
-	"github.com/openbao/openbao/sdk/helper/testcluster/docker"
-	"github.com/stretchr/testify/require"
-)
-
-// TestConsulFencing_PartitionedLeaderCantWrite attempts to create an active
-// node split-brain when using Consul storage to ensure the old leader doesn't
-// continue to write data, potentially corrupting storage. It is naturally
-// non-deterministic since it relies heavily on timing of the different
-// container processes, however it pretty reliably failed before the fencing
-// fix (and Consul lock improvements) and should _never_ fail now that we
-// correctly fence writes.
-func TestConsulFencing_PartitionedLeaderCantWrite(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
-	defer cancel()
-
-	consulStorage := consul.NewClusterStorage()
-
-	// Create cluster logger that will dump cluster logs to stdout for debugging.
-	logger := hclog.NewInterceptLogger(hclog.DefaultOptions)
-	logger.SetLevel(hclog.Trace)
-
-	clusterOpts := docker.DefaultOptions(t)
-	clusterOpts.ImageRepo = "hashicorp/vault-enterprise"
-	clusterOpts.ClusterOptions.Logger = logger
-
-	clusterOpts.Storage = consulStorage
-
-	logger.Info("==> starting cluster")
-	c, err := docker.NewDockerCluster(ctx, clusterOpts)
-	require.NoError(t, err)
-	logger.Info(" ✅ done.", "root_token", c.GetRootToken(),
-		"consul_token", consulStorage.Config().Token)
-
-	logger.Info("==> waiting for leader")
-	leaderIdx, err := testcluster.WaitForActiveNode(ctx, c)
-	require.NoError(t, err)
-
-	leader := c.Nodes()[leaderIdx]
-	leaderClient := leader.APIClient()
-
-	notLeader := c.Nodes()[1] // Assumes the leader is usually index 0; corrected below if not.
-	if leaderIdx == 1 {
-		notLeader = c.Nodes()[0]
-	}
-
-	// Mount a KV v2 backend
-	logger.Info("==> mounting KV")
-	err = leaderClient.Sys().Mount("/test", &api.MountInput{
-		Type: "kv-v2",
-	})
-	require.NoError(t, err)
-
-	// Start two background workers that will cause writes to Consul in the
-	// background. KV v2 relies on a single active node for correctness.
-	// Specifically its patch operation does a read-modify-write under a
-	// key-specific lock which is correct for concurrent writes to one process,
-	// but which by nature of our storage API is not going to be atomic if another
-	// active node is also writing the same KV. It's made worse because the cache
-	// backend means the active node will not actually read from Consul after the
-	// initial read and will be modifying its own in-memory version and writing
-	// that back. So we should be able to detect multiple active nodes writing
-	// reliably with the following setup:
-	//  1. Two separate "client" goroutines each connected to different Vault
-	//     servers.
-	//  2. Both write to the same kv-v2 key, each one modifies only its own set
-	//     of subkeys c0-X or c1-X.
-	//  3. Each request adds the next sequential X value for that client. We use a
-	//     Patch operation so we don't need to read the version or use CAS. On an
-	//     error each client will retry the same key until it gets a success.
-	//  4. In a correct system with a single active node despite a partition, we
-	//     expect a complete set of consecutive X values for both clients (i.e.
-	//     no writes lost). If an old leader is still allowed to write to Consul
-	//     then it will continue to patch against its own last-known version from
-	//     cache and so will overwrite any concurrent updates from the other
-	//     client and we should see that as lost updates at the end.
-	var wg sync.WaitGroup
-	errCh := make(chan error, 10)
-	var writeCount uint64
-
-	// Initialise the key once
-	kv := leaderClient.KVv2("/test")
-	_, err = kv.Put(ctx, "data", map[string]interface{}{
-		"c0-00000000": 1, // values don't matter here, only the keys in this set.
-		"c1-00000000": 1,
-	})
-	require.NoError(t, err)
-
-	const interval = 500 * time.Millisecond
-
-	runWriter := func(i int, targetServer testcluster.VaultClusterNode, ctr *uint64) {
-		wg.Add(1)
-		defer wg.Done()
-		kv := targetServer.APIClient().KVv2("/test")
-
-		for {
-			key := fmt.Sprintf("c%d-%08d", i, atomic.LoadUint64(ctr))
-
-			// Use a short timeout. If we don't then the one goroutine writing to the
-			// partitioned active node can get stuck here until the 60 second request
-			// timeout kicks in without issuing another request.
-			reqCtx, cancel := context.WithTimeout(ctx, interval)
-			logger.Debug("sending patch", "client", i, "key", key)
-			_, err = kv.Patch(reqCtx, "data", map[string]interface{}{
-				key: 1,
-			})
-			cancel()
-			// Deliver errors to the test, but don't block if we get too many before
-			// the context is cancelled, otherwise client 0 can end up blocked as it
-			// has so many errors during the partition that it never actually starts
-			// writing again and so the test never sees split-brain writes.
-			if err != nil {
-				select {
-				case <-ctx.Done():
-					return
-				case errCh <- fmt.Errorf("client %d error: %w", i, err):
-				default:
-					// errCh is blocked, carry on anyway
-				}
-			} else {
-				// Only increment our set counter here now that we've had an ack that
-				// the update was successful.
-				atomic.AddUint64(ctr, 1)
-				atomic.AddUint64(&writeCount, 1)
-			}
-			select {
-			case <-ctx.Done():
-				return
-			case <-time.After(interval):
-			}
-		}
-	}
-
-	logger.Info("==> starting writers")
-	client0Ctr, client1Ctr := uint64(1), uint64(1)
-	go runWriter(0, leader, &client0Ctr)
-	go runWriter(1, notLeader, &client1Ctr)
-
-	// Wait for some writes to have started
-	var writesBeforePartition uint64
-	logger.Info("==> waiting for writes")
-	for {
-		time.Sleep(1 * time.Millisecond)
-		writesBeforePartition = atomic.LoadUint64(&writeCount)
-		if writesBeforePartition >= 5 {
-			break
-		}
-		// Also check for any write errors
-		select {
-		case err := <-errCh:
-			require.NoError(t, err)
-		default:
-		}
-		require.NoError(t, ctx.Err())
-	}
-
-	val, err := kv.Get(ctx, "data")
-	require.NoError(t, err)
-
-	logger.Info("==> partitioning leader")
-	// Now partition the leader from everything else (including Consul)
-	err = leader.(*docker.DockerClusterNode).PartitionFromCluster(ctx)
-	require.NoError(t, err)
-
-	// Reload this in case more writes occurred before the partition completed.
-	writesBeforePartition = atomic.LoadUint64(&writeCount)
-
-	// Wait for some more writes to have happened (the client writing to the
-	// partitioned leader will probably have sent one and hung waiting for a
-	// response, but the other one should eventually start committing again when
-	// a new active node is elected).
-
-	logger.Info("==> waiting for writes to new leader")
-	for {
-		time.Sleep(1 * time.Millisecond)
-		writesAfterPartition := atomic.LoadUint64(&writeCount)
-		if (writesAfterPartition - writesBeforePartition) >= 20 {
-			break
-		}
-		// Also check for any write errors or timeouts
-		select {
-		case err := <-errCh:
-			// Don't fail here because we expect writes to the old leader to fail
-			// eventually or if they need a new connection etc.
-			logger.Info("failed write", "write_count", writesAfterPartition, "err", err)
-		default:
-		}
-		require.NoError(t, ctx.Err())
-	}
-
-	// Heal partition
-	logger.Info("==> healing partition")
-	err = leader.(*docker.DockerClusterNode).UnpartitionFromCluster(ctx)
-	require.NoError(t, err)
-
-	// Wait for old leader to rejoin as a standby and get healthy.
-	logger.Info("==> wait for old leader to rejoin")
-
-	require.NoError(t, waitUntilNotLeader(ctx, leaderClient, logger))
-
-	// Stop the writers and wait for them to shut down nicely
-	logger.Info("==> stopping writers")
-	cancel()
-	wg.Wait()
-
-	// Now verify that all Consul data is consistent with only one leader writing.
-	// Use a new context since we just cancelled the general one.
-	reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	val, err = kv.Get(reqCtx, "data")
-	require.NoError(t, err)
-
-	// Ensure we have every consecutive key for both clients.
-	sets := [][]int{make([]int, 0, 128), make([]int, 0, 128)}
-	for k := range val.Data {
-		var cNum, x int
-		_, err := fmt.Sscanf(k, "c%d-%08d", &cNum, &x)
-		require.NoError(t, err)
-		sets[cNum] = append(sets[cNum], x)
-	}
-
-	// Sort both sets
-	sort.Ints(sets[0])
-	sort.Ints(sets[1])
-
-	// Ensure they are both complete by creating an expected set and comparing to
-	// get nice output to debug. Note that makeSet uses an exclusive bound since
-	// we don't know whether the write for the current counter value completed
-	// yet, so we only create sets up to one less than that value, which we know
-	// for sure should be present.
-	c0Writes := int(atomic.LoadUint64(&client0Ctr))
-	c1Writes := int(atomic.LoadUint64(&client1Ctr))
-	expect0 := makeSet(c0Writes)
-	expect1 := makeSet(c1Writes)
-
-	// Trim the sets to only the writes we know completed since that's all the
-	// expected arrays contain. But only if they are longer, so we don't change
-	// the slice length if they are shorter than the expected number.
-	if len(sets[0]) > c0Writes {
-		sets[0] = sets[0][0:c0Writes]
-	}
-	if len(sets[1]) > c1Writes {
-		sets[1] = sets[1][0:c1Writes]
-	}
-	require.Equal(t, expect0, sets[0], "Client 0 writes lost")
-	require.Equal(t, expect1, sets[1], "Client 1 writes lost")
-}
-
-func makeSet(n int) []int {
-	a := make([]int, n)
-	for i := 0; i < n; i++ {
-		a[i] = i
-	}
-	return a
-}
-
-func waitUntilNotLeader(ctx context.Context, oldLeaderClient *api.Client, logger hclog.Logger) error {
-	for {
-		// Wait for the original leader to acknowledge it's not active any more.
-		resp, err := oldLeaderClient.Sys().LeaderWithContext(ctx)
-		// Tolerate errors as the old leader is in a difficult place right now.
-		if err == nil {
-			if !resp.IsSelf {
-				// We are not leader!
-				return nil
-			}
-			logger.Info("old leader not ready yet", "IsSelf", resp.IsSelf)
-		} else {
-			logger.Info("failed to read old leader status", "err", err)
-		}
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-time.After(time.Second):
-			// Loop again
-		}
-	}
-}