diff --git a/config.toml.sample b/config.toml.sample
index 093903d695c..b0220bee78d 100644
--- a/config.toml.sample
+++ b/config.toml.sample
@@ -74,9 +74,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must
 # will be replayed from the WAL
 write-buffer-size = 10000
 
-# When queries get distributed out, the go in parallel. However, the responses must be sent in time order.
-# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped.
-query-shard-buffer-size = 1000
+# When queries get distributed out to shards, they go in parallel. This means results can get buffered
+# in memory, since they can come back in any order but have to be processed in the correct time order.
+# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 queries
+# the shards one at a time, so nothing extra is buffered, but you won't get the best performance.
+concurrent-shard-query-limit = 10
 
 [leveldb]
diff --git a/src/cluster/shard.go b/src/cluster/shard.go
index 6a535ea3fad..4ea53e53f2f 100644
--- a/src/cluster/shard.go
+++ b/src/cluster/shard.go
@@ -72,11 +72,13 @@ type ShardData struct {
     shardType       ShardType
     durationIsSplit bool
     shardDuration   time.Duration
+    shardSeconds    int64
     localServerId   uint32
     IsLocal         bool
 }
 
 func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData {
+    shardDuration := endTime.Sub(startTime)
     return &ShardData{
         id:              id,
         startTime:       startTime,
@@ -87,7 +89,8 @@ func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, dura
         serverIds:       make([]uint32, 0),
         shardType:       shardType,
         durationIsSplit: durationIsSplit,
-        shardDuration:   endTime.Sub(startTime),
+        shardDuration:   shardDuration,
+        shardSeconds:    int64(shardDuration.Seconds()),
     }
 }
 
@@ -332,6 +335,36 @@ func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
     return false
 }
 
