Skip to content

Commit

Permalink
merge upstream grpc.Server changes changing the dispatch logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Witkowski committed Oct 11, 2015
1 parent 77edc97 commit 00dd588
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
25 changes: 16 additions & 9 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,18 @@ func (s *Proxy) Serve(lis net.Listener) error {
s.conns[st] = true
s.mu.Unlock()

go func() {
st.HandleStreams(func(stream *transport.Stream) {
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
s.handleStream(st, stream)
})
s.mu.Lock()
delete(s.conns, st)
s.mu.Unlock()
}()
wg.Done()
}()
})
wg.Wait()
s.mu.Lock()
delete(s.conns, st)
s.mu.Unlock()
}
}

Expand Down Expand Up @@ -154,6 +158,7 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream *
// data coming from backend back to client call
egressPathChan := s.forwardDataFrames(backendStream, frontStream, frontTrans)

// wait for both data streams to complete.
egressErr := <- egressPathChan
ingressErr := <- ingressPathChan
if egressErr != nil || ingressErr != nil {
Expand All @@ -179,8 +184,8 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
return nil, nil, grpc.Errorf(codes.Aborted, "cant dial to backend: %v", err)
}
}
// TODO(michal): PickTransport IS NOT IN UPSTREAM GRPC!
_, backendTrans, err := grpcConn.PickTransport(ctx)
// TODO(michal): ClientConn.Picker() IS NOT IN UPSTREAM GRPC! https://github.com/grpc/grpc-go/pull/397
backendTrans, err := grpcConn.Picker().Pick(ctx)
frontendStream, _ := transport.StreamFromContext(ctx)
callHdr := &transport.CallHdr{
Method: frontendStream.Method(),
Expand All @@ -197,11 +202,13 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
// It returns an error channel. `nil` on it signifies everything was fine, anything else is a serious problem.
func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error {
ret := make(chan error)

go func () {
data := make([]byte, 4096)
opt := &transport.Options{}
for {
n, err := srcStream.Read(data)

if err == io.EOF {
ret <- nil
break
Expand Down
8 changes: 3 additions & 5 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
"time"
"net"
"testing"
"io"

"github.com/mwitkow-io/grpc-reverseproxy"
pb "github.com/mwitkow-io/grpc-reverseproxy/testservice"

"github.com/mwitkow-io/grpc-proxy"
pb "github.com/mwitkow-io/grpc-proxy/testservice"

"github.com/stretchr/testify/suite"
"github.com/stretchr/testify/require"
Expand All @@ -30,8 +31,6 @@ import (
"google.golang.org/grpc/metadata"
"golang.org/x/net/context"
"github.com/stretchr/testify/assert"
"io"
"google.golang.org/grpc/transport"
)


Expand Down Expand Up @@ -171,7 +170,6 @@ func (s *ProxyHappySuite) SetupSuite() {
proxyClientConn, err := grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure())
require.NoError(s.T(), err, "must not error on deferred client Dial")
proxyServer := proxy.NewServer(func(ctx context.Context) (*grpc.ClientConn, error) {
transport.StreamFromContext()
md, ok := metadata.FromContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
Expand Down

0 comments on commit 00dd588

Please sign in to comment.