Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Placement mutator #354

Merged
merged 3 commits into from Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 0 additions & 22 deletions controller/handlers/error.go
Expand Up @@ -38,35 +38,13 @@ var (
ErrMsgDeletedColumn = "Bad request: column is already deleted"
// ErrMsgNotImplemented represents error message for method not implemented.
ErrMsgNotImplemented = "Not implemented"
// ErrMsgFailedToBuildInitialPlacement represents error message for initial placement failure
ErrMsgFailedToBuildInitialPlacement = "failed build initial placement"
// ErrMsgFailedToGetPlacementService represents error message for failure in getting placement service
ErrMsgFailedToGetPlacementService = "failed to get placement service"
// ErrMsgFailedToGetCurrentPlacement represents error message for failure in getting current placement
ErrMsgFailedToGetCurrentPlacement = "failed to get current placement"
// ErrMsgFailedToAddInstance represents error message for failure in adding instance
ErrMsgFailedToAddInstance = "failed to add instance to placement"
// ErrMsgFailedToReplaceInstance represents error message for failure in replacing instance
ErrMsgFailedToReplaceInstance = "failed to replace instance to placement"
// ErrMsgFailedToMarkAvailable represents error message for failure in marking instance or shard available
ErrMsgFailedToMarkAvailable = "failed to mark instance/shards available"
// ErrMsgFailedToMarshalPlacement represents error message for failure to marshal placement
ErrMsgFailedToMarshalPlacement = "failed to marshal placement"
// ErrMsgFailedToRemoveInstance represents error message for failure to remove instance from a placement
ErrMsgFailedToRemoveInstance = "failed to remove instance from placement"

// ErrRemoveOneInstance represents error message for not removing one instance at a time
ErrRemoveOneInstance = utils.APIError{
Code: http.StatusBadRequest,
Message: "expected to remove one instance at a time",
}

// ErrInvalidNumShards represents invalid number of shards
ErrInvalidNumShards = utils.APIError{
Code: http.StatusBadRequest,
Message: "invalid number of shards, should be exponential of 2",
}

// ErrMissingParameter represents error for missing parameter
ErrMissingParameter = utils.APIError{
Code: http.StatusBadRequest,
Expand Down
169 changes: 26 additions & 143 deletions controller/handlers/placement.go
Expand Up @@ -15,34 +15,25 @@
package handlers

import (
"fmt"
"net/http"

"github.com/gorilla/mux"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/x/instrument"
"github.com/uber-go/tally"
apiCom "github.com/uber/aresdb/api/common"
"github.com/uber/aresdb/cluster/kvstore"
mutatorCom "github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
"go.uber.org/zap"
)

// PlacementHandler handles placement requests
type PlacementHandler struct {
client *kvstore.EtcdClient
logger *zap.SugaredLogger
scope tally.Scope
placementMutator mutatorCom.PlacementMutator
}

// NewPlacementHandler creates placement handler
func NewPlacementHandler(logger *zap.SugaredLogger, scope tally.Scope, client *kvstore.EtcdClient) PlacementHandler {
func NewPlacementHandler(mutator mutatorCom.PlacementMutator) PlacementHandler {
return PlacementHandler{
client: client,
logger: logger,
scope: scope,
placementMutator: mutator,
}
}

Expand All @@ -57,13 +48,6 @@ func (h PlacementHandler) Register(router *mux.Router, wrappers ...utils.HTTPHan
router.HandleFunc("/{namespace}/datanode/instances/{instance}/available", utils.ApplyHTTPWrappers(h.MarkInstanceAvailable, wrappers...)).Methods(http.MethodPost)
}

func (h *PlacementHandler) getServiceID(namespace string) services.ServiceID {
return services.NewServiceID().
SetEnvironment(h.client.Environment).
SetZone(h.client.Zone).
SetName(utils.DataNodeServiceName(namespace))
}

func newInstancesFromProto(instancepbs []placementpb.Instance) ([]placement.Instance, error) {
instances := make([]placement.Instance, 0, len(instancepbs))
for _, instancepb := range instancepbs {
Expand All @@ -76,104 +60,47 @@ func newInstancesFromProto(instancepbs []placementpb.Instance) ([]placement.Inst
return instances, nil
}

func (h *PlacementHandler) placementOptions() placement.Options {
return placement.NewOptions().
SetInstrumentOptions(instrument.NewOptions().
SetLogger(h.logger.Desugar()).SetMetricsScope(h.scope)).
SetValidZone(h.client.Zone).
// if we specify more than one new instance, we want to add them all
SetAddAllCandidates(true).
// for now we want to make sure replacement does not affect existing instances not being replaced
SetAllowPartialReplace(false)
}

func validateAllAvailable(p placement.Placement) error {
for _, instance := range p.Instances() {
if !instance.IsAvailable() {
return utils.APIError{Code: http.StatusBadRequest, Message: fmt.Sprintf("instance %s is not available", instance.ID())}
}
}
return nil
}

func validateInitPlacementReq(req InitPlacementRequest) error {
// check number of shards
numShards := req.Body.NumShards
if (numShards & (^(-numShards))) != 0 {
return ErrInvalidNumShards
}
return nil
}

// Init initialize new placement
func (h *PlacementHandler) Init(rw *utils.ResponseWriter, r *http.Request) {
if h.client == nil {
rw.WriteError(ErrEtcdNotAvailable)
return
}
var req InitPlacementRequest
err := apiCom.ReadRequest(r, &req, rw.SetRequest)
if err != nil {
rw.WriteErrorWithCode(http.StatusBadRequest, err)
return
}

if err := validateInitPlacementReq(req); err != nil {
rw.WriteError(err)
return
}

newInstances, err := newInstancesFromProto(req.Body.NewInstances)
if err != nil {
rw.WriteErrorWithCode(http.StatusBadRequest, err)
return
}

placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace), h.placementOptions())
plm, err := h.placementMutator.BuildInitialPlacement(req.Namespace, req.Body.NumShards, req.Body.NumReplica, newInstances)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}

p, err := placementSvc.BuildInitialPlacement(newInstances, req.Body.NumShards, req.Body.NumReplica)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToBuildInitialPlacement))
rw.WriteError(err)
return
}
respondWithPlacement(p, rw)
respondWithPlacement(plm, rw)
}

