Skip to content

Commit

Permalink
refactor(io): claims semantics of startSniff and stopSniff
Browse files Browse the repository at this point in the history
  • Loading branch information
searKing committed May 10, 2024
1 parent 73a444b commit c7ab1bc
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 30 deletions.
4 changes: 2 additions & 2 deletions go/io/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,12 @@ func ExampleCountLines() {

func printSniff(r io.Reader, n int) {
b := make([]byte, n)
_, err := r.Read(b)
n, err := r.Read(b)
if err != nil {
log.Fatal(err)
}

fmt.Printf("%s", b)
fmt.Printf("%s", b[:n])
}

func printAll(r io.Reader) {
Expand Down
4 changes: 4 additions & 0 deletions go/io/sniff.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
// ReadSniffer is the interface that groups the basic Read and Sniff methods.
type ReadSniffer interface {
io.Reader

// Sniff starts or stops sniffing, restarts if stop and start called one by one
// true to start sniffing all data unread actually
// false to return a multi reader with all data sniff buffered and source
Sniff(sniffing bool) ReadSniffer
}

Expand Down
56 changes: 28 additions & 28 deletions go/io/sniff.reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
)

type sniffReader struct {
source io.Reader
buffer *bytes.Buffer
historyBuffers []io.Reader
// sniff start: read from [historyBuffers..., source] and buffered in buffer
// sniff stop: read from [buffer, historyBuffers..., source] and clean buffer and historyBuffers if meet EOF
source io.Reader

// virtual reader: buffer, historyBuffers..., source
buffer *bytes.Buffer // latest read data
historyBuffers []io.Reader // new, old, older read data

selectorF DynamicReaderFunc

Expand All @@ -23,7 +27,7 @@ func newSniffReader(r io.Reader) *sniffReader {
sr := &sniffReader{
source: r,
}
sr.resetSelector()
sr.stopSniff()
return sr
}

Expand All @@ -36,20 +40,10 @@ func (sr *sniffReader) Sniff(sniffing bool) ReadSniffer {
}
sr.sniffing = sniffing
if sniffing {
sr.shrinkToHistory()
// We don't need the buffer anymore.
// Reset it to release the internal slice.
sr.buffer = &bytes.Buffer{}

readers := sr.historyBuffers
readers = append(readers, sr.source)
reader := io.TeeReader(io.MultiReader(readers...), sr.buffer)
sr.selectorF = func() io.Reader {
return reader
}
sr.startSniff()
return sr
}
sr.resetSelector()
sr.stopSniff()
return sr
}

Expand All @@ -61,28 +55,34 @@ func (sr *sniffReader) shrinkToHistory() {
bufferReader := WatchReader(bytes.NewBuffer(sr.buffer.Bytes()), WatcherFunc(func(p []byte, n int, err error) (int, error) {
if err == io.EOF {
// historyBuffers is consumed head first, so can be cleared from head
sr.historyBuffers = sr.historyBuffers[1:] // recycle memory
sr.historyBuffers = sr.historyBuffers[1:] // remove head to recover space
}
return n, err
}))
var rs []io.Reader
rs = append(rs, bufferReader)
sr.historyBuffers = append(rs, sr.historyBuffers...)
sr.historyBuffers = append([]io.Reader{bufferReader}, sr.historyBuffers...)
}
sr.buffer = nil
}
}

