Skip to content

Commit

Permalink
Merge pull request #307 from VladoLavor/etcd-post-init
Browse files Browse the repository at this point in the history
allow delayed start for the etcd
  • Loading branch information
VladoLavor committed Jul 19, 2018
2 parents c3a3e85 + 4923f0c commit d434249
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 43 deletions.
23 changes: 20 additions & 3 deletions datasync/kvdbsync/plugin_impl_dbsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package kvdbsync

import (
"errors"
"fmt"

"github.com/golang/protobuf/proto"
"github.com/ligato/cn-infra/datasync"
Expand Down Expand Up @@ -80,7 +80,24 @@ func (plugin *Plugin) Init() error {
// The order of plugins in flavor is not important to resync
// since Watch() is called in Plugin.Init() and Resync.Register()
// is called in Plugin.AfterInit().
//
// If provided connection is not ready (not connected), AfterInit starts new goroutine in order to
// 'wait' for the connection. After that, the new transport watcher is built as usual.
func (plugin *Plugin) AfterInit() error {
if plugin.KvPlugin == nil || plugin.KvPlugin.Disabled() {
return nil
}
// Define function executed on kv plugin connection
plugin.KvPlugin.OnConnect(func() error {
if err := plugin.initKvPlugin(); err != nil {
return fmt.Errorf("init KV plugin %v failed: %v", plugin.KvPlugin.GetPluginName(), err)
}
return nil
})
return nil
}

func (plugin *Plugin) initKvPlugin() error {
if plugin.KvPlugin != nil && !plugin.KvPlugin.Disabled() {
db := plugin.KvPlugin.NewBroker(plugin.ServiceLabel.GetAgentPrefix())
dbW := plugin.KvPlugin.NewWatcher(plugin.ServiceLabel.GetAgentPrefix())
Expand Down Expand Up @@ -125,7 +142,7 @@ func (plugin *Plugin) Put(key string, data proto.Message, opts ...datasync.PutOp
return plugin.adapter.db.Put(key, data, opts...)
}

return errors.New("Transport adapter is not ready yet. (Probably called before AfterInit)")
return fmt.Errorf("transport adapter is not ready yet. (Probably called before AfterInit)")
}

// Delete propagates this call to a particular kvdb.Plugin unless the kvdb.Plugin is Disabled().
Expand All @@ -140,7 +157,7 @@ func (plugin *Plugin) Delete(key string, opts ...datasync.DelOption) (existed bo
return plugin.adapter.db.Delete(key, opts...)
}

return false, errors.New("Transport adapter is not ready yet. (Probably called before AfterInit)")
return false, fmt.Errorf("transport adapter is not ready yet. (Probably called before AfterInit)")
}

// Close resources.
Expand Down
12 changes: 12 additions & 0 deletions db/keyval/consul/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ func (plugin *Plugin) Init() (err error) {
return nil
}

// OnConnect executes callback from datasync
func (plugin *Plugin) OnConnect(callback func() error) {
if err := callback(); err != nil {
plugin.Log.Error(err)
}
}

// GetPluginName returns name of the plugin
func (plugin *Plugin) GetPluginName() core.PluginName {
return plugin.PluginName
}

// Close closes Consul plugin.
func (plugin *Plugin) Close() error {
return nil
Expand Down
2 changes: 1 addition & 1 deletion db/keyval/etcd/bytes_broker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewEtcdConnectionWithBytes(config ClientConfig, log logging.Logger) (*Bytes
start := time.Now()
etcdClient, err := clientv3.New(*config.Config)
if err != nil {
log.Errorf("Failed to connect to Etcd etcd(s) %v, Error: '%s'", config.Endpoints, err)
log.Debugf("Unable to connect to ETCD %v, Error: '%s'", config.Endpoints, err)
return nil, err
}
etcdConnectTime := time.Since(start)
Expand Down
2 changes: 2 additions & 0 deletions db/keyval/etcd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Config struct {
CAfile string `json:"ca-file"`
AutoCompact time.Duration `json:"auto-compact"`
ReconnectResync bool `json:"resync-after-reconnect"`
AllowDelayedStart bool `json:"allow-delayed-start"`
ReconnectInterval time.Duration `json:"reconnect-interval"`
}

// ClientConfig extends clientv3.Config with configuration options introduced
Expand Down
9 changes: 8 additions & 1 deletion db/keyval/etcd/etcd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,11 @@ auto-compact: 0

# If ETCD server lost connection, the flag allows to automatically run the whole resync procedure
# for all registered plugins if it reconnects
resync-after-reconnect: false
resync-after-reconnect: false

# Allow to start without connected ETCD database. Plugin will try to connect and if successful, overall resync will
# be called
allow-delayed-start: false

# Interval between ETCD reconnect attempts in ns. Default value is 2 seconds. Has no use if `delayed start` is turned off
reconnect-interval: 2000000000
174 changes: 137 additions & 37 deletions db/keyval/etcd/plugin_impl_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"fmt"
"time"

"sync"

"github.com/ligato/cn-infra/core"
"github.com/ligato/cn-infra/datasync/resync"
"github.com/ligato/cn-infra/db/keyval"
Expand All @@ -30,20 +32,33 @@ import (
const (
// healthCheckProbeKey is a key used to probe Etcd state
healthCheckProbeKey = "/probe-etcd-connection"
// ETCD reconnect interval
defaultReconnectInterval = 2 * time.Second
)

// Plugin implements etcd plugin.
type Plugin struct {
Deps
sync.Mutex

// Plugin is disabled if there is no config file available
disabled bool
// Set if connected to ETCD db
connected bool
// ETCD connection encapsulation
connection *BytesConnectionEtcd
// Read/Write proto modelled data
protoWrapper *kvproto.ProtoWrapper

// plugin config
config *Config

// List of callback functions, used in case ETCD is not connected immediately. All plugins using
// ETCD as dependency add their own function if cluster is not reachable. After connection, all
// functions are executed.
onConnection []func() error

autoCompactDone chan struct{}
reconnectResync bool
lastConnErr error
}

Expand All @@ -65,52 +80,38 @@ type Deps struct {
// the connection cannot be established.
func (plugin *Plugin) Init() (err error) {
// Read ETCD configuration file. Returns error if does not exists.
etcdCfg, err := plugin.getEtcdConfig()
plugin.config, err = plugin.getEtcdConfig()
if err != nil || plugin.disabled {
return err
}
// Transforms .yaml config to ETCD client configuration
etcdClientCfg, err := ConfigToClient(&etcdCfg)
etcdClientCfg, err := ConfigToClient(plugin.config)
if err != nil {
return err
}
// Uses config file to establish connection with the database
plugin.connection, err = NewEtcdConnectionWithBytes(*etcdClientCfg, plugin.Log)
if err != nil {
plugin.Log.Errorf("Err: %v", err)
return err
}
plugin.reconnectResync = etcdCfg.ReconnectResync
if etcdCfg.AutoCompact > 0 {
if etcdCfg.AutoCompact < time.Duration(time.Minute*60) {
plugin.Log.Warnf("Auto compact option for ETCD is set to less than 60 minutes!")
}
plugin.startPeriodicAutoCompact(etcdCfg.AutoCompact)
}
plugin.protoWrapper = kvproto.NewProtoWrapperWithSerializer(plugin.connection, &keyval.SerializerJSON{})

// Register for providing status reports (polling mode).
if plugin.StatusCheck != nil {
plugin.StatusCheck.Register(core.PluginName(plugin.PluginName), func() (statuscheck.PluginState, error) {
_, _, _, err := plugin.connection.GetValue(healthCheckProbeKey)
if err == nil {
if plugin.reconnectResync && plugin.lastConnErr != nil {
plugin.Log.Info("Starting resync after ETCD reconnect")
if plugin.Resync != nil {
plugin.Resync.DoResync()
plugin.lastConnErr = nil
} else {
plugin.Log.Warn("Expected resync after ETCD reconnect could not start beacuse of missing Resync plugin")
}
}
return statuscheck.OK, nil
}
plugin.lastConnErr = err
return statuscheck.Error, err
})
plugin.StatusCheck.Register(plugin.PluginName, plugin.statusCheckProbe)
} else {
plugin.Log.Warnf("Unable to start status check for etcd")
}
if err != nil && plugin.config.AllowDelayedStart {
// If the connection cannot be established during init, keep trying in another goroutine (if allowed) and
// end the init
go plugin.etcdReconnectionLoop(etcdClientCfg)
return nil
} else if err != nil {
// If delayed start is not allowed, return error
return fmt.Errorf("error connecting to ETCD: %v", err)
}

// If successful, configure and return
plugin.configureConnection()

// Mark plugin as connected at this point
plugin.connected = true

return nil
}
Expand All @@ -137,6 +138,25 @@ func (plugin *Plugin) Disabled() (disabled bool) {
return plugin.disabled
}

// OnConnect executes callback if plugin is connected, or gathers functions from all plugin with ETCD as dependency
func (plugin *Plugin) OnConnect(callback func() error) {
plugin.Lock()
defer plugin.Unlock()

if plugin.connected {
if err := callback(); err != nil {
plugin.Log.Error(err)
}
} else {
plugin.onConnection = append(plugin.onConnection, callback)
}
}

// GetPluginName returns name of the plugin
func (plugin *Plugin) GetPluginName() core.PluginName {
return plugin.PluginName
}

// PutIfNotExists puts given key-value pair into etcd if there is no value set for the key. If the put was successful
// succeeded is true. If the key already exists succeeded is false and the value for the key is untouched.
func (plugin *Plugin) PutIfNotExists(key string, value []byte) (succeeded bool, err error) {
Expand All @@ -154,18 +174,98 @@ func (plugin *Plugin) Compact(rev ...int64) (toRev int64, err error) {
return 0, fmt.Errorf("connection is not established")
}

func (plugin *Plugin) getEtcdConfig() (Config, error) {
// Method starts loop which attempt to connect to the ETCD. If successful, send signal callback with resync,
// which will be started when datasync confirms successful registration
func (plugin *Plugin) etcdReconnectionLoop(clientCfg *ClientConfig) {
var err error
// Set reconnect interval
interval := plugin.config.ReconnectInterval
if interval == 0 {
interval = defaultReconnectInterval
}
plugin.Log.Infof("ETCD server %s not reachable in init phase. Agent will continue to try to connect every %d second(s)",
plugin.config.Endpoints, interval)
for {
time.Sleep(interval)

plugin.Log.Infof("Connecting to ETCD %v ...", plugin.config.Endpoints)
plugin.connection, err = NewEtcdConnectionWithBytes(*clientCfg, plugin.Log)
if err != nil {
continue
}
plugin.setupPostInitConnection()
return
}
}

func (plugin *Plugin) setupPostInitConnection() {
plugin.Log.Infof("ETCD server %s connected", plugin.config.Endpoints)

plugin.Lock()
defer plugin.Unlock()

// Configure connection and set as connected
plugin.configureConnection()
plugin.connected = true
// Execute callback functions (if any)
for _, callback := range plugin.onConnection {
if err := callback(); err != nil {
plugin.Log.Error(err)
}
}
// Call resync if any callback was executed. Otherwise there is nothing to resync
if plugin.Resync != nil && len(plugin.onConnection) > 0 {
plugin.Resync.DoResync()
}
plugin.Log.Debugf("Etcd reconnection loop ended")
}

// If ETCD is connected, complete all other procedures
func (plugin *Plugin) configureConnection() {
if plugin.config.AutoCompact > 0 {
if plugin.config.AutoCompact < time.Duration(time.Minute*60) {
plugin.Log.Warnf("Auto compact option for ETCD is set to less than 60 minutes!")
}
plugin.startPeriodicAutoCompact(plugin.config.AutoCompact)
}
plugin.protoWrapper = kvproto.NewProtoWrapperWithSerializer(plugin.connection, &keyval.SerializerJSON{})
}

// ETCD status check probe function
func (plugin *Plugin) statusCheckProbe() (statuscheck.PluginState, error) {
if plugin.connection == nil {
plugin.connected = false
return statuscheck.Error, fmt.Errorf("no ETCD connection available")
}
if _, _, _, err := plugin.connection.GetValue(healthCheckProbeKey); err != nil {
plugin.lastConnErr = err
plugin.connected = false
return statuscheck.Error, err
}
if plugin.config.ReconnectResync && plugin.lastConnErr != nil {
if plugin.Resync != nil {
plugin.Resync.DoResync()
plugin.lastConnErr = nil
} else {
plugin.Log.Warn("Expected resync after ETCD reconnect could not start beacuse of missing Resync plugin")
}
}
plugin.connected = true
return statuscheck.OK, nil
}

func (plugin *Plugin) getEtcdConfig() (*Config, error) {
var etcdCfg Config
found, err := plugin.PluginConfig.GetValue(&etcdCfg)
if err != nil {
return etcdCfg, err
return nil, err
}
if !found {
plugin.Log.Info("ETCD config not found, skip loading this plugin")
plugin.disabled = true
return etcdCfg, nil
return &etcdCfg, nil
}
return etcdCfg, nil
return &etcdCfg, nil
}

func (plugin *Plugin) startPeriodicAutoCompact(period time.Duration) {
Expand Down
8 changes: 8 additions & 0 deletions db/keyval/plugin_api_keyval.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package keyval

import "github.com/ligato/cn-infra/core"

// Root denotes that no prefix is prepended to the keys.
const Root = ""

Expand All @@ -32,6 +34,12 @@ type KvProtoPlugin interface {
// Disabled returns true if there was no configuration and therefore agent
// started without connectivity to a particular data store.
Disabled() bool
// OnConnect executes datasync callback if KV plugin is connected. If not, it gathers
// these functions from all plugins using the specific KV plugin as dependency and
// if delayed start is allowed, callbacks are executed after successful connection.
OnConnect(func() error)
// Returns plugin's name
GetPluginName() core.PluginName
}

// KvBytesPlugin provides unifying interface for different key-value datastore
Expand Down

0 comments on commit d434249

Please sign in to comment.