Add contextual logging to NEG controller and its components
gauravkghildiyal committed Jul 29, 2022
1 parent b8f11ed commit 149e5e1
Showing 16 changed files with 223 additions and 165 deletions.
1 change: 1 addition & 0 deletions cmd/glbc/main.go
@@ -339,6 +339,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
enableAsm,
asmServiceNEGSkipNamespaces,
flags.F.EnableEndpointSlices,
klog.TODO(), // TODO(#1761): Replace this with a top level logger configuration once one is available.
)

ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
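The `klog.TODO()` argument added above is the contextual-logging placeholder from k8s.io/klog/v2. A minimal sketch of how such a call site fits together, with the NEG controller constructor reduced to an illustrative stub (the stub's name and signature are assumptions, not the real API):

```go
package main

import (
	"k8s.io/klog/v2"
)

// newNEGController is an illustrative stub standing in for the real NEG
// controller constructor, which takes many more arguments; only the logger
// parameter matters for this sketch.
func newNEGController(logger klog.Logger) {
	logger.V(2).Info("NEG controller created")
}

func main() {
	klog.InitFlags(nil)
	defer klog.Flush()

	// klog.TODO() returns a usable Logger while flagging the call site as one
	// that still needs a properly plumbed logger; once a top-level logger
	// configuration exists it would be swapped out (e.g. for klog.Background()
	// or a logger built from that configuration).
	newNEGController(klog.TODO())
}
```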
97 changes: 53 additions & 44 deletions pkg/neg/controller.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pkg/neg/controller_test.go
@@ -49,6 +49,7 @@ import (
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
"k8s.io/legacy-cloud-providers/gce"
)

@@ -152,6 +153,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
true, //enableAsm
[]string{},
enableEndpointSlices,
klog.TODO(),
)
}
func newTestController(kubeClient kubernetes.Interface) *Controller {
@@ -654,7 +656,7 @@ func TestGatherPortMappingUsedByIngress(t *testing.T) {
for _, tc := range testCases {
controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
portTupleSet := gatherPortMappingUsedByIngress(tc.ings, newTestService(controller, true, []int32{}))
portTupleSet := gatherPortMappingUsedByIngress(tc.ings, newTestService(controller, true, []int32{}), klog.TODO())
if len(portTupleSet) != len(tc.expect) {
t.Errorf("For test case %q, expect %d ports, but got %d.", tc.desc, len(tc.expect), len(portTupleSet))
}
61 changes: 33 additions & 28 deletions pkg/neg/manager.go
@@ -97,6 +97,8 @@ type syncerManager struct {
// zoneMap keeps track of the last set of zones the neg controller
// has seen. zoneMap is protected by the mu mutex.
zoneMap map[string]struct{}

logger klog.Logger
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
@@ -112,11 +114,12 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
nodeLister cache.Indexer,
svcNegLister cache.Indexer,
enableNonGcpMode bool,
enableEndpointSlices bool) *syncerManager {
enableEndpointSlices bool,
logger klog.Logger) *syncerManager {

zones, err := zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
klog.V(3).Infof("Unable to initialize zone map in neg manager: %s", err)
logger.V(3).Info("Unable to initialize zone map in neg manager", "err", err)
}
zoneMap := make(map[string]struct{})
for _, zone := range zones {
@@ -140,6 +143,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
enableNonGcpMode: enableNonGcpMode,
enableEndpointSlices: enableEndpointSlices,
zoneMap: zoneMap,
logger: logger,
}
}

@@ -164,7 +168,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
// Hence, Existing NEG syncer for the service port will always work
manager.removeCommonPorts(adds, removes)
manager.svcPortMap[key] = newPorts
klog.V(3).Infof("EnsureSyncer %v/%v: syncing %v ports, removing %v ports, adding %v ports", namespace, name, newPorts, removes, adds)
manager.logger.V(3).Info("EnsureSyncer is syncing ports", "service", klog.KRef(namespace, name), "ports", newPorts, "portsToRemove", removes, "portsToAdd", adds)

errList := []error{}
successfulSyncers := 0
@@ -209,7 +213,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg

// determine the implementation that calculates NEG endpoints on each sync.
epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter,
syncerKey, portInfo.EpCalculatorMode)
syncerKey, portInfo.EpCalculatorMode, manager.logger.WithValues("service", klog.KRef(syncerKey.Namespace, syncerKey.Name), "negName", syncerKey.NegName))
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
manager.recorder,
Expand All @@ -227,6 +231,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
manager.svcNegClient,
!manager.namer.IsNEG(portInfo.NegName),
manager.enableEndpointSlices,
manager.logger,
)
manager.syncerMap[syncerKey] = syncer
}
@@ -298,7 +303,7 @@ func (manager *syncerManager) SyncNodes() {
func (manager *syncerManager) updateZoneMap() bool {
zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
klog.Warningf("Unable to list zones: %s", err)
manager.logger.Error(err, "Unable to list zones")
return false
}

@@ -323,8 +328,8 @@ func (manager *syncerManager) ShutDown() {

// GC garbage collects syncers and NEGs.
func (manager *syncerManager) GC() error {
klog.V(2).Infof("Start NEG garbage collection.")
defer klog.V(2).Infof("NEG garbage collection finished.")
manager.logger.V(2).Info("Start NEG garbage collection.")
defer manager.logger.V(2).Info("NEG garbage collection finished.")
start := time.Now()
// Garbage collect Syncers
manager.garbageCollectSyncer()
@@ -355,7 +360,7 @@ func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabe

obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
if err != nil {
klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err)
manager.logger.Error(err, "Failed to retrieve service from store", "service", svcKey.Key())
continue
}

@@ -410,7 +415,7 @@ func (manager *syncerManager) ensureDeleteSvcNegCR(namespace, negName string) er
if err = manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).Delete(context.Background(), negName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("errored while deleting neg cr %s/%s: %w", negName, namespace, err)
}
klog.V(2).Infof("Deleted neg cr %s/%s", negName, namespace)
manager.logger.V(2).Info("Deleted neg cr", "svcneg", klog.KRef(namespace, negName))
}
return nil
}
@@ -438,7 +443,7 @@ func (manager *syncerManager) garbageCollectNEG() error {
for key, neg := range negList {
if key.Type() != meta.Zonal {
// covers the case when key.Zone is not populated
klog.V(4).Infof("Ignoring key %v as it is not zonal", key)
manager.logger.V(4).Info("Ignoring key as it is not zonal", "key", key)
continue
}
if manager.namer.IsNEG(neg.Name) {
@@ -539,7 +544,7 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
for _, cr := range deletionCandidates {
shouldDeleteNegCR := true
deleteByZone := len(cr.Status.NetworkEndpointGroups) == 0
klog.V(2).Infof("Deletion candidate %s/%s has %d NEG references", cr.Namespace, cr.Name, len(cr.Status.NetworkEndpointGroups))
manager.logger.V(2).Info("Count of NEG references for deletion candidate", "count", len(cr.Status.NetworkEndpointGroups), "svcneg", klog.KObj(cr))
for _, negRef := range cr.Status.NetworkEndpointGroups {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
if err != nil {
@@ -552,7 +557,7 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
}

if deleteByZone {
klog.V(2).Infof("Deletion candidate %s/%s has 0 NEG reference: %+v", cr.Namespace, cr.Name, cr)
manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(cr), "cr", cr)
for _, zone := range zones {
shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(cr.Name, zone, cr)
}
@@ -572,13 +577,13 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
portInfoMap := manager.svcPortMap[svcKey]
for _, portInfo := range portInfoMap {
if portInfo.NegName == cr.Name {
klog.V(2).Infof("NEG CR %s/%s is still desired, skipping deletion", cr.Namespace, cr.Name)
manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(cr))
return
}
}

klog.V(2).Infof("Deleting NEG CR %s/%s", cr.Namespace, cr.Name)
if err := deleteSvcNegCR(manager.svcNegClient, cr); err != nil {
manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(cr))
if err := deleteSvcNegCR(manager.svcNegClient, cr, manager.logger); err != nil {
errList = append(errList, err)
}
}()
@@ -592,7 +597,7 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
neg, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA)
if err != nil {
if utils.IsNotFoundError(err) || utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
klog.V(2).Infof("Ignoring error when querying for neg %s/%s during GC: %q", name, zone, err)
manager.logger.V(2).Info("Ignoring error when querying for neg during GC", "negName", name, "zone", zone, "err", err)
return nil
}
return err
@@ -602,16 +607,16 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
// Controller managed custom named negs will always have a populated description, so do not delete custom named
// negs with empty descriptions.
if !manager.namer.IsNEG(name) && neg.Description == "" {
klog.V(2).Infof("Skipping deletion of Neg %s in %s because name was not generated and empty description", name, zone)
manager.logger.V(2).Info("Skipping deletion of Neg because name was not generated and empty description", "negName", name, "zone", zone)
return nil
}
if matches, err := utils.VerifyDescription(*expectedDesc, neg.Description, name, zone); !matches {
klog.V(2).Infof("Skipping deletion of Neg %s in %s because of conflicting description: %s", name, zone, err)
manager.logger.V(2).Info("Skipping deletion of Neg because of conflicting description", "negName", name, "zone", zone, "err", err)
return nil
}
}

klog.V(2).Infof("Deleting NEG %q in %q.", name, zone)
manager.logger.V(2).Info("Deleting NEG", "negName", name, "zone", zone)
return manager.cloud.DeleteNetworkEndpointGroup(name, zone, meta.VersionGA)
}

@@ -625,7 +630,7 @@ func (manager *syncerManager) ensureSvcNegCR(svcKey serviceKey, portInfo negtype

obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
if err != nil {
klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err)
manager.logger.Error(err, "Failed to retrieve service from store", "service", svcKey.Key())
}

if !exists {
@@ -661,13 +666,13 @@ func (manager *syncerManager) ensureSvcNegCR(svcKey serviceKey, portInfo negtype

// Neg does not exist so create it
_, err = manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Create(context.Background(), &newCR, metav1.CreateOptions{})
klog.V(2).Infof("Created ServiceNetworkEndpointGroup CR for neg %s/%s", svcKey.namespace, portInfo.NegName)
manager.logger.V(2).Info("Created ServiceNetworkEndpointGroup CR for neg", "svcneg", klog.KRef(svcKey.namespace, portInfo.NegName))
return err
}

needUpdate, err := ensureNegCRLabels(negCR, labels)
needUpdate, err := ensureNegCRLabels(negCR, labels, manager.logger)
if err != nil {
klog.Errorf("failed to ensure labels for neg %s/%s for service %s: %s", negCR.Namespace, negCR.Name, service.Name, err)
manager.logger.Error(err, "failed to ensure labels for neg", "svcneg", klog.KRef(negCR.Namespace, negCR.Name), "service", service.Name)
return err
}
needUpdate = ensureNegCROwnerRef(negCR, newCR.OwnerReferences) || needUpdate
@@ -679,10 +684,10 @@ func ensureNegCRLabels(negCR *negv1beta1.ServiceNetworkEndpointGroup, labels map
return nil
}

func ensureNegCRLabels(negCR *negv1beta1.ServiceNetworkEndpointGroup, labels map[string]string) (bool, error) {
func ensureNegCRLabels(negCR *negv1beta1.ServiceNetworkEndpointGroup, labels map[string]string, logger klog.Logger) (bool, error) {
needsUpdate := false
existingLabels := negCR.GetLabels()
klog.V(4).Infof("existing neg %s/%s labels: %+v", negCR.Namespace, negCR.Name, existingLabels)
logger.V(4).Info("Ensuring NEG CR labels", "svcneg", klog.KRef(negCR.Namespace, negCR.Name), "existingLabels", existingLabels)

//Check that required labels exist and are matching
for key, value := range labels {
@@ -711,18 +716,18 @@ func ensureNegCROwnerRef(negCR *negv1beta1.ServiceNetworkEndpointGroup, expected
}

// deleteSvcNegCR will remove finalizers on the given negCR and if deletion timestamp is not set, will delete it as well
func deleteSvcNegCR(svcNegClient svcnegclient.Interface, negCR *negv1beta1.ServiceNetworkEndpointGroup) error {
func deleteSvcNegCR(svcNegClient svcnegclient.Interface, negCR *negv1beta1.ServiceNetworkEndpointGroup, logger klog.Logger) error {
updatedCR := negCR.DeepCopy()
updatedCR.Finalizers = []string{}
if _, err := patchNegStatus(svcNegClient, *negCR, *updatedCR); err != nil {
return err
}

klog.V(2).Infof("Removed finalizer on ServiceNetworkEndpointGroup CR %s/%s", negCR.Namespace, negCR.Name)
logger.V(2).Info("Removed finalizer on ServiceNetworkEndpointGroup CR", "svcneg", klog.KRef(negCR.Namespace, negCR.Name))

// If CR does not have a deletion timestamp, delete
if negCR.GetDeletionTimestamp().IsZero() {
klog.V(2).Infof("Deleting ServiceNetworkEndpointGroup CR %s/%s", negCR.Namespace, negCR.Name)
logger.V(2).Info("Deleting ServiceNetworkEndpointGroup CR", "svcneg", klog.KRef(negCR.Namespace, negCR.Name))
return svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(negCR.Namespace).Delete(context.Background(), negCR.Name, metav1.DeleteOptions{})
}
return nil
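The manager changes above all follow one pattern: global `klog.Infof`/`Warningf`/`Errorf` calls become methods on an injected `klog.Logger`, format verbs become key/value pairs, and object references are expressed with `klog.KRef`/`klog.KObj`. A runnable sketch of that pattern, assuming a trimmed-down stand-in for `syncerManager` (the stand-in type and its method are illustrative only):

```go
package main

import (
	"errors"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

// manager is a trimmed-down, illustrative stand-in for syncerManager; only the
// injected logger field is faithful to the real type.
type manager struct {
	logger klog.Logger
}

func (m *manager) gc(namespace, negName string) {
	m.logger.V(2).Info("Start NEG garbage collection.")
	defer m.logger.V(2).Info("NEG garbage collection finished.")

	// klog.KRef builds a namespace/name reference without needing the object.
	m.logger.V(2).Info("Deleted neg cr", "svcneg", klog.KRef(namespace, negName))

	// klog.KObj derives the same reference from anything carrying ObjectMeta.
	cr := &metav1.ObjectMeta{Namespace: namespace, Name: negName}
	m.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(cr))

	// Errors become the first argument of Error instead of a %v in the message.
	m.logger.Error(errors.New("listing failed"), "Unable to list zones")
}

func main() {
	klog.InitFlags(nil)
	defer klog.Flush()

	m := &manager{logger: klog.Background().WithName("neg-manager")}
	m.gc("default", "example-neg")
}
```

Injecting the logger rather than relying on the klog globals is what lets callers attach scoped values (service, NEG name) once via `WithValues` and have them appear on every downstream log line.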
2 changes: 2 additions & 0 deletions pkg/neg/manager_test.go
@@ -29,6 +29,7 @@ import (
"google.golang.org/api/googleapi"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
"k8s.io/legacy-cloud-providers/gce"

apiv1 "k8s.io/api/core/v1"
@@ -92,6 +93,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce
testContext.SvcNegInformer.GetIndexer(),
false, //enableNonGcpMode
false, //enableEndpointSlices
klog.TODO(),
)
return manager, testContext.Cloud
}
27 changes: 15 additions & 12 deletions pkg/neg/readiness/poller.go
@@ -87,16 +87,19 @@ type poller struct {
negCloud negtypes.NetworkEndpointGroupCloud

clock clock.Clock

logger klog.Logger
}

func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud) *poller {
func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud, logger klog.Logger) *poller {
return &poller{
pollMap: make(map[negMeta]*pollTarget),
podLister: podLister,
lookup: lookup,
patcher: patcher,
negCloud: negCloud,
clock: clock.RealClock{},
logger: logger.WithName("Poller"),
}
}

@@ -145,12 +148,12 @@ func (p *poller) ScanForWork() []negMeta {
// This function is threadsafe.
func (p *poller) Poll(key negMeta) (retry bool, err error) {
if !p.markPolling(key) {
klog.V(4).Infof("NEG %q in zone %q as is already being polled or no longer needed to be polled.", key.Name, key.Zone)
p.logger.V(4).Info("NEG is already being polled or no longer needed to be polled.", "neg", key.Name, "negZone", key.Zone)
return true, nil
}
defer p.unMarkPolling(key)

klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone)
p.logger.V(2).Info("polling NEG", "neg", key.Name, "negZone", key.Zone)
// TODO(freehan): filter the NEs that are in interest once the API supports it
res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion())
if err != nil {
@@ -160,7 +163,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
// until the next status poll is executed. However, the pods are not marked as Ready and still passes the LB health check will
// serve LB traffic. The side effect during the delay period is the workload (depending on rollout strategy) might slow down rollout.
// TODO(freehan): enable exponential backoff.
klog.Errorf("Failed to ListNetworkEndpoint in NEG %q, retry in %v", key.String(), retryDelay.String())
p.logger.Error(err, "Failed to ListNetworkEndpoint in NEG. Retrying after some time.", "neg", key.String(), "retryDelay", retryDelay.String())
<-p.clock.After(retryDelay)
return true, err
}
Expand All @@ -177,7 +180,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
p.lock.Lock()
defer p.lock.Unlock()
klog.V(4).Infof("processHealthStatus(%q, %+v)", key.String(), healthStatuses)
p.logger.V(4).Info("Executing processHealthStatus", "neg", key.String(), "healthStatuses", healthStatuses)

var (
errList []error
@@ -194,12 +197,12 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.Ne

for _, healthStatus := range healthStatuses {
if healthStatus == nil {
klog.Warningf("healthStatus is nil from response %+v", healthStatuses)
p.logger.Error(nil, "healthStatus is nil from response", "healthStatuses", healthStatuses)
continue
}

if healthStatus.NetworkEndpoint == nil {
klog.Warningf("Health status has nil associated network endpoint: %v", healthStatus)
p.logger.Error(nil, "Health status has nil associated network endpoint", "healthStatus", healthStatus)
continue
}

@@ -217,7 +220,7 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.Ne
continue
}

bsKey := getHealthyBackendService(healthStatus)
bsKey := getHealthyBackendService(healthStatus, p.logger)
if bsKey == nil {
unhealthyPods = append(unhealthyPods, podName)
continue
@@ -257,21 +260,21 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.Ne
}

// getHealthyBackendService returns one of the first backend service key where the endpoint is considered healthy.
func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus) *meta.Key {
func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus, logger klog.Logger) *meta.Key {
for _, hs := range healthStatus.Healths {
if hs == nil {
klog.Errorf("Health status is nil in health status of network endpoint %v ", healthStatus)
logger.Error(nil, "Health status is nil in health status of network endpoint", "healthStatus", healthStatus)
continue
}
if hs.BackendService == nil {
klog.Errorf("Backend service is nil in health status of network endpoint %v", healthStatus)
logger.Error(nil, "Backend service is nil in health status of network endpoint", "healthStatus", healthStatus)
continue
}

if hs.HealthState == healthyState {
id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
if err != nil {
klog.Errorf("Failed to parse backend service reference from a Network Endpoint health status %v: %v", healthStatus, err)
logger.Error(err, "Failed to parse backend service reference from a Network Endpoint health status", "healthStatus", healthStatus)
continue
}
if id != nil {
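The poller changes show the other half of the pattern: constructors accept a `klog.Logger` and scope it with `WithName`, and helpers such as `getHealthyBackendService` receive the logger explicitly instead of calling the klog globals. A small sketch under those assumptions (the `poller` and `healthStatus` types below are simplified stand-ins, not the real structs):

```go
package main

import (
	"k8s.io/klog/v2"
)

// healthStatus is a simplified, illustrative stand-in for
// composite.NetworkEndpointWithHealthStatus.
type healthStatus struct {
	Healthy bool
}

// poller is a simplified stand-in for the real poller; the constructor shape
// (accept a logger, scope it with WithName) mirrors the diff above.
type poller struct {
	logger klog.Logger
}

func newPoller(logger klog.Logger) *poller {
	return &poller{logger: logger.WithName("Poller")}
}

func (p *poller) poll(negName, zone string) {
	p.logger.V(2).Info("polling NEG", "neg", negName, "negZone", zone)

	// Helpers receive the logger as a parameter, optionally enriched with
	// WithValues so every line they emit carries the NEG name.
	checkHealth(&healthStatus{Healthy: false}, p.logger.WithValues("neg", negName))
}

func checkHealth(hs *healthStatus, logger klog.Logger) {
	if hs == nil {
		// Passing a nil error to Error is the structured-logging replacement
		// used in this diff for what was previously klog.Warningf.
		logger.Error(nil, "healthStatus is nil from response")
		return
	}
	if !hs.Healthy {
		logger.V(4).Info("endpoint is not healthy yet", "healthStatus", hs)
	}
}

func main() {
	klog.InitFlags(nil)
	defer klog.Flush()

	p := newPoller(klog.Background())
	p.poll("example-neg", "us-central1-a")
}
```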