Skip to content
Permalink
Browse files

Add Kubernetes event log when the pod is launched

  • Loading branch information
s1061123 committed Mar 4, 2020
1 parent bfaf229 commit 079c853eba607dec9fd1a2b12e878db6498b8d64
1 go.mod
@@ -28,5 +28,6 @@ require (
k8s.io/code-generator v0.17.2 // indirect
k8s.io/gengo v0.0.0-20200127102705-1e9b17e831be // indirect
k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c // indirect
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.13.0
)
1 go.sum
@@ -73,6 +73,7 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -39,6 +39,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
@@ -44,6 +44,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
@@ -39,6 +39,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
@@ -44,6 +44,15 @@ rules:
verbs:
- get
- update
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
@@ -24,9 +24,14 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
@@ -52,8 +57,10 @@ type NoK8sNetworkError struct {

// ClientInfo contains information given from k8s client
type ClientInfo struct {
Client kubernetes.Interface
NetClient netclient.K8sCniCncfIoV1Interface
Client kubernetes.Interface
NetClient netclient.K8sCniCncfIoV1Interface
EventBroadcaster record.EventBroadcaster
EventRecorder record.EventRecorder
}

// AddPod adds pod into kubernetes
@@ -76,6 +83,13 @@ func (c *ClientInfo) AddNetAttachDef(netattach *nettypes.NetworkAttachmentDefini
return c.NetClient.NetworkAttachmentDefinitions(netattach.ObjectMeta.Namespace).Create(netattach)
}

// Eventf puts event into kubernetes events
func (c *ClientInfo) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
if c != nil && c.EventRecorder != nil {
c.EventRecorder.Eventf(object, eventtype, reason, messageFmt, args...)
}
}

func (e *NoK8sNetworkError) Error() string { return string(e.message) }

// SetNetworkStatus sets network status into Pod annotation
@@ -282,33 +296,33 @@ func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {

// TryLoadPodDelegates attempts to load Kubernetes-defined delegates and add them to the Multus config.
// Returns the number of Kubernetes-defined delegates added or an error.
func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *ClientInfo, error) {
func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *v1.Pod, *ClientInfo, error) {
var err error

logging.Debugf("TryLoadPodDelegates: %v, %v, %v", k8sArgs, conf, clientInfo)
clientInfo, err = GetK8sClient(conf.Kubeconfig, clientInfo)
if err != nil {
return 0, nil, err
return 0, nil, nil, err
}

if clientInfo == nil {
if len(conf.Delegates) == 0 {
// No available kube client and no delegates, we can't do anything
return 0, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates")
return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates")
}
return 0, nil, nil
return 0, nil, nil, nil
}

// Get the pod info. If cannot get it, we use cached delegates
pod, err := clientInfo.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
logging.Debugf("TryLoadPodDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err)
return 0, nil, nil
return 0, nil, nil, nil
}

delegate, err := tryLoadK8sPodDefaultNetwork(clientInfo, pod, conf)
if err != nil {
return 0, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err)
return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err)
}
if delegate != nil {
logging.Debugf("TryLoadPodDelegates: Overwrite the cluster default network with %v from pod annotations", delegate)
@@ -322,13 +336,13 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo

if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, clientInfo, nil
return 0, nil, clientInfo, nil
}
return 0, nil, logging.Errorf("TryLoadPodDelegates: error in getting k8s network from pod: %v", err)
return 0, nil, nil, logging.Errorf("TryLoadPodDelegates: error in getting k8s network from pod: %v", err)
}

if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
return 0, nil, nil, err
}

// Check gatewayRequest is configured in delegates
@@ -345,10 +359,10 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo
types.CheckGatewayConfig(conf.Delegates)
}

return len(delegates), clientInfo, nil
return len(delegates), pod, clientInfo, nil
}

return 0, clientInfo, nil
return 0, pod, clientInfo, nil
}

// GetK8sClient gets client info from kubeconfig
@@ -396,9 +410,16 @@ func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error
return nil, err
}

broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})

return &ClientInfo{
Client: client,
NetClient: netclient,
Client: client,
NetClient: netclient,
EventBroadcaster: broadcaster,
EventRecorder: recorder,
}, nil
}

@@ -617,8 +617,9 @@ var _ = Describe("k8sclient operations", func() {
Expect(netConf.Delegates[0].Conf.Name).To(Equal("net2"))
Expect(netConf.Delegates[0].Conf.Type).To(Equal("mynet2"))

numK8sDelegates, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
numK8sDelegates, pod, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
Expect(err).NotTo(HaveOccurred())
Expect(pod).NotTo(BeNil())
Expect(numK8sDelegates).To(Equal(0))
Expect(netConf.Delegates[0].Conf.Name).To(Equal("net1"))
Expect(netConf.Delegates[0].Conf.Type).To(Equal("mynet1"))
@@ -655,7 +656,7 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).To(HaveOccurred())

netConf.ConfDir = "badfilepath"
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
Expect(err).To(HaveOccurred())
})

@@ -690,7 +691,7 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())

numK8sDelegates, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
numK8sDelegates, _, _, err := TryLoadPodDelegates(k8sArgs, netConf, clientInfo)
Expect(err).NotTo(HaveOccurred())
Expect(numK8sDelegates).To(Equal(0))
Expect(netConf.Delegates[0].Conf.Name).To(Equal("net1"))
@@ -727,7 +728,7 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())

_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).To(HaveOccurred())
})

@@ -760,12 +761,12 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())

_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).NotTo(HaveOccurred())

// additionally, we expect the test to fail with no delegates, as at least one is always required.
netConf.Delegates = nil
_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).To(HaveOccurred())
})

@@ -824,7 +825,7 @@ users:
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())

_, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
_, _, _, err = TryLoadPodDelegates(k8sArgs, netConf, nil)
Expect(err).NotTo(HaveOccurred())
})

@@ -34,6 +34,7 @@ import (
"github.com/containernetworking/cni/pkg/invoke"
"github.com/containernetworking/cni/pkg/skel"
cnitypes "github.com/containernetworking/cni/pkg/types"
cnicurrent "github.com/containernetworking/cni/pkg/types/current"
cniversion "github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ns"
k8s "github.com/intel/multus-cni/k8sclient"
@@ -43,6 +44,7 @@ import (
nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
"github.com/vishvananda/netlink"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

@@ -263,7 +265,7 @@ func conflistDel(rt *libcni.RuntimeConf, rawnetconflist []byte, binDir string, e
return err
}

func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetConf, rt *libcni.RuntimeConf, binDir string, cniArgs string) (cnitypes.Result, error) {
func delegateAdd(exec invoke.Exec, kubeClient *k8s.ClientInfo, pod *v1.Pod, ifName string, delegate *types.DelegateNetConf, rt *libcni.RuntimeConf, binDir string, cniArgs string) (cnitypes.Result, error) {
logging.Debugf("delegateAdd: %v, %s, %v, %v, %s", exec, ifName, delegate, rt, binDir)
if os.Setenv("CNI_IFNAME", ifName) != nil {
return nil, logging.Errorf("delegateAdd: error setting envionment variable CNI_IFNAME")
@@ -322,7 +324,7 @@ func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetCon
} else {
result, err = confAdd(rt, delegate.Bytes, binDir, exec)
if err != nil {
return nil, logging.Errorf("delegateAdd: error invoking DelegateAdd - %q: %v", delegate.Conf.Type, err)
return nil, logging.Errorf("delegateAdd: error invoking confAdd - %q: %v", delegate.Conf.Type, err)
}
}

@@ -338,6 +340,24 @@ func delegateAdd(exec invoke.Exec, ifName string, delegate *types.DelegateNetCon
logging.Verbosef("Add: %s:%s:%s:%s %s", rt.Args[1][1], rt.Args[2][1], confName, rt.IfName, string(data))
}

// get IP addresses from result
ips := []string{}
res, err := cnicurrent.NewResultFromResult(result)
if err != nil {
logging.Errorf("delegateAdd: error converting result: %v", err)
return result, nil
}
for _, ip := range res.IPs {
ips = append(ips, ip.Address.String())
}

// send kubernetes events
if delegate.Name != "" {
kubeClient.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v from %s", rt.IfName, ips, delegate.Name)
} else {
kubeClient.Eventf(pod, v1.EventTypeNormal, "AddedInterface", "Add %s %v", rt.IfName, ips)
}

return result, nil
}

@@ -444,6 +464,11 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
return nil, cmdErr(nil, "error loading netconf: %v", err)
}

kubeClient, err = k8s.GetK8sClient(n.Kubeconfig, kubeClient)
if err != nil {
return nil, cmdErr(nil, "error getting k8s client: %v", err)
}

k8sArgs, err := k8s.GetK8sArgs(args)
if err != nil {
return nil, cmdErr(nil, "error getting k8s args: %v", err)
@@ -468,7 +493,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
n.Delegates[0].MasterPlugin = true
}

_, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient)
_, pod, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient)
if err != nil {
return nil, cmdErr(k8sArgs, "error loading k8s delegates k8s args: %v", err)
}
@@ -486,7 +511,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c

runtimeConfig := types.MergeCNIRuntimeConfig(n.RuntimeConfig, delegate)
rt := types.CreateCNIRuntimeConf(args, k8sArgs, ifName, runtimeConfig)
tmpResult, err = delegateAdd(exec, ifName, delegate, rt, n.BinDir, cniArgs)
tmpResult, err = delegateAdd(exec, kubeClient, pod, ifName, delegate, rt, n.BinDir, cniArgs)
if err != nil {
// If the add failed, tear down all networks we already added
netName := delegate.Conf.Name
@@ -631,7 +656,7 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er
}

// Get pod annotation and so on
_, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient)
_, _, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient)
if err != nil {
if len(in.Delegates) == 0 {
// No delegate available so send error

0 comments on commit 079c853

Please sign in to comment.
You can’t perform that action at this time.