
Workers Not Leasing All Shards #14

Open
calebstewart opened this issue Oct 17, 2022 · 0 comments
Labels
bug Something isn't working

Comments

calebstewart (Contributor) commented Oct 17, 2022

Describe the bug

The current worker implementation only grabs a lease for a single shard per shard sync interval. This seems like a bug. Based on the commit message (e2a945d), the intent was to "prevent on[e] host tak[ing] more shard[s] than it[s] configuration allowed". However, the result is that only a single shard is leased per interval, which causes startup times to balloon as more shards are introduced.
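
The problem boils down to the sync loop break-ing out after the first successful lease acquisition. A minimal, self-contained sketch of the loop shape (illustrative names only, not the library's actual code):

package main

import "fmt"

// Sketch only: illustrative stand-ins for the real KCL types; none of these
// names come from the library.
type shardStatus struct{ id string }

func tryAcquireLease(s shardStatus) error { return nil } // pretend every lease succeeds

func main() {
  shards := []shardStatus{{"shard-0"}, {"shard-1"}, {"shard-2"}}
  maxLeases := 2

  leases := 0
  for _, s := range shards {
    if leases >= maxLeases {
      break
    }
    if err := tryAcquireLease(s); err != nil {
      continue // couldn't get this lease; try the next shard
    }
    fmt.Println("leased", s.id)
    leases++
    break // current behavior: stop after ONE lease per sync interval
  }
  // Prints only "leased shard-0" even though maxLeases is 2; deleting the
  // final break lets the loop keep acquiring up to maxLeases.
}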

Reproduction steps

A basic worker on a stream with multiple shards exhibits this behavior:

package main

import (
  "fmt"
  "os"
  "os/signal"

  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

// No-op processor: we only care about lease acquisition here, not record handling.
type RecordProcessor struct{}
type RecordProcessorFactory struct{}

func (f *RecordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor {
  return &RecordProcessor{}
}

func (p *RecordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {}
func (p *RecordProcessor) Initialize(input *interfaces.InitializationInput)     {}
func (p *RecordProcessor) Shutdown(input *interfaces.ShutdownInput)             {}

func main() {
  // Separately, I have no idea why, but the library seems incapable of figuring out the
  // Kinesis service endpoint on its own. Not specifying it manually results in errors
  // where it seemingly tries to use an empty string as a service endpoint, but that's
  // probably a problem for a separate issue.
  cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
  cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"
  kcl := worker.NewWorker(&RecordProcessorFactory{}, cfg)

  if err := kcl.Start(); err != nil {
    fmt.Printf("[!] failed to start kcl worker: %v\n", err)
    return
  }
  defer kcl.Shutdown()

  // Block until interrupted. (os.Kill is dropped from the original repro:
  // SIGKILL cannot be trapped, so passing it to signal.Notify is a no-op.)
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt)
  <-signals
}
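
Note that reproducing this requires a stream with more than one shard (for example, a test stream created with aws kinesis create-stream --stream-name caleb-testing --shard-count 4). With N shards and a single worker, you can watch the worker pick up only one additional shard per shard sync interval until it finally owns all N.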

Expected behavior

A worker should lease as many shards as it can, up to MaxLeasesForWorker, on every shard sync.
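
For reference, the cap in question is the MaxLeasesForWorker field on the client configuration. A minimal sketch of raising it, assuming the v2 config exposes the same WithMaxLeasesForWorker builder as v1 (worth verifying against the config package):

package main

import (
  "fmt"

  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
)

func main() {
  // Assumption: WithMaxLeasesForWorker exists on the v2 config as it does in v1.
  // Cap this worker at 8 concurrent shard leases.
  cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker").
    WithMaxLeasesForWorker(8)
  fmt.Println(cfg.MaxLeasesForWorker) // expected: 8
}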

Additional context

I believe the solution is to refactor the lease loop (ref) to look something like this:

// Attempt to lease shards until the per-worker maximum is reached
for _, shard := range w.shardStatus {
  // Don't take out more leases than allowed
  if counter >= w.kclConfig.MaxLeasesForWorker {
    break
  }

  // already owner of the shard
  if shard.GetLeaseOwner() == w.workerID {
    continue
  }

  err := w.checkpointer.FetchCheckpoint(shard)
  if err != nil {
    // A checkpoint that does not exist yet is not an error condition.
    if err != chk.ErrSequenceIDNotFound {
      log.Warnf("Couldn't fetch checkpoint: %+v", err)
      // move on to next shard
      continue
    }
  }

  // The shard is closed and we have processed all records
  if shard.GetCheckpoint() == chk.ShardEnd {
    continue
  }

  var stealShard bool
  if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
    upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
    if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
      if shard.ClaimRequest == w.workerID {
        stealShard = true
        log.Debugf("Stealing shard: %s", shard.ID)
      } else {
        log.Debugf("Shard being stolen: %s", shard.ID)
        continue
      }
    }
  }

  err = w.checkpointer.GetLease(shard, w.workerID)
  if err != nil {
    // cannot get lease on the shard
    if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
      log.Errorf("Cannot get lease: %+v", err)
    }
    continue
  }

  if stealShard {
    log.Debugf("Successfully stole shard: %+v", shard.ID)
    w.shardStealInProgress = false
  }

  // log metrics on got lease
  w.mService.LeaseGained(shard.ID)
  w.waitGroup.Add(1)
  go func(shard *par.ShardStatus) {
    defer w.waitGroup.Done()
    if err := w.newShardConsumer(shard).getRecords(); err != nil {
      log.Errorf("Error in getRecords: %+v", err)
    }
  }(shard)

  // Increase the number of leases we have
  counter++
}
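
The key difference from the current implementation is that a successful lease acquisition increments counter and falls through to the next shard instead of break-ing out of the loop; the only remaining break is the MaxLeasesForWorker guard at the top. With that change, a worker picks up every shard it is allowed to hold in a single shard sync, rather than one shard per sync interval.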
calebstewart added the bug label Oct 17, 2022