Skip to content

Commit

Permalink
Merge 5043e92 into d6760ca
Browse files Browse the repository at this point in the history
  • Loading branch information
vladikr committed Jul 10, 2017
2 parents d6760ca + 5043e92 commit 9132be8
Show file tree
Hide file tree
Showing 19 changed files with 307 additions and 190 deletions.
6 changes: 3 additions & 3 deletions cmd/virt-handler/virt-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func main() {
panic(err)
}
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&kubecorev1.EventSinkImpl{Interface: coreClient.Events(api.NamespaceDefault)})
broadcaster.StartRecordingToSink(&kubecorev1.EventSinkImpl{Interface: coreClient.Events(api.NamespaceAll)})
// TODO what is scheme used for in Recorder?
recorder := broadcaster.NewRecorder(kubeapi.Scheme, kubev1.EventSource{Component: "virt-handler", Host: *host})

Expand All @@ -108,7 +108,7 @@ func main() {
}

// Wire VM controller
vmListWatcher := kubecli.NewListWatchFromClient(restClient, "vms", api.NamespaceDefault, fields.Everything(), l)
vmListWatcher := kubecli.NewListWatchFromClient(restClient, "vms", api.NamespaceAll, fields.Everything(), l)
vmStore, vmQueue, vmController := virthandler.NewVMController(vmListWatcher, domainManager, recorder, *restClient, coreClient, *host)

// Wire Domain controller
Expand All @@ -133,7 +133,7 @@ func main() {
// Poplulate the VM store with known Domains on the host, to get deletes since the last run
for _, domain := range domainStore.List() {
d := domain.(*virt_api.Domain)
vmStore.Add(v1.NewVMReferenceFromName(d.ObjectMeta.Name))
vmStore.Add(v1.NewVMReferenceFromNameWithNS(d.ObjectMeta.Namespace, d.ObjectMeta.Name))
}

// Watch for VM changes
Expand Down
30 changes: 21 additions & 9 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,12 @@ func (s SyncEvent) String() string {
}

func NewMinimalVM(vmName string) *VM {
return NewMinimalVMWithNS(kubeapi.NamespaceDefault, vmName)
}

func NewMinimalVMWithNS(namespace string, vmName string) *VM {
precond.CheckNotEmpty(vmName)
vm := NewVMReferenceFromName(vmName)
vm := NewVMReferenceFromNameWithNS(namespace, vmName)
vm.Spec = VMSpec{Domain: NewMinimalDomainSpec(vmName)}
vm.TypeMeta = metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Expand All @@ -278,11 +282,15 @@ func NewMinimalVM(vmName string) *VM {

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
return NewVMReferenceFromNameWithNS(kubeapi.NamespaceDefault, name)
}

func NewVMReferenceFromNameWithNS(namespace string, name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
Namespace: namespace,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), namespace, name),
},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
Expand All @@ -302,12 +310,12 @@ type SpiceInfo struct {
Proxy string `json:"proxy,omitempty" ini:"proxy,omitempty"`
}

