Skip to content

Commit

Permalink
scale by a number of replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Aug 17, 2017
1 parent 20d731b commit 58cb16d
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 170 deletions.
3 changes: 2 additions & 1 deletion api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,13 @@ func (a *App) getRouter() *mux.Router {
r.HandleFunc("/scheduler/{schedulerName}", Chain(
NewSchedulerScaleHandler(a),
NewLoggingMiddleware(a),
NewAccessMiddleware(a),
NewBasicAuthMiddleware(a),
NewMetricsReporterMiddleware(a),
NewSentryMiddleware(),
NewNewRelicMiddleware(a),
NewVersionMiddleware(),
NewParamMiddleware(func() interface{} { return &models.SchedulerParams{} }),
NewValidationMiddleware(func() interface{} { return &models.SchedulerScaleParams{} }),
).ServeHTTP).Methods("POST").Name("schedulerScale")

r.HandleFunc("/scheduler/{schedulerName}/image", Chain(
Expand Down
20 changes: 14 additions & 6 deletions api/app_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,14 @@ var _ = Describe("App", func() {
Expect(err).NotTo(HaveOccurred())
Expect(len(pods.Items)).To(Equal(configYaml.AutoScaling.Min))

urlScale := fmt.Sprintf("http://%s/scheduler/%s?scaleup=1", app.Address, configYaml.Name)
request, err = http.NewRequest("POST", urlScale, nil)
bodyJson := map[string]interface{}{"scaleup": 1}
bts, _ := json.Marshal(bodyJson)
reader := strings.NewReader(string(bts))

urlScale := fmt.Sprintf("http://%s/scheduler/%s", app.Address, configYaml.Name)
request, err = http.NewRequest("POST", urlScale, reader)
Expect(err).NotTo(HaveOccurred())
request.Header.Add("Authorization", "Bearer token")
request.SetBasicAuth("user", "pass")

recorder = httptest.NewRecorder()
app.Router.ServeHTTP(recorder, request)
Expand Down Expand Up @@ -515,10 +519,14 @@ var _ = Describe("App", func() {
tx.SAdd(models.GetRoomStatusSetRedisKey(configYaml.Name, models.StatusReady), pods.Items[0].GetName())
tx.Exec()

urlScale := fmt.Sprintf("http://%s/scheduler/%s?scaledown=1", app.Address, configYaml.Name)
request, err = http.NewRequest("POST", urlScale, nil)
bodyJson := map[string]interface{}{"scaledown": 1}
bts, _ := json.Marshal(bodyJson)
reader := strings.NewReader(string(bts))

urlScale := fmt.Sprintf("http://%s/scheduler/%s", app.Address, configYaml.Name)
request, err = http.NewRequest("POST", urlScale, reader)
Expect(err).NotTo(HaveOccurred())
request.Header.Add("Authorization", "Bearer token")
request.SetBasicAuth("user", "pass")

recorder = httptest.NewRecorder()
app.Router.ServeHTTP(recorder, request)
Expand Down
138 changes: 27 additions & 111 deletions api/scheduler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -369,125 +368,42 @@ func (g *SchedulerScaleHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
l := loggerFromContext(r.Context())
mr := metricsReporterFromCtx(r.Context())
params := schedulerParamsFromContext(r.Context())
scaleParams := schedulerScaleParamsFromCtx(r.Context())
if scaleParams == nil {
g.App.HandleError(w, http.StatusBadRequest, "ValidationFailedError", errors.New("empty body sent"))
return
}

amountUpStr := r.URL.Query().Get("scaleup")
amountDownStr := r.URL.Query().Get("scaledown")
logger := l.WithFields(logrus.Fields{
"source": "schedulerScaleHandler",
"operation": "scale",
})
logger.Infof("scaling scheduler '%s'", params.SchedulerName)

var amountUp, amountDown int
var err error

if amountUpStr == "" && amountDownStr == "" {
logger.Error("invalid scale parameter: scaleup and scaledown are empty")
g.App.HandleError(
w,
http.StatusUnprocessableEntity,
"invalid scale parameter",
errors.New("scaleup and scaledown are empty"),
)
return
} else if amountUpStr != "" && amountDownStr != "" {
logger.Error("invalid scale parameter: can't scale up and down")
g.App.HandleError(
w,
http.StatusUnprocessableEntity,
"invalid scale parameter",
errors.New("cannot scale up and down at once"),
)
return
} else if amountUpStr != "" {
amountUp, err = strconv.Atoi(amountUpStr)
if err != nil {
logger.WithError(err).Errorf("invalid scale up parameter: %s", amountUpStr)
g.App.HandleError(
w,
http.StatusBadRequest,
"invalid scale up parameter",
err,
)
return
} else if amountUp <= 0 {
logger.WithError(err).Errorf("invalid scale up parameter: %s", amountUpStr)
g.App.HandleError(
w,
http.StatusUnprocessableEntity,
"invalid scale up parameter",
errors.New("scaleup amount should be greater than zero"),
)
return
}
} else {
amountDown, err = strconv.Atoi(amountDownStr)
if err != nil {
logger.WithError(err).Errorf("invalid scale down parameter: %s", amountDownStr)
g.App.HandleError(
w,
http.StatusBadRequest,
"invalid scale down parameter",
err,
)
return
} else if amountDown <= 0 {
logger.WithError(err).Errorf("invalid scale down parameter: %s", amountDownStr)
g.App.HandleError(
w,
http.StatusUnprocessableEntity,
"invalid scale down parameter",
errors.New("scaledown amount should be greater than zero"),
)
return
}
}

scheduler := models.NewScheduler(params.SchedulerName, "", "")
err = scheduler.Load(g.App.DB)
if err != nil {
logger.WithError(err).Errorf("error loading scheduler %s from db", params.SchedulerName)
g.App.HandleError(w, http.StatusInternalServerError, "scheduler scale failed", err)
return
} else if scheduler.YAML == "" {
logger.WithError(err).Errorf("scheduler %s not found", params.SchedulerName)
g.App.HandleError(
w, http.StatusNotFound, "scheduler scale failed",
fmt.Errorf("scheduler %s not found", params.SchedulerName),
)
return
}
err := controller.ScaleScheduler(
logger,
mr,
g.App.DB,
g.App.RedisClient,
g.App.KubernetesClient,
g.App.Config.GetInt("scaleUpTimeoutSeconds"), g.App.Config.GetInt("scaleDownTimeoutSeconds"),
scaleParams.ScaleUp, scaleParams.ScaleDown, scaleParams.Replicas,
params.SchedulerName,
)

if amountUpStr != "" {
logger.Infof("manually scaling up scheduler %s in %d GRUs", params.SchedulerName, amountUp)
timeoutSec := g.App.Config.GetInt("scaleUpTimeoutSeconds")
err = controller.ScaleUp(
l,
mr,
g.App.DB,
g.App.RedisClient,
g.App.KubernetesClient,
scheduler,
amountUp,
timeoutSec,
false,
)
} else if amountDownStr != "" {
logger.Infof("manually scaling down scheduler %s in %d GRUs", params.SchedulerName, amountDown)
timeoutSec := g.App.Config.GetInt("scaleDownTimeoutSeconds")
err = controller.ScaleDown(
l,
mr,
g.App.DB,
g.App.RedisClient,
g.App.KubernetesClient,
scheduler,
amountDown,
timeoutSec,
)
}
if err != nil {
logger.WithError(err).Error("scheduler scale failed")
g.App.HandleError(w, http.StatusInternalServerError, "status scheduler failed", err)
status := http.StatusInternalServerError

if strings.Contains(err.Error(), "not found") {
status = http.StatusNotFound
} else if strings.Contains(err.Error(), "empty") {
status = http.StatusUnprocessableEntity
} else if strings.Contains(err.Error(), "invalid") {
status = http.StatusBadRequest
}

g.App.HandleError(w, status, "scale scheduler failed", err)
return
}
mr.WithSegment(models.SegmentSerialization, func() error {
Expand Down
Loading

0 comments on commit 58cb16d

Please sign in to comment.