Add sparseIntersectByKey/hashIntersectByKey (#1354) #1393

Merged: 2 commits, Sep 20, 2018

clairemcginty
Contributor

No description provided.

val numHashes = BloomFilter.optimalNumHashes(thatNumKeys, width)
val rhsBf = that
  .aggregate(BloomFilterAggregator[K](numHashes, width))
  .asIterableSideInput
Contributor

Any reason not to use a singleton side input here?

Contributor Author

As a singleton, it fails with "Empty PCollection accessed as a singleton view" when `that` is empty and we try to access rhsBf through the side input context. I based it on the SparseJoin implementations, which use an IterableSideInput in the same way. wdyt?

Contributor

Actually, there's now a def asSingletonSideInput(defaultValue: T): SideInput[T] that takes a default. Maybe use that for both?

Contributor Author

sounds good! added a unit test for that case in SparseOuterJoins too. Also added an exact boolean to the sparse intersection.
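
To make the empty-input case from this thread concrete, here is a minimal plain-Scala sketch. It does not use Scio or Beam: a side input is modeled as an in-memory Seq holding zero or one element, and `SideInputSketch`, `asSingleton`, `asSingletonWithDefault`, `asIterable`, and `rejectAll` are hypothetical names chosen for illustration, not part of the library.

```scala
object SideInputSketch {
  // Assumption of this sketch: an aggregate over an empty collection
  // yields no element, so the "view" holds zero or one value.
  type Aggregated[T] = Seq[T]

  // Singleton access without a default: fails when the input was empty.
  def asSingleton[T](xs: Aggregated[T]): T =
    xs.headOption.getOrElse(
      throw new NoSuchElementException("Empty collection accessed as a singleton"))

  // Singleton access with a default: the empty case is handled once, up front.
  def asSingletonWithDefault[T](xs: Aggregated[T], default: T): T =
    xs.headOption.getOrElse(default)

  // Iterable access: every caller has to handle the empty case itself,
  // for example with headOption.exists(...).
  def asIterable[T](xs: Aggregated[T]): Iterable[T] = xs

  // Stand-in for a Bloom filter: a membership predicate on keys.
  type KeyFilter = String => Boolean

  // A filter that matches nothing, used here as the default for an empty right side.
  val rejectAll: KeyFilter = _ => false
}
```

With a default in place, the call site no longer needs the `headOption.exists(...)` dance; whether a reject-all filter is the right default for every caller is a design choice this sketch only assumes.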

.withSideInputs(rhsBf)
.filter { case (e, c) => c(rhsBf).headOption.exists(_.maybeContains(e._1)) }
.toSCollection
.cogroup(that.map((_, ())))
Contributor

We don't need the cogroup+flatMap anymore since it's covered by filter?

Contributor Author

I guess it depends on whether we expect this function to be approximate or exact, since the Bloom filter can have false positives.

Contributor

Maybe add an exact: Boolean option that defaults to false? If the user joins the result with another data set, it defeats the purpose of cogroup+flatMap.
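
The approximate-versus-exact trade-off in this thread can be sketched in plain Scala. This is an illustration under stated assumptions, not the merged implementation: `ToyBloomFilter` is a hypothetical toy filter (not Algebird's `BloomFilter`), the function works on in-memory `Seq`s instead of `SCollection`s, and the exact pass uses a `Set` where the PR uses cogroup. The default `width` and `numHashes` values are arbitrary.

```scala
import scala.util.hashing.MurmurHash3

object IntersectSketch {
  // Toy Bloom filter over String keys (hypothetical, for illustration only).
  final class ToyBloomFilter(width: Int, numHashes: Int, keys: Iterable[String]) {
    private val bits = new java.util.BitSet(width)
    keys.foreach(k => positions(k).foreach(i => bits.set(i)))

    // One bit position per hash seed, mapped into [0, width).
    private def positions(key: String): Seq[Int] =
      (0 until numHashes).map { seed =>
        val h = MurmurHash3.stringHash(key, seed)
        ((h % width) + width) % width
      }

    // May return true for absent keys (false positive), never false for present ones.
    def maybeContains(key: String): Boolean =
      positions(key).forall(i => bits.get(i))
  }

  def sparseIntersectByKey[V](
      lhs: Seq[(String, V)],
      rhsKeys: Seq[String],
      exact: Boolean = false,
      width: Int = 1024,
      numHashes: Int = 3): Seq[(String, V)] = {
    val bf = new ToyBloomFilter(width, numHashes, rhsKeys)
    // Cheap first pass: drops most non-matching keys, may keep false positives.
    val candidates = lhs.filter { case (k, _) => bf.maybeContains(k) }
    if (exact) {
      // Second pass removes false positives (stand-in for cogroup + flatMap).
      val rhsSet = rhsKeys.toSet
      candidates.filter { case (k, _) => rhsSet.contains(k) }
    } else candidates
  }
}
```

With `exact = false` the result is a superset of the true intersection, which is enough when a downstream join would discard the extra keys anyway; `exact = true` pays for a second pass to remove them.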

}
}

protected def sparseIntersectByKeyImpl(that: SCollection[K],
Contributor

This can be private?

Contributor Author

weirdly when i make it private it's inaccessible from

      val joined = (thisParts zip thatParts).map {
        case (lhs, rhs) =>
          lhs.sparseIntersectByKeyImpl(rhs, bfSettings.capacity, computeExact, fpProb)

Contributor

You're right, probably because it's an implicit class.

val side = combineAsMapSideInput(that.map((_, ())))
self
.withSideInputs(side)
.flatMap {
Contributor

This can be a filter?

Contributor Author

good catch! thanks.
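
The flatMap-to-filter simplification can be illustrated with a small plain-Scala sketch. The body of the original `flatMap` is not shown in the excerpt, so this assumes it only emits the input pair when its key is present in the map side input; `FilterSketch`, `viaFlatMap`, and `viaFilter` are hypothetical names, and a plain `Map` stands in for the side input.

```scala
object FilterSketch {
  // `side` mirrors that.map((_, ())): a map from right-hand keys to Unit.
  // Assumed shape of the original: emit the pair or nothing.
  def viaFlatMap[K, V](lhs: Seq[(K, V)], side: Map[K, Unit]): Seq[(K, V)] =
    lhs.flatMap { case (k, v) =>
      if (side.contains(k)) Seq((k, v)) else Seq.empty
    }

  // Equivalent filter: same elements in the same order, with the intent stated directly.
  def viaFilter[K, V](lhs: Seq[(K, V)], side: Map[K, Unit]): Seq[(K, V)] =
    lhs.filter { case (k, _) => side.contains(k) }
}
```

Under that assumption the two forms return identical results, so `filter` is the clearer choice whenever the function never transforms or multiplies elements.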

@codecov-io

Codecov Report

Merging #1393 into master will increase coverage by 0.1%.
The diff coverage is 100%.

@@            Coverage Diff            @@
##           master    #1393     +/-   ##
=========================================
+ Coverage   79.24%   79.35%   +0.1%     
=========================================
  Files         160      160             
  Lines        4899     4925     +26     
  Branches      300      340     +40     
=========================================
+ Hits         3882     3908     +26     
  Misses       1017     1017

| Impacted Files | Coverage Δ | |
|---|---|---|
| ...ify/scio/values/PairHashSCollectionFunctions.scala | 98% <100%> (+0.17%) | ⬆️ |
| ...spotify/scio/values/PairSCollectionFunctions.scala | 95.03% <100%> (+0.91%) | ⬆️ |

Last update 6da0dba...78b0c97.

@clairemcginty clairemcginty merged commit 67a0e99 into master Sep 20, 2018
@clairemcginty clairemcginty deleted the claire/sparse_intersect_by_key branch September 20, 2018 17:44

3 participants