Skip to content

Commit

Permalink
Use iostream for read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Dec 12, 2021
1 parent cc4cb3c commit e662543
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 172 deletions.
9 changes: 0 additions & 9 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package column

import (
"fmt"
"io"
"reflect"
"sync"

Expand Down Expand Up @@ -163,14 +162,6 @@ func (c *column) Apply(r *commit.Reader) {
c.Column.Apply(r)
}

// Snapshot snapshots the column into a temporary buffer and writes the content into the
// destionation io.Writer.
func (c *column) WriteTo(w io.Writer, tmp *commit.Buffer) (int64, error) {
tmp.Reset(c.name)
c.Column.Snapshot(tmp)
return tmp.WriteTo(w)
}

// Value retrieves a value at a specified index
func (c *column) Value(idx uint32) (v interface{}, ok bool) {
v, ok = c.Column.Value(idx)
Expand Down
144 changes: 40 additions & 104 deletions commit/buffer_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,164 +8,100 @@ import (
"io"
"reflect"
"unsafe"

"github.com/kelindar/iostream"
)

// --------------------------- WriteTo ----------------------------

// WriteTo writes data to w until there's no more data to write or when an error occurs. The return
// value n is the number of bytes written. Any error encountered during the write is also returned.
func (b *Buffer) WriteTo(w io.Writer) (n int64, err error) {
nName, err := writeBytesTo(w, toBytes(b.Column))
if err != nil {
return 0, err
}

nLast, err := writeUintTo(w, int(b.last))
if err != nil {
return 0, err
func (b *Buffer) WriteTo(dst io.Writer) (int64, error) {
w := iostream.NewWriter(dst)
if err := w.WriteString(b.Column); err != nil {
return w.Offset(), err
}

nHead, err := writeChunksTo(w, b.chunks)
if err != nil {
return 0, err
if err := w.WriteInt32(b.last); err != nil {
return w.Offset(), err
}

nBody, err := writeBytesTo(w, b.buffer)
if err != nil {
return 0, err
if err := writeChunksTo(w, b.chunks); err != nil {
return w.Offset(), err
}

n += int64(nLast + nName + nHead + nBody)
return
}

// writeBytesTo writes the string to the output buffer
func writeBytesTo(w io.Writer, v []byte) (n int, err error) {
nSize, err := writeUintTo(w, len(v))
if err != nil {
return 0, err
}

nText, err := w.Write(v)
if err != nil {
return 0, err
}

n += nSize + nText
return
err := w.WriteBytes(b.buffer)
return w.Offset(), err
}

// writeChunksTo writes a header with chunk offsets
func writeChunksTo(w io.Writer, chunks []header) (n int, err error) {
m, err := writeUintTo(w, len(chunks))
if err != nil {
return 0, err
func writeChunksTo(w *iostream.Writer, chunks []header) error {
if err := w.WriteUvarint(uint64(len(chunks))); err != nil {
return err
}

n += m
var temp [12]byte
for _, v := range chunks {
binary.BigEndian.PutUint32(temp[0:4], v.Chunk)
binary.BigEndian.PutUint32(temp[4:8], v.Start)
binary.BigEndian.PutUint32(temp[8:12], v.Value)
m, err := w.Write(temp[:])
if err != nil {
return 0, err
if _, err := w.Write(temp[:]); err != nil {
return err
}

n += m
}
return
}

// writeUintTo writes the length of something into the destination writer
func writeUintTo(w io.Writer, v int) (n int, err error) {
var temp [4]byte
binary.BigEndian.PutUint32(temp[:], uint32(v))
return w.Write(temp[:])
return nil
}

// --------------------------- ReadFrom ----------------------------

// ReadFrom reads data from r until EOF or error. The return value n is the number of
// bytes read. Any error except EOF encountered during the read is also returned.
func (b *Buffer) ReadFrom(r io.Reader) (n int64, err error) {
name, nName, err := readBytesFrom(r)
if err != nil {
return 0, err
func (b *Buffer) ReadFrom(src io.Reader) (int64, error) {
r := iostream.NewReader(src)
var err error
if b.Column, err = r.ReadString(); err != nil {
return r.Offset(), err
}

last, nLast, err := readUintFrom(r)
if err != nil {
return 0, err
if b.last, err = r.ReadInt32(); err != nil {
return r.Offset(), err
}

head, nHead, err := readChunksFrom(r)
if err != nil {
return 0, err
if b.chunks, err = readChunksFrom(r); err != nil {
return r.Offset(), err
}

body, nBody, err := readBytesFrom(r)
if err != nil {
return 0, err
if b.buffer, err = r.ReadBytes(); err != nil {
return r.Offset(), err
}

b.Column = string(name)
b.chunks = head
b.buffer = body
b.last = int32(last)
if len(head) > 0 {
last := head[len(head)-1]
if len(b.chunks) > 0 {
last := b.chunks[len(b.chunks)-1]
b.chunk = last.Chunk
}

n += int64(nName + nLast + nHead + nBody)
return
}

// readBytesFrom reads the bytes prefixed with the length from the reader
func readBytesFrom(r io.Reader) (v []byte, n int, err error) {
size, nSize, err := readUintFrom(r)
if err != nil {
return nil, 0, err
}

v = make([]byte, size)
n, err = io.ReadFull(r, v)
n += nSize
return
return r.Offset(), nil
}

// readChunksFrom reads the list of chunks from the reader
func readChunksFrom(r io.Reader) (v []header, n int, err error) {
size, m, err := readUintFrom(r)
func readChunksFrom(r *iostream.Reader) ([]header, error) {
size, err := r.ReadUvarint()
if err != nil {
return nil, 0, err
return nil, err
}

n += m
v = make([]header, size)
v := make([]header, size)
var temp [12]byte
for i := 0; i < size; i++ {
m, err := io.ReadFull(r, temp[:])
if err != nil {
return nil, 0, err
for i := 0; i < int(size); i++ {
if _, err := io.ReadFull(r, temp[:]); err != nil {
return nil, err
}

v[i].Chunk = binary.BigEndian.Uint32(temp[0:4])
v[i].Start = binary.BigEndian.Uint32(temp[4:8])
v[i].Value = binary.BigEndian.Uint32(temp[8:12])
n += m
}
return
}

// readUintFrom reads the unsigned integer from the reader
func readUintFrom(r io.Reader) (v int, n int, err error) {
var temp [4]byte
n, err = io.ReadFull(r, temp[:])
v = int(binary.BigEndian.Uint32(temp[:]))
return
return v, nil
}

// toBytes converts a string to a byte slice without allocating.
Expand Down
5 changes: 3 additions & 2 deletions commit/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,11 @@ func TestWriteTo(t *testing.T) {
n, err := input.WriteTo(buffer)
assert.NoError(t, err)
assert.Equal(t, int64(buffer.Len()), n)
assert.Equal(t, int64(45), n)
assert.Equal(t, int64(36), n)

output := NewBuffer(0)
output.ReadFrom(buffer)
m, err := output.ReadFrom(buffer)
assert.Equal(t, int64(buffer.Len()), m)
assert.Equal(t, input, output)
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/kelindar/bitmap v1.1.4
github.com/kelindar/intmap v1.1.0
github.com/kelindar/iostream v1.2.0
github.com/kelindar/smutex v1.0.0
github.com/klauspost/compress v1.13.6
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/kelindar/bitmap v1.1.4 h1:rNwZ6RMRhrE3Um0QqBwFoJAcAzYi/4M7XGDCZYS6TOU
github.com/kelindar/bitmap v1.1.4/go.mod h1:shAFyS8BOif+pvJ05GqxnCM0SdohHQjKvDetqI/9z6M=
github.com/kelindar/intmap v1.1.0 h1:S+YEDvw5FQus5UJDEG+xsLp8il3BTYqBMkkuVVZPMH8=
github.com/kelindar/intmap v1.1.0/go.mod h1:tDanawPWq1B0HC+X3W8Z6IKNrJqxjruy6CdyTlf6Nic=
github.com/kelindar/iostream v1.2.0 h1:UaFLsj/0quGMZxI/rpHmI8u7vNKY9xSyw4mQ9YCTPWA=
github.com/kelindar/iostream v1.2.0/go.mod h1:MkjMuVb6zGdPQVdwLnFRO0xOTOdDvBWTztFmjRDQkXk=
github.com/kelindar/smutex v1.0.0 h1:+LIZYwPz+v3IWPOse764fNaVQGMVxKV6mbD6OWjQV3o=
github.com/kelindar/smutex v1.0.0/go.mod h1:nMbCZeAHWCsY9Kt4JqX7ETd+NJeR6Swy9im+Th+qUZQ=
github.com/kelindar/xxrand v1.0.1 h1:TG9Ix5h3ulBXVWwRUF8ePXl65FjIj48CzsgZw0nHvfY=
Expand Down

0 comments on commit e662543

Please sign in to comment.