From 75a5fe45c0257f4d603bbaba2e6eb556b83c2768 Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Tue, 23 Jan 2024 14:13:19 -0500
Subject: [PATCH] Update documentation

---
 .../extra/SortMergeBucketExample.scala        |   6 +-
 .../extra/SortMergeBucketExampleTest.scala    |   3 +-
 .../main/paradox/extras/Sort-Merge-Bucket.md  | 246 ++++++++++++++++++++-
 3 files changed, 250 insertions(+), 5 deletions(-)

diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala
index f1dad8289f..9406b7258c 100644
--- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala
+++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala
@@ -177,8 +177,10 @@ object SortMergeBucketJoinExample {
             .endRecord
         )
         // Filter at the Parquet IO level to users under 50
+        // Filter at the IO level whenever possible, as it reduces total bytes read
        .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
-        // Filter at the SMB Cogrouping level to a single record per uesr
+        // Filter at the SMB Cogrouping level to a single record per user
+        // Filter at the Cogroup level if your filter depends on the materialized key group
        .withPredicate((xs, _) => xs.size() == 0)
        .from(args("users")),
      ParquetTypeSortedBucketIO
@@ -204,6 +206,7 @@ object SortMergeBucketTransformExample {
 
   import com.spotify.scio.smb._
 
+  // ParquetTypeSortedBucketIO supports case class projections for reading and writing
   case class AccountProjection(id: Int, amount: Double)
   case class CombinedAccount(id: Int, age: Int, totalValue: Double)
 
@@ -217,6 +220,7 @@ object SortMergeBucketTransformExample {
       classOf[Integer],
       ParquetAvroSortedBucketIO
         .read(new TupleTag[GenericRecord](), SortMergeBucketExample.UserDataSchema)
+        // Filter at the Parquet IO level to users under 50
         .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
         .from(args("users")),
       ParquetTypeSortedBucketIO
diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala
index 73b7509cea..ff619d4702 100644
--- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala
+++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala
@@ -31,7 +31,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.beam.sdk.extensions.smb.BucketMetadata
 import org.apache.beam.sdk.io.fs.ResourceId
 import org.apache.beam.sdk.io.{FileSystems, LocalResources}
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier
 
 import java.nio.channels.Channels
 import java.util.UUID
@@ -77,6 +77,7 @@ class SortMergeBucketExampleTest extends PipelineSpec {
     // Inspect metadata on real written files
     def getMetadataPath(smbDir: File): ResourceId =
       LocalResources.fromFile(smbDir.toPath.resolve("metadata.json").toFile, false)
+
     BucketMetadata
       .from(Channels.newInputStream(FileSystems.open(getMetadataPath(userDir))))
       .getNumBuckets shouldBe 2
diff --git a/site/src/main/paradox/extras/Sort-Merge-Bucket.md b/site/src/main/paradox/extras/Sort-Merge-Bucket.md
index 10efe9f2c4..c6f16f29d0 100644
--- a/site/src/main/paradox/extras/Sort-Merge-Bucket.md
+++ b/site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -302,6 +302,246 @@ or using @javadoc[SortedBucketOptions](org.apache.beam.sdk.extensions.smb.Sorted
 a _per worker limit_.
 
 ## Testing
-Currently, mocking data for SMB transforms is not supported in the `com.spotify.scio.testing.JobTest` framework. See
-@github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala)
-for an example of using local temp directories to test SMB reads and writes.
+
+As of Scio 0.14, mocking data for SMB transforms is supported in the `com.spotify.scio.testing.JobTest` framework. On versions prior to 0.14, you can test using real data written to local temp files.
+
+### Testing SMB in JobTest
+
+Scio 0.14 and above support testing SMB reads, writes, and transforms using @javadoc[SmbIO](com.spotify.scio.smb.SmbIO).
+
+Consider the following sample job that contains an SMB read and write:
+
+```scala mdoc
+import org.apache.beam.sdk.extensions.smb.ParquetAvroSortedBucketIO
+import org.apache.beam.sdk.values.TupleTag
+import com.spotify.scio._
+import com.spotify.scio.avro.Account
+import com.spotify.scio.values.SCollection
+import com.spotify.scio.smb._
+
+object SmbJob {
+  def main(cmdLineArgs: Array[String]): Unit = {
+    val (sc, args) = ContextAndArgs(cmdLineArgs)
+
+    // Read
+    sc.sortMergeGroupByKey(
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[Account](), classOf[Account])
+        .from(args("input"))
+    )
+
+    // Write
+    val writeData: SCollection[Account] = ???
+    val write = writeData.saveAsSortedBucket(
+      ParquetAvroSortedBucketIO
+        .write(classOf[Integer], "id", classOf[Account])
+        .to(args("output"))
+    )
+
+    sc.run().waitUntilDone()
+  }
+}
+```
+
+A JobTest can be wired in using `SmbIO` inputs and outputs. `SmbIO` is typed according to the record type and the SMB key type, and the SMB key function is required to construct it.
+
+```scala mdoc
+import com.spotify.scio.smb.SmbIO
+import com.spotify.scio.testing.PipelineSpec
+
+class SmbJobTest extends PipelineSpec {
+  "SmbJob" should "work" in {
+    val smbInput: Seq[Account] = ???
+
+    JobTest[SmbJob.type]
+      .args("--input=gs://input", "--output=gs://output")
+
+      // Mock .sortMergeGroupByKey
+      .input(SmbIO[Int, Account]("gs://input", _.getId), smbInput)
+
+      // Mock .saveAsSortedBucket
+      .output(SmbIO[Int, Account]("gs://output", _.getId)) { output =>
+        // Assert on output
+      }
+      .run()
+  }
+}
+```
+
+SMB transforms can be mocked by combining input and output `SmbIO`s:
+
+```scala mdoc:compile-only
+// Scio job
+object SmbTransformJob {
+  def main(cmdLineArgs: Array[String]): Unit = {
+    val (sc, args) = ContextAndArgs(cmdLineArgs)
+    sc.sortMergeTransform(
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[Account](), classOf[Account])
+        .from(args("input"))
+    ).to(
+      ParquetAvroSortedBucketIO
+        .transformOutput[Integer, Account](classOf[Integer], "id", classOf[Account])
+        .to(args("output"))
+    ).via { case (key, grouped, outputCollector) =>
+      val output: Account = ???
+      outputCollector.accept(output)
+    }
+    sc.run().waitUntilDone()
+  }
+}
+
+// Job test
+class SmbTransformJobTest extends PipelineSpec {
+  "SmbTransformJob" should "work" in {
+    val smbInput: Seq[Account] = ???
+
+    JobTest[SmbTransformJob.type]
+      .args("--input=gs://input", "--output=gs://output")
+
+      // Mock SMB Transform input
+      .input(SmbIO[Int, Account]("gs://input", _.getId), smbInput)
+
+      // Mock SMB Transform output
+      .output(SmbIO[Int, Account]("gs://output", _.getId)) { output =>
+        // Assert on output
+      }
+      .run()
+  }
+}
+```
+
+See @github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala) for complete JobTest examples.
+
+### Testing SMB using local file system
+
+Using the JobTest framework for SMB reads, writes, and transforms is recommended, as it eliminates the need to manage local files and Taps. However, there are a few
+cases where performing real reads and writes is advantageous:
+
+- If you want to assert on SMB Predicates/Parquet FilterPredicates in reads, as these are skipped in JobTest (see the filter predicate sketch below)
+- If you want to assert on written metadata (see the metadata sketch at the end of this page)
+- If you want to test schema evolution compatibility (i.e. writing using an updated record schema and reading using the original schema),
+or on projected schema compatibility (i.e. using a case class projection to read Parquet data written with an Avro schema)
+
+Scio 0.14.0 and above automatically return Taps for SMB writes and transforms, and can materialize SMB reads into Taps:
+
+```scala mdoc
+import com.spotify.scio.io.ClosedTap
+
+// Scio job
+object SmbRealFilesJob {
+  def write(sc: ScioContext, output: String): ClosedTap[Account] = {
+    val writeData: SCollection[Account] = ???
+    writeData.saveAsSortedBucket(
+      ParquetAvroSortedBucketIO
+        .write(classOf[Integer], "id", classOf[Account])
+        .to(output)
+    )
+  }
+
+  def read(sc: ScioContext, input: String): SCollection[(Integer, Iterable[Account])] = {
+    sc.sortMergeGroupByKey(
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[Account](), classOf[Account])
+        .from(input)
+    )
+  }
+}
+
+// Unit test
+import java.nio.file.Files
+
+class SmbLocalFilesTest extends PipelineSpec {
+  "SmbRealFilesJob" should "write and read data" in {
+    val dir = Files.createTempDirectory("smb").toString
+
+    // Test write
+    val writeSc = ScioContext()
+    val writeTap = SmbRealFilesJob.write(writeSc, dir)
+    val writeResult = writeSc.run().waitUntilDone()
+
+    // Tap output
+    val writtenData: Iterator[Account] = writeTap.get(writeResult).value
+
+    // Assert on actual written output
+    writtenData should have size 100
+
+    // Test read in a separate ScioContext
+    val readSc = ScioContext()
+    val readTap = SmbRealFilesJob.read(readSc, dir).materialize
+    val readResult = readSc.run().waitUntilDone()
+
+    // Tap sortMergeGroupByKey result
+    val gbkData: Iterator[(Integer, Iterable[Account])] = readTap.get(readResult).value
+
+    // Assert on actual read result
+    gbkData should have size 50
+  }
+}
+```
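+
+The first case in the list above, asserting on a filter predicate, can be tested the same way: write real files, then read them back with a `FilterPredicate` applied, which a JobTest would skip. The sketch below reuses `SmbRealFilesJob` from the previous example; it assumes the Avro `Account` schema has a non-nullable double `amount` field, and the `100.0` threshold is illustrative:
+
+```scala mdoc
+import org.apache.parquet.filter2.predicate.FilterApi
+
+class SmbFilterPredicateTest extends PipelineSpec {
+  "SmbRealFilesJob" should "apply a Parquet filter predicate on read" in {
+    val dir = Files.createTempDirectory("smb").toString
+
+    // Write real files first
+    val writeSc = ScioContext()
+    SmbRealFilesJob.write(writeSc, dir)
+    writeSc.run().waitUntilDone()
+
+    // Read them back, filtering at the Parquet IO level on the assumed `amount` field
+    val readSc = ScioContext()
+    val readTap = readSc.sortMergeGroupByKey(
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[Account](), classOf[Account])
+        .withFilterPredicate(FilterApi.gtEq(FilterApi.doubleColumn("amount"), Double.box(100.0)))
+        .from(dir)
+    ).materialize
+    val readResult = readSc.run().waitUntilDone()
+
+    // Every record read back satisfies the predicate
+    readTap.get(readResult).value.foreach { case (_, accounts) =>
+      assert(accounts.forall(_.getAmount >= 100.0))
+    }
+  }
+}
+```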
+
+In addition to JobTest examples, see @github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala) for complete SMB Tap examples.
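+
+Finally, written metadata (the second case in the list above) can be inspected from the output directory. This sketch mirrors the metadata assertions in @github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala); the exact values to assert on depend on the bucket and shard configuration of the write:
+
+```scala mdoc
+import org.apache.beam.sdk.extensions.smb.BucketMetadata
+import org.apache.beam.sdk.io.{FileSystems, LocalResources}
+import java.nio.channels.Channels
+
+class SmbMetadataTest extends PipelineSpec {
+  "SmbRealFilesJob" should "write SMB metadata" in {
+    val dir = Files.createTempDirectory("smb")
+
+    val writeSc = ScioContext()
+    SmbRealFilesJob.write(writeSc, dir.toString)
+    writeSc.run().waitUntilDone()
+
+    // Inspect the metadata file accompanying the written buckets
+    val metadataPath =
+      LocalResources.fromFile(dir.resolve("metadata.json").toFile, false)
+    val metadata =
+      BucketMetadata.from(Channels.newInputStream(FileSystems.open(metadataPath)))
+
+    // Assert on the bucketing configuration; the expected values depend on
+    // how the write was configured
+    metadata.getNumBuckets should be > 0
+  }
+}
+```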