Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Mar 10, 2019
1 parent d97cd4c commit c553be6
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 13 deletions.
18 changes: 15 additions & 3 deletions api/v1/error.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
package log_v1

import grpc "google.golang.org/grpc"
import (
"fmt"

var (
ErrOffsetOutOfRange = grpc.Errorf(404, "offset out of range")
"google.golang.org/grpc/status"
)

type ErrOffsetOutOfRange struct {
Offset uint64
}

func (e ErrOffsetOutOfRange) GRPCStatus() *status.Status {
return status.New(404, fmt.Sprintf("offset out of range: %d", e.Offset))
}

func (e ErrOffsetOutOfRange) Error() string {
return e.GRPCStatus().Err().Error()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
dmitri.shuralyov.com/service/change v0.0.0-20190301072032-c25fb47d71b3 // indirect
github.com/Shopify/sarama v1.21.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect
github.com/davecgh/go-spew v1.1.1
github.com/gliderlabs/ssh v0.1.3 // indirect
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/gogo/protobuf v1.2.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
Expand Down
1 change: 1 addition & 0 deletions internal/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (s *grpcServer) Produce(ctx context.Context, req *api.ProduceRequest) (*api
if err != nil {
return nil, err
}

return &api.ProduceResponse{FirstOffset: offset}, nil
}

Expand Down
17 changes: 12 additions & 5 deletions internal/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func testConsumeEmpty(t *testing.T, srv *grpc.Server, client api.LogClient) {
if consume != nil {
t.Fatalf("got consume: %v, want: nil", consume)
}
if grpc.Code(err) != grpc.Code(api.ErrOffsetOutOfRange) {
t.Fatalf("got err: %v, want: %v", err, api.ErrOffsetOutOfRange)
got, want := grpc.Code(err), grpc.Code(api.ErrOffsetOutOfRange{}.GRPCStatus().Err())
if got != want {
t.Fatalf("got code: %v, want: %v, err: %v", got, want, err)
}
}

Expand Down Expand Up @@ -98,8 +99,9 @@ func testConsumePastBoundary(t *testing.T, srv *grpc.Server, client api.LogClien
if consume != nil {
t.Fatal("consume not nil")
}
if grpc.Code(err) != grpc.Code(api.ErrOffsetOutOfRange) {
t.Fatalf("got err: %v, want: %v", err, api.ErrOffsetOutOfRange)
got, want := grpc.Code(err), grpc.Code(api.ErrOffsetOutOfRange{}.GRPCStatus().Err())
if got != want {
t.Fatalf("got err: %v, want: %v", got, want)
}
}

Expand All @@ -120,11 +122,16 @@ func testProduceConsumeStream(t *testing.T, srv *grpc.Server, client api.LogClie
stream, err := client.ProduceStream(ctx)
check(t, err)

for _, batch := range batches {
for offset, batch := range batches {
err = stream.Send(&api.ProduceRequest{
RecordBatch: batch,
})
check(t, err)
res, err := stream.Recv()
check(t, err)
if res.FirstOffset != uint64(offset) {
t.Fatalf("got offset: %d, want: %d", res.FirstOffset, offset)
}
}

}
Expand Down
4 changes: 1 addition & 3 deletions internal/log/log.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package log

import (
"fmt"
"sync"

api "github.com/travisjeffery/proglog/api/v1"
Expand Down Expand Up @@ -43,8 +42,7 @@ func (l *Log) ReadBatch(offset uint64) (*api.RecordBatch, error) {
l.init()
if l.activeSegment.nextOffset == 0 ||
l.activeSegment.nextOffset <= offset {
fmt.Println("heyheyhey")
return nil, api.ErrOffsetOutOfRange
return nil, api.ErrOffsetOutOfRange{Offset: offset}
}
entry, err := l.activeSegment.index.readEntry(offset)
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions internal/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"os"
"reflect"
"testing"

api "github.com/travisjeffery/proglog/api/v1"
)

func TestLog(t *testing.T) {
for scenario, fn := range map[string]func(t *testing.T, log *Log){
"append and read a batch succeeds": func(t *testing.T, log *Log) {
append := &log_v1.RecordBatch{
Records: []*log_v1.Record{{
append := &api.RecordBatch{
Records: []*api.Record{{
Value: []byte("hello world"),
}},
}
Expand All @@ -30,6 +32,19 @@ func TestLog(t *testing.T) {
t.Fatalf("got read: %v, want: %v", read, append)
}
},
"offset out of range error": func(t *testing.T, log *Log) {
read, err := log.ReadBatch(0)
if read != nil {
t.Fatalf("expected read to be nil")
}
apiErr, ok := err.(api.ErrOffsetOutOfRange)
if !ok {
t.Fatalf("err type not ErrOffsetOutOfRange")
}
if apiErr.Offset != 0 {
t.Fatalf("got offset: %d, want: %d", apiErr.Offset, 0)
}
},
} {
t.Run(scenario, func(t *testing.T) {
base, err := ioutil.TempDir("", "log-test")
Expand Down

0 comments on commit c553be6

Please sign in to comment.