Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: support flush tidb plugin {name} in tidb-server #9320

Merged
merged 1 commit into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 4 additions & 9 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,16 +377,11 @@ func (e *SimpleExec) executeFlush(s *ast.FlushStmt) error {
defer sysSessionPool.Put(ctx)
err = dom.PrivilegeHandle().Update(ctx.(sessionctx.Context))
return errors.Trace(err)
case ast.FlushStatus:
case ast.FlushTiDBPlugin:
dom := domain.GetDomain(e.ctx)
if plugin.Get(plugin.Audit, "ipwhitelist") != nil {
if cli := dom.GetEtcdClient(); cli != nil {
const whitelistKey = "/tidb/plugins/whitelist"
row := cli.KV
_, err := row.Put(context.Background(), whitelistKey, "")
if err != nil {
log.Warn("notify update whitelist failed:", err)
}
for _, pluginName := range s.Plugins {
err := plugin.NotifyFlush(dom, pluginName)
if err != nil {
return errors.Trace(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7
github.com/pingcap/parser v0.0.0-20190212061044-a71b434969f3
github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32 h1:9uwqk2DvsAKImRK
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/parser v0.0.0-20190212061044-a71b434969f3 h1:Wn8ERRenAuN00KT7TAISS86HzVHDyMRR+onWCeb6BjI=
github.com/pingcap/parser v0.0.0-20190212061044-a71b434969f3/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97 h1:GIhPQAwFwnf6cSdVYXdSNkx171Nl9ZmIVYrOtN3I2lw=
github.com/pingcap/parser v0.0.0-20190218033509-9545f168ae97/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE=
github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible h1:e9Gi/LP9181HT3gBfSOeSBA+5JfemuE4aEAhqNgoE4k=
Expand Down
14 changes: 0 additions & 14 deletions plugin/conn_ip_example/conn_ip_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,5 @@ func Example_LoadRunShutdownPlugin() {
plugin.DeclareAuditManifest(auditPlugin.Manifest).NotifyEvent(context.Background(), nil)
}

err = plugin.Reload(ctx, cfg, plugin.ID("conn_ip_example-2"))
if err != nil {
panic(err)
}

for _, auditPlugin := range plugin.GetByKind(plugin.Audit) {
if auditPlugin.State != plugin.Ready {
continue
}
plugin.DeclareAuditManifest(auditPlugin.Manifest).NotifyEvent(
context.WithValue(context.Background(), "ip", "1.1.1.2"), nil,
)
}

plugin.Shutdown(context.Background())
}
150 changes: 84 additions & 66 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import (
"sync/atomic"
"unsafe"

"github.com/coreos/etcd/clientv3"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
log "github.com/sirupsen/logrus"
)

// pluginGlobal holds all global variables for plugin.
Expand Down Expand Up @@ -83,6 +87,7 @@ type Config struct {
PluginVarNames *[]string
SkipWhenFail bool
EnvVersion map[string]uint16
EtcdClient *clientv3.Client
}

// Plugin presents a TiDB plugin.
Expand Down Expand Up @@ -222,6 +227,55 @@ func Init(ctx context.Context, cfg Config) (err error) {
return
}

// InitWatchLoops starts etcd watch loops for plugin that need watch.
func InitWatchLoops(etcdClient *clientv3.Client) {
if etcdClient == nil {
return
}
tiPlugins := pluginGlobal.plugins()
for kind := range tiPlugins.plugins {
for i := range tiPlugins.plugins[kind] {
if tiPlugins.plugins[kind][i].OnFlush == nil {
continue
}
const pluginWatchPrefix = "/tidb/plugins/"
ctx, cancel := context.WithCancel(context.Background())
watcher := &flushWatcher{
ctx: ctx,
cancel: cancel,
path: pluginWatchPrefix + tiPlugins.plugins[kind][i].Name,
etcd: etcdClient,
manifest: tiPlugins.plugins[kind][i].Manifest,
}
tiPlugins.plugins[kind][i].flushWatcher = watcher
go util.WithRecovery(watcher.watchLoop, nil)
}
}
}

type flushWatcher struct {
ctx context.Context
cancel context.CancelFunc
path string
etcd *clientv3.Client
manifest *Manifest
}

func (w *flushWatcher) watchLoop() {
watchChan := w.etcd.Watch(w.ctx, w.path)
for {
select {
case <-w.ctx.Done():
return
case <-watchChan:
err := w.manifest.OnFlush(w.ctx, w.manifest)
if err != nil {
log.Errorf("Notify plugin %s flush event failure: %v", w.manifest.Name, err)
}
}
}
}

func loadOne(dir string, pluginID ID) (plugin Plugin, err error) {
plugin.Path = filepath.Join(dir, string(pluginID)+LibrarySuffix)
plugin.library, err = gplugin.Open(plugin.Path)
Expand Down Expand Up @@ -256,80 +310,20 @@ func loadOne(dir string, pluginID ID) (plugin Plugin, err error) {
return
}

// Reload hot swap a old plugin with new version.
// Limit: loaded plugins shouldn't be unload and only be mark dying.
func Reload(ctx context.Context, cfg Config, pluginID ID) (err error) {
newPlugin, err := loadOne(cfg.PluginDir, pluginID)
if err != nil {
return
}
_, err = replace(ctx, cfg, newPlugin.Name, newPlugin)
return
}

func replace(ctx context.Context, cfg Config, name string, newPlugin Plugin) (replaced bool, err error) {

oldPlugins := pluginGlobal.plugins()
if oldPlugins.versions[name] == newPlugin.Version {
replaced = false
return
}
err = newPlugin.validate(ctx, oldPlugins, reloadMode)
if err != nil {
return
}
err = newPlugin.OnInit(ctx, newPlugin.Manifest)
if err != nil {
return
}
if cfg.GlobalSysVar != nil {
for key, value := range newPlugin.SysVars {
(*cfg.GlobalSysVar)[key] = value
}
}

for {
oldPlugins = pluginGlobal.plugins()
newPlugins := oldPlugins.clone()
replaced = true
tiPluginKind := newPlugins.plugins[newPlugin.Kind]
var oldPlugin *Plugin
for i, p := range tiPluginKind {
if p.Name == name {
oldPlugin = &tiPluginKind[i]
tiPluginKind = append(tiPluginKind[:i], tiPluginKind[i+1:]...)
}
}

if oldPlugin != nil {
oldPlugin.State = Dying
newPlugins.dyingPlugins = append(newPlugins.dyingPlugins, *oldPlugin)
err = oldPlugin.OnShutdown(ctx, oldPlugin.Manifest)
if err != nil {
// When shutdown failure, the plugin is in stranger state, so make it as Dying.
return
}
}

newPlugin.State = Ready
tiPluginKind = append(tiPluginKind, newPlugin)
newPlugins.plugins[newPlugin.Kind] = tiPluginKind
newPlugins.versions[newPlugin.Name] = newPlugin.Version

if atomic.CompareAndSwapPointer(&pluginGlobal.tiPlugins, unsafe.Pointer(oldPlugins), unsafe.Pointer(newPlugins)) {
return
}
}
}

// Shutdown cleanups all plugin resources.
// Notice: it just cleanups the resource of plugin, but cannot unload plugins(limited by go plugin).
func Shutdown(ctx context.Context) {
for {
tiPlugins := pluginGlobal.plugins()
if tiPlugins == nil {
return
}
for _, plugins := range tiPlugins.plugins {
for _, p := range plugins {
p.State = Dying
if p.flushWatcher != nil {
p.flushWatcher.cancel()
}
if err := p.OnShutdown(ctx, p.Manifest); err != nil {
}
}
Expand Down Expand Up @@ -371,3 +365,27 @@ func GetAll() map[Kind][]Plugin {
}
return plugins.plugins
}

// NotifyFlush notify plugins to do flush logic.
func NotifyFlush(dom *domain.Domain, pluginName string) error {
p := getByName(pluginName)
if p == nil || p.Manifest.flushWatcher == nil {
return errors.Errorf("plugin %s doesn't exists or unsupported flush", pluginName)
}
_, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, "")
if err != nil {
return err
}
return nil
}

func getByName(pluginName string) *Plugin {
for _, plugins := range GetAll() {
for _, p := range plugins {
if p.Name == pluginName {
return &p
}
}
}
return nil
}
2 changes: 2 additions & 0 deletions plugin/spi.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Manifest struct {
Validate func(ctx context.Context, manifest *Manifest) error
OnInit func(ctx context.Context, manifest *Manifest) error
OnShutdown func(ctx context.Context, manifest *Manifest) error
OnFlush func(ctx context.Context, manifest *Manifest) error
flushWatcher *flushWatcher
}

// ExportManifest exports a manifest to TiDB as a known format.
Expand Down
4 changes: 4 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,10 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, errors.Trace(err)
}

if len(cfg.Plugin.Load) > 0 {
plugin.InitWatchLoops(dom.GetEtcdClient())
}

if raw, ok := store.(domain.EtcdBackend); ok {
err = raw.StartGCWorker()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -554,5 +555,6 @@ func cleanup() {
} else {
svr.TryGracefulDown()
}
plugin.Shutdown(context.Background())
closeDomainAndStorage()
}