feat: use contexts for the sync service
This patch also simplifies several parts of the sync service given that we can now just cancel contexts to, e.g., cancel subscriptions.
Stebalien committed Feb 4, 2020
1 parent c1060ca commit 17a21a4
Showing 19 changed files with 198 additions and 241 deletions.
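
The call sites below suggest the shape of the new API: sync.WatcherWriter, Watcher.Subscribe, Writer.Write, Writer.SignalEntry and Watcher.Barrier now take a context.Context, and a subscription ends (closing its output channel) when that context is cancelled, instead of handing back a separate closeSub function. The following stdlib-only sketch uses a hypothetical stand-in for the watcher to illustrate that contract; it is not the testground sync package itself.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // fakeWatcher is a hypothetical stand-in for the sync service's watcher.
    type fakeWatcher struct{}

    // Subscribe streams values into ch until ctx is cancelled, then closes ch,
    // mirroring the context-scoped subscriptions this commit moves to.
    func (fakeWatcher) Subscribe(ctx context.Context, ch chan<- int) error {
        go func() {
            defer close(ch)
            for i := 0; ; i++ {
                select {
                case <-ctx.Done():
                    return
                case ch <- i:
                    time.Sleep(10 * time.Millisecond)
                }
            }
        }()
        return nil
    }

    func main() {
        // Cancelling the context replaces the old closeSub()/defer dance.
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()

        ch := make(chan int, 16)
        if err := (fakeWatcher{}).Subscribe(ctx, ch); err != nil {
            panic(err)
        }
        for v := range ch { // ends once Subscribe closes ch on cancellation
            fmt.Println("update:", v)
        }
    }
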
1 change: 1 addition & 0 deletions go.sum
@@ -179,6 +179,7 @@ github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/testground v0.0.0-20200121194104-fd53f27ef027/go.mod h1:gawN4GBDQfpi0HWwh25ZXK/bSxZVoMEjdlpSmjJt2t8=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
6 changes: 2 additions & 4 deletions pkg/dockermanager/dockermanager.go
@@ -109,11 +109,9 @@ func (dm *Manager) Manage(
managers := make(map[string]workerHandle)

defer func() {
- // cancel the remaining managers
- for _, m := range managers {
- m.cancel()
- }
// wait for the running managers to exit
+ // They'll get canceled when we close the main context (deferred
+ // below).
for _, m := range managers {
<-m.done
}
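
A condensed, stdlib-only sketch of the shutdown pattern above (names hypothetical): each worker is started with a context derived from one shared parent, so the deferred cleanup no longer cancels workers one by one — it only waits on their done channels once the shared context is cancelled.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    type workerHandle struct {
        done chan struct{}
    }

    // startWorker runs until ctx is cancelled, then closes done.
    func startWorker(ctx context.Context, id string) workerHandle {
        done := make(chan struct{})
        go func() {
            defer close(done)
            <-ctx.Done() // a real worker would do work here until cancelled
            fmt.Println("worker stopped:", id)
        }()
        return workerHandle{done: done}
    }

    func manage() {
        ctx, cancel := context.WithCancel(context.Background())
        managers := make(map[string]workerHandle)

        defer func() {
            // Cancelling the shared context stops every worker; we only
            // have to wait for them to exit.
            cancel()
            for _, m := range managers {
                <-m.done
            }
        }()

        for _, id := range []string{"a", "b", "c"} {
            managers[id] = startWorker(ctx, id)
        }
        time.Sleep(20 * time.Millisecond) // stand-in for the manager's event loop
    }

    func main() {
        manage()
        fmt.Println("all workers drained")
    }
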
2 changes: 1 addition & 1 deletion pkg/sidecar/docker_instance.go
@@ -275,7 +275,7 @@ func (d *DockerInstanceManager) manageContainer(ctx context.Context, container *
}
}
}
return NewInstance(runenv, info.Config.Hostname, network, newDockerLogs(logs))
return NewInstance(ctx, runenv, info.Config.Hostname, network, newDockerLogs(logs))
}

type dockerLink struct {
4 changes: 2 additions & 2 deletions pkg/sidecar/instance.go
@@ -43,9 +43,9 @@ type Logs interface {
}

// NewInstance constructs a new test instance handle.
func NewInstance(runenv *runtime.RunEnv, hostname string, network Network, logs Logs) (*Instance, error) {
func NewInstance(ctx context.Context, runenv *runtime.RunEnv, hostname string, network Network, logs Logs) (*Instance, error) {
// Get a redis reader/writer.
watcher, writer, err := sync.WatcherWriter(runenv)
watcher, writer, err := sync.WatcherWriter(ctx, runenv)
if err != nil {
return nil, fmt.Errorf("during sync.WatcherWriter: %w", err)
}
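
NewInstance now takes the managing context and threads it into sync.WatcherWriter, so constructing the Redis-backed sync handles can be aborted. A small stdlib sketch of the same constructor idiom — the types and the dial target below are hypothetical, with a TCP dial standing in for the sync-client setup:

    package main

    import (
        "context"
        "fmt"
        "net"
        "time"
    )

    // Instance is a hypothetical handle whose construction involves I/O.
    type Instance struct {
        conn net.Conn
    }

    // NewInstance threads the caller's context into setup, so a cancelled or
    // expired context aborts construction instead of hanging.
    func NewInstance(ctx context.Context, addr string) (*Instance, error) {
        var d net.Dialer
        conn, err := d.DialContext(ctx, "tcp", addr)
        if err != nil {
            return nil, fmt.Errorf("during setup: %w", err)
        }
        return &Instance{conn: conn}, nil
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()

        // 192.0.2.1 is a TEST-NET address, so the dial fails by the deadline.
        if _, err := NewInstance(ctx, "192.0.2.1:6379"); err != nil {
            fmt.Println("constructor aborted:", err)
        }
    }
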
8 changes: 6 additions & 2 deletions pkg/sidecar/k8s_instance.go
@@ -85,7 +85,11 @@ func (d *K8sInstanceManager) Close() error {
func (d *K8sInstanceManager) manageContainer(ctx context.Context, container *dockermanager.Container) (inst *Instance, err error) {
// TODO: sidecar is racing to modify container network with CNI and pod getting ready
// we should probably adjust this function to be called when a pod is in `1/1 Ready` state, and not just listen on the docker socket
time.Sleep(20 * time.Second)
select {
case <-time.After(20 * time.Second):
case <-ctx.Done():
return nil, ctx.Err()
}

// Get the state/config of the cluster
info, err := container.Inspect(ctx)
@@ -245,7 +249,7 @@ func (d *K8sInstanceManager) manageContainer(ctx context.Context, container *doc
}
}

return NewInstance(runenv, info.Config.Hostname, network, newDockerLogs(logs))
return NewInstance(ctx, runenv, info.Config.Hostname, network, newDockerLogs(logs))
}

type k8sLink struct {
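
The time.Sleep replacement above is a common Go idiom; a minimal stdlib sketch (the 20-second delay and the helper name are illustrative only):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // sleepCtx waits for d or until ctx is cancelled, whichever comes first.
    func sleepCtx(ctx context.Context, d time.Duration) error {
        select {
        case <-time.After(d):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()

        // Returns after ~50ms with context.DeadlineExceeded instead of
        // blocking shutdown for the full 20 seconds.
        if err := sleepCtx(ctx, 20*time.Second); err != nil {
            fmt.Println("wait interrupted:", err)
        }
    }
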
12 changes: 3 additions & 9 deletions pkg/sidecar/sidecar.go
@@ -77,7 +77,7 @@ func Run(runnerName string, resultPath string) error {

// Wait for all the sidecars to enter the "network-initialized" state.
const netInitState = "network-initialized"
if _, err = instance.Writer.SignalEntry(netInitState); err != nil {
if _, err = instance.Writer.SignalEntry(ctx, netInitState); err != nil {
return fmt.Errorf("failed to signal network ready: %w", err)
}
instance.S().Infof("waiting for all networks to be ready")
@@ -93,22 +93,16 @@ func Run(runnerName string, resultPath string) error {
// Now let the test case tell us how to configure the network.
subtree := sync.NetworkSubtree(instance.Hostname)
networkChanges := make(chan *sync.NetworkConfig, 16)
- closeSub, err := instance.Watcher.Subscribe(subtree, networkChanges)
- if err != nil {
+ if err := instance.Watcher.Subscribe(ctx, subtree, networkChanges); err != nil {
return fmt.Errorf("failed to subscribe to network changes: %s", err)
}
- defer func() {
- if err := closeSub(); err != nil {
- instance.S().Warnf("failed to close sub: %s", err)
- }
- }()
for cfg := range networkChanges {
instance.S().Infow("applying network change", "network", cfg)
if err := instance.Network.ConfigureNetwork(ctx, cfg); err != nil {
return fmt.Errorf("failed to update network %s: %w", cfg.Network, err)
}
if cfg.State != "" {
_, err := instance.Writer.SignalEntry(cfg.State)
_, err := instance.Writer.SignalEntry(ctx, cfg.State)
if err != nil {
return fmt.Errorf(
"failed to signal network state change %s: %w",
29 changes: 15 additions & 14 deletions plans/bitswap-tuning/test/transfer.go
@@ -39,7 +39,7 @@ func Transfer(runenv *runtime.RunEnv) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

watcher, writer := sync.MustWatcherWriter(runenv)
watcher, writer := sync.MustWatcherWriter(ctx, runenv)

/// --- Tear down
defer func() {
@@ -64,7 +64,7 @@ func Transfer(runenv *runtime.RunEnv) error {
defer node.Close()

// Get sequence number of this host
seq, err := writer.Write(sync.PeerSubtree, host.InfoFromHost(node.Host))
seq, err := writer.Write(ctx, sync.PeerSubtree, host.InfoFromHost(node.Host))
if err != nil {
return err
}
@@ -93,37 +93,38 @@ func Transfer(runenv *runtime.RunEnv) error {
}

// Inform other nodes of the root CID
if _, err = writer.Write(RootCidSubtree, &rootCid); err != nil {
if _, err = writer.Write(ctx, RootCidSubtree, &rootCid); err != nil {
return fmt.Errorf("Failed to get Redis Sync RootCidSubtree %w", err)
}
} else if isLeech {
// Get the root CID from a seed
rootCidCh := make(chan *cid.Cid, 1)
cancelRootCidSub, err := watcher.Subscribe(RootCidSubtree, rootCidCh)
if err != nil {
sctx, cancelRootCidSub := context.WithCancel(ctx)
if err := watcher.Subscribe(sctx, RootCidSubtree, rootCidCh); err != nil {
return fmt.Errorf("Failed to subscribe to RootCidSubtree %w", err)
}

// Note: only need to get the root CID from one seed - it should be the
// same on all seeds (seed data is generated from repeatable random
// sequence)
- select {
- case rootCidPtr := <-rootCidCh:
- cancelRootCidSub()
- rootCid = *rootCidPtr
- case <-time.After(timeout):
- cancelRootCidSub()
+ rootCidPtr, ok := <-rootCidCh
+ cancelRootCidSub()
+ if !ok {
return fmt.Errorf("no root cid in %d seconds", timeout/time.Second)
}
+ rootCid = *rootCidPtr
}

// Get addresses of all peers
peerCh := make(chan *peer.AddrInfo)
- cancelSub, err := watcher.Subscribe(sync.PeerSubtree, peerCh)
- addrInfos, err := utils.AddrInfosFromChan(peerCh, runenv.TestInstanceCount, timeout)
+ sctx, cancelSub := context.WithCancel(ctx)
+ if err := watcher.Subscribe(sctx, sync.PeerSubtree, peerCh); err != nil {
+ return err
+ }
+ addrInfos, err := utils.AddrInfosFromChan(peerCh, runenv.TestInstanceCount)
if err != nil {
cancelSub()
- return err
+ return fmt.Errorf("no addrs in %d seconds", timeout/time.Second)
}
cancelSub()

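
The leech path above uses a per-subscription child context: derive it from the test context, take the single value you need, then cancel it so only that subscription is torn down while the parent context keeps running. A stdlib-only sketch of the idiom, with a hypothetical subscribe helper standing in for watcher.Subscribe:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // subscribe stands in for watcher.Subscribe: it feeds ch until ctx ends,
    // then closes ch.
    func subscribe(ctx context.Context, ch chan<- string) error {
        go func() {
            defer close(ch)
            select {
            case ch <- "root-cid":
            case <-ctx.Done():
            }
            <-ctx.Done()
        }()
        return nil
    }

    func main() {
        parent, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        rootCidCh := make(chan string, 1)
        sctx, cancelSub := context.WithCancel(parent)
        if err := subscribe(sctx, rootCidCh); err != nil {
            fmt.Println(err)
            return
        }

        rootCid, ok := <-rootCidCh
        cancelSub() // tear down just this subscription; parent keeps running
        if !ok {
            // the channel closes without a value if the context ends first
            fmt.Println("no root cid before the deadline")
            return
        }
        fmt.Println("got root cid:", rootCid)
    }
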
5 changes: 4 additions & 1 deletion plans/bitswap-tuning/utils/net.go
@@ -30,7 +30,7 @@ func SetupNetwork(ctx context.Context, runenv *runtime.RunEnv, watcher *sync.Wat

latency := time.Duration(runenv.IntParam("latency_ms")) * time.Millisecond
bandwidth := runenv.IntParam("bandwidth_mb")
writer.Write(sync.NetworkSubtree(hostname), &sync.NetworkConfig{
_, err = writer.Write(ctx, sync.NetworkSubtree(hostname), &sync.NetworkConfig{
Network: "default",
Enable: true,
Default: sync.LinkShape{
@@ -39,6 +39,9 @@
},
State: "network-configured",
})
if err != nil {
return err
}

err = <-watcher.Barrier(ctx, "network-configured", int64(runenv.TestInstanceCount))
if err != nil {
13 changes: 5 additions & 8 deletions plans/bitswap-tuning/utils/peers.go
@@ -4,22 +4,19 @@ import (
"bytes"
"context"
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"
host "github.com/libp2p/go-libp2p-host"
)

func AddrInfosFromChan(peerCh chan *peer.AddrInfo, count int, timeout time.Duration) ([]peer.AddrInfo, error) {
func AddrInfosFromChan(peerCh chan *peer.AddrInfo, count int) ([]peer.AddrInfo, error) {
var ais []peer.AddrInfo
for i := 1; i <= count; i++ {
- select {
- case ai := <-peerCh:
- ais = append(ais, *ai)
-
- case <-time.After(timeout):
- return nil, fmt.Errorf("no new peers in %d seconds", timeout/time.Second)
+ ai, ok := <-peerCh
+ if !ok {
+ return ais, fmt.Errorf("subscription closed")
}
+ ais = append(ais, *ai)
}
return ais, nil
}
8 changes: 2 additions & 6 deletions plans/bitswap-tuning/utils/state.go
@@ -12,15 +12,11 @@ func SignalAndWaitForAll(ctx context.Context, instanceCount int, stateName strin
doneCh := watcher.Barrier(ctx, state, int64(instanceCount))

// Signal we've entered the state.
_, err := writer.SignalEntry(state)
_, err := writer.SignalEntry(ctx, state)
if err != nil {
return err
}

// Wait until all others have signalled.
if err = <-doneCh; err != nil {
return err
}

return nil
return <-doneCh
}
42 changes: 21 additions & 21 deletions plans/dht/test/common.go
@@ -126,7 +126,7 @@ func SetupNetwork(ctx context.Context, runenv *runtime.RunEnv, watcher *sync.Wat
return err
}

writer.Write(sync.NetworkSubtree(hostname), &sync.NetworkConfig{
writer.Write(ctx, sync.NetworkSubtree(hostname), &sync.NetworkConfig{
Network: "default",
Enable: true,
Default: sync.LinkShape{
@@ -168,13 +168,13 @@ func Setup(ctx context.Context, runenv *runtime.RunEnv, watcher *sync.Watcher, w
id := node.ID()
runenv.Message("I am %s with addrs: %v", id, node.Addrs())

if seq, err = writer.Write(sync.PeerSubtree, host.InfoFromHost(node)); err != nil {
if seq, err = writer.Write(ctx, sync.PeerSubtree, host.InfoFromHost(node)); err != nil {
return nil, nil, nil, seq, fmt.Errorf("failed to write peer subtree in sync service: %w", err)
}

peerCh := make(chan *peer.AddrInfo, 16)
cancelSub, err := watcher.Subscribe(sync.PeerSubtree, peerCh)
if err != nil {
sctx, cancelSub := context.WithCancel(ctx)
if err := watcher.Subscribe(sctx, sync.PeerSubtree, peerCh); err != nil {
return nil, nil, nil, seq, err
}
defer cancelSub()
@@ -183,15 +183,14 @@
peers := make([]peer.AddrInfo, 0, runenv.TestInstanceCount)
// Grab list of other peers that are available for this run.
for i := 0; i < runenv.TestInstanceCount; i++ {
- select {
- case ai := <-peerCh:
- if ai.ID == id {
- continue
- }
- peers = append(peers, *ai)
- case <-ctx.Done():
+ ai, ok := <-peerCh
+ if !ok {
return nil, nil, nil, seq, fmt.Errorf("no new peers in %d seconds", opts.Timeout/time.Second)
}
+ if ai.ID == id {
+ continue
+ }
+ peers = append(peers, *ai)
}

sort.Slice(peers, func(i, j int) bool {
@@ -240,7 +239,7 @@ func Bootstrap(ctx context.Context, runenv *runtime.RunEnv, watcher *sync.Watche
}
}()
// Announce ourself as a bootstrap node.
if _, err := writer.Write(BootstrapSubtree, host.InfoFromHost(dht.Host())); err != nil {
if _, err := writer.Write(ctx, BootstrapSubtree, host.InfoFromHost(dht.Host())); err != nil {
return err
}
// NOTE: If we start restricting the network, don't restrict
@@ -389,23 +388,24 @@ func Bootstrap(ctx context.Context, runenv *runtime.RunEnv, watcher *sync.Watche

// get all bootstrap peers.
func getBootstrappers(ctx context.Context, runenv *runtime.RunEnv, watcher *sync.Watcher, opts *SetupOpts) ([]peer.AddrInfo, error) {
// cancel the sub
ctx, cancel := context.WithCancel(ctx)
defer cancel()

peerCh := make(chan *peer.AddrInfo, opts.NBootstrap)
cancelSub, err := watcher.Subscribe(BootstrapSubtree, peerCh)
if err != nil {
if err := watcher.Subscribe(ctx, BootstrapSubtree, peerCh); err != nil {
return nil, err
}
defer cancelSub()

// TODO: remove this if it becomes too much coordination effort.
peers := make([]peer.AddrInfo, opts.NBootstrap)
// Grab list of other peers that are available for this run.
for i := 0; i < opts.NBootstrap; i++ {
- select {
- case ai := <-peerCh:
- peers[i] = *ai
- case <-ctx.Done():
- return nil, fmt.Errorf("timed out waiting for bootstrappers")
+ ai, ok := <-peerCh
+ if !ok {
+ return peers, fmt.Errorf("timed out waiting for bootstrappers")
}
+ peers[i] = *ai
}
runenv.Message("got all bootstrappers: %d", len(peers))
return peers, nil
@@ -468,7 +468,7 @@ func Sync(
doneCh := watcher.Barrier(ctx, state, int64(runenv.TestInstanceCount))

// Signal we're in the same state.
_, err := writer.SignalEntry(state)
_, err := writer.SignalEntry(ctx, state)
if err != nil {
return err
}
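
getBootstrappers now shadows ctx with a cancellable child and defers the cancel, so the subscription it starts is torn down automatically when the function returns. A minimal stdlib sketch of that scoping (the producer goroutine stands in for watcher.Subscribe):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func getValues(ctx context.Context, n int) ([]int, error) {
        // cancel the sub(scription) when we return
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()

        ch := make(chan int, n)
        go func() { // stands in for a subscription bound to the child ctx
            defer close(ch)
            for i := 0; ; i++ {
                select {
                case ch <- i:
                case <-ctx.Done():
                    return
                }
            }
        }()

        out := make([]int, 0, n)
        for i := 0; i < n; i++ {
            v, ok := <-ch
            if !ok {
                return out, fmt.Errorf("subscription ended early")
            }
            out = append(out, v)
        }
        return out, nil // the deferred cancel stops the producer here
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        fmt.Println(getValues(ctx, 3))
    }
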
2 changes: 1 addition & 1 deletion plans/dht/test/find_peers.go
@@ -26,7 +26,7 @@ func FindPeers(runenv *runtime.RunEnv) error {
ctx, cancel := context.WithTimeout(context.Background(), opts.Timeout)
defer cancel()

watcher, writer := sync.MustWatcherWriter(runenv)
watcher, writer := sync.MustWatcherWriter(ctx, runenv)
defer watcher.Close()
defer writer.Close()

2 changes: 1 addition & 1 deletion plans/dht/test/find_providers.go
@@ -29,7 +29,7 @@ func FindProviders(runenv *runtime.RunEnv) error {
ctx, cancel := context.WithTimeout(context.Background(), opts.Timeout)
defer cancel()

watcher, writer := sync.MustWatcherWriter(runenv)
watcher, writer := sync.MustWatcherWriter(ctx, runenv)
defer watcher.Close()
defer writer.Close()

