/
WordCount.scala
85 lines (75 loc) · 3.18 KB
/
WordCount.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
* Copyright 2019 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Example: Word Count with Metrics
// Usage:
// `sbt "runMain com.spotify.scio.examples.WordCount
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
// --input=gs://apache-beam-samples/shakespeare/kinglear.txt
// --output=gs://[BUCKET]/[PATH]/wordcount"`
package com.spotify.scio.examples
import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import org.slf4j.LoggerFactory
object WordCount {
  // The logger is a member of this top-level `object`, so it is initialized statically rather than
  // captured into closures; it can therefore be referenced from anonymous functions passed to
  // transforms without causing serialization issues.
  private val logger = LoggerFactory.getLogger(this.getClass)

  /** Counts word occurrences in a text input and reports line-level metrics.
    *
    * Recognised arguments:
    *   - `--input`: text file(s) to read; defaults to `ExampleData.KING_LEAR`.
    *   - `--output`: required output path prefix for the `word: count` text files.
    *
    * Blocks until the pipeline finishes, then logs the maximum and minimum trimmed line length and
    * the number of non-empty and empty lines.
    *
    * @param cmdlineArgs raw command-line arguments, parsed by `ContextAndArgs`
    */
  def main(cmdlineArgs: Array[String]): Unit = {
    // Create `ScioContext` and `Args`
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Parse input and output paths from the command-line arguments; `output` has no default.
    val input = args.getOrElse("input", ExampleData.KING_LEAR)
    val output = args("output")

    // Create a distribution and two counter metrics. A `Distribution` tracks the min, max, sum and
    // count of the values it is updated with; a `Counter` tracks a running count.
    val lineDist = ScioMetrics.distribution("lineLength")
    val sumNonEmpty = ScioMetrics.counter("nonEmptyLines")
    val sumEmpty = ScioMetrics.counter("emptyLines")

    // Open text files as an `SCollection[String]`
    sc.textFile(input)
      .transform("input cleaner") {
        _.map { line =>
          // Trim each line and record its length. This runs before the filter below, so empty and
          // whitespace-only lines are recorded with length 0 in the distribution.
          val trimmed = line.trim
          lineDist.update(trimmed.length.toLong)
          trimmed
        }.filter { line =>
          // Drop empty lines, counting kept and dropped lines separately.
          val nonEmpty = line.nonEmpty
          if (nonEmpty) sumNonEmpty.inc() else sumEmpty.inc()
          nonEmpty
        }
      }
      .transform("counter") {
        // Split lines on runs of characters other than ASCII letters and apostrophes, drop empty
        // tokens, then count occurrences of each unique word to get `(String, Long)` pairs.
        _.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
          .countByValue
      }
      // Format each `(word, count)` pair as a "word: count" line
      .map { case (word, count) => s"$word: $count" }
      // Save result as text files under the output path
      .saveAsTextFile(output)

    // Execute the pipeline and block until it finishes
    val result = sc.run().waitUntilDone()

    // Retrieve metric values. `committed` is an `Option`; presumably `None` when the runner does
    // not report committed metrics — NOTE(review): confirm against Scio's `MetricValue`.
    val lineLengths = result.distribution(lineDist).committed
    logger.info(s"Max: ${lineLengths.map(_.getMax)}")
    logger.info(s"Min: ${lineLengths.map(_.getMin)}")
    logger.info(s"Sum non-empty: ${result.counter(sumNonEmpty).committed}")
    logger.info(s"Sum empty: ${result.counter(sumEmpty).committed}")
  }
}