Skip to content

Commit

Permalink
beetle configuration client proxy
Browse files Browse the repository at this point in the history
configuration client publishes zmq messages with redis master file
content, client proxy connects to the socket and writes file content.

Needed because kubernetes has deprecated host path mounts.
  • Loading branch information
skaes committed Apr 13, 2023
1 parent 88dcc61 commit bed98c5
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ install: $(GO_INSTALL_TARGETS)
uninstall:
cd $(BIN_DIR) && rm -f $(GO_INSTALL_TARGETS) $(SCRIPTS)

GO_MODULES = $(patsubst %,$(GO_SRC)/%, client.go server.go datatypes.go server_state.go failover_state.go redis.go redis_shim.go redis_server_info.go logging.go version.go garbage_collect_keys.go notification_mailer.go config.go delete_keys.go copy_keys.go dump_expiries.go consul/consul.go)
GO_MODULES = $(patsubst %,$(GO_SRC)/%, client.go server.go datatypes.go server_state.go failover_state.go redis.go redis_shim.go redis_server_info.go logging.go version.go garbage_collect_keys.go notification_mailer.go config.go delete_keys.go copy_keys.go dump_expiries.go consul/consul.go client_proxy.go lvcache.go)

beetle: $(GO_SRC)/beetle.go $(GO_MODULES)
cd $(GO_SRC) && $(GO_ENV) go build -o ../$@
Expand Down
6 changes: 5 additions & 1 deletion features/support/test_daemons/redis_configuration_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def stop
def daemon_controller
@daemon_controller ||= DaemonController.new(
:identifier => "Redis configuration test client #{@name}",
:start_command => "./beetle configuration_client -v -d --redis-master-file #{redis_master_file} --id #{@name} --pid-file #{pid_file} --log-file #{log_file}",
:start_command => "./beetle configuration_client -v -d --redis-master-file #{redis_master_file} --id #{@name} --pid-file #{pid_file} --log-file #{log_file} --client-proxy-port #{client_proxy_port}",
:ping_command => lambda{ true },
:pid_file => pid_file,
:log_file => log_file,
Expand All @@ -45,6 +45,10 @@ def daemon_controller
)
end

def client_proxy_port
9700 + @daemon_id
end

def redis_master_file
"#{tmp_path}/redis-master-#{@name}"
end
Expand Down
37 changes: 37 additions & 0 deletions go/beetle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var opts struct {
ClientIds string `long:"client-ids" description:"Clients that have to acknowledge on master switch (e.g. client-id1,client-id2)."`
ClientTimeout int `long:"client-timeout" description:"Number of seconds to wait until considering a client dead (or unreachable). Defaults to 10."`
ClientHeartbeatInterval int `long:"client-heartbeat-interval" description:"Number of seconds between client heartbeats. Defaults to 5."`
ClientProxyPort int `long:"client-proxy-port" description:"Port to use for client proxy messages. Defaults to 9700."`
ClientProxyIP string `long:"client-proxy-ip" env:"BEETLE_CONFIGURATION_CLIENT_PROXY_IP" description:"IP address to use for client proxy messages. Defaults to 127.0.0.1."`
ConfigFile string `long:"config-file" description:"Config file path."`
RedisServers string `long:"redis-servers" description:"List of redis failover sets (separated by semicolon or newlines). Each set consists of comma separated host:port pairs, preceded by a system name and a slash. Example: primary/a1:4,a2:5;secondary/b1:3,b2:3"`
RedisMasterFile string `long:"redis-master-file" description:"Path of redis master file."`
Expand Down Expand Up @@ -183,6 +185,38 @@ func (x *CmdRunDumpExpiries) Execute(args []string) error {
})
}

// CmdRunClientProxy is used when the program arguments tell us to run a mailer.
type CmdRunClientProxy struct{}

var cmdRunClientProxy CmdRunClientProxy

// Execute runs a mailer.
func (x *CmdRunClientProxy) Execute(args []string) error {
return RunClientProxy(ClientProxyOptions{
RedisMasterFile: initialConfig.RedisMasterFile,
ClientProxyPort: initialConfig.ClientProxyPort,
ClientProxyIP: initialConfig.ClientProxyIP,
ExitAfterFileReceived: false,
Verbose: opts.Verbose,
})
}

// CmdRunClientProxyInit is used when the program arguments tell us to run a mailer.
type CmdRunClientProxyInit struct{}

var cmdRunClientProxyInit CmdRunClientProxyInit

// Execute runs a mailer.
func (x *CmdRunClientProxyInit) Execute(args []string) error {
return RunClientProxy(ClientProxyOptions{
RedisMasterFile: initialConfig.RedisMasterFile,
ClientProxyPort: initialConfig.ClientProxyPort,
ClientProxyIP: initialConfig.ClientProxyIP,
ExitAfterFileReceived: true,
Verbose: opts.Verbose,
})
}

