Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dropping non-Raft nodes #4310

Merged
merged 1 commit into from
Oct 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy
- [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff
- [#4284](https://github.com/influxdb/influxdb/pull/4284): Add exponential backoff for hinted-handoff failures
- [#4310](https://github.com/influxdb/influxdb/pull/4310): Support dropping non-Raft nodes. Work mostly by @corylanou

### Bugfixes
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW
Expand Down
29 changes: 14 additions & 15 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (*DropDatabaseStatement) node() {}
func (*DropMeasurementStatement) node() {}
func (*DropRetentionPolicyStatement) node() {}
func (*DropSeriesStatement) node() {}
func (*DropServerStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*GrantAdminStatement) node() {}
Expand Down Expand Up @@ -198,6 +199,7 @@ func (*DropDatabaseStatement) stmt() {}
func (*DropMeasurementStatement) stmt() {}
func (*DropRetentionPolicyStatement) stmt() {}
func (*DropSeriesStatement) stmt() {}
func (*DropServerStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*GrantAdminStatement) stmt() {}
Expand Down Expand Up @@ -1824,6 +1826,30 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: WritePrivilege}}
}

// DropServerStatement represents a command for removing a server from the cluster.
type DropServerStatement struct {
// ID of the node to be dropped.
NodeID uint64
// Force will force the server to drop even it it means losing data
Force bool
}

// String returns a string representation of the drop series statement.
func (s *DropServerStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("DROP SERVER ")
_, _ = buf.WriteString(strconv.FormatUint(s.NodeID, 10))
if s.Force {
_, _ = buf.WriteString(" FORCE")
}
return buf.String()
}

// RequiredPrivileges returns the privilege required to execute a DropServerStatement.
func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}

// ShowContinuousQueriesStatement represents a command for listing continuous queries.
type ShowContinuousQueriesStatement struct{}

Expand Down
27 changes: 25 additions & 2 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func (p *Parser) parseDropStatement() (Statement, error) {
return p.parseDropRetentionPolicyStatement()
} else if tok == USER {
return p.parseDropUserStatement()
} else if tok == SERVER {
return p.parseDropServerStatement()
}

return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT"}, pos)
Expand Down Expand Up @@ -311,8 +313,8 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
// Parse optional DEFAULT token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok == DEFAULT {
stmt.Default = true
} else {
p.unscan()
} else if tok != EOF && tok != SEMICOLON {
return nil, newParseError(tokstr(tok, lit), []string{"DEFAULT"}, pos)
}

return stmt, nil
Expand Down Expand Up @@ -1178,6 +1180,27 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
return stmt, nil
}

// parseDropServerStatement parses a string and returns a DropServerStatement.
// This function assumes the "DROP SERVER" tokens have already been consumed.
func (p *Parser) parseDropServerStatement() (*DropServerStatement, error) {
s := &DropServerStatement{}
var err error

// Parse the server's ID.
if s.NodeID, err = p.parseUInt64(); err != nil {
return nil, err
}

// Parse optional FORCE token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == FORCE {
s.Force = true
} else if tok != EOF && tok != SEMICOLON {
return nil, newParseError(tokstr(tok, lit), []string{"FORCE"}, pos)
}

return s, nil
}

// parseShowContinuousQueriesStatement parses a string and returns a ShowContinuousQueriesStatement.
// This function assumes the "SHOW CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesStatement, error) {
Expand Down
26 changes: 20 additions & 6 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,16 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// DROP SERVER statement
{
s: `DROP SERVER 123`,
stmt: &influxql.DropServerStatement{NodeID: 123},
},
{
s: `DROP SERVER 123 FORCE`,
stmt: &influxql.DropServerStatement{NodeID: 123, Force: true},
},

