Commit

Merge 51d89f9 into bed2c1a
spenes committed Aug 24, 2021
2 parents bed2c1a + 51d89f9, commit 6d66d8d
Showing 38 changed files with 1,465 additions and 491 deletions.
8 changes: 5 additions & 3 deletions build.sbt
@@ -37,9 +37,13 @@ lazy val common = project
Dependencies.doobieHikari,
Dependencies.log4s,
Dependencies.logback,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.fs2BlobCore,
Dependencies.analyticsSdk,
Dependencies.badRows,
Dependencies.schemaDdl,
Dependencies.igluClient,
Dependencies.circeLiteral % Test,
Dependencies.specs2,
Dependencies.specs2Check,
@@ -61,14 +65,12 @@ lazy val loader = project
libraryDependencies ++= Seq(
Dependencies.circeConfig,
Dependencies.commons,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.decline,
Dependencies.config,
Dependencies.specs2
)
)
.dependsOn(common)
.dependsOn(common % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
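The loader project now depends on common's test configuration as well as its compile configuration, so test helpers and fixtures defined in common become visible from loader's test sources. A minimal sbt sketch of what the cross-configuration mapping does (module paths here are illustrative, not taken from this repository):

// "compile->compile;test->test": loader's Compile classpath sees common's
// Compile classpath, and loader's Test classpath sees common's Test classpath,
// so shared test utilities under common's test sources can be reused in loader's tests.
lazy val common = (project in file("modules/common"))

lazy val loader = (project in file("modules/loader"))
  .dependsOn(common % "compile->compile;test->test")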
14 changes: 8 additions & 6 deletions config/config.kinesis.minimal.hocon
@@ -7,11 +7,13 @@
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
good: {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
60 changes: 43 additions & 17 deletions config/config.kinesis.reference.hocon
@@ -11,31 +11,57 @@
# Either TRIM_HORIZON or LATEST
"initialPosition": "TRIM_HORIZON"

# Optional, set the polling mode for retrieving records. Default is FanOut
# "retrievalMode": "FanOut"
# "retrievalMode": {
# "type": "Polling"
# "maxRecords": 1000
# }

# Optional, configure the checkpointer.
"checkpointSettings": {
# The max number of records to aggregate before checkpointing the records.
# Default is 1000.
"maxBatchSize": 1000
# The max amount of time to wait before checkpointing the records.
# Default is 10 seconds.
"maxBatchWait": 10 seconds
}
}

"output" : {
"type": "Postgres"
# PostgreSQL host ('localhost' when an SSH tunnel is enabled)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Events that pass validation are written to Postgres.
"good": {
"type": "Postgres"
# PostgreSQL host ('localhost' when an SSH tunnel is enabled)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Maximum number of connections the database pool is allowed to reach
"maxConnections": 100
}

# Events that fail validation are written to the specified stream.
# If this section is removed from the config, bad row output will be disabled.
"bad": {
# Enable the Kinesis sink as the bad row sink
"type": "Kinesis"
# Name of the Kinesis stream to write to
"streamName": "bad-rows"
# AWS region in which the Kinesis stream resides.
"region": "eu-central-1"
}
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
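Across all of the updated configs, the flat output block is split into named sinks: validated events go to output.good (Postgres) and, optionally, events that fail validation go to output.bad (Kinesis, PubSub, or Local, depending on the runtime). Purely as an illustration of that shape, and not the project's actual case classes, the split could be modeled like this when decoding the HOCON above:

// Hypothetical types mirroring the good/bad structure of the configs;
// field names follow the HOCON keys, everything else is illustrative.
sealed trait BadSink
final case class KinesisSink(streamName: String, region: String) extends BadSink

final case class PostgresSink(
  host: String,
  port: Int,
  database: String,
  username: String,
  password: String,
  schema: String,
  sslMode: String,
  maxConnections: Option[Int]
)

final case class Output(good: PostgresSink, bad: Option[BadSink])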
18 changes: 18 additions & 0 deletions config/config.local.minimal.hocon
@@ -0,0 +1,18 @@
# The minimum required config options for loading from a local source
{
"input": {
"type": "Local"
"path": "/tmp/example"
}

"output" : {
"good": {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
48 changes: 48 additions & 0 deletions config/config.local.reference.hocon
@@ -0,0 +1,48 @@
{
"input": {
# Enable the local event source
"type": "Local"
# Path of the event source. It can be a directory or a file.
# If it is a directory, all files under it are read recursively.
# The path can be either absolute or relative to the executable.
"path": "./tmp/example"
}

"output" : {
# Events that pass validation are written to Postgres.
"good": {
"type": "Postgres"
# PostgreSQL host ('localhost' when an SSH tunnel is enabled)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Maximum number of connections the database pool is allowed to reach
"maxConnections": 100
}

# Events that fail validation are written to the specified stream.
# If this section is removed from the config, bad row output will be disabled.
"bad": {
# Enable the local sink as the bad row sink
"type": "Local"
# Path of the bad row sink. It can be a directory or a file.
# If it is a directory, all files under it are read recursively.
# The path can be either absolute or relative to the executable.
"path": "./tmp/bad"
}
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
"purpose": "ENRICHED_EVENTS"
}
17 changes: 17 additions & 0 deletions config/config.local.relativetest.hocon
@@ -0,0 +1,17 @@
{
"input": {
"type": "Local"
"path": "tmp/example"
}

"output" : {
"good": {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
14 changes: 8 additions & 6 deletions config/config.pubsub.minimal.hocon
@@ -7,11 +7,13 @@
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
"good": {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
48 changes: 32 additions & 16 deletions config/config.pubsub.reference.hocon
@@ -9,22 +9,38 @@
}

"output" : {
"type": "Postgres"
# PostgreSQL host ('localhost' when an SSH tunnel is enabled)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Events that pass validation are written to Postgres.
"good": {
"type": "Postgres"
# PostgreSQL host ('localhost' when an SSH tunnel is enabled)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Maximum number of connections the database pool is allowed to reach
"maxConnections": 100
}

# Events that fail validation are written to the specified stream.
# If this section is removed from the config, bad row output will be disabled.
"bad": {
# Enable the PubSub sink as the bad row sink
"type": "PubSub"
# Your GCP project id
"projectId": "my-project"
# Your GCP PubSub topic id
"topicId": "my-topic"
}
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
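The reference configs above declare the password twice: a plain-text default followed by "password": ${?POSTGRES_PASSWORD}. In HOCON (Typesafe Config) the ${?VAR} form is an optional substitution that falls back to the environment, so the environment variable wins when it is set and the literal default is kept otherwise. A small, self-contained sketch of that behaviour (illustrative only, not part of the loader code):

import com.typesafe.config.ConfigFactory

object PasswordResolution {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseString(
      """
        |password: "mysecretpassword"
        |password: ${?POSTGRES_PASSWORD}
        |""".stripMargin
    ).resolve()
    // Prints the value of POSTGRES_PASSWORD if it is set in the environment,
    // otherwise the plain-text default defined above.
    println(conf.getString("password"))
  }
}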
@@ -28,7 +28,7 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList

import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded, schema}
import com.snowplowanalytics.snowplow.postgres.storage.ddl
import com.snowplowanalytics.snowplow.postgres.streaming.sink
import com.snowplowanalytics.snowplow.postgres.streaming.Sink

trait DB[F[_]] {
def insert(event: List[Entity]): F[Unit]
@@ -88,7 +88,7 @@ object DB {
def interpreter[F[_]: Sync: Clock](resolver: Resolver[F], xa: Transactor[F], schemaName: String): DB[F] =
new DB[F] {
def insert(event: List[Entity]): F[Unit] =
event.traverse_(sink.insertStatement(schemaName, _)).transact(xa)
event.traverse_(Sink.insertStatement(schemaName, _)).transact(xa)

def alter(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.alterTable[F](resolver, schemaName, schemaKey)
@@ -23,7 +23,8 @@ case class DBConfig(host: String,
username: String,
password: String, // TODO: can be EC2 store
sslMode: String,
schema: String
schema: String,
maxConnections: Option[Int]
) {
def getJdbc: JdbcUri =
JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
@@ -45,7 +46,7 @@ object DBConfig {
config.setJdbcUrl(dbConfig.getJdbc.toString)
config.setUsername(dbConfig.username)
config.setPassword(dbConfig.password)
// TODO: DBConfig could take a MaxConnections field, and set `config.setMaximumPoolSize`.
dbConfig.maxConnections.foreach(config.setMaximumPoolSize)
config
}

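DBConfig gains an optional maxConnections field, replacing the old TODO: when the setting is present it is applied to the HikariCP pool via setMaximumPoolSize, and when it is absent HikariCP keeps its default pool size. A self-contained sketch of the pattern (simplified; the real code builds the pool from DBConfig, and the method name here is invented for illustration):

import com.zaxxer.hikari.HikariConfig

// Apply the optional maxConnections setting only when it is defined,
// leaving HikariCP's default maximum pool size untouched otherwise.
def hikariConfig(jdbcUrl: String, username: String, password: String, maxConnections: Option[Int]): HikariConfig = {
  val config = new HikariConfig()
  config.setJdbcUrl(jdbcUrl)
  config.setUsername(username)
  config.setPassword(password)
  maxConnections.foreach(config.setMaximumPoolSize)
  config
}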
