Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for centralized dynamic config #24

Merged
merged 7 commits into from
Jun 25, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion example_relay.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,19 @@ max_concurrent: 8
# Environment variable: $RELAY_DYNAMIC_CONFIG_ROOT
# Default: none
# Required: No
# dynamic_config_root: /tmp/bundle_configs
# dynamic_config_root: /tmp/dynamic_configs

# Use managed dynamic configuration
# Requires dynamic_config_root
# Environment variable: $RELAY_MANAGED_DYNAMIC_CONFIG
# Default: false
# managed_dynamic_config: false

# Refresh interval for managed dynamic configuration files.
# Valid time units are s (seconds), m (minutes), and h (hours).
# Requires managed_dynamic_config
# Environment variable: $RELAY_MANAGED_DYNAMIC_CONFIG_INTERVAL
# Default: 5s
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to add these to the doc site: http://docs.operable.io/docs/relay-environment-variables

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already done: http://docs.operable.io/v0.8/docs/relay-environment-variables-2

Will flip this page when we release 0.9.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


# Log level
# Environment variable: $RELAY_LOG_LEVEL
Expand Down
2 changes: 2 additions & 0 deletions relay/bus/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ type ConnectionOptions struct {
SSLEnabled bool
SSLCertPath string
EventsHandler EventHandler
AutoReconnect bool
OnDisconnect *DisconnectMessage
}

// Connection is the high-level message bus interface
type Connection interface {
Connect(options ConnectionOptions) error
Disconnect() error
Publish(topic string, payload []byte) error
Subscribe(topic string, handler SubscriptionHandler) error
}
Expand Down
12 changes: 10 additions & 2 deletions relay/bus/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (mqc *MQTTConnection) Connect(options ConnectionOptions) error {
return nil
}

// Disconnect is required by the bus.Connection interface
func (mqc *MQTTConnection) Disconnect() error {
mqc.conn.Disconnect(1000)
return nil
}

