
Commit dccc3e5

Merge pull request #133690 from pohly/log-client-go-leaderelection
client-go leaderelection: structured, contextual logging

Kubernetes-commit: b9c467483e1db90b7aca125c98827f8553cc635b
2 parents d327527 + b650194 commit dccc3e5
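
The migration this PR performs is mechanical: free-form klog.Infof/klog.Errorf calls become structured calls on a logger carried in the context. A minimal before/after sketch of the pattern (function and variable names here are illustrative, not taken from the diff):

package main

import (
    "context"

    "k8s.io/klog/v2"
)

// acquireSketch contrasts the two logging styles this commit migrates between.
func acquireSketch(ctx context.Context, desc string) {
    // Before: global, unstructured logging with format verbs.
    klog.Infof("attempting to acquire leader lease %v...", desc)

    // After: the logger travels in ctx and values become key/value pairs,
    // so callers can attach names and attributes that show up on every record.
    logger := klog.FromContext(ctx)
    logger.Info("Attempting to acquire leader lease...", "lock", desc)
}

func main() {
    acquireSketch(context.Background(), "example-lock")
}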

6 files changed: 69 additions, 44 deletions

go.mod

Lines changed: 2 additions & 2 deletions
@@ -25,8 +25,8 @@ require (
     golang.org/x/time v0.9.0
     google.golang.org/protobuf v1.36.6
     gopkg.in/evanphx/json-patch.v4 v4.13.0
-    k8s.io/api v0.0.0-20250903042103-a2a241f99d37
-    k8s.io/apimachinery v0.0.0-20250903081746-f33167494573
+    k8s.io/api v0.0.0-20250903202746-44a3d73239dc
+    k8s.io/apimachinery v0.0.0-20250905080136-9357001b8e78
     k8s.io/klog/v2 v2.130.1
     k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b
     k8s.io/utils v0.0.0-20250604170112-4c0f3b243397

go.sum

Lines changed: 4 additions & 4 deletions
@@ -148,10 +148,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-k8s.io/api v0.0.0-20250903042103-a2a241f99d37 h1:ybz9dJ1Dy4rA1hD775Tjo5CGmdC2enFcMkn5gIPrx4g=
-k8s.io/api v0.0.0-20250903042103-a2a241f99d37/go.mod h1:MUZMQv8g8lHQb2eyO/Uo7B5NN4E09dlElJ9ls7J0YSM=
-k8s.io/apimachinery v0.0.0-20250903081746-f33167494573 h1:U4QezBSIuWaOR+NlUE1Ff+OkvDiDfZz4Ja/zzGOWCz0=
-k8s.io/apimachinery v0.0.0-20250903081746-f33167494573/go.mod h1:7OLTj40s+nlro63xSAmcBvG2yQchJEdd+Lq2NGRhUlY=
+k8s.io/api v0.0.0-20250903202746-44a3d73239dc h1:le+wI707pUGjUdfzccngxtprX9a31BMlByOHc5esFT0=
+k8s.io/api v0.0.0-20250903202746-44a3d73239dc/go.mod h1:YbxtQgtmYfY4/flhPDKpBWcJFaSIw00EKR5wrJ/yeb0=
+k8s.io/apimachinery v0.0.0-20250905080136-9357001b8e78 h1:bKP+Ktqy2mMYabMMvsHrfo+vcUtvoP3kg+KEJGkohm4=
+k8s.io/apimachinery v0.0.0-20250905080136-9357001b8e78/go.mod h1:7OLTj40s+nlro63xSAmcBvG2yQchJEdd+Lq2NGRhUlY=
 k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
 k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
 k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA=

tools/leaderelection/leaderelection.go

Lines changed: 26 additions & 22 deletions
@@ -209,7 +209,7 @@ type LeaderElector struct {
 // before leader election loop is stopped by ctx or it has
 // stopped holding the leader lease
 func (le *LeaderElector) Run(ctx context.Context) {
-    defer runtime.HandleCrash()
+    defer runtime.HandleCrashWithContext(ctx)
     defer le.config.Callbacks.OnStoppedLeading()
 
     if !le.acquire(ctx) {
@@ -254,7 +254,8 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
     defer cancel()
     succeeded := false
     desc := le.config.Lock.Describe()
-    klog.Infof("attempting to acquire leader lease %v...", desc)
+    logger := klog.FromContext(ctx)
+    logger.Info("Attempting to acquire leader lease...", "lock", desc)
     wait.JitterUntil(func() {
         if !le.config.Coordinated {
             succeeded = le.tryAcquireOrRenew(ctx)
@@ -263,12 +264,12 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
         }
         le.maybeReportTransition()
         if !succeeded {
-            klog.V(4).Infof("failed to acquire lease %v", desc)
+            logger.V(4).Info("Failed to acquire lease", "lock", desc)
             return
         }
         le.config.Lock.RecordEvent("became leader")
         le.metrics.leaderOn(le.config.Name)
-        klog.Infof("successfully acquired lease %v", desc)
+        logger.Info("Successfully acquired lease", "lock", desc)
         cancel()
     }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
     return succeeded
@@ -279,6 +280,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
     defer le.config.Lock.RecordEvent("stopped leading")
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
+    logger := klog.FromContext(ctx)
     wait.Until(func() {
         err := wait.PollUntilContextTimeout(ctx, le.config.RetryPeriod, le.config.RenewDeadline, true, func(ctx context.Context) (done bool, err error) {
             if !le.config.Coordinated {
@@ -290,33 +292,33 @@ func (le *LeaderElector) renew(ctx context.Context) {
         le.maybeReportTransition()
         desc := le.config.Lock.Describe()
         if err == nil {
-            klog.V(5).Infof("successfully renewed lease %v", desc)
+            logger.V(5).Info("Successfully renewed lease", "lock", desc)
             return
         }
         le.metrics.leaderOff(le.config.Name)
-        klog.Infof("failed to renew lease %v: %v", desc, err)
+        logger.Info("Failed to renew lease", "lock", desc, "err", err)
         cancel()
     }, le.config.RetryPeriod, ctx.Done())
 
     // if we hold the lease, give it up
     if le.config.ReleaseOnCancel {
-        le.release()
+        le.release(logger)
     }
 }
 
 // release attempts to release the leader lease if we have acquired it.
-func (le *LeaderElector) release() bool {
+func (le *LeaderElector) release(logger klog.Logger) bool {
     ctx := context.Background()
     timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
     defer timeoutCancel()
     // update the resourceVersion of lease
     oldLeaderElectionRecord, _, err := le.config.Lock.Get(timeoutCtx)
     if err != nil {
         if !errors.IsNotFound(err) {
-            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+            logger.Error(err, "error retrieving resource lock", "lock", le.config.Lock.Describe())
             return false
         }
-        klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
+        logger.Info("lease lock not found", "lock", le.config.Lock.Describe())
         return false
     }
 
@@ -331,7 +333,7 @@ func (le *LeaderElector) release() bool {
         AcquireTime: now,
     }
     if err := le.config.Lock.Update(timeoutCtx, leaderElectionRecord); err != nil {
-        klog.Errorf("Failed to release lock: %v", err)
+        logger.Error(err, "Failed to release lease", "lock", le.config.Lock.Describe())
         return false
     }
 
@@ -343,6 +345,7 @@ func (le *LeaderElector) release() bool {
 // lease if it has already been acquired. Returns true on success else returns
 // false.
 func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
+    logger := klog.FromContext(ctx)
     now := metav1.NewTime(le.clock.Now())
     leaderElectionRecord := rl.LeaderElectionRecord{
         HolderIdentity: le.config.Lock.Identity(),
@@ -355,10 +358,10 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
     oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
     if err != nil {
         if !errors.IsNotFound(err) {
-            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+            logger.Error(err, "Error retrieving lease lock", "lock", le.config.Lock.Describe())
             return false
         }
-        klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
+        logger.Info("Lease lock not found", "lock", le.config.Lock.Describe(), "err", err)
         return false
     }
 
@@ -371,18 +374,18 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 
     hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
     if hasExpired {
-        klog.Infof("lock has expired: %v", le.config.Lock.Describe())
+        logger.Info("Lease has expired", "lock", le.config.Lock.Describe())
         return false
     }
 
     if !le.IsLeader() {
-        klog.V(6).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
+        logger.V(6).Info("Lease is held and has not yet expired", "lock", le.config.Lock.Describe(), "holder", oldLeaderElectionRecord.HolderIdentity)
         return false
     }
 
     // 2b. If the lease has been marked as "end of term", don't renew it
     if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
-        klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
+        logger.V(4).Info("Lease is marked as 'end of term'", "lock", le.config.Lock.Describe())
         // TODO: Instead of letting lease expire, the holder may deleted it directly
         // This will not be compatible with all controllers, so it needs to be opt-in behavior.
         // We must ensure all code guarded by this lease has successfully completed
@@ -406,7 +409,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 
     // update the lock itself
     if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
-        klog.Errorf("Failed to update lock: %v", err)
+        logger.Error(err, "Failed to update lock", "lock", le.config.Lock.Describe())
         return false
     }
 
@@ -418,6 +421,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 // else it tries to renew the lease if it has already been acquired. Returns true
 // on success else returns false.
 func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
+    logger := klog.FromContext(ctx)
     now := metav1.NewTime(le.clock.Now())
     leaderElectionRecord := rl.LeaderElectionRecord{
         HolderIdentity: le.config.Lock.Identity(),
@@ -438,18 +442,18 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
             le.setObservedRecord(&leaderElectionRecord)
             return true
         }
-        klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
+        logger.Error(err, "Failed to update lease optimistically, falling back to slow path", "lock", le.config.Lock.Describe())
     }
 
     // 2. obtain or create the ElectionRecord
     oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
     if err != nil {
         if !errors.IsNotFound(err) {
-            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+            logger.Error(err, "Error retrieving lease lock", "lock", le.config.Lock.Describe())
             return false
         }
         if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
-            klog.Errorf("error initially creating leader election record: %v", err)
+            logger.Error(err, "Error initially creating lease lock", "lock", le.config.Lock.Describe())
             return false
         }
 
@@ -465,7 +469,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
         le.observedRawRecord = oldLeaderElectionRawRecord
     }
     if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
-        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
+        logger.V(4).Info("Lease is held by and has not yet expired", "lock", le.config.Lock.Describe(), "holder", oldLeaderElectionRecord.HolderIdentity)
        return false
     }
 
@@ -481,7 +485,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 
     // update the lock itself
     if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
-        klog.Errorf("Failed to update lock: %v", err)
+        logger.Error(err, "Failed to update lease", "lock", le.config.Lock.Describe())
        return false
     }
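
Note the API consequence visible above: release() now takes an explicit klog.Logger, while Run, acquire, renew, tryAcquireOrRenew, and tryCoordinatedRenew read theirs from ctx. A hedged caller-side sketch (the controller name and the elector variable are assumptions, not part of this diff):

package example

import (
    "context"

    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/klog/v2"
)

// runElector shows the caller-side half of the migration: seed the context
// with a named logger once, and acquire/renew/release log through it via
// klog.FromContext instead of the klog globals.
func runElector(elector *leaderelection.LeaderElector) {
    logger := klog.Background().WithName("my-controller") // hypothetical name
    ctx := klog.NewContext(context.Background(), logger)
    elector.Run(ctx)
}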

tools/leaderelection/leaderelection_test.go

Lines changed: 21 additions & 9 deletions
@@ -37,6 +37,7 @@ import (
     fakeclient "k8s.io/client-go/testing"
     rl "k8s.io/client-go/tools/leaderelection/resourcelock"
     "k8s.io/client-go/tools/record"
+    "k8s.io/klog/v2/ktesting"
     "k8s.io/utils/clock"
 )
 
@@ -265,6 +266,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
     for i := range tests {
         test := &tests[i]
         t.Run(test.name, func(t *testing.T) {
+            _, ctx := ktesting.NewTestContext(t)
+
             // OnNewLeader is called async so we have to wait for it.
             var wg sync.WaitGroup
             wg.Add(1)
@@ -316,10 +319,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
                 clock:   clock,
                 metrics: globalMetricsFactory.newLeaderMetrics(),
             }
-            if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
+            if test.expectSuccess != le.tryAcquireOrRenew(ctx) {
                 if test.retryAfter != 0 {
                     time.Sleep(test.retryAfter)
-                    if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
+                    if test.expectSuccess != le.tryAcquireOrRenew(ctx) {
                         t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
                     }
                 } else {
@@ -411,6 +414,8 @@ func TestTryCoordinatedRenew(t *testing.T) {
     for i := range tests {
         test := &tests[i]
         t.Run(test.name, func(t *testing.T) {
+            _, ctx := ktesting.NewTestContext(t)
+
             // OnNewLeader is called async so we have to wait for it.
             var wg sync.WaitGroup
             wg.Add(1)
@@ -457,10 +462,10 @@ func TestTryCoordinatedRenew(t *testing.T) {
                 clock:   clock,
                 metrics: globalMetricsFactory.newLeaderMetrics(),
             }
-            if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
+            if test.expectSuccess != le.tryCoordinatedRenew(ctx) {
                 if test.retryAfter != 0 {
                     time.Sleep(test.retryAfter)
-                    if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
+                    if test.expectSuccess != le.tryCoordinatedRenew(ctx) {
                         t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
                     }
                 } else {
@@ -590,6 +595,8 @@ func testReleaseLease(t *testing.T, objectType string) {
     for i := range tests {
         test := &tests[i]
         t.Run(test.name, func(t *testing.T) {
+            logger, ctx := ktesting.NewTestContext(t)
+
             // OnNewLeader is called async so we have to wait for it.
             var wg sync.WaitGroup
             wg.Add(1)
@@ -641,7 +648,7 @@ func testReleaseLease(t *testing.T, objectType string) {
                 clock:   clock.RealClock{},
                 metrics: globalMetricsFactory.newLeaderMetrics(),
             }
-            if !le.tryAcquireOrRenew(context.Background()) {
+            if !le.tryAcquireOrRenew(ctx) {
                 t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
             }
 
@@ -651,7 +658,7 @@ func testReleaseLease(t *testing.T, objectType string) {
             wg.Wait()
             wg.Add(1)
 
-            if test.expectSuccess != le.release() {
+            if test.expectSuccess != le.release(logger) {
                 t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
             }
 
@@ -686,6 +693,7 @@ func TestReleaseLeaseLeases(t *testing.T) {
 
 // TestReleaseMethodCallsGet test release method calls Get
 func TestReleaseMethodCallsGet(t *testing.T) {
+    logger, _ := ktesting.NewTestContext(t)
     objectType := "leases"
     getCalled := false
 
@@ -730,7 +738,7 @@ func TestReleaseMethodCallsGet(t *testing.T) {
         metrics: globalMetricsFactory.newLeaderMetrics(),
     }
 
-    le.release()
+    le.release(logger)
 
     if !getCalled {
         t.Errorf("release method does not call Get")
@@ -903,6 +911,8 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
     for i := range tests {
         test := &tests[i]
         t.Run(test.name, func(t *testing.T) {
+            _, ctx := ktesting.NewTestContext(t)
+
             wg.Add(1)
             resetVars()
 
@@ -930,7 +940,7 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
                 t.Fatal("Failed to create leader elector: ", err)
             }
 
-            ctx, cancel := context.WithCancel(context.Background())
+            ctx, cancel := context.WithCancel(ctx)
 
             go elector.Run(ctx)
 
@@ -1144,6 +1154,8 @@ func TestFastPathLeaderElection(t *testing.T) {
     for i := range tests {
         test := &tests[i]
         t.Run(test.name, func(t *testing.T) {
+            _, ctx := ktesting.NewTestContext(t)
+
             resetVars()
 
             recorder := record.NewFakeRecorder(100)
@@ -1170,7 +1182,7 @@ func TestFastPathLeaderElection(t *testing.T) {
                 t.Fatal("Failed to create leader elector: ", err)
             }
 
-            ctx, cancel := context.WithCancel(context.Background())
+            ctx, cancel := context.WithCancel(ctx)
             cancelFunc = cancel
 
             elector.Run(ctx)
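
The tests replace context.Background() with contexts from k8s.io/klog/v2/ktesting so that log output is routed through the testing framework. A minimal sketch of that pattern, with a hypothetical test name:

package example

import (
    "testing"

    "k8s.io/klog/v2/ktesting"
)

// ktesting.NewTestContext returns a logger that writes through t.Log and a
// context carrying that logger, so code under test can recover it with
// klog.FromContext(ctx) and output is attributed to the right subtest.
func TestContextSketch(t *testing.T) {
    logger, ctx := ktesting.NewTestContext(t)
    logger.Info("setup done")
    _ = ctx // would be passed to tryAcquireOrRenew(ctx), release(logger), etc.
}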

tools/leaderelection/leasecandidate.go

Lines changed: 10 additions & 5 deletions
@@ -120,8 +120,12 @@ func NewCandidate(clientset kubernetes.Interface,
 func (c *LeaseCandidate) Run(ctx context.Context) {
     defer c.queue.ShutDown()
 
+    logger := klog.FromContext(ctx)
+    logger = klog.LoggerWithName(logger, "leasecandidate")
+    ctx = klog.NewContext(ctx, logger)
+
     c.informerFactory.Start(ctx.Done())
-    if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) {
+    if !cache.WaitForNamedCacheSyncWithContext(ctx, c.hasSynced) {
         return
     }
 
@@ -148,7 +152,7 @@ func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool {
         return true
     }
 
-    utilruntime.HandleError(err)
+    utilruntime.HandleErrorWithContext(ctx, err, "Ensuring lease failed")
     c.queue.AddRateLimited(key)
 
     return true
@@ -161,20 +165,21 @@ func (c *LeaseCandidate) enqueueLease() {
 // ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and
 // a bool (true if this call created the lease), or any error that occurs.
 func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
+    logger := klog.FromContext(ctx)
     lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{})
     if apierrors.IsNotFound(err) {
-        klog.V(2).Infof("Creating lease candidate")
+        logger.V(2).Info("Creating lease candidate")
         // lease does not exist, create it.
         leaseToCreate := c.newLeaseCandidate()
         if _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}); err != nil {
             return err
         }
-        klog.V(2).Infof("Created lease candidate")
+        logger.V(2).Info("Created lease candidate")
         return nil
     } else if err != nil {
         return err
     }
-    klog.V(2).Infof("lease candidate exists. Renewing.")
+    logger.V(2).Info("Lease candidate exists. Renewing.")
     clone := lease.DeepCopy()
     clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
     _, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
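
The runtime helpers used above follow the same scheme: HandleErrorWithContext and HandleCrashWithContext retrieve the logger from ctx, so the "leasecandidate" name attached in Run should also appear on error records emitted from deeper layers. A sketch under that assumption (doWork and its error are illustrative, not from this diff):

package main

import (
    "context"
    "errors"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/klog/v2"
)

// doWork stands in for a worker like processNextWorkItem: crashes and errors
// are reported through the contextual handlers instead of the global ones.
func doWork(ctx context.Context) {
    defer utilruntime.HandleCrashWithContext(ctx) // replaces HandleCrash()
    err := errors.New("ensure failed")            // illustrative error
    utilruntime.HandleErrorWithContext(ctx, err, "Ensuring lease failed")
}

func main() {
    logger := klog.LoggerWithName(klog.Background(), "leasecandidate")
    doWork(klog.NewContext(context.Background(), logger))
}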
