New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create workunitstream for CompactionSource #1826
Conversation
if (last == null) | ||
log.debug ("Waiting for producer to complete..."); | ||
} else { | ||
if (workUnits.size() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems you should add last = this.workUnits.poll()
here?
*/ | ||
public boolean hasNext () { | ||
try { | ||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this work?
while(true) {
if (last != null) return true;
if (this.unProcessed <= 0 && this.workUnits.isEmpty()) return false;
this.last = this.workUnits.poll(1, TimeUnit.SECONDS);
}
last = null; | ||
log.debug ("next() pops out a workunit"); | ||
return tmp; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should probably have else { throw new IllegalStateException()}
to catch the case where hasNext()
returns true but last
doesn't get populated.
|
||
List<Dataset> datasets = finder.findDatasets(); | ||
CompactionDatasetIterator iterator = new CompactionDatasetIterator(datasets, verifiers); | ||
service.submit(iterator.getDatasetProcessor()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably want multiple data processors right? Note that will also require to transform the List
into something like a Spliterator
(although that is only available on java 8)
|
||
@Override | ||
public List<WorkUnit> getWorkunits(SourceState state) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should remove getWorkUnits(SourceState)
, it is better to only have to maintain one functionally equivalent method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@yukuai518 what is the status of this PR? |
Issue: https://issues.apache.org/jira/browse/GOBBLIN-38 Please update your PR title with following prefix: [GOBBLIN-38] |
No description provided.