-
Notifications
You must be signed in to change notification settings - Fork 162
/
scyllacluster_shardawareness.go
118 lines (95 loc) · 4.07 KB
/
scyllacluster_shardawareness.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright (C) 2021 ScyllaDB
package scyllacluster
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/gocql/gocql"
g "github.com/onsi/ginkgo/v2"
o "github.com/onsi/gomega"
"github.com/scylladb/gocqlx/v2"
scyllaclusterfixture "github.com/scylladb/scylla-operator/test/e2e/fixture/scylla"
"github.com/scylladb/scylla-operator/test/e2e/framework"
"github.com/scylladb/scylla-operator/test/e2e/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
// Registers the e2e spec verifying that a gocql session builds its connection
// pool through the shard-aware port: a single-member, two-shard ScyllaCluster
// is created, every dial the driver performs is recorded by a custom dialer,
// and the recorded (source port -> target address) pairs are then checked.
var _ = g.Describe("ScyllaCluster", func() {
	defer g.GinkgoRecover()

	f := framework.NewFramework("scyllacluster")

	g.It("should allow to build connection pool using shard aware ports", func() {
		// The spec is skipped for now; the body below is kept so that it can be
		// re-enabled once the condition described in the message is met.
		g.Skip("Shardawareness doesn't work on setups NATting traffic, and our CI does it when traffic is going through ClusterIPs. " +
			"It shall be reenabled once we switch client-node communication to PodIPs.",
		)

		const (
			nonShardAwarePort = 9042
			shardAwarePort    = 19042
			nrShards          = 2
		)

		ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
		defer cancel()

		sc := scyllaclusterfixture.BasicScyllaCluster.ReadOrFail()
		sc.Spec.Datacenter.Racks[0].Members = 1
		// Ensure 2 shards.
		sc.Spec.Datacenter.Racks[0].Resources.Limits[corev1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%d", nrShards))

		framework.By("Creating a ScyllaCluster")
		sc, err := f.ScyllaClient().ScyllaV1().ScyllaClusters(f.Namespace()).Create(ctx, sc, metav1.CreateOptions{})
		o.Expect(err).NotTo(o.HaveOccurred())

		framework.By("Waiting for the ScyllaCluster to rollout (RV=%s)", sc.ResourceVersion)
		waitCtx1, waitCtx1Cancel := utils.ContextForRollout(ctx, sc)
		defer waitCtx1Cancel()
		sc, err = utils.WaitForScyllaClusterState(waitCtx1, f.ScyllaClient().ScyllaV1(), sc.Namespace, sc.Name, utils.WaitForStateOptions{}, utils.IsScyllaClusterRolledOut)
		o.Expect(err).NotTo(o.HaveOccurred())

		hosts := getScyllaHostsAndWaitForFullQuorum(ctx, f.KubeClient().CoreV1(), sc)
		o.Expect(hosts).To(o.HaveLen(1))
		di := insertAndVerifyCQLData(ctx, hosts)
		defer di.Close()

		// connections maps the local source port of every dial attempt to the
		// address it targeted. The dialer is invoked by the driver, so every
		// access to the map has to go through connectionsMut.
		connections := make(map[uint16]string)
		var connectionsMut sync.Mutex

		clusterConfig := gocql.NewCluster(hosts...)
		clusterConfig.Dialer = DialerFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
			sourcePort := gocql.ScyllaGetSourcePort(ctx)
			localAddr, err := net.ResolveTCPAddr(network, fmt.Sprintf(":%d", sourcePort))
			if err != nil {
				return nil, err
			}

			framework.Infof("Connecting to %s using %d source port", addr, sourcePort)
			connectionsMut.Lock()
			connections[sourcePort] = addr
			connectionsMut.Unlock()

			// Bind the outgoing socket to the source port requested by the driver.
			d := &net.Dialer{LocalAddr: localAddr}
			return d.DialContext(ctx, network, addr)
		})

		framework.By("Waiting for the driver to establish connection to shards")
		session, err := gocqlx.WrapSession(clusterConfig.CreateSession())
		o.Expect(err).NotTo(o.HaveOccurred())
		defer session.Close()

		err = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
			// Read the map under the lock: the dialer may be writing concurrently.
			connectionsMut.Lock()
			defer connectionsMut.Unlock()
			return len(connections) == nrShards, nil
		})
		o.Expect(err).NotTo(o.HaveOccurred())

		// Take a snapshot under the lock so the assertions below never iterate
		// the map while the dialer is still able to modify it.
		connectionsMut.Lock()
		observed := make(map[uint16]string, len(connections))
		for sourcePort, addr := range connections {
			observed[sourcePort] = addr
		}
		connectionsMut.Unlock()

		shardAwareAttempts := 0
		for sourcePort, addr := range observed {
			// Control connection is also put in pool, and it always uses the default port.
			// The suffix is anchored with ':' because a bare "9042" would also
			// match an address ending in ":19042".
			if sourcePort == 0 {
				o.Expect(addr).To(o.HaveSuffix(fmt.Sprintf(":%d", nonShardAwarePort)))
				continue
			}
			o.Expect(addr).To(o.HaveSuffix(fmt.Sprintf(":%d", shardAwarePort)))
			shardAwareAttempts++
		}

		// Control connection used for shard number discovery, lands on some random shard.
		// This connection is also put in pool, and driver only establish connections to missing shards
		// using shard-aware-port.
		// Connections to shard-aware-port are guaranteed to land on shard driver wants.
		o.Expect(shardAwareAttempts).To(o.Equal(nrShards - 1))
	})
})
// DialerFunc is a function type with the signature of a context-aware dial
// operation. Together with its DialContext method it lets a plain function or
// closure be used wherever a value with a DialContext method is expected.
type DialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)

// DialContext invokes the underlying function with the given arguments and
// returns its results unchanged.
func (dial DialerFunc) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
	conn, err := dial(ctx, network, addr)
	return conn, err
}