Skip to content

Commit

Permalink
Merge pull request #4 from smira/one2many-1
Browse files Browse the repository at this point in the history
Proxying one to many: first iteration
  • Loading branch information
smira committed Nov 21, 2019
2 parents a0988ff + 992a975 commit 1f0cb46
Show file tree
Hide file tree
Showing 9 changed files with 1,250 additions and 23 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ go 1.13

require (
github.com/golang/protobuf v1.3.2
github.com/hashicorp/go-multierror v1.0.0
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20191116160921-f9c825593386
golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect
google.golang.org/grpc v1.25.1
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down
3 changes: 2 additions & 1 deletion proxy/handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// Copyright 2019 Andrey Smirnov. All Rights Reserved.
// See LICENSE for licensing terms.

package proxy
Expand Down Expand Up @@ -111,7 +112,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
}

if len(backendConnections) != 1 {
return status.Error(codes.Unimplemented, "proxying to multiple backends not implemented yet")
return s.handlerMulti(serverStream, backendConnections)
}

// case of proxying one to one:
Expand Down
276 changes: 276 additions & 0 deletions proxy/handler_multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
// Copyright 2019 Andrey Smirnov. All Rights Reserved.
// See LICENSE for licensing terms.

package proxy

import (
"errors"
"fmt"
"io"

"github.com/hashicorp/go-multierror"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (s *handler) handlerMulti(serverStream grpc.ServerStream, backendConnections []backendConnection) error {
// wrap the stream for safe concurrent access
serverStream = &ServerStreamWrapper{ServerStream: serverStream}

s2cErrChan := s.forwardServerToClientsMulti(serverStream, backendConnections)
var c2sErrChan chan error

if true { // TODO: if unary
c2sErrChan = s.forwardClientsToServerMultiUnary(backendConnections, serverStream)
} else {
c2sErrChan = s.forwardClientsToServerMultiStreaming(backendConnections, serverStream)
}

for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
// the clientStream>serverStream may continue pumping though.
for i := range backendConnections {
if backendConnections[i].clientStream != nil {
backendConnections[i].clientStream.CloseSend() //nolint: errcheck
}
}
break
} else {
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}

// formatError tries to format error from upstream as message to the client
func (s *handler) formatError(src *backendConnection, backendErr error) ([]byte, error) {
payload, err := src.backend.BuildError(backendErr)
if err != nil {
return nil, fmt.Errorf("error building error for %s: %w", src.backend, err)
}

if payload == nil {
err = backendErr
}

return payload, err
}

// sendError tries to deliver error back to the client via dst
//
// if sendError fails to deliver the error, error is returned
// if sendError successfully delivers the error, nil is returned
func (s *handler) sendError(src *backendConnection, dst grpc.ServerStream, backendErr error) error {
payload, err := s.formatError(src, backendErr)
if err != nil {
return err
}

f := &frame{payload: payload}
if err = dst.SendMsg(f); err != nil {
return fmt.Errorf("error sending error back: %w", err)
}

return nil
}

// one:many proxying, unary call version (merging results)
func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)

payloadCh := make(chan []byte, len(sources))
errCh := make(chan error, len(sources))

for i := 0; i < len(sources); i++ {
go func(src *backendConnection) {
errCh <- func() error {
f := &frame{}
for j := 0; ; j++ {
if err := src.clientStream.RecvMsg(f); err != nil {
if err == io.EOF {
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
dst.SetTrailer(src.clientStream.Trailer())
return nil
}

payload, err := s.formatError(src, err)
if err != nil {
return err
}

payloadCh <- payload
return nil
}
if j == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.clientStream.Header()
if err != nil {
payload, err := s.formatError(src, err)
if err != nil {
return err
}

payloadCh <- payload
return nil
}

if err := dst.SetHeader(md); err != nil {
return fmt.Errorf("error setting headers from client %s: %w", src.backend, err)
}
}

var err error
f.payload, err = src.backend.AppendInfo(f.payload)
if err != nil {
return fmt.Errorf("error appending info for %s: %w", src.backend, err)
}

payloadCh <- f.payload
}
}()
}(&sources[i])
}

go func() {
var multiErr *multierror.Error

for range sources {
multiErr = multierror.Append(multiErr, <-errCh)
}

if multiErr.ErrorOrNil() != nil {
ret <- multiErr.ErrorOrNil()
return
}

close(payloadCh)

var merged []byte
for b := range payloadCh {
merged = append(merged, b...)
}

ret <- dst.SendMsg(&frame{payload: merged})
}()

return ret
}

// one:many proxying, streaming version (no merge)
func (s *handler) forwardClientsToServerMultiStreaming(sources []backendConnection, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)

errCh := make(chan error, len(sources))

for i := range sources {
go func(src *backendConnection) {
errCh <- func() error {
if src.connError != nil {
return s.sendError(src, dst, src.connError)
}

f := &frame{}
for j := 0; ; j++ {
if err := src.clientStream.RecvMsg(f); err != nil {
if err == io.EOF {
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
dst.SetTrailer(src.clientStream.Trailer())
return nil
}
return s.sendError(src, dst, fmt.Errorf("error reading from client stream %s: %w", src.backend, err))
}
if j == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.clientStream.Header()
if err != nil {
return s.sendError(src, dst, fmt.Errorf("error getting headers from client stream %s: %w", src.backend, err))
}
if err := dst.SetHeader(md); err != nil {
return fmt.Errorf("error setting headers from client %s: %w", src.backend, err)
}
}

var err error
f.payload, err = src.backend.AppendInfo(f.payload)
if err != nil {
return fmt.Errorf("error appending info for %s: %w", src.backend, err)
}

if err = dst.SendMsg(f); err != nil {
return fmt.Errorf("error sending back to server from %s: %w", src.backend, err)
}
}

}()
}(&sources[i])
}

go func() {
var multiErr *multierror.Error

for range sources {
multiErr = multierror.Append(multiErr, <-errCh)
}

ret <- multiErr.ErrorOrNil()
}()

return ret
}

func (s *handler) forwardServerToClientsMulti(src grpc.ServerStream, destinations []backendConnection) chan error {
ret := make(chan error, 1)
go func() {
f := &frame{}
for {
if err := src.RecvMsg(f); err != nil {
ret <- err
return
}

liveDestinations := 0
for i := range destinations {
if destinations[i].clientStream == nil || destinations[i].connError != nil {
continue
}

if err := destinations[i].clientStream.SendMsg(f); err != nil {
// TODO: race with reading connError (?)
// skip this or keep using?
destinations[i].connError = err
} else {
liveDestinations++
}
}

if liveDestinations == 0 {
ret <- errors.New("no backend connections to forward to are available")
return
}
}
}()
return ret
}

0 comments on commit 1f0cb46

Please sign in to comment.