Skip to content

Commit

Permalink
Move ringpop code from membership to ringpop package
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Mar 2, 2023
1 parent 84182b7 commit 336e8b6
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 309 deletions.
138 changes: 73 additions & 65 deletions common/ringpop/ringpop.go → common/membership/ringpop/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var (
IPV4Localhost = net.IPv4(127, 0, 0, 1)
)

// RingpopFactory implements the RingpopFactory interface
type ringpopFactory struct {
// factory provides a Monitor
type factory struct {
config *config.Membership
channel *tchannel.Channel
serviceName primitives.ServiceName
Expand All @@ -74,9 +74,9 @@ type ringpopFactory struct {
monOnce sync.Once
}

// NewRingpopFactory builds a ringpop factory conforming
// newFactory builds a ringpop factory conforming
// to the underlying configuration
func NewRingpopFactory(
func newFactory(
rpConfig *config.Membership,
serviceName primitives.ServiceName,
servicePortMap map[primitives.ServiceName]int,
Expand All @@ -85,14 +85,14 @@ func NewRingpopFactory(
rpcConfig *config.RPC,
tlsProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
) (*ringpopFactory, error) {
if err := ValidateRingpopConfig(rpConfig); err != nil {
) (*factory, error) {
if err := ValidateConfig(rpConfig); err != nil {
return nil, err
}
if rpConfig.MaxJoinDuration == 0 {
rpConfig.MaxJoinDuration = defaultMaxJoinDuration
}
return &ringpopFactory{
return &factory{
config: rpConfig,
serviceName: serviceName,
servicePortMap: servicePortMap,
Expand All @@ -104,20 +104,20 @@ func NewRingpopFactory(
}, nil
}

// ValidateRingpopConfig validates that ringpop config is parseable and valid
func ValidateRingpopConfig(rpConfig *config.Membership) error {
if rpConfig.BroadcastAddress != "" && net.ParseIP(rpConfig.BroadcastAddress) == nil {
// ValidateConfig validates that ringpop config is parseable and valid
func ValidateConfig(cfg *config.Membership) error {
if cfg.BroadcastAddress != "" && net.ParseIP(cfg.BroadcastAddress) == nil {
return fmt.Errorf("ringpop config malformed `broadcastAddress` param")
}
return nil
}

// GetMembershipMonitor return a membership monitor
func (factory *ringpopFactory) GetMembershipMonitor() (membership.Monitor, error) {
// getMembershipMonitor return a membership monitor
func (factory *factory) getMembershipMonitor() (membership.Monitor, error) {
return factory.getMembership()
}

func (factory *ringpopFactory) getMembership() (membership.Monitor, error) {
func (factory *factory) getMembership() (membership.Monitor, error) {
var err error
factory.monOnce.Do(func() {
ctx, cancel := context.WithTimeout(context.Background(), persistenceOperationTimeout)
Expand All @@ -128,16 +128,16 @@ func (factory *ringpopFactory) getMembership() (membership.Monitor, error) {
if err != nil {
factory.logger.Fatal("Failed to get current cluster ID", tag.Error(err))
}
ringpopAppName := "temporal"
appName := "temporal"
if currentClusterMetadata.UseClusterIdMembership {
ringpopAppName = fmt.Sprintf("temporal-%s", currentClusterMetadata.GetClusterId())
appName = fmt.Sprintf("temporal-%s", currentClusterMetadata.GetClusterId())
}
if rp, err := ringpop.New(ringpopAppName, ringpop.Channel(factory.getTChannel()), ringpop.AddressResolverFunc(factory.broadcastAddressResolver)); err != nil {
if rp, err := ringpop.New(appName, ringpop.Channel(factory.getTChannel()), ringpop.AddressResolverFunc(factory.broadcastAddressResolver)); err != nil {
factory.logger.Fatal("Failed to get new ringpop", tag.Error(err))
} else {
mrp := membership.NewRingPop(rp, factory.config.MaxJoinDuration, factory.logger)
mrp := newService(rp, factory.config.MaxJoinDuration, factory.logger)

factory.membershipMonitor = membership.NewRingpopMonitor(
factory.membershipMonitor = newMonitor(
factory.serviceName,
factory.servicePortMap,
mrp,
Expand All @@ -151,66 +151,74 @@ func (factory *ringpopFactory) getMembership() (membership.Monitor, error) {
return factory.membershipMonitor, err
}

func (factory *ringpopFactory) broadcastAddressResolver() (string, error) {
return membership.BuildBroadcastHostPort(factory.getTChannel().PeerInfo(), factory.config.BroadcastAddress)
func (factory *factory) broadcastAddressResolver() (string, error) {
return buildBroadcastHostPort(factory.getTChannel().PeerInfo(), factory.config.BroadcastAddress)
}

func (factory *ringpopFactory) getTChannel() *tchannel.Channel {
func (factory *factory) getTChannel() *tchannel.Channel {
factory.chOnce.Do(func() {
ringpopServiceName := fmt.Sprintf("%v-ringpop", factory.serviceName)
ringpopHostAddress := net.JoinHostPort(factory.getListenIP().String(), convert.IntToString(factory.rpcConfig.MembershipPort))
enableTLS := factory.dc.GetBoolProperty(dynamicconfig.EnableRingpopTLS, false)()

var tChannel *tchannel.Channel
if enableTLS {
clientTLSConfig, err := factory.tlsFactory.GetInternodeClientConfig()
if err != nil {
factory.logger.Fatal("Failed to get internode TLS client config", tag.Error(err))
}

serverTLSConfig, err := factory.tlsFactory.GetInternodeServerConfig()
if err != nil {
factory.logger.Fatal("Failed to get internode TLS server config", tag.Error(err))
}

listener, err := tls.Listen("tcp", ringpopHostAddress, serverTLSConfig)
if err != nil {
factory.logger.Fatal("Failed to start ringpop TLS listener", tag.Error(err), tag.Address(ringpopHostAddress))
}

dialer := tls.Dialer{Config: clientTLSConfig}
tChannel, err := tchannel.NewChannel(ringpopServiceName, &tchannel.ChannelOptions{Dialer: dialer.DialContext})
if err != nil {
factory.logger.Fatal("Failed to create ringpop TChannel", tag.Error(err))
}

if err := tChannel.Serve(listener); err != nil {
factory.logger.Fatal("Failed to serve ringpop listener", tag.Error(err), tag.Address(ringpopHostAddress))
}

factory.channel = tChannel
tChannel = factory.getTLSChannel(ringpopHostAddress, ringpopServiceName)
} else {
listener, err := net.Listen("tcp", ringpopHostAddress)
if err != nil {
factory.logger.Fatal("Failed to start ringpop listener", tag.Error(err), tag.Address(ringpopHostAddress))
}

tChannel, err := tchannel.NewChannel(ringpopServiceName, &tchannel.ChannelOptions{})
if err != nil {
factory.logger.Fatal("Failed to create ringpop TChannel", tag.Error(err))
}

if err := tChannel.Serve(listener); err != nil {
factory.logger.Fatal("Failed to serve ringpop listener", tag.Error(err), tag.Address(ringpopHostAddress))
}

factory.channel = tChannel
tChannel = factory.getTCPChannel(ringpopHostAddress, ringpopServiceName)
}
factory.channel = tChannel
})

return factory.channel
}

func (factory *ringpopFactory) getListenIP() net.IP {
func (factory *factory) getTCPChannel(ringpopHostAddress string, ringpopServiceName string) *tchannel.Channel {
listener, err := net.Listen("tcp", ringpopHostAddress)
if err != nil {
factory.logger.Fatal("Failed to start ringpop listener", tag.Error(err), tag.Address(ringpopHostAddress))
}

tChannel, err := tchannel.NewChannel(ringpopServiceName, &tchannel.ChannelOptions{})
if err != nil {
factory.logger.Fatal("Failed to create ringpop TChannel", tag.Error(err))
}

if err := tChannel.Serve(listener); err != nil {
factory.logger.Fatal("Failed to serve ringpop listener", tag.Error(err), tag.Address(ringpopHostAddress))
}
return tChannel
}

func (factory *factory) getTLSChannel(ringpopHostAddress string, ringpopServiceName string) *tchannel.Channel {
clientTLSConfig, err := factory.tlsFactory.GetInternodeClientConfig()
if err != nil {
factory.logger.Fatal("Failed to get internode TLS client config", tag.Error(err))
}

serverTLSConfig, err := factory.tlsFactory.GetInternodeServerConfig()
if err != nil {
factory.logger.Fatal("Failed to get internode TLS server config", tag.Error(err))
}

listener, err := tls.Listen("tcp", ringpopHostAddress, serverTLSConfig)
if err != nil {
factory.logger.Fatal("Failed to start ringpop TLS listener", tag.Error(err), tag.Address(ringpopHostAddress))
}

dialer := tls.Dialer{Config: clientTLSConfig}
tChannel, err := tchannel.NewChannel(ringpopServiceName, &tchannel.ChannelOptions{Dialer: dialer.DialContext})
if err != nil {
factory.logger.Fatal("Failed to create ringpop TChannel", tag.Error(err))
}

if err := tChannel.Serve(listener); err != nil {
factory.logger.Fatal("Failed to serve ringpop listener", tag.Error(err), tag.Address(ringpopHostAddress))
}
return tChannel
}

func (factory *factory) getListenIP() net.IP {
if factory.rpcConfig.BindOnLocalHost && len(factory.rpcConfig.BindOnIP) > 0 {
factory.logger.Fatal("ListenIP failed, bindOnLocalHost and bindOnIP are mutually exclusive")
return nil
Expand All @@ -236,8 +244,8 @@ func (factory *ringpopFactory) getListenIP() net.IP {
return ip
}

// CloseTChannel allows fx Stop hook to close channel
func (factory *ringpopFactory) CloseTChannel() {
// closeTChannel allows fx Stop hook to close channel
func (factory *factory) closeTChannel() {
if factory.channel != nil {
factory.getTChannel().Close()
factory.channel = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ type (
internodeConfigMutualTLS config.GroupTLS
internodeConfigServerTLS config.GroupTLS

ringpopMutualTLSFactoryA *ringpopFactory
ringpopMutualTLSFactoryB *ringpopFactory
ringpopServerTLSFactoryA *ringpopFactory
ringpopServerTLSFactoryB *ringpopFactory
mutualTLSFactoryA *factory
mutualTLSFactoryB *factory
serverTLSFactoryA *factory
serverTLSFactoryB *factory

insecureFactory *ringpopFactory
insecureFactory *factory
}
)

Expand Down Expand Up @@ -137,19 +137,19 @@ func (s *RingpopSuite) TestHostsMode() {
s.Nil(err)
s.Equal("1.2.3.4", cfg.BroadcastAddress)
s.Equal(time.Second*30, cfg.MaxJoinDuration)
err = ValidateRingpopConfig(&cfg)
err = ValidateConfig(&cfg)
s.Nil(err)
f, err := NewRingpopFactory(&cfg, "test", nil, log.NewNoopLogger(), nil, nil, nil, nil)
f, err := newFactory(&cfg, "test", nil, log.NewNoopLogger(), nil, nil, nil, nil)
s.Nil(err)
s.NotNil(f)
}

func (s *RingpopSuite) TestInvalidConfig() {
var cfg config.Membership
cfg.MaxJoinDuration = time.Minute
s.NoError(ValidateRingpopConfig(&cfg))
s.NoError(ValidateConfig(&cfg))
cfg.BroadcastAddress = "sjhdfskdjhf"
s.Error(ValidateRingpopConfig(&cfg))
s.Error(ValidateConfig(&cfg))
}

func getHostsConfig() string {
Expand All @@ -164,8 +164,8 @@ func newTestRingpopFactory(
rpcConfig *config.RPC,
tlsProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
) *ringpopFactory {
return &ringpopFactory{
) *factory {
return &factory{
config: nil,
serviceName: serviceName,
servicePortMap: nil,
Expand All @@ -178,18 +178,18 @@ func newTestRingpopFactory(
}

func (s *RingpopSuite) TestRingpopMutualTLS() {
runRingpopTLSTest(s.Suite, s.logger, s.ringpopMutualTLSFactoryA, s.ringpopMutualTLSFactoryB, false)
s.NoError(runRingpopTLSTest(&s.Suite, s.mutualTLSFactoryA, s.mutualTLSFactoryB))
}

func (s *RingpopSuite) TestRingpopServerTLS() {
runRingpopTLSTest(s.Suite, s.logger, s.ringpopServerTLSFactoryA, s.ringpopServerTLSFactoryB, false)
s.NoError(runRingpopTLSTest(&s.Suite, s.serverTLSFactoryA, s.serverTLSFactoryB))
}

func (s *RingpopSuite) TestRingpopInvalidTLS() {
runRingpopTLSTest(s.Suite, s.logger, s.insecureFactory, s.ringpopServerTLSFactoryB, true)
s.Error(runRingpopTLSTest(&s.Suite, s.insecureFactory, s.serverTLSFactoryB))
}

func runRingpopTLSTest(s suite.Suite, logger log.Logger, serverA *ringpopFactory, serverB *ringpopFactory, expectError bool) {
func runRingpopTLSTest(s *suite.Suite, serverA *factory, serverB *factory) error {
// Start two ringpop nodes
chA := serverA.getTChannel()
chB := serverB.getTChannel()
Expand All @@ -198,26 +198,22 @@ func runRingpopTLSTest(s suite.Suite, logger log.Logger, serverA *ringpopFactory

// Ping A through B to make sure B's dialer uses TLS to communicate with A
hostPortA := chA.PeerInfo().HostPort
err := chB.Ping(context.Background(), hostPortA)
if expectError {
s.Error(err)
} else {
s.NoError(err)
if err := chB.Ping(context.Background(), hostPortA); err != nil {
return err
}

// Confirm that A's listener is actually using TLS
clientTLSConfig, err := serverB.tlsFactory.GetInternodeClientConfig()
s.NoError(err)

conn, err := tls.Dial("tcp", hostPortA, clientTLSConfig)
if err != nil {
return err
}
if conn != nil {
_ = conn.Close()
}
if expectError {
s.Error(err)
} else {
s.NoError(err)
}
return nil
}

func (s *RingpopSuite) setupInternodeRingpop() {
Expand All @@ -226,14 +222,14 @@ func (s *RingpopSuite) setupInternodeRingpop() {
s.insecureFactory = newTestRingpopFactory("tester", s.logger, rpcTestCfgDefault, provider, dynamicconfig.NewNoopCollection())
s.NotNil(s.insecureFactory)

ringpopServerTLS := &config.Global{
serverTLS := &config.Global{
Membership: s.membershipConfig,
TLS: config.RootTLS{
Internode: s.internodeConfigServerTLS,
},
}

ringpopMutualTLS := &config.Global{
mutualTLS := &config.Global{
Membership: s.membershipConfig,
TLS: config.RootTLS{
Internode: s.internodeConfigMutualTLS,
Expand All @@ -247,17 +243,17 @@ func (s *RingpopSuite) setupInternodeRingpop() {
dynamicconfig.EnableRingpopTLS: true,
}), s.logger)

provider, err = encryption.NewTLSConfigProviderFromConfig(ringpopMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil)
provider, err = encryption.NewTLSConfigProviderFromConfig(mutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil)
s.NoError(err)
s.ringpopMutualTLSFactoryA = newTestRingpopFactory("tester-A", s.logger, rpcCfgA, provider, dc)
s.NotNil(s.ringpopMutualTLSFactoryA)
s.ringpopMutualTLSFactoryB = newTestRingpopFactory("tester-B", s.logger, rpcCfgB, provider, dc)
s.NotNil(s.ringpopMutualTLSFactoryB)
s.mutualTLSFactoryA = newTestRingpopFactory("tester-A", s.logger, rpcCfgA, provider, dc)
s.NotNil(s.mutualTLSFactoryA)
s.mutualTLSFactoryB = newTestRingpopFactory("tester-B", s.logger, rpcCfgB, provider, dc)
s.NotNil(s.mutualTLSFactoryB)

provider, err = encryption.NewTLSConfigProviderFromConfig(ringpopServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil)
provider, err = encryption.NewTLSConfigProviderFromConfig(serverTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil)
s.NoError(err)
s.ringpopServerTLSFactoryA = newTestRingpopFactory("tester-A", s.logger, rpcCfgA, provider, dc)
s.NotNil(s.ringpopServerTLSFactoryA)
s.ringpopServerTLSFactoryB = newTestRingpopFactory("tester-B", s.logger, rpcCfgB, provider, dc)
s.NotNil(s.ringpopServerTLSFactoryB)
s.serverTLSFactoryA = newTestRingpopFactory("tester-A", s.logger, rpcCfgA, provider, dc)
s.NotNil(s.serverTLSFactoryA)
s.serverTLSFactoryB = newTestRingpopFactory("tester-B", s.logger, rpcCfgB, provider, dc)
s.NotNil(s.serverTLSFactoryB)
}

0 comments on commit 336e8b6

Please sign in to comment.