
(#5296) Support Parquet predicates/projections in tests #5309

Merged: 10 commits from parquet-assertions into main on Jun 5, 2024

Conversation

@clairemcginty (Contributor) commented on Mar 19, 2024

WIP of Parquet projection/predicate support in JobTest.

Alternatively, we could provide custom assertions similar to CoderAssertions, i.e.:

val record: AvroType = ???
record withPredicate(FilterApi.and(...)) withProjection(...schema...) should eq(...)

but I think that's overall harder for users to work with.

The downside of this approach is that I'll have to implement separately for ParquetAvroIO, ParquetTypeIO, ParquetExampleIO, and SmbIO (TypeIO/ExampleIO won't need projections support but they could support filtering).

Any feedback on the different possible approaches here is welcome!
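For illustration, here is a minimal sketch of how the JobTest-native variant might read; the job, record type, and paths are hypothetical, and the idea that the test IO would apply the job's projection/predicate to the mock records itself is exactly what is being proposed, not an existing API:

val unfilteredMocks: Seq[TestRecord] = ???  // hypothetical Avro record type
val expected: Seq[String] = ???

JobTest[MyParquetJob.type]  // hypothetical job
  .args("--input=in", "--output=out")
  // the test IO would apply the job's projection/predicate to unfilteredMocks
  // before handing them to the pipeline
  .input(ParquetAvroIO[TestRecord]("in"), unfilteredMocks)
  .output(TextIO("out"))(_ should containInAnyOrder(expected))
  .run()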

codecov bot commented on Mar 19, 2024

Codecov Report

Attention: Patch coverage is 87.50000% with 8 lines in your changes missing coverage. Please review.

Project coverage is 61.22%. Comparing base (79a0ecb) to head (b2b3f95).
Report is 4 commits behind head on main.

Files Patch % Lines
...potify/scio/testing/parquet/ParquetTestUtils.scala 75.86% 7 Missing ⚠️
...om/spotify/scio/testing/parquet/avro/package.scala 94.44% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5309      +/-   ##
==========================================
+ Coverage   61.08%   61.22%   +0.13%     
==========================================
  Files         306      310       +4     
  Lines       10993    11089      +96     
  Branches      774      770       -4     
==========================================
+ Hits         6715     6789      +74     
- Misses       4278     4300      +22     


@clairemcginty marked this pull request as draft on March 20, 2024
@kellen (Contributor) commented on Mar 20, 2024

This kind of breaks the boundary between the read and the test mocks. We don't, for example, implement the filtering for the BQ storage API. What we could potentially do instead is recommend that users implement their filters separately, and if they need to test them do:

val allMocks: Seq[T] = ???
val filteredMocks = allMocks.parquetFilter(MyJob.MY_FILTER_API)
// ...
.input(SmbIO[K, T]("foo", _.getKey), filteredMocks)

@clairemcginty (Contributor, Author) replied:

Yeah, I do see what you mean! It's tough because making it part of the TestIO itself is the simplest thing from a usability perspective, but it does deviate from the typical expectation of how JobTest works.

We could implement a parquetFilter-type method, as you suggested, in the scio-test artifact, to keep it from being used in production (my initial concern) 🤔
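A minimal sketch of what that scio-test extension might look like, assuming the filtering is implemented by round-tripping records through an in-memory Parquet file (the object name and the roundtrip helper are illustrative, not the merged API):

import org.apache.avro.generic.GenericRecord
import org.apache.parquet.filter2.predicate.FilterPredicate

object ParquetTestSyntax {
  // Hypothetical helper: write records to an in-memory Parquet file, read them back
  // with the predicate applied, and return only the surviving records.
  private def roundtrip[T <: GenericRecord](
    records: Iterable[T],
    predicate: FilterPredicate
  ): Iterable[T] = ???

  implicit class ParquetFilterOps[T <: GenericRecord](records: Iterable[T]) {
    def parquetFilter(predicate: FilterPredicate): Iterable[T] = roundtrip(records, predicate)
  }
}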

@clairemcginty (Contributor, Author) commented:

Re-implemented as a set of helpers in scio-test

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.reflect.ClassTag

object ParquetHelpers {
clairemcginty (author) commented on the diff:

I don't love the names for any of these objects/classes; better ideas welcome.

build.sbt Outdated
@@ -704,6 +704,14 @@ lazy val `scio-test` = project
"org.scalactic" %% "scalactic" % scalatestVersion,
"org.scalatest" %% "scalatest" % scalatestVersion,
"org.typelevel" %% "cats-kernel" % catsVersion,
// provided
"com.spotify" %% "magnolify-parquet" % magnolifyVersion % Provided,
clairemcginty (author):

I'm assuming that anyone using these helpers already has compile-time Parquet dependencies... 🤷‍♀️

roundtripped
}

private def inMemoryOutputFile(baos: ByteArrayOutputStream): OutputFile = new OutputFile {
A contributor commented:

Looks like a class would be more appropriate.

Suggested change:
- private def inMemoryOutputFile(baos: ByteArrayOutputStream): OutputFile = new OutputFile {
+ private class InMemoryOutputFile(baos: ByteArrayOutputStream) extends OutputFile {
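For reference, a minimal sketch of what such an in-memory OutputFile could look like against the org.apache.parquet.io API (structure and visibility are illustrative):

import java.io.ByteArrayOutputStream
import org.apache.parquet.io.{OutputFile, PositionOutputStream}

private class InMemoryOutputFile(baos: ByteArrayOutputStream) extends OutputFile {
  override def create(blockSizeHint: Long): PositionOutputStream = newStream()
  override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream = newStream()
  override def supportsBlockSize(): Boolean = false
  override def defaultBlockSize(): Long = 0L

  // position is just the number of bytes written to the backing buffer
  private def newStream(): PositionOutputStream = new PositionOutputStream {
    override def getPos(): Long = baos.size().toLong
    override def write(b: Int): Unit = baos.write(b)
    override def write(b: Array[Byte], off: Int, len: Int): Unit = baos.write(b, off, len)
  }
}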

Comment on lines 47 to 52
.parquetFilter(
FilterApi.gt(FilterApi.intColumn("int_field"), 5.asInstanceOf[java.lang.Integer])
)
.parquetProject(
SchemaBuilder.record("TestRecord").fields().optionalInt("int_field").endRecord()
)
@RustedBones (Contributor) commented on Apr 17, 2024:

I'm not 100% convinced by this syntax.
IMHO it would be nicer to develop custom scalatest matchers instead. WDYT?
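For the sake of discussion, a rough sketch of the custom-matcher direction, with the in-memory Parquet roundtrip passed in as a function since it isn't shown here (illustrative only, not what was ultimately merged):

import org.apache.parquet.filter2.predicate.FilterPredicate
import org.scalatest.matchers.{MatchResult, Matcher}

class ParquetFilterMatchers[T](roundtrip: (Iterable[T], FilterPredicate) => Iterable[T]) {
  // passes when every record survives the predicate after a Parquet roundtrip
  def surviveParquetFilter(predicate: FilterPredicate): Matcher[Iterable[T]] =
    new Matcher[Iterable[T]] {
      override def apply(records: Iterable[T]): MatchResult = {
        val survivors = roundtrip(records, predicate)
        MatchResult(
          survivors.size == records.size,
          s"${records.size - survivors.size} of ${records.size} records did not match $predicate",
          s"all ${records.size} records matched $predicate"
        )
      }
    }
}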

clairemcginty (author) replied:

Agreed, it's a bit clunky! Plus, I'm working on the TFExample integration now and discovering that the parquetFilter/parquetProject APIs don't work for this case, as Examples require an explicit Schema to be passed, so it's getting messy.

Custom matchers would be nice syntactically. Something like:

records withProjection(...) should ...
records withFilter(...) should ...

?

clairemcginty (author):

The only issue is that, IMO, many users will want to plug this directly into their JobTests, to verify that the projection won't generate an NPE in the Scio job logic, for example. This would be a bit harder to do with the scalatest matcher approach.

A contributor replied:

Yeah this is the primary use-case IMO; users want to generate unfiltered data and simulate the filter/projection being applied in the pipeline

@clairemcginty (author) commented on Apr 30, 2024:

Reviving this thread! I guess a custom matcher could work if it supported Iterables, i.e.:

withPredicate(records, predicate) should  { filteredRecords => 
   JobTest[T]
       .input(ParquetAvroIO(path), filteredRecords)
}

but it is a bit awkward to use. I think the primary use case of these helpers is to simply make sure that the projection/predicate is compatible with the Scio job logic. Which actually points back to us supporting it natively in Parquet*IO in JobTest 😅😅

clairemcginty (author):

I was thinking about this more... maybe it makes the most sense to implement this as a non-default Coder in scio-test-parquet, that can be constructed by the user with a projection/predicate:

def parquetTestCoder[T <: SpecificRecord](projection: Schema, predicate: Option[FilterPredicate]): Coder[T] = ...

That way, it could be declared in whatever scope the user needs it, either in JobTest:

implicit val parquetCoder: Coder[MyRecord] = parquetTestCoder[MyRecord](projection, Some(FilterApi.lt(...)))

JobTest[MyJob.type]
  .input(ParquetAvroIO[MyRecord]("path"), records)
...

Or just ad-hoc:

implicit val parquetCoder: Coder[MyRecord] = parquetTestCoder[MyRecord](projection, Some(FilterApi.lt(...)))

val record: MyRecord = ???

record coderShould ...

wdyt?

clairemcginty (author):

I briefly tried out this idea and realized it won't work well as a Coder that's applied per-element: if application of FilterPredicate filters out the record, Coder#decode will return null. Thus, the user code would have to be written to handle null records:

implicit val parquetCoder: Coder[MyRecord] = parquetTestCoder[MyRecord](projection, Some(FilterApi.lt(...)))

sc
  .parquetAvroFile[T](path)
  .filter(_ != null)

which is a bad pattern to enforce, so I think this idea is out.

A contributor replied:

I think we need to update the test ParquetIOs to accept projection and predicate if we want to enable this testing in JobTest

clairemcginty (author):

I think this is getting too complicated. IMO, let's not wire it into JobTest by default, but keep these as test helpers that play nicely with scalatest (I think the custom scalatest matcher route doesn't 100% work here because withProjection/withPredicate aren't strictly Matchers or Assertions). I refactored the API to work like:

records withFilter filter withProjection projection should have size(...)
records withFilter filter withProjection projection should containInAnyOrder(...)

It can also be used explicitly with JobTest, by just applying it to a test input.
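A sketch of that JobTest usage, assuming the withFilter/withProjection helpers are brought into scope via an implicit conversion (the import path is inferred from the files touched in this PR; the job, record type, paths, filter, and projection values are hypothetical):

import com.spotify.scio.testing.parquet.avro._  // assumed import for the helpers

val filter: FilterPredicate = ???      // e.g. FilterApi.gt(...)
val projection: Schema = ???           // projected Avro schema
val unfiltered: Seq[TestRecord] = ???  // hypothetical Avro record type
val expected: Seq[String] = ???

val filteredMocks = unfiltered withFilter filter withProjection projection

JobTest[MyParquetJob.type]  // hypothetical job
  .input(ParquetAvroIO[TestRecord]("input-path"), filteredMocks)
  .output(TextIO("output-path"))(_ should containInAnyOrder(expected))
  .run()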

@clairemcginty marked this pull request as ready for review on May 30, 2024
@clairemcginty changed the title from "(WIP) (#5296) Support Parquet predicates/projections in tests" to "(#5296) Support Parquet predicates/projections in tests" on May 30, 2024

object ParquetTestUtils {
  case class ParquetMagnolifyHelpers[T: ParquetType] private[testing] (records: Iterable[T]) {
    def withFilter(filter: FilterPredicate): Iterable[T] = {
A contributor commented:

shouldn't the ParquetType be expected here instead?

clairemcginty (author) replied:

you mean on the method rather than on the class declaration? why?

import com.spotify.scio.testing.parquet.ParquetTestUtils._
import magnolify.parquet.ParquetType
import org.apache.avro.generic.GenericRecord
import org.tensorflow.proto.example.Example
A contributor commented:

I think this will crash if tensorflow is not part of the dependencies.
We need to split the packages to avoid runtime issues.

clairemcginty (author) replied:

I split the package objects, but I guess there would be the same issue with loading ParquetTestUtils.scala too, so I'll split that as well.
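A sketch of the split being described, so that each format's helpers only reference their own optional dependencies (file and package names are illustrative):

// avro/package.scala
package com.spotify.scio.testing.parquet

package object avro {
  // GenericRecord helpers: only references parquet-avro classes
}

// tensorflow/package.scala
package com.spotify.scio.testing.parquet

package object tensorflow {
  // Example helpers: only references org.tensorflow.proto classes, so they are
  // never loaded unless this package is imported
}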

Comment on lines +35 to +37
class ParquetAvroHelpers[U <: GenericRecord] private[testing] (
  records: Iterable[U]
) {
A contributor commented:

Nit: we should avoid using package objects for definitions, as they are deprecated in Scala 3 (ref).

@clairemcginty merged commit 37d538e into main on Jun 5, 2024 (24 checks passed) and deleted the parquet-assertions branch on June 5, 2024.