Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
add bitmapPostings
Browse files Browse the repository at this point in the history
Signed-off-by: naivewong <867245430@qq.com>
  • Loading branch information
naivewong committed Jun 17, 2019
1 parent bf6c0ae commit 7cfcf3d
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 45 deletions.
4 changes: 2 additions & 2 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Encbuf struct {
Count uint8
}

func (e *Encbuf) Reset() {
e.B = e.B[:0]
func (e *Encbuf) Reset() {
e.B = e.B[:0]
e.Count = 0
}

Expand Down
7 changes: 6 additions & 1 deletion index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
// The base.
w.buf2.PutUvarint32(refs[0])
// The width.
width := bits.Len32(uint32(refs[len(refs)-1]-refs[0]))
width := bits.Len32(uint32(refs[len(refs)-1] - refs[0]))
w.buf2.PutByte(byte(width))
for _, r := range refs {
w.buf2.PutBits(uint64(r-refs[0]), width)
Expand All @@ -541,6 +541,8 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
writeDeltaBlockPostings(&w.buf2, refs)
case 4:
writeBaseDeltaBlockPostings(&w.buf2, refs)
case 5:
writeBitmapPostings(&w.buf2, refs)
}

w.uint32s = refs
Expand Down Expand Up @@ -1061,6 +1063,9 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
case 4:
l := d.Get()
return n, newBaseDeltaBlockPostings(l, n), d.Err()
case 5:
l := d.Get()
return n, newBitmapPostings(l), d.Err()
default:
return n, EmptyPostings(), d.Err()
}
Expand Down
7 changes: 7 additions & 0 deletions index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/encoding"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
Expand Down Expand Up @@ -338,6 +339,12 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close()
testutil.Ok(t, err)

