Fixing hashpool
Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
pedro-stanaka committed Sep 22, 2022
1 parent d3e3ab3 commit 2701f11
Showing 3 changed files with 16 additions and 23 deletions.
33 changes: 14 additions & 19 deletions pkg/store/bucket.go
@@ -103,6 +103,7 @@ const (

var (
errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
+ hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}
)

type bucketStoreMetrics struct {
@@ -884,21 +885,12 @@ func blockSeries(
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error {

- hp := sync.Pool{
- New: func() interface{} {
- return xxhash.New()
- },
- }
- hasher := hp.Get().(hash.Hash64)
- defer hp.Put(hasher)

if in.Encoding() == chunkenc.EncXOR {
b, err := save(in.Bytes())
if err != nil {
return err
}
- out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
+ out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(b, calculateChecksum)}
return nil
}
if in.Encoding() != downsample.ChunkEncAggr {
@@ -918,7 +910,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
- out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
+ out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(b, calculateChecksum)}
case storepb.Aggr_SUM:
x, err := ac.Get(downsample.AggrSum)
if err != nil {
@@ -928,7 +920,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
- out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
+ out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(b, calculateChecksum)}
case storepb.Aggr_MIN:
x, err := ac.Get(downsample.AggrMin)
if err != nil {
@@ -938,7 +930,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
- out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
+ out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(b, calculateChecksum)}
case storepb.Aggr_MAX:
x, err := ac.Get(downsample.AggrMax)
if err != nil {
@@ -948,7 +940,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
- out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
+ out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(b, calculateChecksum)}
case storepb.Aggr_COUNTER:
x, err := ac.Get(downsample.AggrCounter)
if err != nil {
@@ -958,20 +950,23 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return err
}
- out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)}
+ out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(b, calculateChecksum)}
}
}
return nil
}

- func hashChunk(i hash.Hash64, b []byte, doHash bool) uint64 {
+ func hashChunk(b []byte, doHash bool) uint64 {
+ hasher := hashPool.Get().(hash.Hash64)
+ defer hashPool.Put(hasher)

if !doHash {
return 0
}
- i.Reset()
+ hasher.Reset()
// Write never returns an error on the hasher implementation
- _, _ = i.Write(b)
- return i.Sum64()
+ _, _ = hasher.Write(b)
+ return hasher.Sum64()
}

// debugFoundBlockSetOverview logs on debug level what exactly blocks we used for query in terms of
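The net effect in bucket.go is that the hasher pool now lives at package scope instead of being rebuilt on every populateChunk call, so Get can actually hand back a reused hasher rather than allocating a new one each time. Below is a minimal, self-contained sketch of the pooled-hasher pattern the patched code uses; the standalone package and main function are illustrative, not part of the commit.

```go
package main

import (
	"fmt"
	"hash"
	"sync"

	"github.com/cespare/xxhash"
)

// A package-level pool is shared by all callers. A pool declared inside the
// function would start empty on every call and always fall through to New.
var hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}

// hashChunk mirrors the patched helper: borrow a hasher from the pool,
// hash the chunk bytes, and return the hasher when done.
func hashChunk(b []byte, doHash bool) uint64 {
	hasher := hashPool.Get().(hash.Hash64)
	defer hashPool.Put(hasher)

	if !doHash {
		return 0
	}
	hasher.Reset()
	// Write never returns an error for the xxhash implementation.
	_, _ = hasher.Write(b)
	return hasher.Sum64()
}

func main() {
	fmt.Printf("%x\n", hashChunk([]byte("chunk bytes"), true))
}
```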
2 changes: 1 addition & 1 deletion pkg/store/storepb/rpc.pb.go

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions pkg/store/tsdb.go
@@ -10,8 +10,6 @@ import (
"sort"
"sync"

"github.com/cespare/xxhash"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
@@ -199,7 +197,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
Raw: &storepb.Chunk{
Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one.
Data: chk.Chunk.Bytes(),
- Hash: hashChunk(xxhash.New(), chk.Chunk.Bytes(), r.CalculateChunkChecksums),
+ Hash: hashChunk(chk.Chunk.Bytes(), r.CalculateChunkChecksums),
},
}
frameBytesLeft -= c.Size()
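On the tsdb.go side the call site stops constructing a throwaway xxhash.New() per chunk (and the file drops its direct github.com/cespare/xxhash import), relying on the shared pool inside hashChunk instead. Below is a rough, hypothetical micro-benchmark, not part of the commit (the package name, data size, and benchmark names are made up), contrasting the two shapes: the per-call pool allocates a fresh hasher on every iteration because a newly created pool is always empty, while the shared pool can reuse hashers across iterations.

```go
package store_test

import (
	"hash"
	"sync"
	"testing"

	"github.com/cespare/xxhash"
)

// Run with: go test -bench . -benchmem
var benchData = make([]byte, 16*1024)

var sharedPool = sync.Pool{New: func() interface{} { return xxhash.New() }}

// Old shape: a pool built inside the call never hands back a reused hasher,
// so every Get falls through to New and allocates.
func BenchmarkPerCallPool(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		p := sync.Pool{New: func() interface{} { return xxhash.New() }}
		h := p.Get().(hash.Hash64)
		h.Reset()
		_, _ = h.Write(benchData)
		_ = h.Sum64()
		p.Put(h)
	}
}

// New shape: one package-level pool, hashers are reused across iterations.
func BenchmarkSharedPool(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		h := sharedPool.Get().(hash.Hash64)
		h.Reset()
		_, _ = h.Write(benchData)
		_ = h.Sum64()
		sharedPool.Put(h)
	}
}
```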
