Skip to content

Commit

Permalink
Merge 8be2a19 into bed2c1a
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Aug 26, 2021
2 parents bed2c1a + 8be2a19 commit 549ea55
Show file tree
Hide file tree
Showing 43 changed files with 1,662 additions and 495 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Expand Up @@ -36,8 +36,10 @@ jobs:
distribution: adopt
- name: Prepare test environment
run: $GITHUB_WORKSPACE/.github/start_environment.sh
- name: Run tests
run: sbt clean coverage test
- name: Run common module tests
run: sbt "project common" clean coverage test
- name: Run loader module tests
run: sbt "project loader" clean coverage test
- name: Aggregate coverage data
if: ${{ always() }}
run: sbt coverageAggregate
Expand Down
5 changes: 4 additions & 1 deletion build.sbt
Expand Up @@ -37,9 +37,11 @@ lazy val common = project
Dependencies.doobieHikari,
Dependencies.log4s,
Dependencies.logback,
Dependencies.catsRetry,
Dependencies.analyticsSdk,
Dependencies.badRows,
Dependencies.schemaDdl,
Dependencies.igluClient,
Dependencies.circeLiteral % Test,
Dependencies.specs2,
Dependencies.specs2Check,
Expand All @@ -63,12 +65,13 @@ lazy val loader = project
Dependencies.commons,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.fs2BlobCore,
Dependencies.decline,
Dependencies.config,
Dependencies.specs2
)
)
.dependsOn(common)
.dependsOn(common % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
Expand Down
14 changes: 8 additions & 6 deletions config/config.kinesis.minimal.hocon
Expand Up @@ -7,11 +7,13 @@
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
good: {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
74 changes: 57 additions & 17 deletions config/config.kinesis.reference.hocon
Expand Up @@ -11,31 +11,66 @@
# Either TRIM_HORIZON or LATEST
"initialPosition": "TRIM_HORIZON"

# Optional, set the polling mode for retrieving records. Default is FanOut
# Optional, set the polling mode for retrieving records. Default is FanOut
# "retrievalMode": "FanOut"
# "retrievalMode": {
# "type": "Polling"
# "maxRecords": 1000
# }

# Optional, configure the checkpointer.
"checkpointSettings": {
# The max number of records to aggregate before checkpointing.
# Default is 1000.
"maxBatchSize": 1000
# The max amount of time to wait before checkpointing the records.
# Default is 10 seconds.
"maxBatchWait": 10 seconds
}
}

"output" : {
"type": "Postgres"
# PostgreSQL host ('localhost' for enabled SSH Tunnel)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Events that pass validation are written to Postgres.
"good": {
"type": "Postgres"
# PostgreSQL host ('localhost' for enabled SSH Tunnel)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Maximum number of connections database pool is allowed to reach
"maxConnections": 100
}

# Events that fail validation are written to the specified stream.
# If this section is removed from the config, bad row output will be disabled.
"bad": {
# Enable the Kinesis sink as bad row sink
"type": "Kinesis"
# Name of the Kinesis stream to write to
"streamName": "bad-rows"
# AWS region in which the Kinesis stream resides.
"region": "eu-central-1"
# The delay threshold to use for batching
# Default is 200 milliseconds
"delayThreshold": 201 milliseconds
# Max number of items in the batch to collect before emitting
# Default is 500
"maxBatchSize": 501
# Max size of the batch in bytes before emitting
# Default is 5MB
"maxBatchBytes": 5000001
}
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
Expand All @@ -48,4 +83,9 @@
}
}

