Skip to content

Commit

Permalink
Support configurable Zstd levels (#81)
Browse files Browse the repository at this point in the history
Part of #35.
This is somewhat complex because gRPC supports only static registration
of compression objects, whereas collectors naturally let you provide
per-component configuration. Configuring compression levels per
component is partially supported, as follows:

There are 10 distinct compression levels, 1 through 10. Any of them can
be configured through any component, but only one global encoder
configuration is permitted per level, and there is a single decoder
configuration shared by all levels (to keep it simple; it could be per
level as well, but I didn't go that way).

Updated README to explain the caveat.

Note the implementation in `mru.go` is derived from a production-tested
implementation used internally. I've edited it for minor details, added
a Reset method, and written new tests.

These configurations are independent of the OTel-collector-wide
compression setting controlled by
https://github.com/mostynb/go-grpc-compression. Unlike that library, the
new support is thread-safe and reconfiguration at runtime results in
resetting the MRU cache.

The names used by these compression settings are "zstdarrow1" through
"zstdarrow10".
  • Loading branch information
jmacd committed Nov 6, 2023
1 parent e8f84cb commit 5176e5b
Show file tree
Hide file tree
Showing 15 changed files with 833 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.21
go-version: "1.21"

- name: Build all modules
run: make build
Expand Down
106 changes: 106 additions & 0 deletions collector/compression/zstd/mru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package zstd

import (
"sync"
"time"
)

// mru is a freelist whose two main benefits compared to sync.Pool are:
//
//   - It doesn't perform any per-CPU caching; it has only a single
//     cache. The cache is modelled as a stack, meaning that the most
//     recently used item is always the next to be used. (Hence the name
//     MRU.)
//
//   - It isn't cleared when GC runs. Instead, items that haven't been used
//     in a long time (TTL, one minute by default) are released. Release
//     happens lazily: expired items are only evicted during a call to Put.
//
// An MRU freelist is most useful when the objects being freelisted are
// sufficiently valuable, or expensive to create, that they are worth keeping
// across GC passes. The drawbacks are that MRU isn't as performant under
// heavy concurrent access as sync.Pool, and that its sizing logic (a fixed
// TTL) is less sophisticated than sync.Pool's.
//
// A zero-initialized MRU is safe to use. Threadsafe.
type mru[T generational] struct {
	// mu is held by every method for the whole call.
	mu sync.Mutex
	// reset is the generation recorded by the most recent Reset, or the
	// zero Gen if Reset has not been called. Put drops items whose
	// generation is before it.
	reset Gen
	// freelist is the stack of cached items: Put appends, Get pops the
	// last element.
	freelist []T
	putTimes []time.Time // putTimes[i] is when freelist[i] was Put()
	// zero is not assigned by any method shown here, so it stays the zero
	// value of T. It is what Get returns on an empty list and what vacated
	// slots are overwritten with.
	zero T
}

// Gen is the reset time: the instant of a call to Reset, wrapped in its own
// type. Get reports the freelist's current Gen, and Put compares an item's
// Gen against it to reject items that predate the latest Reset.
type Gen time.Time

// generational is the type constraint for items stored in an mru. Each item
// reports the generation it belongs to so that Put can discard items that
// were handed out before the most recent Reset.
type generational interface {
	// generation uses monotonic time: Reset produces Gen values from
	// time.Now, and they are ordered with time.Time's Before.
	generation() Gen
}

// TTL is how long an entry may sit in the freelist before Put evicts it.
// It defaults to one minute and is modified in testing.
var TTL = time.Minute

// Get pops the most recently Put item off the freelist and returns it
// together with the freelist's current generation. When the freelist is
// empty the item returned is the zero value of T; the generation is
// returned either way.
func (mru *mru[T]) Get() (T, Gen) {
	mru.mu.Lock()
	defer mru.mu.Unlock()

	last := len(mru.freelist) - 1
	if last < 0 {
		// Nothing cached.
		return mru.zero, mru.reset
	}

	item := mru.freelist[last]
	// Clear the vacated slot so the backing array no longer references the
	// item and the GC can reclaim it once the caller is done.
	mru.freelist[last] = mru.zero
	mru.freelist = mru.freelist[:last]
	mru.putTimes = mru.putTimes[:last]
	return item, mru.reset
}

// before reports whether generation a is strictly earlier than generation b.
func before(a, b Gen) bool {
	earlier, later := time.Time(a), time.Time(b)
	return earlier.Before(later)
}

// Put returns item to the freelist, making it the next value handed out by
// Get, and then releases every entry that has been sitting in the freelist
// for at least TTL.
//
// An item whose generation is before the most recent Reset is dropped
// instead of being stored. With a TTL of zero or less, the item just added
// expires immediately, so nothing is ever retained.
func (mru *mru[T]) Put(item T) {
	mru.mu.Lock()
	defer mru.mu.Unlock()

	// The item was handed out before the last Reset; do not keep it.
	if before(item.generation(), mru.reset) {
		return
	}

	now := time.Now()

	mru.freelist = append(mru.freelist, item)
	mru.putTimes = append(mru.putTimes, now)

	// Evict any objects that haven't been touched recently. Entries are
	// appended in time order and Get removes from the end, so the expired
	// ones form a prefix; count it first so the survivors move only once.
	expired := 0
	for expired < len(mru.putTimes) && now.Sub(mru.putTimes[expired]) >= TTL {
		expired++
	}
	if expired == 0 {
		return
	}

	// Shift the survivors to the front in place, to preserve capacity.
	kept := copy(mru.freelist, mru.freelist[expired:])
	copy(mru.putTimes, mru.putTimes[expired:])
	for i := kept; i < len(mru.freelist); i++ {
		mru.freelist[i] = mru.zero // Allow GC to occur.
	}
	mru.freelist = mru.freelist[:kept]
	mru.putTimes = mru.putTimes[:kept]
}

// Size reports how many items the freelist currently holds. Entries past
// their TTL still count until a later Put evicts them.
func (mru *mru[T]) Size() int {
	mru.mu.Lock()
	n := len(mru.putTimes)
	mru.mu.Unlock()
	return n
}

// Reset discards every cached item and starts a new generation stamped with
// the current time, which it returns. Items carrying an earlier generation
// are rejected by subsequent calls to Put.
func (mru *mru[T]) Reset() Gen {
	mru.mu.Lock()
	defer mru.mu.Unlock()

	// Drop the backing arrays entirely so the old items can be collected.
	mru.freelist, mru.putTimes = nil, nil

	gen := Gen(time.Now())
	mru.reset = gen
	return gen
}
84 changes: 84 additions & 0 deletions collector/compression/zstd/mru_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package zstd

import (
"testing"

"github.com/stretchr/testify/require"
)

// gint is the item type used by the MRU tests: an int payload tagged with
// the generation it was created under.
type gint struct {
	// value identifies the item so tests can check retrieval order.
	value int
	// Gen is embedded; presumably it supplies the generation() method that
	// the generational constraint requires, but that method is defined
	// outside this view. TODO confirm.
	Gen
}

// TestMRUGet checks that Get on an empty freelist yields nil, that items
// come back in reverse order of insertion (most recently Put first), and
// that the freelist is empty again once everything has been taken out.
func TestMRUGet(t *testing.T) {
	defer resetTest()

	const cnt = 5
	var m mru[*gint]

	// Nothing has been Put yet; keep the generation for the items below.
	v, g := m.Get()
	require.Nil(t, v)

	for val := 1; val <= cnt; val++ {
		m.Put(&gint{value: val, Gen: g})
	}

	// Last in, first out: cnt, cnt-1, ..., 1.
	for want := cnt; want > 0; want-- {
		v, _ = m.Get()
		require.Equal(t, want, v.value)
	}

	v, _ = m.Get()
	require.Nil(t, v)
}

// TestMRUPut checks that with a zero TTL every item expires as soon as it
// is Put, so the freelist never retains anything.
func TestMRUPut(t *testing.T) {
	// resetTest is defined outside this view; presumably it restores TTL.
	defer resetTest()

	const cnt = 5
	var m mru[*gint]

	// Use zero TTL => no freelist
	TTL = 0

	g := m.Reset()
	for val := 1; val <= cnt; val++ {
		m.Put(&gint{value: val, Gen: g})
	}

	require.Equal(t, 0, m.Size())
}

// TestMRUReset checks that Reset empties the freelist and that an item from
// an earlier generation is refused afterwards.
func TestMRUReset(t *testing.T) {
	defer resetTest()

	var m mru[*gint]
	g := m.Reset()

	// An item of the current generation is accepted.
	current := &gint{Gen: g}
	m.Put(current)
	require.Equal(t, 1, m.Size())

	// A second Reset clears the list and starts a newer generation.
	m.Reset()
	require.Equal(t, 0, m.Size())

	// This doesn't take because its generation is before the reset.
	stale := &gint{Gen: g}
	m.Put(stale)
	require.Equal(t, 0, m.Size())
}
Loading

0 comments on commit 5176e5b

Please sign in to comment.