// Get get the current placement
func (h *PlacementHandler) Get(rw *utils.ResponseWriter, r *http.Request) {
if h.client == nil {
rw.WriteError(ErrEtcdNotAvailable)
return
}
var req NamespaceRequest
err := apiCom.ReadRequest(r, &req, rw.SetRequest)
if err != nil {
rw.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace), h.placementOptions())
plm, err := h.placementMutator.GetCurrentPlacement(req.Namespace)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}
p, err := placementSvc.Placement()
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetCurrentPlacement))
rw.WriteError(err)
return
}
respondWithPlacement(p, rw)
respondWithPlacement(plm, rw)
}

// Add adds new instances
func (h *PlacementHandler) Add(rw *utils.ResponseWriter, r *http.Request) {
if h.client == nil {
rw.WriteError(ErrEtcdNotAvailable)
return
}
var req AddInstancesRequest
err := apiCom.ReadRequest(r, &req, rw.SetRequest)
if err != nil {
Expand All @@ -188,27 +115,16 @@ func (h *PlacementHandler) Add(rw *utils.ResponseWriter, r *http.Request) {
}

// validate all shards available in placement before adding instance
placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace),
h.placementOptions().SetValidateFnBeforeUpdate(validateAllAvailable))
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}

p, _, err := placementSvc.AddInstances(instances)
plm, err := h.placementMutator.AddInstance(req.Namespace, instances)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToAddInstance))
rw.WriteError(err)
return
}
respondWithPlacement(p, rw)
respondWithPlacement(plm, rw)
}

// Replace replace existing instances within placement with new instances
func (h *PlacementHandler) Replace(rw *utils.ResponseWriter, r *http.Request) {
if h.client == nil {
rw.WriteError(ErrEtcdNotAvailable)
return
}
var req ReplaceInstanceRequest
err := apiCom.ReadRequest(r, &req, rw.SetRequest)
if err != nil {
Expand All @@ -223,28 +139,16 @@ func (h *PlacementHandler) Replace(rw *utils.ResponseWriter, r *http.Request) {
}

// validate all shards are available before replace instance
placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace),
h.placementOptions().SetValidateFnBeforeUpdate(validateAllAvailable))
plm, err := h.placementMutator.ReplaceInstance(req.Namespace, req.Body.LeavingInstances, newInstances)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}

p, _, err := placementSvc.ReplaceInstances(req.Body.LeavingInstances, newInstances)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToReplaceInstance))
rw.WriteError(err)
return
}

respondWithPlacement(p, rw)
respondWithPlacement(plm, rw)
}

