From c3bf4876d7a1e77705676fc09d3883dd1d45fae2 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Wed, 26 Mar 2014 16:59:42 -0400 Subject: [PATCH 1/2] fix #341. Limit the amount of memory taken by query Remove the setting for shard query buffer size and add logic for max number of shards to query in parallel. --- config.toml.sample | 8 ++-- src/cluster/shard.go | 35 ++++++++++++++++- src/configuration/config.toml | 8 ++-- src/configuration/configuration.go | 12 +++--- src/coordinator/coordinator.go | 60 +++++++++++++++++++++--------- src/coordinator/protobuf_client.go | 1 + src/integration/server_test.go | 6 +-- src/parser/query_spec.go | 18 ++++++++- 8 files changed, 112 insertions(+), 36 deletions(-) diff --git a/config.toml.sample b/config.toml.sample index 093903d695c..b0220bee78d 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -74,9 +74,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must # will be replayed from the WAL write-buffer-size = 10000 -# When queries get distributed out, the go in parallel. However, the responses must be sent in time order. -# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped. -query-shard-buffer-size = 1000 +# When queries get distributed out to shards, they go in parallel. This means that results can get buffered +# in memory since results will come in any order, but have to be processed in the correct time order. +# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure +# that you don't need to buffer in memory, but you won't get the best performance. +concurrent-shard-query-limit = 10 [leveldb] diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 6a535ea3fad..4ea53e53f2f 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -72,11 +72,13 @@ type ShardData struct { shardType ShardType durationIsSplit bool shardDuration time.Duration + shardSeconds int64 localServerId uint32 IsLocal bool } func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData { + shardDuration := endTime.Sub(startTime) return &ShardData{ id: id, startTime: startTime, @@ -87,7 +89,8 @@ func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, dura serverIds: make([]uint32, 0), shardType: shardType, durationIsSplit: durationIsSplit, - shardDuration: endTime.Sub(startTime), + shardDuration: shardDuration, + shardSeconds: int64(shardDuration.Seconds()), } } @@ -332,6 +335,36 @@ func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool return false } +func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int { + groupByTime := querySpec.GetGroupByInterval() + if groupByTime == nil { + // If the group by time is nil, we shouldn't have to use a buffer since the shards should be queried sequentially. + // However, set this to something high just to be safe. + log.Info("BUFFER SIZE: 1000") + return 1000 + } + tickCount := int(self.shardSeconds / int64(groupByTime.Seconds())) + if tickCount < 10 { + tickCount = 100 + } else if tickCount > 1000 { + // cap this because each response should have up to this number of points in it. 
+ tickCount = tickCount / batchPointSize + + // but make sure it's at least 1k + if tickCount < 1000 { + tickCount = 1000 + } + } + columnCount := querySpec.GetGroupByColumnCount() + if columnCount > 1 { + // we don't really know the cardinality for any column up front. This is a just a multiplier so we'll see how this goes. + // each response can have many points, so having a buffer of the ticks * 100 should be safe, but we'll see. + tickCount = tickCount * 100 + } + log.Info("BUFFER SIZE: ", tickCount) + return tickCount +} + func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *p.Response) { queryString := querySpec.GetQueryStringWithTimeCondition() request := self.createRequest(querySpec) diff --git a/src/configuration/config.toml b/src/configuration/config.toml index 2ac47586f6e..6cdc4a06aab 100644 --- a/src/configuration/config.toml +++ b/src/configuration/config.toml @@ -71,9 +71,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must # will be replayed from the WAL write-buffer-size = 10000 -# When queries get distributed out, the go in parallel. However, the responses must be sent in time order. -# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped. -query-shard-buffer-size = 1000 +# When queries get distributed out to shards, they go in parallel. This means that results can get buffered +# in memory since results will come in any order, but have to be processed in the correct time order. +# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure +# that you don't need to buffer in memory, but you won't get the best performance. +concurrent-shard-query-limit = 10 [leveldb] diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go index 376f0ceaf0a..5ab4dea8ee8 100644 --- a/src/configuration/configuration.go +++ b/src/configuration/configuration.go @@ -85,7 +85,7 @@ type ClusterConfig struct { ProtobufTimeout duration `toml:"protobuf_timeout"` ProtobufHeartbeatInterval duration `toml:"protobuf_heartbeat"` WriteBufferSize int `toml"write-buffer-size"` - QueryShardBufferSize int `toml:"query-shard-buffer-size"` + ConcurrentShardQueryLimit int `toml:"concurrent-shard-query-limit"` } type LoggingConfig struct { @@ -215,7 +215,7 @@ type Configuration struct { WalRequestsPerLogFile int LocalStoreWriteBufferSize int PerServerWriteBufferSize int - QueryShardBufferSize int + ConcurrentShardQueryLimit int } func LoadConfiguration(fileName string) *Configuration { @@ -254,9 +254,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { tomlConfiguration.WalConfig.RequestsPerLogFile = 10 * tomlConfiguration.WalConfig.IndexAfterRequests } - defaultQueryShardBufferSize := 100 - if tomlConfiguration.Cluster.QueryShardBufferSize != 0 { - defaultQueryShardBufferSize = tomlConfiguration.Cluster.QueryShardBufferSize + defaultConcurrentShardQueryLimit := 10 + if tomlConfiguration.Cluster.ConcurrentShardQueryLimit != 0 { + defaultConcurrentShardQueryLimit = tomlConfiguration.Cluster.ConcurrentShardQueryLimit } if tomlConfiguration.Raft.Timeout.Duration == 0 { @@ -297,7 +297,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { WalRequestsPerLogFile: tomlConfiguration.WalConfig.RequestsPerLogFile, LocalStoreWriteBufferSize: tomlConfiguration.Storage.WriteBufferSize, PerServerWriteBufferSize: tomlConfiguration.Cluster.WriteBufferSize, - QueryShardBufferSize: 
defaultQueryShardBufferSize, + ConcurrentShardQueryLimit: defaultConcurrentShardQueryLimit, } if config.LocalStoreWriteBufferSize == 0 { diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 05f3ae73e48..a0079bdb701 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -159,20 +159,12 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser } seriesYielded := make(map[string]bool) - responses := make([]chan *protocol.Response, 0) - for _, shard := range shortTermShards { - responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize) - go shard.Query(querySpec, responseChan) - responses = append(responses, responseChan) - } - for _, shard := range longTermShards { - responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize) - go shard.Query(querySpec, responseChan) - responses = append(responses, responseChan) - } + shards := append(shortTermShards, longTermShards...) var err error - for _, responseChan := range responses { + for _, shard := range shards { + responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) + go shard.Query(querySpec, responseChan) for { response := <-responseChan if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { @@ -224,6 +216,12 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData, return true } +func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool { + // if we're not aggregating locally, that means all the raw points are being sent back in this query. Do it + // sequentially so we don't fill up memory like crazy. + return !self.shouldAggregateLocally(shards, querySpec) +} + func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool, error) { shards := self.clusterConfiguration.GetShards(querySpec) shouldAggregateLocally := self.shouldAggregateLocally(shards, querySpec) @@ -281,16 +279,32 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri return err } - responses := make([]chan *protocol.Response, 0) - for _, shard := range shards { - responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize) + responses := make([]chan *protocol.Response, len(shards), len(shards)) + + shardConcurrentLimit := self.config.ConcurrentShardQueryLimit + if self.shouldQuerySequentially(shards, querySpec) { + log.Debug("Querying shards sequentially") + shardConcurrentLimit = 1 + } + log.Debug("Shard concurrent limit: ", shardConcurrentLimit) + for i := 0; i < shardConcurrentLimit && i < len(shards); i++ { + shard := shards[i] + responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) // We query shards for data and stream them to query processor go shard.Query(querySpec, responseChan) - responses = append(responses, responseChan) + responses[i] = responseChan } + nextIndex := shardConcurrentLimit + // don't queue up new shards to query if we've hit the limit for the query + shouldContinue := false for i, responseChan := range responses { - log.Debug("READING: shard: ", shards[i].String()) + log.Debug("READING: shard: ", i, shards[i].String()) + + // Do this because it's possible should continue was false so we haven't set the other response channels. 
+ if responseChan == nil { + break + } for { response := <-responseChan @@ -300,6 +314,15 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri if response.ErrorMessage != nil && err == nil { err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) } + if nextIndex < len(shards) && shouldContinue { + shard := shards[nextIndex] + responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) + // We query shards for data and stream them to query processor + log.Debug("Querying Shard: ", nextIndex, shard.String()) + go shard.Query(querySpec, responseChan) + responses[nextIndex] = responseChan + nextIndex += 1 + } break } @@ -315,7 +338,8 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri // if the data wasn't aggregated at the shard level, aggregate // the data here log.Debug("YIELDING: %d points with %d columns", len(response.Series.Points), len(response.Series.Fields)) - processor.YieldSeries(response.Series) + shouldContinue = processor.YieldSeries(response.Series) + log.Debug("ShouldContinue: ", shouldContinue) continue } diff --git a/src/coordinator/protobuf_client.go b/src/coordinator/protobuf_client.go index 94ba7cec4fb..1c43aa81008 100644 --- a/src/coordinator/protobuf_client.go +++ b/src/coordinator/protobuf_client.go @@ -179,6 +179,7 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) { case req.responseChan <- response: default: log.Error("ProtobufClient: Response buffer full! ", self.hostAndPort, response) + panic("fuck, dropping shit!") // if it's an end stream response, we have to send it so start it in a goroutine so we can make sure it gets through without blocking the reading of responses. if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK || *response.Type == protocol.Response_ACCESS_DENIED { go func() { diff --git a/src/integration/server_test.go b/src/integration/server_test.go index 34664d0db60..b27394242b2 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + . "launchpad.net/gocheck" "net" "net/http" "net/url" @@ -15,7 +16,6 @@ import ( "path/filepath" "syscall" "time" - . 
"launchpad.net/gocheck" ) type ServerSuite struct { @@ -216,13 +216,13 @@ func (self *ServerProcess) VerifyForbiddenQuery(database, query string, onlyLoca func (self *ServerProcess) Post(url, data string, c *C) *http.Response { err := self.Request("POST", url, data, c) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) return err } func (self *ServerProcess) Delete(url, body string, c *C) *http.Response { err := self.Request("DELETE", url, body, c) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) return err } diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go index 0c53c1a812a..7fedd1984f9 100644 --- a/src/parser/query_spec.go +++ b/src/parser/query_spec.go @@ -15,6 +15,8 @@ type QuerySpec struct { endTime time.Time seriesValuesAndColumns map[*Value][]string RunAgainstAllServersInShard bool + groupByInterval *time.Duration + groupByColumnCount int } func NewQuerySpec(user common.User, database string, query *Query) *QuerySpec { @@ -89,8 +91,20 @@ func (self *QuerySpec) GetGroupByInterval() *time.Duration { if self.query.SelectQuery == nil { return nil } - duration, _ := self.query.SelectQuery.GetGroupByClause().GetGroupByTime() - return duration + if self.groupByInterval == nil { + self.groupByInterval, _ = self.query.SelectQuery.GetGroupByClause().GetGroupByTime() + } + return self.groupByInterval +} + +func (self *QuerySpec) GetGroupByColumnCount() int { + if self.query.SelectQuery == nil { + return 0 + } + if self.groupByColumnCount == 0 { + self.groupByColumnCount = len(self.query.SelectQuery.GetGroupByClause().Elems) - 1 + } + return self.groupByColumnCount } func (self *QuerySpec) IsRegex() bool { From 0e8a5ce4a22d88697022db28393182cc713ac2ab Mon Sep 17 00:00:00 2001 From: John Shahid Date: Tue, 1 Apr 2014 14:06:54 -0400 Subject: [PATCH 2/2] refactoring to make the code clear and simple and fix some bugs --- src/coordinator/coordinator.go | 100 ++++++++++++++++----------------- 1 file changed, 49 insertions(+), 51 deletions(-) diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index a0079bdb701..dffd675e469 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -217,8 +217,9 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData, } func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool { - // if we're not aggregating locally, that means all the raw points are being sent back in this query. Do it - // sequentially so we don't fill up memory like crazy. + // if we're not aggregating locally, that means all the raw points + // are being sent back in this query. Do it sequentially so we don't + // fill up memory like crazy. 
return !self.shouldAggregateLocally(shards, querySpec) } @@ -273,57 +274,23 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, return shards, processor, seriesClosed, nil } -func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { - shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter) - if err != nil { - return err - } - - responses := make([]chan *protocol.Response, len(shards), len(shards)) - - shardConcurrentLimit := self.config.ConcurrentShardQueryLimit - if self.shouldQuerySequentially(shards, querySpec) { - log.Debug("Querying shards sequentially") - shardConcurrentLimit = 1 - } - log.Debug("Shard concurrent limit: ", shardConcurrentLimit) - for i := 0; i < shardConcurrentLimit && i < len(shards); i++ { - shard := shards[i] - responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) - // We query shards for data and stream them to query processor - go shard.Query(querySpec, responseChan) - responses[i] = responseChan - } - nextIndex := shardConcurrentLimit - // don't queue up new shards to query if we've hit the limit for the query - shouldContinue := false - - for i, responseChan := range responses { - log.Debug("READING: shard: ", i, shards[i].String()) - - // Do this because it's possible should continue was false so we haven't set the other response channels. - if responseChan == nil { - break - } +func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor, + writer SeriesWriter, + isExplainQuery bool, + channels []<-chan *protocol.Response) (err error) { + for _, responseChan := range channels { for { response := <-responseChan //log.Debug("GOT RESPONSE: ", response.Type, response.Series) log.Debug("GOT RESPONSE: ", response.Type) if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { - if response.ErrorMessage != nil && err == nil { - err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) - } - if nextIndex < len(shards) && shouldContinue { - shard := shards[nextIndex] - responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) - // We query shards for data and stream them to query processor - log.Debug("Querying Shard: ", nextIndex, shard.String()) - go shard.Query(querySpec, responseChan) - responses[nextIndex] = responseChan - nextIndex += 1 + if response.ErrorMessage == nil { + break } - break + + err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) + return } if response.Series == nil || len(response.Series.Points) == 0 { @@ -338,18 +305,49 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri // if the data wasn't aggregated at the shard level, aggregate // the data here log.Debug("YIELDING: %d points with %d columns", len(response.Series.Points), len(response.Series.Fields)) - shouldContinue = processor.YieldSeries(response.Series) - log.Debug("ShouldContinue: ", shouldContinue) + processor.YieldSeries(response.Series) continue } // If we have EXPLAIN query, we don't write actual points (of // response.Type Query) to the client - if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) { - seriesWriter.Write(response.Series) + if !(*response.Type == queryResponse && isExplainQuery) { + writer.Write(response.Series) } } - log.Debug("DONE: shard: ", shards[i].String()) + } + return +} + 
+func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { + shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter) + if err != nil { + return err + } + + shardConcurrentLimit := self.config.ConcurrentShardQueryLimit + if self.shouldQuerySequentially(shards, querySpec) { + log.Debug("Querying shards sequentially") + shardConcurrentLimit = 1 + } + log.Debug("Shard concurrent limit: ", shardConcurrentLimit) + for i := 0; i < len(shards); i += shardConcurrentLimit { + responses := make([]<-chan *protocol.Response, 0, shardConcurrentLimit) + + for j := 0; j < shardConcurrentLimit && i+j < len(shards); j++ { + shard := shards[i+j] + responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)) + // We query shards for data and stream them to query processor + log.Debug("QUERYING: shard: ", i+j, shard.String()) + go shard.Query(querySpec, responseChan) + responses = append(responses, responseChan) + } + + err := self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), responses) + if err != nil { + log.Error("Reading responses from channels returned an error: %s", err) + return err + } } if processor != nil {
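For readers following the concurrency change, here is a minimal, self-contained sketch of the pattern the second commit settles on: query at most a configured number of shards at a time, give each shard a response channel whose buffer is sized by a per-shard heuristic (like QueryResponseBufferSize), and drain each batch's channels in shard order so results stay in time order while memory stays bounded. All names and types below (fakeShard, response, runQuery, and the numeric defaults) are simplified stand-ins for illustration, not the actual InfluxDB types or values.

package main

import (
	"fmt"
	"time"
)

// response is a simplified stand-in for protocol.Response.
type response struct {
	shardID int
	series  string
	done    bool
}

// fakeShard is a simplified stand-in for cluster.ShardData.
type fakeShard struct {
	id           int
	shardSeconds int64
}

// queryResponseBufferSize mirrors the idea behind ShardData.QueryResponseBufferSize:
// size the channel roughly by the number of group-by intervals the shard covers,
// clamped to a sane range, so responses can be buffered without unbounded memory.
func (s *fakeShard) queryResponseBufferSize(groupBySeconds int64, batchPointSize int) int {
	if groupBySeconds == 0 {
		// no group-by interval: shards are read sequentially, keep a safe default
		return 1000
	}
	ticks := int(s.shardSeconds / groupBySeconds)
	if ticks < 10 {
		return 100
	}
	if ticks > 1000 {
		// each response can carry up to batchPointSize points, so scale down,
		// but keep at least 1000 slots
		ticks = ticks / batchPointSize
		if ticks < 1000 {
			ticks = 1000
		}
	}
	return ticks
}

// query simulates streaming results for one shard and always ends with a done
// marker, playing the role of ShardData.Query sending an end-stream response.
func (s *fakeShard) query(out chan<- response) {
	for i := 0; i < 3; i++ {
		out <- response{shardID: s.id, series: fmt.Sprintf("points batch %d", i)}
		time.Sleep(time.Millisecond) // pretend work
	}
	out <- response{shardID: s.id, done: true}
}

// runQuery queries shards in batches of at most concurrentLimit, then reads each
// batch's channels to completion in shard order before starting the next batch.
func runQuery(shards []*fakeShard, concurrentLimit int, groupBySeconds int64, batchPointSize int) {
	for i := 0; i < len(shards); i += concurrentLimit {
		channels := make([]<-chan response, 0, concurrentLimit)
		for j := 0; j < concurrentLimit && i+j < len(shards); j++ {
			shard := shards[i+j]
			ch := make(chan response, shard.queryResponseBufferSize(groupBySeconds, batchPointSize))
			go shard.query(ch)
			channels = append(channels, ch)
		}
		for _, ch := range channels {
			for {
				r := <-ch
				if r.done {
					break
				}
				fmt.Printf("shard %d -> %s\n", r.shardID, r.series)
			}
		}
	}
}

func main() {
	shards := []*fakeShard{
		{id: 1, shardSeconds: 7 * 24 * 3600},
		{id: 2, shardSeconds: 7 * 24 * 3600},
		{id: 3, shardSeconds: 7 * 24 * 3600},
	}
	// 2 plays the role of concurrent-shard-query-limit; 60 and 100 are an assumed
	// group-by interval in seconds and point batch size.
	runQuery(shards, 2, 60, 100)
}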