Skip to content

Commit

Permalink
Register all simulators before starting controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Oct 3, 2020
1 parent ba03066 commit 99dd837
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
4 changes: 3 additions & 1 deletion modules/agent/pkg/agent/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Options struct {
ClusterID string
NoLeaderElect bool
CheckinInterval time.Duration
StartAfter <-chan struct{}
}

func Register(ctx context.Context, kubeConfig, namespace, clusterID string) error {
Expand Down Expand Up @@ -84,7 +85,8 @@ func Start(ctx context.Context, kubeConfig, namespace string, opts *Options) err
clientConfig,
fleetMapper,
mapper,
discovery)
discovery,
opts.StartAfter)
}

var (
Expand Down
8 changes: 7 additions & 1 deletion modules/agent/pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func Register(ctx context.Context, leaderElect bool,
checkinInterval time.Duration,
fleetConfig *rest.Config, clientConfig clientcmd.ClientConfig,
fleetMapper, mapper meta.RESTMapper,
discovery discovery.CachedDiscoveryInterface) error {
discovery discovery.CachedDiscoveryInterface,
startChan <-chan struct{}) error {
appCtx, err := newContext(fleetNamespace, agentNamespace, clusterNamespace, clusterName,
fleetConfig, clientConfig, fleetMapper, mapper, discovery)
if err != nil {
Expand Down Expand Up @@ -124,6 +125,11 @@ func Register(ctx context.Context, leaderElect bool,
logrus.Fatal(err)
}
})
} else if startChan != nil {
go func() {
<-startChan
logrus.Fatalf("failed to start: %v", appCtx.start(ctx))
}()
} else {
return appCtx.start(ctx)
}
Expand Down
13 changes: 12 additions & 1 deletion modules/agent/pkg/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ import (
)

var (
sem = semaphore.NewWeighted(50)
semMax = int64(50)
sem = semaphore.NewWeighted(semMax)
)

func Simulate(ctx context.Context, count int, kubeConfig, namespace, defaultNamespace string, opts agent.Options) error {
logrus.Infof("Starting %d simulators", count)

startAfter := make(chan struct{})
opts.StartAfter = startAfter

eg, ctx := errgroup.WithContext(ctx)
for i := 0; i < count; i++ {
i := i
if err := sem.Acquire(ctx, 1); err != nil {
close(startAfter)
return err
}
logrus.Infof("STARING %s%05d", namespace, i)
Expand All @@ -44,6 +49,12 @@ func Simulate(ctx context.Context, count int, kubeConfig, namespace, defaultName
})
}

if err := sem.Acquire(ctx, semMax); err != nil {
close(startAfter)
return err
}
close(startAfter)

eg.Go(func() error {
// wait forever unless one of the simulators dies
<-ctx.Done()
Expand Down

0 comments on commit 99dd837

Please sign in to comment.