-
Notifications
You must be signed in to change notification settings - Fork 1
/
ConfigSpec.scala
140 lines (114 loc) · 4.35 KB
/
ConfigSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package unit
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import base.UnitSpecBase
import cats.implicits.*
import com.typesafe.config.{ConfigException, ConfigFactory}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import uk.sky.kafka.topicloader.TopicLoader.consumerSettings
import uk.sky.kafka.topicloader.config.Config
import scala.concurrent.duration.*
/** Unit tests for `TopicLoader.consumerSettings` and for loading the `topic-loader` config section via
  * `Config.loadOrThrow`.
  *
  * Every test that needs an [[akka.actor.ActorSystem]] obtains it through [[withActorSystem]], which terminates the
  * system once the test body has finished, so no actor system outlives the test that created it.
  */
class ConfigSpec extends UnitSpecBase {

  // Imported locally so this spec does not rely on a change to the file-level import list.
  import scala.concurrent.Await

  /** Upper bound on how long a test waits for its actor system to terminate. */
  private val shutdownTimeout: FiniteDuration = 10.seconds

  /** Smallest `topic-loader` section used by these tests: no `client-id` is set. */
  private val minimalConfig: String =
    """
      |topic-loader {
      |  idle-timeout = 1 second
      |  buffer-size = 10
      |}
    """.stripMargin

  /** Loan fixture: builds an actor system from the given HOCON, runs `test` with it and terminates it afterwards.
    *
    * Termination happens in a `finally`, so it also runs when `test` throws (for example on a failed assertion).
    *
    * @param hocon
    *   configuration text parsed with `ConfigFactory.parseString` and handed to the actor system
    * @param test
    *   test body that receives the freshly created actor system
    * @return
    *   whatever `test` returns
    */
  private def withActorSystem[A](hocon: String)(test: ActorSystem => A): A = {
    val system = ActorSystem("test-actor-system", ConfigFactory.parseString(hocon))
    try test(system)
    finally {
      // Block until shutdown completes so threads from one test do not linger into the next.
      Await.result(system.terminate(), shutdownTimeout)
      ()
    }
  }

  "consumerSettings" should {

    "use given ConsumerSettings when given some settings" in withActorSystem(minimalConfig) { system =>
      val config = Config.loadOrThrow(system.settings.config)

      // A marker property that only exists on the explicitly supplied settings.
      val testSettings: ConsumerSettings[Array[Byte], Array[Byte]] =
        ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
          .withProperty("test", "testing")

      val result: ConsumerSettings[Array[Byte], Array[Byte]] =
        consumerSettings(testSettings.some, config.topicLoader)(system)

      result.properties("test") shouldBe "testing"
    }

    "use default ConsumerSettings if given None for maybeConsumerSettings" in withActorSystem(minimalConfig) {
      system =>
        val config = Config.loadOrThrow(system.settings.config)

        val result: ConsumerSettings[Array[Byte], Array[Byte]] =
          consumerSettings(None, config.topicLoader)(system)

        result.properties.get("test") shouldBe None
    }

    "use the config client ID over the akka client ID" in withActorSystem(
      """
        |topic-loader {
        |  idle-timeout = 1 second
        |  buffer-size = 10
        |  client-id = test-client-id
        |}
        |akka.kafka.consumer.kafka-clients.client.id = akka-client-id
      """.stripMargin
    ) { system =>
      val config = Config.loadOrThrow(system.settings.config)

      val result: ConsumerSettings[Array[Byte], Array[Byte]] =
        consumerSettings(None, config.topicLoader)(system)

      result.properties(ConsumerConfig.CLIENT_ID_CONFIG) shouldBe "test-client-id"
    }

    "use the akka client ID if no client ID is specified" in withActorSystem(
      """
        |topic-loader {
        |  idle-timeout = 1 second
        |  buffer-size = 10
        |}
        |akka.kafka.consumer.kafka-clients.client.id = akka-client-id
      """.stripMargin
    ) { system =>
      val config = Config.loadOrThrow(system.settings.config)

      val result: ConsumerSettings[Array[Byte], Array[Byte]] =
        consumerSettings(None, config.topicLoader)(system)

      result.properties(ConsumerConfig.CLIENT_ID_CONFIG) shouldBe "akka-client-id"
    }
  }

  "config" should {

    "load a valid config correctly" in withActorSystem(
      """
        |topic-loader {
        |  idle-timeout = 1 second
        |  buffer-size = 10
        |  client-id = test-client-id
        |}
      """.stripMargin
    ) { system =>
      val config = Config.loadOrThrow(system.settings.config)

      config.topicLoader.idleTimeout shouldBe 1.second
      config.topicLoader.bufferSize.value shouldBe 10
      config.topicLoader.clientId.value shouldBe "test-client-id"
    }

    "fail to load an invalid config" in withActorSystem(
      """
        |topic-loader {
        |  idle-timeout = 9999999999999999999999 seconds
        |  buffer-size = -1
        |}
      """.stripMargin
    ) { system =>
      val exception: ConfigException = intercept[ConfigException](Config.loadOrThrow(system.settings.config))

      // Both invalid fields must be reported in the same exception message.
      exception.getMessage should (
        include(
          "Invalid value at 'topic-loader.idle-timeout': Could not parse duration number '9999999999999999999999'"
        ) and include("Invalid value at 'topic-loader.buffer-size': -1 is not a positive Int")
      )
    }
  }
}