Skip to content

Commit

Permalink
Merge 597c9ac into 3b843e1
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 1, 2021
2 parents 3b843e1 + 597c9ac commit 1a3e15f
Show file tree
Hide file tree
Showing 20 changed files with 219 additions and 83 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/lacework.yml
@@ -0,0 +1,37 @@
name: lacework

on:
push:
tags:
- '*'

jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v3
- name: Set up JDK
uses: actions/setup-java@v1
with:
java-version: 11
- name: Get current version
id: ver
run: echo "::set-output name=tag::${GITHUB_REF#refs/tags/}"

- name: Install lacework scanner
run: |
sudo apt-get update
sudo apt-get -y install curl
curl -L https://github.com/lacework/lacework-vulnerability-scanner/releases/latest/download/lw-scanner-linux-amd64 -o lw-scanner
chmod +x lw-scanner
- name: Build docker images
run: sbt docker:publishLocal

# Evaluate the locally built image with the lacework scanner.
- name: Scan snowplow-postgres-loader
  env:
    LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
    LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
    # Evaluates to false when the tag contains 'rc'.
    # The tag comes from the step with id `ver` ("Get current version"); referencing a
    # non-existent step id yields an empty value, which would make this always true.
    LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.ver.outputs.tag, 'rc') }}
  # --no-pull: scan the image produced by the earlier `sbt docker:publishLocal` step
  run: ./lw-scanner image evaluate snowplow/snowplow-postgres-loader ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull
14 changes: 12 additions & 2 deletions .github/workflows/test.yml
Expand Up @@ -71,17 +71,27 @@ jobs:
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
- name: Stage the Docker build
run: sbt "project loader" docker:stage
- name: Docker metadata
id: meta
uses: docker/metadata-action@v3
with:
images: snowplow/snowplow-postgres-loader
tags: |
type=raw,value=latest,enable=${{ !contains(steps.ver.outputs.tag, 'rc') }}
type=raw,value=${{ steps.ver.outputs.tag }}
flavor: |
latest=false
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Build image
- name: Push image
uses: docker/build-push-action@v2
with:
context: modules/loader/target/docker/stage
file: modules/loader/target/docker/stage/Dockerfile
platforms: linux/amd64,linux/arm64/v8
tags: snowplow/snowplow-postgres-loader:${{ steps.ver.outputs.tag }}
tags: ${{ steps.meta.outputs.tags }}
push: true

deploy_common:
Expand Down
3 changes: 3 additions & 0 deletions config/config.kinesis.reference.hocon
Expand Up @@ -74,6 +74,9 @@
# Max size of the batch in bytes before emitting
# Default is 5MB
"maxBatchBytes": 5000000

# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.local.reference.hocon
Expand Up @@ -42,6 +42,9 @@
"type": "Local"
# Path for bad row sink.
"path": "./tmp/bad"

# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.pubsub.reference.hocon
Expand Up @@ -63,6 +63,9 @@
# The number of threads used internally by library to process the callback after message delivery
# Default is 1
"numCallbackExecutors": 1

# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded.
"reportPeriod": 10 seconds
}
}

Expand Down
Expand Up @@ -12,9 +12,33 @@
*/
package com.snowplowanalytics.snowplow.postgres.streaming

import cats.effect.{Resource, Concurrent}
import org.log4s.getLogger

import cats.effect.{Resource, Concurrent, Sync, Timer}
import cats.effect.concurrent.Ref

import fs2.Stream

import scala.concurrent.duration.FiniteDuration