+func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int {
+    groupByTime := querySpec.GetGroupByInterval()
+    if groupByTime == nil {
+        // If the group by time is nil, we shouldn't have to use a buffer since the shards should be queried sequentially.
+        // However, set this to something high just to be safe.
+        log.Info("BUFFER SIZE: 1000")
+        return 1000
+    }
+    tickCount := int(self.shardSeconds / int64(groupByTime.Seconds()))
+    if tickCount < 10 {
+        tickCount = 100
+    } else if tickCount > 1000 {
+        // cap this because each response should have up to this number of points in it.
+        tickCount = tickCount / batchPointSize
+
+        // but make sure it's at least 1k
+        if tickCount < 1000 {
+            tickCount = 1000
+        }
+    }
+    columnCount := querySpec.GetGroupByColumnCount()
+    if columnCount > 1 {
+        // we don't really know the cardinality for any column up front. This is just a multiplier so we'll see how it goes.
+        // each response can have many points, so having a buffer of the ticks * 100 should be safe, but we'll see.
+        tickCount = tickCount * 100
+    }
+    log.Info("BUFFER SIZE: ", tickCount)
+    return tickCount
+}
+
 func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *p.Response) {
     queryString := querySpec.GetQueryStringWithTimeCondition()
     request := self.createRequest(querySpec)
diff --git a/src/configuration/config.toml b/src/configuration/config.toml
index 2ac47586f6e..6cdc4a06aab 100644
--- a/src/configuration/config.toml
+++ b/src/configuration/config.toml
@@ -71,9 +71,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must
 # will be replayed from the WAL
 write-buffer-size = 10000
 
-# When queries get distributed out, the go in parallel. However, the responses must be sent in time order.
-# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped.
-query-shard-buffer-size = 1000
+# When queries get distributed out to shards, they go in parallel. This means results can get buffered
+# in memory, since they can come back in any order but have to be processed in the correct time order.
+# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 queries
+# the shards one at a time, so nothing extra is buffered, but you won't get the best performance.
+concurrent-shard-query-limit = 10
 
 [leveldb]
diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go
index 376f0ceaf0a..5ab4dea8ee8 100644
--- a/src/configuration/configuration.go
+++ b/src/configuration/configuration.go
@@ -85,7 +85,7 @@ type ClusterConfig struct {
     ProtobufTimeout           duration `toml:"protobuf_timeout"`
     ProtobufHeartbeatInterval duration `toml:"protobuf_heartbeat"`
     WriteBufferSize           int      `toml"write-buffer-size"`
-    QueryShardBufferSize      int      `toml:"query-shard-buffer-size"`
+    ConcurrentShardQueryLimit int      `toml:"concurrent-shard-query-limit"`
 }
 
 type LoggingConfig struct {
@@ -215,7 +215,7 @@ type Configuration struct {
     WalRequestsPerLogFile     int
     LocalStoreWriteBufferSize int
     PerServerWriteBufferSize  int
-    QueryShardBufferSize      int
+    ConcurrentShardQueryLimit int
 }
 
 func LoadConfiguration(fileName string) *Configuration {
@@ -254,9 +254,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         tomlConfiguration.WalConfig.RequestsPerLogFile = 10 * tomlConfiguration.WalConfig.IndexAfterRequests
     }
 
-    defaultQueryShardBufferSize := 100
-    if tomlConfiguration.Cluster.QueryShardBufferSize != 0 {
-        defaultQueryShardBufferSize = tomlConfiguration.Cluster.QueryShardBufferSize
+    defaultConcurrentShardQueryLimit := 10
+    if tomlConfiguration.Cluster.ConcurrentShardQueryLimit != 0 {
+        defaultConcurrentShardQueryLimit = tomlConfiguration.Cluster.ConcurrentShardQueryLimit
     }
 
     if tomlConfiguration.Raft.Timeout.Duration == 0 {
@@ -297,7 +297,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         WalRequestsPerLogFile:     tomlConfiguration.WalConfig.RequestsPerLogFile,
         LocalStoreWriteBufferSize: tomlConfiguration.Storage.WriteBufferSize,
         PerServerWriteBufferSize:  tomlConfiguration.Cluster.WriteBufferSize,
-        QueryShardBufferSize:      defaultQueryShardBufferSize,
+        ConcurrentShardQueryLimit: defaultConcurrentShardQueryLimit,
     }
 
     if config.LocalStoreWriteBufferSize == 0 {
diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go
index 05f3ae73e48..dffd675e469 100644
--- a/src/coordinator/coordinator.go
+++ b/src/coordinator/coordinator.go
@@ -159,20 +159,12 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser
     }
     seriesYielded := make(map[string]bool)
 
-    responses := make([]chan *protocol.Response, 0)
-    for _, shard := range shortTermShards {
-        responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
-        go shard.Query(querySpec, responseChan)
-        responses = append(responses, responseChan)
-    }
-    for _, shard := range longTermShards {
-        responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
-        go shard.Query(querySpec, responseChan)
-        responses = append(responses, responseChan)
-    }
+    shards := append(shortTermShards, longTermShards...)
 
     var err error
-    for _, responseChan := range responses {
+    for _, shard := range shards {
+        responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
+        go shard.Query(querySpec, responseChan)
         for {
             response := <-responseChan
             if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
@@ -224,6 +216,13 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData,
     return true
 }
 
+func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool {
+    // if we're not aggregating locally, that means all the raw points
+    // are being sent back in this query. Do it sequentially so we don't
+    // fill up memory like crazy.
+    return !self.shouldAggregateLocally(shards, querySpec)
+}
+
 func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool, error) {
     shards := self.clusterConfiguration.GetShards(querySpec)
     shouldAggregateLocally := self.shouldAggregateLocally(shards, querySpec)
@@ -275,32 +274,23 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
     return shards, processor, seriesClosed, nil
 }
 
-func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
-    shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
-    if err != nil {
-        return err
-    }
-
-    responses := make([]chan *protocol.Response, 0)
-    for _, shard := range shards {
-        responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
-        // We query shards for data and stream them to query processor
-        go shard.Query(querySpec, responseChan)
-        responses = append(responses, responseChan)
-    }
-
-    for i, responseChan := range responses {
-        log.Debug("READING: shard: ", shards[i].String())
+func (self *CoordinatorImpl) readFromResponseChannels(processor cluster.QueryProcessor,
+    writer SeriesWriter,
+    isExplainQuery bool,
+    channels []<-chan *protocol.Response) (err error) {
+    for _, responseChan := range channels {
         for {
             response := <-responseChan
             //log.Debug("GOT RESPONSE: ", response.Type, response.Series)
             log.Debug("GOT RESPONSE: ", response.Type)
             if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
-                if response.ErrorMessage != nil && err == nil {
-                    err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
+                if response.ErrorMessage == nil {
+                    break
                 }
-                break
+
+                err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
+                return
             }
 
             if response.Series == nil || len(response.Series.Points) == 0 {
@@ -321,11 +311,43 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
             // If we have EXPLAIN query, we don't write actual points (of
             // response.Type Query) to the client
-            if !(*response.Type == queryResponse &&
-                querySpec.IsExplainQuery()) {
-                seriesWriter.Write(response.Series)
+            if !(*response.Type == queryResponse && isExplainQuery) {
+                writer.Write(response.Series)
             }
         }
-        log.Debug("DONE: shard: ", shards[i].String())
+    }
+    return
+}
+
+func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
+    shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
+    if err != nil {
+        return err
+    }
+
+    shardConcurrentLimit := self.config.ConcurrentShardQueryLimit
+    if self.shouldQuerySequentially(shards, querySpec) {
+        log.Debug("Querying shards sequentially")
+        shardConcurrentLimit = 1
+    }
+    log.Debug("Shard concurrent limit: ", shardConcurrentLimit)
+    for i := 0; i < len(shards); i += shardConcurrentLimit {
+        responses := make([]<-chan *protocol.Response, 0, shardConcurrentLimit)
+
+        for j := 0; j < shardConcurrentLimit && i+j < len(shards); j++ {
+            shard := shards[i+j]
+            responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
+            // We query shards for data and stream them to query processor
+            log.Debug("QUERYING: shard: ", i+j, shard.String())
+            go shard.Query(querySpec, responseChan)
+            responses = append(responses, responseChan)
+        }
+
+        err := self.readFromResponseChannels(processor, seriesWriter, querySpec.IsExplainQuery(), responses)
+        if err != nil {
+            log.Error("Reading responses from channels returned an error: %s", err)
+            return err
+        }
     }
 
     if processor != nil {
diff --git a/src/coordinator/protobuf_client.go b/src/coordinator/protobuf_client.go
index 94ba7cec4fb..1c43aa81008 100644
--- a/src/coordinator/protobuf_client.go
+++ b/src/coordinator/protobuf_client.go
@@ -179,6 +179,7 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) {
     case req.responseChan <- response:
     default:
         log.Error("ProtobufClient: Response buffer full! ", self.hostAndPort, response)
+        panic("ProtobufClient: response buffer full, dropping response")
         // if it's an end stream response, we have to send it so start it in a goroutine so we can make sure it gets through without blocking the reading of responses.
         if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK || *response.Type == protocol.Response_ACCESS_DENIED {
             go func() {
diff --git a/src/integration/server_test.go b/src/integration/server_test.go
index 34664d0db60..b27394242b2 100644
--- a/src/integration/server_test.go
+++ b/src/integration/server_test.go
@@ -8,6 +8,7 @@ import (
     "encoding/json"
     "fmt"
     "io/ioutil"
+    . "launchpad.net/gocheck"
     "net"
     "net/http"
     "net/url"
@@ -15,7 +16,6 @@ import (
     "path/filepath"
     "syscall"
     "time"
-    . "launchpad.net/gocheck"
"launchpad.net/gocheck" ) type ServerSuite struct { @@ -216,13 +216,13 @@ func (self *ServerProcess) VerifyForbiddenQuery(database, query string, onlyLoca func (self *ServerProcess) Post(url, data string, c *C) *http.Response { err := self.Request("POST", url, data, c) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) return err } func (self *ServerProcess) Delete(url, body string, c *C) *http.Response { err := self.Request("DELETE", url, body, c) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) return err } diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go index 0c53c1a812a..7fedd1984f9 100644 --- a/src/parser/query_spec.go +++ b/src/parser/query_spec.go @@ -15,6 +15,8 @@ type QuerySpec struct { endTime time.Time seriesValuesAndColumns map[*Value][]string RunAgainstAllServersInShard bool + groupByInterval *time.Duration + groupByColumnCount int } func NewQuerySpec(user common.User, database string, query *Query) *QuerySpec { @@ -89,8 +91,20 @@ func (self *QuerySpec) GetGroupByInterval() *time.Duration { if self.query.SelectQuery == nil { return nil } - duration, _ := self.query.SelectQuery.GetGroupByClause().GetGroupByTime() - return duration + if self.groupByInterval == nil { + self.groupByInterval, _ = self.query.SelectQuery.GetGroupByClause().GetGroupByTime() + } + return self.groupByInterval +} + +func (self *QuerySpec) GetGroupByColumnCount() int { + if self.query.SelectQuery == nil { + return 0 + } + if self.groupByColumnCount == 0 { + self.groupByColumnCount = len(self.query.SelectQuery.GetGroupByClause().Elems) - 1 + } + return self.groupByColumnCount } func (self *QuerySpec) IsRegex() bool {