f, err := fileutil.OpenMmapFile(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
toc, err := NewTOCFromByteSlice(realByteSlice(f.Bytes()))
testutil.Ok(t, err)
t.Log("size of postings =", toc.LabelIndicesTable-toc.Postings)

ir, err := NewFileReader(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)

Expand Down
185 changes: 153 additions & 32 deletions index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,8 @@ func (it *bigEndianPostings) Err() error {
return nil
}

// 1 is bigEndian, 2 is baseDelta, 3 is deltaBlock, 4 is baseDeltaBlock.
const postingsType = 4
// 1 is bigEndian, 2 is baseDelta, 3 is deltaBlock, 4 is baseDeltaBlock, 5 is bitmapPostings.
const postingsType = 5

type bitSlice struct {
bstream []byte
Expand Down Expand Up @@ -731,8 +731,8 @@ func (bs *bitSlice) readBits(offset int) uint64 {
return u
}

if nbits > int(8 - count) {
u = (u << uint(8 - count)) | uint64((bs.bstream[idx]<<count)>>count)
if nbits > int(8-count) {
u = (u << uint(8-count)) | uint64((bs.bstream[idx]<<count)>>count)
nbits -= int(8 - count)
idx += 1

Expand Down Expand Up @@ -827,10 +827,10 @@ func (it *deltaBlockPostings) At() uint64 {
}

func (it *deltaBlockPostings) Next() bool {
if it.offset >= len(it.bs.bstream) << 3 || it.idx >= it.size {
if it.offset >= len(it.bs.bstream)<<3 || it.idx >= it.size {
return false
}
if it.offset % (deltaBlockSize << 3) == 0 {
if it.offset%(deltaBlockSize<<3) == 0 {
val, n := binary.Uvarint(it.bs.bstream[it.offset>>3:])
if n < 1 {
return false
Expand All @@ -854,13 +854,13 @@ func (it *deltaBlockPostings) Next() bool {
it.idxBlock = 1
return true
}

it.cur = it.bs.readBits(it.offset) + it.cur
it.offset += it.bs.width
it.idx += 1
it.idxBlock += 1
if it.idxBlock == it.count {
it.offset = ((it.offset-1) / (deltaBlockSize << 3) + 1) * deltaBlockSize << 3
it.offset = ((it.offset-1)/(deltaBlockSize<<3) + 1) * deltaBlockSize << 3
}
return true
}
Expand All @@ -871,18 +871,18 @@ func (it *deltaBlockPostings) Seek(x uint64) bool {
}

startOff := (it.offset - 1) / (deltaBlockSize << 3) * deltaBlockSize
num := (len(it.bs.bstream) - 1) / deltaBlockSize - (it.offset - 1) / (deltaBlockSize << 3) + 1
num := (len(it.bs.bstream)-1)/deltaBlockSize - (it.offset-1)/(deltaBlockSize<<3) + 1
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {
val, _ := binary.Uvarint(it.bs.bstream[startOff+i*deltaBlockSize:])
return val > x
})
if i > 0 {
// Go to the previous block because the previous block
// Go to the previous block because the previous block
// may contain the first value >= x.
i -= 1
}
it.offset = (startOff + i * deltaBlockSize) << 3
it.offset = (startOff + i*deltaBlockSize) << 3
for it.Next() {
if it.At() >= x {
return true
Expand All @@ -903,9 +903,9 @@ func writeDeltaBlockPostings(e *encoding.Encbuf, arr []uint32) {
var preVal uint32
var max int
for i < len(arr) {
e.PutUvarint32(arr[i]) // Put base.
e.PutUvarint32(arr[i]) // Put base.
e.PutUvarint64(uint64(i)) // Put idx.
remaining = (deltaBlockSize - (len(e.B) - startLen) % deltaBlockSize - 1) << 3
remaining = (deltaBlockSize - (len(e.B)-startLen)%deltaBlockSize - 1) << 3
deltas = deltas[:0]
preVal = arr[i]
max = -1
Expand All @@ -916,7 +916,7 @@ func writeDeltaBlockPostings(e *encoding.Encbuf, arr []uint32) {
if cur <= max {
cur = max
}
if remaining - cur * (len(deltas) + 1) - (((bits.Len(uint(len(deltas))) >> 3) + 1) << 3) >= 0 {
if remaining-cur*(len(deltas)+1)-(((bits.Len(uint(len(deltas)))>>3)+1)<<3) >= 0 {
deltas = append(deltas, delta)
max = cur
preVal = arr[i]
Expand Down Expand Up @@ -946,7 +946,7 @@ func writeDeltaBlockPostings(e *encoding.Encbuf, arr []uint32) {
e.PutBits(uint64(0), remaining)
}
e.Count = 0

// There can be one more extra 0.
e.B = e.B[:len(e.B)-(len(e.B)-startLen)%deltaBlockSize]
}
Expand Down Expand Up @@ -983,10 +983,10 @@ func (it *baseDeltaBlockPostings) At() uint64 {
}

func (it *baseDeltaBlockPostings) Next() bool {
if it.offset >= len(it.bs.bstream) << 3 || it.idx >= it.size {
if it.offset >= len(it.bs.bstream)<<3 || it.idx >= it.size {
return false
}
if it.offset % (deltaBlockSize << 3) == 0 {
if it.offset%(deltaBlockSize<<3) == 0 {
val, n := binary.Uvarint(it.bs.bstream[it.offset>>3:])
if n < 1 {
return false
Expand All @@ -1011,13 +1011,13 @@ func (it *baseDeltaBlockPostings) Next() bool {
it.idxBlock = 1
return true
}

it.cur = it.bs.readBits(it.offset) + it.base
it.offset += it.bs.width
it.idx += 1
it.idxBlock += 1
if it.idxBlock == it.count {
it.offset = ((it.offset-1) / (deltaBlockSize << 3) + 1) * deltaBlockSize << 3
it.offset = ((it.offset-1)/(deltaBlockSize<<3) + 1) * deltaBlockSize << 3
}
return true
}
Expand All @@ -1028,40 +1028,40 @@ func (it *baseDeltaBlockPostings) Seek(x uint64) bool {
}

startOff := (it.offset - 1) / (deltaBlockSize << 3) * deltaBlockSize
num := (len(it.bs.bstream) - 1) / deltaBlockSize - (it.offset - 1) / (deltaBlockSize << 3) + 1
num := (len(it.bs.bstream)-1)/deltaBlockSize - (it.offset-1)/(deltaBlockSize<<3) + 1
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {
val, _ := binary.Uvarint(it.bs.bstream[startOff+i*deltaBlockSize:])
return val > x
})
if i > 0 {
// Go to the previous block because the previous block
// Go to the previous block because the previous block
// may contain the first value >= x.
i -= 1
}
it.offset = (startOff + i * deltaBlockSize) << 3
it.offset = (startOff + i*deltaBlockSize) << 3

// Read base, idx, and width.
it.Next()
if x <= it.base {
return true
} else {
temp := x - it.base
j := sort.Search(it.count - it.idxBlock, func(i int) bool {
return it.bs.readBits(it.offset + i * it.bs.width) >= temp
j := sort.Search(it.count-it.idxBlock, func(i int) bool {
return it.bs.readBits(it.offset+i*it.bs.width) >= temp
})

if j < it.count - it.idxBlock {
if j < it.count-it.idxBlock {
it.offset += j * it.bs.width
it.cur = it.bs.readBits(it.offset) + it.base
it.offset += it.bs.width
it.idxBlock += j + 1
it.idx += j + 1
if it.idxBlock == it.count {
it.offset = ((it.offset-1) / (deltaBlockSize << 3) + 1) * deltaBlockSize << 3
it.offset = ((it.offset-1)/(deltaBlockSize<<3) + 1) * deltaBlockSize << 3
}
} else {
it.offset = (startOff + (i + 1) * deltaBlockSize) << 3
it.offset = (startOff + (i+1)*deltaBlockSize) << 3
return it.Next()
}
return true
Expand All @@ -1080,17 +1080,17 @@ func writeBaseDeltaBlockPostings(e *encoding.Encbuf, arr []uint32) {
var base uint32
var max int
for i < len(arr) {
e.PutUvarint32(arr[i]) // Put base.
e.PutUvarint32(arr[i]) // Put base.
e.PutUvarint64(uint64(i)) // Put idx.
remaining = (deltaBlockSize - (len(e.B) - startLen) % deltaBlockSize - 1) << 3
remaining = (deltaBlockSize - (len(e.B)-startLen)%deltaBlockSize - 1) << 3
deltas = deltas[:0]
base = arr[i]
max = -1
i += 1
for i < len(arr) {
delta := arr[i] - base
cur := bits.Len32(delta)
if remaining - cur * (len(deltas) + 1) - (((bits.Len(uint(len(deltas))) >> 3) + 1) << 3) >= 0 {
if remaining-cur*(len(deltas)+1)-(((bits.Len(uint(len(deltas)))>>3)+1)<<3) >= 0 {
deltas = append(deltas, delta)
max = cur
} else {
Expand Down Expand Up @@ -1119,8 +1119,129 @@ func writeBaseDeltaBlockPostings(e *encoding.Encbuf, arr []uint32) {
e.PutBits(uint64(0), remaining)
}
e.Count = 0

// There can be one more extra 0.
e.B = e.B[:len(e.B)-(len(e.B)-startLen)%deltaBlockSize]
}
}

// bitmapBits is the number of low-order bits of a posting that are addressed
// inside a single bitmap; the remaining high-order bits form the block key.
// The bitmap of one block takes 2^bitmapBits / 8 bytes:
// 8 bits -> 32 bytes, 12 bits -> 512 bytes, 16 bits -> 8192 bytes.
const bitmapBits = 8

// bitmapPostings iterates over postings stored as consecutive bitmap blocks.
//
// Bitmap block format.
// ┌──────────┬────────┐
// │ key <4b> │ bitmap │
// └──────────┴────────┘
//
// The key is read as a big-endian uint32. Inside the bitmap the bit for the
// low part v of a posting sits in byte v/8 under mask 1<<(7-v%8), i.e. the
// most significant bit of a byte is the smallest value.
type bitmapPostings struct {
	bs         []byte // Unread input; bs[0] is the first key byte of the current block.
	cur        uint64 // Value returned by At.
	inside     bool   // Whether key has been read for the block at the head of bs.
	idx1       int    // Byte index inside the current bitmap of the next bit to test.
	idx2       int    // Bit index (0 = most significant) inside byte idx1.
	bitmapSize int    // Bitmap length in bytes, excluding the 4-byte key.
	key        uint32 // Key of the current block; only meaningful while inside is true.
}

// newBitmapPostings returns an iterator over the bitmap blocks in bstream.
// The iterator is positioned before the first posting.
func newBitmapPostings(bstream []byte) *bitmapPostings {
	p := &bitmapPostings{bitmapSize: 1 << (bitmapBits - 3)}
	p.bs = bstream
	return p
}

// At returns the posting the iterator is currently positioned on. It is 0
// until Next or Seek has found a posting.
func (it *bitmapPostings) At() uint64 {
	return it.cur
}

// Next advances to the next set bit, moving on to the following block once
// the current bitmap is exhausted. It returns false when fewer bytes than one
// whole block (key plus bitmap) remain.
func (it *bitmapPostings) Next() bool {
	for {
		if !it.inside {
			// A new block needs a complete key and bitmap.
			if len(it.bs)-4 < it.bitmapSize {
				return false
			}
			it.key = binary.BigEndian.Uint32(it.bs)
			it.inside = true
		}

		// Fast path: skip whole bytes that contain no postings.
		for it.idx1 < it.bitmapSize && it.bs[it.idx1+4] == 0 {
			it.idx1++
		}

		// From the first non-empty byte on, test one bit at a time.
		for it.idx1 < it.bitmapSize {
			found := it.bs[it.idx1+4]&(byte(1)<<uint(7-it.idx2)) != 0
			low := it.idx1*8 + it.idx2

			// Step to the following bit before returning so the next call
			// resumes right after this posting.
			it.idx2++
			if it.idx2 == 8 {
				it.idx1++
				it.idx2 = 0
			}

			if found {
				it.cur = uint64(it.key<<bitmapBits) + uint64(low)
				return true
			}
		}

		// Bitmap exhausted: drop the block and try the next one.
		it.bs = it.bs[it.bitmapSize+4:]
		it.inside = false
		it.idx1 = 0
	}
}

// Seek advances the iterator to the first posting >= x and reports whether
// one exists. If the current value already satisfies that, the iterator is
// left untouched.
func (it *bitmapPostings) Seek(x uint64) bool {
	if x <= it.cur {
		return true
	}

	blockLen := it.bitmapSize + 4
	wantKey := uint32(x) >> bitmapBits

	// Index of the first remaining block whose key is strictly above wantKey.
	above := sort.Search(len(it.bs)/blockLen, func(b int) bool {
		return binary.BigEndian.Uint32(it.bs[b*blockLen:]) > wantKey
	})

	// The block right before it is the last one that may hold a value >= x
	// with a key <= wantKey. Jump there unless it is the block already at the
	// head of bs, in which case the current position inside it is kept.
	if skip := above - 1; skip > 0 {
		it.bs = it.bs[skip*blockLen:]
		it.idx1, it.idx2 = 0, 0
		it.inside = false
	}

	for it.Next() {
		if it.cur >= x {
			return true
		}
	}
	return false
}

// Err always returns nil; this iterator does not record errors.
func (it *bitmapPostings) Err() error {
	return nil
}

// writeBitmapPostings appends arr to e as a sequence of bitmap blocks. Each
// block is a 4-byte big-endian key (the posting shifted right by bitmapBits)
// followed by a zero-initialised bitmap of 2^(bitmapBits-3) bytes in which
// one bit is set per posting: byte low/8, mask 1<<(7-low%8), where low is the
// posting's bitmapBits low-order bits.
//
// A new block is started every time the key differs from the previous
// posting's key, so arr is presumably expected in ascending order — confirm
// against the caller.
func writeBitmapPostings(e *encoding.Encbuf, arr []uint32) {
	// No real key can equal this sentinel: keys lose their top bitmapBits bits.
	const noKey = uint32(0xffffffff)

	var (
		bitmapSize  = 1 << (bitmapBits - 3)
		lowMask     = uint32((1 << uint(bitmapBits)) - 1)
		blockKey    = noKey
		bitmapStart int // Offset in e.B of the current block's bitmap.
	)

	for _, v := range arr {
		if k := v >> bitmapBits; k != blockKey {
			// Open a new block: key first, then an all-zero bitmap.
			blockKey = k
			e.PutBE32(blockKey)
			bitmapStart = len(e.Get())
			for n := bitmapSize; n > 0; n-- {
				e.PutByte(0)
			}
		}

		low := int(v & lowMask)
		e.B[bitmapStart+(low>>3)] |= 1 << uint(7-low%8)
	}
}
Loading

0 comments on commit 7cfcf3d

Please sign in to comment.