Commit
Merge 8a996e8 into 2649921
victor-carvalho committed Jul 31, 2020
2 parents 2649921 + 8a996e8 commit 3568601
Showing 2 changed files with 72 additions and 31 deletions.
31 changes: 28 additions & 3 deletions controller/controller_test.go
@@ -6586,6 +6586,7 @@ containers:
It("should return error when it fails to create pod", func() {
pods := []*models.Pod{
&models.Pod{Name: "room-1"},
&models.Pod{Name: "room-2"},
}

scheduler := models.NewScheduler(configYaml1.Name, configYaml1.Game, string(configYaml1.ToYAML()))
@@ -6596,6 +6597,26 @@ containers:
"description": models.OpManagerRollingUpdate,
}, nil)).AnyTimes()

mockRedisClient.EXPECT().
TxPipeline().
Return(mockPipeline)

mockPipeline.EXPECT().
HMSet(gomock.Any(), gomock.Any()).
Do(func(schedulerName string, statusInfo map[string]interface{}) {
Expect(statusInfo["status"]).To(Equal(models.StatusCreating))
Expect(statusInfo["lastPing"]).To(BeNumerically("~", time.Now().Unix(), 1))
})

mockPipeline.EXPECT().
SAdd(models.GetRoomStatusSetRedisKey(configYaml1.Name, "creating"),
gomock.Any())

mockPipeline.EXPECT().
ZAdd(models.GetRoomPingRedisKey(configYaml1.Name), gomock.Any())

mockPipeline.EXPECT().Exec()

mockRedisClient.EXPECT().
TxPipeline().
Return(mockPipeline)
@@ -6616,21 +6637,25 @@ containers:

mockPipeline.EXPECT().Exec().Return(nil, errors.New("redis error"))

mt.MockGetPortsFromPoolAnyTimes(&configYaml1, mockRedisClient, mockPortChooser, workerPortRange, portStart, portEnd)
mt.MockPodNotFound(mockRedisClient, configYaml1.Name, gomock.Any()).AnyTimes()

now := time.Now()
timeoutErr, cancelErr, err := controller.SegmentAndReplacePods(
logger,
roomManager,
mr,
clientset,
mockDb,
mockRedisClient,
-time.Now().Add(time.Minute),
+now.Add(time.Minute),
&configYaml1,
pods,
scheduler,
opManager,
10*time.Second,
10,
1,
100,
2,
)
Expect(timeoutErr).ToNot(HaveOccurred())
Expect(cancelErr).ToNot(HaveOccurred())
72 changes: 44 additions & 28 deletions controller/utils.go
@@ -163,22 +163,24 @@ func replacePodsAndWait(
) (err error) {
logger.Debug("starting to replace pods with new ones")

-finishedReplace := make(chan struct{})
-errChan := make(chan error)
+childCtx, cancel := context.WithCancel(ctx)

+errChan := make(chan error, goroutinePoolSize)

pods := make(chan *models.Pod, len(podsChunk))
for _, pod := range podsChunk {
pods <- pod
}
close(pods)

var wg sync.WaitGroup
logger.Infof("starting %d in-memory workers to replace %d pods", goroutinePoolSize, len(podsChunk))
for i := 0; i < goroutinePoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
-replacePodWorker(
-ctx,
+err := replacePodWorker(
+childCtx,
logger,
roomManager,
mr,
@@ -189,23 +191,24 @@
scheduler,
pods,
inRollingUpdate,
-finishedReplace,
-errChan,
)
+// if there's an error in any of the workers, cancel the operation
+if err != nil {
+cancel()
+errChan <- err
+}
}()
}

+wg.Wait()

select {
case err = <-errChan:
-logger.Error("operation terminated with error")
-case <-ctx.Done():
-logger.Debug("operation timedout/canceled")
-case <-finishedReplace:
-logger.Debug("all pods were successfully replaced")
+logger.WithError(err).Debug("error replacing pods")
+default:
+logger.WithError(err).Trace("replacePodsAndWait finished successfully")
}

-wg.Wait()

return err
}
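
A minimal, self-contained sketch of the cancel-on-first-error worker-pool pattern that the new replacePodsAndWait follows: an error channel buffered to the pool size so no worker blocks on send, a child context cancelled on the first failure, and a non-blocking drain after wg.Wait(). The names here (processAll, processItem, the int items) are illustrative, not Maestro code.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// processItem stands in for the per-pod work; it fails on one item so the
// cancel-on-first-error path is exercised.
func processItem(ctx context.Context, item int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	if item == 3 {
		return errors.New("boom")
	}
	return nil
}

func processAll(ctx context.Context, items []int, poolSize int) error {
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so every worker can report one error without blocking,
	// even though nobody reads until wg.Wait() returns.
	errChan := make(chan error, poolSize)

	work := make(chan int, len(items))
	for _, it := range items {
		work <- it
	}
	close(work)

	var wg sync.WaitGroup
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range work {
				if err := processItem(childCtx, it); err != nil {
					cancel() // stop the other workers
					errChan <- err
					return
				}
			}
		}()
	}

	wg.Wait()

	// Non-blocking drain: report the first error, if any.
	select {
	case err := <-errChan:
		return err
	default:
		return nil
	}
}

func main() {
	fmt.Println("result:", processAll(context.Background(), []int{1, 2, 3, 4, 5}, 2))
}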

@@ -221,12 +224,14 @@ func replacePodWorker(
scheduler *models.Scheduler,
pods <-chan *models.Pod,
inRollingUpdate bool,
-finishedReplace chan struct{},
-errChan chan<- error,
-) {
+) error {
for {
select {
-case pod := <-pods:
+case pod, ok := <-pods:
+// this case is executed even after the pods channel is closed, so we need this check
+if !ok {
+return nil
+}
canceled, err := createNewRemoveOldPod(
ctx,
logger,
@@ -242,21 +247,16 @@
)

if err != nil {
-errChan <- err
-return
+return err
}

if canceled {
-return
+return nil
}

logger.Infof("pods remaining to replace: %d", len(pods))
-if len(pods) == 0 {
-finishedReplace <- struct{}{}
-return
-}
case <-ctx.Done():
-return
+return nil
}
}
}
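
For context on the case pod, ok := <-pods change above: a receive on a closed channel is always ready inside a select and yields the zero value, so without the ok check the worker would keep looping over nil pods instead of exiting once the queue is drained. A standalone illustration; worker and jobs are made-up names, not from this repository.

package main

import (
	"context"
	"fmt"
	"time"
)

func worker(ctx context.Context, jobs <-chan string) {
	for {
		select {
		case job, ok := <-jobs:
			// After close(jobs) this case keeps firing with ok == false
			// and the zero value (""); without the check we would loop forever.
			if !ok {
				fmt.Println("channel closed, worker exiting")
				return
			}
			fmt.Println("processing", job)
		case <-ctx.Done():
			fmt.Println("context done, worker exiting")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	jobs := make(chan string, 2)
	jobs <- "room-1"
	jobs <- "room-2"
	close(jobs)

	worker(ctx, jobs)
}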
@@ -431,6 +431,14 @@ func waitTerminatingPods(

for {
exit := true

// The Go select statement chooses "randomly" when more than one branch is ready, so it can make select ignore
// ctx.Done() for several iterations. This check prevents entering the select if the context was already cancelled.
if ctx.Err() != nil {
logger.Warn("operation canceled/timedout waiting for rooms to be removed")
return true
}

select {
case <-ctx.Done():
logger.Warn("operation canceled/timedout waiting for rooms to be removed")
@@ -498,7 +506,18 @@ func waitCreatingPods(

for {
exit := true

// The Go select statement chooses "randomly" when more than one branch is ready, so it can make select ignore
// ctx.Done() for several iterations. This check prevents entering the select if the context was already cancelled.
if ctx.Err() != nil {
logger.Warn("operation canceled/timeout waiting for rooms to be created")
return true, nil
}

select {
case <-ctx.Done():
logger.Warn("operation canceled/timeout waiting for rooms to be created")
return true, nil
case <-ticker.C:
for i, pod := range createdPods {
createdPod, err := models.GetPodFromRedis(redisClient, mr, pod.GetName(), namespace)
@@ -587,9 +606,6 @@ func waitCreatingPods(
break
}
}
-case <-ctx.Done():
-logger.Warn("operation canceled/timeout waiting for rooms to be created")
-return true, nil
}

if exit {
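
The ctx.Err() pre-check added to waitTerminatingPods and waitCreatingPods exists because select picks pseudo-randomly among ready cases, so a ready ticker can keep winning over ctx.Done() across iterations. A minimal sketch of that polling shape under the same idea; pollUntilDone and its done callback are hypothetical names, not Maestro code.

package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntilDone returns true if it stopped because the context was cancelled
// or timed out, false if the work finished on its own.
func pollUntilDone(ctx context.Context, done func() bool) bool {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		// select picks pseudo-randomly among ready cases, so when both the
		// ticker and ctx.Done() are ready it may keep choosing the ticker.
		// Checking ctx.Err() first guarantees cancellation is honoured.
		if ctx.Err() != nil {
			fmt.Println("canceled/timed out while waiting")
			return true
		}

		select {
		case <-ctx.Done():
			fmt.Println("canceled/timed out while waiting")
			return true
		case <-ticker.C:
			if done() {
				return false
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	start := time.Now()
	timedOut := pollUntilDone(ctx, func() bool { return time.Since(start) > time.Second })
	fmt.Println("timed out:", timedOut)
}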
