query: fix retry query case. #297

Merged 2 commits on Apr 25, 2024
28 changes: 23 additions & 5 deletions query/worker.go
@@ -24,11 +24,12 @@ var (
// queryJob is the internal struct that wraps the Query to work on, in
// addition to some information about the query.
type queryJob struct {
tries uint8
index uint64
timeout time.Duration
encoding wire.MessageEncoding
cancelChan <-chan struct{}
tries uint8
index uint64
timeout time.Duration
encoding wire.MessageEncoding
cancelChan <-chan struct{}
internalCancelChan <-chan struct{}
*Request
}
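
The new internalCancelChan complements, rather than replaces, the existing cancelChan: the latter is supplied by the caller through the batch's query options, while the former is created by the work manager for each batch and closed when that batch times out. A minimal sketch of how a consumer of the job might treat the two channels, assuming a helper living alongside the struct (the helper name is hypothetical, not part of the PR):

```go
package query

// jobCanceled reports whether a queued job should be abandoned before it
// is handed to a peer. Illustrative only: either channel being closed
// cancels the job, and the worker does not care which side closed it.
func jobCanceled(job *queryJob) bool {
	select {
	case <-job.cancelChan:
		return true
	case <-job.internalCancelChan:
		return true
	default:
		return false
	}
}
```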

@@ -128,6 +129,16 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
// result will be sent back.
break

case <-job.internalCancelChan:
log.Tracef("Worker %v found job with index %v "+
"already internally canceled (batch timed out)",
peer.Addr(), job.Index())

// We break to the below loop, where we'll check the
// internal cancel channel again and the ErrJobCanceled
// result will be sent back.
break

// We received a non-canceled query job, send it to the peer.
default:
log.Tracef("Worker %v queuing job %T with index %v",
@@ -214,6 +225,13 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
jobErr = ErrJobCanceled
break Loop

case <-job.internalCancelChan:
log.Tracef("Worker %v job %v internally "+
"canceled", peer.Addr(), job.Index())

jobErr = ErrJobCanceled
break Loop

case <-quit:
return
}
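
While a query is in flight, the worker now watches the internal cancel channel in the same select that waits for the peer's response, so a batch timeout aborts the wait with ErrJobCanceled exactly like a caller-side cancellation. A condensed, self-contained sketch of that wait (respChan is a placeholder for the channel delivering the matching response; the real worker also handles per-query timeouts and shutdown):

```go
package query

// waitForJobOutcome is an illustrative reduction of the worker's in-flight
// select, not the PR's actual code.
func waitForJobOutcome(job *queryJob, respChan <-chan struct{}) error {
	select {
	case <-respChan:
		// The peer answered before any cancellation fired.
		return nil

	case <-job.cancelChan:
		// The caller canceled the whole batch.
		return ErrJobCanceled

	case <-job.internalCancelChan:
		// The work manager canceled the batch internally, which after
		// this PR happens when the batch's total timeout expires.
		return ErrJobCanceled
	}
}
```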
127 changes: 76 additions & 51 deletions query/workmanager.go
@@ -183,6 +183,7 @@ func (w *peerWorkManager) workDispatcher() {
timeout <-chan time.Time
rem int
errChan chan error
cancelChan chan struct{}
}

// We set up a batch index counter to keep track of batches that still
@@ -309,7 +310,20 @@ Loop:
// turns out to be an error.
batchNum := currentQueries[result.job.index]
delete(currentQueries, result.job.index)
batch := currentBatches[batchNum]

// In case the batch is already canceled we return
// early.
batch, ok := currentBatches[batchNum]
if !ok {
log.Warnf("Query(%d) result from peer %v "+
"discarded with retries %d, because "+
"batch already canceled: %v",
result.job.index,
result.peer.Addr(),
result.job.tries, result.err)

continue Loop
}

switch {
// If the query ended because it was canceled, drop it.
@@ -322,30 +336,34 @@ Loop:
// was canceled, forward the error on the
// batch's error channel. We do this since a
// cancellation applies to the whole batch.
if batch != nil {
batch.errChan <- result.err
delete(currentBatches, batchNum)
batch.errChan <- result.err
delete(currentBatches, batchNum)

log.Debugf("Canceled batch %v",
batchNum)
continue Loop
}
log.Debugf("Canceled batch %v", batchNum)
continue Loop

// If the query ended with any other error, put it back
// into the work queue if it has not reached the
// maximum number of retries.
case result.err != nil:
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(
result.peer.Addr(),
)
} else {
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
}

if batch != nil && !batch.noRetryMax {
if !batch.noRetryMax {
result.job.tries++
}

// Check if this query has reached its maximum
// number of retries. If so, remove it from the
// batch and don't reschedule it.
if batch != nil && !batch.noRetryMax &&
if !batch.noRetryMax &&
result.job.tries >= batch.maxRetries {

log.Warnf("Query(%d) from peer %v "+
@@ -380,11 +398,6 @@ Loop:
result.job.timeout = newTimeout
}

// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(result.peer.Addr())
}

heap.Push(work, result.job)
currentQueries[result.job.index] = batchNum

@@ -396,42 +409,47 @@

// Decrement the number of queries remaining in
// the batch.
if batch != nil {
batch.rem--
log.Tracef("Remaining jobs for batch "+
"%v: %v ", batchNum, batch.rem)

// If this was the last query in flight
// for this batch, we can notify that
// it finished, and delete it.
if batch.rem == 0 {
batch.errChan <- nil
delete(currentBatches, batchNum)

log.Tracef("Batch %v done",
batchNum)
continue Loop
}
batch.rem--
log.Tracef("Remaining jobs for batch "+
"%v: %v ", batchNum, batch.rem)

// If this was the last query in flight
// for this batch, we can notify that
// it finished, and delete it.
if batch.rem == 0 {
batch.errChan <- nil
delete(currentBatches, batchNum)

log.Tracef("Batch %v done",
batchNum)
continue Loop
}
}

// If the total timeout for this batch has passed,
// return an error.
if batch != nil {
select {
case <-batch.timeout:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)
select {
case <-batch.timeout:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)

// When deleting the particular batch
// number we need to make sure to cancel
// all queued and ongoing queryJobs
// to not waste resources when the batch
// call is already canceled.
if batch.cancelChan != nil {
close(batch.cancelChan)
}

log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)
log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)

log.Debugf("Batch %v timed out",
batchNum)
log.Warnf("Batch %v timed out",
batchNum)

default:
}
default:
}
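
The key behavioral change is in the timeout branch above: once the batch's total timeout fires, the dispatcher not only reports ErrQueryTimeout on the batch's error channel but also closes the batch's internal cancel channel. Closing is what lets the cancellation reach every queued and in-flight job of that batch at once, since a receive from a closed channel never blocks. A small, self-contained demonstration of that broadcast property (independent of neutrino):

```go
package main

import (
	"fmt"
	"sync"
)

// Closing one channel wakes every receiver, no matter how many jobs of the
// batch are waiting on it. This is why a single close(cancelChan) in the
// dispatcher is enough to cancel all of a batch's outstanding work.
func main() {
	cancelChan := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-cancelChan // unblocks for every goroutine once closed
			fmt.Printf("job %d observed batch cancellation\n", id)
		}(i)
	}

	close(cancelChan) // broadcast: the batch timed out
	wg.Wait()
}
```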

// A new batch of queries where scheduled.
@@ -442,13 +460,17 @@
log.Debugf("Adding new batch(%d) of %d queries to "+
"work queue", batchIndex, len(batch.requests))

// Internal cancel channel of a batch request.
cancelChan := make(chan struct{})

for _, q := range batch.requests {
heap.Push(work, &queryJob{
index: queryIndex,
timeout: minQueryTimeout,
encoding: batch.options.encoding,
cancelChan: batch.options.cancelChan,
Request: q,
index: queryIndex,
timeout: minQueryTimeout,
encoding: batch.options.encoding,
cancelChan: batch.options.cancelChan,
internalCancelChan: cancelChan,
Request: q,
})
currentQueries[queryIndex] = batchIndex
queryIndex++
@@ -457,9 +479,12 @@
currentBatches[batchIndex] = &batchProgress{
noRetryMax: batch.options.noRetryMax,
maxRetries: batch.options.numRetries,
timeout: time.After(batch.options.timeout),
timeout: time.After(
batch.options.timeout,
),
rem: len(batch.requests),
errChan: batch.errChan,
cancelChan: cancelChan,
}
batchIndex++
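
Finally, the discard check added near the top of the result handler ties the pieces together: once a batch entry has been removed from currentBatches, whether because it was canceled, timed out, or completed, any late result that still maps to that batch number is dropped instead of dereferencing a missing batch. A small, self-contained sketch of that bookkeeping pattern (names mirror the diff, but this is illustrative, not the dispatcher's actual code):

```go
package main

import "fmt"

func main() {
	type batchProgress struct{ rem int }

	// Job index -> batch number, and batch number -> progress, as in the
	// dispatcher's bookkeeping.
	currentQueries := map[uint64]uint64{7: 1}
	currentBatches := map[uint64]*batchProgress{}

	// The batch was already canceled or timed out, so its entry is gone.
	// A late result for job 7 arrives afterwards:
	batchNum := currentQueries[7]
	delete(currentQueries, 7)

	batch, ok := currentBatches[batchNum]
	if !ok {
		fmt.Println("late result discarded, batch already canceled")
		return
	}

	// Otherwise the result would be accounted against the batch as usual.
	batch.rem--
}
```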
