
Merge pull request #8 from solarisdb/locallog
locallog and the concept, but not tests yet
dspasibenko committed Mar 15, 2024
2 parents c93f73c + e4bcecd commit d9ecb3a
Showing 5 changed files with 303 additions and 63 deletions.
6 changes: 3 additions & 3 deletions golibs/container/lru/releasable.go
@@ -154,7 +154,7 @@ func (r *ReleasableCache[K, V]) GetOrCreate(ctx context.Context, k K) (Releasabl

// Release returns the object to the cache and lets the cache know that the client is not
// going to use it anymore. The client MUST NOT use the rlsbl after the call.
func (r *ReleasableCache[K, V]) Release(rlsbl *Releasable[V]) error {
func (r *ReleasableCache[K, V]) Release(rlsbl *Releasable[V]) {
r.lock.Lock()
defer r.lock.Unlock()
rlsbl.rh.refCounter--
@@ -166,7 +166,7 @@ func (r *ReleasableCache[K, V]) Release(rlsbl *Releasable[V]) error {
if r.onDeleteF != nil {
r.onDeleteF((rlsbl.k).(K), rlsbl.rh.value)
}
return nil
return
}
r.lruCache.Add((rlsbl.k).(K), rlsbl.rh.value)
if r.waiter != nil {
@@ -178,7 +178,7 @@ }
}
}
rlsbl.rh = nil
return nil
return
}

// Close removes all not borrowed objects. The objects that are not released yet will be deleted after the
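For context, here is a minimal caller-side sketch of the new Release contract. Only the GetOrCreate and Release signatures come from this file; the generic instantiation, helper name, and surrounding setup are assumptions.

// borrowValue sketches the borrow/return pattern after this change: since
// Release no longer returns an error, returning the object can simply be
// deferred. If GetOrCreate hands the Releasable back by value rather than
// by pointer, pass &r to Release instead.
func borrowValue(ctx context.Context, cache *lru.ReleasableCache[string, int], key string) error {
	r, err := cache.GetOrCreate(ctx, key)
	if err != nil {
		return err
	}
	defer cache.Release(r) // r MUST NOT be used after Release returns
	// ... work with the borrowed value through r here ...
	return nil
}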
81 changes: 52 additions & 29 deletions pkg/storage/chunkfs/chunk.go
@@ -59,6 +59,16 @@ type (
UnsafePayload []byte
}

// AppendRecordsResult describes the result of an AppendRecords operation
AppendRecordsResult struct {
// Written is the number of records added to the chunk
Written int
// StartID is the first added record ID
StartID ulid.ULID
// LastID is the last added record ID
LastID ulid.ULID
}

metaBuf []byte
metaRec struct {
ID ulid.ULID
@@ -198,7 +208,7 @@ func (c *Chunk) init(fullCheck bool) error {
startOffs := c.freeOffset
var id ulid.ULID
pMax := int(c.mmf.Size() - int64(c.total*cMetaRecordSize))
for i := 0; i < int(c.total); i++ {
for i := 0; i < c.total; i++ {
mr := mb.get(i)
if mr.ID.Compare(id) < 0 {
return fmt.Errorf("the record #%d ID=%s is less than the previous one %s: %w", i, mr.ID.String(), id.String(), errCorrupted)
@@ -238,42 +248,48 @@ func (c *Chunk) close() error {
return err
}

// AppendRecords allows to add new records into the chunk.
func (c *Chunk) AppendRecords(recs []*solaris.Record) error {
pSize := payloadSize(recs)
size := pSize + int64(len(recs)*cMetaRecordSize)
if len(recs) == 0 {
return nil
}

// AppendRecords adds new records to the chunk. The chunk may be extended if the records do not fit into
// its current size, but once the chunk reaches its maximum capacity it will not grow anymore; only the
// records that fit into the chunk will be written. The result contains the number of records actually written.
func (c *Chunk) AppendRecords(recs []*solaris.Record) (AppendRecordsResult, error) {
c.lock.Lock()
defer c.lock.Unlock()

if c.mmf == nil {
// chunk is closed
return fmt.Errorf("the chunk %s is closed: %w ", c.fn, errors.ErrClosed)
return AppendRecordsResult{}, fmt.Errorf("the chunk %s is closed: %w ", c.fn, errors.ErrClosed)
}
n, size := c.writable(recs)
if n == 0 {
return AppendRecordsResult{}, nil
}

if err := c.growForWrite(size); err != nil {
if err := c.growForWrite(int64(size)); err != nil {
// could not grow the Chunk
return err
return AppendRecordsResult{}, err
}

recs = recs[:n]
mb, err := c.getMetaBuf(int(c.total)+len(recs)-1, len(recs))
if err != nil {
return err
return AppendRecordsResult{}, err
}

pOffset := c.freeOffset
var startID, lastID ulid.ULID
for i, r := range recs {
mb.put(i, metaRec{ID: ulidutils.New(), offset: int32(pOffset), size: int32(len(r.Payload))})
lastID = ulidutils.New()
if i == 0 {
startID = lastID
}
mb.put(i, metaRec{ID: lastID, offset: int32(pOffset), size: int32(len(r.Payload))})
pOffset += len(r.Payload)
}

pBuf, err := c.mmf.Buffer(int64(c.freeOffset), int(pSize))
pSize := pOffset - c.freeOffset
pBuf, err := c.mmf.Buffer(int64(c.freeOffset), pSize)
if err != nil {
c.logger.Errorf("could not map payload-buffer with offset %d for size=%d: %v", c.freeOffset, pSize, err)
return fmt.Errorf("could not write data: %w", fmt.Errorf("could not map payload-buffer with offset %d for size=%d: %w", c.freeOffset, pSize, errors.ErrInternal))
return AppendRecordsResult{}, fmt.Errorf("could not write data: %w", fmt.Errorf("could not map payload-buffer with offset %d for size=%d: %w", c.freeOffset, pSize, errors.ErrInternal))
}
pOffset = 0
for _, r := range recs {
@@ -287,11 +303,11 @@ func (c *Chunk) AppendRecords(recs []*solaris.Record) error {
hdr, err := c.mmf.Buffer(int64(len(hdrVersion)), 4)
if err != nil {
c.logger.Errorf("could not map records counter buffer with offset %d for size=4: %v", len(hdrVersion), err)
return fmt.Errorf("could not map records counter buffer with offset %d for size=4: %w", c.freeOffset, errors.ErrInternal)
return AppendRecordsResult{}, fmt.Errorf("could not map records counter buffer with offset %d for size=4: %w", c.freeOffset, errors.ErrInternal)
}
binary.BigEndian.PutUint32(hdr, uint32(c.total))

return nil
return AppendRecordsResult{Written: n, StartID: startID, LastID: lastID}, nil
}

// getMetaBuf maps the meta-buffer for the index startIdx with ln number of meta-records
@@ -368,7 +384,7 @@ func (c *Chunk) OpenChunkReader(descending bool) (*ChunkReader, error) {
c.lock.RUnlock()
return nil, fmt.Errorf("the chunk %s is closed: %w ", c.fn, errors.ErrClosed)
}
mb, err := c.getMetaBuf(int(c.total-1), int(c.total))
mb, err := c.getMetaBuf(c.total-1, c.total)
if err != nil {
c.lock.RUnlock()
return nil, err
@@ -390,16 +406,23 @@ func (c *Chunk) available() int64 {
return c.mmf.Size() - int64(c.freeOffset+c.total*cMetaRecordSize)
}

func payloadSize(recs []*solaris.Record) int64 {
size := int64(0)
for _, r := range recs {
size += int64(len(r.Payload))
// writable returns the number of records, and their total size in bytes, that can fit into the
// chunk even after it grows to its maximum size.
func (c *Chunk) writable(recs []*solaris.Record) (int, int) {
maxAvailable := int(cfg.maxChunkSize) - (c.freeOffset + c.total*cMetaRecordSize)
totalSize := 0
for i, r := range recs {
recSize := len(r.Payload) + cMetaRecordSize
if totalSize+recSize > maxAvailable {
return i, totalSize
}
totalSize += recSize
}
return size
return len(recs), totalSize
}

func (cr *ChunkReader) HasNext() bool {
return cr.idx < int(cr.c.total) && cr.idx > -1
return cr.idx < cr.c.total && cr.idx > -1
}

func (cr *ChunkReader) Next() (UnsafeRecord, bool) {
Expand Down Expand Up @@ -429,16 +452,16 @@ func (cr *ChunkReader) Close() error {
func (cr *ChunkReader) SetStartID(startID ulid.ULID) int {
res := 0
if cr.inc == -1 {
cr.idx = sort.Search(int(cr.c.total), func(i int) bool {
cr.idx = sort.Search(cr.c.total, func(i int) bool {
return cr.mb.get(i).ID.Compare(startID) > 0
})
cr.idx--
res = cr.idx + 1
} else {
cr.idx = sort.Search(int(cr.c.total), func(i int) bool {
cr.idx = sort.Search(cr.c.total, func(i int) bool {
return cr.mb.get(i).ID.Compare(startID) >= 0
})
res = int(cr.c.total) - cr.idx
res = cr.c.total - cr.idx
}
return res
}
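Taken together, the new AppendRecordsResult and the reader API suggest the following usage sketch. The method signatures come from this diff and the tests below; the helper names and the idea of routing the unwritten tail to another chunk are assumptions.

// appendSome writes as many records as currently fit into the chunk and
// returns the unwritten tail, which the caller could route to another chunk.
func appendSome(c *chunkfs.Chunk, recs []*solaris.Record) ([]*solaris.Record, error) {
	res, err := c.AppendRecords(recs)
	if err != nil {
		return recs, err
	}
	// res.StartID and res.LastID bound the ULIDs assigned to the written prefix.
	return recs[res.Written:], nil
}

// readFrom iterates the chunk in ascending ULID order, starting at the first
// record with ID >= startID. process is a placeholder for the caller's logic.
func readFrom(c *chunkfs.Chunk, startID ulid.ULID, process func([]byte)) error {
	cr, err := c.OpenChunkReader(false) // false = ascending order
	if err != nil {
		return err
	}
	defer cr.Close()
	cr.SetStartID(startID)
	for cr.HasNext() {
		rec, _ := cr.Next()
		// rec.UnsafePayload points into the mapped file; it must be copied
		// if it has to outlive the reader.
		process(rec.UnsafePayload)
	}
	return nil
}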
42 changes: 35 additions & 7 deletions pkg/storage/chunkfs/chunk_test.go
@@ -55,7 +55,8 @@ func TestChunk_Open(t *testing.T) {

fn := filepath.Join(dir, "c1")
c := NewChunk(fn, "c1")
assert.NotNil(t, c.AppendRecords(generateRecords(1, 1)))
_, err = c.AppendRecords(generateRecords(1, 1))
assert.NotNil(t, err)
_, err = c.OpenChunkReader(false)
assert.NotNil(t, err)
assert.Nil(t, c.Open(true))
@@ -86,7 +87,9 @@ func TestChunk_SimpleAppend(t *testing.T) {
c := NewChunk(fn, "c1")
assert.Nil(t, c.Open(false))
recs := generateRecords(3, 10)
assert.Nil(t, c.AppendRecords(recs))
arr, err := c.AppendRecords(recs)
assert.Nil(t, err)
assert.Equal(t, 3, arr.Written)
defer c.Close()

cr1, err := c.OpenChunkReader(false)
@@ -127,7 +130,9 @@ func TestChunk_AppendGrowth(t *testing.T) {
c := NewChunk(fn, "c1")
assert.Nil(t, c.Open(false))
recs := generateRecords(3, 10)
assert.Nil(t, c.AppendRecords(recs))
arr, err := c.AppendRecords(recs)
assert.Nil(t, err)
assert.Equal(t, 3, arr.Written)
defer c.Close()

fi, err := os.Stat(fn)
@@ -136,12 +141,14 @@

recs2 := generateRecords(100, 30)
recs = append(recs, recs2...)
assert.Nil(t, c.AppendRecords(recs2))
_, err = c.AppendRecords(recs2)
assert.Nil(t, err)
fi, err = os.Stat(fn)
assert.Nil(t, err)
assert.Equal(t, 2*cfg.newSize, fi.Size())

assert.Nil(t, c.AppendRecords(recs2))
_, err = c.AppendRecords(recs2)
assert.Nil(t, err)
fi, err = os.Stat(fn)
assert.Nil(t, err)
assert.Equal(t, 3*cfg.newSize, fi.Size())
@@ -159,15 +166,16 @@ func TestChunk_AppendGrowth(t *testing.T) {
cr1.Close()

container.SliceReverse(recs)
assert.Nil(t, c.AppendRecords(recs2))
_, err = c.AppendRecords(recs2)
assert.Nil(t, err)
fi, err = os.Stat(fn)
assert.Nil(t, err)
assert.Equal(t, 4*cfg.newSize, fi.Size())
recs = append(recs, recs2...)

before := c.freeOffset
assert.Equal(t, len(recs), int(c.total))
err = c.AppendRecords(generateRecords(1000, 30))
_, err = c.AppendRecords(generateRecords(1000, 30))
assert.NotNil(t, err)
assert.True(t, errors.Is(err, errors.ErrExhausted))
assert.Equal(t, before, c.freeOffset)
@@ -179,6 +187,26 @@ func TestChunk_AppendGrowth(t *testing.T) {
cr1.Close()
}

func TestChunk_AppendGrowth2(t *testing.T) {
dir, err := os.MkdirTemp("", "TestChunk_AppendGrowth2")
assert.Nil(t, err)
defer os.RemoveAll(dir)

cfg.newSize = files.BlockSize
cfg.maxChunkSize = 5 * files.BlockSize
cfg.maxGrowIncreaseSize = 1 * files.BlockSize

fn := filepath.Join(dir, "c1")
c := NewChunk(fn, "c1")
assert.Nil(t, c.Open(false))
defer c.Close()
recs := generateRecords(3000, 512)
arr, err := c.AppendRecords(recs)
assert.Nil(t, err)
assert.Equal(t, 38, arr.Written)
assert.True(t, arr.StartID.Compare(arr.LastID) < 0)
}

func checkRecords(t *testing.T, it *ChunkReader, recs []*solaris.Record) {
for _, rec := range recs {
assert.True(t, it.HasNext())
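A quick capacity check on the 38 records expected by TestChunk_AppendGrowth2, assuming the usual 4 KiB files.BlockSize (the constants are not visible in this excerpt): cfg.maxChunkSize = 5 * 4096 = 20480 bytes, and each record costs its 512-byte payload plus one meta record. With cMetaRecordSize around 24 bytes, 38 records take 38 * 536 = 20368 bytes and leave room for the chunk header, while a 39th would require 20904 bytes and exceed the cap, matching arr.Written == 38.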
