Skip to content

Commit

Permalink
Move ringpop code from membership to ringpop package
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Mar 1, 2023
1 parent b88a417 commit 60d4e9b
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 447 deletions.
131 changes: 0 additions & 131 deletions common/membership/rp.go

This file was deleted.

55 changes: 1 addition & 54 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package resource

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand Down Expand Up @@ -127,7 +126,7 @@ var Module = fx.Options(
)

var DefaultOptions = fx.Options(
fx.Provide(MembershipMonitorProvider),
fx.Provide(ringpop.MembershipMonitorProvider),
fx.Provide(RPCFactoryProvider),
fx.Provide(ArchivalMetadataProvider),
fx.Provide(ArchiverProviderProvider),
Expand Down Expand Up @@ -246,58 +245,6 @@ func ClientBeanProvider(
)
}

func MembershipMonitorProvider(
lc fx.Lifecycle,
clusterMetadataManager persistence.ClusterMetadataManager,
logger log.SnTaggedLogger,
cfg *config.Config,
svcName primitives.ServiceName,
tlsConfigProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
) (membership.Monitor, error) {
servicePortMap := make(map[primitives.ServiceName]int)
for sn, sc := range cfg.Services {
servicePortMap[primitives.ServiceName(sn)] = sc.RPC.GRPCPort
}

rpcConfig := cfg.Services[string(svcName)].RPC

factory, err := ringpop.NewRingpopFactory(
&cfg.Global.Membership,
svcName,
servicePortMap,
logger,
clusterMetadataManager,
&rpcConfig,
tlsConfigProvider,
dc,
)
if err != nil {
return nil, err
}

monitor, err := factory.GetMembershipMonitor()
if err != nil {
return nil, err
}

lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
monitor.Start()
return nil
},
OnStop: func(context.Context) error {
monitor.Stop()
factory.CloseTChannel()
return nil
},
},
)

return monitor, nil
}

