Skip to content

Commit

Permalink
Configure contextual logging for util functions in NEG controller.
Browse files Browse the repository at this point in the history
This PR is the continuation of kubernetes#1746(contextual logging for components
in NEG controller).
* Functions will accept a logger object from its caller, so the prefix
  will be determined based on the caller objects.
  • Loading branch information
sawsa307 committed Oct 28, 2023
1 parent a2dd40c commit 495db5a
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/readiness/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (p *poller) RegisterNegEndpoints(key negMeta, endpointMap negtypes.Endpoint
// It returns false if there is no endpoints needed to be polled, returns true if otherwise.
// Assumes p.lock is held when calling this method.
func (p *poller) registerNegEndpoints(key negMeta, endpointMap negtypes.EndpointPodMap) bool {
endpointsToPoll := needToPoll(key.SyncerKey, endpointMap, p.lookup, p.podLister)
endpointsToPoll := needToPoll(key.SyncerKey, endpointMap, p.lookup, p.podLister, p.logger)
if len(endpointsToPoll) == 0 {
delete(p.pollMap, key)
return false
Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/readiness/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,20 @@ func preparePatchBytesforPodStatus(oldPodStatus, newPodStatus v1.PodStatus) ([]b
// 2. the pod exists
// 3. the pod has neg readiness gate
// 4. the pod's neg readiness condition is not True
func needToPoll(syncerKey negtypes.NegSyncerKey, endpointMap negtypes.EndpointPodMap, lookup NegLookup, podLister cache.Indexer) negtypes.EndpointPodMap {
func needToPoll(syncerKey negtypes.NegSyncerKey, endpointMap negtypes.EndpointPodMap, lookup NegLookup, podLister cache.Indexer, logger klog.Logger) negtypes.EndpointPodMap {
if !lookup.ReadinessGateEnabled(syncerKey) {
return negtypes.EndpointPodMap{}
}
removeIrrelevantEndpoints(endpointMap, podLister)
removeIrrelevantEndpoints(endpointMap, podLister, logger)
return endpointMap
}

// removeIrrelevantEndpoints will filter out the endpoints that does not need health status polling from the input endpoint map
func removeIrrelevantEndpoints(endpointMap negtypes.EndpointPodMap, podLister cache.Indexer) {
func removeIrrelevantEndpoints(endpointMap negtypes.EndpointPodMap, podLister cache.Indexer, logger klog.Logger) {
for endpoint, namespacedName := range endpointMap {
pod, exists, err := getPodFromStore(podLister, namespacedName.Namespace, namespacedName.Name)
if err != nil {
klog.Warningf("Failed to retrieve pod %q from store: %v", namespacedName.String(), err)
logger.Error(err, "Failed to retrieve pod from store", "pod", namespacedName.String())
metrics.PublishNegControllerErrorCountMetrics(err, true)
}
if err == nil && exists && needToProcess(pod) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/readiness/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/klog/v2"
)

const (
Expand Down Expand Up @@ -566,7 +567,7 @@ func TestNeedToPoll(t *testing.T) {
},
} {
tc.mutateState()
ret := needToPoll(key, tc.inputMap, fakeLookUp, podLister)
ret := needToPoll(key, tc.inputMap, fakeLookUp, podLister, klog.TODO())
if !reflect.DeepEqual(ret, tc.expectOutputMap) {
t.Errorf("For test case %q, expect %v, got: %v", tc.desc, tc.expectOutputMap, ret)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.podLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.podLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG, l.logger)
if err == nil { // If current calculation ends up in error, we trigger and emit metrics in degraded mode.
l.syncMetricsCollector.UpdateSyncerEPMetrics(l.syncerKey, result.EPCount, result.EPSCount)
}
Expand All @@ -248,7 +248,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.serviceLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.serviceLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG, l.logger)
l.syncMetricsCollector.UpdateSyncerEPMetrics(l.syncerKey, result.EPCount, result.EPSCount)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/syncers/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *syncer) Start() error {
retryMsg = "(will retry)"
}

if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
if svc := getService(s.serviceLister, s.Namespace, s.Name, s.logger); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeWarning, "SyncNetworkEndpointGroupFailed", "Failed to sync NEG %q %s: %v", s.NegSyncerKey.NegName, retryMsg, err)
}
} else {
Expand Down
7 changes: 4 additions & 3 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, currentPodLabelMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode(), s.enableDualStackNEG)
currentMap, currentPodLabelMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode(), s.enableDualStackNEG, s.logger)
if err != nil {
return fmt.Errorf("%w: %w", negtypes.ErrCurrentNegEPNotFound, err)
}
Expand Down Expand Up @@ -409,6 +409,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
s.NegSyncerKey.GetAPIVersion(),
s.customName,
s.networkInfo,
s.logger,
)
if err != nil {
errList = append(errList, err)
Expand All @@ -435,7 +436,7 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
continue
}

batch, err := makeEndpointBatch(endpointSet, s.NegType, endpointPodLabelMap)
batch, err := makeEndpointBatch(endpointSet, s.NegType, endpointPodLabelMap, s.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -562,7 +563,7 @@ func checkEndpointBatchErr(err error, operation transactionOp) error {
}

func (s *transactionSyncer) recordEvent(eventType, reason, eventDesc string) {
if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
if svc := getService(s.serviceLister, s.Namespace, s.Name, s.logger); svc != nil {
s.recorder.Eventf(svc, eventType, reason, eventDesc)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,7 @@ func TestUnknownNodes(t *testing.T) {
}

// Check that unknown zone did not cause endpoints to be removed
out, _, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode, false)
out, _, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode, false, klog.TODO())
if err != nil {
t.Errorf("errored retrieving existing network endpoints")
}
Expand Down Expand Up @@ -1835,7 +1835,7 @@ func TestEnableDegradedMode(t *testing.T) {
(s.syncer.(*syncer)).stopped = false
tc.modify(s)

out, _, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode, false)
out, _, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode, false, klog.TODO())
if err != nil {
t.Errorf("errored retrieving existing network endpoints")
}
Expand All @@ -1848,7 +1848,7 @@ func TestEnableDegradedMode(t *testing.T) {
t.Errorf("syncInternal returned %v, expected %v", err, tc.expectErr)
}
err = wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) {
out, _, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode, false)
out, _, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode, false, klog.TODO())
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -2258,7 +2258,7 @@ func unionEndpointMap(m1, m2 negtypes.EndpointPodMap) negtypes.EndpointPodMap {
}

func generateEndpointBatch(endpointSet negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap) map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint {
ret, _ := makeEndpointBatch(endpointSet, negtypes.VmIpPortEndpointType, endpointPodLabelMap)
ret, _ := makeEndpointBatch(endpointSet, negtypes.VmIpPortEndpointType, endpointPodLabelMap, klog.TODO())
return ret
}

Expand Down
Loading

0 comments on commit 495db5a

Please sign in to comment.