Use a local directory as an event source (close #42)
spenes committed Aug 5, 2021
1 parent bed2c1a commit 3079967
Showing 10 changed files with 130 additions and 37 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -63,6 +63,7 @@ lazy val loader = project
Dependencies.commons,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.fs2BlobCore,
Dependencies.decline,
Dependencies.config,
Dependencies.specs2
2 changes: 1 addition & 1 deletion config/config.kinesis.minimal.hocon
@@ -11,7 +11,7 @@
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"password": "mysecretpassword"
"schema": "atomic"
}
}
16 changes: 16 additions & 0 deletions config/config.local.minimal.hocon
@@ -0,0 +1,16 @@
# The minimum required config options for loading from a local directory
{
"input": {
"type": "LocalFs"
"path": "/tmp/example"
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": "mysecretpassword"
"schema": "atomic"
}
}
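
For context, this new file would be consumed through the same CLI entry point as the other sources. A minimal sketch of parsing it, with illustrative paths; the --config/--resolver flags and Cli.parse are the ones exercised in the CliSpec changes further down this commit:

    import cats.effect.IO
    import com.snowplowanalytics.snowplow.postgres.config.Cli

    // Illustrative paths; any readable config/resolver pair would do
    val argv = List(
      "--config", "config/config.local.minimal.hocon",
      "--resolver", "resolver.json"
    )

    // Cli.parse returns an EitherT, so .value yields IO[Either[String, Cli[IO]]]
    val parsed = Cli.parse[IO](argv).value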
2 changes: 1 addition & 1 deletion config/config.pubsub.minimal.hocon
@@ -11,7 +11,7 @@
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"password": "mysecretpassword"
"schema": "atomic"
}
}
30 changes: 30 additions & 0 deletions modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/ConfigPreCheck.scala
@@ -0,0 +1,30 @@
package com.snowplowanalytics.snowplow.postgres.config

import cats.effect.Sync
import cats.data.EitherT
import cats.implicits._

import java.nio.file.{Path, Files}
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Source.LocalFs

import scala.util.Try

/**
 * Check if the given configuration meets the necessary requirements
 */
object ConfigPreCheck {
def apply[F[_]: Sync](cli: Cli[F]): EitherT[F, String, Cli[F]] =
EitherT(Sync[F].delay {
cli.config.input match {
// If the configuration is for a local source, check that the given path exists
case LocalFs(path) =>
Either.fromTry(Try {
Files.exists(Path.of("/" + path.toString()))
}).flatMap { exists =>
if (exists) cli.asRight
else "Path does not exist".asLeft
}.leftMap(_.toString)
case _ => cli.asRight
}
})
}
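
A minimal sketch of how this check composes with CLI parsing, mirroring the Main.run wiring later in this commit (args and the log messages are hypothetical; assumes we are inside the config package, so Cli and ConfigPreCheck need no imports):

    import cats.effect.{ExitCode, IO}

    // Validation runs after parsing and before any resources are opened
    Cli.parse[IO](args).flatMap(ConfigPreCheck[IO]).value.flatMap {
      case Right(cli)  => IO(println("config accepted")).as(ExitCode.Success)
      case Left(error) => IO(println(s"config rejected: $error")).as(ExitCode.Error)
    }

Note that the "/" prefix in the existence check above lines up with the FileStore root used in source.scala below (NioPaths.get("/")): blobstore paths are relative, so the check resolves them against the same filesystem root the store will use.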
@@ -20,7 +20,6 @@ import scala.jdk.CollectionConverters._
import cats.syntax.either._

import io.circe.Decoder
//import io.circe.generic.semiauto.deriveDecoder
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
import io.circe.generic.extras.Configuration

@@ -29,6 +28,8 @@ import LoaderConfig.{Purpose, Source}
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}

import blobstore.Path

case class LoaderConfig(input: Source,
output: DBConfig,
purpose: Purpose,
@@ -108,6 +109,7 @@ object LoaderConfig {
initialPosition: InitPosition,
retrievalMode: Kinesis.Retrieval) extends Source
case class PubSub(projectId: String, subscriptionId: String) extends Source
case class LocalFs(path: Path) extends Source

object Kinesis {
sealed trait Retrieval
@@ -127,6 +129,11 @@
}
}

implicit val pathDecoder: Decoder[Path] =
Decoder.decodeString.emap { p =>
Path.fromString(p).toRight(s"Invalid path: $p")
}

implicit def ioCirceConfigSourceDecoder: Decoder[Source] =
deriveConfiguredDecoder[Source]
}
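
The new pathDecoder feeds the derived Source decoder, so a LocalFs input decodes straight from JSON. A rough sketch, assuming circe-parser is on the classpath and that the configured decoder uses the same type discriminator as the HOCON examples above:

    import io.circe.parser.decode
    import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Source

    val json = """{ "type": "LocalFs", "path": "/tmp/example" }"""
    decode[Source](json)
    // Right(LocalFs(...)) when Path.fromString accepts the path,
    // Left(...) carrying "Invalid path: ..." otherwise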
@@ -12,18 +12,18 @@
*/
package com.snowplowanalytics.snowplow.postgres.loader

import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.{IOApp, IO, ExitCode}

import org.log4s.getLogger

import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.postgres.api.DB
import com.snowplowanalytics.snowplow.postgres.config.Cli
import com.snowplowanalytics.snowplow.postgres.config.{Cli, ConfigPreCheck}
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo
import com.snowplowanalytics.snowplow.postgres.resources
import com.snowplowanalytics.snowplow.postgres.storage.utils
import com.snowplowanalytics.snowplow.postgres.streaming.{UnorderedPipe, sink, source}
import com.snowplowanalytics.snowplow.postgres.streaming.{sink, source, UnorderedPipe}

object Main extends IOApp {

@@ -32,7 +32,7 @@ object Main extends IOApp {
val processor = Processor(BuildInfo.name, BuildInfo.version)

def run(args: List[String]): IO[ExitCode] =
Cli.parse[IO](args).value.flatMap {
Cli.parse[IO](args).flatMap(ConfigPreCheck[IO]).value.flatMap {
case Right(Cli(loaderConfig, iglu)) =>
resources.initialize[IO](loaderConfig.output, iglu).use {
case (blocker, xa, state) =>
@@ -15,6 +15,9 @@ package com.snowplowanalytics.snowplow.postgres.streaming
import java.util.{Base64, UUID}
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths => NioPaths}

import scala.util.Try

import cats.implicits._

@@ -30,7 +33,6 @@ import org.log4s.getLogger

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.NotTSV
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload}
@@ -53,6 +55,9 @@ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

import blobstore.fs.FileStore
import blobstore.Store

object source {

private lazy val logger = getLogger
@@ -83,36 +88,46 @@ object source {
val pubsubConfig = PubsubGoogleConsumerConfig[F](onFailedTerminate = pubsubOnFailedTerminate[F])
PubsubGoogleConsumer
.subscribeAndAck[F, Either[BadData, Data]](blocker, project, subscription, pubsubErrorHandler[F], pubsubConfig)
case Source.LocalFs(path) =>
val store: Store[F] = FileStore[F](NioPaths.get("/"), blocker)
store.list(path, recursive = true)
.map { p =>
store.get(p, chunkSize = 32 * 1024)
.through(fs2.text.utf8Decode)
.through(fs2.text.lines)
.filter(_.nonEmpty)
.map(parseItem(purpose, _))
}.flatten
}

/**
 * Parse Kinesis record into a valid Loader's record, either enriched event or self-describing JSON,
 * depending on purpose of the Loader
 */
/**
 * Parse Kinesis record into a valid Loader's record
 */
def parseRecord(kind: Purpose, record: CommittableRecord): Either[BadData, Data] = {
val string =
try StandardCharsets.UTF_8.decode(record.record.data()).toString.asRight[BadData]
catch {
case _: IllegalArgumentException =>
val payload = StandardCharsets.UTF_8.decode(Base64.getEncoder.encode(record.record.data())).toString
kind match {
case Purpose.Enriched =>
val badRow = BadRow.LoaderParsingError(Cli.processor, NotTSV, Payload.RawPayload(payload))
BadData.BadEnriched(badRow).asLeft
case Purpose.SelfDescribing =>
BadData.BadJson(payload, "Cannot deserialize self-describing JSON from Kinesis record").asLeft
}
def parseRecord(kind: Purpose, record: CommittableRecord): Either[BadData, Data] =
Either.fromTry(Try(StandardCharsets.UTF_8.decode(record.record.data()).toString))
.leftMap { _ =>
val payload = StandardCharsets.UTF_8.decode(Base64.getEncoder.encode(record.record.data())).toString
kind match {
case Purpose.Enriched =>
val badRow = BadRow.LoaderParsingError(Cli.processor, NotTSV, Payload.RawPayload(payload))
BadData.BadEnriched(badRow)
case Purpose.SelfDescribing =>
BadData.BadJson(payload, "Cannot deserialize self-describing JSON from Kinesis record")
}
}
.flatMap(parseItem(kind, _))

string.flatMap { payload =>
kind match {
case Purpose.Enriched =>
parseEventString(payload).map(Data.Snowplow.apply)
case Purpose.SelfDescribing =>
parseJson(payload).map(Data.SelfDescribing.apply)
}
/**
 * Parse the given item into a valid Loader's record, either an enriched event or a self-describing JSON,
 * depending on the purpose of the Loader
 */
def parseItem(kind: Purpose, item: String): Either[BadData, Data] =
kind match {
case Purpose.Enriched =>
parseEventString(item).map(Data.Snowplow.apply)
case Purpose.SelfDescribing =>
parseJson(item).map(Data.SelfDescribing.apply)
}
}

def parseEventString(s: String): Either[BadData, Event] =
Event.parse(s).toEither.leftMap { error =>
Expand All @@ -127,12 +142,7 @@ object source {
.leftMap(error => BadData.BadJson(s, error))

def pubsubDataDecoder(purpose: Purpose): MessageDecoder[Either[BadData, Data]] =
purpose match {
case Purpose.Enriched =>
(message: Array[Byte]) => parseEventString(new String(message)).map(Data.Snowplow.apply).asRight
case Purpose.SelfDescribing =>
(message: Array[Byte]) => parseJson(new String(message)).map(Data.SelfDescribing.apply).asRight
}
(message: Array[Byte]) => parseItem(purpose, new String(message)).asRight

def pubsubErrorHandler[F[_]: Sync](message: PubsubMessage, error: Throwable, ack: F[Unit], nack: F[Unit]): F[Unit] = {
val _ = (error, nack)
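With this refactor, parseItem becomes the single parsing entry point shared by the Kinesis, PubSub, and new local sources. A small sketch of calling it directly with a deliberately malformed line (the input string is hypothetical):

    // Inside the streaming package; Data and BadData as used in the diff above
    val line = "definitely not a TSV event"
    source.parseItem(Purpose.Enriched, line) match {
      case Right(Data.Snowplow(event)) => println(s"parsed ${event.event_id}")
      case Left(badData)               => println(s"routed to bad rows: $badData")
    }
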
@@ -20,6 +20,8 @@ import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.{InitPosition

import software.amazon.awssdk.regions.Region

import blobstore.Path

import org.specs2.mutable.Specification

class CliSpec extends Specification {
@@ -143,6 +145,31 @@ class CliSpec extends Specification {
case Cli(config, _) => config must beEqualTo(expected)
}
}

"accept example minimal local config" >> {
val config = Paths.get(getClass.getResource("/config.local.minimal.hocon").toURI)
val resolver = Paths.get(getClass.getResource("/resolver.json").toURI)
val argv = List("--config", config.toString, "--resolver", resolver.toString)

val expected = LoaderConfig(
Source.LocalFs(Path.fromString("/tmp/example").get),
DBConfig(
"localhost",
5432,
"snowplow",
"postgres",
"mysecretpassword",
"REQUIRE",
"atomic"
),
Purpose.Enriched,
Monitoring(Monitoring.Metrics(true))
)
val result = Cli.parse[IO](argv).value.unsafeRunSync()
result must beRight.like {
case Cli(config, _) => config must beEqualTo(expected)
}
}
}

}
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -32,6 +32,7 @@ object Dependencies {
val fs2 = "2.5.6"
val log4s = "1.10.0"
val config = "1.4.1"
val fs2BlobCore = "0.7.3"

val analyticsSdk = "2.1.0"
val badRows = "2.1.0"
@@ -68,6 +69,7 @@ object Dependencies {
val doobiePgCirce = "org.tpolecat" %% "doobie-postgres-circe" % V.doobie
val doobieHikari = "org.tpolecat" %% "doobie-hikari" % V.doobie
val log4s = "org.log4s" %% "log4s" % V.log4s
val fs2BlobCore = "com.github.fs2-blobstore" %% "core" % V.fs2BlobCore

// Scala first-party
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk