Skip to content

Commit

Permalink
zmq4: first stab at a Proxy impl
Browse files Browse the repository at this point in the history
  • Loading branch information
sbinet committed Jan 22, 2020
1 parent 7a88be9 commit 04ef8a0
Show file tree
Hide file tree
Showing 6 changed files with 530 additions and 39 deletions.
144 changes: 144 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2020 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package zmq4

import (
"context"
"log"

"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)

type Proxy struct {
ctx context.Context // life-line of proxy
grp *errgroup.Group
cmds chan proxyCmd
}

type proxyCmd byte

const (
proxyStats proxyCmd = iota
proxyPause
proxyResume
proxyKill
)

// NewProxy creates a new proxy value.
// It proxies messages received on the frontend to the backend (and vice versa)
// If capture is not nil, messages proxied are also sent on that socket.
func NewProxy(ctx context.Context, front, back, capture Socket) *Proxy {
grp, ctx := errgroup.WithContext(ctx)
proxy := Proxy{
ctx: ctx,
grp: grp,
cmds: make(chan proxyCmd),
}
proxy.run(front, back, capture)
return &proxy
}

func (p *Proxy) Pause() { p.cmds <- proxyPause }
func (p *Proxy) Stats() { p.cmds <- proxyStats }
func (p *Proxy) Resume() { p.cmds <- proxyResume }
func (p *Proxy) Kill() { p.cmds <- proxyKill }

// Run runs the proxy loop.
func (p *Proxy) Run() error {
return p.grp.Wait()
}

func (p *Proxy) run(front, back, capture Socket) {
canRecv := func(sck Socket) bool {
switch sck.Type() {
case Push:
return false
default:
return true
}
}

canSend := func(sck Socket) bool {
switch sck.Type() {
case Pull:
return false
default:
return true
}
}

type Pipe struct {
name string
dst Socket
src Socket
}

var (
quit = make(chan struct{})
pipes = []Pipe{
{
name: "backend",
dst: back,
src: front,
},
{
name: "frontend",
dst: front,
src: back,
},
}
)

for i := range pipes {
pipe := pipes[i]
if pipe.src == nil || !canRecv(pipe.src) {
continue
}
p.grp.Go(func() error {
canSend := canSend(pipe.dst)
for {
msg, err := pipe.src.Recv()
select {
case <-p.ctx.Done():
return p.ctx.Err()
case <-quit:
return nil
default:
if canSend {
err = pipe.dst.Send(msg)
if err != nil {
log.Printf("could not forward to %s: %+v", pipe.name, err)
continue
}
}
if err == nil && capture != nil && len(msg.Frames) != 0 {
_ = capture.Send(msg)
}
}
}
})
}

p.grp.Go(func() error {
for {
select {
case <-p.ctx.Done():
return p.ctx.Err()
case cmd := <-p.cmds:
switch cmd {
case proxyPause, proxyResume, proxyStats:
// TODO
case proxyKill:
close(quit)
return nil
default:
// API error. panic.
panic(xerrors.Errorf("invalid control socket command: %v", cmd))
}
}
}
})
}
Loading

0 comments on commit 04ef8a0

Please sign in to comment.