Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature add subscriber service for creating/dropping subscriptions #4375

Merged
merged 1 commit into from Oct 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,7 @@
- [#4291](https://github.com/influxdb/influxdb/pull/4291): Added ALTER DATABASE RENAME. Thanks @linearb
- [#4409](https://github.com/influxdb/influxdb/pull/4291): wire up INTO queries.
- [#4379](https://github.com/influxdb/influxdb/pull/4379): Auto-create database for UDP input.
- [#4375](https://github.com/influxdb/influxdb/pull/4375): Add Subscriptions so data can be 'forked' out of InfluxDB to another third party.

### Bugfixes
- [#4389](https://github.com/influxdb/influxdb/pull/4389): Don't add a new segment file on each hinted-handoff purge cycle.
Expand Down Expand Up @@ -62,6 +63,7 @@
- [#4434](https://github.com/influxdb/influxdb/pull/4434): Allow 'E' for scientific values. Fixes [#4433](https://github.com/influxdb/influxdb/issues/4433)
- [#4431](https://github.com/influxdb/influxdb/issues/4431): Add tsm1 WAL QuickCheck
- [#4438](https://github.com/influxdb/influxdb/pull/4438): openTSDB service shutdown fixes
- [#3820](https://github.com/influxdb/influxdb/issues/3820): Fix js error in admin UI.

## v0.9.4 [2015-09-14]

Expand Down
16 changes: 16 additions & 0 deletions cluster/points_writer.go
Expand Up @@ -31,6 +31,8 @@ const (
statWriteTimeout = "write_timeout"
statWriteErr = "write_error"
statWritePointReqHH = "point_req_hh"
statSubWriteOK = "sub_write_ok"
statSubWriteDrop = "sub_write_drop"
)

const (
Expand Down Expand Up @@ -107,6 +109,10 @@ type PointsWriter struct {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}

Subscriber interface {
Points() chan<- *WritePointsRequest
}

statMap *expvar.Map
}

Expand Down Expand Up @@ -245,6 +251,16 @@ func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
}(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)
}

// Send points to subscriptions if possible.
if w.Subscriber != nil {
select {
case w.Subscriber.Points() <- p:
w.statMap.Add(statSubWriteOK, 1)
default:
w.statMap.Add(statSubWriteDrop, 1)
}
}

for range shardMappings.Points {
select {
case <-w.closing:
Expand Down
26 changes: 26 additions & 0 deletions cluster/points_writer_test.go
Expand Up @@ -308,11 +308,19 @@ func TestPointsWriter_WritePoints(t *testing.T) {
return nil, nil
}
ms.NodeIDFn = func() uint64 { return 1 }

subPoints := make(chan *cluster.WritePointsRequest, 1)
sub := Subscriber{}
sub.PointsFn = func() chan<- *cluster.WritePointsRequest {
return subPoints
}

c := cluster.NewPointsWriter()
c.MetaStore = ms
c.ShardWriter = sw
c.TSDBStore = store
c.HintedHandoff = hh
c.Subscriber = sub

err := c.WritePoints(pr)
if err == nil && test.expErr != nil {
Expand All @@ -325,6 +333,16 @@ func TestPointsWriter_WritePoints(t *testing.T) {
if err != nil && test.expErr != nil && err.Error() != test.expErr.Error() {
t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
}
if test.expErr == nil {
select {
case p := <-subPoints:
if p != pr {
t.Errorf("PointsWriter.WritePoints(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr)
}
default:
t.Errorf("PointsWriter.WritePoints(): '%s' error: Subscriber.Points not called", test.name)
}
}
}
}

Expand Down Expand Up @@ -406,6 +424,14 @@ func (m MetaStore) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupI
return m.ShardOwnerFn(shardID)
}

type Subscriber struct {
PointsFn func() chan<- *cluster.WritePointsRequest
}

func (s Subscriber) Points() chan<- *cluster.WritePointsRequest {
return s.PointsFn()
}

func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
shards := []meta.ShardInfo{}
owners := []meta.ShardOwner{}
Expand Down
17 changes: 10 additions & 7 deletions cmd/influxd/run/config.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/subscriber"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)
Expand All @@ -35,13 +36,14 @@ type Config struct {
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`

Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
HTTPD httpd.Config `toml:"http"`
Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDPs []udp.Config `toml:"udp"`
Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
Subscriber subscriber.Config `toml:"subscriber"`
HTTPD httpd.Config `toml:"http"`
Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDPs []udp.Config `toml:"udp"`

// Snapshot SnapshotConfig `toml:"snapshot"`
ContinuousQuery continuous_querier.Config `toml:"continuous_queries"`
Expand All @@ -62,6 +64,7 @@ func NewConfig() *Config {

c.Admin = admin.NewConfig()
c.Monitor = monitor.NewConfig()
c.Subscriber = subscriber.NewConfig()
c.HTTPD = httpd.NewConfig()
c.Collectd = collectd.NewConfig()
c.OpenTSDB = opentsdb.NewConfig()
Expand Down
5 changes: 5 additions & 0 deletions cmd/influxd/run/config_test.go
Expand Up @@ -45,6 +45,9 @@ bind-address = ":4444"
[monitoring]
enabled = true

[subscriber]
enabled = true

[continuous_queries]
enabled = true
`, &c); err != nil {
Expand Down Expand Up @@ -72,6 +75,8 @@ enabled = true
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress)
} else if c.UDPs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress)
} else if c.Subscriber.Enabled != true {
t.Fatalf("unexpected subscriber enabled: %v", c.Subscriber.Enabled)
} else if c.ContinuousQuery.Enabled != true {
t.Fatalf("unexpected continuous query enabled: %v", c.ContinuousQuery.Enabled)
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/run/server.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/snapshotter"
"github.com/influxdb/influxdb/services/subscriber"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
Expand Down Expand Up @@ -60,6 +61,7 @@ type Server struct {
ShardWriter *cluster.ShardWriter
ShardMapper *cluster.ShardMapper
HintedHandoff *hh.Service
Subscriber *subscriber.Service

Services []Service

Expand Down Expand Up @@ -127,13 +129,18 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Create the hinted handoff service
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaStore)

// Create the Subscriber service
s.Subscriber = subscriber.NewService(c.Subscriber)
s.Subscriber.MetaStore = s.MetaStore

// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout)
s.PointsWriter.MetaStore = s.MetaStore
s.PointsWriter.TSDBStore = s.TSDBStore
s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff
s.PointsWriter.Subscriber = s.Subscriber

// needed for executing INTO queries.
s.QueryExecutor.IntoWriter = s.PointsWriter
Expand Down Expand Up @@ -374,6 +381,11 @@ func (s *Server) Open() error {
return fmt.Errorf("open hinted handoff: %s", err)
}

// Open the subcriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}

for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
Expand Down Expand Up @@ -423,6 +435,10 @@ func (s *Server) Close() error {
s.TSDBStore.Close()
}

if s.Subscriber != nil {
s.Subscriber.Close()
}

// Finally close the meta-store since everything else depends on it
if s.MetaStore != nil {
s.MetaStore.Close()
Expand Down
65 changes: 55 additions & 10 deletions influxql/INFLUXQL.md
Expand Up @@ -82,16 +82,17 @@ _cpu_stats
## Keywords

```
ALL ALTER AS ASC BEGIN BY
CREATE CONTINUOUS DATABASE DATABASES DEFAULT DELETE
DESC DROP DURATION END EXISTS EXPLAIN
FIELD FROM GRANT GROUP IF IN
INNER INSERT INTO KEY KEYS LIMIT
SHOW MEASUREMENT MEASUREMENTS NOT OFFSET ON
ORDER PASSWORD POLICY POLICIES PRIVILEGES QUERIES
QUERY READ REPLICATION RETENTION REVOKE SELECT
SERIES SLIMIT SOFFSET TAG TO USER
USERS VALUES WHERE WITH WRITE
ALL ALTER ANY AS ASC BEGIN
BY CREATE CONTINUOUS DATABASE DATABASES DEFAULT
DELETE DESC DESTINATIONS DROP DURATION END
EXISTS EXPLAIN FIELD FROM GRANT GROUP
IF IN INNER INSERT INTO KEY
KEYS LIMIT SHOW MEASUREMENT MEASUREMENTS NOT
OFFSET ON ORDER PASSWORD POLICY POLICIES
PRIVILEGES QUERIES QUERY READ REPLICATION RETENTION
REVOKE SELECT SERIES SLIMIT SOFFSET SUBSCRIPTION
SUBSCRIPTIONS TAG TO USER USERS VALUES
WHERE WITH WRITE
```

## Literals
Expand Down Expand Up @@ -174,12 +175,14 @@ statement = alter_retention_policy_stmt |
create_database_stmt |
create_retention_policy_stmt |
create_user_stmt |
create_subscription_stmt |
delete_stmt |
drop_continuous_query_stmt |
drop_database_stmt |
drop_measurement_stmt |
drop_retention_policy_stmt |
drop_series_stmt |
drop_subscription_stmt |
drop_user_stmt |
grant_stmt |
show_continuous_queries_stmt |
Expand All @@ -189,6 +192,7 @@ statement = alter_retention_policy_stmt |
show_retention_policies |
show_series_stmt |
show_shards_stmt |
show_subscriptions_stmt|
show_tag_keys_stmt |
show_tag_values_stmt |
show_users_stmt |
Expand Down Expand Up @@ -292,6 +296,22 @@ CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2;
CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2 DEFAULT;
```

### CREATE SUBSCRIPTION

```
create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" ("ANY"|"ALL") host { "," host} .
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this EBNF syntax correct? Not sure the ("ANY"|"ALL") part is.

Should it be like this?

create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" all_any host { "," host} .

all_any = "ALL" | "ANY" 

```

#### Examples:

```sql
-- Create a SUBSCRIPTION on database 'mydb' and retention policy 'default' that send data to 'example.com:9090' via UDP.
CREATE SUBSCRIPTION sub0 ON "mydb"."default" DESTINATIONS ALL 'udp://example.com:9090' ;

-- Create a SUBSCRIPTION on database 'mydb' and retention policy 'default' that round robins the data to 'h1.example.com:9090' and 'h2.example.com:9090'.
CREATE SUBSCRIPTION sub0 ON "mydb"."default" DESTINATIONS ANY 'udp://h1.example.com:9090', 'udp://h2.example.com:9090';
```

### CREATE USER

```
Expand Down Expand Up @@ -382,6 +402,19 @@ drop_series_stmt = "DROP SERIES" [ from_clause ] [ where_clause ]

```

### DROP SUBSCRIPTION

```
drop_subscription_stmt = "DROP SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy .
```

#### Example:

```sql
DROP SUBSCRIPTION sub0 ON "mydb"."default";

```

### DROP USER

```
Expand Down Expand Up @@ -502,6 +535,18 @@ show_shards_stmt = "SHOW SHARDS" .
SHOW SHARDS;
```

### SHOW SUBSCRIPTIONS

```
show_subscriptions_stmt = "SHOW SUBSCRIPTIONS" .
```

#### Example:

```sql
SHOW SUBSCRIPTIONS;
```

### SHOW TAG KEYS

```
Expand Down