fix: Handle ProvisionedThroughputExceededException throttling #19
bf812a7 to c72ae54
c72ae54 to 7b318de
clientlibrary/config/config.go
Outdated
@@ -283,6 +283,9 @@ type (

// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
LeaseSyncingTimeIntervalMillis int

// MaxRetryCount The number of retries for getRecords in error case
Nit: Since this exported field lives in the client library's general config file, the comment could read:
// MaxRetryCount The maximum number of retries in case of error
That way it stays a general config field rather than one tied only to getRecords().
@@ -134,15 +143,53 @@ func (sc *PollingShardConsumer) getRecords() error {
ShardIterator: shardIterator,
}

// Each shard can support up to five read transactions per second.
if transactionNum == 6 {
Could we replace the magic number 6 with a constant like MaxReadTransactionsPerSecond = 5?
The condition could then be if transactionNum > MaxReadTransactionsPerSecond
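A minimal sketch of the suggested constant follows. MaxReadTransactionsPerSecond is the name proposed in the comment above; exceedsReadLimit is a hypothetical helper used only for illustration and is not part of the PR.

```go
package main

import "fmt"

// MaxReadTransactionsPerSecond is the per-shard read limit quoted in the
// diff comment ("up to five read transactions per second").
const MaxReadTransactionsPerSecond = 5

// exceedsReadLimit is a hypothetical helper: it reports whether the number
// of transactions counted in the current second is over the limit.
func exceedsReadLimit(transactionNum int) bool {
	return transactionNum > MaxReadTransactionsPerSecond
}

func main() {
	fmt.Println(exceedsReadLimit(5)) // false
	fmt.Println(exceedsReadLimit(6)) // true
}
```

Compared with transactionNum == 6, the > comparison also holds if the counter ever skips past 6.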
// If there is insufficient provisioned throughput on the stream,
// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
waitTime := time.Since(getRecordsTransactionTime)
There's scope to refactor this block (lines 176-179) into a method, something like waitASecond(waitTimePassed), and then reuse it wherever we need to wait out the rest of a second.
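One possible shape for that helper is sketched below. waitASecond is the name from the comment above; remainingWait is a hypothetical split that keeps the arithmetic testable without sleeping. Neither is code from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// remainingWait returns how much of a one-second window is left after
// elapsed has already passed, or zero if the window is over.
func remainingWait(elapsed time.Duration) time.Duration {
	if elapsed < time.Second {
		return time.Second - elapsed
	}
	return 0
}

// waitASecond sleeps for whatever remains of the one-second window.
func waitASecond(elapsed time.Duration) {
	time.Sleep(remainingWait(elapsed))
}

func main() {
	fmt.Println(remainingWait(300 * time.Millisecond)) // 700ms
	fmt.Println(remainingWait(2 * time.Second))        // 0s
	waitASecond(950 * time.Millisecond)                // sleeps about 50ms
}
```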
7b318de to 27ba073
@spentakota, you must sign every commit in this pull request acknowledging our Developer Certificate of Origin before your changes are merged. This can be done by adding
|
27ba073 to 07b65d9
Thanks for a quick yet comprehensive fix to handle ProvisionedThroughputExceededException throttling in getRecords()!
Overall, looks good to me, with a few line-level review comments plus the overarching suggestions below, which could be skipped or addressed later:
- From a code quality perspective, getRecords() is becoming a cognitive overload. It might help to break it down into smaller logical methods that each do one thing, do it well, and do only that.
- This method should be tested so we can assert that it does what we expect, which matters more given its complex logic and calculations, owing to the underlying behaviors that you've rightly linked in comments. A quick basic test would really help build confidence in these changes.
// Calculate size of records from read transaction
numBytes := 0
for _, record := range getResp.Records {
numBytes = numBytes + len(record.Data)
Nit: Could use the compound assignment operator, as in numBytes += len(record.Data), for notational convenience and readability (the difference is only syntactic, but it could possibly avoid a bug).
}

// Add to number of getRecords successful transactions
transactionNum++
Nit: Possibly rename this variable to something like NumSuccessfulTransactions so it's clearly an increment counter without needing this comment. OK to retain as is if there are other reasons why it's called transactionNum.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
waitTime := time.Since(getRecordsTransactionTime)
if waitTime < time.Second {
time.Sleep(time.Second - waitTime)
Nit: Suggest using time.Sub() instead. That would also be consistent with the use of time.Sub() to calculate timePassed on line 234.
timePassed := currTime.Sub(lastCheckTime)
lastCheckTime = currTime

remBytes = remBytes + float64(timePassed.Seconds())*(MaxBytes/(float64(time.Second*5)))
Nit: Another candidate for +=, and remBytes could be called remainingBytes for clarity.
timePassed := currTime.Sub(lastCheckTime)
lastCheckTime = currTime

remBytes = remBytes + float64(timePassed.Seconds())*(MaxBytes/(float64(time.Second*5)))
It isn't evident what the expression does without working through the numbers in the comment. Could you give MaxBytes/(float64(time.Second*5)) a name that says what it represents, like readDataRate?
Also, timePassed.Seconds() already returns a float64, so is the outer float64 cast necessary?
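A hedged sketch of what a named rate could look like. It assumes the intent is 10,000,000 bytes per 5-second window, which matches the MaxBytes and MaxBytesPerSecond values that appear later in this thread. readDataRate is the name proposed above; windowSeconds and refill are hypothetical. Note that float64(time.Second*5) evaluates to 5e9, because a Duration counts nanoseconds, so the sketch divides by a plain number of seconds instead.

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxBytes      = 10000000.0 // bytes allowed per window (value from this thread)
	windowSeconds = 5.0        // assumed window length in seconds
	// readDataRate is the refill rate in bytes per second.
	readDataRate = maxBytes / windowSeconds
)

// refill is a hypothetical helper: it returns the byte budget after
// timePassed has elapsed. Seconds() already returns a float64, so no
// extra cast is needed.
func refill(remBytes float64, timePassed time.Duration) float64 {
	return remBytes + timePassed.Seconds()*readDataRate
}

func main() {
	fmt.Println(refill(0, time.Second))              // 2e+06
	fmt.Println(refill(100, 500*time.Millisecond))   // 1.0001e+06
	fmt.Println(float64(time.Second*5) == 5e9)       // true
}
```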
if remBytes <= float64(numBytes) {
// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
coolDown := numBytes / MaxBytesPerSecond
time.Sleep(time.Duration(coolDown) * time.Second)
I think this arithmetic can lose precision. Is coolDown a float64?
A Duration is an int64 count of nanoseconds, so time.Duration(f) truncates the floating-point value f to an integer.
Could we instead multiply the number of seconds by the number of duration units per second?
time.Sleep(time.Duration(coolDown * float64(time.Second)))
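The difference between the two conversions can be seen in a small sketch. It assumes coolDown is a float64 number of seconds; truncating and scaled are hypothetical names for the two forms discussed above.

```go
package main

import (
	"fmt"
	"time"
)

// truncating mirrors time.Duration(coolDown) * time.Second: the fraction
// is dropped before the value is scaled to nanoseconds.
func truncating(seconds float64) time.Duration {
	return time.Duration(seconds) * time.Second
}

// scaled mirrors time.Duration(coolDown * float64(time.Second)): the value
// is scaled to nanoseconds first, so sub-second precision survives.
func scaled(seconds float64) time.Duration {
	return time.Duration(seconds * float64(time.Second))
}

func main() {
	fmt.Println(truncating(1.5)) // 1s
	fmt.Println(scaled(1.5))     // 1.5s
}
```

If numBytes is an int, as in the hunk above, the division numBytes / MaxBytesPerSecond is itself integer division, so the fraction would be lost even before the conversion to a Duration.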
coolDown := numBytes / MaxBytesPerSecond
time.Sleep(time.Duration(coolDown) * time.Second)
} else {
remBytes = remBytes - float64(numBytes)
Nit: numBytes is cast as float64(numBytes) in multiple places. Would it make sense for numBytes to be a float64 to begin with? We might then need only one cast, on line 209:
numBytes += float64(len(record.Data))
This is also another candidate for -=.
5ff9229 to 28246c7
}
}

func (sc *PollingShardConsumer) callGetRecordsAPI(successfulTransactionNum *int, remBytes *float64,
Refactor suggestion: If these fields are related or represent a particular struct, you could capture them in that struct or in the method's receiver struct (PollingShardConsumer).
You could also return the values instead of updating them through pointers. That would keep callGetRecordsAPI() from modifying the parameters passed to it and would remove the need for multiple pointer arguments.
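As a rough illustration of the return-a-value style, the sketch below groups the counters in a struct and returns an updated copy. readState and afterRead are hypothetical names; the PR's actual fields and the real GetRecords call are not modeled here.

```go
package main

import "fmt"

// readState is a hypothetical grouping of the throttling counters that the
// PR passes around as separate pointers.
type readState struct {
	successfulTransactions int
	remBytes               int
}

// afterRead returns an updated copy of the state after one successful read
// of bytesRead bytes. The caller's value is left untouched because the
// struct is passed by value.
func afterRead(s readState, bytesRead int) readState {
	s.successfulTransactions++
	s.remBytes -= bytesRead
	return s
}

func main() {
	before := readState{successfulTransactions: 0, remBytes: 100}
	after := afterRead(before, 40)
	fmt.Println(before.remBytes, after.remBytes)                             // 100 60
	fmt.Println(before.successfulTransactions, after.successfulTransactions) // 0 1
}
```

The same fields could instead live on the receiver struct, which is the other option mentioned in the comment above.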
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
28246c7 to ae3763e
…ordsAPI Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2a5b00e to 706e803
Closing MR in favor of: #21
mService metrics.MonitoringService
currTime time.Time
callsLeft int
remBytes float64
Let's use an int here. It doesn't really make sense to have fractional bytes in this case.
callsLeft int
remBytes float64
lastCheckTime time.Time
bytesRead float64
Let's use ints, not floats for vars that count bytes.
MaxBytes = 10000000.0
MaxBytesPerSecond = 2000000.0
BytesToMbConversion = 1000000.0
These can all be ints.
timePassed := currentTime.Sub(sc.lastCheckTime)
sc.lastCheckTime = currentTime
sc.remBytes += timePassed.Seconds() * MaxBytesPerSecond
transactionReadRate := sc.bytesRead / (timePassed.Seconds() * BytesToMbConversion)
With the byte-counting vars as ints, you can do the arithmetic by dividing a time.Duration by time.Second:
- timePassed := currentTime.Sub(sc.lastCheckTime)
- sc.lastCheckTime = currentTime
- sc.remBytes += timePassed.Seconds() * MaxBytesPerSecond
- transactionReadRate := sc.bytesRead / (timePassed.Seconds() * BytesToMbConversion)
+ secondsPassed := currentTime.Sub(sc.lastCheckTime) / time.Second
+ sc.lastCheckTime = currentTime
+ sc.remBytes += secondsPassed * MaxBytesPerSecond
+ transactionReadRate := sc.bytesRead / (secondsPassed * BytesToMbConversion)
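A sketch of how the integer version behaves, under the assumption that the byte counters become ints. Dividing a Duration by time.Second yields a Duration holding the truncated number of whole seconds, so it needs an explicit conversion before being multiplied with an int, and elapsed times under one second refill nothing. wholeSeconds, refillBytes, and refillBytesMillis are hypothetical names; the millisecond variant is one possible way to keep sub-second granularity, not something proposed in the thread.

```go
package main

import (
	"fmt"
	"time"
)

// MaxBytesPerSecond uses the value from this thread, written as an integer.
const MaxBytesPerSecond = 2000000

// wholeSeconds converts an elapsed Duration to whole seconds, truncating
// any fraction.
func wholeSeconds(elapsed time.Duration) int {
	return int(elapsed / time.Second)
}

// refillBytes adds the budget earned over the whole seconds in elapsed.
func refillBytes(remBytes int, elapsed time.Duration) int {
	return remBytes + wholeSeconds(elapsed)*MaxBytesPerSecond
}

// refillBytesMillis is an alternative that works in milliseconds, so
// sub-second intervals still earn budget.
func refillBytesMillis(remBytes int64, elapsed time.Duration) int64 {
	return remBytes + elapsed.Milliseconds()*MaxBytesPerSecond/1000
}

func main() {
	fmt.Println(refillBytes(0, 2500*time.Millisecond))      // 4000000
	fmt.Println(refillBytes(0, 900*time.Millisecond))       // 0
	fmt.Println(refillBytesMillis(0, 900*time.Millisecond)) // 1800000
}
```

The truncation also matters for the read-rate line in the suggestion: with secondsPassed equal to zero, an integer division by secondsPassed * BytesToMbConversion would panic at run time.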
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>