func NewSpice(vmName string) *Spice {
func NewSpice(namespace string, vmName string) *Spice {
return &Spice{
Info: SpiceInfo{},
ObjectMeta: v1.ObjectMeta{
Name: vmName,
Namespace: kubeapi.NamespaceDefault,
Namespace: namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Expand All @@ -318,19 +326,23 @@ func NewSpice(vmName string) *Spice {

//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
migration := NewMigrationReferenceFromName(name)
return NewMinimalMigrationWithNS(kubeapi.NamespaceDefault, name, vmName)
}

func NewMinimalMigrationWithNS(namespace string, name string, vmName string) *Migration {
migration := NewMigrationReferenceFromName(namespace, name)
migration.Spec = MigrationSpec{
Selector: VMSelector{vmName},
}
return migration
}

func NewMigrationReferenceFromName(name string) *Migration {
func NewMigrationReferenceFromName(namespace string, name string) *Migration {
migration := &Migration{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
Namespace: namespace,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), namespace, name),
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Expand Down
9 changes: 4 additions & 5 deletions pkg/informers/virtinformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ import (
"k8s.io/apimachinery/pkg/labels"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
k8sv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"kubevirt.io/kubevirt/pkg/kubecli"

kubeapi "k8s.io/client-go/pkg/api"

kubev1 "kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/logging"
)
Expand Down Expand Up @@ -111,14 +110,14 @@ func (f *kubeInformerFactory) getInformer(key string, newFunc newSharedInformer)

func (f *kubeInformerFactory) VM() cache.SharedIndexInformer {
return f.getInformer("vmInformer", func() cache.SharedIndexInformer {
lw := cache.NewListWatchFromClient(f.restClient, "vms", kubeapi.NamespaceDefault, fields.Everything())
lw := cache.NewListWatchFromClient(f.restClient, "vms", api.NamespaceAll, fields.Everything())
return cache.NewSharedIndexInformer(lw, &kubev1.VM{}, f.defaultResync, cache.Indexers{})
})
}

func (f *kubeInformerFactory) Migration() cache.SharedIndexInformer {
return f.getInformer("migrationInformer", func() cache.SharedIndexInformer {
lw := cache.NewListWatchFromClient(f.restClient, "migrations", kubeapi.NamespaceDefault, fields.Everything())
lw := cache.NewListWatchFromClient(f.restClient, "migrations", api.NamespaceAll, fields.Everything())
return cache.NewSharedIndexInformer(lw, &kubev1.Migration{}, f.defaultResync, cache.Indexers{})
})
}
Expand All @@ -131,7 +130,7 @@ func (f *kubeInformerFactory) KubeVirtPod() cache.SharedIndexInformer {
panic(err)
}

lw := kubecli.NewListWatchFromClient(f.clientSet.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, fields.Everything(), labelSelector)
lw := kubecli.NewListWatchFromClient(f.clientSet.CoreV1().RESTClient(), "pods", api.NamespaceAll, fields.Everything(), labelSelector)
return cache.NewSharedIndexInformer(lw, &k8sv1.Pod{}, f.defaultResync, cache.Indexers{})
})
}
8 changes: 8 additions & 0 deletions pkg/rest/endpoints/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ func NoopDecoder(_ context.Context, _ *http.Request) (interface{}, error) {
return nil, nil
}

func NotNamespacedDecodeRequestFunc(ctx context.Context, r *http.Request) (interface{}, error) {
headers, err := queryExtractor(ctx, r)
if err != nil {
return nil, err
}
return &Metadata{Headers: *headers}, nil
}

func NamespaceDecodeRequestFunc(ctx context.Context, r *http.Request) (interface{}, error) {
namespace, err := namespaceDecodeRequestFunc(ctx, r)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/virt-api/rest/kubeproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func GenericResourceProxy(ws *restful.WebService, ctx context.Context, gvr schem
patch := endpoints.NewHandlerBuilder().Patch().Endpoint(NewGenericPatchEndpoint(cli, gvr, objResponseHandler)).Build(ctx)
post := endpoints.NewHandlerBuilder().Post(objPointer).Endpoint(NewGenericPostEndpoint(cli, gvr, objResponseHandler)).Build(ctx)
get := endpoints.NewHandlerBuilder().Get().Endpoint(NewGenericGetEndpoint(cli, gvr, objResponseHandler)).Build(ctx)
getListAllNamespaces := endpoints.NewHandlerBuilder().Get().Endpoint(NewGenericGetListEndpoint(cli, gvr, objListResponseHandler)).Decoder(endpoints.NotNamespacedDecodeRequestFunc).Build(ctx)
getList := endpoints.NewHandlerBuilder().Get().Endpoint(NewGenericGetListEndpoint(cli, gvr, objListResponseHandler)).Decoder(endpoints.NamespaceDecodeRequestFunc).Build(ctx)
deleteList := endpoints.NewHandlerBuilder().Delete().Endpoint(NewGenericDeleteListEndpoint(cli, gvr, objListResponseHandler)).Decoder(endpoints.NamespaceDecodeRequestFunc).Build(ctx)

Expand Down Expand Up @@ -109,6 +110,12 @@ func GenericResourceProxy(ws *restful.WebService, ctx context.Context, gvr schem
To(endpoints.MakeGoRestfulWrapper(get)).Writes(objExample).Doc("test4"), ws,
))

ws.Route(addGetAllNamespacesListParams(
ws.GET(gvr.Resource).
Produces(mime.MIME_JSON, mime.MIME_YAML).
To(endpoints.MakeGoRestfulWrapper(getListAllNamespaces)).Writes(listExample).Doc("test4"), ws,
))

ws.Route(
ws.PATCH(ResourcePath(gvr)).
Produces(mime.MIME_JSON_PATCH).
Expand Down Expand Up @@ -162,6 +169,12 @@ func addWatchGetListParams(builder *restful.RouteBuilder, ws *restful.WebService
Param(ws.QueryParameter("timeoutSeconds", "TimeoutSeconds for the list/watch call.").DataType("int"))
}

func addGetAllNamespacesListParams(builder *restful.RouteBuilder, ws *restful.WebService) *restful.RouteBuilder {
return builder.Param(fieldSelectorParam(ws)).Param(labelSelectorParam(ws)).
Param(ws.QueryParameter("resourceVersion", "When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history.")).
Param(ws.QueryParameter("timeoutSeconds", "TimeoutSeconds for the list/watch call.").DataType("int"))
}

func addDeleteListParams(builder *restful.RouteBuilder, ws *restful.WebService) *restful.RouteBuilder {
return builder.Param(NameParam(ws)).Param(fieldSelectorParam(ws)).Param(labelSelectorParam(ws))
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/virt-api/rest/spice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
kubev1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"

Expand Down Expand Up @@ -76,7 +75,7 @@ func spiceFromVM(vm *v1.VM, coreCli *kubernetes.Clientset) (*v1.Spice, error) {
for _, d := range vm.Spec.Domain.Devices.Graphics {
if strings.ToLower(d.Type) == "spice" {
port := d.Port
podList, err := coreCli.CoreV1().Pods(api.NamespaceDefault).List(unfinishedVMPodSelector(vm))
podList, err := coreCli.CoreV1().Pods(vm.GetObjectMeta().GetNamespace()).List(unfinishedVMPodSelector(vm))
if err != nil {
return nil, middleware.NewInternalServerError(err)
}
Expand All @@ -90,7 +89,7 @@ func spiceFromVM(vm *v1.VM, coreCli *kubernetes.Clientset) (*v1.Spice, error) {
pod := podList.Items[0]
ip := pod.Status.PodIP

spice := v1.NewSpice(vm.GetObjectMeta().GetName())
spice := v1.NewSpice(vm.GetObjectMeta().GetNamespace(), vm.GetObjectMeta().GetName())
spice.Info = v1.SpiceInfo{
Type: "spice",
Host: ip,
Expand Down
16 changes: 8 additions & 8 deletions pkg/virt-controller/services/generated_mock_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,28 @@ func (_mr *_MockVMServiceRecorder) UpdateMigration(arg0 interface{}) *gomock.Cal
return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateMigration", arg0)
}

func (_m *MockVMService) FetchVM(vmName string) (*v10.VM, bool, error) {
ret := _m.ctrl.Call(_m, "FetchVM", vmName)
func (_m *MockVMService) FetchVM(namespace string, vmName string) (*v10.VM, bool, error) {
ret := _m.ctrl.Call(_m, "FetchVM", namespace, vmName)
ret0, _ := ret[0].(*v10.VM)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}

func (_mr *_MockVMServiceRecorder) FetchVM(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "FetchVM", arg0)
func (_mr *_MockVMServiceRecorder) FetchVM(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "FetchVM", arg0, arg1)
}

func (_m *MockVMService) FetchMigration(migrationName string) (*v10.Migration, bool, error) {
ret := _m.ctrl.Call(_m, "FetchMigration", migrationName)
func (_m *MockVMService) FetchMigration(namespace string, migrationName string) (*v10.Migration, bool, error) {
ret := _m.ctrl.Call(_m, "FetchMigration", namespace, migrationName)
ret0, _ := ret[0].(*v10.Migration)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}

func (_mr *_MockVMServiceRecorder) FetchMigration(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "FetchMigration", arg0)
func (_mr *_MockVMServiceRecorder) FetchMigration(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "FetchMigration", arg0, arg1)
}

func (_m *MockVMService) StartMigration(migration *v10.Migration, vm *v10.VM, sourceNode *v1.Node, targetNode *v1.Node, targetPod *v1.Pod) error {
Expand Down
33 changes: 17 additions & 16 deletions pkg/virt-controller/services/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type VMService interface {
GetRunningMigrationPods(*corev1.Migration) (*v1.PodList, error)
CreateMigrationTargetPod(migration *corev1.Migration, vm *corev1.VM) error
UpdateMigration(migration *corev1.Migration) error
FetchVM(vmName string) (*corev1.VM, bool, error)
FetchMigration(migrationName string) (*corev1.Migration, bool, error)
FetchVM(namespace string, vmName string) (*corev1.VM, bool, error)
FetchMigration(namespace string, migrationName string) (*corev1.Migration, bool, error)
StartMigration(migration *corev1.Migration, vm *corev1.VM, sourceNode *v1.Node, targetNode *v1.Node, targetPod *v1.Pod) error
GetMigrationJob(migration *corev1.Migration) (*v1.Pod, bool, error)
PutVm(vm *corev1.VM) (*corev1.VM, error)
Expand All @@ -73,7 +73,7 @@ func (v *vmService) StartVMPod(vm *corev1.VM) error {
return err
}

if _, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).Create(pod); err != nil {
if _, err := v.KubeCli.Core().Pods(vm.GetObjectMeta().GetNamespace()).Create(pod); err != nil {
return err
}
return nil
Expand All @@ -82,7 +82,7 @@ func (v *vmService) StartVMPod(vm *corev1.VM) error {
// synchronously put updated VM object to API server.
func (v *vmService) PutVm(vm *corev1.VM) (*corev1.VM, error) {
logger := logging.DefaultLogger().Object(vm)
obj, err := v.RestClient.Put().Resource("vms").Body(vm).Name(vm.ObjectMeta.Name).Namespace(v1.NamespaceDefault).Do().Get()
obj, err := v.RestClient.Put().Resource("vms").Body(vm).Name(vm.ObjectMeta.Name).Namespace(vm.ObjectMeta.Namespace).Do().Get()
if err != nil {
logger.Error().Reason(err).Msg("Setting the VM state failed.")
return nil, err
Expand All @@ -93,15 +93,16 @@ func (v *vmService) PutVm(vm *corev1.VM) (*corev1.VM, error) {
func (v *vmService) DeleteVMPod(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustNotBeEmpty(vm.GetObjectMeta().GetNamespace())

if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
if err := v.KubeCli.CoreV1().Pods(vm.ObjectMeta.Namespace).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
return err
}
return nil
}

func (v *vmService) GetRunningVMPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))
podList, err := v.KubeCli.CoreV1().Pods(vm.ObjectMeta.Namespace).List(UnfinishedVMPodSelector(vm))

if err != nil {
return nil, err
Expand All @@ -111,12 +112,12 @@ func (v *vmService) GetRunningVMPods(vm *corev1.VM) (*v1.PodList, error) {

func (v *vmService) UpdateMigration(migration *corev1.Migration) error {
migrationName := migration.ObjectMeta.Name
_, err := v.RestClient.Put().Namespace(v1.NamespaceDefault).Resource("migrations").Body(migration).Name(migrationName).Do().Get()
_, err := v.RestClient.Put().Namespace(migration.ObjectMeta.Namespace).Resource("migrations").Body(migration).Name(migrationName).Do().Get()
return err
}

func (v *vmService) FetchVM(vmName string) (*corev1.VM, bool, error) {
resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("vms").Name(vmName).Do().Get()
func (v *vmService) FetchVM(namespace string, vmName string) (*corev1.VM, bool, error) {
resp, err := v.RestClient.Get().Namespace(namespace).Resource("vms").Name(vmName).Do().Get()
if err != nil {
if errors.IsNotFound(err) {
return nil, false, nil
Expand All @@ -127,8 +128,8 @@ func (v *vmService) FetchVM(vmName string) (*corev1.VM, bool, error) {
return vm, true, nil
}

func (v *vmService) FetchMigration(migrationName string) (*corev1.Migration, bool, error) {
resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("migrations").Name(migrationName).Do().Get()
func (v *vmService) FetchMigration(namespace string, migrationName string) (*corev1.Migration, bool, error) {
resp, err := v.RestClient.Get().Namespace(namespace).Resource("migrations").Name(migrationName).Do().Get()
if err != nil {
if errors.IsNotFound(err) {
return nil, false, nil
Expand Down Expand Up @@ -169,7 +170,7 @@ func (v *vmService) CreateMigrationTargetPod(migration *corev1.Migration, vm *co
pod.ObjectMeta.Labels[corev1.MigrationUIDLabel] = string(migration.GetObjectMeta().GetUID())
pod.Spec.Affinity = corev1.AntiAffinityFromVMNode(vm)
if err == nil {
_, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(pod)
_, err = v.KubeCli.CoreV1().Pods(migration.GetObjectMeta().GetNamespace()).Create(pod)
}
return err
}
Expand All @@ -178,14 +179,14 @@ func (v *vmService) DeleteMigrationTargetPods(migration *corev1.Migration) error
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.GetObjectMeta().GetName())

if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, unfinishedMigrationTargetPodSelector(migration)); err != nil {
if err := v.KubeCli.CoreV1().Pods(migration.GetObjectMeta().GetNamespace()).DeleteCollection(nil, unfinishedMigrationTargetPodSelector(migration)); err != nil {
return err
}
return nil
}

func (v *vmService) GetRunningMigrationPods(migration *corev1.Migration) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(unfinishedMigrationTargetPodSelector(migration))
podList, err := v.KubeCli.CoreV1().Pods(migration.GetObjectMeta().GetNamespace()).List(unfinishedMigrationTargetPodSelector(migration))
if err != nil {
return nil, err
}
Expand All @@ -199,13 +200,13 @@ func (v *vmService) StartMigration(migration *corev1.Migration, vm *corev1.VM, s
if err != nil {
return err
}
_, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(job)
_, err = v.KubeCli.CoreV1().Pods(migration.GetObjectMeta().GetNamespace()).Create(job)
return err
}

func (v *vmService) GetMigrationJob(migration *corev1.Migration) (*v1.Pod, bool, error) {
selector := migrationJobSelector(migration)
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(selector)
podList, err := v.KubeCli.CoreV1().Pods(migration.ObjectMeta.Namespace).List(selector)
if err != nil {
return nil, false, err
}
Expand Down

0 comments on commit 9132be8

Please sign in to comment.