func init() {
ReportVersionIfRequestedAndExit()
opts.Id = getFQDN()
Expand Down Expand Up @@ -274,6 +308,7 @@ func getProgramParameters() *Config {
ClientIds: opts.ClientIds,
ClientHeartbeat: opts.ClientHeartbeatInterval,
ClientTimeout: opts.ClientTimeout,
ClientProxyPort: opts.ClientProxyPort,
ConfidenceLevel: opts.ConfidenceLevel,
RedisMasterRetries: opts.RedisMasterRetries,
RedisMasterRetryInterval: opts.RedisMasterRetryInterval,
Expand Down Expand Up @@ -363,6 +398,8 @@ func main() {
parser.AddCommand("copy_queue_keys", "copy all keys for a given queue prefix from current master to a given redis server", "", &cmdRunCopyKeys)
parser.AddCommand("dump_expiries", "print all expiry values from redis master", "", &cmdRunDumpExpiries)
parser.AddCommand("notification_mailer", "listen to system notifications and send them via /usr/sbin/sendmail", "", &cmdRunMailer)
parser.AddCommand("configuration_client_proxy", "run redis configuration client proxy", "", &cmdRunClientProxy)
parser.AddCommand("configuration_client_proxy_init", "retrieve redis master file by running proxy until it arrives on socket", "", &cmdRunClientProxyInit)
parser.CommandHandler = cmdHandler

_, err := parser.Parse()
Expand Down
49 changes: 42 additions & 7 deletions go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"

zmq "github.com/pebbe/zmq4"
"github.com/xing/beetle/consul"
"gopkg.in/gorilla/websocket.v1"
)
Expand Down Expand Up @@ -39,6 +40,7 @@ type ClientState struct {
readerDone chan struct{}
configChanges chan consul.Env
redisSystems map[string]*RedisSystem
lvcacheSocket *zmq.Socket
}

// GetConfig returns the client configuration in a thread safe way.
Expand All @@ -63,15 +65,15 @@ func (s *ClientState) SetConfig(config *Config) *Config {
return oldconfig
}

// ServerUrl constructs the webesocker URL to contact the server.
// ServerUrl constructs the websocket URL to contact the server.
func (s *ClientState) ServerUrl() string {
config := s.GetConfig()
addr := fmt.Sprintf("%s:%d", config.Server, config.Port)
u := url.URL{Scheme: "ws", Host: addr, Path: "/configuration"}
return u.String()
}

// Connect establishes a webscket connection to the server.
// Connect establishes a websocket connection to the server.
func (s *ClientState) Connect() (err error) {
url := s.ServerUrl()
websocket.DefaultDialer.HandshakeTimeout = time.Duration(s.GetConfig().DialTimeout) * time.Second
Expand Down Expand Up @@ -160,9 +162,7 @@ func (s *RedisSystem) NewMaster(server string) {
s.currentMaster = NewRedisShim(server)
}

// UpdateMasterFile writes the known masters information to the redis master file.
func (s *ClientState) UpdateMasterFile() {
path := s.GetConfig().RedisMasterFile
func (s *ClientState) CurrentMasterFileData() string {
systems := make(map[string]string, 0)
for system, rs := range s.redisSystems {
if rs.currentMaster == nil {
Expand All @@ -171,8 +171,39 @@ func (s *ClientState) UpdateMasterFile() {
systems[system] = rs.currentMaster.server
}
}
content := MarshalMasterFileContent(systems)
return MarshalMasterFileContent(systems)
}

// UpdateMasterFile writes the known masters information to the redis master file.
func (s *ClientState) UpdateMasterFile() {
path := s.GetConfig().RedisMasterFile
content := s.CurrentMasterFileData()
WriteRedisMasterFile(path, content)
s.UpdateLVCache(content)
}

// UpdateLVCache send the current master file to the PUB socket.
func (s *ClientState) UpdateLVCache(content string) {
if s.lvcacheSocket != nil {
s.lvcacheSocket.SendMessage("redis-master-file-content", content)
}
}

// Start the LV cache with the initial contents of the redis master file.
func (s *ClientState) StartLVCache(content string) {
initialCacheData := map[string]string{"redis-master-file-content": content}
go RunLVCache(initialCacheData, s.GetConfig().ClientProxyPort)
var err error
s.lvcacheSocket, err = zmq.NewSocket(zmq.PUB)
if err != nil {
logError("could not create internal value cache PUB socket")
return
}
s.lvcacheSocket.SetLinger(0)
err = s.lvcacheSocket.Connect("inproc://lv-cache")
if err != nil {
logError("could not connect internal value cache PUB socket")
}
}

// DetermineInitialMasters tries to read the current masters from disk
Expand All @@ -183,7 +214,9 @@ func (s *ClientState) DetermineInitialMasters() {
s.UpdateMasterFile()
return
}
masters := RedisMastersFromMasterFile(path)
content := ReadRedisMasterFile(path)
s.StartLVCache(content)
masters := UnmarshalMasterFileContent(content)
invalidSystems := make([]string, 0)
for system, server := range masters {
rs := s.RegisterSystem(system)
Expand Down Expand Up @@ -235,7 +268,9 @@ func (s *ClientState) Reconfigure(msg MsgBody) error {
if rs.currentMaster == nil || rs.currentMaster.server != msg.Server {
rs.NewMaster(msg.Server)
s.UpdateMasterFile()
// return nil
}
// s.UpdateLVCache(s.CurrentMasterFileData())
return nil
}

Expand Down
67 changes: 67 additions & 0 deletions go/client_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"fmt"
"time"

zmq "github.com/pebbe/zmq4"
)

type ClientProxyOptions struct {
RedisMasterFile string
ClientProxyPort int
ClientProxyIP string
ExitAfterFileReceived bool
Verbose bool
}

func RunClientProxy(options ClientProxyOptions) error {
logInfo("configuration client proxy started")
defer logInfo("configuration client proxy terminated")

frontend, err := zmq.NewSocket(zmq.SUB)
if err != nil {
logError("could not create configuration client proxy SUB socket")
return err
}
defer frontend.Close()
frontend.SetLinger(0)
spec := fmt.Sprintf("tcp://%s:%d", options.ClientProxyIP, options.ClientProxyPort)
frontend.Connect(spec)
frontend.SetSubscribe("redis-master-file-content")

poller := zmq.NewPoller()
poller.Add(frontend, zmq.POLLIN)

content := ReadRedisMasterFile(options.RedisMasterFile)

for !interrupted {
polled, err := poller.Poll(1000 * time.Millisecond)
if err != nil {
continue
}
for range polled {
msg, err := frontend.RecvMessage(0)
if err != nil {
return err
}
topic := msg[0]
newContent := msg[1]
if options.Verbose {
fmt.Printf("%s:\n%s\n", topic, newContent)
}
if newContent != content {
err := WriteRedisMasterFile(options.RedisMasterFile, newContent)
if err != nil {
logError("could not update redis master file: %v", err)
continue
}
content = newContent
}
if options.ExitAfterFileReceived {
return nil
}
}
}
return nil
}
19 changes: 19 additions & 0 deletions go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Config struct {
ClientIds string `yaml:"redis_configuration_client_ids"`
ClientHeartbeat int `yaml:"redis_configuration_client_heartbeat"`
ClientTimeout int `yaml:"redis_configuration_client_timeout"`
ClientProxyPort int `yaml:"redis_configuration_client_proxy_port"`
ClientProxyIP string // This is never taken from a yaml file.
RedisMasterRetries int `yaml:"redis_configuration_master_retries"`
RedisMasterRetryInterval int `yaml:"redis_configuration_master_retry_interval"`
RedisMasterFile string `yaml:"redis_server"`
Expand Down Expand Up @@ -99,6 +101,12 @@ func (c *Config) SetDefaults() *Config {
if c.ClientHeartbeat == 0 {
c.ClientHeartbeat = 5
}
if c.ClientProxyPort == 0 {
c.ClientProxyPort = 9700
}
if c.ClientProxyIP == "" {
c.ClientProxyIP = "127.0.0.1"
}
if c.RedisMasterRetries == 0 {
c.RedisMasterRetries = 3
}
Expand Down Expand Up @@ -156,6 +164,12 @@ func (c *Config) Merge(d *Config) *Config {
if c.ClientHeartbeat == 0 {
c.ClientHeartbeat = d.ClientHeartbeat
}
if c.ClientProxyPort == 0 {
c.ClientProxyPort = d.ClientProxyPort
}
if c.ClientProxyIP == "" {
c.ClientProxyIP = d.ClientProxyIP
}
if c.RedisMasterRetries == 0 {
c.RedisMasterRetries = d.RedisMasterRetries
}
Expand Down Expand Up @@ -217,6 +231,11 @@ func configFromConsulEnv(env consul.Env) *Config {
c.ClientTimeout = d
}
}
if v, ok := env["REDIS_CONFIGURATION_CLIENT_PROXY_PORT"]; ok {
if d, err := strconv.Atoi(v); err == nil {
c.ClientProxyPort = d
}
}
if v, ok := env["REDIS_GC_THRESHOLD"]; ok {
if d, err := strconv.Atoi(v); err == nil {
c.GcThreshold = d
Expand Down
11 changes: 6 additions & 5 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/gobuffalo/packr v1.30.1
github.com/jessevdk/go-flags v1.5.0
github.com/pebbe/zmq4 v1.2.9
github.com/pkg/errors v0.9.1
github.com/yookoala/realpath v1.0.0
golang.org/x/text v0.8.0
Expand All @@ -15,12 +16,12 @@ require (
)

require (
github.com/gobuffalo/envy v1.10.1 // indirect
github.com/gobuffalo/packd v1.0.1 // indirect
github.com/gobuffalo/envy v1.10.2 // indirect
github.com/gobuffalo/packd v1.0.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/joho/godotenv v1.4.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
golang.org/x/sys v0.5.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/sys v0.6.0 // indirect
)

0 comments on commit bed98c5

Please sign in to comment.