Update documentation
clairemcginty committed Jan 23, 2024
1 parent c91eb56 commit 75a5fe4
Showing 3 changed files with 191 additions and 5 deletions.
@@ -177,8 +177,10 @@ object SortMergeBucketJoinExample {
.endRecord
)
// Filter at the Parquet IO level to users under 50
// Prefer filtering at the IO level whenever possible, as it reduces total bytes read
.withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
// Filter at the SMB Cogrouping level to a single record per uesr
// Filter at the SMB Cogrouping level to a single record per user
// Filter at the Cogroup level if your filter depends on the materializing key group
.withPredicate((xs, _) => xs.size() == 0)
.from(args("users")),
ParquetTypeSortedBucketIO
@@ -204,6 +206,7 @@ object SortMergeBucketJoinExample {
object SortMergeBucketTransformExample {
import com.spotify.scio.smb._

// ParquetTypeSortedBucketIO supports case class projections for reading and writing
case class AccountProjection(id: Int, amount: Double)
case class CombinedAccount(id: Int, age: Int, totalValue: Double)

@@ -217,6 +220,7 @@ object SortMergeBucketTransformExample {
classOf[Integer],
ParquetAvroSortedBucketIO
.read(new TupleTag[GenericRecord](), SortMergeBucketExample.UserDataSchema)
// Filter at the Parquet IO level to users under 50
.withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
.from(args("users")),
ParquetTypeSortedBucketIO
@@ -31,7 +31,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.extensions.smb.BucketMetadata
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.{FileSystems, LocalResources}
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier

import java.nio.channels.Channels
import java.util.UUID
@@ -77,6 +77,7 @@ class SortMergeBucketExampleTest extends PipelineSpec {
// Inspect metadata on real written files
def getMetadataPath(smbDir: File): ResourceId =
LocalResources.fromFile(smbDir.toPath.resolve("metadata.json").toFile, false)

BucketMetadata
.from(Channels.newInputStream(FileSystems.open(getMetadataPath(userDir))))
.getNumBuckets shouldBe 2
187 changes: 184 additions & 3 deletions site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -302,6 +302,187 @@ or using @javadoc[SortedBucketOptions](org.apache.beam.sdk.extensions.smb.Sorted
a _per worker limit_.

## Testing
Currently, mocking data for SMB transforms is not supported in the `com.spotify.scio.testing.JobTest` framework. See
@github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala)
for an example of using local temp directories to test SMB reads and writes.

As of Scio 0.14, mocking data for SMB transforms is supported in the `com.spotify.scio.testing.JobTest` framework. Prior to Scio 0.14, you can test using real data written to local temp files.

### Testing SMB in JobTest

Scio 0.14 and above support testing SMB reads, writes, and transforms using @javadoc[SmbIO](com.spotify.scio.smb.SmbIO).

Consider the following sample job that contains an SMB read and write:

```scala mdoc
import org.apache.beam.sdk.extensions.smb.ParquetAvroSortedBucketIO
import org.apache.beam.sdk.values.TupleTag
import com.spotify.scio._
import com.spotify.scio.avro.Account
import com.spotify.scio.values.SCollection
import com.spotify.scio.smb._

object SmbJob {
  def main(cmdLineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdLineArgs)

    // Read
    sc.sortMergeGroupByKey(
      classOf[Integer],
      ParquetAvroSortedBucketIO
        .read(new TupleTag[Account](), classOf[Account])
        .from(args("input"))
    )

    // Write
    val writeData: SCollection[Account] = ???
    val write = writeData.saveAsSortedBucket(
      ParquetAvroSortedBucketIO
        .write(classOf[Integer], "id", classOf[Account])
        .to(args("output"))
    )

    sc.run().waitUntilDone()
  }
}
```

A JobTest can be wired up using `SmbIO` inputs and outputs. `SmbIO` is typed according to the record type and the SMB key type, and the SMB key function is required to construct it.

```scala mdoc
import com.spotify.scio.smb.SmbIO
import com.spotify.scio.testing.PipelineSpec

class SmbJobTest extends PipelineSpec {
  "SmbJob" should "work" in {
    val smbInput: Seq[Account] = ???

    JobTest[SmbJob.type]
      .args("--input=gs://input", "--output=gs://output", "--transformInput=gs://tfxInput")
      // Mock .sortMergeGroupByKey
      .input(SmbIO[Int, Account]("gs://input", _.getId), smbInput)
      // Mock .saveAsSortedBucket
      .output(SmbIO[Int, Account]("gs://output", _.getId)) { output =>
        // Assert on output
      }
      .run()
  }
}
```

SMB Transforms can be mocked by combining input and output `SmbIO`s:

```scala mdoc:compile-only
// Scio job
object SmbTransformJob {
  def main(cmdLineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdLineArgs)
    sc.sortMergeTransform(
      classOf[Integer],
      ParquetAvroSortedBucketIO
        .read(new TupleTag[Account](), classOf[Account])
        .from(args("input"))
    ).to(
      ParquetAvroSortedBucketIO
        .transformOutput[Integer, Account](classOf[Integer], "id", classOf[Account])
        .to(args("output"))
    ).via { case (key, grouped, outputCollector) =>
      val output: Account = ???
      outputCollector.accept(output)
    }
    sc.run().waitUntilDone()
  }
}

// Job test
class SmbTransformJobTest extends PipelineSpec {
  "SmbTransformJob" should "work" in {
    val smbInput: Seq[Account] = ???

    JobTest[SmbTransformJob.type]
      .args("--input=gs://input", "--output=gs://output")
      // Mock SMB Transform input
      .input(SmbIO[Int, Account]("gs://input", _.getId), smbInput)
      // Mock SMB Transform output
      .output(SmbIO[Int, Account]("gs://output", _.getId)) { output =>
        // Assert on output
      }
      .run()
  }
}
```

See @github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala) for complete JobTest examples.

### Testing SMB using local file system

Using the JobTest framework for SMB reads, writes, and transforms is recommended, as it eliminates the need to manage local files and Taps. However, there are a few
cases where performing real reads and writes is advantageous:

- If you want to assert on SMB Predicates/Parquet FilterPredicates in reads, as these are skipped in JobTest (a filter-predicate sketch follows the example below)
- If you want to assert on written metadata
- If you want to test schema evolution compatibility (i.e. writing using an updated record schema and reading using the original schema),
or on projected schema compatibility (i.e. using a case class projection to read Parquet data written with an Avro schema)

Scio 0.14.0 and above automatically return Taps for SMB writes and transforms, and can materialize SMB reads into Taps:

```scala mdoc
import com.spotify.scio.io.ClosedTap

// Scio job
object SmbRealFilesJob {
  def write(sc: ScioContext, output: String): ClosedTap[Account] = {
    val writeData: SCollection[Account] = ???
    writeData.saveAsSortedBucket(
      ParquetAvroSortedBucketIO
        .write(classOf[Integer], "id", classOf[Account])
        .to(output)
    )
  }

  def read(sc: ScioContext, input: String): SCollection[(Integer, Iterable[Account])] = {
    sc.sortMergeGroupByKey(
      classOf[Integer],
      ParquetAvroSortedBucketIO
        .read(new TupleTag[Account](), classOf[Account])
        .from(input)
    )
  }
}

// Unit test
import java.nio.file.Files

class SmbLocalFilesTest extends PipelineSpec {
  "SmbRealFilesJob" should "write and read data" in {
    val dir = Files.createTempDirectory("smb").toString

    // Test write
    runWithContext { sc =>
      val tap = SmbRealFilesJob.write(sc, dir)
      val scioResult = sc.run().waitUntilDone()

      // Tap output
      val writtenData: Iterator[Account] = tap.get(scioResult).value

      // Assert on actual written output
      writtenData should have size 100
    }

    // Test read in separate ScioContext
    runWithContext { sc =>
      val tap = SmbRealFilesJob.read(sc, dir).materialize
      val scioResult = sc.run().waitUntilDone()

      // Tap sortMergeGroupByKey result
      val gbkData: Iterator[(Integer, Iterable[Account])] = tap.get(scioResult).value

      // Assert on actual read result
      gbkData should have size 50
    }
  }
}
```
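
Because these are real reads, SMB Predicates and Parquet FilterPredicates are actually applied, so they can be asserted on here (unlike in JobTest). Below is a minimal sketch of such a test, reusing `SmbRealFilesJob` from above and the imports from the earlier snippets plus `org.apache.parquet.filter2.predicate.FilterApi`; the `id` column name, its integer type, and the `< 50` threshold are illustrative assumptions about the `Account` schema:

```scala
import org.apache.parquet.filter2.predicate.FilterApi

class SmbFilterPredicateTest extends PipelineSpec {
  "SmbRealFilesJob" should "apply a Parquet filter predicate on read" in {
    val dir = Files.createTempDirectory("smb-predicate").toString

    // Write real files first
    runWithContext { sc =>
      SmbRealFilesJob.write(sc, dir)
      sc.run().waitUntilDone()
    }

    // Read them back with a FilterPredicate applied at the Parquet IO level
    runWithContext { sc =>
      val tap = sc
        .sortMergeGroupByKey(
          classOf[Integer],
          ParquetAvroSortedBucketIO
            .read(new TupleTag[Account](), classOf[Account])
            // Assumption: "id" is an int column; keep only records with id < 50
            .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("id"), Int.box(50)))
            .from(dir)
        )
        .materialize
      val scioResult = sc.run().waitUntilDone()

      // Every key group that survives the filtered read satisfies the predicate
      tap.get(scioResult).value.foreach { case (key, _) =>
        key.intValue should be < 50
      }
    }
  }
}
```

The same write-then-read pattern also covers `withPredicate`, schema evolution, and case class projection checks: write with one configuration, read back with another, and assert on the materialized Tap.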

In addition to JobTest examples, see @github[SortMergeBucketExampleTest](/scio-examples/src/test/scala/com/spotify/scio/examples/extra/SortMergeBucketExampleTest.scala) for complete SMB Tap examples.
