Add support for push down filters in parquet sources #1050
Have you tested this, even manually?
extends FixedPathSource(paths: _*) with ParquetTupleSource

class FixedPathParquetTuple(override val fields: Fields,
  paths: String*)(override val filterPredicate: Option[FilterPredicate] = None)
Not crazy about the extra parameter group here. I see why you are doing it, but I am trying to think if we can find another way.
@johnynek: as far as I can tell there are three options: the extra parameter group, a mutable internal variable, or F-bounded polymorphism.
As there is only one class with this, we could also have:
  a public constructor (fields: Fields, paths: String*)
  a private constructor (fields: Fields, paths: Array[String], filterPredicate: Option[FilterPredicate])
and add a
  def withFilter(filter: FilterPredicate) = new FixedPathParquetTuple(fields, paths, Some(filter))
From the user's side the syntax looks like:
  new FixedPathParquetTuple(fields, "a", "b").withFilter(myFilter)
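A minimal sketch of that proposal, assuming stand-in types: Fields and FilterPredicate below are placeholders for the Cascading and Parquet classes, and the private constructor takes a Seq rather than an Array to keep the example short. It is one way the suggestion could look, not the code from the PR.

```scala
// Stand-ins for cascading.tuple.Fields and Parquet's FilterPredicate (assumptions).
final case class Fields(names: String*)
trait FilterPredicate

// The primary constructor is private, so callers cannot pass a predicate directly.
final class FixedPathParquetTuple private (
    val fields: Fields,
    val paths: Seq[String],
    val filterPredicate: Option[FilterPredicate]) {

  // Public constructor: paths only, no filter.
  def this(fields: Fields, paths: String*) = this(fields, paths, None)

  // Returns a new instance; the receiver is left unchanged.
  def withFilter(filter: FilterPredicate): FixedPathParquetTuple =
    new FixedPathParquetTuple(fields, paths, Some(filter))
}
```

Because withFilter builds a fresh instance, the original source keeps its empty predicate, which is the immutability the thread is after.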
This is a great candidate for a platform test using the HadoopPlatformTest, though I haven't used parquet with that yet.
@isnotinvain Could you add a test? It does not need to read a file, just instantiate the classes and check that the field is set. That would have the benefit of exercising the API.
@ianoc, @julienledem: I'll add a unit test that makes sure the field is set correctly. @jcoveney: I think this functionality should be tested in parquet, right? Scalding is just passing things along.
As for the API, we went back and forth on this a few times in the past. It seems we all want an immutable .withFilter method, which isn't really possible (even with F-bounded polymorphism) without requiring each subclass to implement .withFilter. But there are only 3 subclasses, and likely to be fewer than 10 pretty much forever, so how about I add the withFilter method and have each of these classes implement it (immutably, via a copy constructor)?
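The approach described here can be sketched roughly as follows. HasFilter, DailySource and HourlySource are hypothetical names, FilterPredicate is a stand-in for the Parquet type, and combining with an already-set predicate is left out for brevity. The trait fixes the return type through its This parameter, and each concrete source supplies its own copy.

```scala
// Stand-in for Parquet's FilterPredicate (assumption).
trait FilterPredicate

// F-bounded: This is the concrete source type, so withFilter can return it.
trait HasFilter[This <: HasFilter[This]] {
  def filterPredicate: Option[FilterPredicate] = None

  // Every concrete source has to provide its own copy.
  protected def copyWithFilter(fp: FilterPredicate): This

  final def withFilter(fp: FilterPredicate): This = copyWithFilter(fp)
}

class DailySource(val path: String) extends HasFilter[DailySource] {
  override protected def copyWithFilter(fp: FilterPredicate): DailySource =
    new DailySource(path) {
      override val filterPredicate: Option[FilterPredicate] = Some(fp)
    }
}

class HourlySource(val path: String) extends HasFilter[HourlySource] {
  override protected def copyWithFilter(fp: FilterPredicate): HourlySource =
    new HourlySource(path) {
      override val filterPredicate: Option[FilterPredicate] = Some(fp)
    }
}
```

The cost is the one named in the comment: the compiler cannot generate copyWithFilter, so every new source must write it by hand.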
extends DailySuffixSource(path, dateRange) with ParquetThrift[DailySuffixParquetThrift[T], T] {

  override protected def copyWithFilter(fp: FilterPredicate): DailySuffixParquetThrift[T] =
    new DailySuffixParquetThrift[T](path, dateRange)(mf) {
I think you can omit (mf) here
done!
I restarted the stuck build and now it passes.
extends HourlySuffixSource(path, dateRange) with ParquetThrift[HourlySuffixParquetThrift[T], T] {

  override protected def copyWithFilter(fp: FilterPredicate): HourlySuffixParquetThrift[T] =
    new HourlySuffixParquetThrift[T](path, dateRange)(mf) {
I'm a little worried about calling the constructor here. A user may extend this class, in which case calling copyWithFilter would return an instance of a different class. In the subclass, the user may also mix in other traits or override some methods, and those changes would be lost after calling copyWithFilter.
Actually, users should not extend this class; maybe we should make it final.
If they do extend it, the This type parameter would be wrong.
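The concern can be reproduced with a small sketch. HourlySource, AuditedHourlySource and describe are hypothetical names, and FilterPredicate is a stand-in for the Parquet type; the real sources carry more state than this.

```scala
// Stand-in for Parquet's FilterPredicate (assumption).
trait FilterPredicate

// Not final, and withFilter calls the base-class constructor directly.
class HourlySource(val path: String) {
  def filterPredicate: Option[FilterPredicate] = None
  def describe: String = "hourly"

  def withFilter(fp: FilterPredicate): HourlySource =
    new HourlySource(path) {
      override val filterPredicate: Option[FilterPredicate] = Some(fp)
    }
}

// A hypothetical user subclass that customizes behavior.
class AuditedHourlySource(path: String) extends HourlySource(path) {
  override def describe: String = "audited hourly"
}
```

Calling withFilter on an AuditedHourlySource yields a plain HourlySource, so describe on the result falls back to the base implementation. Marking the source final rules this situation out at compile time.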
OK, I updated the sources to be final, and instead of returning new anonymous classes, I pushed the values into fields in the class. This means you can do either:
or you can do:
So I didn't bother making the constructor that takes a filter predicate private. It seemed like overkill to make it private and provide an auxiliary constructor, when that's really the same as having a default argument. At this point, the sources are final and immutable, and all support the .withFilter(fp) method. I think this is pretty good and ready to merge unless you have any more concerns. Thanks!
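The two usages mentioned in this comment are not shown in the text. Given the remark about a default argument, one plausible reading is that the predicate can be passed at construction or added afterwards with withFilter. The sketch below assumes that reading; DailySource is a hypothetical name, FilterPredicate is a stand-in, and the This type parameter and predicate combination are omitted.

```scala
// Stand-in for Parquet's FilterPredicate (assumption).
trait FilterPredicate

// Final and immutable: the predicate is an ordinary field with a default.
final class DailySource(
    val path: String,
    val filterPredicate: Option[FilterPredicate] = None) {

  // Returns a copy carrying the predicate; the receiver is unchanged.
  def withFilter(fp: FilterPredicate): DailySource =
    new DailySource(path, Some(fp))
}
```

Under this assumption, new DailySource("/logs", Some(fp)) and new DailySource("/logs").withFilter(fp) produce sources with the same predicate.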
def filterPredicate: Option[FilterPredicate] = None

final def withFilter(fp: FilterPredicate): This = {
  val newFp = filterPredicate.map(prev => and(prev, fp)).getOrElse(fp)
Let's use a pattern match here:
  val newFp = filterPredicate match {
    case None => fp
    case Some(old) => and(old, fp)
  }
I feel like that is a clearer expression, and clearly perf does not matter here.
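Both spellings compute the same predicate, which a small self-contained sketch can confirm. Leaf and And are hypothetical stand-ins for Parquet's predicate classes, and the local and function plays the role of the and used in the diff (presumably Parquet's FilterApi.and).

```scala
object FilterCombine {
  // Stand-ins for Parquet's predicate types (assumptions).
  sealed trait FilterPredicate
  final case class Leaf(name: String) extends FilterPredicate
  final case class And(left: FilterPredicate, right: FilterPredicate) extends FilterPredicate

  def and(left: FilterPredicate, right: FilterPredicate): FilterPredicate = And(left, right)

  // Original form: map over the Option, fall back to the new predicate.
  def combineWithMap(existing: Option[FilterPredicate], fp: FilterPredicate): FilterPredicate =
    existing.map(prev => and(prev, fp)).getOrElse(fp)

  // Suggested form: spell out both cases.
  def combineWithMatch(existing: Option[FilterPredicate], fp: FilterPredicate): FilterPredicate =
    existing match {
      case None      => fp
      case Some(old) => and(old, fp)
    }
}
```

With no existing predicate both return fp unchanged; with an existing one both return the conjunction with the old predicate on the left, so the choice is purely about readability.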
done
I think this looks great. Just some minor comments.
Merge when green.
Add support for push down filters in parquet sources
This includes bumping parquet to version 1.6.0rc2