Enable disk config for replica
add replica scheduler support
JacieChao committed Jun 13, 2018
1 parent 55726ed commit 1b228a4
Showing 30 changed files with 1,112 additions and 11 deletions.
85 changes: 85 additions & 0 deletions api/model.go
@@ -120,6 +120,26 @@ type EngineUpgradeInput struct {
Image string `json:"image"`
}

type Node struct {
client.Resource
Name string `json:"name"`
AllowScheduling bool `json:"allowScheduling"`
Disks map[string]types.Disk `json:"disks"`
}

type DiskInput struct {
Path string `json:"path"`
MaximumStorage int64 `json:"maximumStorage"`
}

type DiskRemoveInput struct {
Path string `json:"path"`
}

type SchedulingInput struct {
AllowScheduling bool `json:"allowScheduling"`
}

func NewSchema() *client.Schemas {
schemas := &client.Schemas{}

@@ -137,17 +157,54 @@ func NewSchema() *client.Schemas {
schemas.AddType("engineUpgradeInput", EngineUpgradeInput{})
schemas.AddType("replica", Replica{})
schemas.AddType("controller", Controller{})
schemas.AddType("diskInput", DiskInput{})
schemas.AddType("diskRemoveInput", DiskRemoveInput{})
schemas.AddType("schedulingInput", SchedulingInput{})

hostSchema(schemas.AddType("host", Host{}))
volumeSchema(schemas.AddType("volume", Volume{}))
backupVolumeSchema(schemas.AddType("backupVolume", BackupVolume{}))
settingSchema(schemas.AddType("setting", Setting{}))
recurringSchema(schemas.AddType("recurringInput", RecurringInput{}))
engineImageSchema(schemas.AddType("engineImage", EngineImage{}))
nodeSchema(schemas.AddType("node", Node{}))

return schemas
}

func nodeSchema(node *client.Schema) {
node.CollectionMethods = []string{"GET", "POST"}
node.ResourceMethods = []string{"GET"}

node.ResourceActions = map[string]client.Action{
"mountPathUpdate": {
Input: "diskInput",
Output: "node",
},
"mountPathRemove": {
Input: "diskRemoveInput",
Output: "node",
},

"schedulingSet": {
Input: "schedulingInput",
Output: "node",
},
}

disks := node.ResourceFields["disks"]
disks.Update = true
disks.Required = true
disks.Unique = false
node.ResourceFields["disks"] = disks

allowScheduling := node.ResourceFields["allowScheduling"]
allowScheduling.Update = true
allowScheduling.Required = true
allowScheduling.Unique = false
node.ResourceFields["allowScheduling"] = allowScheduling
}

func engineImageSchema(engineImage *client.Schema) {
engineImage.CollectionMethods = []string{"GET", "POST"}
engineImage.ResourceMethods = []string{"GET", "DELETE"}
@@ -560,3 +617,31 @@ func NewServer(m *manager.VolumeManager) *Server {
}
return s
}

func toNodeResource(node *longhorn.Node, apiContext *api.ApiContext) *Node {
n := &Node{
Resource: client.Resource{
Id: node.Name,
Type: "node",
Actions: map[string]string{},
Links: map[string]string{},
},
Name: node.Name,
AllowScheduling: node.Spec.AllowScheduling,
Disks: node.Spec.Disks,
}
n.Actions = map[string]string{
"mountPathUpdate": apiContext.UrlBuilder.ActionLink(n.Resource, "mountPathUpdate"),
"mountPathRemove": apiContext.UrlBuilder.ActionLink(n.Resource, "mountPathRemove"),
"schedulingSet": apiContext.UrlBuilder.ActionLink(n.Resource, "schedulingSet"),
}
return n
}

func toNodeCollection(nodeList []*longhorn.Node, apiContext *api.ApiContext) *client.GenericCollection {
data := []interface{}{}
for _, node := range nodeList {
data = append(data, toNodeResource(node, apiContext))
}
return &client.GenericCollection{Data: data}
}
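
The three input types added in api/model.go above define the JSON bodies the new node actions accept. A minimal sketch of the wire format implied by the json tags; the path and size values are illustrative only:

package main

import (
    "encoding/json"
    "fmt"
)

// Local mirrors of the input types declared in api/model.go above.
type DiskInput struct {
    Path           string `json:"path"`
    MaximumStorage int64  `json:"maximumStorage"`
}

type SchedulingInput struct {
    AllowScheduling bool `json:"allowScheduling"`
}

func main() {
    // Body for the mountPathUpdate action; path and size are example values.
    disk, _ := json.Marshal(DiskInput{Path: "/var/lib/rancher/longhorn", MaximumStorage: 1 << 30})
    fmt.Println(string(disk)) // {"path":"/var/lib/rancher/longhorn","maximumStorage":1073741824}

    // Body for the schedulingSet action.
    sched, _ := json.Marshal(SchedulingInput{AllowScheduling: false})
    fmt.Println(string(sched)) // {"allowScheduling":false}
}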
98 changes: 98 additions & 0 deletions api/node.go
@@ -6,6 +6,7 @@ import (
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rancher/go-rancher/api"
"github.com/rancher/longhorn-manager/types"
)

func (s *Server) NodeList(rw http.ResponseWriter, req *http.Request) error {
@@ -34,3 +35,100 @@ func (s *Server) NodeGet(rw http.ResponseWriter, req *http.Request) error {
apiContext.Write(toHostResource(id, ip))
return nil
}

func (s *Server) MountNodeList(rw http.ResponseWriter, req *http.Request) error {
apiContext := api.GetApiContext(req)
nodeList, err := s.m.GetManagerNode()
if err != nil {
return errors.Wrap(err, "fail to list nodes")
}
apiContext.Write(toNodeCollection(nodeList, apiContext))
return nil
}

func (s *Server) MountNodeGet(rw http.ResponseWriter, req *http.Request) error {
apiContext := api.GetApiContext(req)
id := mux.Vars(req)["id"]

node, err := s.m.GetNode(id)
if err != nil {
return errors.Wrap(err, "fail to get node")
}
apiContext.Write(toNodeResource(node, apiContext))
return nil
}

func (s *Server) NodeScheduleSet(rw http.ResponseWriter, req *http.Request) error {
var input SchedulingInput
apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

id := mux.Vars(req)["id"]
ni, err := s.m.GetNode(id)
if err != nil {
return errors.Wrap(err, "fail to get node")
}
ni.Spec.AllowScheduling = input.AllowScheduling
n, err := s.m.UpdateNode(ni)
if err != nil {
return errors.Wrap(err, "fail to update node")
}

apiContext.Write(toNodeResource(n, apiContext))
return nil
}

func (s *Server) MountPathDelete(rw http.ResponseWriter, req *http.Request) (err error) {
defer func() {
err = errors.Wrap(err, "fail to delete mount path")
}()
var input DiskRemoveInput
apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

id := mux.Vars(req)["id"]
node, err := s.m.GetNode(id)
if err != nil {
return err
}
delete(node.Spec.Disks, input.Path)
n, err := s.m.UpdateNode(node)
if err != nil {
return err
}
apiContext.Write(toNodeResource(n, apiContext))
return nil
}

func (s *Server) MountPathUpdate(rw http.ResponseWriter, req *http.Request) error {
var input DiskInput

apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

id := mux.Vars(req)["id"]
node, err := s.m.GetNode(id)
if err != nil {
return errors.Wrap(err, "fail to get node")
}

// TODO Currently the user can only update one directory per node.
// Need to support multiple disk settings later.
disks := map[string]types.Disk{}
mountDisk := types.Disk{
Path: input.Path,
MaximumStorage: input.MaximumStorage,
}
disks[input.Path] = mountDisk
node.Spec.Disks = disks
n, err := s.m.UpdateNode(node)
if err != nil {
return err
}
apiContext.Write(toNodeResource(n, apiContext))

return nil
}
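
Together with the routes added in api/router.go below, these handlers make disk and scheduling updates reachable as POST actions on a node resource. A minimal client-side sketch; the manager address, port, and node name are assumptions, not taken from this commit:

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    // Hypothetical manager endpoint and node name; adjust for a real deployment.
    url := "http://longhorn-manager:9500/v1/nodes/node-1?action=mountPathUpdate"
    body := bytes.NewBufferString(`{"path":"/var/lib/rancher/longhorn","maximumStorage":1073741824}`)

    resp, err := http.Post(url, "application/json", body)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status) // on success the handler writes the updated node resource
}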
11 changes: 11 additions & 0 deletions api/router.go
@@ -97,6 +97,17 @@ func NewRouter(s *Server) *mux.Router {
r.Methods("GET").Path("/v1/hosts").Handler(f(schemas, s.NodeList))
r.Methods("GET").Path("/v1/hosts/{id}").Handler(f(schemas, s.NodeGet))

r.Methods("GET").Path("/v1/nodes").Handler(f(schemas, s.MountNodeList))
r.Methods("GET").Path("/v1/nodes/{id}").Handler(f(schemas, s.MountNodeGet))
nodeActions := map[string]func(http.ResponseWriter, *http.Request) error{
"mountPathUpdate": s.MountPathUpdate,
"mountPathRemove": s.MountPathDelete,
"schedulingSet": s.NodeScheduleSet,
}
for name, action := range nodeActions {
r.Methods("POST").Path("/v1/nodes/{id}").Queries("action", name).Handler(f(schemas, action))
}

r.Methods("GET").Path("/v1/engineimages").Handler(f(schemas, s.EngineImageList))
r.Methods("GET").Path("/v1/engineimages/{name}").Handler(f(schemas, s.EngineImageGet))
r.Methods("DELETE").Path("/v1/engineimages/{name}").Handler(f(schemas, s.EngineImageDelete))
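
The loop above registers one POST route per action and relies on gorilla/mux query matching: a request only reaches a handler when it carries the matching ?action=<name> parameter. A standalone sketch of the same dispatch pattern:

package main

import (
    "fmt"
    "net/http"

    "github.com/gorilla/mux"
)

func main() {
    r := mux.NewRouter()
    actions := map[string]http.HandlerFunc{
        "mountPathUpdate": func(w http.ResponseWriter, _ *http.Request) { fmt.Fprintln(w, "update") },
        "schedulingSet":   func(w http.ResponseWriter, _ *http.Request) { fmt.Fprintln(w, "schedule") },
    }
    // Each route matches POST /v1/nodes/{id} only when ?action=<name> is present.
    for name, h := range actions {
        r.Methods("POST").Path("/v1/nodes/{id}").Queries("action", name).Handler(h)
    }
    http.ListenAndServe(":8080", r)
}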
3 changes: 2 additions & 1 deletion controller/controller_manager.go
@@ -66,14 +66,15 @@ func StartControllers(stopCh chan struct{}, controllerID, serviceAccount, manage
engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines()
volumeInformer := lhInformerFactory.Longhorn().V1alpha1().Volumes()
engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages()
nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes()

podInformer := kubeInformerFactory.Core().V1().Pods()
jobInformer := kubeInformerFactory.Batch().V1().Jobs()
cronJobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets()

ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient,
podInformer, cronJobInformer, daemonSetInformer, kubeClient, namespace)
podInformer, cronJobInformer, daemonSetInformer, kubeClient, namespace, nodeInformer)
rc := NewReplicaController(ds, scheme, replicaInformer, podInformer, jobInformer, kubeClient,
namespace, controllerID)
ec := NewEngineController(ds, scheme, engineInformer, podInformer, kubeClient,
8 changes: 8 additions & 0 deletions controller/replica_controller.go
@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/controller"

"github.com/rancher/longhorn-manager/datastore"
"github.com/rancher/longhorn-manager/scheduler"
"github.com/rancher/longhorn-manager/types"
"github.com/rancher/longhorn-manager/util"

@@ -78,6 +79,7 @@ type ReplicaController struct {
queue workqueue.RateLimitingInterface

instanceHandler *InstanceHandler
scheduler *scheduler.ReplicaScheduler
}

func NewReplicaController(
@@ -110,6 +112,7 @@ func NewReplicaController(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "longhorn-replica"),
}
rc.instanceHandler = NewInstanceHandler(podInformer, kubeClient, namespace, rc, rc.eventRecorder)
rc.scheduler = scheduler.NewReplicaScheduler(ds, scheme, kubeClient, replicaInformer, podInformer, namespace, controllerID)

replicaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -270,6 +273,11 @@ func (rc *ReplicaController) syncReplica(key string) (err error) {
return err
}
}
// check whether the replica needs to be scheduled
err = rc.scheduler.ScheduleReplica(replica)
if err != nil {
return err
}

return rc.instanceHandler.ReconcileInstanceState(replica, &replica.Spec.InstanceSpec, &replica.Status.InstanceStatus)
}
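
The scheduler package itself is not among the files shown on this page, but the surrounding changes pin down its contract: replicas are now created with an empty DataPath (see the volume controller and test changes below), and ScheduleReplica is expected to fill in a placement using the node's AllowScheduling flag and configured disks. A rough sketch of that contract; listNodes, the first-fit policy, and the UpdateReplica call are illustrative assumptions:

// Hypothetical sketch only: the scheduler package is not shown in this commit.
// Replica fields (NodeID, DataPath) match types used elsewhere in the diff;
// listNodes and the first-fit selection policy are assumptions.
func (s *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica) error {
    if replica.Spec.NodeID != "" {
        return nil // already scheduled
    }
    nodes, err := s.listNodes() // hypothetical helper over the node informer
    if err != nil {
        return err
    }
    for _, node := range nodes {
        if !node.Spec.AllowScheduling {
            continue
        }
        // First-fit: take the first configured disk on an eligible node.
        for path := range node.Spec.Disks {
            replica.Spec.NodeID = node.Name
            replica.Spec.DataPath = path + "/replicas/" + replica.Name
            _, err := s.ds.UpdateReplica(replica)
            return err
        }
    }
    return nil // no eligible node yet; leave unscheduled and retry on next sync
}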
3 changes: 2 additions & 1 deletion controller/replica_controller_test.go
@@ -85,14 +85,15 @@ func newTestReplicaController(lhInformerFactory lhinformerfactory.SharedInformer
engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines()
replicaInformer := lhInformerFactory.Longhorn().V1alpha1().Replicas()
engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages()
nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes()

podInformer := kubeInformerFactory.Core().V1().Pods()
jobInformer := kubeInformerFactory.Batch().V1().Jobs()
cronJobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets()

ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient,
podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace)
podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace, nodeInformer)

rc := NewReplicaController(ds, scheme.Scheme, replicaInformer, podInformer, jobInformer, kubeClient, TestNamespace, controllerID)

5 changes: 0 additions & 5 deletions controller/volume_controller.go
@@ -38,10 +38,6 @@ var (
)

const (
// longhornDirectory is the directory going to be bind mounted on the
// host to provide storage space to replica data
longhornDirectory = "/var/lib/rancher/longhorn/"

LabelRecurringJob = "RecurringJob"

CronJobBackoffLimit = 3
@@ -843,7 +839,6 @@ func (vc *VolumeController) createReplica(v *longhorn.Volume) (*longhorn.Replica
replica.Spec.RestoreFrom = v.Spec.FromBackup
replica.Spec.RestoreName = backupID
}
replica.Spec.DataPath = longhornDirectory + "/replicas/" + v.Name + "-" + util.RandomID()

return vc.ds.CreateReplica(replica)
}
6 changes: 3 additions & 3 deletions controller/volume_controller_test.go
@@ -42,13 +42,14 @@ func newTestVolumeController(lhInformerFactory lhinformerfactory.SharedInformerF
engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines()
replicaInformer := lhInformerFactory.Longhorn().V1alpha1().Replicas()
engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages()
nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes()

podInformer := kubeInformerFactory.Core().V1().Pods()
cronJobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets()

ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient,
podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace)
podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace, nodeInformer)
initSettings(ds)

vc := NewVolumeController(ds, scheme.Scheme, volumeInformer, engineInformer, replicaInformer, kubeClient, TestNamespace, controllerID, TestServiceAccount, TestManagerImage)
@@ -366,8 +367,7 @@ func (s *TestSuite) runTestCases(c *C, testCases map[string]*VolumeTestCase) {
for _, expectR = range tc.expectReplicas {
break
}
// DataPath is randomized
c.Assert(retR.Spec.DataPath, Not(Equals), "")
// DataPath is "" before the replica is scheduled
retR.Spec.DataPath = ""
c.Assert(retR.Spec, DeepEquals, expectR.Spec)
c.Assert(retR.Status, DeepEquals, expectR.Status)
Expand Down
8 changes: 7 additions & 1 deletion datastore/datastore.go
@@ -36,6 +36,9 @@ type DataStore struct {
cjStoreSynced cache.InformerSynced
dsLister appslisters_v1beta2.DaemonSetLister
dsStoreSynced cache.InformerSynced

nLister lhlisters.NodeLister
nStoreSynced cache.InformerSynced
}

func NewDataStore(
@@ -49,7 +52,7 @@ func NewDataStore(
cronJobInformer batchinformers_v1beta1.CronJobInformer,
daemonSetInformer appsinformers_v1beta2.DaemonSetInformer,
kubeClient clientset.Interface,
namespace string) *DataStore {
namespace string, nodeInformer lhinformers.NodeInformer) *DataStore {

return &DataStore{
namespace: namespace,
Expand All @@ -71,6 +74,9 @@ func NewDataStore(
cjStoreSynced: cronJobInformer.Informer().HasSynced,
dsLister: daemonSetInformer.Lister(),
dsStoreSynced: daemonSetInformer.Informer().HasSynced,

nLister: nodeInformer.Lister(),
nStoreSynced: nodeInformer.Informer().HasSynced,
}
}

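
With the node lister and synced-check wired into DataStore, node accessors can follow the same pattern as the existing resources. A plausible getter and lister, assuming the generated code follows the standard client-go convention; these are not part of this hunk, and labels is "k8s.io/apimachinery/pkg/labels":

// Hypothetical accessors; the real GetNode/ListNodes may differ.
func (d *DataStore) GetNode(name string) (*longhorn.Node, error) {
    return d.nLister.Nodes(d.namespace).Get(name)
}

func (d *DataStore) ListNodes() ([]*longhorn.Node, error) {
    return d.nLister.Nodes(d.namespace).List(labels.Everything())
}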
