Querying large datasets takes up a large amount of memory #341

Merged 2 commits on Apr 1, 2014

Changes from all commits
8 changes: 5 additions & 3 deletions config.toml.sample
@@ -74,9 +74,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must
# will be replayed from the WAL
write-buffer-size = 10000

# When queries get distributed out, the go in parallel. However, the responses must be sent in time order.
# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped.
query-shard-buffer-size = 1000
# When queries get distributed out to shards, they go in parallel. This means that results can get buffered
# in memory since results will come in any order, but have to be processed in the correct time order.
# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure
# that you don't need to buffer in memory, but you won't get the best performance.
concurrent-shard-query-limit = 10

[leveldb]

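The change above replaces a fixed per-shard response buffer (where overflow meant dropped data) with a cap on how many shards are queried in parallel; the per-shard buffer is instead sized at query time (see QueryResponseBufferSize in the shard.go diff below). As a rough illustration of why the cap bounds memory: worst-case buffered data now scales with the concurrency limit rather than with the total number of shards a query touches. A minimal back-of-the-envelope sketch, with entirely hypothetical numbers:

```go
package main

import "fmt"

// worstCasePoints is a back-of-the-envelope bound on buffered query data.
// All numbers here are hypothetical; the real per-shard buffer is computed
// by ShardData.QueryResponseBufferSize.
func worstCasePoints(concurrentShards, perShardBuffer, pointsPerResponse int) int {
	return concurrentShards * perShardBuffer * pointsPerResponse
}

func main() {
	// With concurrent-shard-query-limit = 10, ~1000 buffered responses per
	// shard, and ~100 points per response, at most ~1M points sit in memory,
	// no matter how many shards the query spans in total.
	fmt.Println(worstCasePoints(10, 1000, 100)) // 1000000

	// Dropping the limit to 1 trades throughput for a tenth of that.
	fmt.Println(worstCasePoints(1, 1000, 100)) // 100000
}
```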
35 changes: 34 additions & 1 deletion src/cluster/shard.go
@@ -72,11 +72,13 @@ type ShardData struct {
shardType ShardType
durationIsSplit bool
shardDuration time.Duration
shardSeconds int64
localServerId uint32
IsLocal bool
}

func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData {
shardDuration := endTime.Sub(startTime)
return &ShardData{
id: id,
startTime: startTime,
@@ -87,7 +89,8 @@ func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, dura
serverIds: make([]uint32, 0),
shardType: shardType,
durationIsSplit: durationIsSplit,
shardDuration: endTime.Sub(startTime),
shardDuration: shardDuration,
shardSeconds: int64(shardDuration.Seconds()),
}
}

@@ -332,6 +335,36 @@ func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
return false
}

func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int {
groupByTime := querySpec.GetGroupByInterval()
if groupByTime == nil {
// If the group by time is nil, we shouldn't have to use a buffer since the shards should be queried sequentially.
// However, set this to something high just to be safe.
log.Info("BUFFER SIZE: 1000")
return 1000
}
tickCount := int(self.shardSeconds / int64(groupByTime.Seconds()))
if tickCount < 10 {
tickCount = 100
} else if tickCount > 1000 {
// cap this because each response should have up to this number of points in it.
tickCount = tickCount / batchPointSize

// but make sure it's at least 1k
if tickCount < 1000 {
tickCount = 1000
}
}
columnCount := querySpec.GetGroupByColumnCount()
if columnCount > 1 {
// we don't really know the cardinality for any column up front. This is a just a multiplier so we'll see how this goes.
// each response can have many points, so having a buffer of the ticks * 100 should be safe, but we'll see.
tickCount = tickCount * 100
}
log.Info("BUFFER SIZE: ", tickCount)
return tickCount
}

func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *p.Response) {
queryString := querySpec.GetQueryStringWithTimeCondition()
request := self.createRequest(querySpec)
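To make the sizing heuristic in the new QueryResponseBufferSize above concrete, here is a standalone sketch that mirrors its branches. Only the branching logic is taken from the diff; the shard duration, group-by intervals, and batch size are made-up inputs chosen to exercise each branch:

```go
package main

import (
	"fmt"
	"time"
)

// bufferSize mirrors the heuristic in ShardData.QueryResponseBufferSize:
// no group-by time -> a large fixed buffer; otherwise size by the number of
// group-by intervals ("ticks") that fit in the shard, clamped and scaled.
func bufferSize(shardDuration time.Duration, groupByTime *time.Duration, batchPointSize, groupByColumns int) int {
	if groupByTime == nil {
		return 1000
	}
	tickCount := int(int64(shardDuration.Seconds()) / int64(groupByTime.Seconds()))
	if tickCount < 10 {
		tickCount = 100
	} else if tickCount > 1000 {
		// each response can carry up to batchPointSize points, so shrink
		// the buffer accordingly...
		tickCount = tickCount / batchPointSize
		// ...but keep at least 1k slots
		if tickCount < 1000 {
			tickCount = 1000
		}
	}
	if groupByColumns > 1 {
		// cardinality of the extra group-by columns is unknown, so pad generously
		tickCount = tickCount * 100
	}
	return tickCount
}

func main() {
	week := 7 * 24 * time.Hour
	hour := time.Hour
	second := time.Second

	fmt.Println(bufferSize(week, nil, 100, 0))     // 1000: no group-by time
	fmt.Println(bufferSize(week, &hour, 100, 1))   // 168: one tick per hour in the shard
	fmt.Println(bufferSize(week, &second, 100, 1)) // 6048: 604800 ticks / 100-point batches
	fmt.Println(bufferSize(week, &hour, 100, 2))   // 16800: 168 * 100 for extra group-by columns
}
```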
8 changes: 5 additions & 3 deletions src/configuration/config.toml
@@ -71,9 +71,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must
# will be replayed from the WAL
write-buffer-size = 10000

# When queries get distributed out, the go in parallel. However, the responses must be sent in time order.
# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped.
query-shard-buffer-size = 1000
# When queries get distributed out to shards, they go in parallel. This means that results can get buffered
# in memory since results will come in any order, but have to be processed in the correct time order.
# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure
# that you don't need to buffer in memory, but you won't get the best performance.
concurrent-shard-query-limit = 10

[leveldb]

12 changes: 6 additions & 6 deletions src/configuration/configuration.go
@@ -85,7 +85,7 @@ type ClusterConfig struct {
ProtobufTimeout duration `toml:"protobuf_timeout"`
ProtobufHeartbeatInterval duration `toml:"protobuf_heartbeat"`
WriteBufferSize int `toml"write-buffer-size"`
QueryShardBufferSize int `toml:"query-shard-buffer-size"`
ConcurrentShardQueryLimit int `toml:"concurrent-shard-query-limit"`
}

type LoggingConfig struct {
@@ -215,7 +215,7 @@ type Configuration struct {
WalRequestsPerLogFile int
LocalStoreWriteBufferSize int
PerServerWriteBufferSize int
QueryShardBufferSize int
ConcurrentShardQueryLimit int
}

func LoadConfiguration(fileName string) *Configuration {
@@ -254,9 +254,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
tomlConfiguration.WalConfig.RequestsPerLogFile = 10 * tomlConfiguration.WalConfig.IndexAfterRequests
}

defaultQueryShardBufferSize := 100
if tomlConfiguration.Cluster.QueryShardBufferSize != 0 {
defaultQueryShardBufferSize = tomlConfiguration.Cluster.QueryShardBufferSize
defaultConcurrentShardQueryLimit := 10
if tomlConfiguration.Cluster.ConcurrentShardQueryLimit != 0 {
defaultConcurrentShardQueryLimit = tomlConfiguration.Cluster.ConcurrentShardQueryLimit
}

if tomlConfiguration.Raft.Timeout.Duration == 0 {
@@ -297,7 +297,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
WalRequestsPerLogFile: tomlConfiguration.WalConfig.RequestsPerLogFile,
LocalStoreWriteBufferSize: tomlConfiguration.Storage.WriteBufferSize,
PerServerWriteBufferSize: tomlConfiguration.Cluster.WriteBufferSize,
QueryShardBufferSize: defaultQueryShardBufferSize,
ConcurrentShardQueryLimit: defaultConcurrentShardQueryLimit,
}

if config.LocalStoreWriteBufferSize == 0 {
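One behavioral detail of the parsing change above: because the struct field starts at Go's zero value, omitting concurrent-shard-query-limit from the TOML file yields the default of 10, while any non-zero value overrides it. A minimal sketch of that zero-value fallback, assuming the BurntSushi toml package and a pared-down struct (an assumption for illustration; the project's actual loader is more involved):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// clusterConfig is a pared-down stand-in for the real ClusterConfig struct.
type clusterConfig struct {
	ConcurrentShardQueryLimit int `toml:"concurrent-shard-query-limit"`
}

// concurrentLimit decodes the TOML snippet and applies the default-if-zero rule.
func concurrentLimit(tomlData string) (int, error) {
	var c clusterConfig
	if _, err := toml.Decode(tomlData, &c); err != nil {
		return 0, err
	}
	limit := 10 // default when the key is absent (field left at zero)
	if c.ConcurrentShardQueryLimit != 0 {
		limit = c.ConcurrentShardQueryLimit
	}
	return limit, nil
}

func main() {
	l, _ := concurrentLimit("") // key omitted -> default
	fmt.Println(l)              // 10
	l, _ = concurrentLimit("concurrent-shard-query-limit = 1\n")
	fmt.Println(l) // 1
}
```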
90 changes: 56 additions & 34 deletions src/coordinator/coordinator.go
@@ -159,20 +159,12 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser
}
seriesYielded := make(map[string]bool)

responses := make([]chan *protocol.Response, 0)
for _, shard := range shortTermShards {
responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
for _, shard := range longTermShards {
responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
shards := append(shortTermShards, longTermShards...)

var err error
for _, responseChan := range responses {
for _, shard := range shards {
responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
go shard.Query(querySpec, responseChan)
for {
response := <-responseChan
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
Expand Down Expand Up @@ -224,6 +216,13 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData,
return true
}

func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool {
// if we're not aggregating locally, that means all the raw points
// are being sent back in this query. Do it sequentially so we don't
// fill up memory like crazy.
return !self.shouldAggregateLocally(shards, querySpec)
}

func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool, error) {
shards := self.clusterConfiguration.GetShards(querySpec)
shouldAggregateLocally := self.shouldAggregateLocally(shards, querySpec)
@@ -275,32 +274,23 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
return shards, processor, seriesClosed, nil
}

func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
if err != nil {
return err
}

responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {
responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
// We query shards for data and stream them to query processor
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}

for i, responseChan := range responses {
log.Debug("READING: shard: ", shards[i].String())
func (self *CoordinatorImpl) readFromResposneChannels(processor cluster.QueryProcessor,
writer SeriesWriter,
isExplainQuery bool,
channels []<-chan *protocol.Response) (err error) {
for _, responseChan := range channels {
for {
response := <-responseChan

//log.Debug("GOT RESPONSE: ", response.Type, response.Series)
log.Debug("GOT RESPONSE: ", response.Type)
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
if response.ErrorMessage != nil && err == nil {
err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
if response.ErrorMessage == nil {
break
}
break

err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
return
}

if response.Series == nil || len(response.Series.Points) == 0 {
@@ -321,11 +311,43 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri

// If we have EXPLAIN query, we don't write actual points (of
// response.Type Query) to the client
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
seriesWriter.Write(response.Series)
if !(*response.Type == queryResponse && isExplainQuery) {
writer.Write(response.Series)
}
}
log.Debug("DONE: shard: ", shards[i].String())
}
return
}

func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
shards, processor, seriesClosed, err := self.getShardsAndProcessor(querySpec, seriesWriter)
if err != nil {
return err
}

shardConcurrentLimit := self.config.ConcurrentShardQueryLimit
if self.shouldQuerySequentially(shards, querySpec) {
log.Debug("Querying shards sequentially")
shardConcurrentLimit = 1
}
log.Debug("Shard concurrent limit: ", shardConcurrentLimit)
for i := 0; i < len(shards); i += shardConcurrentLimit {
responses := make([]<-chan *protocol.Response, 0, shardConcurrentLimit)

for j := 0; j < shardConcurrentLimit && i+j < len(shards); j++ {
shard := shards[i+j]
responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
// We query shards for data and stream them to query processor
log.Debug("QUERYING: shard: ", i+j, shard.String())
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}

err := self.readFromResposneChannels(processor, seriesWriter, querySpec.IsExplainQuery(), responses)
if err != nil {
log.Error("Reading responses from channels returned an error: %s", err)
return err
}
}

if processor != nil {
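The heart of the change above is runQuerySpec's new outer loop: shards are queried in windows of at most shardConcurrentLimit (forced to 1 by shouldQuerySequentially when raw points are streamed back), and each window is fully drained via readFromResposneChannels before the next one starts. A stripped-down sketch of that windowed fan-out/drain shape; the response type and the queryShard helper are hypothetical stand-ins, not the coordinator's API:

```go
package main

import "fmt"

// response is a stand-in for *protocol.Response.
type response struct{ shard, seq int }

// queryShard is a hypothetical stand-in for shard.Query: it streams a few
// responses and then closes the channel (the real code sends an end-stream
// response instead of closing).
func queryShard(shard int, out chan<- response) {
	for i := 0; i < 3; i++ {
		out <- response{shard: shard, seq: i}
	}
	close(out)
}

func main() {
	shards := []int{0, 1, 2, 3, 4}
	limit := 2 // plays the role of concurrent-shard-query-limit

	for i := 0; i < len(shards); i += limit {
		channels := make([]<-chan response, 0, limit)
		// Fan out: start at most `limit` shard queries in this window.
		for j := 0; j < limit && i+j < len(shards); j++ {
			ch := make(chan response, 4) // buffered, like QueryResponseBufferSize
			go queryShard(shards[i+j], ch)
			channels = append(channels, ch)
		}
		// Drain each channel in shard order so results stay time-ordered
		// within the window before the next batch of shards is queried.
		for _, ch := range channels {
			for r := range ch {
				fmt.Printf("shard %d response %d\n", r.shard, r.seq)
			}
		}
	}
}
```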
1 change: 1 addition & 0 deletions src/coordinator/protobuf_client.go
@@ -179,6 +179,7 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) {
case req.responseChan <- response:
default:
log.Error("ProtobufClient: Response buffer full! ", self.hostAndPort, response)
panic("fuck, dropping shit!")
// if it's an end stream response, we have to send it so start it in a goroutine so we can make sure it gets through without blocking the reading of responses.
if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK || *response.Type == protocol.Response_ACCESS_DENIED {
go func() {
6 changes: 3 additions & 3 deletions src/integration/server_test.go
@@ -8,14 +8,14 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
. "launchpad.net/gocheck"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"syscall"
"time"
. "launchpad.net/gocheck"
)

type ServerSuite struct {
@@ -216,13 +216,13 @@ func (self *ServerProcess) VerifyForbiddenQuery(database, query string, onlyLoca

func (self *ServerProcess) Post(url, data string, c *C) *http.Response {
err := self.Request("POST", url, data, c)
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 100)
return err
}

func (self *ServerProcess) Delete(url, body string, c *C) *http.Response {
err := self.Request("DELETE", url, body, c)
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 100)
return err
}

18 changes: 16 additions & 2 deletions src/parser/query_spec.go
@@ -15,6 +15,8 @@ type QuerySpec struct {
endTime time.Time
seriesValuesAndColumns map[*Value][]string
RunAgainstAllServersInShard bool
groupByInterval *time.Duration
groupByColumnCount int
}

func NewQuerySpec(user common.User, database string, query *Query) *QuerySpec {
@@ -89,8 +91,20 @@ func (self *QuerySpec) GetGroupByInterval() *time.Duration {
if self.query.SelectQuery == nil {
return nil
}
duration, _ := self.query.SelectQuery.GetGroupByClause().GetGroupByTime()
return duration
if self.groupByInterval == nil {
self.groupByInterval, _ = self.query.SelectQuery.GetGroupByClause().GetGroupByTime()
}
return self.groupByInterval
}

func (self *QuerySpec) GetGroupByColumnCount() int {
if self.query.SelectQuery == nil {
return 0
}
if self.groupByColumnCount == 0 {
self.groupByColumnCount = len(self.query.SelectQuery.GetGroupByClause().Elems) - 1
}
return self.groupByColumnCount
}

func (self *QuerySpec) IsRegex() bool {