Skip to content

Commit

Permalink
feat(stream): add time task agg in coordinator (#492)
Browse files Browse the repository at this point in the history
Signed-off-by: ZhangJian He <shoothzj@gmail.com>
  • Loading branch information
shoothzj committed Feb 25, 2024
1 parent 4983ea0 commit f91833a
Show file tree
Hide file tree
Showing 18 changed files with 660 additions and 724 deletions.
64 changes: 0 additions & 64 deletions app/ts-store/stream/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stream
import (
"sync"
"sync/atomic"
"unsafe"

"github.com/openGemini/openGemini/lib/cpu"
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
Expand Down Expand Up @@ -64,69 +63,6 @@ func (p *CacheRowPool) Size() int64 {
return atomic.LoadInt64(&p.size)
}

func NewBuilderPool() *BuilderPool {
p := &BuilderPool{}
return p
}

type StringBuilder struct {
buf []byte
}

func (b *StringBuilder) String() string {
return *(*string)(unsafe.Pointer(&b.buf))
}

func (b *StringBuilder) NewString() string {
if len(b.buf) == 0 {
return ""
}
s := make([]byte, len(b.buf))
copy(s, b.buf)
return *(*string)(unsafe.Pointer(&s))
}

func (b *StringBuilder) Reset() {
b.buf = b.buf[:0]
}

func (b *StringBuilder) AppendByte(c byte) {
b.buf = append(b.buf, c)
}

func (b *StringBuilder) AppendString(s string) {
b.buf = append(b.buf, s...)
}

type BuilderPool struct {
pool sync.Pool
size int64
length int64
}

func (p *BuilderPool) Get() *StringBuilder {
c := p.pool.Get()
if c == nil {
atomic.AddInt64(&p.size, 1)
return &StringBuilder{}
}
atomic.AddInt64(&p.length, -1)
return c.(*StringBuilder)
}

func (p *BuilderPool) Put(r *StringBuilder) {
p.pool.Put(r)
atomic.AddInt64(&p.length, 1)
}

func (p *BuilderPool) Len() int64 {
return atomic.LoadInt64(&p.length)
}

func (p *BuilderPool) Size() int64 {
return atomic.LoadInt64(&p.size)
}

type RowsPool struct {
pool sync.Pool
}
Expand Down
98 changes: 0 additions & 98 deletions app/ts-store/stream/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package stream_test

import (
"bytes"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -107,28 +106,6 @@ func Test_WindowCachePool_Block(t *testing.T) {
}
}

func Test_builderPool(t *testing.T) {
bp := stream.NewBuilderPool()
sb := bp.Get()
for i := 0; i < 100; i++ {
sb.AppendString("xx")
}
f := sb.NewString()
if len(f) != 200 {
t.Error("len fail")
}
sb.Reset()
bp.Put(sb)
sb1 := bp.Get()
for i := 0; i < 100; i++ {
sb1.AppendString("xx")
}
f = sb1.NewString()
if len(f) != 200 {
t.Error("len fail")
}
}

func Test_CacheRowPool_Len(t *testing.T) {
pool := stream.NewCacheRowPool()
c1 := pool.Get()
Expand Down Expand Up @@ -160,78 +137,3 @@ func Test_CacheRowPool_Len(t *testing.T) {
t.Error(fmt.Sprintf("expect %v ,got %v", 2, pool.Size()))
}
}

func Test_BuilderPool_Len(t *testing.T) {
pool := stream.NewBuilderPool()
c1 := pool.Get()
if pool.Len() != 0 {
t.Error(fmt.Sprintf("expect %v ,got %v", 0, pool.Len()))
}
if pool.Size() != 1 {
t.Error(fmt.Sprintf("expect %v ,got %v", 1, pool.Size()))
}
c2 := pool.Get()
if pool.Len() != 0 {
t.Error(fmt.Sprintf("expect %v ,got %v", 0, pool.Len()))
}
if pool.Size() != 2 {
t.Error(fmt.Sprintf("expect %v ,got %v", 2, pool.Size()))
}
pool.Put(c1)
if pool.Len() != 1 {
t.Error(fmt.Sprintf("expect %v ,got %v", 1, pool.Len()))
}
if pool.Size() != 2 {
t.Error(fmt.Sprintf("expect %v ,got %v", 2, pool.Size()))
}
pool.Put(c2)
if pool.Len() != 2 {
t.Error(fmt.Sprintf("expect %v ,got %v", 2, pool.Len()))
}
if pool.Size() != 2 {
t.Error(fmt.Sprintf("expect %v ,got %v", 2, pool.Size()))
}
}

func Test_StringBuilder(t *testing.T) {
sb := stream.StringBuilder{}
sb.AppendString("xx")
str := sb.String()
strNew := sb.NewString()
sb.Reset()
sb.AppendString("aa")
str1 := sb.NewString()
if strNew != "xx" {
t.Fatal("unexpect", strNew)
}
if str != str1 {
t.Fatal("unexpect", str)
}
sb.Reset()
str2 := sb.NewString()
if str2 != "" {
t.Fatal("unexpect", str2)
}
}

func BenchmarkStringBuilder(t *testing.B) {
t.ReportAllocs()
t.ResetTimer()
sb := stream.StringBuilder{}
for i := 0; i < t.N; i++ {
for j := 0; j < 10000000; j++ {
sb.AppendString("key12345")
}
}
}

func BenchmarkBytesBuffer(t *testing.B) {
t.ReportAllocs()
t.ResetTimer()
bb := bytes.Buffer{}
for i := 0; i < t.N; i++ {
for j := 0; j < 10000000; j++ {
bb.WriteString("key12345")
}
}
}
Loading

0 comments on commit f91833a

Please sign in to comment.