
Commit

Merge pull request #473 from influxdb/fix-patricks-crash
Reduce the memory utilization of the engine when the cardinality of the group by columns is big
jvshahid committed Apr 25, 2014
2 parents (c335f71 + a3da8b9), commit c0eab7a
Showing 14 changed files with 950 additions and 930 deletions.
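At a glance, the changes: continuous-query results are grouped into one series per interpolated target name and committed in a single synchronous write (a new Shard.SyncWrite, selected by a sync flag threaded through CommitSeriesData); the write buffer no longer dereferences a nil RequestNumber; the LevelDB shard reuses its key and value buffers and flushes the write batch in fixed-size chunks instead of letting it grow with the series; the query path keeps rawColumnValues as a slice of values rather than pointers and recycles its decode buffers; and the sample config lowers max-response-buffer-size from 100000 to 100.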
config.sample.toml: 2 changes (1 addition & 1 deletion)
@@ -85,7 +85,7 @@ write-buffer-size = 10000
 # expected number of responses exceed this number then querying will
 # happen sequentially and the buffer size will be limited to this
 # number
-max-response-buffer-size = 100000
+max-response-buffer-size = 100

 # When queries get distributed out to shards, they go in parallel. This means that results can get buffered
 # in memory since results will come in any order, but have to be processed in the correct time order.
src/cluster/shard.go: 16 changes (16 additions & 0 deletions)
@@ -23,6 +23,7 @@ type Shard interface {
     StartTime() time.Time
     EndTime() time.Time
     Write(*p.Request) error
+    SyncWrite(*p.Request) error
     Query(querySpec *parser.QuerySpec, response chan *p.Response)
     IsMicrosecondInRange(t int64) bool
 }
@@ -178,6 +179,21 @@ func (self *ShardData) ServerIds() []uint32 {
     return self.serverIds
 }

+func (self *ShardData) SyncWrite(request *p.Request) error {
+    request.ShardId = &self.id
+    if err := self.store.Write(request); err != nil {
+        return err
+    }
+
+    for _, server := range self.clusterServers {
+        if err := server.Write(request); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
 func (self *ShardData) Write(request *p.Request) error {
     request.ShardId = &self.id
     requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self)
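Note the contrast with Write just below: Write logs the request through the WAL (wal.AssignSequenceNumbersAndLog) and lets the write buffer deliver it asynchronously, so its caller never observes a downstream failure, while SyncWrite writes the local store and every cluster server inline and returns the first error. A minimal sketch of a caller that depends on that synchronous error (a hypothetical helper, not part of this commit):

    // flushSynchronously surfaces the shard write error to the code
    // that produced the data, instead of letting the WAL retry it.
    func flushSynchronously(shard cluster.Shard, req *p.Request) error {
        if err := shard.SyncWrite(req); err != nil {
            log.Error("synchronous shard write failed: ", err)
            return err
        }
        return nil
    }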
src/cluster/write_buffer.go: 10 changes (7 additions & 3 deletions)
@@ -84,11 +84,15 @@ func (self *WriteBuffer) write(request *protocol.Request) {
     attempts := 0
     for {
         self.shardIds[*request.ShardId] = true
-        requestNumber := *request.RequestNumber
         err := self.writer.Write(request)
         if err == nil {
-            self.shardCommitedRequestNumber[request.GetShardId()] = request.GetRequestNumber()
-            self.wal.Commit(requestNumber, self.serverId)
+            requestNumber := request.RequestNumber
+            if requestNumber == nil {
+                return
+            }
+
+            self.shardCommitedRequestNumber[request.GetShardId()] = *requestNumber
+            self.wal.Commit(*requestNumber, self.serverId)
             return
         }
         if attempts%100 == 0 {
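The deleted line at the top of the loop dereferenced request.RequestNumber before every attempt; requests that never passed through the WAL (such as the new synchronous writes) carry no request number, and dereferencing the nil pointer panics, presumably the crash the branch name (fix-patricks-crash) refers to. The guard pattern for optional protobuf fields, as a self-contained sketch (Request here is a hypothetical stand-in for the generated type):

    // Optional scalar fields in generated protobuf structs are pointers;
    // they must be nil-checked before dereferencing.
    type Request struct {
        RequestNumber *uint32
    }

    func commitIfLogged(r *Request, commit func(uint32)) {
        n := r.RequestNumber
        if n == nil {
            return // never logged to the WAL: nothing to commit
        }
        commit(*n) // safe: checked above
    }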
src/coordinator/coordinator.go: 31 changes (23 additions & 8 deletions)
@@ -471,7 +471,7 @@ func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series
         return common.NewAuthorizationError("Insufficient permissions to write to %s", db)
     }

-    err := self.CommitSeriesData(db, series)
+    err := self.CommitSeriesData(db, series, false)
     if err != nil {
         return err
     }
@@ -524,6 +524,7 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
     r, _ := regexp.Compile(`\[.*?\]`)

     if r.MatchString(targetName) {
+        serieses := map[string]*protocol.Series{}
         for _, point := range series.Points {
             targetNameWithValues := r.ReplaceAllStringFunc(targetName, func(match string) string {
                 fieldName := match[1 : len(match)-1]
@@ -538,10 +539,20 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
                 point.SequenceNumber = &sequenceNumber
             }

-            newSeries := &protocol.Series{Name: &targetNameWithValues, Fields: series.Fields, Points: []*protocol.Point{point}}
-            if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil {
-                log.Error("Couldn't write data for continuous query: ", e)
+            newSeries := serieses[targetNameWithValues]
+            if newSeries == nil {
+                newSeries = &protocol.Series{Name: &targetNameWithValues, Fields: series.Fields, Points: []*protocol.Point{point}}
+                serieses[targetNameWithValues] = newSeries
+                continue
             }
+            newSeries.Points = append(newSeries.Points, point)
+        }
+        seriesSlice := make([]*protocol.Series, 0, len(serieses))
+        for _, s := range serieses {
+            seriesSlice = append(seriesSlice, s)
+        }
+        if e := self.CommitSeriesData(db, seriesSlice, true); e != nil {
+            log.Error("Couldn't write data for continuous query: ", e)
+        }
     } else {
         newSeries := &protocol.Series{Name: &targetName, Fields: series.Fields, Points: series.Points}
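This rewrite is the core of the memory fix for continuous queries: instead of one CommitSeriesData call, and therefore one write request, per point, points are bucketed by their interpolated series name and the whole batch is committed once. The idiom in isolation (a sketch; nameFor and commit are placeholders for the interpolation and write logic above):

    // Group points by a derived series name, then write each group once.
    groups := map[string][]*protocol.Point{}
    for _, point := range points {
        name := nameFor(point)
        groups[name] = append(groups[name], point)
    }
    for name, pts := range groups {
        commit(name, pts) // one write per series instead of one per point
    }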
@@ -554,15 +565,15 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
             }
         }

-        if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil {
+        if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}, true); e != nil {
             log.Error("Couldn't write data for continuous query: ", e)
         }
     }

     return nil
 }

-func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Series) error {
+func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Series, sync bool) error {
     now := common.CurrentTime()

     shardToSerieses := map[uint32]map[string]*protocol.Series{}
@@ -580,6 +591,7 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se
     }

     // sort the points by timestamp
+    // TODO: this isn't needed anymore
     series.SortPointsTimeDescending()

     for i := 0; i < len(series.Points); {
@@ -618,7 +630,7 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se
         seriesesSlice = append(seriesesSlice, s)
     }

-    err := self.write(db, seriesesSlice, shard)
+    err := self.write(db, seriesesSlice, shard, sync)
     if err != nil {
         log.Error("COORD error writing: ", err)
         return err
@@ -628,8 +640,11 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se
     return nil
 }

-func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard) error {
+func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard, sync bool) error {
     request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series}
+    if sync {
+        return shard.SyncWrite(request)
+    }
     return shard.Write(request)
 }

src/daemon/profiler.go: 2 changes (1 addition & 1 deletion)
@@ -56,7 +56,7 @@ outer:
     }
     // wait for all logging messages to be printed
     <-stopped
-    time.Sleep(time.Second)
+    time.Sleep(5 * time.Second)
     os.Exit(0)
 }

src/datastore/leveldb_shard.go: 60 changes (43 additions & 17 deletions)
@@ -62,29 +62,47 @@ func (self *LevelDbShard) Write(database string, series *protocol.Series) error
         return errors.New("Unable to write no data. Series was nil or had no points.")
     }

+    count := 0
     for fieldIndex, field := range series.Fields {
         temp := field
         id, err := self.createIdForDbSeriesColumn(&database, series.Name, &temp)
         if err != nil {
             return err
         }
+        keyBuffer := bytes.NewBuffer(make([]byte, 0, 24))
+        dataBuffer := proto.NewBuffer(nil)
         for _, point := range series.Points {
-            keyBuffer := bytes.NewBuffer(make([]byte, 0, 24))
+            keyBuffer.Reset()
+            dataBuffer.Reset()

             keyBuffer.Write(id)
-            binary.Write(keyBuffer, binary.BigEndian, self.convertTimestampToUint(point.GetTimestampInMicroseconds()))
-            binary.Write(keyBuffer, binary.BigEndian, *point.SequenceNumber)
+            timestamp := self.convertTimestampToUint(point.GetTimestampInMicroseconds())
+            // pass the uint64 by reference so binary.Write() doesn't create a new buffer
+            // see the source code for intDataSize() in binary.go
+            binary.Write(keyBuffer, binary.BigEndian, &timestamp)
+            binary.Write(keyBuffer, binary.BigEndian, point.SequenceNumber)
             pointKey := keyBuffer.Bytes()

             if point.Values[fieldIndex].GetIsNull() {
                 wb.Delete(pointKey)
-                continue
+                goto check
             }

-            data, err := proto.Marshal(point.Values[fieldIndex])
+            err = dataBuffer.Marshal(point.Values[fieldIndex])
             if err != nil {
                 return err
             }
-            wb.Put(pointKey, data)
+            wb.Put(pointKey, dataBuffer.Bytes())
+        check:
+            count++
+            if count >= SIXTY_FOUR_KILOBYTES {
+                err = self.db.Write(self.writeOptions, wb)
+                if err != nil {
+                    return err
+                }
+                count = 0
+                wb.Clear()
+            }
         }
     }

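Two techniques bound this function's memory: the key and data buffers are allocated once per field and reset for each point, and the write batch is flushed and cleared every SIXTY_FOUR_KILOBYTES entries instead of growing for the whole series. The batching idiom in isolation (a sketch; Batch stands in for levigo's WriteBatch and flush for the self.db.Write call above):

    // Batch is the subset of a LevelDB write batch used here.
    type Batch interface {
        Put(key, value []byte)
        Clear()
    }

    // writeBatched puts all entries, flushing every flushEvery entries
    // so peak memory stays bounded however many entries there are.
    func writeBatched(b Batch, flush func() error, entries [][2][]byte) error {
        const flushEvery = 64 * 1024
        count := 0
        for _, kv := range entries {
            b.Put(kv[0], kv[1])
            count++
            if count >= flushEvery {
                if err := flush(); err != nil {
                    return err
                }
                count = 0
                b.Clear()
            }
        }
        return flush() // write the remainder
    }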
@@ -160,7 +178,7 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
     }

     fieldCount := len(fields)
-    rawColumnValues := make([]*rawColumnValue, fieldCount, fieldCount)
+    rawColumnValues := make([]rawColumnValue, fieldCount, fieldCount)
     query := querySpec.SelectQuery()

     aliases := query.GetTableAliases(seriesName)
@@ -186,12 +204,14 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser

     // TODO: clean up, this is super gnarly
     // optimize for the case where we're pulling back only a single column or aggregate
+    buffer := bytes.NewBuffer(nil)
+    valueBuffer := proto.NewBuffer(nil)
     for {
         isValid := false
         point := &protocol.Point{Values: make([]*protocol.FieldValue, fieldCount, fieldCount)}

         for i, it := range iterators {
-            if rawColumnValues[i] != nil || !it.Valid() {
+            if rawColumnValues[i].value != nil || !it.Valid() {
                 continue
             }

@@ -208,8 +228,7 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
             sequenceNumber := key[16:]

             rawTime := key[8:16]
-            rawValue := &rawColumnValue{time: rawTime, sequence: sequenceNumber, value: value}
-            rawColumnValues[i] = rawValue
+            rawColumnValues[i] = rawColumnValue{time: rawTime, sequence: sequenceNumber, value: value}
         }

         var pointTimeRaw []byte
@@ -218,7 +237,7 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
         // and sequence number. that will become the timestamp and sequence of
         // the next point.
         for _, value := range rawColumnValues {
-            if value == nil {
+            if value.value == nil {
                 continue
             }

@@ -229,7 +248,7 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
         for i, iterator := range iterators {
             // if the value is nil or doesn't match the point's timestamp and sequence number
             // then skip it
-            if rawColumnValues[i] == nil ||
+            if rawColumnValues[i].value == nil ||
                 !bytes.Equal(rawColumnValues[i].time, pointTimeRaw) ||
                 !bytes.Equal(rawColumnValues[i].sequence, pointSequenceRaw) {

@@ -249,19 +268,26 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
             }

             fv := &protocol.FieldValue{}
-            err := proto.Unmarshal(rawColumnValues[i].value, fv)
+            valueBuffer.SetBuf(rawColumnValues[i].value)
+            err := valueBuffer.Unmarshal(fv)
             if err != nil {
                 return err
             }
             point.Values[i] = fv
-            rawColumnValues[i] = nil
+            rawColumnValues[i].value = nil
         }

         var sequence uint64
-        // set the point sequence number and timestamp
-        binary.Read(bytes.NewBuffer(pointSequenceRaw), binary.BigEndian, &sequence)
         var t uint64
-        binary.Read(bytes.NewBuffer(pointTimeRaw), binary.BigEndian, &t)
+
+        // set the point sequence number and timestamp
+        buffer.Reset()
+        buffer.Write(pointSequenceRaw)
+        binary.Read(buffer, binary.BigEndian, &sequence)
+        buffer.Reset()
+        buffer.Write(pointTimeRaw)
+        binary.Read(buffer, binary.BigEndian, &t)

         time := self.convertUintTimestampToInt64(&t)
         point.SetTimestampInMicroseconds(time)
         point.SequenceNumber = &sequence
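The query path applies the same reuse discipline: rawColumnValues becomes a slice of values instead of pointers (one allocation for the whole slice, with value = nil marking a consumed entry), one bytes.Buffer is recycled for the binary.Read calls, and one proto.Buffer decodes every field value through SetBuf and Unmarshal rather than allocating inside proto.Unmarshal on each iteration. The decoder-reuse pattern on its own (a sketch against the goprotobuf API of the time):

    buf := proto.NewBuffer(nil)
    for i := range rawValues {
        fv := &protocol.FieldValue{}
        buf.SetBuf(rawValues[i]) // reuse the decoder, no new buffer per value
        if err := buf.Unmarshal(fv); err != nil {
            return err
        }
        values[i] = fv
    }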
(Diffs for the remaining eight changed files are not rendered on this page.)