// resetSelector stops sniff and return a MultiReader of history buffers and source
func (sr *sniffReader) resetSelector() {
// startSniff starts sniff and return a TeeReader that writes to buffer while reads from history buffers and source
func (sr *sniffReader) startSniff() {
sr.shrinkToHistory()
// We don't need the buffer anymore.
// Reset it to release the internal slice.
sr.buffer = &bytes.Buffer{}

readers := append(sr.historyBuffers, sr.source)
reader := io.MultiReader(readers...)
sr.selectorF = func() io.Reader {
return reader
}
reader := io.TeeReader(io.MultiReader(readers...), sr.buffer)
sr.selectorF = func() io.Reader { return reader }
}

func (sr *sniffReader) Read(p []byte) (n int, err error) {
return sr.selectorF.Read(p)
// stopSniff stops sniff and return a MultiReader of history buffers and source
func (sr *sniffReader) stopSniff() {
sr.shrinkToHistory()
readers := append(sr.historyBuffers, sr.source)
reader := io.MultiReader(readers...)
sr.selectorF = func() io.Reader { return reader }
}

func (sr *sniffReader) Read(p []byte) (n int, err error) { return sr.selectorF.Read(p) }
149 changes: 149 additions & 0 deletions go/io/sniff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package io_test

import (
"io"
"strings"
"testing"

io_ "github.com/searKing/golang/go/io"
)

func shrinkSnifferMayBeWithHoleInHistoryBuffers(r io_.ReadSniffer, sniffing bool) {
// stop sniffing and start sniffing, to move buffer to history buffers
r.Sniff(!sniffing).Sniff(sniffing)
}

func TestSniffReaderSeeker(t *testing.T) {
r := strings.NewReader("HEADER BODY TAILER")
sniff := io_.SniffReader(r)

// start sniffing
sniff.Sniff(true)
// ["HEADER BODY TAILER"]

{ // sniff "HEADER"
b := make([]byte, len("HEADER"))
n, err := sniff.Read(b)
if err != nil {
t.Errorf("Error reading header: %v", err)
}
if string(b[:n]) != "HEADER" {
t.Errorf("expected %q, got %q", "HEADER", b[:n])
}
shrinkSnifferMayBeWithHoleInHistoryBuffers(sniff, true)
// ["HEADER BODY TAILER"]
}

{ // sniff "HEAD"
b := make([]byte, len("HEAD"))
n, err := sniff.Read(b)
if err != nil {
t.Errorf("Error reading body: %v", err)
}
if string(b[:n]) != "HEAD" {
t.Errorf("expected %q, got %q", " BODY", b[:n])
}
shrinkSnifferMayBeWithHoleInHistoryBuffers(sniff, true)
// ["HEADER BODY TAILER"]
}

// stop sniffing
sniff.Sniff(false)

{ // sniff "HEADER"
b := make([]byte, len("HEADER"))
n, err := sniff.Read(b)
if err != nil {
t.Errorf("Error reading header: %v", err)
}
if string(b[:n]) != "HEADER" {
t.Errorf("expected %q, got %q", "HEAD", b[:n])
}
}
{
b, err := io.ReadAll(sniff)
if err != nil {
t.Errorf("Error reading all: %v", err)
}
if string(b) != " BODY TAILER" {
t.Errorf("expected %q, got %q", " BODY TAIL", b)
}
}
}

type reader struct {
r io.Reader
}

func (r *reader) Read(b []byte) (int, error) {
return r.r.Read(b)
}

func TestSniffReaderNotSeeker(t *testing.T) {
r := &reader{r: strings.NewReader("HEADER BODY TAILER")}
sniff := io_.SniffReader(r)

// start sniffing
sniff.Sniff(true)
// ["HEADER BODY TAILER"]

{ // sniff "HEADER"
b := make([]byte, len("HEADER"))
n, err := sniff.Read(b)
if err != nil {
t.Errorf("Error reading header: %v", err)
}
if string(b[:n]) != "HEADER" {
t.Errorf("expected %q, got %q", "HEADER", b[:n])
}
shrinkSnifferMayBeWithHoleInHistoryBuffers(sniff, true)
// ["HEADER", " BODY TAILER"]
}

{ // sniff "HEAD"
b := make([]byte, len("HEAD"))
n, err := sniff.Read(b)
if err != nil {
t.Errorf("Error reading body: %v", err)
}
if string(b[:n]) != "HEAD" {
t.Errorf("expected %q, got %q", " BODY", b[:n])
}
shrinkSnifferMayBeWithHoleInHistoryBuffers(sniff, true)
// ["HEAD", "ER", " BODY TAILER"]
}

// stop sniffing
sniff.Sniff(false)

{ // sniff "HEADER"
b := make([]byte, len("HEADER"))
n, err := sniff.Read(b)
if err != nil {
t.Errorf("Error reading header: %v", err)
}
if string(b[:n]) != "HEAD" {
t.Errorf("expected %q, got %q", "HEAD", b[:n])
}
n, err = sniff.Read(b)
if err != nil {
t.Errorf("Error reading header: %v", err)
}
if string(b[:n]) != "ER" {
t.Errorf("expected %q, got %q", "ER", b[:n])
}
}
{
b, err := io.ReadAll(sniff)
if err != nil {
t.Errorf("Error reading all: %v", err)
}
if string(b) != " BODY TAILER" {
t.Errorf("expected %q, got %q", " BODY TAIL", b)
}
}
}

0 comments on commit c7ab1bc

Please sign in to comment.