diff --git a/.dockerignore b/.dockerignore index 0a03289..f2ec6e0 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,14 +1,15 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-04-30T10:38:23Z by kres ebc009d. +# Generated on 2024-05-08T13:07:24Z by kres 1e986af. * +!zstd +!chunk.go !circular.go !circular_test.go !errors.go !options.go !reader.go -!streaming.go !go.mod !go.sum !.golangci.yml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fe3ef79..0306427 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-04-30T11:03:25Z by kres d15226e. +# Generated on 2024-05-08T11:26:09Z by kres 1e986af. name: default concurrency: @@ -31,7 +31,7 @@ jobs: if: (!startsWith(github.head_ref, 'renovate/') && !startsWith(github.head_ref, 'dependabot/')) services: buildkitd: - image: moby/buildkit:v0.13.1 + image: moby/buildkit:v0.13.2 options: --privileged ports: - 1234:1234 diff --git a/.golangci.yml b/.golangci.yml index a2fcc4f..a30d5e8 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-04-30T10:38:23Z by kres ebc009d. +# Generated on 2024-05-08T11:26:09Z by kres 1e986af. 
# options for analysis running run: @@ -54,7 +54,6 @@ linters-settings: goimports: local-prefixes: github.com/siderolabs/go-circular/ gomodguard: { } - gomnd: { } govet: enable-all: true lll: @@ -109,17 +108,18 @@ linters: disable: - exhaustivestruct - exhaustruct + - err113 - forbidigo - funlen - gochecknoglobals - gochecknoinits - godox - - goerr113 - gomnd - gomoddirectives - gosec - inamedparam - ireturn + - mnd - nestif - nonamedreturns - nosnakecase diff --git a/Dockerfile b/Dockerfile index 0a9a71d..078964b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ -# syntax = docker/dockerfile-upstream:1.7.0-labs +# syntax = docker/dockerfile-upstream:1.7.1-labs # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-04-30T10:38:23Z by kres ebc009d. +# Generated on 2024-05-08T13:07:24Z by kres 1e986af. ARG TOOLCHAIN @@ -55,12 +55,13 @@ COPY go.sum go.sum RUN cd . RUN --mount=type=cache,target=/go/pkg go mod download RUN --mount=type=cache,target=/go/pkg go mod verify +COPY ./zstd ./zstd +COPY ./chunk.go ./chunk.go COPY ./circular.go ./circular.go COPY ./circular_test.go ./circular_test.go COPY ./errors.go ./errors.go COPY ./options.go ./options.go COPY ./reader.go ./reader.go -COPY ./streaming.go ./streaming.go RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null # runs gofumpt diff --git a/Makefile b/Makefile index 92d699a..7edf097 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-04-30T11:03:09Z by kres d15226e. +# Generated on 2024-05-08T11:26:09Z by kres 1e986af. 
# common variables @@ -20,9 +20,9 @@ GRPC_GO_VERSION ?= 1.3.0 GRPC_GATEWAY_VERSION ?= 2.19.1 VTPROTOBUF_VERSION ?= 0.6.0 DEEPCOPY_VERSION ?= v0.5.6 -GOLANGCILINT_VERSION ?= v1.57.2 +GOLANGCILINT_VERSION ?= v1.58.0 GOFUMPT_VERSION ?= v0.6.0 -GO_VERSION ?= 1.22.2 +GO_VERSION ?= 1.22.3 GOIMPORTS_VERSION ?= v0.20.0 GO_BUILDFLAGS ?= GO_LDFLAGS ?= diff --git a/README.md b/README.md index ab0046a..28304a1 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,13 @@ # go-circular Package circular provides a buffer with circular semantics. + +## Design + +The buffer is split into chunks, the last chunk is not compressed and being written to, and previous chunks +are compressed and immutable. + +The buffer keeps a pointer to the current offset being written which is always incremented, while the index +of compressed chunks keeps the initial offset of each compressed chunk and its decompressed size. + +If the compressed chunk is being read, it is decompressed on the fly by the Reader. diff --git a/chunk.go b/chunk.go new file mode 100644 index 0000000..5bd966a --- /dev/null +++ b/chunk.go @@ -0,0 +1,15 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package circular + +type chunk struct { + // compressed data + compressed []byte + // start offset of the chunk, as it was in the circular buffer when the chunk was created + startOffset int64 + // uncompressed size of the chunk + size int64 + // [TODO]: have a unique (incrementing?) chunk ID for file-based storage +} diff --git a/circular.go b/circular.go index c02589a..6cbceb3 100644 --- a/circular.go +++ b/circular.go @@ -7,27 +7,33 @@ package circular import ( "fmt" + "math" + "slices" "sync" ) // Buffer implements circular buffer with a thread-safe writer, // that supports multiple readers each with its own offset. 
type Buffer struct { - // waking up readers on new writes + // waking up streaming readers on new writes cond *sync.Cond - // data slice, might grow up to MaxCapacity, then used + // compressed chunks, ordered from the smallest offset to the largest + chunks []chunk + + // the last uncompressed chunk, might grow up to MaxCapacity, then used // as circular buffer data []byte - // synchronizing access to data, off + // buffer options + opt Options + + // synchronizing access to data, off, chunks mu sync.Mutex // write offset, always goes up, actual offset in data slice // is (off % cap(data)) off int64 - - opt Options } // NewBuffer creates new Buffer with specified options. @@ -86,11 +92,15 @@ func (buf *Buffer) Write(p []byte) (int, error) { var n int for n < l { + rotate := false + i := int(buf.off % int64(buf.opt.MaxCapacity)) nn := buf.opt.MaxCapacity - i if nn > len(p) { nn = len(p) + } else { + rotate = true } copy(buf.data[i:], p[:nn]) @@ -98,6 +108,30 @@ func (buf *Buffer) Write(p []byte) (int, error) { buf.off += int64(nn) n += nn p = p[nn:] + + if rotate && buf.opt.NumCompressedChunks > 0 { + var compressionBuf []byte + + if len(buf.chunks) == buf.opt.NumCompressedChunks { + // going to drop the chunk, so reuse its buffer + compressionBuf = buf.chunks[0].compressed[:0] + } + + compressed, err := buf.opt.Compressor.Compress(buf.data, compressionBuf) + if err != nil { + return n, err + } + + buf.chunks = append(buf.chunks, chunk{ + compressed: compressed, + startOffset: buf.off - int64(buf.opt.MaxCapacity), + size: int64(buf.opt.MaxCapacity), + }) + + if len(buf.chunks) > buf.opt.NumCompressedChunks { + buf.chunks = slices.Delete(buf.chunks, 0, 1) + } + } } buf.cond.Broadcast() @@ -113,6 +147,47 @@ func (buf *Buffer) Capacity() int { return cap(buf.data) } +// NumCompressedChunks returns number of compressed chunks. 
+func (buf *Buffer) NumCompressedChunks() int { + buf.mu.Lock() + defer buf.mu.Unlock() + + return len(buf.chunks) +} + +// TotalCompressedSize reports the overall memory used by the circular buffer including compressed chunks. +func (buf *Buffer) TotalCompressedSize() int64 { + buf.mu.Lock() + defer buf.mu.Unlock() + + var size int64 + + for _, c := range buf.chunks { + size += int64(len(c.compressed)) + } + + return size + int64(cap(buf.data)) +} + +// TotalSize reports overall number of bytes available for reading in the buffer. +// +// TotalSize might be higher than TotalCompressedSize, because compressed chunks +// take less memory than uncompressed data. +func (buf *Buffer) TotalSize() int64 { + buf.mu.Lock() + defer buf.mu.Unlock() + + if len(buf.chunks) == 0 { + if buf.off < int64(cap(buf.data)) { + return buf.off + } + + return int64(cap(buf.data)) + } + + return buf.off - buf.chunks[0].startOffset +} + // Offset returns current write offset (number of bytes written). func (buf *Buffer) Offset() int64 { buf.mu.Lock() @@ -121,23 +196,16 @@ func (buf *Buffer) Offset() int64 { return buf.off } -// GetStreamingReader returns StreamingReader object which implements io.ReadCloser, io.Seeker. +// GetStreamingReader returns Reader object which implements io.ReadCloser, io.Seeker. // // StreamingReader starts at the most distant position in the past available. -func (buf *Buffer) GetStreamingReader() *StreamingReader { - buf.mu.Lock() - defer buf.mu.Unlock() +func (buf *Buffer) GetStreamingReader() *Reader { + r := buf.GetReader() - off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap) - if off < 0 { - off = 0 - } + r.endOff = math.MaxInt64 + r.streaming = true - return &StreamingReader{ - buf: buf, - initialOff: off, - off: off, - } + return r } // GetReader returns Reader object which implements io.ReadCloser, io.Seeker. 
@@ -148,6 +216,20 @@ func (buf *Buffer) GetReader() *Reader { buf.mu.Lock() defer buf.mu.Unlock() + if len(buf.chunks) > 0 { + oldestChunk := buf.chunks[0] + + return &Reader{ + buf: buf, + + chunk: &oldestChunk, + + startOff: oldestChunk.startOffset, + endOff: buf.off, + off: oldestChunk.startOffset, + } + } + off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap) if off < 0 { off = 0 diff --git a/circular_bench_test.go b/circular_bench_test.go new file mode 100644 index 0000000..9702408 --- /dev/null +++ b/circular_bench_test.go @@ -0,0 +1,134 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package circular_test + +//nolint:gci +import ( + "crypto/rand" + "io" + "testing" + + "github.com/siderolabs/gen/xtesting/must" + "github.com/stretchr/testify/require" + + "github.com/siderolabs/go-circular" + "github.com/siderolabs/go-circular/zstd" +) + +func BenchmarkWrite(b *testing.B) { + for _, test := range []struct { + name string + + options []circular.OptionFunc + }{ + { + name: "defaults", + }, + { + name: "growing buffer", + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(16384), + circular.WithMaxCapacity(1048576), + }, + }, + { + name: "compression", + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(16384), + circular.WithMaxCapacity(131072), + + circular.WithNumCompressedChunks(9, must.Value(zstd.NewCompressor())(b)), + }, + }, + } { + b.Run(test.name, func(b *testing.B) { + data, err := io.ReadAll(io.LimitReader(rand.Reader, 1024)) + require.NoError(b, err) + + buf, err := circular.NewBuffer(test.options...) 
+ require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + _, err := buf.Write(data) + require.NoError(b, err) + } + }) + } +} + +func BenchmarkRead(b *testing.B) { + for _, test := range []struct { + name string + + options []circular.OptionFunc + }{ + { + name: "defaults", + }, + { + name: "growing buffer", + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(16384), + circular.WithMaxCapacity(1048576), + }, + }, + { + name: "compression", + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(16384), + circular.WithMaxCapacity(131072), + + circular.WithNumCompressedChunks(9, must.Value(zstd.NewCompressor())(b)), + }, + }, + } { + b.Run(test.name, func(b *testing.B) { + data, err := io.ReadAll(io.LimitReader(rand.Reader, 1024)) + require.NoError(b, err) + + buf, err := circular.NewBuffer(test.options...) + require.NoError(b, err) + + for range 65536 { + _, err := buf.Write(data) + require.NoError(b, err) + } + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + r := buf.GetReader() + + _, err := io.Copy(io.Discard, r) + require.NoError(b, err) + } + }) + } +} + +func testBenchmarkAllocs(t *testing.T, f func(b *testing.B), threshold int64) { + res := testing.Benchmark(f) + + allocs := res.AllocsPerOp() + if allocs > threshold { + t.Fatalf("Expected AllocsPerOp <= %d, got %d", threshold, allocs) + } +} + +func TestBenchmarkWriteAllocs(t *testing.T) { + testBenchmarkAllocs(t, BenchmarkWrite, 0) +} + +func TestBenchmarkReadAllocs(t *testing.T) { + testBenchmarkAllocs(t, BenchmarkRead, 5) +} diff --git a/circular_test.go b/circular_test.go index 027a09a..dac913a 100644 --- a/circular_test.go +++ b/circular_test.go @@ -8,415 +8,836 @@ package circular_test import ( "bytes" "context" + cryptorand "crypto/rand" + "fmt" "io" - "math/rand" + "math/rand/v2" "sync" "testing" "time" - "github.com/stretchr/testify/suite" + "github.com/siderolabs/gen/xtesting/must" + "github.com/stretchr/testify/assert" + 
"github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "github.com/siderolabs/go-circular" + "github.com/siderolabs/go-circular/zstd" ) -type CircularSuite struct { - suite.Suite +func TestWrites(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + + options []circular.OptionFunc + + expectedNumCompressedChunks int + expectedTotalCompressedSize int64 + expectedTotalSize int64 + }{ + { + name: "no chunks", + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), circular.WithMaxCapacity(100_000), + }, + + expectedNumCompressedChunks: 0, + expectedTotalCompressedSize: 100_000, + expectedTotalSize: 100_000, + }, + { + name: "chunks", + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(100_000), + circular.WithNumCompressedChunks(5, must.Value(zstd.NewCompressor())(t)), + }, + + expectedNumCompressedChunks: 5, + expectedTotalCompressedSize: 100_085, + expectedTotalSize: 554_675, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + req := require.New(t) + + buf, err := circular.NewBuffer(test.options...) 
+ req.NoError(err) + + n, err := buf.Write(nil) + req.NoError(err) + req.Equal(0, n) + + n, err = buf.Write(make([]byte, 100)) + req.NoError(err) + req.Equal(100, n) + + n, err = buf.Write(make([]byte, 1000)) + req.NoError(err) + req.Equal(1000, n) + + req.Equal(2048, buf.Capacity()) + req.EqualValues(1100, buf.Offset()) + + n, err = buf.Write(make([]byte, 5000)) + req.NoError(err) + req.Equal(5000, n) + + req.Equal(8192, buf.Capacity()) + req.EqualValues(6100, buf.Offset()) + + for i := range 20 { + l := 1 << i + + n, err = buf.Write(make([]byte, l)) + req.NoError(err) + req.Equal(l, n) + } + + req.Equal(100000, buf.Capacity()) + req.EqualValues(6100+(1<<20)-1, buf.Offset()) + + req.EqualValues(test.expectedNumCompressedChunks, buf.NumCompressedChunks()) + req.EqualValues(test.expectedTotalCompressedSize, buf.TotalCompressedSize()) + req.EqualValues(test.expectedTotalSize, buf.TotalSize()) + }) + } } -func (suite *CircularSuite) TestWrites() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(100000)) - suite.Require().NoError(err) +func TestStreamingReadWriter(t *testing.T) { + t.Parallel() - n, err := buf.Write(nil) - suite.Require().NoError(err) - suite.Require().Equal(0, n) + for _, test := range []struct { + name string - n, err = buf.Write(make([]byte, 100)) - suite.Require().NoError(err) - suite.Require().Equal(100, n) + options []circular.OptionFunc + }{ + { + name: "no chunks", - n, err = buf.Write(make([]byte, 1000)) - suite.Require().NoError(err) - suite.Require().Equal(1000, n) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + }, + }, + { + name: "chunks", - suite.Require().Equal(2048, buf.Capacity()) - suite.Require().EqualValues(1100, buf.Offset()) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(16384), + circular.WithNumCompressedChunks(4, must.Value(zstd.NewCompressor())(t)), + }, + }, + } { + 
t.Run(test.name, func(t *testing.T) { + t.Parallel() - n, err = buf.Write(make([]byte, 5000)) - suite.Require().NoError(err) - suite.Require().Equal(5000, n) + req := require.New(t) + asrt := assert.New(t) - suite.Require().Equal(8192, buf.Capacity()) - suite.Require().EqualValues(6100, buf.Offset()) + buf, err := circular.NewBuffer(test.options...) + req.NoError(err) - for i := range 20 { - l := 1 << i + r := buf.GetStreamingReader() - n, err = buf.Write(make([]byte, l)) - suite.Require().NoError(err) - suite.Require().Equal(l, n) - } + size := 1048576 - suite.Require().Equal(100000, buf.Capacity()) - suite.Require().EqualValues(6100+(1<<20)-1, buf.Offset()) -} + data, err := io.ReadAll(io.LimitReader(cryptorand.Reader, int64(size))) + req.NoError(err) + + var wg sync.WaitGroup + defer wg.Wait() + + wg.Add(1) + + go func() { + defer wg.Done() + + p := data + + r := rate.NewLimiter(300_000, 1000) + + for i := 0; i < len(data); { + l := 100 + int(rand.Int32N(100)) + + if i+l > len(data) { + l = len(data) - i + } + + r.WaitN(context.Background(), l) //nolint:errcheck + + n, e := buf.Write(p[:l]) + if e != nil { + panic(e) + } + + if n != l { + panic(fmt.Sprintf("short write: %d != %d", n, l)) + } + + i += l + p = p[l:] + } + }() + + actual := make([]byte, size) -func (suite *CircularSuite) TestStreamingReadWriter() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536)) - suite.Require().NoError(err) + n, err := io.ReadFull(r, actual) + req.NoError(err) + req.Equal(size, n) - r := buf.GetStreamingReader() + req.Equal(data, actual) - size := 1048576 + s := make(chan error) - data := make([]byte, size) - for i := range data { - data[i] = byte(rand.Int31()) + go func() { + _, err = r.Read(make([]byte, 1)) + + s <- err + }() + + time.Sleep(50 * time.Millisecond) // wait for the goroutine to start + + req.NoError(r.Close()) + + // close should abort reader + asrt.ErrorIs(<-s, circular.ErrClosed) + + _, err = r.Read(nil) + 
req.ErrorIs(err, circular.ErrClosed) + }) } +} + +//nolint:gocognit +func TestStreamingMultipleReaders(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string - var wg sync.WaitGroup - defer wg.Wait() + options []circular.OptionFunc + }{ + { + name: "no chunks", - wg.Add(1) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + }, + }, + { + name: "chunks", - go func() { - defer wg.Done() + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(16384), + circular.WithNumCompressedChunks(4, must.Value(zstd.NewCompressor())(t)), + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() - p := data + req := require.New(t) - r := rate.NewLimiter(300_000, 1000) + buf, err := circular.NewBuffer(test.options...) + req.NoError(err) - for i := 0; i < len(data); { - l := 100 + rand.Intn(100) + n := 5 - if i+l > len(data) { - l = len(data) - i + readers := make([]*circular.Reader, n) + + for i := range n { + readers[i] = buf.GetStreamingReader() } - r.WaitN(context.Background(), l) //nolint:errcheck + size := 1048576 - n, e := buf.Write(p[:l]) - suite.Require().NoError(e) - suite.Require().Equal(l, n) + data, err := io.ReadAll(io.LimitReader(cryptorand.Reader, int64(size))) + req.NoError(err) - i += l - p = p[l:] - } - }() + var eg errgroup.Group - actual := make([]byte, size) + for _, reader := range readers { + eg.Go(func(reader *circular.Reader) func() error { + return func() error { + actual := make([]byte, size) - n, err := io.ReadFull(r, actual) - suite.Require().NoError(err) - suite.Require().Equal(size, n) + nn, err := io.ReadFull(reader, actual) + if err != nil { + return err + } - suite.Require().Equal(data, actual) + if size != nn { + return fmt.Errorf("short read: %d != %d", nn, size) + } - s := make(chan struct{}) + if !bytes.Equal(data, actual) { + return fmt.Errorf("data mismatch") + } - go func() { - _, err = r.Read(make([]byte, 
1)) + return nil + } + }(reader)) + } - suite.Assert().Equal(err, circular.ErrClosed) + p := data - close(s) - }() + r := rate.NewLimiter(300_000, 1000) - time.Sleep(50 * time.Millisecond) // wait for the goroutine to start + for i := 0; i < len(data); { + l := 256 - suite.Require().NoError(r.Close()) + if i+l > len(data) { + l = len(data) - i + } - // close should abort reader - <-s + r.WaitN(context.Background(), l) //nolint:errcheck - _, err = r.Read(nil) - suite.Require().Equal(circular.ErrClosed, err) + n, e := buf.Write(p[:l]) + req.NoError(e) + req.Equal(l, n) + + i += l + p = p[l:] + } + + req.NoError(eg.Wait()) + }) + } } -func (suite *CircularSuite) TestStreamingMultipleReaders() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536)) - suite.Require().NoError(err) +func TestStreamingLateAndIdleReaders(t *testing.T) { + t.Parallel() - n := 5 + for _, test := range []struct { + name string - readers := make([]*circular.StreamingReader, n) + options []circular.OptionFunc - for i := range n { - readers[i] = buf.GetStreamingReader() - } + expectedLateReadSize int + expectedIdleReadSize int + }{ + { + name: "no chunks", - size := 1048576 + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + circular.WithSafetyGap(256), + }, - data := make([]byte, size) - for i := range data { - data[i] = byte(rand.Int31()) - } + expectedLateReadSize: 65536 - 256, + expectedIdleReadSize: 65536, + }, + { + name: "chunks", - var wg sync.WaitGroup - defer wg.Wait() + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(16384), + circular.WithSafetyGap(256), + circular.WithNumCompressedChunks(4, must.Value(zstd.NewCompressor())(t)), + }, - for i := range n { - wg.Add(1) + expectedLateReadSize: 67232, + expectedIdleReadSize: 16384, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() - go func() { - defer wg.Done() + req := 
require.New(t) - actual := make([]byte, size) + buf, err := circular.NewBuffer(test.options...) + req.NoError(err) - nn, err := io.ReadFull(readers[i], actual) - suite.Require().NoError(err) - suite.Assert().Equal(size, nn) + idleR := buf.GetStreamingReader() - suite.Assert().Equal(data, actual) - }() - } + size := 100000 - p := data + data, err := io.ReadAll(io.LimitReader(cryptorand.Reader, int64(size))) + req.NoError(err) - r := rate.NewLimiter(300_000, 1000) + n, err := buf.Write(data) + req.NoError(err) + req.Equal(size, n) - for i := 0; i < len(data); { - l := 256 + lateR := buf.GetStreamingReader() - if i+l > len(data) { - l = len(data) - i - } + closeCh := make(chan error) - r.WaitN(context.Background(), l) //nolint:errcheck + go func() { + time.Sleep(50 * time.Millisecond) - n, e := buf.Write(p[:l]) - suite.Require().NoError(e) - suite.Require().Equal(l, n) + closeCh <- lateR.Close() + }() - i += l - p = p[l:] - } -} + actual, err := io.ReadAll(lateR) + req.Equal(circular.ErrClosed, err) + req.Equal(test.expectedLateReadSize, len(actual)) + + req.Equal(data[size-test.expectedLateReadSize:], actual) + + req.NoError(<-closeCh) -func (suite *CircularSuite) TestStreamingLateAndIdleReaders() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) + go func() { + time.Sleep(50 * time.Millisecond) - idleR := buf.GetStreamingReader() + closeCh <- idleR.Close() + }() - size := 100000 + actual, err = io.ReadAll(idleR) + req.Equal(circular.ErrClosed, err) + req.Equal(test.expectedIdleReadSize, len(actual)) - data := make([]byte, size) - for i := range data { - data[i] = byte(rand.Int31()) + req.Equal(data[size-test.expectedIdleReadSize:], actual) + + req.NoError(<-closeCh) + }) } +} - n, err := buf.Write(data) - suite.Require().NoError(err) - suite.Require().Equal(size, n) +func TestStreamingSeek(t *testing.T) { + t.Parallel() - lateR := 
buf.GetStreamingReader() + for _, test := range []struct { + name string - go func() { - time.Sleep(50 * time.Millisecond) + options []circular.OptionFunc - suite.Require().NoError(lateR.Close()) - }() + expectedOverflowSeek1 int64 + expectedOverflowSeek2 int64 + }{ + { + name: "no chunks", - actual, err := io.ReadAll(lateR) - suite.Require().Equal(circular.ErrClosed, err) - suite.Require().Equal(65536-256, len(actual)) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + circular.WithSafetyGap(256), + }, - suite.Require().Equal(data[size-65536+256:], actual) + expectedOverflowSeek1: 1024, + expectedOverflowSeek2: 2048, + }, + { + name: "chunks", - go func() { - time.Sleep(50 * time.Millisecond) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(16384), + circular.WithSafetyGap(256), + circular.WithNumCompressedChunks(3, must.Value(zstd.NewCompressor())(t)), + }, - suite.Require().NoError(idleR.Close()) - }() + expectedOverflowSeek1: 16384, + expectedOverflowSeek2: 16384, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() - actual, err = io.ReadAll(idleR) - suite.Require().Equal(circular.ErrClosed, err) - suite.Require().Equal(65536, len(actual)) + req := require.New(t) + asrt := assert.New(t) - suite.Require().Equal(data[size-65536:], actual) -} + buf, err := circular.NewBuffer(test.options...) 
+ req.NoError(err) -func (suite *CircularSuite) TestStreamingSeek() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) + _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) + req.NoError(err) - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) + r := buf.GetStreamingReader() - r := buf.GetStreamingReader() + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) + req.NoError(err) - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) - suite.Require().NoError(err) + off, err := r.Seek(0, io.SeekCurrent) + req.NoError(err) + asrt.EqualValues(0, off) - off, err := r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(0, off) + data := make([]byte, 256) - data := make([]byte, 256) + n, err := r.Read(data) + req.NoError(err) + asrt.Equal(256, n) + asrt.Equal(bytes.Repeat([]byte{0xff}, 256), data) - n, err := r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xff}, 256), data) + off, err = r.Seek(0, io.SeekCurrent) + req.NoError(err) + asrt.EqualValues(256, off) - off, err = r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(256, off) + off, err = r.Seek(-256, io.SeekEnd) + req.NoError(err) + asrt.EqualValues(768, off) - off, err = r.Seek(-256, io.SeekEnd) - suite.Require().NoError(err) - suite.Assert().EqualValues(768, off) + n, err = r.Read(data) + req.NoError(err) + asrt.Equal(256, n) + asrt.Equal(bytes.Repeat([]byte{0xfe}, 256), data) - n, err = r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xfe}, 256), data) + off, err = r.Seek(2048, io.SeekStart) + req.NoError(err) + asrt.EqualValues(1024, off) - off, err = r.Seek(2048, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(1024, off) + _, err = 
buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) + req.NoError(err) - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) - suite.Require().NoError(err) + off, err = r.Seek(0, io.SeekStart) + req.NoError(err) + asrt.EqualValues(test.expectedOverflowSeek1, off) - off, err = r.Seek(0, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(1024, off) + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 1024)) + req.NoError(err) - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 1024)) - suite.Require().NoError(err) + off, err = r.Seek(0, io.SeekCurrent) + req.NoError(err) + asrt.EqualValues(test.expectedOverflowSeek2, off) - off, err = r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(2048, off) + _, err = r.Seek(-100, io.SeekStart) + req.ErrorIs(err, circular.ErrSeekBeforeStart) + }) + } +} - _, err = r.Seek(-100, io.SeekStart) - suite.Require().Equal(circular.ErrSeekBeforeStart, err) +func TestRegularReaderEmpty(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + + options []circular.OptionFunc + }{ + { + name: "no chunks", + }, + { + name: "chunks", + options: []circular.OptionFunc{ + circular.WithNumCompressedChunks(5, must.Value(zstd.NewCompressor())(t)), + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + req := require.New(t) + + buf, err := circular.NewBuffer(test.options...) 
+ req.NoError(err) + + n, err := buf.GetReader().Read(nil) + req.Equal(0, n) + req.Equal(io.EOF, err) + }) + } } -func (suite *CircularSuite) TestRegularReaderEmpty() { - buf, err := circular.NewBuffer() - suite.Require().NoError(err) +func TestRegularReader(t *testing.T) { + t.Parallel() - n, err := buf.GetReader().Read(nil) - suite.Require().Equal(0, n) - suite.Require().Equal(io.EOF, err) -} + for _, test := range []struct { + name string -func (suite *CircularSuite) TestRegularReader() { - buf, err := circular.NewBuffer() - suite.Require().NoError(err) + options []circular.OptionFunc - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) + size int + }{ + { + name: "no chunks", - r := buf.GetReader() + size: 512, + }, + { + name: "chunks", + options: []circular.OptionFunc{ + circular.WithMaxCapacity(65536), + circular.WithNumCompressedChunks(5, must.Value(zstd.NewCompressor())(t)), + }, - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) - suite.Require().NoError(err) + size: 262144, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() - actual, err := io.ReadAll(r) - suite.Require().NoError(err) - suite.Require().Equal(bytes.Repeat([]byte{0xff}, 512), actual) -} + req := require.New(t) + + buf, err := circular.NewBuffer(test.options...) 
+ req.NoError(err) -func (suite *CircularSuite) TestRegularReaderOutOfSync() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) + data, err := io.ReadAll(io.LimitReader(cryptorand.Reader, int64(test.size))) + req.NoError(err) - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) + _, err = buf.Write(data) + req.NoError(err) - r := buf.GetReader() + r := buf.GetReader() - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) - suite.Require().NoError(err) + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) + req.NoError(err) - _, err = r.Read(nil) - suite.Require().Equal(err, circular.ErrOutOfSync) + actual, err := io.ReadAll(r) + req.NoError(err) + req.Equal(data, actual) + }) + } } -func (suite *CircularSuite) TestRegularReaderFull() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(4096), circular.WithSafetyGap(256)) - suite.Require().NoError(err) +func TestRegularReaderOutOfSync(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + + options []circular.OptionFunc + + expectOutOfSync bool + }{ + { + name: "no chunks", + + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + circular.WithSafetyGap(256), + }, + + expectOutOfSync: true, + }, + { + name: "chunks, no out of sync", + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + circular.WithSafetyGap(256), + circular.WithNumCompressedChunks(5, must.Value(zstd.NewCompressor())(t)), + }, + }, + { + name: "not enough chunks", + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(2048), + circular.WithSafetyGap(256), + circular.WithNumCompressedChunks(2, must.Value(zstd.NewCompressor())(t)), + }, + + expectOutOfSync: true, + }, + } { + 
t.Run(test.name, func(t *testing.T) { + t.Parallel() + + req := require.New(t) + + buf, err := circular.NewBuffer(test.options...) + req.NoError(err) + + data, err := io.ReadAll(io.LimitReader(cryptorand.Reader, 512)) + req.NoError(err) + + _, err = buf.Write(data) + req.NoError(err) + + r := buf.GetReader() + + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) + req.NoError(err) + + actual := make([]byte, 512) + _, err = r.Read(actual) + + if test.expectOutOfSync { + req.ErrorIs(err, circular.ErrOutOfSync) + } else { + req.NoError(err) + req.Equal(data, actual) + } + }) + } +} + +func TestRegularReaderSafetyGap(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + + options []circular.OptionFunc - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 6146)) - suite.Require().NoError(err) + expectedBytesRead int + }{ + { + name: "no chunks", - r := buf.GetReader() + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(4096), + circular.WithSafetyGap(256), + }, - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 100)) - suite.Require().NoError(err) + expectedBytesRead: 4096 - 256, + }, + { + name: "chunks", - actual, err := io.ReadAll(r) - suite.Require().NoError(err) - suite.Require().Equal(bytes.Repeat([]byte{0xff}, 4096-256), actual) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(4096), + circular.WithSafetyGap(256), + circular.WithNumCompressedChunks(5, must.Value(zstd.NewCompressor())(t)), + }, - suite.Require().NoError(r.Close()) + expectedBytesRead: 6146, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() - _, err = r.Read(nil) - suite.Require().Equal(err, circular.ErrClosed) + req := require.New(t) + + buf, err := circular.NewBuffer(test.options...) 
+ req.NoError(err) + + _, err = buf.Write(bytes.Repeat([]byte{0xff}, 6146)) + req.NoError(err) + + r := buf.GetReader() + + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 100)) + req.NoError(err) + + actual, err := io.ReadAll(r) + req.NoError(err) + req.Equal(bytes.Repeat([]byte{0xff}, test.expectedBytesRead), actual) + + req.NoError(r.Close()) + + _, err = r.Read(nil) + req.Equal(err, circular.ErrClosed) + }) + } } -func (suite *CircularSuite) TestRegularSeek() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) +func TestRegularSeek(t *testing.T) { + t.Parallel() - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) + for _, test := range []struct { + name string - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) - suite.Require().NoError(err) + options []circular.OptionFunc - r := buf.GetReader() + expectOutOfSync bool + }{ + { + name: "no chunks", - _, err = buf.Write(bytes.Repeat([]byte{0xfc}, 512)) - suite.Require().NoError(err) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(65536), + circular.WithSafetyGap(256), + }, - off, err := r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(0, off) + expectOutOfSync: true, + }, + { + name: "chunks", - data := make([]byte, 256) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(2048), + circular.WithMaxCapacity(16384), + circular.WithSafetyGap(256), + circular.WithNumCompressedChunks(5, must.Value(zstd.NewCompressor())(t)), + }, + }, + { + name: "tiny chunks", - n, err := r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xff}, 256), data) + options: []circular.OptionFunc{ + circular.WithInitialCapacity(256), + circular.WithMaxCapacity(256), + circular.WithSafetyGap(64), + circular.WithNumCompressedChunks(6, 
must.Value(zstd.NewCompressor())(t)), + }, - off, err = r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(256, off) + expectOutOfSync: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + t.Parallel() - off, err = r.Seek(-256, io.SeekEnd) - suite.Require().NoError(err) - suite.Assert().EqualValues(768, off) + req := require.New(t) + asrt := assert.New(t) - n, err = r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xfe}, 256), data) + buf, err := circular.NewBuffer(test.options...) + req.NoError(err) - off, err = r.Seek(2048, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(1024, off) + _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) + req.NoError(err) - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) - suite.Require().NoError(err) + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) + req.NoError(err) - off, err = r.Seek(0, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(0, off) + r := buf.GetReader() - _, err = r.Seek(-100, io.SeekStart) - suite.Require().Equal(circular.ErrSeekBeforeStart, err) + _, err = buf.Write(bytes.Repeat([]byte{0xfc}, 512)) + req.NoError(err) - _, err = r.Read(nil) - suite.Require().Equal(circular.ErrOutOfSync, err) -} + off, err := r.Seek(0, io.SeekCurrent) + req.NoError(err) + asrt.EqualValues(0, off) + + data := make([]byte, 256) -func TestCircularSuite(t *testing.T) { - suite.Run(t, new(CircularSuite)) + n, err := r.Read(data) + req.NoError(err) + req.Equal(256, n) + req.Equal(bytes.Repeat([]byte{0xff}, 256), data) + + off, err = r.Seek(0, io.SeekCurrent) + req.NoError(err) + asrt.EqualValues(256, off) + + off, err = r.Seek(-256, io.SeekEnd) + req.NoError(err) + asrt.EqualValues(768, off) + + n, err = r.Read(data) + req.NoError(err) + asrt.Equal(256, n) + asrt.Equal(bytes.Repeat([]byte{0xfe}, 256), data) + + off, err = r.Seek(2048, io.SeekStart) + 
req.NoError(err) + asrt.EqualValues(1024, off) + + _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) + req.NoError(err) + + off, err = r.Seek(0, io.SeekStart) + req.NoError(err) + asrt.EqualValues(0, off) + + _, err = r.Seek(-100, io.SeekStart) + req.ErrorIs(err, circular.ErrSeekBeforeStart) + + singleByte := make([]byte, 1) + _, err = r.Read(singleByte) + + if test.expectOutOfSync { + req.ErrorIs(err, circular.ErrOutOfSync) + } else { + req.NoError(err) + req.EqualValues(0xff, singleByte[0]) + } + }) + } } diff --git a/go.mod b/go.mod index b3df58a..9f5a54f 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,12 @@ module github.com/siderolabs/go-circular -go 1.22 +go 1.22.3 require ( + github.com/klauspost/compress v1.17.8 + github.com/siderolabs/gen v0.4.8 github.com/stretchr/testify v1.9.0 + golang.org/x/sync v0.7.0 golang.org/x/time v0.5.0 ) diff --git a/go.sum b/go.sum index 7301572..dc77d29 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,18 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/siderolabs/gen v0.4.8 h1:VNpbmDLhkXp7qcSEkKk1Ee7vU2afs3xvHrWLGR2UuiY= +github.com/siderolabs/gen v0.4.8/go.mod h1:7ROKVHHB68R3Amrd4a1ZXz/oMrXWF3Mg3lSEgnkJY5c= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= -golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go index 56281fa..7adb283 100644 --- a/options.go +++ b/options.go @@ -8,9 +8,21 @@ import "fmt" // Options defines settings for Buffer. type Options struct { + Compressor Compressor + InitialCapacity int MaxCapacity int SafetyGap int + + NumCompressedChunks int +} + +// Compressor implements an optional interface for chunk compression. +// +// Compress and Decompress append to the dest slice and return the result. 
+type Compressor interface { + Compress(src, dest []byte) ([]byte, error) + Decompress(src, dest []byte) ([]byte, error) } // defaultOptions returns default initial values. @@ -67,3 +79,19 @@ func WithSafetyGap(gap int) OptionFunc { return nil } } + +// WithNumCompressedChunks sets number of compressed chunks to keep in the buffer. +// +// Default is to keep no compressed chunks, only uncompressed circular buffer is used. +func WithNumCompressedChunks(num int, c Compressor) OptionFunc { + return func(opt *Options) error { + if num < 0 { + return fmt.Errorf("number of compressed chunks should be non-negative: %d", num) + } + + opt.NumCompressedChunks = num + opt.Compressor = c + + return nil + } +} diff --git a/reader.go b/reader.go index 82dbdbd..2c47e82 100644 --- a/reader.go +++ b/reader.go @@ -10,41 +10,120 @@ import ( ) // Reader implements seekable reader with local position in the Buffer which -// reads from the fixed part of the buffer. +// reads from the fixed part of the buffer, or performs streaming reads. // // Reader is not safe to be used with concurrent Read/Seek operations. type Reader struct { buf *Buffer + // if reading from a compressed chunk, chunk is set to non-nil value + // decompressedChunk is used to store the decompressed chunk, and also re-used as a decompression buffer + chunk *chunk + decompressedChunk []byte + startOff, endOff int64 off int64 - closed uint32 + // if streaming, endOff should be set to MaxInt64 + streaming bool + + closed atomic.Bool } +// StreamingReader is a backwards compatible type alias. +type StreamingReader = Reader + // Read implements io.Reader. 
+// +//nolint:gocognit func (r *Reader) Read(p []byte) (n int, err error) { - if atomic.LoadUint32(&r.closed) > 0 { + if r.closed.Load() { return n, ErrClosed } + if r.off == r.endOff { + return n, io.EOF + } + + if len(p) == 0 { + return n, nil + } + + if r.chunk != nil { + // how much we can read from the current chunk + nn := min(r.endOff, r.chunk.startOffset+r.chunk.size) - r.off + + if nn == 0 { + // switch to the next chunk, or if no chunk is found, switch to the circular buffer + // at this point, (r.chunk.startOffset + r.chunk.size) == r.off + r.resetChunk() + + r.buf.mu.Lock() + r.seekChunk() + r.buf.mu.Unlock() + + if r.chunk != nil { + nn = min(r.endOff, r.chunk.startOffset+r.chunk.size) - r.off + } + } + + // if r.chunk == nil, we need to switch to the last chunk as a circular buffer, so fallthrough below + if r.chunk != nil { + if len(r.decompressedChunk) == 0 { + r.decompressedChunk, err = r.buf.opt.Compressor.Decompress(r.chunk.compressed, r.decompressedChunk) + if err != nil { + return n, err + } + } + + if nn > int64(len(p)) { + nn = int64(len(p)) + } + + copy(p, r.decompressedChunk[r.off-r.chunk.startOffset:r.off-r.chunk.startOffset+nn]) + + n = int(nn) + r.off += nn + + return n, nil + } + } + + // from this point down, reading from the current chunk r.buf.mu.Lock() defer r.buf.mu.Unlock() if r.off < r.buf.off-int64(r.buf.opt.MaxCapacity) { + // check if there is a chunk that has r.off in its range + r.seekChunk() + + if r.chunk != nil { + return r.Read(p) + } + // reader is falling too much behind - return n, ErrOutOfSync + if !r.streaming { + return n, ErrOutOfSync + } + + // reset the offset to the first available position + r.off = r.buf.off - int64(r.buf.opt.MaxCapacity) } - if r.off == r.endOff { - return n, io.EOF + for r.streaming && r.off == r.buf.off { + r.buf.cond.Wait() + + if r.closed.Load() { + return n, ErrClosed + } } - if len(p) == 0 { - return n, err + if r.streaming { + n = int(r.buf.off - r.off) + } else { + n = int(r.endOff - 
r.off) } - n = int(r.endOff - r.off) if n > len(p) { n = len(p) } @@ -65,7 +144,10 @@ func (r *Reader) Read(p []byte) (n int, err error) { // Close implements io.Closer. func (r *Reader) Close() error { - atomic.StoreUint32(&r.closed, 1) + if !r.closed.Swap(true) { + // wake up readers + r.buf.cond.Broadcast() + } return nil } @@ -74,11 +156,19 @@ func (r *Reader) Close() error { func (r *Reader) Seek(offset int64, whence int) (int64, error) { newOff := r.off + endOff := r.endOff + + if r.streaming { + r.buf.mu.Lock() + endOff = r.buf.off + r.buf.mu.Unlock() + } + switch whence { case io.SeekCurrent: newOff += offset case io.SeekEnd: - newOff = r.endOff + offset + newOff = endOff + offset case io.SeekStart: newOff = r.startOff + offset } @@ -87,11 +177,63 @@ func (r *Reader) Seek(offset int64, whence int) (int64, error) { return r.off - r.startOff, ErrSeekBeforeStart } - if newOff > r.endOff { - newOff = r.endOff + if newOff > endOff { + newOff = endOff } r.off = newOff + if r.chunk != nil { + if r.off < r.chunk.startOffset || r.off >= r.chunk.startOffset+r.chunk.size { + // we fell out of the chunk + r.resetChunk() + } else { + // we are within the same chunk + return r.off - r.startOff, nil + } + } + + r.buf.mu.Lock() + defer r.buf.mu.Unlock() + + r.seekChunk() + + // in streaming mode, make sure the offset is within the buffer + if r.streaming && r.chunk == nil { + if len(r.buf.chunks) > 0 { + if r.off < r.buf.chunks[0].startOffset { + r.off = r.buf.chunks[0].startOffset + } + } else { + if r.off < endOff-int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) { + r.off = endOff - int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) + } + } + } + return r.off - r.startOff, nil } + +// seekChunk tries to find a chunk that contains the current reading offset. +// +// seekChunk assumes that r.chunk == nil and r.decompressedChunk is reset before the call. +// seekChunk should be called with r.buf.mu locked. 
+func (r *Reader) seekChunk() { + for i, c := range r.buf.chunks { + if r.off >= c.startOffset && r.off < c.startOffset+c.size { + // we found the chunk + r.chunk = &r.buf.chunks[i] + + break + } + } +} + +// resetChunk resets the current chunk and decompressed chunk. +func (r *Reader) resetChunk() { + r.chunk = nil + + if r.decompressedChunk != nil { + r.decompressedChunk = r.decompressedChunk[:0] + } +} diff --git a/streaming.go b/streaming.go deleted file mode 100644 index 1a293ad..0000000 --- a/streaming.go +++ /dev/null @@ -1,113 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package circular - -import ( - "io" - "sync/atomic" -) - -// StreamingReader implements seekable reader with local position in the Buffer. -// -// StreamingReader is not safe to be used with concurrent Read/Seek operations. -// -// StreamingReader blocks for new data once it exhausts contents of the buffer. -type StreamingReader struct { - buf *Buffer - - initialOff int64 - off int64 - - closed uint32 -} - -// Read implements io.Reader. 
-func (r *StreamingReader) Read(p []byte) (n int, err error) { - if atomic.LoadUint32(&r.closed) > 0 { - return n, ErrClosed - } - - if len(p) == 0 { - return n, err - } - - r.buf.mu.Lock() - defer r.buf.mu.Unlock() - - if r.off < r.buf.off-int64(r.buf.opt.MaxCapacity) { - // reader is falling too much behind, so need to rewind to the first available position - r.off = r.buf.off - int64(r.buf.opt.MaxCapacity) - } - - for r.off == r.buf.off { - r.buf.cond.Wait() - - if atomic.LoadUint32(&r.closed) > 0 { - return n, ErrClosed - } - } - - n = int(r.buf.off - r.off) - if n > len(p) { - n = len(p) - } - - i := int(r.off % int64(r.buf.opt.MaxCapacity)) - - if l := r.buf.opt.MaxCapacity - i; l < n { - copy(p, r.buf.data[i:]) - copy(p[l:], r.buf.data[:n-l]) - } else { - copy(p, r.buf.data[i:i+n]) - } - - r.off += int64(n) - - return n, err -} - -// Close implements io.Closer. -func (r *StreamingReader) Close() error { - if atomic.CompareAndSwapUint32(&r.closed, 0, 1) { - // wake up readers - r.buf.cond.Broadcast() - } - - return nil -} - -// Seek implements io.Seeker. -func (r *StreamingReader) Seek(offset int64, whence int) (int64, error) { - newOff := r.off - - r.buf.mu.Lock() - writeOff := r.buf.off - r.buf.mu.Unlock() - - switch whence { - case io.SeekCurrent: - newOff += offset - case io.SeekEnd: - newOff = writeOff + offset - case io.SeekStart: - newOff = r.initialOff + offset - } - - if newOff < r.initialOff { - return r.off - r.initialOff, ErrSeekBeforeStart - } - - if newOff > writeOff { - newOff = writeOff - } - - if newOff < writeOff-int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) { - newOff = writeOff - int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) - } - - r.off = newOff - - return r.off - r.initialOff, nil -} diff --git a/zstd/zstd.go b/zstd/zstd.go new file mode 100644 index 0000000..08c5bda --- /dev/null +++ b/zstd/zstd.go @@ -0,0 +1,44 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. 
If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package zstd provides compression and decompression functions. +package zstd + +import ( + "github.com/klauspost/compress/zstd" +) + +// Compressor implements the Compressor interface using zstd compression. +type Compressor struct { + dec *zstd.Decoder + enc *zstd.Encoder +} + +// NewCompressor creates a new Compressor. +func NewCompressor(opts ...zstd.EOption) (*Compressor, error) { + dec, err := zstd.NewReader(nil) + if err != nil { + return nil, err + } + + enc, err := zstd.NewWriter(nil, opts...) + if err != nil { + return nil, err + } + + return &Compressor{ + dec: dec, + enc: enc, + }, nil +} + +// Compress compresses data using zstd. +func (c *Compressor) Compress(src, dest []byte) ([]byte, error) { + return c.enc.EncodeAll(src, dest), nil +} + +// Decompress decompresses data using zstd. +func (c *Compressor) Decompress(src, dest []byte) ([]byte, error) { + return c.dec.DecodeAll(src, dest) +}