// Remove remove instance from placement
func (h *PlacementHandler) Remove(rw *utils.ResponseWriter, r *http.Request) {
if h.client == nil {
rw.WriteError(ErrEtcdNotAvailable)
return
}
var req RemoveInstanceRequest
err := apiCom.ReadRequest(r, &req, rw.SetRequest)
if err != nil {
Expand All @@ -258,19 +162,12 @@ func (h *PlacementHandler) Remove(rw *utils.ResponseWriter, r *http.Request) {
}

// validate all shards are available before replace instance
placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace),
h.placementOptions().SetValidateFnBeforeUpdate(validateAllAvailable))
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}

p, err := placementSvc.RemoveInstances(req.Body.LeavingInstances)
plm, err := h.placementMutator.RemoveInstance(req.Namespace, req.Body.LeavingInstances)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToRemoveInstance))
rw.WriteError(err)
return
}
respondWithPlacement(p, rw)
respondWithPlacement(plm, rw)
}

// MarkNamespaceAvailable marks all instance/shards in placement as available
Expand All @@ -282,19 +179,12 @@ func (h *PlacementHandler) MarkNamespaceAvailable(rw *utils.ResponseWriter, r *h
return
}

placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace), h.placementOptions())
plm, err := h.placementMutator.MarkNamespaceAvailable(req.Namespace)
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}

p, err := placementSvc.MarkAllShardsAvailable()
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToMarkAvailable))
rw.WriteError(err)
return
}

respondWithPlacement(p, rw)
respondWithPlacement(plm, rw)
}

// MarkInstanceAvailable marks one instance as available
Expand All @@ -306,21 +196,14 @@ func (h *PlacementHandler) MarkInstanceAvailable(rw *utils.ResponseWriter, r *ht
return
}

placementSvc, err := h.client.Services.PlacementService(h.getServiceID(req.Namespace), h.placementOptions())
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToGetPlacementService))
return
}

var p placement.Placement
if req.Body.AllShards {
p, err = placementSvc.MarkInstanceAvailable(req.Instance)
p, err = h.placementMutator.MarkInstanceAvailable(req.Namespace, req.Instance)
} else {
p, err = placementSvc.MarkShardsAvailable(req.Instance, req.Body.Shards...)
p, err = h.placementMutator.MarkShardsAvailable(req.Namespace, req.Instance, req.Body.Shards)
}

if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToMarkAvailable))
rw.WriteError(err)
return
}
respondWithPlacement(p, rw)
Expand All @@ -329,7 +212,7 @@ func (h *PlacementHandler) MarkInstanceAvailable(rw *utils.ResponseWriter, r *ht
func respondWithPlacement(p placement.Placement, rw *utils.ResponseWriter) {
pb, err := p.Proto()
if err != nil {
rw.WriteError(utils.StackError(err, ErrMsgFailedToMarshalPlacement))
rw.WriteError(utils.StackError(err, "failed to marshal placement"))
return
}
rw.WriteObject(pb)
Expand Down
8 changes: 2 additions & 6 deletions controller/handlers/placement_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/kv/mem"
"github.com/m3db/m3/src/cluster/services"
mutators "github.com/uber/aresdb/controller/mutators/etcd"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -30,9 +31,7 @@ import (
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/shard"
"github.com/stretchr/testify/assert"
"github.com/uber-go/tally"
"github.com/uber/aresdb/cluster/kvstore"
"go.uber.org/zap"
)

// m3ClientMock mocks m3client
Expand Down Expand Up @@ -62,9 +61,6 @@ func (c *m3ClientMock) TxnStore(opts kv.OverrideOptions) (kv.TxnStore, error) {
}

func TestPlacementHandler(t *testing.T) {
logger, _ := zap.NewDevelopment()
sugaredLogger := logger.Sugar()

t.Run("Should work for placement handler", func(t *testing.T) {
txnStore := mem.NewStore()
clusterServices, err := services.NewServices(
Expand Down Expand Up @@ -115,7 +111,7 @@ func TestPlacementHandler(t *testing.T) {
]
}`))

placementHandler := NewPlacementHandler(sugaredLogger, tally.NoopScope, &client)
placementHandler := NewPlacementHandler(mutators.NewPlacementMutator(&client))
testRouter := mux.NewRouter()
placementHandler.Register(testRouter)
testServer := httptest.NewUnstartedServer(testRouter)
Expand Down