Skip to content

Commit

Permalink
Merge pull request #260 from VladoLavor/dev
Browse files Browse the repository at this point in the history
removed 'skeleton' plugin
  • Loading branch information
VladoLavor committed Apr 18, 2018
2 parents 0e2a4b0 + 49f6319 commit e5bff3c
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 332 deletions.
5 changes: 0 additions & 5 deletions db/keyval/etcdv3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@

The Etcd plugin provides access to an etcd key-value data store.

## API

Implements API described in the [skeleton](../plugin):
The plugin is documented in more detail in the [doc.go](doc.go) file.

## Configuration

- Location of the Etcd configuration file can be defined either by the
Expand Down
6 changes: 3 additions & 3 deletions db/keyval/etcdv3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Config represents a part of the etcd configuration that can be
// loaded from a file. Usually, the Config is next transformed into
// ClientConfig using ConfigToClientv3() function for use with the coreos/etcd
// ClientConfig using ConfigToClient() function for use with the coreos/etcd
// package.
type Config struct {
Endpoints []string `json:"endpoints"`
Expand Down Expand Up @@ -60,7 +60,7 @@ const (
defaultOpTimeout = 3 * time.Second
)

// ConfigToClientv3 transforms yaml configuration <yc> modelled by Config
// ConfigToClient transforms yaml configuration <yc> modelled by Config
// into ClientConfig, which is ready for use with the underlying coreos/etcd
// package.
// If the etcd endpoint addresses are not specified in the configuration,
Expand All @@ -69,7 +69,7 @@ const (
// endpoint location, a default address "127.0.0.1:2379" is assumed.
// The function may return error only if TLS connection is selected and the
// CA or client certificate is not accessible/valid.
func ConfigToClientv3(yc *Config) (*ClientConfig, error) {
func ConfigToClient(yc *Config) (*ClientConfig, error) {
dialTimeout := defaultDialTimeout
if yc.DialTimeout != 0 {
dialTimeout = yc.DialTimeout
Expand Down
2 changes: 1 addition & 1 deletion db/keyval/etcdv3/etcdv3_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestWatchDeleteResp(t *testing.T) {
func TestConfig(t *testing.T) {
RegisterTestingT(t)
cfg := &Config{DialTimeout: time.Second, OpTimeout: time.Second}
etcdCfg, err := ConfigToClientv3(cfg)
etcdCfg, err := ConfigToClient(cfg)
Expect(err).To(BeNil())
Expect(etcdCfg).NotTo(BeNil())
Expect(etcdCfg.OpTimeout).To(BeEquivalentTo(time.Second))
Expand Down
210 changes: 99 additions & 111 deletions db/keyval/etcdv3/plugin_impl_etcdv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
package etcdv3

import (
"fmt"
"time"

"fmt"
"github.com/ligato/cn-infra/core"
"github.com/ligato/cn-infra/datasync/resync"
"github.com/ligato/cn-infra/db/keyval/plugin"
"github.com/ligato/cn-infra/db/keyval"
"github.com/ligato/cn-infra/db/keyval/kvproto"
"github.com/ligato/cn-infra/flavors/local"
"github.com/ligato/cn-infra/health/statuscheck"
"github.com/ligato/cn-infra/servicelabel"
"github.com/ligato/cn-infra/utils/safeclose"
)

Expand All @@ -34,10 +34,14 @@ const (

// Plugin implements etcdv3 plugin.
type Plugin struct {
Deps // inject
*plugin.Skeleton
disabled bool
connection *BytesConnectionEtcd
Deps
// Plugin is disabled if there is no config file available
disabled bool
// ETCD connection encapsulation
connection *BytesConnectionEtcd
// Read/Write proto modelled data
protoWrapper *kvproto.ProtoWrapper

autoCompactDone chan struct{}
reconnectResync bool
lastConnErr error
Expand All @@ -46,11 +50,11 @@ type Plugin struct {
// Deps lists dependencies of the etcdv3 plugin.
// If injected, etcd plugin will use StatusCheck to signal the connection status.
type Deps struct {
local.PluginInfraDeps // inject
Resync *resync.Plugin
local.PluginInfraDeps
Resync *resync.Plugin
}

// Init retrieves etcd configuration and establishes a new connection
// Init retrieves ETCD configuration and establishes a new connection
// with the etcd data store.
// If the configuration file doesn't exist or cannot be read, the returned error
// will be of os.PathError type. An untyped error is returned in case the file
Expand All @@ -59,143 +63,127 @@ type Deps struct {
// CA or client certificate is not accessible(os.PathError)/valid(untyped).
// Check clientv3.New from coreos/etcd for possible errors returned in case
// the connection cannot be established.
func (p *Plugin) Init() (err error) {
// Init connection
if p.Skeleton == nil {
// Retrieve config
var cfg Config
if found, err := p.PluginConfig.GetValue(&cfg); err != nil {
return err
} else if !found {
p.Log.Info("etcd config not found ", p.PluginConfig.GetConfigName(), " - skip loading this plugin")
p.disabled = true
return nil
}

etcdConfig, err := ConfigToClientv3(&cfg)
if err != nil {
return err
}

p.connection, err = NewEtcdConnectionWithBytes(*etcdConfig, p.Log)
if err != nil {
return err
}

// Set flag wheter resync will be started after reconnect
p.reconnectResync = cfg.ReconnectResync

if cfg.AutoCompact > 0 {
if cfg.AutoCompact < time.Duration(time.Minute*60) {
p.Log.Warnf("auto compact option for ETCD is set to less than 60 minutes!")
}
p.startPeriodicAutoCompact(cfg.AutoCompact)
}

p.Skeleton = plugin.NewSkeleton(p.String(),
p.ServiceLabel,
p.connection,
)
func (plugin *Plugin) Init() (err error) {
// Read ETCD configuration file. Returns error if does not exists.
etcdCfg, err := plugin.getEtcdConfig()
if err != nil || plugin.disabled {
return err
}

if err := p.Skeleton.Init(); err != nil {
// Transforms .yaml config to ETCD client configuration
etcdClientCfg, err := ConfigToClient(&etcdCfg)
if err != nil {
return err
}
// Uses config file to establish connection with the database
plugin.connection, err = NewEtcdConnectionWithBytes(*etcdClientCfg, plugin.Log)
if err != nil {
plugin.Log.Errorf("Err: %v", err)
return err
}
plugin.reconnectResync = etcdCfg.ReconnectResync
if etcdCfg.AutoCompact > 0 {
if etcdCfg.AutoCompact < time.Duration(time.Minute*60) {
plugin.Log.Warnf("Auto compact option for ETCD is set to less than 60 minutes!")
}
plugin.startPeriodicAutoCompact(etcdCfg.AutoCompact)
}
plugin.protoWrapper = kvproto.NewProtoWrapperWithSerializer(plugin.connection, &keyval.SerializerJSON{})

// Register for providing status reports (polling mode).
if p.StatusCheck != nil {
p.StatusCheck.Register(core.PluginName(p.String()), func() (statuscheck.PluginState, error) {
_, _, _, err := p.connection.GetValue(healthCheckProbeKey)
if plugin.StatusCheck != nil {
plugin.StatusCheck.Register(core.PluginName(plugin.PluginName), func() (statuscheck.PluginState, error) {
_, _, _, err := plugin.connection.GetValue(healthCheckProbeKey)
if err == nil {
if p.reconnectResync && p.lastConnErr != nil {
p.Log.Info("Starting resync after ETCD reconnect")
if p.Resync != nil {
p.Resync.DoResync()
p.lastConnErr = nil
if plugin.reconnectResync && plugin.lastConnErr != nil {
plugin.Log.Info("Starting resync after ETCD reconnect")
if plugin.Resync != nil {
plugin.Resync.DoResync()
plugin.lastConnErr = nil
} else {
p.Log.Warn("Expected resync after ETCD reconnect could not start beacuse of missing Resync plugin")
plugin.Log.Warn("Expected resync after ETCD reconnect could not start beacuse of missing Resync plugin")
}
}
return statuscheck.OK, nil
}
p.lastConnErr = err
plugin.lastConnErr = err
return statuscheck.Error, err
})
} else {
p.Log.Warnf("Unable to start status check for etcd")
plugin.Log.Warnf("Unable to start status check for etcd")
}

return nil
}

// AfterInit registers status polling function with StatusCheck plugin
// (if injected).
func (p *Plugin) AfterInit() error {
return nil
}

func (p *Plugin) startPeriodicAutoCompact(period time.Duration) {
p.autoCompactDone = make(chan struct{})
go func() {
p.Log.Infof("Starting periodic auto compacting every %v", period)
for {
select {
case <-time.After(period):
p.Log.Debugf("Executing auto compact")
if toRev, err := p.connection.Compact(); err != nil {
p.Log.Errorf("Periodic auto compacting failed: %v", err)
} else {
p.Log.Infof("Auto compacting finished (to revision %v)", toRev)
}
case <-p.autoCompactDone:
return
}
}
}()
}

// FromExistingConnection is used mainly for testing of existing connection
// injection into the plugin.
// Note, need to set Deps for returned value!
func FromExistingConnection(connection *BytesConnectionEtcd, sl servicelabel.ReaderAPI) *Plugin {
skel := plugin.NewSkeleton("testing", sl, connection)
return &Plugin{Skeleton: skel, connection: connection}
}

// Close shutdowns the connection.
func (p *Plugin) Close() error {
_, err := safeclose.CloseAll(p.Skeleton, p.autoCompactDone)
func (plugin *Plugin) Close() error {
err := safeclose.Close(plugin.autoCompactDone)
return err
}

// String returns the plugin name from dependencies if injected,
// "kvdbsync" otherwise.
func (p *Plugin) String() string {
if len(p.Deps.PluginName) == 0 {
return "kvdbsync"
}
return string(p.Deps.PluginName)
// NewBroker creates new instance of prefixed broker that provides API with arguments of type proto.Message.
func (plugin *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker {
return plugin.protoWrapper.NewBroker(keyPrefix)
}

// NewWatcher creates new instance of prefixed broker that provides API with arguments of type proto.Message.
func (plugin *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher {
return plugin.protoWrapper.NewWatcher(keyPrefix)
}

// Disabled returns *true* if the plugin is not in use due to missing
// etcd configuration.
func (p *Plugin) Disabled() (disabled bool) {
return p.disabled
func (plugin *Plugin) Disabled() (disabled bool) {
return plugin.disabled
}

// PutIfNotExists puts given key-value pair into etcd if there is no value set for the key. If the put was successful
// succeeded is true. If the key already exists succeeded is false and the value for the key is untouched.
func (p *Plugin) PutIfNotExists(key string, value []byte) (succeeded bool, err error) {
if p.connection != nil {
return p.connection.PutIfNotExists(key, value)
func (plugin *Plugin) PutIfNotExists(key string, value []byte) (succeeded bool, err error) {
if plugin.connection != nil {
return plugin.connection.PutIfNotExists(key, value)
}
return false, fmt.Errorf("connection is not established")
}

// Compact compatcs the ETCD database to the specific revision
func (p *Plugin) Compact(rev ...int64) (toRev int64, err error) {
if p.connection != nil {
return p.connection.Compact(rev...)
func (plugin *Plugin) Compact(rev ...int64) (toRev int64, err error) {
if plugin.connection != nil {
return plugin.connection.Compact(rev...)
}
return 0, fmt.Errorf("connection is not established")
}

func (plugin *Plugin) getEtcdConfig() (Config, error) {
var etcdCfg Config
found, err := plugin.PluginConfig.GetValue(&etcdCfg)
if err != nil {
return etcdCfg, err
}
if !found {
plugin.Log.Info("ETCD config not found, skip loading this plugin")
plugin.disabled = true
return etcdCfg, nil
}
return etcdCfg, nil
}

func (plugin *Plugin) startPeriodicAutoCompact(period time.Duration) {
plugin.autoCompactDone = make(chan struct{})
go func() {
plugin.Log.Infof("Starting periodic auto compacting every %v", period)
for {
select {
case <-time.After(period):
plugin.Log.Debugf("Executing auto compact")
if toRev, err := plugin.connection.Compact(); err != nil {
plugin.Log.Errorf("Periodic auto compacting failed: %v", err)
} else {
plugin.Log.Infof("Auto compacting finished (to revision %v)", toRev)
}
case <-plugin.autoCompactDone:
return
}
}
}()
}
9 changes: 0 additions & 9 deletions db/keyval/plugin/README.md

This file was deleted.

18 changes: 0 additions & 18 deletions db/keyval/plugin/doc.go

This file was deleted.

0 comments on commit e5bff3c

Please sign in to comment.