# Minimum and maximum backoff periods
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
}
}
18 changes: 18 additions & 0 deletions config/config.local.minimal.hocon
@@ -0,0 +1,18 @@
# The minimum required config options for loading from a local source
{
"input": {
"type": "Local"
"path": "/tmp/example"
}

"output" : {
"good": {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
48 changes: 48 additions & 0 deletions config/config.local.reference.hocon
@@ -0,0 +1,48 @@
{
"input": {
# Enable the local event source
"type": "Local"
# Path for the event source. It can be a directory or a file.
# If it is a directory, all files under it will be read recursively.
# The path can be either absolute or relative to the executable.
"path": "./tmp/example"
}

"output" : {
# Events that pass validation are written to Postgres.
"good": {
"type": "Postgres"
# PostgreSQL host ('localhost' for enabled SSH Tunnel)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Maximum number of connections database pool is allowed to reach
"maxConnections": 100
}

# Events that fail validation are written to the specified local path.
# If this section is removed from the config, bad row output will be disabled.
"bad": {
# Enable the local sink as bad row sink
"type": "Local"
# Path for the bad row sink. It can be a directory or a file.
# If it is a directory, bad rows will be written to files under it.
# The path can be either absolute or relative to the executable.
"path": "./tmp/bad"
}
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
"purpose": "ENRICHED_EVENTS"
}
17 changes: 17 additions & 0 deletions config/config.local.relativetest.hocon
@@ -0,0 +1,17 @@
{
"input": {
"type": "Local"
"path": "tmp/example"
}

"output" : {
"good": {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
14 changes: 8 additions & 6 deletions config/config.pubsub.minimal.hocon
Expand Up @@ -7,11 +7,13 @@
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
"good": {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
}
72 changes: 56 additions & 16 deletions config/config.pubsub.reference.hocon
Expand Up @@ -6,28 +6,68 @@
"projectId": "my-project"
# Your GCP PubSub subscription id
"subscriptionId": "my-subscription"

# Optional, configure the checkpointer.
"checkpointSettings": {
# The max number of concurrent effects for the checkpointer.
# Default is 100.
"maxConcurrent": 100
}
}

"output" : {
"type": "Postgres"
# PostgreSQL host ('localhost' for enabled SSH Tunnel)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Events that pass validation are written to Postgres.
"good": {
"type": "Postgres"
# PostgreSQL host ('localhost' for enabled SSH Tunnel)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
# Maximum number of connections database pool is allowed to reach
"maxConnections": 100
}

# Events that fail validation are written to the specified stream.
# If this section is removed from the config, bad row output will be disabled.
"bad": {
# Enable the Pubsub sink as bad row sink
"type": "PubSub"
# Your GCP project id
"projectId": "my-project"
# Your GCP PubSub topic id
"topicId": "my-topic"
# The delay threshold to use for batching
# Default is 200 milliseconds
"delayThreshold": 201 milliseconds
# Max number of items in the batch to collect before emitting
# Default is 500
"maxBatchSize": 501
# Max size of the batch in bytes before emitting
# Default is 5MB
"maxBatchBytes": 5000001
# The number of threads used internally by the library to process callbacks after message delivery
# Default is 1
"numCallbackExecutors": 2
}
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
"purpose": "ENRICHED_EVENTS"

# Minimum and maximum backoff periods
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
}
}
6 changes: 6 additions & 0 deletions migrations/0-3-0.sql
@@ -0,0 +1,6 @@
ALTER TABLE public.events
ALTER COLUMN user_ipaddress TYPE varchar(128),
ALTER COLUMN domain_userid TYPE varchar(128),
ALTER COLUMN network_userid TYPE varchar(128),
ALTER COLUMN refr_domain_userid TYPE varchar(128),
ALTER COLUMN domain_sessionid TYPE varchar(128);
Expand Up @@ -28,7 +28,7 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList

import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded, schema}
import com.snowplowanalytics.snowplow.postgres.storage.ddl
import com.snowplowanalytics.snowplow.postgres.streaming.sink
import com.snowplowanalytics.snowplow.postgres.streaming.Sink

trait DB[F[_]] {
def insert(event: List[Entity]): F[Unit]
Expand Down Expand Up @@ -88,7 +88,7 @@ object DB {
def interpreter[F[_]: Sync: Clock](resolver: Resolver[F], xa: Transactor[F], schemaName: String): DB[F] =
new DB[F] {
def insert(event: List[Entity]): F[Unit] =
event.traverse_(sink.insertStatement(schemaName, _)).transact(xa)
event.traverse_(Sink.insertStatement(schemaName, _)).transact(xa)

def alter(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.alterTable[F](resolver, schemaName, schemaKey)
Expand Down
Expand Up @@ -23,7 +23,8 @@ case class DBConfig(host: String,
username: String,
password: String, // TODO: can be EC2 store
sslMode: String,
schema: String
schema: String,
maxConnections: Option[Int]
) {
def getJdbc: JdbcUri =
JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
Expand All @@ -45,7 +46,7 @@ object DBConfig {
config.setJdbcUrl(dbConfig.getJdbc.toString)
config.setUsername(dbConfig.username)
config.setPassword(dbConfig.password)
// TODO: DBConfig could take a MaxConnections field, and set `config.setMaximumPoolSize`.
dbConfig.maxConnections.foreach(config.setMaximumPoolSize)
config
}

Expand Down
Expand Up @@ -150,7 +150,7 @@ object transform {
val data = event.ordered.parTraverse {
case ("contexts" | "derived_contexts" | "unstruct_event", _) =>
none.asRight.toEitherNel
case (key @ ("event_id" | "domain_sessionid"), Some(value)) =>
case (key @ "event_id", Some(value)) =>
val error = castError("uuid") _
value.fold(
none.asRight.toEitherNel,
Expand Down

0 comments on commit 549ea55

Please sign in to comment.