object DummyStreamSink {
def create[F[_]: Concurrent]:Resource[F, StreamSink[F]] =
Resource.pure[F, StreamSink[F]](_ => Concurrent[F].pure(()))
/**
 * Builds a sink that discards every record it receives and only counts it.
 *
 * A counter starting at 0 is allocated and `reporter` is started as a background
 * fiber; the fiber is cancelled when the returned Resource is released.
 *
 * @param period interval handed to `reporter`, which logs the discarded count on that schedule
 * @return a Resource wrapping a sink that ignores its input and increments the counter
 */
def create[F[_]: Concurrent: Timer](period: FiniteDuration): Resource[F, StreamSink[F]] =
for {
// Number of records discarded since the reporter last reset it
counter <- Resource.eval(Ref.of(0))
// Run the reporter concurrently for the lifetime of the Resource; cancel it on release
_ <- Resource.make(Concurrent[F].start(reporter(counter, period)))(_.cancel)
} yield { _ =>
// The record itself is dropped; only the count is kept
counter.update(_ + 1)
}

lazy val logger = getLogger

/**
 * Wakes up every `period`, atomically reads the counter while resetting it to 0,
 * and logs the value read when it is greater than zero.
 *
 * The returned effect is an endless stream drained to `F[Unit]`.
 *
 * @param counter number of bad rows discarded since the previous wake-up
 * @param period  delay between two consecutive wake-ups
 */
private def reporter[F[_]: Sync: Timer](counter: Ref[F, Int], period: FiniteDuration): F[Unit] = {
  // Stay silent for periods in which nothing was discarded
  def report(count: Int): F[Unit] =
    if (count <= 0) Sync[F].unit
    else Sync[F].delay(logger.info(s"Discarded $count bad rows"))

  val ticks = Stream.awakeDelay[F](period)
  // Read-and-reset in a single step so each discarded row is reported at most once
  val discarded = ticks.evalMap(_ => counter.getAndSet(0))

  discarded.evalMap(count => report(count)).compile.drain
}
}
Expand Up @@ -102,7 +102,8 @@ object Database {
data_type::VARCHAR,
character_maximum_length::INTEGER
FROM information_schema.columns
WHERE table_name = $tableName"""
WHERE table_name = $tableName
ORDER BY ordinal_position"""
.query[(String, Option[String], Boolean, String, Option[Int])]
.map(ColumnInfo.tupled)
.to[List]
Expand Down
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
Expand Up @@ -22,6 +22,7 @@
}
"bad": {
"type": "Noop"
"reportPeriod": 30 seconds
"delayThreshold": 200 milliseconds
"maxBatchSize": 500
"maxBatchBytes": 5000000
Expand Down
11 changes: 9 additions & 2 deletions modules/loader/src/main/resources/logback.xml
@@ -1,5 +1,5 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<appender name="COLOR" class="ch.qos.logback.core.ConsoleAppender">
<!-- On Windows machines setting withJansi to true enables ANSI
color code interpretation by the Jansi library. This requires
org.fusesource.jansi:jansi:1.8 on the class path. Note that
Expand All @@ -11,8 +11,15 @@
</encoder>
</appender>

<appender name="NOCOLOR" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%thread] %-5level %logger{15} - %msg %n</pattern>
</encoder>
</appender>

<variable name="LOG_MODE" value="${LOG_MODE:-NOCOLOR}" />
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT" />
<appender-ref ref="${LOG_MODE}" />
</appender>

<!-- Set environment variable LOG_LEVEL=DEBUG to override this setting -->
Expand Down
Expand Up @@ -21,7 +21,7 @@ import scala.concurrent.ExecutionContext
import cats.data.{EitherT, ValidatedNel}
import cats.implicits._

import cats.effect.{Clock, Sync, Async}
import cats.effect.{Async, Clock, ContextShift, Sync}

import com.typesafe.config.{Config => LightbendConfig, ConfigFactory}

Expand Down Expand Up @@ -55,7 +55,7 @@ object Cli {
case Right(rawConfig) => fromRawConfig(rawConfig)
}

def configPreCheck[F[_]: Async](cli: Cli[F])(implicit ec: ExecutionContext): EitherT[F, String, Cli[F]] = {
def configPreCheck[F[_]: Async: ContextShift](cli: Cli[F])(implicit ec: ExecutionContext): EitherT[F, String, Cli[F]] = {
val inputCheck = cli.config.input match {
case Local(pathInfo) =>
fileExists(pathInfo)
Expand Down
Expand Up @@ -234,7 +234,7 @@ object LoaderConfig {
sealed trait StreamSink extends Product with Serializable
object StreamSink {

case object Noop extends StreamSink
case class Noop(reportPeriod: FiniteDuration) extends StreamSink

case class Local(path: PathInfo) extends StreamSink

Expand Down
Expand Up @@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.postgres.env.pubsub.{PubSubSink, PubSubEnv
final case class Environment[F[_], A](
source: Stream[F, A],
badRowSink: StreamSink[F],
getPayload: A => Either[BadRow, String],
getPayload: A => F[Either[BadRow, String]],
checkpointer: Pipe[F, A, Unit],
sinkPipe: HikariTransactor[F] => SinkPipe[F]
)
Expand All @@ -42,7 +42,7 @@ object Environment {
blocker: Blocker) =
for {
badSink <- config.output.bad match {
case LoaderConfig.StreamSink.Noop => DummyStreamSink.create
case LoaderConfig.StreamSink.Noop(period) => DummyStreamSink.create(period)
case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.create(c, config.monitoring, config.backoffPolicy, blocker)
case c: LoaderConfig.StreamSink.PubSub => PubSubSink.create(c, config.backoffPolicy)
case c: LoaderConfig.StreamSink.Local => LocalSink.create(c, blocker)
Expand All @@ -54,7 +54,7 @@ object Environment {
}
} yield env

def streamSinkExists[F[_]: Async](config: LoaderConfig.StreamSink)(implicit ec: ExecutionContext): F[Boolean] =
def streamSinkExists[F[_]: Async: ContextShift](config: LoaderConfig.StreamSink)(implicit ec: ExecutionContext): F[Boolean] =
config match {
case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.streamExists(c)
case c: LoaderConfig.StreamSink.PubSub => PubSubSink.topicExists(c)
Expand Down
Expand Up @@ -15,12 +15,12 @@ package com.snowplowanalytics.snowplow.postgres.env.kinesis
import java.util.{Base64, UUID}
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.time.Instant

import cats.Monad
import cats.implicits._
import cats.data.NonEmptyList
import cats.data.{EitherT, NonEmptyList}

import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.effect.{Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer}

import fs2.{Stream, Pipe}
import fs2.aws.kinesis.{CommittableRecord, Kinesis}
Expand All @@ -47,21 +47,20 @@ import com.snowplowanalytics.snowplow.postgres.env.kinesis.KinesisSink.mkKinesis

object KinesisEnv {

def create[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker,
config: Source.Kinesis,
badSink: StreamSink[F],
metrics: Monitoring.Metrics,
purpose: Purpose): Resource[F, Environment[F, CommittableRecord]] =
def create[F[_]: ConcurrentEffect: ContextShift: Clock: Timer](blocker: Blocker,
config: Source.Kinesis,
badSink: StreamSink[F],
metrics: Monitoring.Metrics,
purpose: Purpose): Resource[F, Environment[F, CommittableRecord]] =
for {
kinesisClient <- mkKinesisClient[F](config.region)
dynamoClient <- mkDynamoDbClient[F](config.region)
cloudWatchClient <- mkCloudWatchClient[F](config.region)
kinesis = Kinesis.create(blocker, scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, metrics, _))
now <- Resource.eval(TimeUtils.now[F])
} yield Environment(
getSource(kinesis),
badSink,
getPayload(purpose, now),
getPayload[F](purpose, _),
checkpointer(kinesis, config.checkpointSettings),
SinkPipe.OrderedPipe.forTransactor[F]
)
Expand All @@ -72,21 +71,23 @@ object KinesisEnv {
// therefore given stream name and app name are not used in anywhere.
kinesis.readFromKinesisStream("THIS DOES NOTHING", "THIS DOES NOTHING")

private def getPayload(purpose: Purpose, now: Instant)(record: CommittableRecord): Either[BadRow, String] =
Either.catchNonFatal(StandardCharsets.UTF_8.decode(record.record.data()).toString)
.leftMap { _ =>
private def getPayload[F[_]: Clock: Monad](purpose: Purpose, record: CommittableRecord): F[Either[BadRow, String]] =
EitherT.fromEither[F](Either.catchNonFatal(StandardCharsets.UTF_8.decode(record.record.data()).toString))
.leftSemiflatMap[BadRow] { _ =>
val payload = StandardCharsets.UTF_8.decode(Base64.getEncoder.encode(record.record.data())).toString
purpose match {
case Purpose.Enriched =>
BadRow.LoaderParsingError(Cli.processor, NotTSV, Payload.RawPayload(payload))
Monad[F].pure(BadRow.LoaderParsingError(Cli.processor, NotTSV, Payload.RawPayload(payload)))
case Purpose.SelfDescribing =>
val failure = Failure.GenericFailure(
now,
NonEmptyList.of("\"Cannot deserialize self-describing JSON from record\"")
)
BadRow.GenericError(Cli.processor, failure, Payload.RawPayload(payload))
TimeUtils.now[F].map { now =>
val failure = Failure.GenericFailure(
now,
NonEmptyList.of("\"Cannot deserialize self-describing JSON from record\"")
)
BadRow.GenericError(Cli.processor, failure, Payload.RawPayload(payload))
}
}
}
}.value

private def checkpointer[F[_]](kinesis: Kinesis[F], checkpointSettings: Source.Kinesis.CheckpointSettings): Pipe[F, CommittableRecord, Unit] =
kinesis.checkpointRecords(checkpointSettings.unwrap).andThen(_.void)
Expand Down
Expand Up @@ -19,7 +19,7 @@ import scala.concurrent.ExecutionContext
import scala.jdk.FutureConverters._
import scala.util.{Success, Failure}

import cats.effect.{Async, Resource, Blocker, Sync, Timer}
import cats.effect.{Async, Blocker, ContextShift, Resource, Sync, Timer}
import cats.implicits._

import fs2.aws.internal.{KinesisProducerClientImpl, KinesisProducerClient}
Expand All @@ -44,7 +44,7 @@ object KinesisSink {

lazy val logger = getLogger

def create[F[_]: Async: Timer](config: LoaderConfig.StreamSink.Kinesis, monitoring: Monitoring, backoffPolicy: BackoffPolicy, blocker: Blocker): Resource[F, StreamSink[F]] =
def create[F[_]: Async: Timer: ContextShift](config: LoaderConfig.StreamSink.Kinesis, monitoring: Monitoring, backoffPolicy: BackoffPolicy, blocker: Blocker): Resource[F, StreamSink[F]] =
mkProducer(config, monitoring).map(writeToKinesis(config.streamName, backoffPolicy, blocker))

private def mkProducer[F[_]: Sync](config: LoaderConfig.StreamSink.Kinesis, monitoring: Monitoring): Resource[F, KinesisProducerClient[F]] =
Expand All @@ -65,16 +65,17 @@ object KinesisSink {
.setRecordMaxBufferedTime(config.delayThreshold.toMillis)
}

private def writeToKinesis[F[_]: Async: Timer](streamName: String,
backoffPolicy: BackoffPolicy,
blocker: Blocker)
(producer: KinesisProducerClient[F])
(data: Array[Byte]): F[Unit] = {
private def writeToKinesis[F[_]: Async: Timer: ContextShift](streamName: String,
backoffPolicy: BackoffPolicy,
blocker: Blocker)
(producer: KinesisProducerClient[F])
(data: Array[Byte]): F[Unit] = {
val res = for {
byteBuffer <- Async[F].delay(ByteBuffer.wrap(data))
partitionKey <- Async[F].delay(UUID.randomUUID().toString)
cb <- producer.putData(streamName, partitionKey, byteBuffer)
cbRes <- registerCallback(cb, blocker)
_ <- ContextShift[F].shift
} yield cbRes
res.retryingOnFailuresAndAllErrors(
wasSuccessful = _.isSuccessful,
Expand All @@ -97,7 +98,7 @@ object KinesisSink {
)
}

def streamExists[F[_]: Async](config: LoaderConfig.StreamSink.Kinesis)(implicit ec: ExecutionContext): F[Boolean] =
def streamExists[F[_]: Async: ContextShift](config: LoaderConfig.StreamSink.Kinesis)(implicit ec: ExecutionContext): F[Boolean] =
mkKinesisClient(config.region).use { client =>
Async[F].async[Boolean] { cb =>
val describeStreamRequest = DescribeStreamRequest.builder()
Expand All @@ -111,6 +112,8 @@ object KinesisSink {
cb(Right(streamStatus == StreamStatus.ACTIVE || streamStatus == StreamStatus.UPDATING))
case Failure(_) => cb(Right(false))
}
}.flatMap { result =>
ContextShift[F].shift.as(result)
}
}

Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package com.snowplowanalytics.snowplow.postgres.env.local

import cats.Applicative
import cats.implicits._
import cats.effect.{ContextShift, Blocker, Resource, Timer, ConcurrentEffect}

Expand All @@ -33,7 +34,7 @@ object LocalEnv {
Environment[F, String](
getSource(blocker, config),
badSink,
getPayload,
getPayload[F](_),
checkpointer,
SinkPipe.UnorderedPipe.forTransactor[F]
)
Expand All @@ -52,7 +53,7 @@ object LocalEnv {
}
}

private def getPayload[F[_]](record: String): Either[BadRow, String] = record.asRight
private def getPayload[F[_]: Applicative](record: String): F[Either[BadRow, String]] = record.asRight.pure[F]

private def checkpointer[F[_]]: Pipe[F, String, Unit] = _.map(_ => ())
}

0 comments on commit 1a3e15f

Please sign in to comment.