Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"
"time"

"github.com/klev-dev/klevdb/index"
"github.com/klev-dev/klevdb/message"
"github.com/klev-dev/klevdb/segment"
"github.com/klev-dev/klevdb/pkg/index"
"github.com/klev-dev/klevdb/pkg/message"
"github.com/klev-dev/klevdb/pkg/segment"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/klev-dev/klevdb/message"
"github.com/klev-dev/klevdb/pkg/message"
)

var v1opts = VersionOptions{NewSegmentsVersion: V1}
Expand Down
18 changes: 8 additions & 10 deletions compact/deletes.go → compact_deletes.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package compact
package klevdb

import (
"context"
"time"

art "github.com/plar/go-adaptive-radix-tree/v2"

"github.com/klev-dev/klevdb"
)

// FindDeletes returns a set of offsets for messages with
// nil value for a given key, before a given time.
//
// Messages that have a nil value are considered deletes
// for this key, and therefore eligible for deletion.
func FindDeletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) {
func FindDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) {
maxOffset, err := l.NextOffset()
if err != nil {
return nil, err
Expand All @@ -24,7 +22,7 @@ func FindDeletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64
var offsets = map[int64]struct{}{}

SEARCH:
for offset := klevdb.OffsetOldest; offset < maxOffset; {
for offset := OffsetOldest; offset < maxOffset; {
nextOffset, msgs, err := l.Consume(offset, 32)
if err != nil {
return nil, err
Expand Down Expand Up @@ -62,26 +60,26 @@ SEARCH:
return offsets, nil
}

// Deletes tries to remove messages with nil value before given time.
// CompactDeletes tries to remove messages with nil value before given time.
// It will not remove messages for keys it sees before that offset.
//
// This is similar to removing keys, which were deleted (e.g. value set to nil)
// and are therefore no longer relevant/active.
//
// returns the offsets it deleted and the amount of storage freed
func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) {
func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
return nil, 0, err
}
return l.Delete(offsets)
}

// DeletesMulti is similar to Deletes, but will try to remove messages from multiple segments
func DeletesMulti(ctx context.Context, l klevdb.Log, before time.Time, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
// CompactDeletesMulti is similar to Deletes, but will try to remove messages from multiple segments
func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
return nil, 0, err
}
return klevdb.DeleteMulti(ctx, l, offsets, backoff)
return DeleteMulti(ctx, l, offsets, backoff)
}
18 changes: 9 additions & 9 deletions compact/deletes_test.go → compact_deletes_test.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,46 @@
package compact
package klevdb

import (
"context"
"slices"
"testing"
"time"

"github.com/klev-dev/klevdb"
"github.com/klev-dev/klevdb/message"
"github.com/stretchr/testify/require"

"github.com/klev-dev/klevdb/pkg/message"
)

func TestDeletes(t *testing.T) {
msgs := message.Gen(5)

t.Run("Empty", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

off, cmp, err := Deletes(context.TODO(), l, time.Now())
off, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Equal(t, int64(0), cmp)
})

t.Run("None", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := Deletes(context.TODO(), l, time.Now())
off, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Equal(t, int64(0), cmp)
})

