Skip to content

Commit

Permalink
Merge pull request #24 from operable/kevsmith/central-dyn-config
Browse files Browse the repository at this point in the history
Added support for centralized dynamic config
  • Loading branch information
Kevin Smith committed Jun 25, 2016
2 parents 50f608d + b54815a commit 628ce09
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 23 deletions.
14 changes: 13 additions & 1 deletion example_relay.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,19 @@ max_concurrent: 8
# Environment variable: $RELAY_DYNAMIC_CONFIG_ROOT
# Default: none
# Required: No
# dynamic_config_root: /tmp/bundle_configs
# dynamic_config_root: /tmp/dynamic_configs

# Use managed dynamic configuration
# Requires dynamic_config_root
# Environment variable: $RELAY_MANAGED_DYNAMIC_CONFIG
# Default: false
# managed_dynamic_config: false

# Refresh interval for managed dynamic configuration files.
# Valid time units are s (seconds), m (minutes), and h (hours).
# Requires managed_dynamic_config
# Environment variable: $RELAY_MANAGED_DYNAMIC_CONFIG_INTERVAL
# Default: 5s

# Log level
# Environment variable: $RELAY_LOG_LEVEL
Expand Down
2 changes: 2 additions & 0 deletions relay/bus/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ type ConnectionOptions struct {
SSLEnabled bool
SSLCertPath string
EventsHandler EventHandler
AutoReconnect bool
OnDisconnect *DisconnectMessage
}

// Connection is the high-level message bus interface
type Connection interface {
Connect(options ConnectionOptions) error
Disconnect() error
Publish(topic string, payload []byte) error
Subscribe(topic string, handler SubscriptionHandler) error
}
Expand Down
12 changes: 10 additions & 2 deletions relay/bus/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (mqc *MQTTConnection) Connect(options ConnectionOptions) error {
return nil
}

// Disconnect is required by the bus.Connection interface
func (mqc *MQTTConnection) Disconnect() error {
mqc.conn.Disconnect(1000)
return nil
}

