Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pinger test #6

Merged
merged 5 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 97 additions & 20 deletions database/test/tarantool/pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func TestConnectFailover(t *testing.T) {

arConnTimeout := 200 * time.Millisecond

pingInterval := 500 * time.Millisecond

// включаем микросекунды в std логере
log.SetFlags(log.LstdFlags | log.Lmicroseconds)

Expand Down Expand Up @@ -63,8 +61,7 @@ func TestConnectFailover(t *testing.T) {
"arcfg/Timeout": arConnTimeout,
})

pinger := activerecord.NewPinger(activerecord.WithPingInterval(pingInterval))
defer pinger.StopWatch()
pinger := NewBasicConnectionChecker()

logger := activerecord.NewLogger()
logger.SetLogLevel(activerecord.ErrorLoggerLevel)
Expand All @@ -75,11 +72,15 @@ func TestConnectFailover(t *testing.T) {
activerecord.WithConnectionPinger(pinger),
)

_, err = activerecord.AddClusterChecker(ctx, cfgName, octopus.ClusterConfigParams)
_, err = activerecord.AddClusterChecker(ctx, cfgName, activerecord.ClusterConfigParameters{
Globs: octopus.DefaultConnectionParams,
OptionCreator: octopus.DefaultOptionCreator,
OptionChecker: octopus.CheckShardInstance,
})
require.NoError(t, err)

// проверяем типы и состав узлов кластера после загрузки конфигурации
instances := pinger.ObservedInstances(cfgName)
instances := pinger.ObservedInstances()
// все инстансы из конфигурации (включая несуществующие fakehost)
require.Len(t, instances, 5)

