Skip to content

Commit

Permalink
Training persistence WIP (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-tokarev committed Jul 29, 2020
1 parent d1cf8ef commit f4cf3ec
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 33 deletions.
15 changes: 11 additions & 4 deletions packages/operator/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,24 @@ var mainCmd = &cobra.Command{
log.Error(err, "unable set up api server")
os.Exit(1)
}
apiServer.Run()
errCh := make(chan error, 3)
apiServer.Run(errCh)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

log.Info("Shutdown Api Server ...")
select {
case <-quit:
log.Info("SIGINT was received")
case err := <-errCh:
log.Error(err, "Error during execution one of components")
}

log.Info("Shutdown Process ...")
ctx, cancel := context.WithTimeout(context.Background(), cfg.Common.GracefulTimeout)
defer cancel()
if err := apiServer.Close(ctx); err != nil {
log.Error(err, "Server shutdowns")
log.Error(err, "Unable to shutdown gracefully")
os.Exit(1)
}
},
Expand Down
108 changes: 81 additions & 27 deletions packages/operator/pkg/webserver/runners/v1/training/model_training.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import (
"errors"
"fmt"
odahuflowv1alpha1 "github.com/odahu/odahu-flow/packages/operator/api/v1alpha1"
"github.com/odahu/odahu-flow/packages/operator/pkg/apis/training"
odahu_errs "github.com/odahu/odahu-flow/packages/operator/pkg/errors"
mt_repository "github.com/odahu/odahu-flow/packages/operator/pkg/repository/training"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sync"
"time"
odahu_errs "github.com/odahu/odahu-flow/packages/operator/pkg/errors"
)

const (
penaltyCleanPeriod = time.Second * 30
)

var (
Expand Down Expand Up @@ -78,49 +83,88 @@ func (r *Runner) Reconcile(request ctrl.Request) (ctrl.Result, error) {
func (r *Runner) String() string {
return r.name
}

// Check new items in storage and launch them in compute service
func (r *Runner) Launch(_ context.Context) error {

// Strategy: Create new items asap
// Create not finished but not new items sometimes (rare)
// Not create totally finished trainings

// TODO: Exclude finished trainings
trains, err := r.storage.GetModelTrainingList()
log.Info(fmt.Sprintf("%v new trains found", len(trains)))
// TODO: Ensure that user cannot update already launched training
func (r *Runner) Launch(_ context.Context, excluded []string) ([]string, error) {

// We try to launch ALL trainings that are not finished yet.
//
// The system may be in an inconsistent state between the service and the DB,
// so we cannot guarantee that a training is really running without checking it.
//
// We check that a not-finished training is really running by attempting to
// launch it again. If we get AlreadyExists, the training is already running on the service.
//
// To avoid endless, frequent attempts to launch trainings that are already running,
// we penalize them, which increases the delay before the next attempt.

// Fetch all not finished trainings
notFinished, err := r.storage.GetModelTrainingList()
if err != nil {
log.Error(err, "Error while fetch training list from DB")
return err
return excluded, err
}

wg := sync.WaitGroup{}
wg.Add(len(trains))
// Remove trainings that are probably already running (penalized trainings)
trains := make([]training.ModelTraining, 0)
for _, nft := range notFinished {
elemExcluded := false
for _, et := range excluded {
if nft.ID == et {
elemExcluded = true
}
}
if !elemExcluded {
trains = append(trains, nft)
}
}

var errs []error
if (len(trains)) > 0 {
log.Info(fmt.Sprintf("%v not launched trains", len(trains)))
}

// Run in parallel each item in service
// Launch trainings in parallel.
newPenalties := make(chan string, len(trains))
errCh := make(chan error)
for _, mt := range trains {
mt := mt
go func() {
defer wg.Done()
err := r.service.CreateModelTraining(&mt)
if err != nil {
log.Error(err, "Error while launch training %v", mt)
errs = append(errs, err)
if odahu_errs.IsAlreadyExistError(err) {
log.Info(fmt.Sprintf("%s training is already launched", mt.ID))
newPenalties <- mt.ID
errCh <- nil
} else {
log.Error(err, fmt.Sprintf("Error while launch training %s", mt.ID))
errCh <- err
}
} else {
log.Info(fmt.Sprintf("Launched: %v", mt))
errCh <- nil
}
}()
}

// Wait total completion
wg.Wait()
// Gather errors or nils
var resErr error
for i := 0; i < len(trains); i ++ {
cerr := <-errCh
if cerr != nil {
resErr = errors.New("one or more errors occurred while launch new trainings")
}
}

if len(errs) > 0 {
return errors.New("one or more errors occurred while launch new trainings")
// Gather the IDs of trainings that are already launched.
// We penalize them to avoid trying to launch them again on every iteration.
close(newPenalties)
for p := range newPenalties {
excluded = append(excluded, p)
}
return nil

if resErr != nil {
return excluded, errors.New("one or more errors occurred while launch new trainings")
}
return excluded, nil
}

func (r *Runner) Run(ctx context.Context) (err error) {
Expand All @@ -141,12 +185,22 @@ func (r *Runner) Run(ctx context.Context) (err error) {
// We are launching new or hung trainings every `launchPeriod` seconds
t := time.NewTicker(r.launchPeriod)
defer t.Stop()
penaltyTicker := time.NewTicker(penaltyCleanPeriod)
defer penaltyTicker.Stop()

excluded := make([]string, 0)
for {
select {
case <-t.C:
err := r.Launch(ctx)
log.Error(err, "Error while launching new items")
excluded, err = r.Launch(ctx, excluded)
if err != nil {
log.Error(err, "Error while launching new items")
}
continue
case <-penaltyTicker.C:
// Clean penalized trainings. TODO: make this tick individual for each training
excluded = excluded[:0]
log.Info("Penalized trainings were cleaned")
continue
case <-ctx.Done():
log.Info(fmt.Sprintf("Cancellation signal was received in %v", r.String()))
Expand Down
13 changes: 11 additions & 2 deletions packages/operator/pkg/webserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,32 @@ func NewAPIServer(config *config.Config) (*Server, error) {

// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
// TODO: What if one of goroutines return error during normal execution (w/o shutdown)
func (s *Server) Run() {
func (s *Server) Run(errorCh chan<- error) {

go func() {
if cerr := s.webServer.ListenAndServe(); cerr != nil && cerr != http.ErrServerClosed {
log.Error(cerr, "error in web webServer")
errorCh <- cerr
} else{
errorCh <- nil
}
}()

go func() {
if cerr := s.runManager.Run(); cerr != nil {
log.Error(cerr, "error in runners manager")
errorCh <- cerr
} else {
errorCh <- nil
}
}()

go func() {
if cerr := s.kubeManager.Start(s.kubeMgrStop); cerr != nil {
log.Error(cerr, "error in kubernetes manager")
errorCh <- cerr
} else {
errorCh <- nil
}
close(s.kubeMgrStopped) // We need a way to notify Shutdown function about completed stop
}()
Expand Down

0 comments on commit f4cf3ec

Please sign in to comment.