func FrontendClientProvider(clientBean client.Bean) workflowservice.WorkflowServiceClient {
frontendRawClient := clientBean.GetFrontendClient()
return frontend.NewRetryableClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package membership
// Package ringpop provides a RingPop-based membership monitor
package ringpop

type ringpopHostInfo struct {
addr string // ip:port
Expand Down Expand Up @@ -52,9 +53,9 @@ func (hi *ringpopHostInfo) Identity() string {
}

// Label implements ringpop's Membership interface
func (hi *ringpopHostInfo) Label(key string) (value string, has bool) {
value, has = hi.labels[key]
return
func (hi *ringpopHostInfo) Label(key string) (string, bool) {
value, ok := hi.labels[key]
return value, ok
}

// SetLabel sets the label.
Expand Down
23 changes: 11 additions & 12 deletions common/membership/rp_monitor.go → common/ringpop/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package membership
package ringpop

import (
"context"
Expand All @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/primitives"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -73,7 +74,7 @@ type ringpopMonitor struct {
initialized *future.FutureImpl[struct{}]
}

var _ Monitor = (*ringpopMonitor)(nil)
var _ membership.Monitor = (*ringpopMonitor)(nil)

// NewRingpopMonitor returns a ringpop-based membership monitor
func NewRingpopMonitor(
Expand All @@ -83,8 +84,7 @@ func NewRingpopMonitor(
logger log.Logger,
metadataManager persistence.ClusterMetadataManager,
broadcastHostPortResolver func() (string, error),
) Monitor {

) membership.Monitor {
lifecycleCtx, lifecycleCancel := context.WithCancel(context.Background())
lifecycleCtx = headers.SetCallerInfo(
lifecycleCtx,
Expand Down Expand Up @@ -297,7 +297,6 @@ func (rpo *ringpopMonitor) fetchCurrentBootstrapHostports() ([]string, error) {
rpo.logger.Info("bootstrap hosts fetched", tag.BootstrapHostPorts(strings.Join(bootstrapHostPorts, ",")))
return bootstrapHostPorts, nil
}

}
}

Expand Down Expand Up @@ -341,7 +340,7 @@ func (rpo *ringpopMonitor) Stop() {
// This is different from service address as we register ringpop handlers on a separate port.
// For this reason we need to lookup the port for the service and replace ringpop port with service port before
// returning HostInfo back.
func (rpo *ringpopMonitor) WhoAmI() (HostInfo, error) {
func (rpo *ringpopMonitor) WhoAmI() (membership.HostInfo, error) {
address, err := rpo.rp.WhoAmI()
if err != nil {
return nil, err
Expand All @@ -353,7 +352,7 @@ func (rpo *ringpopMonitor) WhoAmI() (HostInfo, error) {

servicePort, ok := rpo.services[rpo.serviceName]
if !ok {
return nil, ErrUnknownService
return nil, membership.ErrUnknownService
}

serviceAddress, err := replaceServicePort(address, servicePort)
Expand All @@ -367,23 +366,23 @@ func (rpo *ringpopMonitor) EvictSelf() error {
return rpo.rp.SelfEvict()
}

func (rpo *ringpopMonitor) GetResolver(service primitives.ServiceName) (ServiceResolver, error) {
func (rpo *ringpopMonitor) GetResolver(service primitives.ServiceName) (membership.ServiceResolver, error) {
ring, found := rpo.rings[service]
if !found {
return nil, ErrUnknownService
return nil, membership.ErrUnknownService
}
return ring, nil
}

func (rpo *ringpopMonitor) Lookup(service primitives.ServiceName, key string) (HostInfo, error) {
func (rpo *ringpopMonitor) Lookup(service primitives.ServiceName, key string) (membership.HostInfo, error) {
ring, err := rpo.GetResolver(service)
if err != nil {
return nil, err
}
return ring.Lookup(key)
}

func (rpo *ringpopMonitor) AddListener(service primitives.ServiceName, name string, notifyChannel chan<- *ChangedEvent) error {
func (rpo *ringpopMonitor) AddListener(service primitives.ServiceName, name string, notifyChannel chan<- *membership.ChangedEvent) error {
ring, err := rpo.GetResolver(service)
if err != nil {
return err
Expand Down Expand Up @@ -414,7 +413,7 @@ func (rpo *ringpopMonitor) GetMemberCount(service primitives.ServiceName) (int,
func replaceServicePort(address string, servicePort int) (string, error) {
host, _, err := net.SplitHostPort(address)
if err != nil {
return "", ErrIncorrectAddressFormat
return "", membership.ErrIncorrectAddressFormat
}
return net.JoinHostPort(host, convert.IntToString(servicePort)), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package membership
package ringpop

import (
"testing"
"time"

"golang.org/x/exp/maps"

"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/primitives"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (s *RpoSuite) TestRingpopMonitor() {

time.Sleep(time.Second)

listenCh := make(chan *ChangedEvent, 5)
listenCh := make(chan *membership.ChangedEvent, 5)
err := rpm.AddListener(serviceName, "test-listener", listenCh)
s.Nil(err, "AddListener failed")

Expand Down Expand Up @@ -96,18 +97,18 @@ func (s *RpoSuite) TestRingpopMonitor() {
}

func (s *RpoSuite) TestCompareMembers() {
s.testCompareMembers([]string{}, []string{"a"}, []string{"+a"})
s.testCompareMembers([]string{}, []string{"a", "b"}, []string{"+a", "+b"})
s.testCompareMembers([]string{"a"}, []string{"a", "b"}, []string{"+b"})
s.testCompareMembers([]string{}, []string{}, nil)
s.testCompareMembers([]string{"a"}, []string{"a"}, nil)
s.testCompareMembers([]string{"a"}, []string{}, []string{"-a"})
s.testCompareMembers([]string{"a", "b"}, []string{}, []string{"-a", "-b"})
s.testCompareMembers([]string{"a", "b"}, []string{"a", "b"}, nil)
s.testCompareMembers([]string{"a", "b", "c"}, []string{"b", "d", "e"}, []string{"+d", "+e", "-a", "-c"})
s.verifyMemberDiff([]string{}, []string{"a"}, []string{"+a"})
s.verifyMemberDiff([]string{}, []string{"a", "b"}, []string{"+a", "+b"})
s.verifyMemberDiff([]string{"a"}, []string{"a", "b"}, []string{"+b"})
s.verifyMemberDiff([]string{}, []string{}, nil)
s.verifyMemberDiff([]string{"a"}, []string{"a"}, nil)
s.verifyMemberDiff([]string{"a"}, []string{}, []string{"-a"})
s.verifyMemberDiff([]string{"a", "b"}, []string{}, []string{"-a", "-b"})
s.verifyMemberDiff([]string{"a", "b"}, []string{"a", "b"}, nil)
s.verifyMemberDiff([]string{"a", "b", "c"}, []string{"b", "d", "e"}, []string{"+d", "+e", "-a", "-c"})
}

func (s *RpoSuite) testCompareMembers(curr []string, new []string, expectedDiff []string) {
func (s *RpoSuite) verifyMemberDiff(curr []string, new []string, expectedDiff []string) {
resolver := &ringpopServiceResolver{}
currMembers := make(map[string]struct{}, len(curr))
for _, m := range curr {
Expand Down

0 comments on commit 60d4e9b

Please sign in to comment.