Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHOW DIAGNOSTICS command #2061

Merged
merged 6 commits into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
- [#2058](https://github.com/influxdb/influxdb/pull/2058): Track number of queries executed in stats.
- [#2059](https://github.com/influxdb/influxdb/pull/2059): Retention policies sorted by name on return to client.
- [#2061](https://github.com/influxdb/influxdb/pull/2061): Implement SHOW DIAGNOSTICS.
- [#2064](https://github.com/influxdb/influxdb/pull/2064): Allow init.d script to return influxd version.

### Bugfixes
Expand Down
4 changes: 3 additions & 1 deletion cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
if !config.ReportingDisabled {
// Make sure we have a config object b4 we try to use it.
if clusterID := b.Broker.ClusterID(); clusterID != 0 {
go s.StartReportingLoop(version, clusterID)
go s.StartReportingLoop(clusterID)
}
}

Expand Down Expand Up @@ -343,6 +343,8 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker bool,
s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan)
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
s.ComputeNoMoreThan = time.Duration(config.ContinuousQuery.ComputeNoMoreThan)
s.Version = version
s.CommitHash = commit

// Open server with data directory and broker client.
if err := s.Open(config.Data.Dir, c); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowStatsStatement) node() {}
func (*ShowDiagnosticsStatement) node() {}
func (*ShowTagKeysStatement) node() {}
func (*ShowTagValuesStatement) node() {}
func (*ShowUsersStatement) node() {}
Expand Down Expand Up @@ -172,6 +173,7 @@ func (*ShowMeasurementsStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowStatsStatement) stmt() {}
func (*ShowDiagnosticsStatement) stmt() {}
func (*ShowTagKeysStatement) stmt() {}
func (*ShowTagValuesStatement) stmt() {}
func (*ShowUsersStatement) stmt() {}
Expand Down Expand Up @@ -1386,6 +1388,17 @@ func (s *ShowStatsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}

// ShowDiagnosticsStatement represents a command for show node diagnostics.
type ShowDiagnosticsStatement struct{}

// String returns a string representation of the ShowDiagnosticsStatement.
func (s *ShowDiagnosticsStatement) String() string { return "SHOW DIAGNOSTICS" }

// RequiredPrivileges returns the privilege required to execute a ShowDiagnosticsStatement
func (s *ShowDiagnosticsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}

// ShowTagKeysStatement represents a command for listing tag keys.
type ShowTagKeysStatement struct {
// Data source that fields are extracted from.
Expand Down
8 changes: 8 additions & 0 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (p *Parser) parseShowStatement() (Statement, error) {
return p.parseShowSeriesStatement()
case STATS:
return p.parseShowStatsStatement()
case DIAGNOSTICS:
return p.parseShowDiagnosticsStatement()
case TAG:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == KEYS {
Expand Down Expand Up @@ -1193,6 +1195,12 @@ func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) {
return stmt, err
}

// parseShowDiagnostics parses a string and returns a ShowDiagnosticsStatement.
func (p *Parser) parseShowDiagnosticsStatement() (*ShowDiagnosticsStatement, error) {
stmt := &ShowDiagnosticsStatement{}
return stmt, nil
}

// parseDropContinuousQueriesStatement parses a string and returns a DropContinuousQueryStatement.
// This function assumes the "DROP CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) {
Expand Down
6 changes: 6 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,12 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SHOW DIAGNOSTICS
{
s: `SHOW DIAGNOSTICS`,
stmt: &influxql.ShowDiagnosticsStatement{},
},

// Errors
{s: ``, err: `found EOF, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER at line 1, char 1`},
{s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`},
Expand Down
2 changes: 2 additions & 0 deletions influxql/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ const (
SHOW
SLIMIT
STATS
DIAGNOSTICS
SOFFSET
TAG
TO
Expand Down Expand Up @@ -210,6 +211,7 @@ var tokens = [...]string{
SLIMIT: "SLIMIT",
SOFFSET: "SOFFSET",
STATS: "STATS",
DIAGNOSTICS: "DIAGNOSTICS",
TAG: "TAG",
TO: "TO",
USER: "USER",
Expand Down
160 changes: 155 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ const (
retentionPolicyMinDuration = time.Hour
)

var startTime time.Time

func init() {
startTime = time.Now().UTC()
}

// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
Expand Down Expand Up @@ -93,6 +99,10 @@ type Server struct {
// is just getting the request after being off duty for running CQs then
// it will recompute all of them
lastContinuousQueryRun time.Time

// Build information.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important the server object get this stuff, so it be dumped by the diags. I am open to other approaches.

Version string
CommitHash string
}

// NewServer returns a new instance of Server.
Expand Down Expand Up @@ -308,6 +318,7 @@ func (s *Server) load() error {
if err := sh.open(s.shardPath(sh.ID), s.client.Conn(sh.ID)); err != nil {
return fmt.Errorf("cannot open shard store: id=%d, err=%s", sh.ID, err)
}
s.stats.Inc("shardsOpen")
}
}
}
Expand Down Expand Up @@ -2032,6 +2043,8 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
res = s.executeShowFieldKeysStatement(stmt, database, user)
case *influxql.ShowStatsStatement:
res = s.executeShowStatsStatement(stmt, user)
case *influxql.ShowDiagnosticsStatement:
res = s.executeShowDiagnosticsStatement(stmt, user)
case *influxql.GrantStatement:
res = s.executeGrantStatement(stmt, user)
case *influxql.RevokeStatement:
Expand Down Expand Up @@ -2664,6 +2677,143 @@ func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, us
return &Result{Series: rows}
}

func (s *Server) executeShowDiagnosticsStatement(stmt *influxql.ShowDiagnosticsStatement, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()

rows := make([]*influxql.Row, 0)
now := time.Now().UTC().Format(time.RFC3339Nano)

var m runtime.MemStats
runtime.ReadMemStats(&m)

hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}

diags := []struct {
name string
fields map[string]interface{}
}{
{
name: "build",
fields: map[string]interface{}{
"version": s.Version,
"commitHash": s.CommitHash,
},
},
{
name: "server",
fields: map[string]interface{}{
"id": s.id,
"path": s.path,
"authEnabled": s.authenticationEnabled,
"index": s.index,
"retentionAutoCreate": s.RetentionAutoCreate,
"numShards": len(s.shards),
},
},
{
name: "cq",
fields: map[string]interface{}{
"lastRun": s.lastContinuousQueryRun,
},
},
{
name: "system",
fields: map[string]interface{}{
"startTime": startTime,
"uptime": time.Since(startTime).String(),
"hostname": hostname,
"pid": os.Getpid(),
"os": runtime.GOOS,
"arch": runtime.GOARCH,
"numcpu": runtime.NumCPU(),
},
},
{
name: "memory",
fields: map[string]interface{}{
"alloc": m.Alloc,
"totalAlloc": m.TotalAlloc,
"sys": m.Sys,
"lookups": m.Lookups,
"mallocs": m.Mallocs,
"frees": m.Frees,
"heapAlloc": m.HeapAlloc,
"heapSys": m.HeapSys,
"heapIdle": m.HeapIdle,
"heapInUse": m.HeapInuse,
"heapReleased": m.HeapReleased,
"heapObjects": m.HeapObjects,
"pauseTotalNs": m.PauseTotalNs,
"numGC": m.NumGC,
},
},
{
name: "go",
fields: map[string]interface{}{
"goMaxProcs": runtime.GOMAXPROCS(0),
"numGoroutine": runtime.NumGoroutine(),
"version": runtime.Version(),
},
},
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be easier to use this data from a client or another system if it were a type:


type Diagnostics struct {
    Build struct {
        Version    string `json:"version"`
        CommitHash string `json:"commitHash"`
    } `json:"build"`

    Server struct {
        ID          string `json:"id"`
        Path        string `json:"path"`
        AuthEnabled bool   `json:"authenticationEnabled"`
        ...
    }
}

And then you can get the current Diagnostics type using NewDiagnostics(). That would shorten this function quite a bit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe name it Stats instead -- similar to MemStats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about something along those lines this morning, but it wasn't clear exactly what to do.

The Diagnostics type needs information from various components -- the server, build information, Go runtime, indexes. Which part of the code fills in the values? I want to do something like this, as I want to factor out this code next so that it can be written to the database (like the stats) every minute or so.

Let me know what you have in mind, as I was going to something like this next.


for _, d := range diags {
row := &influxql.Row{Columns: []string{"time"}}
row.Name = d.name

// Get sorted list of keys.
sortedKeys := make([]string, 0, len(d.fields))
for k, _ := range d.fields {
sortedKeys = append(sortedKeys, k)
}
sort.Strings(sortedKeys)

values := []interface{}{now}
for _, k := range sortedKeys {
row.Columns = append(row.Columns, k)
values = append(values, d.fields[k])
}
row.Values = append(row.Values, values)
rows = append(rows, row)
}

// Shard groups.
shardGroupsRow := &influxql.Row{Columns: []string{}}
shardGroupsRow.Name = "shardGroups"
shardGroupsRow.Columns = append(shardGroupsRow.Columns, "time", "database", "retentionPolicy", "id",
"startTime", "endTime", "duration", "numShards")
// Check all shard groups.
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
shardGroupsRow.Values = append(shardGroupsRow.Values, []interface{}{now, db.name, rp.Name,
g.ID, g.StartTime, g.EndTime, g.Duration().String(), len(g.Shards)})
}
}
}
rows = append(rows, shardGroupsRow)

// Shards
shardsRow := &influxql.Row{Columns: []string{}}
shardsRow.Name = "shards"
shardsRow.Columns = append(shardsRow.Columns, "time", "id", "dataNodes", "index", "path")
for _, sh := range s.shards {
var nodes []string
for _, n := range sh.DataNodeIDs {
nodes = append(nodes, strconv.FormatUint(n, 10))
shardsRow.Values = append(shardsRow.Values, []interface{}{now, sh.ID, strings.Join(nodes, ","),
sh.index, sh.store.Path()})
}
}
rows = append(rows, shardsRow)

return &Result{Series: rows}
}

// filterMeasurementsByExpr filters a list of measurements by a tags expression.
func filterMeasurementsByExpr(measurements Measurements, expr influxql.Expr) (Measurements, error) {
// Create a list to hold result measurements.
Expand Down Expand Up @@ -3634,19 +3784,19 @@ func copyURL(u *url.URL) *url.URL {
return other
}

func (s *Server) StartReportingLoop(version string, clusterID uint64) chan struct{} {
s.reportStats(version, clusterID)
func (s *Server) StartReportingLoop(clusterID uint64) chan struct{} {
s.reportStats(clusterID)

ticker := time.NewTicker(24 * time.Hour)
for {
select {
case <-ticker.C:
s.reportStats(version, clusterID)
s.reportStats(clusterID)
}
}
}

func (s *Server) reportStats(version string, clusterID uint64) {
func (s *Server) reportStats(clusterID uint64) {
s.mu.RLock()
defer s.mu.RUnlock()

Expand All @@ -3663,7 +3813,7 @@ func (s *Server) reportStats(version string, clusterID uint64) {
"name":"reports",
"columns":["os", "arch", "version", "server_id", "id", "num_series", "num_measurements", "num_databases"],
"points":[["%s", "%s", "%s", "%x", "%x", "%d", "%d", "%d"]]
}]`, runtime.GOOS, runtime.GOARCH, version, s.ID(), clusterID, numSeries, numMeasurements, numDatabases)
}]`, runtime.GOOS, runtime.GOARCH, s.Version, s.ID(), clusterID, numSeries, numMeasurements, numDatabases)

data := bytes.NewBufferString(json)

Expand Down