Skip to content

Commit

Permalink
Make database thread pool size configurable (close #60)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Aug 30, 2021
1 parent 70e7990 commit 9cdc0b3
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 15 deletions.
3 changes: 3 additions & 0 deletions config/config.kinesis.reference.hocon
Expand Up @@ -51,6 +51,9 @@
# Maximum number of connections database pool is allowed to reach
# Default is 10
"maxConnections": 10
# Size of the thread pool for blocking database operations
# Default is value of "maxConnections"
"threadPoolSize": 10
}

# Events that fail validation are written to specified stream.
Expand Down
3 changes: 3 additions & 0 deletions config/config.local.reference.hocon
Expand Up @@ -30,6 +30,9 @@
# Maximum number of connections database pool is allowed to reach
# Default is 10
"maxConnections": 10
# Size of the thread pool for blocking database operations
# Default is value of "maxConnections"
"threadPoolSize": 10
}

# Events that fail validation are written to specified stream.
Expand Down
3 changes: 3 additions & 0 deletions config/config.pubsub.reference.hocon
Expand Up @@ -37,6 +37,9 @@
# Maximum number of connections database pool is allowed to reach
# Default is 10
"maxConnections": 10
# Size of the thread pool for blocking database operations
# Default is value of "maxConnections"
"threadPoolSize": 10
}

# Events that fail validation are written to specified stream.
Expand Down
Expand Up @@ -24,7 +24,8 @@ case class DBConfig(host: String,
password: String, // TODO: can be EC2 store
sslMode: String,
schema: String,
maxConnections: Option[Int]
maxConnections: Int,
threadPoolSize: Option[Int]
) {
def getJdbc: JdbcUri =
JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
Expand All @@ -46,7 +47,7 @@ object DBConfig {
config.setJdbcUrl(dbConfig.getJdbc.toString)
config.setUsername(dbConfig.username)
config.setPassword(dbConfig.password)
dbConfig.maxConnections.foreach(config.setMaximumPoolSize)
config.setMaximumPoolSize(dbConfig.maxConnections)
config
}

Expand Down
Expand Up @@ -38,7 +38,7 @@ object resources {
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig, iglu: Client[F, Json]) =
for {
blocker <- Blocker[F]
xa <- resources.getTransactor[F](DBConfig.hikariConfig(postgres), blocker)
xa <- resources.getTransactor[F](DBConfig.hikariConfig(postgres), blocker, postgres.threadPoolSize)
state <- Resource.eval(initializeState(postgres.schema, iglu, xa))
} yield (blocker, xa, state)

Expand All @@ -58,14 +58,8 @@ object resources {
} yield state

/** Get a HikariCP transactor */
def getTransactor[F[_]: Async: ContextShift](config: HikariConfig, be: Blocker): Resource[F, HikariTransactor[F]] = {
val threadPoolSize = {
// This could be made configurable, but these are sensible defaults and unlikely to be critical for tuning throughput.
// Exceeding availableProcessors could lead to unnecessary context switching.
// Exceeding the connection pool size is unnecessary, because that is limit of the app's parallelism.
val maxPoolSize = if (config.getMaximumPoolSize > 0) config.getMaximumPoolSize else 10
Math.min(maxPoolSize, Runtime.getRuntime.availableProcessors)
}
def getTransactor[F[_]: Async: ContextShift](config: HikariConfig, be: Blocker, threadPoolSizeOpt: Option[Int] = None): Resource[F, HikariTransactor[F]] = {
val threadPoolSize = threadPoolSizeOpt.getOrElse(config.getMaximumPoolSize)
logger.debug(s"Using thread pool of size $threadPoolSize for Hikari transactor")

for {
Expand Down
Expand Up @@ -54,7 +54,8 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
Some(10)
10,
None
),
LoaderConfig.StreamSink.Noop
),
Expand Down Expand Up @@ -91,6 +92,7 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
10,
Some(10)
),
LoaderConfig.StreamSink.Kinesis(
Expand Down Expand Up @@ -131,7 +133,8 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
Some(10)
10,
None
),
LoaderConfig.StreamSink.Noop
),
Expand Down Expand Up @@ -165,6 +168,7 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
10,
Some(10)
),
LoaderConfig.StreamSink.PubSub(
Expand Down Expand Up @@ -202,7 +206,8 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
Some(10)
10,
None
),
LoaderConfig.StreamSink.Noop
),
Expand Down Expand Up @@ -232,6 +237,7 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
10,
Some(10)
),
LoaderConfig.StreamSink.Local(PathInfo(Path.fromString("./tmp/bad").get, PathType.Relative))
Expand Down Expand Up @@ -262,7 +268,8 @@ class CliSpec extends Specification {
"mysecretpassword",
"REQUIRE",
"atomic",
Some(10)
10,
None
),
LoaderConfig.StreamSink.Noop
),
Expand Down
Expand Up @@ -54,6 +54,7 @@ class LocalSourceSpec extends Database {
"mysecretpassword",
"allow",
"public",
10,
None
),
LoaderConfig.StreamSink.Noop
Expand Down

0 comments on commit 9cdc0b3

Please sign in to comment.