-
Notifications
You must be signed in to change notification settings - Fork 33
/
T05MixedTest.scala
93 lines (77 loc) · 2.71 KB
/
T05MixedTest.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
* Copyright 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.spotify.bdrc.testing
import com.spotify.scio._
import com.spotify.scio.io.TextIO
import com.spotify.scio.testing.PipelineSpec
import com.spotify.scio.values.SCollection
/**
 * Word-count pipeline split into separately testable layers: plain functions (`split`,
 * `format`), transforms built on those functions (`countWords`, `formatOutput`), and a
 * `main` method that wires the transforms between a text input and a text output.
 */
object WordCount4 {

  /**
   * Pipeline entry point.
   *
   * Reads the text file named by the `input` argument, counts its words, formats each count
   * as a line of text, and saves the lines to the location named by the `output` argument.
   *
   * @param cmdlineArgs raw command-line arguments, parsed by `ContextAndArgs`
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val lines = sc.textFile(args("input"))
    formatOutput(countWords(lines)).saveAsTextFile(args("output"))
    sc.run()
  }

  // transforms

  /**
   * Splits every input line into words and counts the occurrences of each distinct word.
   *
   * @param input collection of text lines
   * @return collection of `(word, count)` pairs
   */
  def countWords(input: SCollection[String]): SCollection[(String, Long)] =
    input.flatMap(line => split(line)).countByValue

  /**
   * Renders every `(word, count)` pair as a line of text using [[format]].
   *
   * @param input collection of `(word, count)` pairs
   * @return collection of formatted lines
   */
  def formatOutput(input: SCollection[(String, Long)]): SCollection[String] =
    input.map(kv => format(kv))

  // functions

  /**
   * Breaks a line into words.
   *
   * Any run of characters other than ASCII letters and apostrophes acts as a separator;
   * empty tokens (e.g. the one produced by a leading separator) are dropped.
   *
   * @param input a line of text
   * @return the words of the line, in order of appearance
   */
  def split(input: String): Seq[String] = {
    val tokens = input.split("[^a-zA-Z']+")
    tokens.filterNot(_.isEmpty)
  }

  /**
   * Formats a `(word, count)` pair as `"word: count"`.
   *
   * @param kv the pair to format
   * @return the formatted line
   */
  def format(kv: (String, Long)): String = s"${kv._1}: ${kv._2}"
}
/**
 * Mixed function, transform and end-to-end tests for [[WordCount4]].
 *
 * Property-based tests require an object that extends Properties and are therefore not
 * included.
 */
class MixedTest extends PipelineSpec {
  // Shared fixtures: raw input lines, the word counts expected from them, and the formatted
  // output lines expected from those counts.
  val input: Seq[String] = Seq("a b c d e", "a b a b")
  val expected: Seq[String] = Seq("a: 3", "b: 3", "c: 1", "d: 1", "e: 1")
  val intermediate: Seq[(String, Long)] =
    Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L))

  // Function tests.
  // These exercise WordCount4 (the object defined in this file), so the functions checked here
  // are the same ones used by the transform and end-to-end tests below.
  "split" should "work" in {
    WordCount4.split("a b,c d\te\n\nf") should equal(Seq("a", "b", "c", "d", "e", "f"))
  }

  "format" should "work" in {
    WordCount4.format(("a", 10L)) should equal("a: 10")
  }

  // Transform tests: run each transform on an in-memory collection.
  "countWords" should "work" in {
    runWithContext { sc =>
      val in = sc.parallelize(input)
      WordCount4.countWords(in) should containInAnyOrder(intermediate)
    }
  }

  "formatOutput" should "work" in {
    runWithContext { sc =>
      val in = sc.parallelize(intermediate)
      WordCount4.formatOutput(in) should containInAnyOrder(expected)
    }
  }

  // End-to-end test: runs WordCount4.main with the text input and output replaced by test data.
  "WordCount4" should "work" in {
    JobTest[com.spotify.bdrc.testing.WordCount4.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), input)
      .output(TextIO("out.txt"))(output => output should containInAnyOrder(expected))
      .run()
  }
}