diff --git a/api/model.go b/api/model.go index 4e42b15252..ea52a01c3d 100644 --- a/api/model.go +++ b/api/model.go @@ -120,6 +120,26 @@ type EngineUpgradeInput struct { Image string `json:"image"` } +type Node struct { + client.Resource + Name string `json:"name"` + AllowScheduling bool `json:"allowScheduling"` + Disks map[string]types.Disk `json:"disks"` +} + +type DiskInput struct { + Path string `json:"path"` + MaximumStorage int64 `json:"maximumStorage"` +} + +type DiskRemoveInput struct { + Path string `json:"path"` +} + +type SchedulingInput struct { + AllowScheduling bool `json:"allowScheduling"` +} + func NewSchema() *client.Schemas { schemas := &client.Schemas{} @@ -137,6 +157,9 @@ func NewSchema() *client.Schemas { schemas.AddType("engineUpgradeInput", EngineUpgradeInput{}) schemas.AddType("replica", Replica{}) schemas.AddType("controller", Controller{}) + schemas.AddType("diskInput", DiskInput{}) + schemas.AddType("diskRemoveInput", DiskRemoveInput{}) + schemas.AddType("schedulingInput", SchedulingInput{}) hostSchema(schemas.AddType("host", Host{})) volumeSchema(schemas.AddType("volume", Volume{})) @@ -144,10 +167,44 @@ func NewSchema() *client.Schemas { settingSchema(schemas.AddType("setting", Setting{})) recurringSchema(schemas.AddType("recurringInput", RecurringInput{})) engineImageSchema(schemas.AddType("engineImage", EngineImage{})) + nodeSchema(schemas.AddType("node", Node{})) return schemas } +func nodeSchema(node *client.Schema) { + node.CollectionMethods = []string{"GET", "POST"} + node.ResourceMethods = []string{"GET"} + + node.ResourceActions = map[string]client.Action{ + "mountPathUpdate": { + Input: "diskInput", + Output: "node", + }, + "mountPathRemove": { + Input: "diskRemoveInput", + Output: "node", + }, + + "schedulingSet": { + Input: "schedulingInput", + Output: "node", + }, + } + + disks := node.ResourceFields["disks"] + disks.Update = true + disks.Required = true + disks.Unique = false + node.ResourceFields["disks"] = disks + + allowScheduling := node.ResourceFields["allowScheduling"] + allowScheduling.Update = true + allowScheduling.Required = true + allowScheduling.Unique = false + node.ResourceFields["allowScheduling"] = allowScheduling +} + func engineImageSchema(engineImage *client.Schema) { engineImage.CollectionMethods = []string{"GET", "POST"} engineImage.ResourceMethods = []string{"GET", "DELETE"} @@ -560,3 +617,31 @@ func NewServer(m *manager.VolumeManager) *Server { } return s } + +func toNodeResource(node *longhorn.Node, apiContext *api.ApiContext) *Node { + n := &Node{ + Resource: client.Resource{ + Id: node.Name, + Type: "node", + Actions: map[string]string{}, + Links: map[string]string{}, + }, + Name: node.Name, + AllowScheduling: node.Spec.AllowScheduling, + Disks: node.Spec.Disks, + } + n.Actions = map[string]string{ + "mountPathUpdate": apiContext.UrlBuilder.ActionLink(n.Resource, "mountPathUpdate"), + "mountPathRemove": apiContext.UrlBuilder.ActionLink(n.Resource, "mountPathRemove"), + "schedulingSet": apiContext.UrlBuilder.ActionLink(n.Resource, "schedulingSet"), + } + return n +} + +func toNodeCollection(nodeList []*longhorn.Node, apiContext *api.ApiContext) *client.GenericCollection { + data := []interface{}{} + for _, node := range nodeList { + data = append(data, toNodeResource(node, apiContext)) + } + return &client.GenericCollection{Data: data} +} diff --git a/api/node.go b/api/node.go index cd89b38363..54f8b0d3b4 100644 --- a/api/node.go +++ b/api/node.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/mux" "github.com/pkg/errors" "github.com/rancher/go-rancher/api" + "github.com/rancher/longhorn-manager/types" ) func (s *Server) NodeList(rw http.ResponseWriter, req *http.Request) error { @@ -34,3 +35,100 @@ func (s *Server) NodeGet(rw http.ResponseWriter, req *http.Request) error { apiContext.Write(toHostResource(id, ip)) return nil } + +func (s *Server) MountNodeList(rw http.ResponseWriter, req *http.Request) error { + apiContext := api.GetApiContext(req) + nodeList, err := s.m.GetManagerNode() + if err != nil { + return errors.Wrap(err, "fail to list nodes") + } + apiContext.Write(toNodeCollection(nodeList, apiContext)) + return nil +} + +func (s *Server) MountNodeGet(rw http.ResponseWriter, req *http.Request) error { + apiContext := api.GetApiContext(req) + id := mux.Vars(req)["id"] + + node, err := s.m.GetNode(id) + if err != nil { + return errors.Wrap(err, "fail to get node") + } + apiContext.Write(toNodeResource(node, apiContext)) + return nil +} + +func (s *Server) NodeScheduleSet(rw http.ResponseWriter, req *http.Request) error { + var input SchedulingInput + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + id := mux.Vars(req)["id"] + ni, err := s.m.GetNode(id) + if err != nil { + return errors.Wrap(err, "fail to get node") + } + ni.Spec.AllowScheduling = input.AllowScheduling + n, err := s.m.UpdateNode(ni) + + apiContext.Write(toNodeResource(n, apiContext)) + return nil +} + +func (s *Server) MountPathDelete(rw http.ResponseWriter, req *http.Request) (err error) { + defer func() { + err = errors.Wrap(err, "fail to delete mount path") + }() + var input DiskRemoveInput + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + id := mux.Vars(req)["id"] + node, err := s.m.GetNode(id) + if err != nil { + return err + } + delete(node.Spec.Disks, input.Path) + n, err := s.m.UpdateNode(node) + if err != nil { + return err + } + apiContext.Write(toNodeResource(n, apiContext)) + return nil +} + +func (s *Server) MountPathUpdate(rw http.ResponseWriter, req *http.Request) error { + var input DiskInput + + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + id := mux.Vars(req)["id"] + node, err := s.m.GetNode(id) + if err != nil { + return errors.Wrap(err, "fail to get node") + } + + // TODO user only can update directory on node. + // Need to suppport multiple disk setting later. + disks := map[string]types.Disk{} + mountDisk := types.Disk{ + Path: input.Path, + MaximumStorage: input.MaximumStorage, + } + disks[input.Path] = mountDisk + node.Spec.Disks = disks + n, err := s.m.UpdateNode(node) + if err != nil { + return err + } + apiContext.Write(toNodeResource(n, apiContext)) + + return nil +} diff --git a/api/router.go b/api/router.go index 91f51c917c..c851bb1818 100644 --- a/api/router.go +++ b/api/router.go @@ -97,6 +97,17 @@ func NewRouter(s *Server) *mux.Router { r.Methods("GET").Path("/v1/hosts").Handler(f(schemas, s.NodeList)) r.Methods("GET").Path("/v1/hosts/{id}").Handler(f(schemas, s.NodeGet)) + r.Methods("GET").Path("/v1/nodes").Handler(f(schemas, s.MountNodeList)) + r.Methods("GET").Path("/v1/nodes/{id}").Handler(f(schemas, s.MountNodeGet)) + nodeActions := map[string]func(http.ResponseWriter, *http.Request) error{ + "mountPathUpdate": s.MountPathUpdate, + "mountPathRemove": s.MountPathDelete, + "schedulingSet": s.NodeScheduleSet, + } + for name, action := range nodeActions { + r.Methods("POST").Path("/v1/nodes/{id}").Queries("action", name).Handler(f(schemas, action)) + } + r.Methods("GET").Path("/v1/engineimages").Handler(f(schemas, s.EngineImageList)) r.Methods("GET").Path("/v1/engineimages/{name}").Handler(f(schemas, s.EngineImageGet)) r.Methods("DELETE").Path("/v1/engineimages/{name}").Handler(f(schemas, s.EngineImageDelete)) diff --git a/controller/controller_manager.go b/controller/controller_manager.go index 99488af2f7..22d5184e27 100644 --- a/controller/controller_manager.go +++ b/controller/controller_manager.go @@ -66,6 +66,7 @@ func StartControllers(stopCh chan struct{}, controllerID, serviceAccount, manage engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines() volumeInformer := lhInformerFactory.Longhorn().V1alpha1().Volumes() engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages() + nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes() podInformer := kubeInformerFactory.Core().V1().Pods() jobInformer := kubeInformerFactory.Batch().V1().Jobs() @@ -73,7 +74,7 @@ func StartControllers(stopCh chan struct{}, controllerID, serviceAccount, manage daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets() ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient, - podInformer, cronJobInformer, daemonSetInformer, kubeClient, namespace) + podInformer, cronJobInformer, daemonSetInformer, kubeClient, namespace, nodeInformer) rc := NewReplicaController(ds, scheme, replicaInformer, podInformer, jobInformer, kubeClient, namespace, controllerID) ec := NewEngineController(ds, scheme, engineInformer, podInformer, kubeClient, diff --git a/controller/replica_controller.go b/controller/replica_controller.go index e5061a543f..9e7683f365 100644 --- a/controller/replica_controller.go +++ b/controller/replica_controller.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "github.com/rancher/longhorn-manager/datastore" + "github.com/rancher/longhorn-manager/scheduler" "github.com/rancher/longhorn-manager/types" "github.com/rancher/longhorn-manager/util" @@ -78,6 +79,7 @@ type ReplicaController struct { queue workqueue.RateLimitingInterface instanceHandler *InstanceHandler + scheduler *scheduler.ReplicaScheduler } func NewReplicaController( @@ -110,6 +112,7 @@ func NewReplicaController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "longhorn-replica"), } rc.instanceHandler = NewInstanceHandler(podInformer, kubeClient, namespace, rc, rc.eventRecorder) + rc.scheduler = scheduler.NewReplicaScheduler(ds, scheme, kubeClient, replicaInformer, podInformer, namespace, controllerID) replicaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -270,6 +273,11 @@ func (rc *ReplicaController) syncReplica(key string) (err error) { return err } } + // check whether the replica need to be scheduled + err = rc.scheduler.ScheduleReplica(replica) + if err != nil { + return err + } return rc.instanceHandler.ReconcileInstanceState(replica, &replica.Spec.InstanceSpec, &replica.Status.InstanceStatus) } diff --git a/controller/replica_controller_test.go b/controller/replica_controller_test.go index 4e2480a80f..36b5bf1d13 100644 --- a/controller/replica_controller_test.go +++ b/controller/replica_controller_test.go @@ -85,6 +85,7 @@ func newTestReplicaController(lhInformerFactory lhinformerfactory.SharedInformer engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines() replicaInformer := lhInformerFactory.Longhorn().V1alpha1().Replicas() engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages() + nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes() podInformer := kubeInformerFactory.Core().V1().Pods() jobInformer := kubeInformerFactory.Batch().V1().Jobs() @@ -92,7 +93,7 @@ func newTestReplicaController(lhInformerFactory lhinformerfactory.SharedInformer daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets() ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient, - podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace) + podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace, nodeInformer) rc := NewReplicaController(ds, scheme.Scheme, replicaInformer, podInformer, jobInformer, kubeClient, TestNamespace, controllerID) diff --git a/controller/volume_controller.go b/controller/volume_controller.go index ca40f72f63..ead2c54c52 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -38,10 +38,6 @@ var ( ) const ( - // longhornDirectory is the directory going to be bind mounted on the - // host to provide storage space to replica data - longhornDirectory = "/var/lib/rancher/longhorn/" - LabelRecurringJob = "RecurringJob" CronJobBackoffLimit = 3 @@ -843,7 +839,6 @@ func (vc *VolumeController) createReplica(v *longhorn.Volume) (*longhorn.Replica replica.Spec.RestoreFrom = v.Spec.FromBackup replica.Spec.RestoreName = backupID } - replica.Spec.DataPath = longhornDirectory + "/replicas/" + v.Name + "-" + util.RandomID() return vc.ds.CreateReplica(replica) } diff --git a/controller/volume_controller_test.go b/controller/volume_controller_test.go index 6665e6767a..5c8be9a180 100644 --- a/controller/volume_controller_test.go +++ b/controller/volume_controller_test.go @@ -42,13 +42,14 @@ func newTestVolumeController(lhInformerFactory lhinformerfactory.SharedInformerF engineInformer := lhInformerFactory.Longhorn().V1alpha1().Engines() replicaInformer := lhInformerFactory.Longhorn().V1alpha1().Replicas() engineImageInformer := lhInformerFactory.Longhorn().V1alpha1().EngineImages() + nodeInformer := lhInformerFactory.Longhorn().V1alpha1().Nodes() podInformer := kubeInformerFactory.Core().V1().Pods() cronJobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs() daemonSetInformer := kubeInformerFactory.Apps().V1beta2().DaemonSets() ds := datastore.NewDataStore(volumeInformer, engineInformer, replicaInformer, engineImageInformer, lhClient, - podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace) + podInformer, cronJobInformer, daemonSetInformer, kubeClient, TestNamespace, nodeInformer) initSettings(ds) vc := NewVolumeController(ds, scheme.Scheme, volumeInformer, engineInformer, replicaInformer, kubeClient, TestNamespace, controllerID, TestServiceAccount, TestManagerImage) @@ -366,8 +367,7 @@ func (s *TestSuite) runTestCases(c *C, testCases map[string]*VolumeTestCase) { for _, expectR = range tc.expectReplicas { break } - // DataPath is randomized - c.Assert(retR.Spec.DataPath, Not(Equals), "") + // DataPath is "" before scheduled retR.Spec.DataPath = "" c.Assert(retR.Spec, DeepEquals, expectR.Spec) c.Assert(retR.Status, DeepEquals, expectR.Status) diff --git a/datastore/datastore.go b/datastore/datastore.go index 823bdc2cd8..2da11618d1 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -36,6 +36,9 @@ type DataStore struct { cjStoreSynced cache.InformerSynced dsLister appslisters_v1beta2.DaemonSetLister dsStoreSynced cache.InformerSynced + + nLister lhlisters.NodeLister + nStoreSynced cache.InformerSynced } func NewDataStore( @@ -49,7 +52,7 @@ func NewDataStore( cronJobInformer batchinformers_v1beta1.CronJobInformer, daemonSetInformer appsinformers_v1beta2.DaemonSetInformer, kubeClient clientset.Interface, - namespace string) *DataStore { + namespace string, nodeInformer lhinformers.NodeInformer) *DataStore { return &DataStore{ namespace: namespace, @@ -71,6 +74,9 @@ func NewDataStore( cjStoreSynced: cronJobInformer.Informer().HasSynced, dsLister: daemonSetInformer.Lister(), dsStoreSynced: daemonSetInformer.Informer().HasSynced, + + nLister: nodeInformer.Lister(), + nStoreSynced: nodeInformer.Informer().HasSynced, } } diff --git a/datastore/kubernetes.go b/datastore/kubernetes.go index 104ff51fac..1549757fcc 100644 --- a/datastore/kubernetes.go +++ b/datastore/kubernetes.go @@ -147,3 +147,22 @@ func (s *DataStore) DeleteEngineImageDaemonSet(name string) error { } return nil } + +func (s *DataStore) GetManagerNode() ([]string, error) { + selector, err := s.getManagerSelector() + if err != nil { + return nil, err + } + podList, err := s.pLister.Pods(s.namespace).List(selector) + if err != nil { + return nil, err + } + if len(podList) == 0 { + return nil, fmt.Errorf("cannot find manager pods by label %v", s.getManagerLabel()) + } + nodeList := make([]string, 0) + for _, pod := range podList { + nodeList = append(nodeList, pod.Spec.NodeName) + } + return nodeList, nil +} diff --git a/datastore/longhorn.go b/datastore/longhorn.go index 066b2282b3..ad6c5a6507 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -24,6 +24,8 @@ const ( NameMaximumLength = 40 SettingName = "longhorn-manager-settings" + + longhornDirectory = "/var/lib/rancher/longhorn/" ) var ( @@ -437,3 +439,61 @@ func (s *DataStore) ListEngineImages() (map[string]*longhorn.EngineImage, error) } return itemMap, nil } + +func (s *DataStore) CreateNode(node *longhorn.Node) (*longhorn.Node, error) { + return s.lhClient.LonghornV1alpha1().Nodes(s.namespace).Create(node) +} + +// CreateDefaultNode will set default directory to node replica mount path +func (s *DataStore) CreateDefaultNode(name string) (*longhorn.Node, error) { + disks := map[string]types.Disk{} + defaultDisk := types.Disk{ + Path: longhornDirectory, + } + disks[longhornDirectory] = defaultDisk + + node := &longhorn.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: types.NodeSpec{ + Name: name, + AllowScheduling: true, + Disks: disks, + }, + } + return s.CreateNode(node) +} + +func (s *DataStore) GetNode(name string) (*longhorn.Node, error) { + result, err := s.nLister.Nodes(s.namespace).Get(name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + return result.DeepCopy(), nil +} + +func (s *DataStore) UpdateNode(node *longhorn.Node) (*longhorn.Node, error) { + return s.lhClient.LonghornV1alpha1().Nodes(s.namespace).Update(node) +} + +func (s *DataStore) GetNodeList(nodeNameList []string) ([]*longhorn.Node, error) { + nodeList := make([]*longhorn.Node, 0) + for _, nodeName := range nodeNameList { + node, err := s.GetNode(nodeName) + if err != nil { + return nil, err + } + if node == nil { + node, err = s.CreateDefaultNode(nodeName) + if err != nil { + return nil, err + } + } + nodeList = append(nodeList, node) + } + return nodeList, nil +} diff --git a/deploy/01-prerequisite/03-crd.yaml b/deploy/01-prerequisite/03-crd.yaml index 24abdeab67..ddbbd608b6 100644 --- a/deploy/01-prerequisite/03-crd.yaml +++ b/deploy/01-prerequisite/03-crd.yaml @@ -87,3 +87,21 @@ spec: singular: engineimage scope: Namespaced version: v1alpha1 +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + labels: + longhorn-manager: Node + name: nodes.longhorn.rancher.io +spec: + group: longhorn.rancher.io + names: + kind: Node + listKind: NodeList + plural: nodes + shortNames: + - lhnode + singular: node + scope: Namespaced + version: v1alpha1 diff --git a/k8s/pkg/apis/longhorn/v1alpha1/register.go b/k8s/pkg/apis/longhorn/v1alpha1/register.go index 0edfbf3859..eb7d1bbc0e 100644 --- a/k8s/pkg/apis/longhorn/v1alpha1/register.go +++ b/k8s/pkg/apis/longhorn/v1alpha1/register.go @@ -31,6 +31,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &SettingList{}, &EngineImage{}, &EngineImageList{}, + &Node{}, + &NodeList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/k8s/pkg/apis/longhorn/v1alpha1/types.go b/k8s/pkg/apis/longhorn/v1alpha1/types.go index 7ec040c7b9..59c249fcd8 100644 --- a/k8s/pkg/apis/longhorn/v1alpha1/types.go +++ b/k8s/pkg/apis/longhorn/v1alpha1/types.go @@ -108,3 +108,23 @@ type EngineImageList struct { metav1.ListMeta `json:"metadata"` Items []EngineImage `json:"items"` } + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=Node +// +genclient:noStatus + +type Node struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + Spec types.NodeSpec `json:"spec"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=nodes + +type NodeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []Node `json:"items"` +} diff --git a/k8s/pkg/apis/longhorn/v1alpha1/zz_generated.deepcopy.go b/k8s/pkg/apis/longhorn/v1alpha1/zz_generated.deepcopy.go index 7b7081345c..5c6901d3d2 100644 --- a/k8s/pkg/apis/longhorn/v1alpha1/zz_generated.deepcopy.go +++ b/k8s/pkg/apis/longhorn/v1alpha1/zz_generated.deepcopy.go @@ -146,6 +146,66 @@ func (in *EngineList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Node) DeepCopyInto(out *Node) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Node. +func (in *Node) DeepCopy() *Node { + if in == nil { + return nil + } + out := new(Node) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Node) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeList) DeepCopyInto(out *NodeList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Node, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeList. +func (in *NodeList) DeepCopy() *NodeList { + if in == nil { + return nil + } + out := new(NodeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Replica) DeepCopyInto(out *Replica) { *out = *in diff --git a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_longhorn_client.go b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_longhorn_client.go index fbfd4c2712..508c5f75f2 100644 --- a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_longhorn_client.go +++ b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_longhorn_client.go @@ -36,6 +36,10 @@ func (c *FakeLonghornV1alpha1) EngineImages(namespace string) v1alpha1.EngineIma return &FakeEngineImages{c, namespace} } +func (c *FakeLonghornV1alpha1) Nodes(namespace string) v1alpha1.NodeInterface { + return &FakeNodes{c, namespace} +} + func (c *FakeLonghornV1alpha1) Replicas(namespace string) v1alpha1.ReplicaInterface { return &FakeReplicas{c, namespace} } diff --git a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_node.go b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_node.go new file mode 100644 index 0000000000..dd26eb6be1 --- /dev/null +++ b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/fake/fake_node.go @@ -0,0 +1,128 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeNodes implements NodeInterface +type FakeNodes struct { + Fake *FakeLonghornV1alpha1 + ns string +} + +var nodesResource = schema.GroupVersionResource{Group: "longhorn.rancher.io", Version: "v1alpha1", Resource: "nodes"} + +var nodesKind = schema.GroupVersionKind{Group: "longhorn.rancher.io", Version: "v1alpha1", Kind: "Node"} + +// Get takes name of the node, and returns the corresponding node object, and an error if there is any. +func (c *FakeNodes) Get(name string, options v1.GetOptions) (result *v1alpha1.Node, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(nodesResource, c.ns, name), &v1alpha1.Node{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Node), err +} + +// List takes label and field selectors, and returns the list of Nodes that match those selectors. +func (c *FakeNodes) List(opts v1.ListOptions) (result *v1alpha1.NodeList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(nodesResource, nodesKind, c.ns, opts), &v1alpha1.NodeList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.NodeList{} + for _, item := range obj.(*v1alpha1.NodeList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested nodes. +func (c *FakeNodes) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(nodesResource, c.ns, opts)) + +} + +// Create takes the representation of a node and creates it. Returns the server's representation of the node, and an error, if there is any. +func (c *FakeNodes) Create(node *v1alpha1.Node) (result *v1alpha1.Node, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(nodesResource, c.ns, node), &v1alpha1.Node{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Node), err +} + +// Update takes the representation of a node and updates it. Returns the server's representation of the node, and an error, if there is any. +func (c *FakeNodes) Update(node *v1alpha1.Node) (result *v1alpha1.Node, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(nodesResource, c.ns, node), &v1alpha1.Node{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Node), err +} + +// Delete takes name of the node and deletes it. Returns an error if one occurs. +func (c *FakeNodes) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(nodesResource, c.ns, name), &v1alpha1.Node{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeNodes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(nodesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.NodeList{}) + return err +} + +// Patch applies the patch and returns the patched node. +func (c *FakeNodes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Node, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(nodesResource, c.ns, name, data, subresources...), &v1alpha1.Node{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Node), err +} diff --git a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/generated_expansion.go b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/generated_expansion.go index a162558850..78105db08d 100644 --- a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/generated_expansion.go +++ b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/generated_expansion.go @@ -22,6 +22,8 @@ type EngineExpansion interface{} type EngineImageExpansion interface{} +type NodeExpansion interface{} + type ReplicaExpansion interface{} type SettingExpansion interface{} diff --git a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/longhorn_client.go b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/longhorn_client.go index b769a47f41..a10b9ed147 100644 --- a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/longhorn_client.go +++ b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/longhorn_client.go @@ -29,6 +29,7 @@ type LonghornV1alpha1Interface interface { RESTClient() rest.Interface EnginesGetter EngineImagesGetter + NodesGetter ReplicasGetter SettingsGetter VolumesGetter @@ -47,6 +48,10 @@ func (c *LonghornV1alpha1Client) EngineImages(namespace string) EngineImageInter return newEngineImages(c, namespace) } +func (c *LonghornV1alpha1Client) Nodes(namespace string) NodeInterface { + return newNodes(c, namespace) +} + func (c *LonghornV1alpha1Client) Replicas(namespace string) ReplicaInterface { return newReplicas(c, namespace) } diff --git a/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/node.go b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/node.go new file mode 100644 index 0000000000..e4c91ef43e --- /dev/null +++ b/k8s/pkg/client/clientset/versioned/typed/longhorn/v1alpha1/node.go @@ -0,0 +1,157 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1" + scheme "github.com/rancher/longhorn-manager/k8s/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// NodesGetter has a method to return a NodeInterface. +// A group's client should implement this interface. +type NodesGetter interface { + Nodes(namespace string) NodeInterface +} + +// NodeInterface has methods to work with Node resources. +type NodeInterface interface { + Create(*v1alpha1.Node) (*v1alpha1.Node, error) + Update(*v1alpha1.Node) (*v1alpha1.Node, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.Node, error) + List(opts v1.ListOptions) (*v1alpha1.NodeList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Node, err error) + NodeExpansion +} + +// nodes implements NodeInterface +type nodes struct { + client rest.Interface + ns string +} + +// newNodes returns a Nodes +func newNodes(c *LonghornV1alpha1Client, namespace string) *nodes { + return &nodes{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the node, and returns the corresponding node object, and an error if there is any. +func (c *nodes) Get(name string, options v1.GetOptions) (result *v1alpha1.Node, err error) { + result = &v1alpha1.Node{} + err = c.client.Get(). + Namespace(c.ns). + Resource("nodes"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of Nodes that match those selectors. +func (c *nodes) List(opts v1.ListOptions) (result *v1alpha1.NodeList, err error) { + result = &v1alpha1.NodeList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("nodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested nodes. +func (c *nodes) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("nodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a node and creates it. Returns the server's representation of the node, and an error, if there is any. +func (c *nodes) Create(node *v1alpha1.Node) (result *v1alpha1.Node, err error) { + result = &v1alpha1.Node{} + err = c.client.Post(). + Namespace(c.ns). + Resource("nodes"). + Body(node). + Do(). + Into(result) + return +} + +// Update takes the representation of a node and updates it. Returns the server's representation of the node, and an error, if there is any. +func (c *nodes) Update(node *v1alpha1.Node) (result *v1alpha1.Node, err error) { + result = &v1alpha1.Node{} + err = c.client.Put(). + Namespace(c.ns). + Resource("nodes"). + Name(node.Name). + Body(node). + Do(). + Into(result) + return +} + +// Delete takes name of the node and deletes it. Returns an error if one occurs. +func (c *nodes) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("nodes"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *nodes) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("nodes"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched node. +func (c *nodes) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Node, err error) { + result = &v1alpha1.Node{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("nodes"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/k8s/pkg/client/informers/externalversions/generic.go b/k8s/pkg/client/informers/externalversions/generic.go index 9db3b8f7d3..634fc19159 100644 --- a/k8s/pkg/client/informers/externalversions/generic.go +++ b/k8s/pkg/client/informers/externalversions/generic.go @@ -57,6 +57,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Longhorn().V1alpha1().Engines().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("engineimages"): return &genericInformer{resource: resource.GroupResource(), informer: f.Longhorn().V1alpha1().EngineImages().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("nodes"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Longhorn().V1alpha1().Nodes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("replicas"): return &genericInformer{resource: resource.GroupResource(), informer: f.Longhorn().V1alpha1().Replicas().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("settings"): diff --git a/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/interface.go b/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/interface.go index 12817c597d..27b0c7f230 100644 --- a/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/interface.go +++ b/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/interface.go @@ -28,6 +28,8 @@ type Interface interface { Engines() EngineInformer // EngineImages returns a EngineImageInformer. EngineImages() EngineImageInformer + // Nodes returns a NodeInformer. + Nodes() NodeInformer // Replicas returns a ReplicaInformer. Replicas() ReplicaInformer // Settings returns a SettingInformer. @@ -57,6 +59,11 @@ func (v *version) EngineImages() EngineImageInformer { return &engineImageInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// Nodes returns a NodeInformer. +func (v *version) Nodes() NodeInformer { + return &nodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // Replicas returns a ReplicaInformer. func (v *version) Replicas() ReplicaInformer { return &replicaInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/node.go b/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/node.go new file mode 100644 index 0000000000..2eb2757d6a --- /dev/null +++ b/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1/node.go @@ -0,0 +1,89 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + longhorn_v1alpha1 "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1" + versioned "github.com/rancher/longhorn-manager/k8s/pkg/client/clientset/versioned" + internalinterfaces "github.com/rancher/longhorn-manager/k8s/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/rancher/longhorn-manager/k8s/pkg/client/listers/longhorn/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// NodeInformer provides access to a shared informer and lister for +// Nodes. +type NodeInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.NodeLister +} + +type nodeInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewNodeInformer constructs a new informer for Node type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewNodeInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredNodeInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredNodeInformer constructs a new informer for Node type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredNodeInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.LonghornV1alpha1().Nodes(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.LonghornV1alpha1().Nodes(namespace).Watch(options) + }, + }, + &longhorn_v1alpha1.Node{}, + resyncPeriod, + indexers, + ) +} + +func (f *nodeInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredNodeInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *nodeInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&longhorn_v1alpha1.Node{}, f.defaultInformer) +} + +func (f *nodeInformer) Lister() v1alpha1.NodeLister { + return v1alpha1.NewNodeLister(f.Informer().GetIndexer()) +} diff --git a/k8s/pkg/client/listers/longhorn/v1alpha1/expansion_generated.go b/k8s/pkg/client/listers/longhorn/v1alpha1/expansion_generated.go index dbbe8ebc2a..5c568f8d15 100644 --- a/k8s/pkg/client/listers/longhorn/v1alpha1/expansion_generated.go +++ b/k8s/pkg/client/listers/longhorn/v1alpha1/expansion_generated.go @@ -34,6 +34,14 @@ type EngineImageListerExpansion interface{} // EngineImageNamespaceLister. type EngineImageNamespaceListerExpansion interface{} +// NodeListerExpansion allows custom methods to be added to +// NodeLister. +type NodeListerExpansion interface{} + +// NodeNamespaceListerExpansion allows custom methods to be added to +// NodeNamespaceLister. +type NodeNamespaceListerExpansion interface{} + // ReplicaListerExpansion allows custom methods to be added to // ReplicaLister. type ReplicaListerExpansion interface{} diff --git a/k8s/pkg/client/listers/longhorn/v1alpha1/node.go b/k8s/pkg/client/listers/longhorn/v1alpha1/node.go new file mode 100644 index 0000000000..75b886d7d2 --- /dev/null +++ b/k8s/pkg/client/listers/longhorn/v1alpha1/node.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// NodeLister helps list Nodes. +type NodeLister interface { + // List lists all Nodes in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.Node, err error) + // Nodes returns an object that can list and get Nodes. + Nodes(namespace string) NodeNamespaceLister + NodeListerExpansion +} + +// nodeLister implements the NodeLister interface. +type nodeLister struct { + indexer cache.Indexer +} + +// NewNodeLister returns a new NodeLister. +func NewNodeLister(indexer cache.Indexer) NodeLister { + return &nodeLister{indexer: indexer} +} + +// List lists all Nodes in the indexer. +func (s *nodeLister) List(selector labels.Selector) (ret []*v1alpha1.Node, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Node)) + }) + return ret, err +} + +// Nodes returns an object that can list and get Nodes. +func (s *nodeLister) Nodes(namespace string) NodeNamespaceLister { + return nodeNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// NodeNamespaceLister helps list and get Nodes. +type NodeNamespaceLister interface { + // List lists all Nodes in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.Node, err error) + // Get retrieves the Node from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.Node, error) + NodeNamespaceListerExpansion +} + +// nodeNamespaceLister implements the NodeNamespaceLister +// interface. +type nodeNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all Nodes in the indexer for a given namespace. +func (s nodeNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Node, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Node)) + }) + return ret, err +} + +// Get retrieves the Node from the indexer for a given namespace and name. +func (s nodeNamespaceLister) Get(name string) (*v1alpha1.Node, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("node"), name) + } + return obj.(*v1alpha1.Node), nil +} diff --git a/manager/volume.go b/manager/volume.go index 5a7ef493c2..7947bd36ce 100644 --- a/manager/volume.go +++ b/manager/volume.go @@ -345,3 +345,27 @@ func (m *VolumeManager) EngineUpgrade(volumeName, image string) error { return nil } + +func (m *VolumeManager) GetNode(name string) (*longhorn.Node, error) { + node, err := m.ds.GetNode(name) + if err != nil { + return nil, err + } + if node == nil { + // create node for default path + return m.ds.CreateDefaultNode(name) + } + return node, nil +} + +func (m *VolumeManager) UpdateNode(node *longhorn.Node) (*longhorn.Node, error) { + return m.ds.UpdateNode(node) +} + +func (m *VolumeManager) GetManagerNode() ([]*longhorn.Node, error) { + nodeNameList, err := m.ds.GetManagerNode() + if err != nil { + return nil, err + } + return m.ds.GetNodeList(nodeNameList) +} diff --git a/scheduler/algorithm/replica_core_scheduler.go b/scheduler/algorithm/replica_core_scheduler.go new file mode 100644 index 0000000000..a2d80341e0 --- /dev/null +++ b/scheduler/algorithm/replica_core_scheduler.go @@ -0,0 +1,33 @@ +package algorithm + +import ( + longhorn "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1" +) + +type ReplicaChecker struct { + NodeMap map[string]string + Replicas map[string]*longhorn.Replica +} + +func NewReplicaChecker(nodeMap map[string]string, replicas map[string]*longhorn.Replica) *ReplicaChecker { + return &ReplicaChecker{ + NodeMap: nodeMap, + Replicas: replicas, + } +} + +func (rc *ReplicaChecker) ReplicasAffinity() map[string]string { + for nodeName := range rc.NodeMap { + isExist := false + for _, r := range rc.Replicas { + if r.Spec.NodeID != "" && r.Spec.NodeID == nodeName { + isExist = true + break + } + } + if isExist { + delete(rc.NodeMap, nodeName) + } + } + return rc.NodeMap +} diff --git a/scheduler/replica_scheduler.go b/scheduler/replica_scheduler.go new file mode 100644 index 0000000000..914358d8a1 --- /dev/null +++ b/scheduler/replica_scheduler.go @@ -0,0 +1,141 @@ +package scheduler + +import ( + "github.com/Sirupsen/logrus" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "github.com/rancher/longhorn-manager/datastore" + checker "github.com/rancher/longhorn-manager/scheduler/algorithm" + "github.com/rancher/longhorn-manager/types" + "github.com/rancher/longhorn-manager/util" + + longhorn "github.com/rancher/longhorn-manager/k8s/pkg/apis/longhorn/v1alpha1" + lhinformers "github.com/rancher/longhorn-manager/k8s/pkg/client/informers/externalversions/longhorn/v1alpha1" + lhlisters "github.com/rancher/longhorn-manager/k8s/pkg/client/listers/longhorn/v1alpha1" +) + +const ( + // longhornDirectory is the directory going to be bind mounted on the + // host to provide storage space to replica data by default + longhornDirectory = "/var/lib/rancher/longhorn/" +) + +type ReplicaScheduler struct { + // which namespace controller is running with + namespace string + // use as the OwnerID of replica + controllerID string + + ds *datastore.DataStore + + kubeClient clientset.Interface + eventRecorder record.EventRecorder + + pLister corelisters.PodLister + rStoreSynced cache.InformerSynced + rLister lhlisters.ReplicaLister + + queue workqueue.RateLimitingInterface +} + +func NewReplicaScheduler( + ds *datastore.DataStore, + scheme *runtime.Scheme, + kubeClient clientset.Interface, + replicaInformer lhinformers.ReplicaInformer, + podInformer coreinformers.PodInformer, + namespace string, controllerID string) *ReplicaScheduler { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(logrus.Infof) + // TODO: remove the wrapper when every clients have moved to use the clientset. + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) + + rcScheduler := &ReplicaScheduler{ + namespace: namespace, + controllerID: controllerID, + kubeClient: kubeClient, + pLister: podInformer.Lister(), + eventRecorder: eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "longhorn-replica-scheduler"}), + ds: ds, + rStoreSynced: replicaInformer.Informer().HasSynced, + rLister: replicaInformer.Lister(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "longhorn-replica"), + } + + return rcScheduler +} + +func (rcs *ReplicaScheduler) ScheduleReplica(replica *longhorn.Replica) error { + // only called when replica is starting for the first time + if replica.Spec.DesireState == types.InstanceStateRunning && replica.Spec.NodeID == "" { + // get replica list without current replica + replicas, err := rcs.ds.GetVolumeReplicas(replica.Spec.VolumeName) + if err != nil { + return err + } + if _, ok := replicas[replica.Name]; ok { + delete(replicas, replica.Name) + } + + // get all hosts + nodeIPMap, err := rcs.ds.GetManagerNodeIPMap() + if err != nil { + return err + } + rchecker := checker.NewReplicaChecker(nodeIPMap, replicas) + + // TODO Need to add capacity. + // Just make sure replica of the same volume be scheduled to different nodes for now. + nodeMap := rchecker.ReplicasAffinity() + // if other replica has allocated to different nodes, then choose a random one + nodeID := "" + if len(nodeMap) == 0 { + nodeID = rcs.getRandomNode(nodeIPMap) + } else { + nodeID = rcs.getRandomNode(nodeMap) + } + + replica.Spec.NodeID = nodeID + // set replica DataPath on specified node + node, err := rcs.ds.GetNode(nodeID) + if err != nil { + return err + } + + // TODO there just have a dataPath to set for now + var dataPath string + if node == nil { + dataPath = longhornDirectory + } else { + for dataPath = range node.Spec.Disks { + break + } + } + replica.Spec.DataPath = dataPath + "/replicas/" + replica.Spec.VolumeName + "-" + util.RandomID() + _, err = rcs.ds.UpdateReplica(replica) + if err != nil { + return err + } + } + return nil +} + +func (rcs *ReplicaScheduler) getRandomNode(nodeMap map[string]string) string { + var node string + + // map is random in Go + for node = range nodeMap { + break + } + + return node +} diff --git a/types/deepcopy.go b/types/deepcopy.go index 86a3bfd48f..8fed7006de 100644 --- a/types/deepcopy.go +++ b/types/deepcopy.go @@ -32,3 +32,14 @@ func (e *EngineStatus) DeepCopyInto(to *EngineStatus) { to.ReplicaModeMap[key] = value } } + +func (n *NodeSpec) DeepCopyInto(to *NodeSpec) { + *to = *n + if n.Disks == nil { + return + } + to.Disks = make(map[string]Disk) + for key, value := range n.Disks { + to.Disks[key] = value + } +} diff --git a/types/resource.go b/types/resource.go index d70d9596c7..3905d7bc14 100644 --- a/types/resource.go +++ b/types/resource.go @@ -155,3 +155,15 @@ type EngineImageStatus struct { DataFormatVersion int `json:"dataFormatVersion"` DataFormatMinVersion int `json:"dataFormatMinVersion"` } + +type NodeSpec struct { + Name string `json:"name"` + Disks map[string]Disk `json:"disks"` + AllowScheduling bool `json:"allowScheduling"` +} + +type Disk struct { + Path string `json:"path"` + MaximumStorage int64 `json:"maximumStorage"` + ScheduledStorage int64 `json:"scheduledStorage"` +}