Convert SortMergeBucketExample to Parquet
clairemcginty committed Jan 23, 2024
1 parent 964aad7 commit c91eb56
Showing 2 changed files with 249 additions and 139 deletions.
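
In short: the three SMB example pipelines move from Avro sinks and sources (AvroSortedBucketIO) to Parquet ones — ParquetAvroSortedBucketIO for GenericRecord data and ParquetTypeSortedBucketIO for case-class projections. The bucket key changes from a nullable string (userId/name) to a required int (userId/id) on both sides, since SMB sources can only be joined on a matching key class; the reads gain Parquet-level withProjection and withFilterPredicate pushdown; and each pipeline method now takes (ScioContext, Args) and returns its ClosedTaps, with main wiring up ContextAndArgs, presumably to make the pipelines easier to drive from tests.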
@@ -19,23 +19,30 @@
 // Usage:
 
 // `sbt runMain "com.spotify.scio.examples.extra.SortMergeBucketWriteExample
-// --outputL=[OUTPUT]--outputR=[OUTPUT]"`
+// --users=[OUTPUT] --accounts=[OUTPUT]"`
 // `sbt runMain "com.spotify.scio.examples.extra.SortMergeBucketJoinExample
-// --inputL=[INPUT]--inputR=[INPUT] --output=[OUTPUT]"`
+// --users=[INPUT] --accounts=[INPUT] --output=[OUTPUT]"`
 // `sbt runMain "com.spotify.scio.examples.extra.SortMergeBucketTransformExample
-// --inputL=[INPUT]--inputR=[INPUT] --output=[OUTPUT]"`
+// --users=[INPUT] --accounts=[INPUT] --output=[OUTPUT]"`
 package com.spotify.scio.examples.extra
 
 import com.spotify.scio.{Args, ContextAndArgs, ScioContext}
 import com.spotify.scio.avro._
 import com.spotify.scio.coders.Coder
+import com.spotify.scio.io.ClosedTap
+import com.spotify.scio.parquet.ParquetConfiguration
 import com.spotify.scio.values.SCollection
-import org.apache.avro.Schema
-import org.apache.avro.file.CodecFactory
+import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType
-import org.apache.beam.sdk.extensions.smb.{AvroSortedBucketIO, TargetParallelism}
+import org.apache.beam.sdk.extensions.smb.{
+  ParquetAvroSortedBucketIO,
+  ParquetTypeSortedBucketIO,
+  TargetParallelism
+}
 import org.apache.beam.sdk.values.TupleTag
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import scala.util.Random
 
@@ -49,7 +56,7 @@ object SortMergeBucketExample {
 |   "fields": [
 |     {
 |       "name": "userId",
-|       "type": ["null", {"type": "string", "avro.java.string": "String"}]
+|       "type": "int"
 |     },
 |     {
 |       "name": "age", "type": "int"
@@ -58,10 +65,11 @@ object SortMergeBucketExample {
 |""".stripMargin
   )
 
-  def user(id: String, age: Int): GenericRecord = new GenericRecordBuilder(UserDataSchema)
-    .set("userId", id)
-    .set("age", age)
-    .build()
+  def user(id: Int, age: Int): GenericRecord =
+    new GenericRecordBuilder(UserDataSchema)
+      .set("userId", id)
+      .set("age", age)
+      .build()
 }
 
 object SortMergeBucketWriteExample {
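
With userId now a required int rather than a nullable string, the rewritten helper builds records keyed by Int. A minimal usage sketch (the values are illustrative):

  // Builds a GenericRecord against UserDataSchema; both fields are ints now.
  val record: GenericRecord = SortMergeBucketExample.user(42, 30) // userId = 42, age = 30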
@@ -70,48 +78,50 @@ object SortMergeBucketWriteExample {
   implicit val coder: Coder[GenericRecord] =
     avroGenericRecordCoder(SortMergeBucketExample.UserDataSchema)
 
-  def pipeline(cmdLineArgs: Array[String]): ScioContext = {
-    val (sc, args) = ContextAndArgs(cmdLineArgs)
-
-    sc.parallelize(0 until 500)
-      .map(i => SortMergeBucketExample.user(i.toString, i % 100))
+  def pipeline(sc: ScioContext, args: Args): (ClosedTap[GenericRecord], ClosedTap[Account]) = {
+    val userWriteTap = sc
+      .parallelize(0 until 500)
+      .map(i => SortMergeBucketExample.user(i % 100, i % 100))
       .saveAsSortedBucket(
-        AvroSortedBucketIO
-          .write(classOf[String], "userId", SortMergeBucketExample.UserDataSchema)
+        ParquetAvroSortedBucketIO
+          .write(classOf[Integer], "userId", SortMergeBucketExample.UserDataSchema)
           .to(args("users"))
           .withTempDirectory(sc.options.getTempLocation)
-          .withCodec(CodecFactory.snappyCodec())
           .withHashType(HashType.MURMUR3_32)
           .withFilenamePrefix("example-prefix")
           .withNumBuckets(2)
           .withNumShards(1)
       )
 
     // #SortMergeBucketExample_sink
-    sc.parallelize(250 until 750)
+    val accountWriteTap = sc
+      .parallelize(250 until 750)
       .map { i =>
         Account
           .newBuilder()
-          .setId(i)
-          .setName(i.toString)
+          .setId(i % 100)
+          .setName(s"name$i")
           .setType(s"type${i % 5}")
           .setAmount(Random.nextDouble() * 1000)
           .build()
       }
       .saveAsSortedBucket(
-        AvroSortedBucketIO
-          .write[String, Account](classOf[String], "name", classOf[Account])
+        ParquetAvroSortedBucketIO
+          .write[Integer, Account](classOf[Integer], "id", classOf[Account])
          .to(args("accounts"))
          .withSorterMemoryMb(128)
          .withTempDirectory(sc.options.getTempLocation)
-          .withCodec(CodecFactory.snappyCodec())
+          .withConfiguration(
+            ParquetConfiguration.of(ParquetOutputFormat.BLOCK_SIZE -> 512 * 1024 * 1024)
+          )
          .withHashType(HashType.MURMUR3_32)
          .withFilenamePrefix("part") // Default is "bucket"
          .withNumBuckets(1)
          .withNumShards(1)
       )
     // #SortMergeBucketExample_sink
-    sc
+
+    (userWriteTap, accountWriteTap)
   }
 
   def secondaryKeyExample(
@@ -121,11 +131,11 @@ object SortMergeBucketWriteExample {
     in
       // #SortMergeBucketExample_sink_secondary
       .saveAsSortedBucket(
-        AvroSortedBucketIO
-          .write[String, String, Account](
+        ParquetAvroSortedBucketIO
+          .write[Integer, String, Account](
            // primary key class and field
-            classOf[String],
-            "name",
+            classOf[Integer],
+            "id",
            // secondary key class and field
            classOf[String],
            "type",
@@ -137,8 +147,10 @@ object SortMergeBucketWriteExample {
   }
 
   def main(cmdLineArgs: Array[String]): Unit = {
-    val sc = pipeline(cmdLineArgs)
+    val (sc, args) = ContextAndArgs(cmdLineArgs)
+    pipeline(sc, args)
     sc.run().waitUntilDone()
+    ()
   }
 }
 
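The account sink above trades the Avro writer codec for withConfiguration, which accepts a Hadoop Configuration; scio's ParquetConfiguration.of builds one from key/value pairs, so other ParquetOutputFormat settings can be passed the same way. A sketch under that assumption (the ZSTD codec line is illustrative, not part of this commit):

  import com.spotify.scio.parquet.ParquetConfiguration
  import org.apache.parquet.hadoop.ParquetOutputFormat
  import org.apache.parquet.hadoop.metadata.CompressionCodecName

  // 512 MiB row groups as in the example sink, plus an explicit compression codec.
  val parquetConf = ParquetConfiguration.of(
    ParquetOutputFormat.BLOCK_SIZE -> 512 * 1024 * 1024,
    ParquetOutputFormat.COMPRESSION -> CompressionCodecName.ZSTD.name()
  )
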
@@ -148,40 +160,42 @@ object SortMergeBucketJoinExample {
   implicit val coder: Coder[GenericRecord] =
     avroGenericRecordCoder(SortMergeBucketExample.UserDataSchema)
 
-  case class UserAccountData(userId: String, age: Int, balance: Double) {
-    override def toString: String = s"$userId\t$age\t$balance"
-  }
-
-  def pipeline(cmdLineArgs: Array[String]): ScioContext = {
-    val (sc, args) = ContextAndArgs(cmdLineArgs)
-
-    val mapFn: ((String, (GenericRecord, Account))) => UserAccountData = {
-      case (userId, (userData, account)) =>
-        UserAccountData(userId, userData.get("age").toString.toInt, account.getAmount)
-    }
+  case class AccountProjection(id: Int, amount: Double)
 
+  def pipeline(sc: ScioContext, args: Args): ClosedTap[String] = {
     // #SortMergeBucketExample_join
     sc.sortMergeJoin(
-      classOf[String],
-      AvroSortedBucketIO
-        .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema)
-        // 1. Only 1 user per user ID
-        // 2. Out of key intersection 250-499, only 100 (300-349, 400-499) with age < 50
-        .withPredicate((xs, x) => xs.size() == 0 && x.get("age").asInstanceOf[Int] < 50)
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[GenericRecord](), SortMergeBucketExample.UserDataSchema)
+        .withProjection(
+          SchemaBuilder
+            .record("UserProjection")
+            .fields
+            .requiredInt("userId")
+            .requiredInt("age")
+            .endRecord
+        )
+        // Filter at the Parquet IO level to users under 50
+        .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
+        // Filter at the SMB cogrouping level to a single record per user
+        .withPredicate((xs, _) => xs.size() == 0)
        .from(args("users")),
-      AvroSortedBucketIO
-        .read(new TupleTag[Account]("rhs"), classOf[Account])
+      ParquetTypeSortedBucketIO
+        .read(new TupleTag[AccountProjection]())
        .from(args("accounts")),
       TargetParallelism.max()
-    ).map(mapFn) // Apply mapping function
+    ).map { case (_, (userData, account)) =>
+      (userData.get("age").asInstanceOf[Int], account.amount)
+    }.groupByKey
+      .mapValues(amounts => amounts.sum / amounts.size)
       .saveAsTextFile(args("output"))
     // #SortMergeBucketExample_join
-
-    sc
   }
 
   def main(cmdLineArgs: Array[String]): Unit = {
-    val sc = pipeline(cmdLineArgs)
+    val (sc, args) = ContextAndArgs(cmdLineArgs)
+    pipeline(sc, args)
     sc.run().waitUntilDone()
     ()
   }
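
Note the two filtering layers in the join read: withFilterPredicate is pushed down into the Parquet reader, while withPredicate still runs per record during SMB cogrouping. Parquet predicates compose via FilterApi; a two-sided bound might look like this sketch (the lower bound is illustrative, not from the commit):

  import org.apache.parquet.filter2.predicate.FilterApi

  // Both comparisons are evaluated inside the Parquet reader,
  // before any records reach the pipeline.
  val agePredicate = FilterApi.and(
    FilterApi.gtEq(FilterApi.intColumn("age"), Int.box(18)),
    FilterApi.lt(FilterApi.intColumn("age"), Int.box(50))
  )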
@@ -190,43 +204,38 @@ object SortMergeBucketJoinExample {
 object SortMergeBucketTransformExample {
   import com.spotify.scio.smb._
 
-  def pipeline(cmdLineArgs: Array[String]): ScioContext = {
-    val (sc, args) = ContextAndArgs(cmdLineArgs)
+  case class AccountProjection(id: Int, amount: Double)
+  case class CombinedAccount(id: Int, age: Int, totalValue: Double)
 
-    // #SortMergeBucketExample_transform
-    val (readLhs, readRhs) = (
-      AvroSortedBucketIO
-        .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema)
-        .from(args("users")),
-      AvroSortedBucketIO
-        .read(new TupleTag[Account]("rhs"), classOf[Account])
-        .from(args("accounts"))
+  def pipeline(sc: ScioContext, args: Args): ClosedTap[CombinedAccount] = {
+    implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(
+      SortMergeBucketExample.UserDataSchema
     )
 
+    // #SortMergeBucketExample_transform
     sc.sortMergeTransform(
-      classOf[String],
-      readLhs,
-      readRhs,
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[GenericRecord](), SortMergeBucketExample.UserDataSchema)
+        .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
+        .from(args("users")),
+      ParquetTypeSortedBucketIO
+        .read(new TupleTag[AccountProjection]())
+        .from(args("accounts")),
       TargetParallelism.auto()
     ).to(
-      AvroSortedBucketIO
-        .transformOutput(classOf[String], "name", classOf[Account])
+      ParquetTypeSortedBucketIO
+        .transformOutput[Integer, CombinedAccount]("id")
        .to(args("output"))
     ).via { case (key, (users, accounts), outputCollector) =>
-      users.foreach { _ =>
+      val sum = accounts.map(_.amount).sum
+      users.foreach { user =>
        outputCollector.accept(
-          Account
-            .newBuilder()
-            .setId(key.toInt)
-            .setName(key)
-            .setType("combinedAmount")
-            .setAmount(accounts.foldLeft(0.0)(_ + _.getAmount))
-            .build()
+          CombinedAccount(key, user.get("age").asInstanceOf[Integer], sum)
        )
      }
    }
     // #SortMergeBucketExample_transform
-    sc
   }
 
   def secondaryReadExample(cmdLineArgs: Array[String]): Unit = {
@@ -236,7 +245,7 @@ object SortMergeBucketTransformExample {
     sc.sortMergeGroupByKey(
       classOf[String], // primary key class
       classOf[String], // secondary key class
-      AvroSortedBucketIO
+      ParquetAvroSortedBucketIO
        .read(new TupleTag[Account]("account"), classOf[Account])
        .from(args("accounts"))
     ).map { case ((primaryKey, secondaryKey), elements) =>
@@ -246,7 +255,8 @@ object SortMergeBucketTransformExample {
   }
 
   def main(cmdLineArgs: Array[String]): Unit = {
-    val sc = pipeline(cmdLineArgs)
+    val (sc, args) = ContextAndArgs(cmdLineArgs)
+    pipeline(sc, args)
     sc.run().waitUntilDone()
     ()
   }
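
ParquetTypeSortedBucketIO works with case classes directly: AccountProjection selects a subset of Account's columns by field name, and transformOutput[Integer, CombinedAccount]("id") writes case classes bucketed by their id field. Narrowing a read further should only need a smaller case class, as in this sketch (AccountIdOnly and the path are hypothetical):

  case class AccountIdOnly(id: Int)

  // Reads only the "id" column of the bucketed accounts dataset.
  ParquetTypeSortedBucketIO
    .read(new TupleTag[AccountIdOnly]())
    .from("gs://my-bucket/accounts")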