// Publish is required by the bus.Connection interface
func (mqc *MQTTConnection) Publish(topic string, payload []byte) error {
token := mqc.conn.Publish(topic, 1, false, payload)
Expand Down Expand Up @@ -80,7 +86,7 @@ func (mqc *MQTTConnection) disconnected(cilent *mqtt.Client, err error) {
func (mqc *MQTTConnection) buildMQTTOptions(options ConnectionOptions) *mqtt.ClientOptions {
clientID := fmt.Sprintf("%x", time.Now().UTC().UnixNano())
mqttOpts := mqtt.NewClientOptions()
mqttOpts.SetAutoReconnect(false)
mqttOpts.SetAutoReconnect(options.AutoReconnect)
mqttOpts.SetKeepAlive(time.Duration(60) * time.Second)
mqttOpts.SetPingTimeout(time.Duration(15) * time.Second)
mqttOpts.SetUsername(options.Userid)
Expand All @@ -89,7 +95,9 @@ func (mqc *MQTTConnection) buildMQTTOptions(options ConnectionOptions) *mqtt.Cli
mqttOpts.SetCleanSession(true)
brokerURL := brokerURL(options)
mqttOpts.AddBroker(brokerURL)
mqttOpts.SetConnectionLostHandler(mqc.disconnected)
if !options.AutoReconnect {
mqttOpts.SetConnectionLostHandler(mqc.disconnected)
}
return mqttOpts
}

Expand Down
44 changes: 32 additions & 12 deletions relay/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"os"
"path"
"reflect"
"strconv"
"strings"
Expand All @@ -23,21 +24,25 @@ const (

var validEngineNames = []string{DockerEngine, NativeEngine}
var errorNoExecutionEngines = errors.New("Invalid Relay configuration detected. At least one execution engine must be enabled.")
var errorMissingDynamicConfigRoot = errors.New("Enabling 'managed_dynamic_config' requires setting 'dynamic_config_root'.")
var errorBadDynConfigInterval = errors.New("Error parsing managed_dynamic_config_interval")

// Config is the top level struct for all Relay configuration
type Config struct {
Version int `yaml:"version" valid:"int64,required"`
ID string `yaml:"id" env:"RELAY_ID" valid:"uuid,required"`
MaxConcurrent int `yaml:"max_concurrent" env:"RELAY_MAX_CONCURRENT" valid:"int64,required" default:"16"`
DynamicConfigRoot string `yaml:"dynamic_config_root" env:"RELAY_DYNAMIC_CONFIG_ROOT" valid:"-"`
LogLevel string `yaml:"log_level" env:"RELAY_LOG_LEVEL" valid:"required" default:"info"`
LogJSON bool `yaml:"log_json" env:"RELAY_LOG_JSON" valid:"bool" default:"false"`
LogPath string `yaml:"log_path" env:"RELAY_LOG_PATH" valid:"required" default:"stdout"`
Cog *CogInfo `yaml:"cog" valid:"required"`
EnginesEnabled string `yaml:"enabled_engines" env:"RELAY_ENABLED_ENGINES" valid:"exec_engines" default:"docker,native"`
ParsedEnginesEnabled []string
Docker *DockerInfo `yaml:"docker" valid:"-"`
Execution *ExecutionInfo `yaml:"execution" valid:"-"`
Version int `yaml:"version" valid:"int64,required"`
ID string `yaml:"id" env:"RELAY_ID" valid:"uuid,required"`
MaxConcurrent int `yaml:"max_concurrent" env:"RELAY_MAX_CONCURRENT" valid:"int64,required" default:"16"`
DynamicConfigRoot string `yaml:"dynamic_config_root" env:"RELAY_DYNAMIC_CONFIG_ROOT" valid:"-"`
ManagedDynamicConfig bool `yaml:"managed_dynamic_config" env:"RELAY_MANAGED_DYNAMIC_CONFIG" valid:"-"`
DynamicConfigInterval string `yaml:"managed_dynamic_config_interval" env:"RELAY_MANAGED_DYNAMIC_CONFIG_INTERVAL" default:"5s"`
LogLevel string `yaml:"log_level" env:"RELAY_LOG_LEVEL" valid:"required" default:"info"`
LogJSON bool `yaml:"log_json" env:"RELAY_LOG_JSON" valid:"bool" default:"false"`
LogPath string `yaml:"log_path" env:"RELAY_LOG_PATH" valid:"required" default:"stdout"`
Cog *CogInfo `yaml:"cog" valid:"required"`
EnginesEnabled string `yaml:"enabled_engines" env:"RELAY_ENABLED_ENGINES" valid:"exec_engines" default:"docker,native"`
ParsedEnginesEnabled []string
Docker *DockerInfo `yaml:"docker" valid:"-"`
Execution *ExecutionInfo `yaml:"execution" valid:"-"`
}

// RefreshDuration returns RefreshInterval as a time.Duration
Expand All @@ -49,6 +54,15 @@ func (c *Config) RefreshDuration() time.Duration {
return duration
}

// ManagedDynamicConfigRefreshDuration returns DynamicConfigInterval as a time.Duration
func (c *Config) ManagedDynamicConfigRefreshDuration() time.Duration {
duration, err := time.ParseDuration(c.DynamicConfigInterval)
if err != nil {
panic(errorBadDynConfigInterval)
}
return duration
}

// DockerEnabled returns true when enabled_engines includes "docker"
func (c *Config) DockerEnabled() bool {
return c.engineEnabled(DockerEngine)
Expand All @@ -73,6 +87,12 @@ func (c *Config) Verify() error {
if c.DockerEnabled() == false && c.NativeEnabled() == false {
return errorNoExecutionEngines
}
if c.ManagedDynamicConfig == true && c.DynamicConfigRoot == "" {
return errorMissingDynamicConfigRoot
}
if c.ManagedDynamicConfig == true {
c.DynamicConfigRoot = path.Join(c.DynamicConfigRoot, "managed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we can't ever have a bundle named "managed", correct? We should guard against that in Cog, then (not a big deal; we already have a few other protected bundle names, and the rest of the app deals with them fine; we just need to add a new string for it to guard against)

}
return nil
}

Expand Down
180 changes: 180 additions & 0 deletions relay/dynamic_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package relay

import (
"bytes"
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/go-yaml/yaml"
"github.com/operable/go-relay/relay/bus"
"github.com/operable/go-relay/relay/messages"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"time"
)

// DynamicConfigUpdater periodically updates bundle dynamic configurations from Cog
type DynamicConfigUpdater struct {
id string
configTopic string
options bus.ConnectionOptions
conn bus.Connection
dynamicConfigRoot string
lastSignature string
control chan interface{}
refreshInterval time.Duration
refreshTimer *time.Timer
}

// NewDynamicConfigUpdater creates a new updater
func NewDynamicConfigUpdater(relayID string, busOpts bus.ConnectionOptions, dynamicConfigRoot string,
refreshInterval time.Duration) *DynamicConfigUpdater {
return &DynamicConfigUpdater{
id: relayID,
configTopic: fmt.Sprintf("bot/relays/%s/dynconfigs", relayID),
options: busOpts,
dynamicConfigRoot: dynamicConfigRoot,
refreshInterval: refreshInterval,
control: make(chan interface{}),
}
}

// Run connects the announcer to Cog and starts its main
// loop in a goroutine
func (dcu *DynamicConfigUpdater) Run() error {
log.Infof("Managed bundle dynamic configs enabled.")
log.Infof("Refreshing bundle dynamic configs every %v.", dcu.refreshInterval)
dcu.options.AutoReconnect = true
dcu.options.EventsHandler = dcu.handleBusEvents
conn := &bus.MQTTConnection{}
if err := conn.Connect(dcu.options); err != nil {
return err
}
dcu.refreshConfigs()
dcu.refreshTimer = time.AfterFunc(dcu.refreshInterval, dcu.refreshConfigs)
go func() {
dcu.loop()
}()
return nil
}

func (dcu *DynamicConfigUpdater) Halt() {
dcu.control <- 1
}

func (dcu *DynamicConfigUpdater) handleBusEvents(conn bus.Connection, event bus.Event) {
if event == bus.ConnectedEvent {
dcu.conn = conn
if err := dcu.conn.Subscribe(dcu.configTopic, dcu.dynConfigUpdate); err != nil {
log.Errorf("Failed to set up dynamic config updater subscriptions: %s.", err)
panic(err)
}
}
}

func (dcu *DynamicConfigUpdater) dynConfigUpdate(conn bus.Connection, topic string, payload []byte) {
defer dcu.refreshTimer.Reset(dcu.refreshInterval)
var envelope messages.DynamicConfigsResponseEnvelope
decoder := json.NewDecoder(bytes.NewReader(payload))
decoder.UseNumber()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One day, we should try and pull all the JSON decoding into a module so we don't have to keep repeating this same song and dance... 😦 . I'm afraid one day we're going to add something new and forget about this little wrinkle.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One day, we should try and pull all the JSON decoding into a module so we don't have to keep repeating this same song and dance... 😦 . I'm afraid one day we're going to add something new and forget about this little wrinkle.

if err := decoder.Decode(&envelope); err != nil {
log.Errorf("Error decoding GetDynamicConfigs result: %s.", err)
}
if envelope.Signature != dcu.lastSignature && envelope.Changed == true {
if dcu.updateConfigs(envelope.Signature, envelope.Configs) {
dcu.lastSignature = envelope.Signature
dcu.cleanOldConfigs()
log.Info("Updated bundle dynamic configs.")

}
}
}

func (dcu *DynamicConfigUpdater) refreshConfigs() {
request := messages.GetDynamicConfigsEnvelope{
GetDynamicConfigs: &messages.GetDynamicConfigs{
RelayID: dcu.id,
ReplyTo: dcu.configTopic,
Signature: dcu.lastSignature,
},
}
raw, _ := json.Marshal(request)
if err := dcu.conn.Publish("bot/relays/info", raw); err != nil {
log.Errorf("Error requesting bundle dynamic configuration update: %s.", err)
dcu.refreshTimer.Reset(dcu.refreshInterval)
}
}

func (dcu *DynamicConfigUpdater) loop() {
<-dcu.control
dcu.refreshTimer.Stop()
dcu.conn.Disconnect()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this loop?

}

func (dcu *DynamicConfigUpdater) updateConfigs(signature string, configs []messages.DynamicConfig) bool {
if !dcu.verifyManagedConfigPath() {
return false
}
updateDir := path.Join(dcu.dynamicConfigRoot, "..", signature)
if err := os.MkdirAll(updateDir, 0755); err != nil {
log.Errorf("Error preparing directory %s for updated bundle dynamic configs: %s.", updateDir, err)
return false
}
for _, config := range configs {
convertedContents, err := yaml.Marshal(config.Config)
if err != nil {
log.Errorf("Error preparing dynamic config for bundle %s: %s.", config.BundleName, err)
return false
}
if err := os.MkdirAll(path.Join(updateDir, config.BundleName), 0755); err != nil {
log.Errorf("Error preparing dynamic config for bundle %s: %s.", config.BundleName, err)
return false
}
configFileName := path.Join(updateDir, config.BundleName, "config.yml")
if err := ioutil.WriteFile(configFileName, convertedContents, 0644); err != nil {
log.Errorf("Error writing dynamic config file to path %s: %s.", configFileName, err)
return false
}
log.Debugf("Wrote bundle dynamic config file %s.", configFileName)
}
// Create and rename new symlink should make config updates atomic
symlinkTarget := path.Join(dcu.dynamicConfigRoot, "..", "new")
if err := os.Symlink(updateDir, symlinkTarget); err != nil {
log.Errorf("Error replacing existing bundle dynamic configs with updated contents: %s.", err)
return false
}
if err := os.Rename(symlinkTarget, dcu.dynamicConfigRoot); err != nil {
log.Errorf("Error replacing existing bundle dynamic configs with updated contents: %s.", err)
return false
}
return true
}

func (dcu *DynamicConfigUpdater) cleanOldConfigs() {
entries, _ := filepath.Glob(path.Join(dcu.dynamicConfigRoot, "..", "*"))
for _, entry := range entries {
if entry == dcu.dynamicConfigRoot || strings.HasSuffix(entry, dcu.lastSignature) {
continue
}
os.RemoveAll(entry)
}
}

func (dcu *DynamicConfigUpdater) verifyManagedConfigPath() bool {
info, err := os.Lstat(dcu.dynamicConfigRoot)
if err != nil {
if strings.HasSuffix(err.Error(), "no such file or directory") {
return true
}
log.Errorf("Error stat-ing dynamic config root directory: %s.", err)
return false
}
if info.Mode()&os.ModeSymlink == 0 {
log.Errorf("Managed dynamic config root directory %s is not a symlink. Update ABORTED.", dcu.dynamicConfigRoot)
return false
}
return true
}
27 changes: 27 additions & 0 deletions relay/messages/directives.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ type BundleRef struct {
Version string `json:"version,omitempty"`
}

// GetDynamicConfigsEnvelope is a wrapper around a GetDynamicConfigs directive.
type GetDynamicConfigsEnvelope struct {
GetDynamicConfigs *GetDynamicConfigs `json:"get_dynamic_configs"`
}

// GetDynamicConfigs asks Cog to send the complete list of
// dynamic configs for the bundles assigned to the Relay.
type GetDynamicConfigs struct {
RelayID string `json:"relay_id"`
Signature string `json:"config_hash"`
ReplyTo string `json:"reply_to"`
}

// DynamicConfigsResponseEnvelope is a wrapper around the
// response to a GetDynamicConfigs directive.
type DynamicConfigsResponseEnvelope struct {
Signature string `json:"signature"`
Changed bool `json:"changed"`
Configs []DynamicConfig `json:"configs"`
}

// DynamicConfig is the contents of a dynamic config file for a bundle.
type DynamicConfig struct {
BundleName string `json:"bundle_name"`
Config interface{} `json:"config"`
}

// AnnouncementEnvelope is a wrapper around an Announcement directive.
type AnnouncementEnvelope struct {
Announcement *Announcement `json:"announce" valid:"required"`
Expand Down