Skip to content

Commit

Permalink
make bigsampler bq output partition configurable (#705)
Browse files Browse the repository at this point in the history
* make-bigsampler-bq-output-configurable

* formatting

* lowercase docs and add arg docs to CLI

* day
  • Loading branch information
benkonz committed Feb 27, 2024
1 parent dbc9a12 commit 1b6b9c3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 24 deletions.
19 changes: 10 additions & 9 deletions ratatool-sampling/README.md
Expand Up @@ -18,15 +18,16 @@ For full details see [BigSample.scala](https://github.com/spotify/ratatool/blob/
BigSampler - a tool for big data sampling
Usage: ratatool bigSampler [dataflow_options] [options]
--sample=<percentage> Percentage of records to take in sample, a decimal between 0.0 and 1.0
--input=<path> Input file path or BigQuery table
--output=<path> Output file path or BigQuery table
[--fields=<field1,field2,...>] An optional list of fields to include in hashing for sampling cohort selection
[--seed=<seed>] An optional seed used in hashing for sampling cohort selection
[--hashAlgorithm=(murmur|farm)] An optional arg to select the hashing algorithm for sampling cohort selection. Defaults to FarmHash for BigQuery compatibility
[--distribution=(uniform|stratified)] An optional arg to sample for a stratified or uniform distribution. Must provide `distributionFields`
[--distributionFields=<field1,field2,...>] An optional list of fields to sample for distribution. Must provide `distribution`
[--exact] An optional arg for higher precision distribution sampling.
--sample=<percentage> Percentage of records to take in sample, a decimal between 0.0 and 1.0
--input=<path> Input file path or BigQuery table
--output=<path> Output file path or BigQuery table
[--fields=<field1,field2,...>] An optional list of fields to include in hashing for sampling cohort selection
[--seed=<seed>] An optional seed used in hashing for sampling cohort selection
[--hashAlgorithm=(murmur|farm)] An optional arg to select the hashing algorithm for sampling cohort selection. Defaults to FarmHash for BigQuery compatibility
[--distribution=(uniform|stratified)] An optional arg to sample for a stratified or uniform distribution. Must provide `distributionFields`
[--distributionFields=<field1,field2,...>] An optional list of fields to sample for distribution. Must provide `distribution`
[--exact] An optional arg for higher precision distribution sampling.
[--bigqueryPartitioning=<day|hour|month|year|null>] An optional arg specifying what partitioning to use for the output BigQuery table, or 'null' for no partitioning. Defaults to day.
Since this runs a Scio/Beam pipeline, Dataflow options will have to be provided. At a
minimum, the following should be specified:
Expand Down
Expand Up @@ -101,16 +101,18 @@ object BigSampler extends Command {
println(s"""BigSampler - a tool for big data sampling
|Usage: ratatool $command [dataflow_options] [options]
|
| --sample=<percentage> Percentage of records to take in sample, a decimal between 0.0 and 1.0
| --input=<path> Input file path or BigQuery table
| --output=<path> Output file path or BigQuery table
| [--fields=<field1,field2,...>] An optional list of fields to include in hashing for sampling cohort selection
| [--seed=<seed>] An optional seed used in hashing for sampling cohort selection
| [--hashAlgorithm=(murmur|farm)] An optional arg to select the hashing algorithm for sampling cohort selection. Defaults to FarmHash for BigQuery compatibility
| [--distribution=(uniform|stratified)] An optional arg to sample for a stratified or uniform distribution. Must provide `distributionFields`
| [--distributionFields=<field1,field2,...>] An optional list of fields to sample for distribution. Must provide `distribution`
| [--exact] An optional arg for higher precision distribution sampling.
| [--byteEncoding=(raw|hex|base64)] An optional arg for how to encode fields of type bytes: raw bytes, hex encoded string, or base64 encoded string. Default is to hash raw bytes.
| --sample=<percentage> Percentage of records to take in sample, a decimal between 0.0 and 1.0
| --input=<path> Input file path or BigQuery table
| --output=<path> Output file path or BigQuery table
| [--fields=<field1,field2,...>] An optional list of fields to include in hashing for sampling cohort selection
| [--seed=<seed>] An optional seed used in hashing for sampling cohort selection
| [--hashAlgorithm=(murmur|farm)] An optional arg to select the hashing algorithm for sampling cohort selection. Defaults to FarmHash for BigQuery compatibility
| [--distribution=(uniform|stratified)] An optional arg to sample for a stratified or uniform distribution. Must provide `distributionFields`
| [--distributionFields=<field1,field2,...>] An optional list of fields to sample for distribution. Must provide `distribution`
| [--exact] An optional arg for higher precision distribution sampling.
| [--byteEncoding=(raw|hex|base64)] An optional arg for how to encode fields of type bytes: raw bytes, hex encoded string, or base64 encoded string. Default is to hash raw bytes.
| [--bigqueryPartitioning=<day|hour|month|year|null>] An optional arg specifying what partitioning to use for the output BigQuery table, or 'null' for no partitioning. Defaults to day.
|
|
|Since this runs a Scio/Beam pipeline, Dataflow options will have to be provided. At a
|minimum, the following should be specified:
Expand Down Expand Up @@ -175,7 +177,8 @@ object BigSampler extends Command {
hashAlgorithm,
distribution,
distributionFields,
exact
exact,
bigqueryPartitioning
) =
try {
val pct = args("sample").toFloat
Expand All @@ -189,7 +192,8 @@ object BigSampler extends Command {
args.optional("hashAlgorithm").map(HashAlgorithm.fromString).getOrElse(FarmHash),
args.optional("distribution").map(SampleDistribution.fromString),
args.list("distributionFields"),
Precision.fromBoolean(args.boolean("exact", default = false))
Precision.fromBoolean(args.boolean("exact", default = false)),
args.getOrElse("bigqueryPartitioning", "day")
)
} catch {
case e: Throwable =>
Expand Down Expand Up @@ -223,6 +227,10 @@ object BigSampler extends Command {
s"Input is a BigQuery table `$input`, output should be a BigQuery table too, " +
s"but instead it's `$output`."
)
require(
List("DAY", "HOUR", "MONTH", "YEAR", "NULL").contains(bigqueryPartitioning.toUpperCase),
s"bigqueryPartitioning must be one of 'day', 'hour', 'month', 'year', or 'null', found $bigqueryPartitioning"
)
val inputTbl = parseAsBigQueryTable(input).get
val outputTbl = parseAsBigQueryTable(output).get

Expand All @@ -238,7 +246,8 @@ object BigSampler extends Command {
distributionFields,
exact,
sizePerKey,
byteEncoding
byteEncoding,
bigqueryPartitioning.toUpperCase
)
} else if (parseAsURI(input).isDefined) {
// right now only support for avro
Expand Down
Expand Up @@ -153,7 +153,8 @@ private[samplers] object BigSamplerBigQuery {
distributionFields: List[String],
precision: Precision,
sizePerKey: Int,
byteEncoding: ByteEncoding = RawEncoding
byteEncoding: ByteEncoding = RawEncoding,
bigqueryPartitioning: String
): ClosedTap[TableRow] = {
import BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED
import BigQueryIO.Write.WriteDisposition.WRITE_EMPTY
Expand Down Expand Up @@ -183,14 +184,18 @@ private[samplers] object BigSamplerBigQuery {
byteEncoding
)

val partitioning = bigqueryPartitioning match {
case "NULL" => null
case _ => TimePartitioning(bigqueryPartitioning)
}
val r = sampledCollection
.saveAsBigQueryTable(
Table.Ref(outputTbl),
schema,
WRITE_EMPTY,
CREATE_IF_NEEDED,
tableDescription = "",
TimePartitioning("DAY")
partitioning
)
sc.run().waitUntilDone()
r
Expand Down

0 comments on commit 1b6b9c3

Please sign in to comment.