Add docs about SMB secondary keys #5095

Merged 3 commits on Nov 22, 2023

scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala

@@ -26,9 +26,10 @@
// --inputL=[INPUT] --inputR=[INPUT] --output=[OUTPUT]"`
package com.spotify.scio.examples.extra

import com.spotify.scio.{Args, ContextAndArgs, ScioContext}
import com.spotify.scio.avro.Account
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.{GenericData, GenericRecord}
@@ -116,6 +117,28 @@ object SortMergeBucketWriteExample {
    sc
  }

  def secondaryKeyExample(
    args: Args,
    in: SCollection[Account]
  ): Unit = {
    in
      // #SortMergeBucketExample_sink_secondary
      .saveAsSortedBucket(
        AvroSortedBucketIO
          .write[String, String, Account](
            // primary key class and field
            classOf[String],
            "name",
            // secondary key class and field
            classOf[String],
            "type",
            classOf[Account]
          )
          .to(args("accounts"))
      )
    // #SortMergeBucketExample_sink_secondary
  }

  def main(cmdLineArgs: Array[String]): Unit = {
    val sc = pipeline(cmdLineArgs)
    sc.run().waitUntilDone()

@@ -209,6 +232,22 @@ object SortMergeBucketTransformExample {
    sc
  }

  def secondaryReadExample(cmdLineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdLineArgs)

    // #SortMergeBucketExample_secondary_read
    sc.sortMergeGroupByKey(
      classOf[String], // primary key class
      classOf[String], // secondary key class
      AvroSortedBucketIO
        .read(new TupleTag[Account]("account"), classOf[Account])
        .from(args("accounts"))
    ).map { case ((primaryKey, secondaryKey), elements) =>
      // ...
    }
    // #SortMergeBucketExample_secondary_read
  }

  def main(cmdLineArgs: Array[String]): Unit = {
    val sc = pipeline(cmdLineArgs)
    sc.run().waitUntilDone()

81 changes: 47 additions & 34 deletions site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -1,44 +1,36 @@
# Sort Merge Bucket

Sort Merge Bucket (SMB) is a technique for writing data to the file system in deterministic file locations, sorted according to some predetermined key, so that it can later be read in as key groups with no shuffle required.
Since each element is assigned a file destination (bucket) based on a hash of its join key, we can use the same technique to cogroup multiple sources as long as they're written using the same key and hashing scheme.

For example, given these input records, an SMB write will first extract the key, assign the record to a bucket, sort values within the bucket, and write these values to a corresponding file.

| Input               | Key | Bucket | File Assignment            |
|---------------------|-----|--------|----------------------------|
| {key:"b", value: 1} | "b" | 0      | bucket-00000-of-00002.avro |
| {key:"b", value: 2} | "b" | 0      | bucket-00000-of-00002.avro |
| {key:"a", value: 3} | "a" | 1      | bucket-00001-of-00002.avro |

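The assignment can be pictured with a plain-Scala sketch (illustrative only; the real implementation hashes with the configured `HashType`, e.g. Murmur3, rather than `hashCode`):

```scala
// Illustrative sketch of SMB-style bucketing, not the library's internal code.
case class Record(key: String, value: Int)

val numBuckets = 2
val records = Seq(Record("b", 1), Record("b", 2), Record("a", 3))

val buckets: Map[Int, Seq[Record]] =
  records
    .groupBy(r => Math.floorMod(r.key.hashCode, numBuckets)) // bucket = hash(key) mod N
    .map { case (bucket, rs) => bucket -> rs.sortBy(_.key) } // sort within each bucket

buckets.foreach { case (bucket, rs) =>
  val file = f"bucket-$bucket%05d-of-$numBuckets%05d.avro"
  println(s"$file -> $rs")
}
```
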
Two sources can be joined by opening file readers on corresponding buckets of each source and merging key-groups as we go.

## What are SMB transforms?

`scio-smb` provides three @javadoc[PTransform](org.apache.beam.sdk.transforms.PTransform)s, as well as corresponding Scala API bindings, for SMB operations:

- @javadoc[SortedBucketSink](org.apache.beam.sdk.extensions.smb.SortedBucketSink) writes data to the file system in SMB format.
Scala APIs (see: @scaladoc[SortedBucketSCollection](com.spotify.scio.smb.syntax.SortedBucketSCollection)):

* `SCollection[T: Coder]#saveAsSortedBucket`

@@snip [SortMergeBucketExample.scala](/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala) { #SortMergeBucketExample_sink }

Note the use of `Integer` for parameterized key type instead of a Scala `Int`.
The key class must have a Coder available in the default Beam (Java) coder registry.
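
For instance, a hypothetical sink keyed by a boxed integer field (the `age` field, path, and bucket count below are assumptions for illustration, not taken from the snippet above):

```scala
import com.spotify.scio.avro.Account
import com.spotify.scio.smb._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO

// java.lang.Integer has a default coder in Beam's Java registry; scala.Int
// does not, so the boxed type is used as the key class.
def writeByAge(in: SCollection[Account]): Unit =
  in.saveAsSortedBucket(
    AvroSortedBucketIO
      .write(classOf[Integer], "age", classOf[Account]) // assumes an int `age` field
      .to("gs://my-bucket/accounts")                    // hypothetical path
      .withNumBuckets(2)                                // must be a power of 2, see below
  )
```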

Also note that the number of buckets specified must be a power of 2.
This allows sources of different bucket sizes to still be joinable.
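
The intuition: when both bucket counts are powers of two, the bucket index under the smaller count is fully determined by the index under the larger count, so bucket `i` of a 4-bucket source lines up with exactly buckets `i` and `i + 4` of an 8-bucket source. A quick illustrative check (not library code):

```scala
val hashes = Seq(7, 12, 23, 42, 64, 91)
val (small, large) = (4, 8) // both powers of 2
hashes.foreach { h =>
  val bSmall = Math.floorMod(h, small)
  val bLarge = Math.floorMod(h, large)
  // The large-count bucket always reduces to the small-count bucket, so
  // corresponding buckets can be merge-joined without reshuffling.
  assert(Math.floorMod(bLarge, small) == bSmall)
}
```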

- @javadoc[SortedBucketSource](org.apache.beam.sdk.extensions.smb.SortedBucketSource) reads data that has been written to the file system using `SortedBucketSink` into a collection of
@javadoc[CoGbkResult](org.apache.beam.sdk.transforms.join.CoGbkResult)s.
Scala APIs (see: @scaladoc[SortedBucketScioContext](com.spotify.scio.smb.syntax.SortedBucketScioContext)):

@@ -50,10 +42,7 @@

@@snip [SortMergeBucketExample.scala](/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala) { #SortMergeBucketExample_join }

- @javadoc[SortedBucketTransform](org.apache.beam.sdk.extensions.smb.SortedBucketTransform) reads data that has been written to the file system using `SortedBucketSink`, transforms each @javadoc[CoGbkResult](org.apache.beam.sdk.transforms.join.CoGbkResult) using a user-supplied function, and immediately rewrites them using the same bucketing scheme.
Scala APIs (see: @scaladoc[SortedBucketScioContext](com.spotify.scio.smb.syntax.SortedBucketScioContext)):

* `ScioContext#sortMergeTransform` (1-22 sources)
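
A sketch of the two-source transform, assuming both sides are `Account` records keyed by `name` (the paths here are hypothetical):

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.avro.Account
import com.spotify.scio.smb._
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO
import org.apache.beam.sdk.values.TupleTag

def transformSketch(sc: ScioContext): Unit =
  sc.sortMergeTransform(
    classOf[String], // key class
    AvroSortedBucketIO
      .read(new TupleTag[Account]("lhs"), classOf[Account])
      .from("gs://my-bucket/lhs"), // hypothetical path
    AvroSortedBucketIO
      .read(new TupleTag[Account]("rhs"), classOf[Account])
      .from("gs://my-bucket/rhs") // hypothetical path
  ).to(
    AvroSortedBucketIO
      .transformOutput(classOf[String], "name", classOf[Account])
      .to("gs://my-bucket/output") // hypothetical path
  ).via { case (name, (lhs, rhs), outputCollector) =>
    // Re-emit every element of both sides, rewritten with the same bucketing.
    (lhs.iterator ++ rhs.iterator).foreach(a => outputCollector.accept(a))
  }
```
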
@@ -66,17 +55,41 @@

SMB writes are supported for multiple formats:

* Avro (GenericRecord and SpecificRecord) when also depending on `scio-avro`.
  * @javadoc[AvroSortedBucketIO](org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO)
* JSON
  * @javadoc[JsonSortedBucketIO](org.apache.beam.sdk.extensions.smb.JsonSortedBucketIO)
* Parquet when also depending on `scio-parquet`
  * @javadoc[ParquetAvroSortedBucketIO](org.apache.beam.sdk.extensions.smb.ParquetAvroSortedBucketIO)
  * @javadoc[ParquetTypesSortedBucketIO](org.apache.beam.sdk.extensions.smb.ParquetTypesSortedBucketIO)
* TensorFlow when also depending on `scio-tensorflow`
  * @javadoc[TensorFlowBucketIO](org.apache.beam.sdk.extensions.smb.TensorFlowBucketIO)

## Secondary keys

_Since Scio 0.12.0_.

A single key group may be very large, and the implementation of SMB requires either handling the elements of the key group iteratively or loading the entire key group into memory.
When a secondary grouping or sorting is required, this can be prohibitive in terms of memory and/or wasteful when multiple downstream pipelines do the same regrouping.
For example, an SMB dataset might be keyed by `user_id` while all downstreams want to group by the tuple of `(user_id, artist_id)`.
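
Without secondary keys, each downstream would have to regroup in memory, along these lines (a sketch; the `Account` fields and path stand in for the `user_id`/`artist_id` example):

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.avro.Account
import com.spotify.scio.smb._
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO
import org.apache.beam.sdk.values.TupleTag

// Regrouping a primary-keyed SMB read by a second field, entirely in memory.
// For large key groups this is exactly the cost that secondary keys avoid.
def regroupInMemory(sc: ScioContext) =
  sc.sortMergeGroupByKey(
    classOf[String], // primary key class
    AvroSortedBucketIO
      .read(new TupleTag[Account]("account"), classOf[Account])
      .from("gs://my-bucket/accounts") // hypothetical path
  ).flatMap { case (name, accounts) =>
    accounts
      .groupBy(_.getType.toString) // secondary grouping done by hand
      .map { case (tpe, group) => ((name, tpe), group) }
  }
```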

Secondary SMB keys enable this use-case by sorting pipeline output by the hashed primary SMB key as described above, then additionally sorting the output for each key by the secondary SMB key.
When key groups are read by a downstream pipeline, it may read either the entire (primary) key group or the subset of elements belonging to the (primary key, secondary key) tuple.

_A dataset may therefore add a secondary key and remain compatible with any downstream readers which expect only a primary key._

To write with a secondary key, the additional key class and field must be provided:

@@snip [SortMergeBucketExample.scala](/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala) { #SortMergeBucketExample_sink_secondary }

To read with a secondary key, the additional key class must be provided:

@@snip [SortMergeBucketExample.scala](/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala) { #SortMergeBucketExample_secondary_read }

Corresponding secondary-key-enabled variants of `sortMergeJoin`, `sortMergeCogroup`, and `sortMergeTransform` are also included.
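
Assuming these variants follow the same argument pattern as `sortMergeGroupByKey` above (secondary key class in the second position), a join might look like the following unverified sketch; check the scaladoc for the exact signatures. Paths are hypothetical:

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.avro.Account
import com.spotify.scio.smb._
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO
import org.apache.beam.sdk.values.TupleTag

// Hypothetical secondary-key join sketch; verify against SortedBucketScioContext.
def secondaryJoinSketch(sc: ScioContext) =
  sc.sortMergeJoin(
    classOf[String], // primary key class
    classOf[String], // secondary key class (assumed argument position)
    AvroSortedBucketIO
      .read(new TupleTag[Account]("lhs"), classOf[Account])
      .from("gs://my-bucket/lhs"), // hypothetical path
    AvroSortedBucketIO
      .read(new TupleTag[Account]("rhs"), classOf[Account])
      .from("gs://my-bucket/rhs") // hypothetical path
  ).map { case ((name, tpe), (lhs, rhs)) =>
    // one joined pair per (primary, secondary) key tuple
    ((name, tpe), lhs.getAmount + rhs.getAmount)
  }
```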


## Null keys

If the key field of one or more PCollection elements is null, those elements will be diverted into a special
bucket file, `bucket-null-keys.avro`. This file will be ignored in SMB reads and transforms and must
3 changes: 2 additions & 1 deletion site/src/main/paradox/releases/v0.12.0.md
@@ -137,11 +137,12 @@ data
classOf[Integer], // Key class secondary
"age", // Key field secondary
classOf[Account])
)
```

### Magnolify upgrade
Scio 0.12.0 uses Magnolify 0.6.2, which contains a few new features: neo4j support, `AvroType` performance improvements,
and the capability to annotate Parquet case classes when used in `AvroCompat` mode:

```scala
import magnolify.parquet.ParquetArray.AvroCompat._
```