// Publish is required by the bus.Connection interface
func (mqc *MQTTConnection) Publish(topic string, payload []byte) error {
token := mqc.conn.Publish(topic, 1, false, payload)
Expand Down Expand Up @@ -80,7 +86,7 @@ func (mqc *MQTTConnection) disconnected(cilent *mqtt.Client, err error) {
func (mqc *MQTTConnection) buildMQTTOptions(options ConnectionOptions) *mqtt.ClientOptions {
clientID := fmt.Sprintf("%x", time.Now().UTC().UnixNano())
mqttOpts := mqtt.NewClientOptions()
mqttOpts.SetAutoReconnect(false)
mqttOpts.SetAutoReconnect(options.AutoReconnect)
mqttOpts.SetKeepAlive(time.Duration(60) * time.Second)
mqttOpts.SetPingTimeout(time.Duration(15) * time.Second)
mqttOpts.SetUsername(options.Userid)
Expand All @@ -89,7 +95,9 @@ func (mqc *MQTTConnection) buildMQTTOptions(options ConnectionOptions) *mqtt.Cli
mqttOpts.SetCleanSession(true)
brokerURL := brokerURL(options)
mqttOpts.AddBroker(brokerURL)
mqttOpts.SetConnectionLostHandler(mqc.disconnected)
if !options.AutoReconnect {
mqttOpts.SetConnectionLostHandler(mqc.disconnected)
}
return mqttOpts
}

Expand Down
49 changes: 37 additions & 12 deletions relay/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"os"
"path"
"reflect"
"strconv"
"strings"
Expand All @@ -21,23 +22,32 @@ const (
NativeEngine = "native"
)

// Managed dynamic config symlink name
const (
ManagedDynamicConfigLink = "__managed__"
)

var validEngineNames = []string{DockerEngine, NativeEngine}
var errorNoExecutionEngines = errors.New("Invalid Relay configuration detected. At least one execution engine must be enabled.")
var errorMissingDynamicConfigRoot = errors.New("Enabling 'managed_dynamic_config' requires setting 'dynamic_config_root'.")
var errorBadDynConfigInterval = errors.New("Error parsing managed_dynamic_config_interval")

// Config is the top level struct for all Relay configuration
type Config struct {
Version int `yaml:"version" valid:"int64,required"`
ID string `yaml:"id" env:"RELAY_ID" valid:"uuid,required"`
MaxConcurrent int `yaml:"max_concurrent" env:"RELAY_MAX_CONCURRENT" valid:"int64,required" default:"16"`
DynamicConfigRoot string `yaml:"dynamic_config_root" env:"RELAY_DYNAMIC_CONFIG_ROOT" valid:"-"`
LogLevel string `yaml:"log_level" env:"RELAY_LOG_LEVEL" valid:"required" default:"info"`
LogJSON bool `yaml:"log_json" env:"RELAY_LOG_JSON" valid:"bool" default:"false"`
LogPath string `yaml:"log_path" env:"RELAY_LOG_PATH" valid:"required" default:"stdout"`
Cog *CogInfo `yaml:"cog" valid:"required"`
EnginesEnabled string `yaml:"enabled_engines" env:"RELAY_ENABLED_ENGINES" valid:"exec_engines" default:"docker,native"`
ParsedEnginesEnabled []string
Docker *DockerInfo `yaml:"docker" valid:"-"`
Execution *ExecutionInfo `yaml:"execution" valid:"-"`
Version int `yaml:"version" valid:"int64,required"`
ID string `yaml:"id" env:"RELAY_ID" valid:"uuid,required"`
MaxConcurrent int `yaml:"max_concurrent" env:"RELAY_MAX_CONCURRENT" valid:"int64,required" default:"16"`
DynamicConfigRoot string `yaml:"dynamic_config_root" env:"RELAY_DYNAMIC_CONFIG_ROOT" valid:"-"`
ManagedDynamicConfig bool `yaml:"managed_dynamic_config" env:"RELAY_MANAGED_DYNAMIC_CONFIG" valid:"-"`
DynamicConfigInterval string `yaml:"managed_dynamic_config_interval" env:"RELAY_MANAGED_DYNAMIC_CONFIG_INTERVAL" default:"5s"`
LogLevel string `yaml:"log_level" env:"RELAY_LOG_LEVEL" valid:"required" default:"info"`
LogJSON bool `yaml:"log_json" env:"RELAY_LOG_JSON" valid:"bool" default:"false"`
LogPath string `yaml:"log_path" env:"RELAY_LOG_PATH" valid:"required" default:"stdout"`
Cog *CogInfo `yaml:"cog" valid:"required"`
EnginesEnabled string `yaml:"enabled_engines" env:"RELAY_ENABLED_ENGINES" valid:"exec_engines" default:"docker,native"`
ParsedEnginesEnabled []string
Docker *DockerInfo `yaml:"docker" valid:"-"`
Execution *ExecutionInfo `yaml:"execution" valid:"-"`
}

// RefreshDuration returns RefreshInterval as a time.Duration
Expand All @@ -49,6 +59,15 @@ func (c *Config) RefreshDuration() time.Duration {
return duration
}

// ManagedDynamicConfigRefreshDuration returns DynamicConfigInterval as a time.Duration
func (c *Config) ManagedDynamicConfigRefreshDuration() time.Duration {
duration, err := time.ParseDuration(c.DynamicConfigInterval)
if err != nil {
panic(errorBadDynConfigInterval)
}
return duration
}

// DockerEnabled returns true when enabled_engines includes "docker"
func (c *Config) DockerEnabled() bool {
return c.engineEnabled(DockerEngine)
Expand All @@ -73,6 +92,12 @@ func (c *Config) Verify() error {
if c.DockerEnabled() == false && c.NativeEnabled() == false {
return errorNoExecutionEngines
}
if c.ManagedDynamicConfig == true && c.DynamicConfigRoot == "" {
return errorMissingDynamicConfigRoot
}
if c.ManagedDynamicConfig == true {
c.DynamicConfigRoot = path.Join(c.DynamicConfigRoot, ManagedDynamicConfigLink)
}
return nil
}

Expand Down
186 changes: 186 additions & 0 deletions relay/dynamic_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package relay

import (
"bytes"
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/go-yaml/yaml"
"github.com/operable/go-relay/relay/bus"
"github.com/operable/go-relay/relay/messages"
"github.com/operable/go-relay/relay/util"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"time"
)

// DynamicConfigUpdater periodically updates bundle dynamic configurations from Cog
type DynamicConfigUpdater struct {
id string
configTopic string
options bus.ConnectionOptions
conn bus.Connection
dynamicConfigRoot string
lastSignature string
control chan interface{}
refreshInterval time.Duration
refreshTimer *time.Timer
}

// NewDynamicConfigUpdater creates a new updater
func NewDynamicConfigUpdater(relayID string, busOpts bus.ConnectionOptions, dynamicConfigRoot string,
refreshInterval time.Duration) *DynamicConfigUpdater {
return &DynamicConfigUpdater{
id: relayID,
configTopic: fmt.Sprintf("bot/relays/%s/dynconfigs", relayID),
options: busOpts,
dynamicConfigRoot: dynamicConfigRoot,
refreshInterval: refreshInterval,
control: make(chan interface{}),
}
}

// Run connects the announcer to Cog and starts its main
// loop in a goroutine
func (dcu *DynamicConfigUpdater) Run() error {
log.Infof("Managed bundle dynamic configs enabled.")
log.Infof("Refreshing bundle dynamic configs every %v.", dcu.refreshInterval)
dcu.options.AutoReconnect = true
dcu.options.EventsHandler = dcu.handleBusEvents
conn := &bus.MQTTConnection{}
if err := conn.Connect(dcu.options); err != nil {
return err
}
dcu.refreshConfigs()
dcu.refreshTimer = time.AfterFunc(dcu.refreshInterval, dcu.refreshConfigs)
go func() {
dcu.wait()
}()
return nil
}

// Halt tells the DCU to stop.
func (dcu *DynamicConfigUpdater) Halt() {
dcu.control <- 1
}

func (dcu *DynamicConfigUpdater) handleBusEvents(conn bus.Connection, event bus.Event) {
if event == bus.ConnectedEvent {
dcu.conn = conn
if err := dcu.conn.Subscribe(dcu.configTopic, dcu.dynConfigUpdate); err != nil {
log.Errorf("Failed to set up dynamic config updater subscriptions: %s.", err)
panic(err)
}
}
}

func (dcu *DynamicConfigUpdater) dynConfigUpdate(conn bus.Connection, topic string, payload []byte) {
defer dcu.refreshTimer.Reset(dcu.refreshInterval)
var envelope messages.DynamicConfigsResponseEnvelope
decoder := util.NewJSONDecoder(bytes.NewReader(payload))
if err := decoder.Decode(&envelope); err != nil {
log.Errorf("Error decoding GetDynamicConfigs result: %s.", err)
}
if envelope.Signature != dcu.lastSignature && envelope.Changed == true {
if dcu.updateConfigs(envelope.Signature, envelope.Configs) {
dcu.lastSignature = envelope.Signature
dcu.cleanOldConfigs()
log.Info("Updated bundle dynamic configs.")

}
}
}

func (dcu *DynamicConfigUpdater) refreshConfigs() {
request := messages.GetDynamicConfigsEnvelope{
GetDynamicConfigs: &messages.GetDynamicConfigs{
RelayID: dcu.id,
ReplyTo: dcu.configTopic,
Signature: dcu.lastSignature,
},
}
raw, _ := json.Marshal(request)
if err := dcu.conn.Publish("bot/relays/info", raw); err != nil {
log.Errorf("Error requesting bundle dynamic configuration update: %s.", err)
dcu.refreshTimer.Reset(dcu.refreshInterval)
}
}

func (dcu *DynamicConfigUpdater) wait() {
<-dcu.control
dcu.refreshTimer.Stop()
dcu.conn.Disconnect()
}

// updateConfigs does its best to make dynamic config updates atomic. It does this by
// writing a new set of configs to a separate directory, creating a symlink pointing to
// the new config dir, and finally renaming the symlink to `dynamicConfigRoot`/config.ManagedDynamicConfigLink`.
// The process takes advantage of the atomic nature of renames on most sane OSs and
// filesystems.
func (dcu *DynamicConfigUpdater) updateConfigs(signature string, configs []messages.DynamicConfig) bool {
if !dcu.verifyManagedConfigPath() {
return false
}
updateDir := path.Join(dcu.dynamicConfigRoot, "..", signature)
if err := os.MkdirAll(updateDir, 0755); err != nil {
log.Errorf("Error preparing directory %s for updated bundle dynamic configs: %s.", updateDir, err)
return false
}
for _, config := range configs {
convertedContents, err := yaml.Marshal(config.Config)
if err != nil {
log.Errorf("Error preparing dynamic config for bundle %s: %s.", config.BundleName, err)
return false
}
if err := os.MkdirAll(path.Join(updateDir, config.BundleName), 0755); err != nil {
log.Errorf("Error preparing dynamic config for bundle %s: %s.", config.BundleName, err)
return false
}
configFileName := path.Join(updateDir, config.BundleName, "config.yml")
if err := ioutil.WriteFile(configFileName, convertedContents, 0644); err != nil {
log.Errorf("Error writing dynamic config file to path %s: %s.", configFileName, err)
return false
}
log.Debugf("Wrote bundle dynamic config file %s.", configFileName)
}
// Create and rename new symlink should make config updates atomic
symlinkTarget := path.Join(dcu.dynamicConfigRoot, "..", "new")
if err := os.Symlink(updateDir, symlinkTarget); err != nil {
log.Errorf("Error replacing existing bundle dynamic configs with updated contents: %s.", err)
return false
}
if err := os.Rename(symlinkTarget, dcu.dynamicConfigRoot); err != nil {
log.Errorf("Error replacing existing bundle dynamic configs with updated contents: %s.", err)
return false
}
return true
}

func (dcu *DynamicConfigUpdater) cleanOldConfigs() {
entries, _ := filepath.Glob(path.Join(dcu.dynamicConfigRoot, "..", "*"))
for _, entry := range entries {
if entry == dcu.dynamicConfigRoot || strings.HasSuffix(entry, dcu.lastSignature) {
continue
}
os.RemoveAll(entry)
}
}

func (dcu *DynamicConfigUpdater) verifyManagedConfigPath() bool {
info, err := os.Lstat(dcu.dynamicConfigRoot)
if err != nil {
if strings.HasSuffix(err.Error(), "no such file or directory") {
return true
}
log.Errorf("Error stat-ing dynamic config root directory: %s.", err)
return false
}
if info.Mode()&os.ModeSymlink == 0 {
log.Errorf("Managed dynamic config root directory %s is not a symlink. Update ABORTED.", dcu.dynamicConfigRoot)
return false
}
return true
}
27 changes: 27 additions & 0 deletions relay/messages/directives.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ type BundleRef struct {
Version string `json:"version,omitempty"`
}

// GetDynamicConfigsEnvelope is a wrapper around a GetDynamicConfigs directive.
type GetDynamicConfigsEnvelope struct {
GetDynamicConfigs *GetDynamicConfigs `json:"get_dynamic_configs"`
}

// GetDynamicConfigs asks Cog to send the complete list of
// dynamic configs for the bundles assigned to the Relay.
type GetDynamicConfigs struct {
RelayID string `json:"relay_id"`
Signature string `json:"config_hash"`
ReplyTo string `json:"reply_to"`
}

// DynamicConfigsResponseEnvelope is a wrapper around the
// response to a GetDynamicConfigs directive.
type DynamicConfigsResponseEnvelope struct {
Signature string `json:"signature"`
Changed bool `json:"changed"`
Configs []DynamicConfig `json:"configs"`
}

// DynamicConfig is the contents of a dynamic config file for a bundle.
type DynamicConfig struct {
BundleName string `json:"bundle_name"`
Config interface{} `json:"config"`
}

// AnnouncementEnvelope is a wrapper around an Announcement directive.
type AnnouncementEnvelope struct {
Announcement *Announcement `json:"announce" valid:"required"`
Expand Down

0 comments on commit 628ce09

Please sign in to comment.