t.Run("Dups", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

Expand All @@ -55,7 +55,7 @@ func TestDeletes(t *testing.T) {
_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := Deletes(context.TODO(), l, time.Now())
off, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 5)
for i := range nmsgs {
Expand Down
14 changes: 6 additions & 8 deletions compact/updates.go → compact_updates.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package compact
package klevdb

import (
"context"
"time"

art "github.com/plar/go-adaptive-radix-tree/v2"

"github.com/klev-dev/klevdb"
)

// FindUpdates returns a set of offsets for messages that have
// the same key further in the log, before a given time.
//
// Messages before the last one for a given key are considered updates
// that are no longer relevant, and therefore are eligible for deletion.
func FindUpdates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) {
func FindUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) {
maxOffset, err := l.NextOffset()
if err != nil {
return nil, err
Expand All @@ -24,7 +22,7 @@ func FindUpdates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64
var offsets = map[int64]struct{}{}

SEARCH:
for offset := klevdb.OffsetOldest; offset < maxOffset; {
for offset := OffsetOldest; offset < maxOffset; {
nextOffset, msgs, err := l.Consume(offset, 32)
if err != nil {
return nil, err
Expand Down Expand Up @@ -60,7 +58,7 @@ SEARCH:
// leaving only the current value (last update) for a key.
//
// returns the offsets it deleted and the amount of storage freed
func Updates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) {
func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
return nil, 0, err
Expand All @@ -69,10 +67,10 @@ func Updates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]str
}

// UpdatesMulti is similar to Updates, but will try to remove messages from multiple segments
func UpdatesMulti(ctx context.Context, l klevdb.Log, before time.Time, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
return nil, 0, err
}
return klevdb.DeleteMulti(ctx, l, offsets, backoff)
return DeleteMulti(ctx, l, offsets, backoff)
}
42 changes: 21 additions & 21 deletions compact/updates_test.go → compact_updates_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
package compact
package klevdb

import (
"context"
"slices"
"testing"
"time"

"github.com/klev-dev/klevdb"
"github.com/klev-dev/klevdb/message"
"github.com/stretchr/testify/require"

"github.com/klev-dev/klevdb/pkg/message"
)

func TestUpdates(t *testing.T) {
msgs := message.Gen(5)

t.Run("Empty", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

off, cmp, err := Updates(context.TODO(), l, time.Now())
off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Equal(t, int64(0), cmp)
})

t.Run("None", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := Updates(context.TODO(), l, time.Now())
off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Equal(t, int64(0), cmp)
Expand All @@ -44,19 +44,19 @@ func TestUpdates(t *testing.T) {
})

t.Run("First", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

_, err = l.Publish(msgs)
require.NoError(t, err)

dmsgs := []message.Message{msgs[0]}
dmsgs := []Message{msgs[0]}
dmsgs[0].Value = []byte("abc")
_, err = l.Publish(dmsgs)
require.NoError(t, err)

off, cmp, err := Updates(context.TODO(), l, time.Now())
off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 1)
require.Contains(t, off, int64(0))
Expand All @@ -68,19 +68,19 @@ func TestUpdates(t *testing.T) {
})

t.Run("Last", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

_, err = l.Publish(msgs)
require.NoError(t, err)

dmsgs := []message.Message{msgs[4]}
dmsgs := []Message{msgs[4]}
dmsgs[0].Value = []byte("abc")
_, err = l.Publish(dmsgs)
require.NoError(t, err)

off, cmp, err := Updates(context.TODO(), l, time.Now())
off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 1)
require.Contains(t, off, int64(4))
Expand All @@ -92,7 +92,7 @@ func TestUpdates(t *testing.T) {
})

t.Run("Multi", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

Expand All @@ -102,7 +102,7 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := Updates(context.TODO(), l, time.Now())
off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, len(msgs))
for i := range msgs {
Expand All @@ -116,7 +116,7 @@ func TestUpdates(t *testing.T) {
})

t.Run("Time", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

Expand All @@ -130,7 +130,7 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(nmsgs)
require.NoError(t, err)

off, cmp, err := Updates(context.TODO(), l, nmsgs[2].Time)
off, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time)
require.NoError(t, err)
require.Len(t, off, 3)
for i := range 3 {
Expand All @@ -144,21 +144,21 @@ func TestUpdates(t *testing.T) {
})

t.Run("NilKey", func(t *testing.T) {
l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true})
l, err := Open(t.TempDir(), Options{KeyIndex: true})
require.NoError(t, err)
defer l.Close()

_, err = l.Publish([]klevdb.Message{
_, err = l.Publish([]Message{
{Key: []byte("x")},
{},
{},
})
require.NoError(t, err)

off, cmp, err := Updates(context.TODO(), l, time.Now())
off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 1)
require.Contains(t, off, int64(1))
require.Equal(t, l.Size(klevdb.Message{}), cmp)
require.Equal(t, l.Size(Message{}), cmp)
})
}
3 changes: 2 additions & 1 deletion delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"testing"
"time"

"github.com/klev-dev/klevdb/message"
"github.com/stretchr/testify/require"

"github.com/klev-dev/klevdb/pkg/message"
)

func TestDeleteMulti(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

"github.com/gofrs/flock"

"github.com/klev-dev/klevdb/index"
"github.com/klev-dev/klevdb/message"
"github.com/klev-dev/klevdb/segment"
"github.com/klev-dev/klevdb/pkg/index"
"github.com/klev-dev/klevdb/pkg/message"
"github.com/klev-dev/klevdb/pkg/segment"
)

var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex)
Expand Down
2 changes: 1 addition & 1 deletion blocking.go → log_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package klevdb
import (
"context"

"github.com/klev-dev/klevdb/notify"
"github.com/klev-dev/klevdb/pkg/notify"
)

// BlockingLog enhances [Log] adding blocking consume
Expand Down
Loading
Loading