From e24435ea6806fa22c3aebddd1cd268a612b89715 Mon Sep 17 00:00:00 2001 From: svaroqui Date: Tue, 2 Jan 2018 10:16:44 +0100 Subject: [PATCH] Add sphinx statistics --- cluster/prov_opensvc.go | 8 +++ cluster/prx.go | 3 ++ cluster/prx_sphinx.go | 86 ++++++++++++++++++++++++++++++++ config/config.go | 1 + server.go | 1 + sphinx/sphinx.go | 108 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 207 insertions(+) create mode 100644 cluster/prx_sphinx.go create mode 100644 sphinx/sphinx.go diff --git a/cluster/prov_opensvc.go b/cluster/prov_opensvc.go index 168191adc..f73bae4db 100644 --- a/cluster/prov_opensvc.go +++ b/cluster/prov_opensvc.go @@ -653,6 +653,14 @@ show_disabled = false conf = conf + cluster.GetPodDockerSphinxTemplate(collector, pod) conf = conf + cluster.GetPodPackageTemplate(collector, pod) conf = conf + cluster.GetProxiesEnv(collector, servers, agent, prx) + + conf = conf + `[task0] + schedule = ` + cluster.conf.ProvSphinxCron + ` + command = ` + collector.ProvFSPath + `/{svcname}_pod01/init/reindex.sh + user = root + + ` + log.Println(conf) return conf, nil } diff --git a/cluster/prx.go b/cluster/prx.go index 2ef85343a..dba255359 100644 --- a/cluster/prx.go +++ b/cluster/prx.go @@ -346,6 +346,9 @@ func (cluster *Cluster) refreshProxies() { if cluster.conf.HaproxyOn && pr.Type == proxyHaproxy { cluster.refreshHaproxy(pr) } + if cluster.conf.SphinxOn && pr.Type == proxySphinx { + cluster.refreshSphinx(pr) + } if cluster.conf.GraphiteMetrics { cluster.SendProxyStats(pr) } diff --git a/cluster/prx_sphinx.go b/cluster/prx_sphinx.go new file mode 100644 index 000000000..6a11e0c60 --- /dev/null +++ b/cluster/prx_sphinx.go @@ -0,0 +1,86 @@ +// replication-manager - Replication Manager Monitoring and CLI for MariaDB and MySQL +// Copyright 2017 Signal 18 SARL +// Authors: Guillaume Lefranc +// Stephane Varoqui +// This source code is licensed under the GNU General Public License, version 3. +// Redistribution/Reuse of this code is permitted under the GNU v3 license, as +// an additional term, ALL code must carry the original Author(s) credit in comment form. +// See LICENSE in this directory for the integral text. +package cluster + +import ( + "fmt" + + "github.com/signal18/replication-manager/sphinx" + "github.com/signal18/replication-manager/state" +) + +func connectSphinx(proxy *Proxy) (sphinx.SphinxSQL, error) { + sphinx := sphinx.SphinxSQL{ + User: proxy.User, + Password: proxy.Pass, + Host: proxy.Host, + Port: proxy.Port, + } + + var err error + err = sphinx.Connect() + if err != nil { + return sphinx, err + } + return sphinx, nil +} + +func (cluster *Cluster) initSphinx(proxy *Proxy) { + if cluster.conf.SphinxOn == false { + return + } + + sphinx, err := connectSphinx(proxy) + if err != nil { + cluster.sme.AddState("ERR00051", state.State{ErrType: "ERROR", ErrDesc: fmt.Sprintf(clusterError["ERR00051"], err), ErrFrom: "MON"}) + return + } + defer sphinx.Connection.Close() + +} + +func (cluster *Cluster) refreshSphinx(proxy *Proxy) { + if cluster.conf.SphinxOn == false { + return + } + + sphinx, err := connectSphinx(proxy) + if err != nil { + cluster.sme.AddState("ERR00051", state.State{ErrType: "ERROR", ErrDesc: fmt.Sprintf(clusterError["ERR00051"], err), ErrFrom: "MON"}) + return + } + defer sphinx.Connection.Close() + proxy.Version = sphinx.GetVersion() + + proxy.BackendsWrite = nil + proxy.BackendsRead = nil + + status, err := sphinx.GetStatus() + var bke = Backend{ + Host: cluster.conf.ProvProxRouteAddr, + Port: cluster.conf.ProvProxRoutePort, + Status: "UP", + PrxName: "", + PrxStatus: "UP", + PrxConnections: status["CONNECTIONS"], + PrxByteIn: "0", + PrxByteOut: "0", + PrxLatency: status["AVG_QUERY_WALL"], + } + if err == nil { + proxy.BackendsWrite = append(proxy.BackendsRead, bke) + } +} + +func (cluster *Cluster) setMaintenanceSphinx(proxy *Proxy, host string, port string) { + if cluster.conf.SphinxOn == false { + return + } + +} diff --git a/config/config.go b/config/config.go index 616dfc972..f1deb082e 100644 --- a/config/config.go +++ b/config/config.go @@ -251,6 +251,7 @@ type Config struct { ProvSphinxDiskDevice string `mapstructure:"prov-sphinx-disk-device"` ProvSphinxDiskType string `mapstructure:"prov-sphinx-disk-type"` ProvSphinxTags string `mapstructure:"prov-sphinx-tags"` + ProvSphinxCron string `mapstructure:"prov-sphinx-reindex-schedule"` ProvSphinxType string `mapstructure:"prov-sphinx-service-type"` ProvSSLCa string `mapstructure:"prov-tls-server-ca"` ProvSSLCert string `mapstructure:"prov-tls-server-cert"` diff --git a/server.go b/server.go index ddde808df..da51e6e53 100644 --- a/server.go +++ b/server.go @@ -401,6 +401,7 @@ func init() { monitorCmd.Flags().StringVar(&conf.ProvSphinxMem, "prov-sphinx-memory", "256", "Memory in M for micro service VM") monitorCmd.Flags().StringVar(&conf.ProvSphinxDisk, "prov-sphinx-disk-size", "20g", "Disk in g for micro service VM") monitorCmd.Flags().StringVar(&conf.ProvSphinxCores, "prov-sphinx-cpu-cores", "1", "Number of cpu cores for the micro service VM") + monitorCmd.Flags().StringVar(&conf.ProvSphinxCron, "prov-sphinx-reindex-schedule", "@5", "task time to 5 minutes for index rotation") monitorCmd.Flags().StringVar(&conf.ProvSSLCa, "prov-tls-server-ca", "", "server TLS ca") monitorCmd.Flags().StringVar(&conf.ProvSSLCert, "prov-tls-server-cert", "", "server TLS cert") monitorCmd.Flags().StringVar(&conf.ProvSSLKey, "prov-tls-server-key", "", "server TLS key") diff --git a/sphinx/sphinx.go b/sphinx/sphinx.go new file mode 100644 index 000000000..bd5ae69ef --- /dev/null +++ b/sphinx/sphinx.go @@ -0,0 +1,108 @@ +package sphinx + +import ( + "errors" + "fmt" + "time" + + "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" +) + +type SphinxSQL struct { + Connection *sqlx.DB + User string + Password string + Port string + Host string +} + +func (sphinxql *SphinxSQL) Connect() error { + SphinxConfig := mysql.Config{ + User: sphinxql.User, + Passwd: sphinxql.Password, + Net: "tcp", + Addr: fmt.Sprintf("%s:%s", sphinxql.Host, sphinxql.Port), + Timeout: time.Second * 5, + ReadTimeout: time.Second * 15, + } + + var err error + sphinxql.Connection, err = sqlx.Connect("mysql", SphinxConfig.FormatDSN()) + if err != nil { + return fmt.Errorf("Could not connect to SphinxQL (%s)", err) + } + return nil +} + +func (sphinxql *SphinxSQL) GetVariables() (map[string]string, error) { + type Variable struct { + Variable_name string + Value string + } + + vars := make(map[string]string) + rows, err := sphinxql.Connection.Queryx("SHOW VARIABLES") + if err != nil { + return nil, errors.New("Could not get status variables") + } + for rows.Next() { + var v Variable + err := rows.Scan(&v.Variable_name, &v.Value) + if err != nil { + return nil, errors.New("Could not get results from status scan") + } + vars[v.Variable_name] = v.Value + } + return vars, nil +} + +func (sphinxql *SphinxSQL) GetStatus() (map[string]string, error) { + type Variable struct { + Variable_name string + Value string + } + + vars := make(map[string]string) + rows, err := sphinxql.Connection.Queryx("SHOW STATUS") + if err != nil { + return nil, errors.New("Could not get status variables") + } + for rows.Next() { + var v Variable + err := rows.Scan(&v.Variable_name, &v.Value) + if err != nil { + return nil, errors.New("Could not get results from status scan") + } + vars[v.Variable_name] = v.Value + } + return vars, nil + +} + +func (sphinxql *SphinxSQL) GetVersion() string { + var version string + return version +} + +func (sphinxql *SphinxSQL) GetIndexes() (map[string]string, error) { + type Indexes struct { + Index string + Type string + } + + vars := make(map[string]string) + rows, err := sphinxql.Connection.Queryx("SHOW TABLES") + if err != nil { + return nil, errors.New("Could not get status variables") + } + for rows.Next() { + var v Indexes + err := rows.Scan(&v.Index, &v.Type) + if err != nil { + return nil, errors.New("Could not get results from status scan") + } + vars[v.Index] = v.Type + } + return vars, nil +}