Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions pkg/port/builtin/parent/parent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewDriver(logWriter io.Writer, stateDir string) (port.ParentDriver, error)
socketPath: socketPath,
childReadyPipePath: childReadyPipePath,
ports: make(map[int]*port.Status, 0),
stoppers: make(map[int]func() error, 0),
stoppers: make(map[int]func(context.Context) error, 0),
nextID: 1,
}
return &d, nil
Expand All @@ -54,7 +54,7 @@ type driver struct {
childReadyPipePath string
mu sync.Mutex
ports map[int]*port.Status
stoppers map[int]func() error
stoppers map[int]func(context.Context) error
nextID int
}

Expand Down Expand Up @@ -139,21 +139,27 @@ func (d *driver) AddPort(ctx context.Context, spec port.Spec) (*port.Status, err
if err != nil {
return nil, err
}
// NOTE: routineStopCh is close-only channel. Do not send any data.
// See commit 4803f18fae1e39d200d98f09e445a97ccd6f5526 `Revert "port/builtin: RemovePort() block until conn is closed"`
routineStopCh := make(chan struct{})
routineStop := func() error {
routineStopCh <- struct{}{}
routineStoppedCh := make(chan error)
routineStop := func(ctx context.Context) error {
close(routineStopCh)
select {
case <-routineStopCh:
case <-time.After(5 * time.Second):
return errors.New("stop timeout after 5 seconds")
case stoppedResult, stoppedResultOk := <-routineStoppedCh:
if stoppedResultOk {
return stoppedResult
}
return errors.New("routineStoppedCh was closed without sending data?")
case <-ctx.Done():
return errors.Wrap(err, "timed out while waiting for routineStoppedCh after closing routineStopCh")
}
return nil
}
switch spec.Proto {
case "tcp", "tcp4", "tcp6":
err = tcp.Run(d.socketPath, spec, routineStopCh, d.logWriter)
err = tcp.Run(d.socketPath, spec, routineStopCh, routineStoppedCh, d.logWriter)
case "udp", "udp4", "udp6":
err = udp.Run(d.socketPath, spec, routineStopCh, d.logWriter)
err = udp.Run(d.socketPath, spec, routineStopCh, routineStoppedCh, d.logWriter)
default:
// NOTREACHED
return nil, errors.New("spec was not validated?")
Expand Down Expand Up @@ -194,7 +200,12 @@ func (d *driver) RemovePort(ctx context.Context, id int) error {
if !ok {
return errors.Errorf("unknown id: %d", id)
}
err := stop()
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
}
err := stop(ctx)
delete(d.stoppers, id)
delete(d.ports, id)
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/port/builtin/parent/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg"
)

func Run(socketPath string, spec port.Spec, stopCh chan struct{}, logWriter io.Writer) error {
func Run(socketPath string, spec port.Spec, stopCh <-chan struct{}, stoppedCh chan error, logWriter io.Writer) error {
ln, err := net.Listen(spec.Proto, net.JoinHostPort(spec.ParentIP, strconv.Itoa(spec.ParentPort)))
if err != nil {
fmt.Fprintf(logWriter, "listen: %v\n", err)
Expand All @@ -32,8 +32,8 @@ func Run(socketPath string, spec port.Spec, stopCh chan struct{}, logWriter io.W
}()
go func() {
defer func() {
ln.Close()
close(stopCh)
stoppedCh <- ln.Close()
close(stoppedCh)
}()
for {
select {
Expand Down
5 changes: 3 additions & 2 deletions pkg/port/builtin/parent/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udpproxy"
)

func Run(socketPath string, spec port.Spec, stopCh chan struct{}, logWriter io.Writer) error {
func Run(socketPath string, spec port.Spec, stopCh <-chan struct{}, stoppedCh chan error, logWriter io.Writer) error {
addr, err := net.ResolveUDPAddr(spec.Proto, net.JoinHostPort(spec.ParentIP, strconv.Itoa(spec.ParentPort)))
if err != nil {
return err
Expand Down Expand Up @@ -51,7 +51,8 @@ func Run(socketPath string, spec port.Spec, stopCh chan struct{}, logWriter io.W
case <-stopCh:
// udpp.Close closes ln as well
udpp.Close()
close(stopCh)
stoppedCh <- nil
close(stoppedCh)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package version

const Version = "0.14.3+dev"
const Version = "0.14.4+dev"