Expand All @@ -101,18 +102,18 @@ func TestConnectFailover(t *testing.T) {
for g := 0; g < 8; g++ {
g := g
eg.Go(func() error {
for i := 0; i < 1000; i++ {
for i := 0; i < 5000; i++ {
st := time.Now()
// чтобы не тротлить пул
time.Sleep(800 * time.Microsecond)
err = nil //lua_procedure.Execute(ctx, "return box.info.status", activerecord.ReplicaOrMasterInstanceType)
err = execute(ctx, cfgName)

if err != nil {
// подождем немного и попробуем сделать еще запрос
time.Sleep(10 * time.Millisecond)
st := time.Now()

err = nil //lua_procedure.Execute(ctx, "return box.info.status", activerecord.ReplicaOrMasterInstanceType)
err = execute(ctx, cfgName)
if qDuration(st) > 5 {
log.Printf("'%d' long query %d, time: %d ms\n", g, i, qDuration(st))
}
Expand Down Expand Up @@ -144,11 +145,10 @@ func TestConnectFailover(t *testing.T) {
}

// останавливаем мастер ноду
require.NoError(t, master.Stop(ctx))
// подождем пока пингер актуализирует кластер после остановки ноды
time.Sleep(pingInterval)
require.NoError(t, master.Terminate(ctx))
require.NoError(t, pinger.check(ctx))

instances = pinger.ObservedInstances(cfgName)
instances = pinger.ObservedInstances()
// проверяем что остановленный мастер пропал из доступных узлов
masters := availableInstances(instances, activerecord.ModeMaster)
require.Len(t, masters, 0)
Expand All @@ -159,16 +159,17 @@ func TestConnectFailover(t *testing.T) {

// останавливаем одну реплику (но в конфигурации активрекорд она по прежнему присутствует)
require.NoError(t, replica3.Stop(ctx))
// подождем пока пингер актуализирует кластер после остановки ноды
time.Sleep(pingInterval)
require.NoError(t, pinger.check(ctx))

instances = pinger.ObservedInstances(cfgName)
instances = pinger.ObservedInstances()
replicas = availableInstances(instances, activerecord.ModeReplica)
// осталась одна доступная реплика
require.Len(t, replicas, 1)
require.Equal(t, rHost1, replicas[0].Config.Addr)

require.NoError(t, master.Start(ctx))
// поднимает контейнер с мастером БД
master, err = tarantool.RunContainer(ctx, tarantool.WithTarantool15("tarantool/tarantool:1.5", time.Second))
require.NoError(t, err)
masterHost, err := master.ServerHostPort(ctx)
require.NoError(t, err)

Expand All @@ -179,11 +180,12 @@ func TestConnectFailover(t *testing.T) {
"arcfg/Timeout": arConnTimeout,
})

// подождем пока пингер актуализирует кластер после остановки ноды
time.Sleep(pingInterval)
log.Println("update arconfig")

require.NoError(t, pinger.check(ctx))

// обновленная конфигурация состоит из 2 узлов
instances = pinger.ObservedInstances(cfgName)
instances = pinger.ObservedInstances()
require.Len(t, instances, 2)

masters = availableInstances(instances, activerecord.ModeMaster)
Expand All @@ -199,6 +201,19 @@ func TestConnectFailover(t *testing.T) {
eg.Wait()
}

func execute(ctx context.Context, path string) error {
box, err := octopus.Box(ctx, 0, activerecord.ReplicaOrMasterInstanceType, path, nil)
if err != nil {
return err
}
_, err = octopus.CallLua(ctx, box, "box.dostring", "return box.info.status")
if err != nil {
return err
}

return nil
}

func availableInstances(instances []activerecord.ShardInstance, modeType activerecord.ServerModeType) []activerecord.ShardInstance {
ret := make([]activerecord.ShardInstance, 0, len(instances))
for _, instance := range instances {
Expand All @@ -210,6 +225,68 @@ func availableInstances(instances []activerecord.ShardInstance, modeType activer
return ret
}

type TestConnectionChecker struct {
checkParams activerecord.ClusterConfigParameters
cfgName string
instances []activerecord.ShardInstance
}

func (c *TestConnectionChecker) AddClusterChecker(ctx context.Context, cfgName string, params activerecord.ClusterConfigParameters) (*activerecord.Cluster, error) {
c.checkParams = params
c.cfgName = cfgName

return nil, c.check(ctx)
}

func NewBasicConnectionChecker() *TestConnectionChecker {
return &TestConnectionChecker{}
}

func (c *TestConnectionChecker) check(ctx context.Context) error {
clusterConfig, err := activerecord.ConfigCacher().Get(ctx, c.cfgName, c.checkParams.Globs, c.checkParams.OptionCreator)
if err != nil {
return fmt.Errorf("can't load cluster info: %w", err)
}

for i := 0; i < clusterConfig.Shards(); i++ {
curInstances := clusterConfig.ShardInstances(i)

var actualShard activerecord.Shard

for _, si := range curInstances {
opts, connErr := c.checkParams.OptionChecker(ctx, si)

if opts != nil {
si.Config.Mode = opts.InstanceMode()
}

instance := activerecord.ShardInstance{
ParamsID: si.ParamsID,
Config: si.Config,
Options: si.Options,
Offline: connErr != nil,
}

switch instance.Config.Mode {
case activerecord.ModeMaster:
actualShard.Masters = append(actualShard.Masters, instance)
case activerecord.ModeReplica:
actualShard.Replicas = append(actualShard.Replicas, instance)
}
}

instances := actualShard.Instances()
c.instances = instances
clusterConfig.SetShardInstances(i, instances)
}

return nil
}

func (c *TestConnectionChecker) ObservedInstances() []activerecord.ShardInstance {
return c.instances
}

type TestConfig struct {
cfg sync.Map
created time.Time
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/ebirukov/tnt-containers v1.0.2
github.com/mailru/activerecord v1.11.0-b3
github.com/mailru/activerecord v1.11.0
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mailru/activerecord v1.11.0-b3 h1:mJsp9cGhm+nhKrU4UDs7++kW/uZ1FZDAoJBODCqvEuo=
github.com/mailru/activerecord v1.11.0-b3/go.mod h1:Dg3GWaG8VauQujFrWsjlXX6I093sm3xq3945/aur+OM=
github.com/mailru/activerecord v1.11.0 h1:OwxTHyMmrNuSe/7syqYcfKkkEoBapBvrylWZPuL3d/g=
github.com/mailru/activerecord v1.11.0/go.mod h1:Dg3GWaG8VauQujFrWsjlXX6I093sm3xq3945/aur+OM=
github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45 h1:x3Zw96Gt6HbEPUWsTbQYj/nfaNv5lWHy6CeEkl8gwqw=
github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45/go.mod h1:guLmlFj8yjd0hoz+QWxRU4Gn+VOb2nOQZ4EqRmMHarw=
github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo=
Expand Down