Added support for multiple workers in S3 Scan Source #4439
 * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition could not be given up due to some failure
 * @since 2.8
 */
void giveUpPartitions(SourcePartition<T> sourcePartition);
Let's name this giveUpPartition instead of giveUpPartitions.
Is it a bit redundant with closePartition?
closePartition does not make partitions available (i.e. UNASSIGNED)
Optional<SourcePartitionStoreItem> ownedPartitions = Optional.empty();
try {
    if (lock.tryLock()) {
        ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
We should not put a lock around this line. We only want to lock on acquiring the global partition.
So it should look like this
Optional<SourcePartitionStoreItem> ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
try {
    if (ownedPartitions.isEmpty() && lock.tryLock()) {
        ... code to acquire the global partition and run supplier ...
    }
} finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock();
    }
    if (ownedPartitions.isEmpty()) {
        ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
    }
}
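The suggestion above can be sketched as a self-contained example. This is only an illustration under stated assumptions: `InMemoryPartitionStoreSketch`, `PartitionAcquirerSketch`, and `createPartitions` are hypothetical stand-ins, not the Data Prepper classes, and the in-memory queue replaces the real coordination store. The point it shows is that the first and last acquisition attempts run outside the lock, and only partition creation is guarded.

```java
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for the coordination store (not the Data Prepper interface).
final class InMemoryPartitionStoreSketch {
    private final Queue<String> available = new ConcurrentLinkedQueue<>();

    // Hands out one available partition key, or empty if none is left.
    Optional<String> tryAcquireAvailablePartition(final String ownerId) {
        return Optional.ofNullable(available.poll());
    }

    // Stands in for "acquire the global partition and run the supplier".
    void createPartitions(final Supplier<List<String>> partitionCreationSupplier) {
        available.addAll(partitionCreationSupplier.get());
    }
}

final class PartitionAcquirerSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final InMemoryPartitionStoreSketch store;
    private final String ownerId;

    PartitionAcquirerSketch(final InMemoryPartitionStoreSketch store, final String ownerId) {
        this.store = store;
        this.ownerId = ownerId;
    }

    Optional<String> getNextPartition(final Supplier<List<String>> partitionCreationSupplier) {
        // First attempt is made without the lock, so worker threads do not block each other here.
        Optional<String> owned = store.tryAcquireAvailablePartition(ownerId);
        try {
            // Only partition creation is serialized; tryLock() returns immediately if another thread holds it.
            if (!owned.isPresent() && lock.tryLock()) {
                store.createPartitions(partitionCreationSupplier);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        // Retry outside the lock in case new partitions were just created.
        if (!owned.isPresent()) {
            owned = store.tryAcquireAvailablePartition(ownerId);
        }
        return owned;
    }
}
```

A thread that loses the `tryLock()` race simply retries the acquisition without waiting, which is what keeps the workers from queuing behind one another.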
}

ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
This doesn't need to be in the lock either
if (!initialized) {
    return;
}

final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
This won't have a partition since you changed the code to not track partition manager. There are two options here:
- Have partition manager track all the partitions that threads are working on, and call giveUpPartition on each
- Remove the PartitionManager class completely, and then on shutdown have threads stop and give up their individual partition
I would say option 2 is easier.
clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig(), sourceConfig.getTableConfigs().get(0).getExportConfig());
}

@Override
public boolean areAcknowledgementsEnabled() {
You need to pull from main and rebase your branch against main to get rid of this old commit. If it still shows up then create a new branch off main and cherry-pick the commit for the worker change to it
@@ -36,7 +42,10 @@ public class S3ScanService {
private final AcknowledgementSetManager acknowledgementSetManager;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
private ScanObjectWorker scanObjectWorker;
//private ScanObjectWorker scanObjectWorker;
Extra comment
}

public void stop() {
    scanObjectWorker.stop();
    executorService.shutdown();
Let's make sure the scan workers are shut down properly and that they give up their partition if they own one.
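One way to read this request is sketched below. It is a simplified illustration, not the code from this PR: `RecordingCoordinatorSketch`, `ScanWorkerSketch`, and `ScanServiceSketch` are hypothetical names, each worker is assumed to own exactly one partition for its whole lifetime, and `giveUpPartition` is assumed to take a `String` partition key. The idea is that every worker releases its partition in a `finally` block, and the service waits for the workers to finish before returning from `stop`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator that only records which partition keys were given up.
final class RecordingCoordinatorSketch {
    final List<String> givenUp = new CopyOnWriteArrayList<>();

    void giveUpPartition(final String partitionKey) {
        givenUp.add(partitionKey);
    }
}

final class ScanWorkerSketch implements Runnable {
    private final RecordingCoordinatorSketch coordinator;
    private final String partitionKey; // assumption: the worker owns this partition throughout
    private volatile boolean stopped = false;

    ScanWorkerSketch(final RecordingCoordinatorSketch coordinator, final String partitionKey) {
        this.coordinator = coordinator;
        this.partitionKey = partitionKey;
    }

    @Override
    public void run() {
        try {
            while (!stopped) {
                Thread.sleep(10); // placeholder for processing objects in the partition
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Runs on a normal stop and on interruption, so the partition is always released.
            coordinator.giveUpPartition(partitionKey);
        }
    }

    void stop() {
        stopped = true;
    }
}

final class ScanServiceSketch {
    private final ExecutorService executorService;
    private final List<ScanWorkerSketch> workers = new ArrayList<>();

    ScanServiceSketch(final RecordingCoordinatorSketch coordinator, final int numWorkers) {
        executorService = Executors.newFixedThreadPool(numWorkers);
        for (int i = 0; i < numWorkers; i++) {
            final ScanWorkerSketch worker = new ScanWorkerSketch(coordinator, "partition-" + i);
            workers.add(worker);
            executorService.submit(worker);
        }
    }

    // Returns true if all workers terminated within the timeout.
    boolean stop(final long timeoutMillis) {
        workers.forEach(ScanWorkerSketch::stop);
        executorService.shutdown();
        try {
            if (executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executorService.shutdownNow(); // interrupt workers that did not stop in time
            return executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling only `executorService.shutdown()` stops new tasks from being accepted but does not signal running workers, which is why the sketch also sets a stop flag on each worker.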
//private ScanObjectWorker scanObjectWorker;
private final ExecutorService executorService;
static final int STANDARD_BACKOFF_MILLIS = 30_000;
static final int FAST_BACKOFF_MILLIS = 100;
This is really fast and doesn't really make sense. Let's just make this configurable at the source and keep the default at 30 seconds. Something like
source:
  s3:
    scan:
      backoff_time: "1s" # default to 30 seconds
Also, there are a lot of failing |
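A minimal sketch of how a configurable backoff with a 30-second default might look on the Java side. Everything here is an assumption for illustration: `ScanBackoffConfigSketch` and `fromConfigValue` are hypothetical, and the tiny parser only understands values such as "1s" or "250ms"; the real source would use whatever duration handling its configuration framework already provides.

```java
import java.time.Duration;

// Hypothetical holder for the scan backoff setting; not part of the PR.
final class ScanBackoffConfigSketch {
    static final Duration DEFAULT_BACKOFF_TIME = Duration.ofSeconds(30);

    private final Duration backoffTime;

    private ScanBackoffConfigSketch(final Duration backoffTime) {
        this.backoffTime = backoffTime;
    }

    // Simplified parser (assumption): accepts "<n>ms" or "<n>s"; null or blank falls back to the default.
    static ScanBackoffConfigSketch fromConfigValue(final String value) {
        if (value == null || value.trim().isEmpty()) {
            return new ScanBackoffConfigSketch(DEFAULT_BACKOFF_TIME);
        }
        final String trimmed = value.trim();
        // Check "ms" before "s", since every "ms" value also ends with "s".
        if (trimmed.endsWith("ms")) {
            final long millis = Long.parseLong(trimmed.substring(0, trimmed.length() - 2));
            return new ScanBackoffConfigSketch(Duration.ofMillis(millis));
        }
        if (trimmed.endsWith("s")) {
            final long seconds = Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
            return new ScanBackoffConfigSketch(Duration.ofSeconds(seconds));
        }
        throw new IllegalArgumentException("Unsupported backoff_time value: " + value);
    }

    Duration getBackoffTime() {
        return backoffTime;
    }
}
```

With this shape, the worker would sleep for `getBackoffTime()` when no partition is available instead of relying on a hard-coded fast backoff constant.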
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Overall looks good; I just had some comments.
 */
void giveUpPartitions();
void giveUpPartition(SourcePartition<T> sourcePartition);
This can just take a String partitionKey to align with the other functions here.
}
Optional<SourcePartitionStoreItem> ownedPartitions = Optional.empty();
try {
    if (lock.tryLock()) {
This lock is still blocking the tryAcquireAvailablePartition call unnecessarily. Please change to align with my comment below.
}

@Override
public void giveUpPartitions() {
public void giveUpPartition(SourcePartition<T> sourcePartition) {
You only need the String partitionKey here
@@ -90,6 +93,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.pluginMetrics = pluginMetrics;
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME);
this.sourceCoordinator.initialize();
this.partitions = new ArrayList<>();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions);
Nit: This class can be created only once and shared between all the threads. Having it as it is now won't cause any issues though, just some extra instantiations.
You mean the ScanObjectWorker class? We need a Runnable class for each thread. Not using this means we need to define another runnable class, right?
No, the S3ScanPartitionCreationSupplier class. We could instantiate it only once and pass it to all of the ScanObjectWorkers.
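The nit can be illustrated with a small sketch in which one supplier instance is created by the service and handed to every worker. All names here (`PartitionSupplierSketch`, `SharedSupplierWorkerSketch`, `SharedSupplierServiceSketch`) are hypothetical, and the sketch assumes the supplier is safe to call from several threads, which would need to be checked for the real class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the partition creation supplier; counts how often it is instantiated.
final class PartitionSupplierSketch implements Supplier<List<String>> {
    static final AtomicInteger INSTANCES_CREATED = new AtomicInteger();

    PartitionSupplierSketch() {
        INSTANCES_CREATED.incrementAndGet();
    }

    @Override
    public List<String> get() {
        return Collections.emptyList(); // placeholder: a real supplier would list objects to scan
    }
}

final class SharedSupplierWorkerSketch implements Runnable {
    private final Supplier<List<String>> partitionCreationSupplier;

    // The supplier is injected rather than constructed inside the worker.
    SharedSupplierWorkerSketch(final Supplier<List<String>> partitionCreationSupplier) {
        this.partitionCreationSupplier = partitionCreationSupplier;
    }

    @Override
    public void run() {
        partitionCreationSupplier.get();
    }

    Supplier<List<String>> getPartitionCreationSupplier() {
        return partitionCreationSupplier;
    }
}

final class SharedSupplierServiceSketch {
    // Creates the supplier once and passes the same instance to every worker.
    static List<SharedSupplierWorkerSketch> createWorkers(final int numWorkers) {
        final Supplier<List<String>> sharedSupplier = new PartitionSupplierSketch();
        final List<SharedSupplierWorkerSketch> workers = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            workers.add(new SharedSupplierWorkerSketch(sharedSupplier));
        }
        return workers;
    }
}
```

Each thread still gets its own `Runnable`; only the supplier is shared.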
    }
}, ACKNOWLEDGEMENT_SET_TIMEOUT);
partitions.remove(objectToProcess.get());
I think this is meant to be in the callback?
Yes. Thanks for catching it.
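To make the difference concrete, here is a small sketch of removing the partition from the tracking list inside the acknowledgement callback rather than right after the callback is registered. `AckPartitionTrackerSketch` is hypothetical, partitions are represented by their keys, and removing only on a positive acknowledgement is an assumption; the PR may handle negative acknowledgements differently.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical tracker for the partitions a worker is still responsible for.
final class AckPartitionTrackerSketch {
    private final List<String> partitions = new CopyOnWriteArrayList<>();

    // Registers the partition and returns the callback to run when the acknowledgement arrives.
    Consumer<Boolean> startProcessing(final String partitionKey) {
        partitions.add(partitionKey);
        return result -> {
            // Assumption: the partition is dropped only after a positive acknowledgement.
            if (Boolean.TRUE.equals(result)) {
                partitions.remove(partitionKey);
            }
        };
    }

    List<String> getPartitions() {
        return partitions;
    }
}
```

If the removal ran immediately after registering the callback, the worker would stop tracking the partition before the acknowledgement arrived and could not give it up on shutdown.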
@@ -450,6 +439,6 @@ private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionS
private SourcePartitionStoreItem getItemWithAction(final String partitionKey, final String action, final Boolean fromAcknowledgmentsCallback) {
    // The validation against activePartition in partition manager needs to be skipped when called from acknowledgments callback
    // because otherwise it will fail the validation since it is actively working on a different partition when ack is received
    return fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, action) : validateAndGetSourcePartitionStoreItem(partitionKey, action);
    return fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, action) : getSourcePartitionStoreItem(partitionKey, action);
Nit: No need for this ternary operator anymore.
Oh, right. Will change it.
…ect#4439)

* Addressed review comments
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
* Fixed failing integration tests
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
* Fixed failing check style
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
* Addressed review comments
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
* Addressed review comments
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Description
Added support for multiple workers in S3 Scan Source
Issues Resolved
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.