Skip to content

Commit

Permalink
[#57] Fix and deprecate PlainObjectCodec (#58)
Browse files Browse the repository at this point in the history
This change fixes a bug that causes PlainObjectCodec to
lose additional messages from stream. json.Decoder has
an internal buffer that reads more than one message, but
is discarded after every use. Now PlainObjectCodec reuses
encoder and decoder within a buffered stream, however
using it directly in your code retains the old, incorrect
behaviour.

A user should now use plainObjectStream if he needs
plain JSON-RPC 2.0 stream without headers.
`NewPlainObjectStream` method has been added for this reason.
  • Loading branch information
mnowotnik committed Jul 11, 2022
1 parent c9c77b6 commit a896fc3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 36 deletions.
89 changes: 56 additions & 33 deletions jsonrpc2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,44 +112,67 @@ func (h *testHandlerB) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jso
h.t.Errorf("testHandlerB got unexpected request %+v", req)
}

func TestClientServer(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
ctx := context.Background()
done := make(chan struct{})
type streamMaker func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream

lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address
if err != nil {
t.Fatal("Listen:", err)
func testClientServerForCodec(t *testing.T, streamMaker streamMaker) {
ctx := context.Background()
done := make(chan struct{})

lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address
if err != nil {
t.Fatal("Listen:", err)
}
defer func() {
if lis == nil {
return // already closed
}
defer func() {
if lis == nil {
return // already closed
}
if err = lis.Close(); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
if err = lis.Close(); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
}()
}
}()

ha := testHandlerA{t: t}
go func() {
if err = serve(ctx, lis, &ha); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Error(err)
}
ha := testHandlerA{t: t}
go func() {
if err = serve(ctx, lis, &ha, streamMaker); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Error(err)
}
close(done)
}()

conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatal("Dial:", err)
}
testClientServer(ctx, t, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}))
close(done)
}()

conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatal("Dial:", err)
}
testClientServer(ctx, t, streamMaker(conn))

lis.Close()
<-done // ensure Serve's error return (if any) is caught by this test
}

lis.Close()
<-done // ensure Serve's error return (if any) is caught by this test
func TestClientServer(t *testing.T) {
t.Run("tcp-varint-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{})
})
})
t.Run("tcp-vscode-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VSCodeObjectCodec{})
})
})
t.Run("tcp-plain-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.PlainObjectCodec{})
})
})
t.Run("tcp-plain-object-stream", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewPlainObjectStream(conn)
})
})
t.Run("websocket", func(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -367,12 +390,12 @@ func TestConn_Close_waitingForResponse(t *testing.T) {
<-done
}

func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, opts ...jsonrpc2.ConnOpt) error {
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, streamMaker streamMaker, opts ...jsonrpc2.ConnOpt) error {
for {
conn, err := lis.Accept()
if err != nil {
return err
}
jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}), h, opts...)
jsonrpc2.NewConn(ctx, streamMaker(conn), h, opts...)
}
}
55 changes: 52 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type bufferedObjectStream struct {
// objectStream is used to produce the bytes to write to the stream
// for the JSON-RPC 2.0 objects.
func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
switch v := codec.(type) {
case PlainObjectCodec:
v.decoder = json.NewDecoder(conn)
v.encoder = json.NewEncoder(conn)
codec = v
}
return &bufferedObjectStream{
conn: conn,
w: bufio.NewWriter(conn),
Expand Down Expand Up @@ -164,14 +170,57 @@ func (VSCodeObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
}

// PlainObjectCodec reads/writes plain JSON-RPC 2.0 objects without a header.
type PlainObjectCodec struct{}
//
// Deprecated: use NewPlainObjectStream
type PlainObjectCodec struct {
decoder *json.Decoder
encoder *json.Encoder
}

// WriteObject implements ObjectCodec.
func (PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
func (c PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
if c.encoder != nil {
return c.encoder.Encode(v)
}
return json.NewEncoder(stream).Encode(v)
}

// ReadObject implements ObjectCodec.
func (PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
func (c PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
if c.decoder != nil {
return c.decoder.Decode(v)
}
return json.NewDecoder(stream).Decode(v)
}

// plainObjectStream reads/writes plain JSON-RPC 2.0 objects without a header.
type plainObjectStream struct {
conn io.Closer
decoder *json.Decoder
encoder *json.Encoder
}

// NewPlainObjectStream creates a buffered stream from a network
// connection (or other similar interface). The underlying
// objectStream produces plain JSON-RPC 2.0 objects without a header.
func NewPlainObjectStream(conn io.ReadWriteCloser) ObjectStream {
return &plainObjectStream{
conn: conn,
encoder: json.NewEncoder(conn),
decoder: json.NewDecoder(conn),
}
}

func (os *plainObjectStream) ReadObject(v interface{}) error {
return os.decoder.Decode(v)
}

// WriteObject serializes a value to JSON and writes it to a stream.
// Not thread-safe, a user must synchronize writes in a multithreaded environment.
func (os *plainObjectStream) WriteObject(v interface{}) error {
return os.encoder.Encode(v)
}

func (os *plainObjectStream) Close() error {
return os.conn.Close()
}

0 comments on commit a896fc3

Please sign in to comment.