// SHOW CONTINUOUS QUERIES statement
{
s: `SHOW CONTINUOUS QUERIES`,
Expand Down Expand Up @@ -1453,15 +1463,15 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT top() FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 0`},
{s: `SELECT top(field1) FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 1`},
{s: `SELECT top(field1,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`},
{s: `SELECT top(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`},
{s: `SELECT top(field1,host,'server',foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`},
{s: `SELECT top(field1,5,'server',2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`},
{s: `SELECT top(field1,max(foo),'server',2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`},
{s: `SELECT bottom() FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 0`},
{s: `SELECT bottom(field1) FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
{s: `SELECT bottom(field1,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`},
{s: `SELECT bottom(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`},
{s: `SELECT bottom(field1,host,'server',foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`},
{s: `SELECT bottom(field1,5,'server',2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`},
{s: `SELECT bottom(field1,max(foo),'server',2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`},
{s: `SELECT percentile() FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 0`},
{s: `SELECT percentile(field1) FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT percentile(field1, foo) FROM myseries`, err: `expected float argument in percentile()`},
Expand Down Expand Up @@ -1515,6 +1525,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DROP SERIES`, err: `found EOF, expected FROM, WHERE at line 1, char 13`},
{s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERVER`, err: `found EOF, expected number at line 1, char 13`},
{s: `DROP SERVER abc`, err: `found abc, expected number at line 1, char 13`},
{s: `DROP SERVER 1 1`, err: `found 1, expected FORCE at line 1, char 15`},
{s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `SHOW RETENTION ON`, err: `found ON, expected POLICIES at line 1, char 16`},
Expand Down Expand Up @@ -1624,6 +1637,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 3.14`, err: `number must be an integer at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected number at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected DEFAULT at line 1, char 69`},
{s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`},
{s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`},
{s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`},
Expand Down
2 changes: 2 additions & 0 deletions influxql/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func TestScanner_Scan(t *testing.T) {
{s: `REVOKE`, tok: influxql.REVOKE},
{s: `SELECT`, tok: influxql.SELECT},
{s: `SERIES`, tok: influxql.SERIES},
{s: `SERVER`, tok: influxql.SERVER},
{s: `SERVERS`, tok: influxql.SERVERS},
{s: `TAG`, tok: influxql.TAG},
{s: `TO`, tok: influxql.TO},
{s: `USER`, tok: influxql.USER},
Expand Down
4 changes: 4 additions & 0 deletions influxql/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
EXPLAIN
FIELD
FOR
FORCE
FROM
GRANT
GRANTS
Expand Down Expand Up @@ -108,6 +109,7 @@ const (
REVOKE
SELECT
SERIES
SERVER
SERVERS
SET
SHOW
Expand Down Expand Up @@ -187,6 +189,7 @@ var tokens = [...]string{
EXPLAIN: "EXPLAIN",
FIELD: "FIELD",
FOR: "FOR",
FORCE: "FORCE",
FROM: "FROM",
GRANT: "GRANT",
GRANTS: "GRANTS",
Expand Down Expand Up @@ -218,6 +221,7 @@ var tokens = [...]string{
REVOKE: "REVOKE",
SELECT: "SELECT",
SERIES: "SERIES",
SERVER: "SERVER",
SERVERS: "SERVERS",
SET: "SET",
SHOW: "SHOW",
Expand Down
69 changes: 63 additions & 6 deletions meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,71 @@ func (data *Data) CreateNode(host string) error {
}

// DeleteNode removes a node from the metadata.
func (data *Data) DeleteNode(id uint64) error {
for i := range data.Nodes {
if data.Nodes[i].ID == id {
data.Nodes = append(data.Nodes[:i], data.Nodes[i+1:]...)
return nil
func (data *Data) DeleteNode(id uint64, force bool) error {
// Node has to be larger than 0 to be real
if id == 0 {
return ErrNodeIDRequired
}
// Is this a valid node?
nodeInfo := data.Node(id)
if nodeInfo == nil {
return ErrNodeNotFound
}

// Am I the only node? If so, nothing to do
if len(data.Nodes) == 1 {
return ErrNodeUnableToDropFinalNode
}

// Determine if there are any any non-replicated nodes and force was not specified
if !force {
for _, d := range data.Databases {
for _, rp := range d.RetentionPolicies {
// ignore replicated retention policies
if rp.ReplicaN > 1 {
continue
}
for _, sg := range rp.ShardGroups {
for _, s := range sg.Shards {
if s.OwnedBy(id) && len(s.Owners) == 1 {
return ErrShardNotReplicated
}
}
}
}
}
}

// Remove node id from all shard infos
for di, d := range data.Databases {
for ri, rp := range d.RetentionPolicies {
for sgi, sg := range rp.ShardGroups {
for si, s := range sg.Shards {
if s.OwnedBy(id) {
var owners []ShardOwner
for _, o := range s.Owners {
if o.NodeID != id {
owners = append(owners, o)
}
}
data.Databases[di].RetentionPolicies[ri].ShardGroups[sgi].Shards[si].Owners = owners
}
}
}
}
}
return ErrNodeNotFound

// Remove this node from the in memory nodes
var nodes []NodeInfo
for _, n := range data.Nodes {
if n.ID == id {
continue
}
nodes = append(nodes, n)
}
data.Nodes = nodes

return nil
}

// Database returns a database by name.
Expand Down
Loading