Skip to content

Commit

Permalink
[FLINK-2689] [runtime] Fix reuse of null object for solution set Join…
Browse files Browse the repository at this point in the history
…s and CoGroups.

This closes apache#1136
  • Loading branch information
fhueske committed Sep 17, 2015
1 parent 3894ab3 commit e61a9ea
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public void run() throws Exception {
while (this.running && probeSideInput.nextKey()) {
IT2 current = probeSideInput.getCurrent();

buildSideRecord = prober.getMatchFor(current, buildSideRecord);
if (buildSideRecord != null) {
siIter.set(buildSideRecord);
IT1 matchedRecord = prober.getMatchFor(current, buildSideRecord);
if (matchedRecord != null) {
siIter.set(matchedRecord);
coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
} else {
coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ public void run() throws Exception {
while (this.running && probeSideInput.nextKey()) {
IT1 current = probeSideInput.getCurrent();

buildSideRecord = prober.getMatchFor(current, buildSideRecord);
if (buildSideRecord != null) {
siIter.set(buildSideRecord);
IT2 matchedRecord = prober.getMatchFor(current, buildSideRecord);
if (matchedRecord != null) {
siIter.set(matchedRecord);
coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
} else {
coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ public void run() throws Exception {
IT1 buildSideRecord = this.solutionSideRecord;

while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
joinFunction.join(buildSideRecord, probeSideRecord, collector);
IT1 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
joinFunction.join(matchedRecord, probeSideRecord, collector);
}
} else if (objectMap != null) {
final JoinHashMap<IT1> hashTable = this.objectMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ public void run() throws Exception {
IT2 buildSideRecord = this.solutionSideRecord;

while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
joinFunction.join(probeSideRecord, buildSideRecord, collector);
IT2 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
joinFunction.join(probeSideRecord, matchedRecord, collector);
}
} else if (objectMap != null) {
final JoinHashMap<IT2> hashTable = this.objectMap;
Expand Down

0 comments on commit e61a9ea

Please sign in to comment.