Skip to content

Commit

Permalink
Merge 779b709 into 3b843e1
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 30, 2021
2 parents 3b843e1 + 779b709 commit 6c48011
Show file tree
Hide file tree
Showing 24 changed files with 604 additions and 84 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/lacework.yml
@@ -0,0 +1,37 @@
name: lacework

on:
push:
tags:
- '*'

jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v3
- name: Set up JDK
uses: actions/setup-java@v1
with:
java-version: 11
- name: Get current version
id: ver
run: echo "::set-output name=tag::${GITHUB_REF#refs/tags/}"

- name: Install lacework scanner
run: |
sudo apt-get update
sudo apt-get -y install curl
curl -L https://github.com/lacework/lacework-vulnerability-scanner/releases/latest/download/lw-scanner-linux-amd64 -o lw-scanner
chmod +x lw-scanner
- name: Build docker images
run: sbt docker:publishLocal

- name: Scan snowplow-postgres-loader
env:
LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-postgres-loader ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull
14 changes: 12 additions & 2 deletions .github/workflows/test.yml
Expand Up @@ -71,17 +71,27 @@ jobs:
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
- name: Stage the Docker build
run: sbt "project loader" docker:stage
- name: Docker metadata
id: meta
uses: docker/metadata-action@v3
with:
images: snowplow/snowplow-postgres-loader
tags: |
type=raw,value=latest,enable=${{ !contains(steps.ver.outputs.tag, 'rc') }}
type=raw,value=${{ steps.ver.outputs.tag }}
flavor: |
latest=false
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Build image
- name: Push image
uses: docker/build-push-action@v2
with:
context: modules/loader/target/docker/stage
file: modules/loader/target/docker/stage/Dockerfile
platforms: linux/amd64,linux/arm64/v8
tags: snowplow/snowplow-postgres-loader:${{ steps.ver.outputs.tag }}
tags: ${{ steps.meta.outputs.tags }}
push: true

deploy_common:
Expand Down
3 changes: 3 additions & 0 deletions config/config.kinesis.reference.hocon
Expand Up @@ -74,6 +74,9 @@
# Max size of the batch in bytes before emitting
# Default is 5MB
"maxBatchBytes": 5000000

# Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.local.reference.hocon
Expand Up @@ -42,6 +42,9 @@
"type": "Local"
# Path for bad row sink.
"path": "./tmp/bad"

# Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.pubsub.reference.hocon
Expand Up @@ -63,6 +63,9 @@
# The number of threads used internally by library to process the callback after message delivery
# Default is 1
"numCallbackExecutors": 1

# Only used when "type" is "Noop" or missing. How often to log number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
Expand Up @@ -12,9 +12,33 @@
*/
package com.snowplowanalytics.snowplow.postgres.streaming

import cats.effect.{Resource, Concurrent}
import org.log4s.getLogger

import cats.effect.{Resource, Concurrent, Sync, Timer}
import cats.effect.concurrent.Ref

import fs2.Stream

import scala.concurrent.duration.FiniteDuration

object DummyStreamSink {
def create[F[_]: Concurrent]:Resource[F, StreamSink[F]] =
Resource.pure[F, StreamSink[F]](_ => Concurrent[F].pure(()))
def create[F[_]: Concurrent: Timer](period: FiniteDuration): Resource[F, StreamSink[F]] =
for {
counter <- Resource.eval(Ref.of(0))
_ <- Resource.make(Concurrent[F].start(reporter(counter, period)))(_.cancel)
} yield { _ =>
counter.update(_ + 1)
}

lazy val logger = getLogger

private def reporter[F[_]: Sync: Timer](counter: Ref[F, Int], period: FiniteDuration): F[Unit] =
Stream.awakeDelay[F](period)
.evalMap(_ => counter.getAndSet(0))
.evalMap { count =>
if (count > 0) Sync[F].delay(logger.info(s"Discarded $count bad rows"))
else Sync[F].unit
}
.compile
.drain
}
Expand Up @@ -102,7 +102,8 @@ object Database {
data_type::VARCHAR,
character_maximum_length::INTEGER
FROM information_schema.columns
WHERE table_name = $tableName"""
WHERE table_name = $tableName
ORDER BY ordinal_position"""
.query[(String, Option[String], Boolean, String, Option[Int])]
.map(ColumnInfo.tupled)
.to[List]
Expand Down
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
Expand Up @@ -22,6 +22,7 @@
}
"bad": {
"type": "Noop"
"reportPeriod": 30 seconds
"delayThreshold": 200 milliseconds
"maxBatchSize": 500
"maxBatchBytes": 5000000
Expand Down
@@ -0,0 +1,91 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for an Iglu resolver's configuration",
"self": {
"vendor": "com.snowplowanalytics.iglu",
"name": "resolver-config",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",

"properties": {

"cacheSize": {
"type": "integer",
"minimum": 0
},

"repositories": {
"type": "array",
"items": {
"type": "object",

"properties": {

"name": {
"type": "string"
},

"priority": {
"type": "integer"
},

"vendorPrefixes": {
"type": "array",
"items": {
"type": "string"
}
},

"connection": {
"type": "object",
"oneOf": [
{
"properties": {
"embedded": {
"type": "object",
"properties": {
"path": {
"type": "string"
}
},
"required": ["path"],
"additionalProperties": false
}
},
"required": ["embedded"],
"additionalProperties": false
},
{
"properties": {
"http": {
"type": "object",
"properties": {
"uri": {
"type": "string",
"format": "uri"
}
},
"required": ["uri"],
"additionalProperties": false
}
},
"required": ["http"],
"additionalProperties": false
}
]
}
},
"required": ["name", "priority", "vendorPrefixes", "connection"],
"additionalProperties": false
}
}

},

"required": ["cacheSize", "repositories"],
"additionalProperties": false
}

@@ -0,0 +1,93 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for an Iglu resolver's configuration",
"self": {
"vendor": "com.snowplowanalytics.iglu",
"name": "resolver-config",
"format": "jsonschema",
"version": "1-0-1"
},

"type": "object",

"properties": {

"cacheSize": {
"type": "integer",
"minimum": 0
},

"repositories": {
"type": "array",
"items": {
"type": "object",

"properties": {

"name": {
"type": "string"
},

"priority": {
"type": "integer"
},

"vendorPrefixes": {
"type": "array",
"items": {
"type": "string"
}
},

"connection": {
"type": "object",
"oneOf": [
{
"properties": {
"embedded": {
"type": "object",
"properties": {
"path": {
"type": "string"
}
},
"required": ["path"],
"additionalProperties": false
}
},
"required": ["embedded"],
"additionalProperties": false
},
{
"properties": {
"http": {
"type": "object",
"properties": {
"uri": {
"type": "string",
"format": "uri"
},
"apikey": {
"type": ["string", "null"]
}
},
"required": ["uri"],
"additionalProperties": false
}
},
"required": ["http"],
"additionalProperties": false
}
]
}
},
"required": ["name", "priority", "vendorPrefixes", "connection"],
"additionalProperties": false
}
}

},

"required": ["cacheSize", "repositories"],
"additionalProperties": false
}

0 comments on commit 6c48011

Please sign in to comment.