
Commit

Merge d0224bf into 894bbf2
pondzix committed May 26, 2023
2 parents 894bbf2 + d0224bf commit b9c2866
Showing 13 changed files with 571 additions and 10 deletions.
11 changes: 11 additions & 0 deletions build.sbt
@@ -39,6 +39,7 @@ lazy val root = project.in(file("."))
transformerBatch,
transformerKinesis,
transformerPubsub,
transformerKafka
)

lazy val common: Project = project
@@ -177,3 +178,13 @@ lazy val transformerPubsubDistroless = project
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val transformerKafka = project
.in(file("modules/transformer-kafka"))
.settings(BuildSettings.transformerKafkaBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
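
For context, the dependsOn mappings above wire the new transformerKafka module to the shared stream-transformer code and the azure module across configurations. A minimal sketch of the same sbt pattern, with illustrative module names that are not part of this build:

// "compile->compile;test->test" makes moduleB's main code depend on moduleA's main code,
// and moduleB's test code depend on moduleA's test sources (shared fixtures and helpers)
lazy val moduleA = project.in(file("modules/module-a"))

lazy val moduleB = project
  .in(file("modules/module-b"))
  .dependsOn(moduleA % "compile->compile;test->test")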

13 changes: 13 additions & 0 deletions config/transformer/azure/transformer.kafka.config.minimal.hocon
@@ -0,0 +1,13 @@
{
"input": {
"topicName": "enriched"
"bootstrapServers": "localhost:9092"
}
"output": {
"path": "https://accountName.blob.core.windows.net/transformed/"
}
"queue": {
"topicName": "loaderTopic"
"bootstrapServers": "localhost:9092"
}
}
153 changes: 153 additions & 0 deletions config/transformer/azure/transformer.kafka.config.reference.hocon
@@ -0,0 +1,153 @@
{
"input": {
"type": "kafka"

# Name of the Kafka topic to read from
"topicName": "enriched"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka Consumer configuration
# See https://kafka.apache.org/documentation/#consumerconfigs for all properties
"consumerConf": {
"enable.auto.commit": "false"
"auto.offset.reset" : "earliest"
"group.id": "transformer"
}
}

# Path to transformed archive
"output": {
# Path to transformer output
"path": "https://accountName.blob.core.windows.net/transformed/",
# Transformer output compression, GZIP or NONE
# Optional, default value GZIP
"compression": "GZIP",

# Optional section specifying details about badrows output. When unspecified, badrows are written as files under 'output.path' URI
"bad": {

# Type of output sink. Either 'kafka' or 'file'. Optional, default value 'file'. When 'file', badrows are written as files under 'output.path' URI
"type": "kafka",

# Name of the Kafka topic to write to
"topicName": "bad"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}
}
}

# Frequency at which to emit the loading finished message, e.g. 5, 10, 15, 20, 30 or 60 minutes
# Optional, default value 10 minutes
"windowing": "10 minutes"

# Kafka topic used to communicate with Loader
"queue": {
"type": "kafka",

# Name of the Kafka topic to write to
"topicName": "loaderTopic"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}
}

"formats": {
# Optional. Denotes output file format.
# Possible values are 'json' and 'parquet'. Default value 'json'.
"fileFormat": "json"
}

# Events will be validated against the given criteria and
# a bad row will be created if validation is not successful
"validations": {
"minimumTimestamp": "2021-11-18T11:00:00.00Z"
}

# Observability and reporting options
"monitoring": {
# Optional, for tracking runtime exceptions
"sentry": {
"dsn": "http://sentry.acme.com"
}
# Optional. How metrics are reported
"metrics": {
# Optional. Send metrics to a StatsD server (e.g. on localhost)
"statsd": {
"hostname": "localhost"
"port": 8125
"period": "1 minute"
# Optional. Any key-value pairs to be tagged on every StatsD metric
"tags": {
"app": transformer
}
# Optional. Override the default metric prefix
# "prefix": "snowplow.transformer."
}
# Optional. Log to stdout using Slf4j (logger name: transformer.metrics)
"stdout": {
"period": "1 minute"
# Optional. Override the default metric prefix
# "prefix": "snowplow.transformer."
}
}
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
# Set to true to disable telemetry
"disable": false
# Interval for the heartbeat event
"interval": 15 minutes
# HTTP method used to send the heartbeat event
"method": "POST"
# URI of the collector receiving the heartbeat event
"collectorUri": "collector-g.snowplowanalytics.com"
# Port of the collector receiving the heartbeat event
"collectorPort": 443
# Whether to use https or not
"secure": true
# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": "my_pipeline"
# ID automatically generated upon running a modules deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": "hfy67e5ydhtrd"
# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": "665bhft5u6udjf"
# Name of the terraform module that deployed the app
"moduleName": "transformer-kafka-ce"
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}

# Optional. Enable features that are still in beta, or which are here to enable smoother upgrades
"featureFlags": {
# Read/write in the legacy version 1 shredding complete message format.
# This should be enabled during upgrade from older versions of the loader.
"legacyMessageFormat": false

# When enabled, an event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation.
# Optional, default "false".
"truncateAtomicFields": false
}
}
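
Both config files above are plain HOCON, and their keys line up with the fields of the case classes added to Config.scala later in this diff. A small sketch of reading them with the Typesafe Config library (illustrative only, not the transformer's actual config-loading code):

import com.typesafe.config.ConfigFactory

object ConfigSketch extends App {
  // Parse the minimal transformer.kafka config shown earlier in this diff
  val cfg = ConfigFactory.parseString(
    """
      |{
      |  "input":  { "topicName": "enriched",    "bootstrapServers": "localhost:9092" }
      |  "output": { "path": "https://accountName.blob.core.windows.net/transformed/" }
      |  "queue":  { "topicName": "loaderTopic", "bootstrapServers": "localhost:9092" }
      |}
      |""".stripMargin
  )

  // Each path below corresponds to a field on the new Kafka / AzureBlobStorage case classes
  println(cfg.getString("input.topicName"))        // enriched
  println(cfg.getString("output.path"))            // https://accountName.blob.core.windows.net/transformed/
  println(cfg.getString("queue.bootstrapServers")) // localhost:9092
}
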
@@ -19,18 +19,17 @@ import cats.implicits._
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue.Consumer
import fs2.kafka.{CommittableConsumerRecord, ConsumerSettings, KafkaConsumer => Fs2KafkaConsumer}
import org.typelevel.log4cats.Logger

import java.nio.charset.StandardCharsets

object KafkaConsumer {

private final case class KafkaMessage[F[_]](record: CommittableConsumerRecord[F, String, Array[Byte]]) extends Queue.Consumer.Message[F] {
final case class KafkaMessage[F[_]](record: CommittableConsumerRecord[F, String, Array[Byte]]) extends Queue.Consumer.Message[F] {
override def content: String = new String(record.record.value, StandardCharsets.UTF_8)
override def ack: F[Unit] = record.offset.commit
}

def consumer[F[_]: Async: Logger](
def consumer[F[_]: Async](
bootstrapServers: String,
topicName: String,
consumerConf: Map[String, String]
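
The KafkaMessage wrapper above exposes the fs2-kafka record payload as a UTF-8 string and maps ack onto an offset commit. A self-contained sketch of the same consume-then-commit pattern written directly against fs2-kafka (topic, group id and settings are illustrative, mirroring the consumerConf defaults from the reference config rather than the transformer's actual wiring):

import cats.effect.{IO, IOApp}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer}

import java.nio.charset.StandardCharsets

object ConsumerSketch extends IOApp.Simple {

  private val settings: ConsumerSettings[IO, String, Array[Byte]] =
    ConsumerSettings[IO, String, Array[Byte]]
      .withBootstrapServers("localhost:9092")
      .withGroupId("transformer")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withEnableAutoCommit(false)

  def run: IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .evalTap(_.subscribeTo("enriched"))
      .flatMap(_.stream)
      .evalMap { committable =>
        // Same shape as KafkaMessage: read the payload as UTF-8, then ack by committing the offset
        val content = new String(committable.record.value, StandardCharsets.UTF_8)
        IO.println(content) >> committable.offset.commit
      }
      .compile
      .drain
}
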
@@ -18,14 +18,13 @@ import cats.effect.{Async, Resource}
import cats.implicits._
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue
import fs2.kafka.{KafkaProducer => Fs2KafkaProducer, ProducerRecord, ProducerSettings}
import org.typelevel.log4cats.Logger

import java.nio.charset.StandardCharsets
import java.util.UUID

object KafkaProducer {

def producer[F[_]: Async: Logger](
def producer[F[_]: Async](
bootstrapServers: String,
topicName: String,
producerConf: Map[String, String]
@@ -50,7 +49,7 @@ object KafkaProducer {
}
}

def chunkProducer[F[_]: Async: Logger](
def chunkProducer[F[_]: Async](
bootstrapServers: String,
topicName: String,
producerConf: Map[String, String]
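
On the producer side, fs2-kafka offers a resource-managed producer. A matching sketch that applies the "acks": "all" setting from the reference config (topic name and payload are illustrative):

import cats.effect.{IO, IOApp}
import cats.implicits._
import fs2.kafka.{KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings}

import java.nio.charset.StandardCharsets
import java.util.UUID

object ProducerSketch extends IOApp.Simple {

  private val settings: ProducerSettings[IO, String, Array[Byte]] =
    ProducerSettings[IO, String, Array[Byte]]
      .withBootstrapServers("localhost:9092")
      .withProperties(Map("acks" -> "all"))

  def run: IO[Unit] =
    KafkaProducer.resource(settings).use { producer =>
      val key   = UUID.randomUUID().toString
      val value = """{"example": "payload"}""".getBytes(StandardCharsets.UTF_8)
      // produce returns an effect that, once flattened, completes when the broker acknowledges the record
      producer.produce(ProducerRecords.one(ProducerRecord("loaderTopic", key, value))).flatten.void
    }
}
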
@@ -84,6 +84,12 @@ object Config {
throw new IllegalArgumentException(s"Subscription format $subscription invalid")
}
}

final case class Kafka(
topicName: String,
bootstrapServers: String,
consumerConf: Map[String, String]
) extends StreamInput
}

sealed trait Output {
@@ -112,6 +118,14 @@
bad: Bad
) extends Output

final case class AzureBlobStorage(
path: URI,
compression: Compression,
bufferSize: Int,
maxRecordsPerFile: Long,
bad: Bad
) extends Output

sealed trait Bad
object Bad {

@@ -152,6 +166,12 @@ object Config {
throw new IllegalArgumentException(s"Subscription format $topic invalid")
}
}

final case class Kafka(
topicName: String,
bootstrapServers: String,
producerConf: Map[String, String]
) extends Queue
}

}
@@ -178,6 +198,12 @@ object Config {
throw new IllegalArgumentException(s"Subscription format $topic invalid")
}
}

final case class Kafka(
topicName: String,
bootstrapServers: String,
producerConf: Map[String, String]
) extends QueueConfig
}

final case class Monitoring(sentry: Option[TransformerConfig.Sentry], metrics: MetricsReporters)
@@ -232,8 +258,15 @@ object Config {
cur.as[StreamInput.Kinesis]
case Right("pubsub") =>
cur.as[StreamInput.Pubsub]
case Right("kafka") =>
cur.as[StreamInput.Kafka]
case Right(other) =>
Left(DecodingFailure(s"Shredder input type $other is not supported yet. Supported types: 'kinesis', 'pubsub'", typeCur.history))
Left(
DecodingFailure(
s"Shredder input type $other is not supported yet. Supported types: 'kinesis', 'pubsub', 'kafka'",
typeCur.history
)
)
case Left(DecodingFailure(_, List(CursorOp.DownField("type")))) =>
Left(DecodingFailure("Cannot find 'type' string in transformer configuration", typeCur.history))
case Left(other) =>
@@ -247,6 +280,9 @@ object Config {
implicit val streamInputPubsubConfigDecoder: Decoder[StreamInput.Pubsub] =
deriveDecoder[StreamInput.Pubsub]

implicit val streamInputKafkaConfigDecoder: Decoder[StreamInput.Kafka] =
deriveDecoder[StreamInput.Kafka]

implicit val outputConfigDecoder: Decoder[Output] =
Decoder.instance { cur =>
val pathCur = cur.downField("path")
@@ -255,9 +291,14 @@
cur.as[Output.S3]
case Right("gs") =>
cur.as[Output.GCS]
case Right("https") =>
cur.as[Output.AzureBlobStorage]
case Right(other) =>
Left(
DecodingFailure(s"Output type $other is not supported yet. Supported types: 's3', 's3a', 's3n', and 'gs'", pathCur.history)
DecodingFailure(
s"Output type $other is not supported yet. Supported types: 's3', 's3a', 's3n', 'gs', 'https'",
pathCur.history
)
)
case Left(DecodingFailure(_, List(CursorOp.DownField("type")))) =>
Left(DecodingFailure("Cannot find 'path' string in output configuration", pathCur.history))
@@ -272,6 +313,9 @@
implicit val pubsubBadOutputConfigDecoder: Decoder[Output.Bad.Queue.Pubsub] =
deriveDecoder[Output.Bad.Queue.Pubsub]

implicit val kafkaBadOutputConfigDecoder: Decoder[Output.Bad.Queue.Kafka] =
deriveDecoder[Output.Bad.Queue.Kafka]

implicit val badOutputConfigDecoder: Decoder[Output.Bad] =
Decoder.instance { cur =>
val typeCur = cur.downField("type")
@@ -280,12 +324,14 @@
cur.as[Output.Bad.Queue.Kinesis]
case Right("pubsub") =>
cur.as[Output.Bad.Queue.Pubsub]
case Right("kafka") =>
cur.as[Output.Bad.Queue.Kafka]
case Right("file") =>
Right(Output.Bad.File)
case Right(other) =>
Left(
DecodingFailure(
s"Bad output type '$other' is not supported yet. Supported types: 'kinesis', 'pubsub', 'file'",
s"Bad output type '$other' is not supported yet. Supported types: 'kinesis', 'pubsub', 'kafka', 'file'",
typeCur.history
)
)
@@ -299,6 +345,9 @@
implicit val outputGCSConfigDecoder: Decoder[Output.GCS] =
deriveDecoder[Output.GCS]

implicit val outputAzureBlobStorageConfigDecoder: Decoder[Output.AzureBlobStorage] =
deriveDecoder[Output.AzureBlobStorage]

implicit val queueConfigDecoder: Decoder[QueueConfig] =
Decoder.instance { cur =>
val typeCur = cur.downField("type")
@@ -309,8 +358,15 @@
cur.as[QueueConfig.SQS]
case Right("pubsub") =>
cur.as[QueueConfig.Pubsub]
case Right("kafka") =>
cur.as[QueueConfig.Kafka]
case Right(other) =>
Left(DecodingFailure(s"Queue type $other is not supported yet. Supported types: 'SNS', 'SQS' and 'pubsub'", typeCur.history))
Left(
DecodingFailure(
s"Queue type $other is not supported yet. Supported types: 'SNS', 'SQS', 'pubsub' and 'kafka'",
typeCur.history
)
)
case Left(DecodingFailure(_, List(CursorOp.DownField("type")))) =>
Left(DecodingFailure("Cannot find 'type' string in transformer configuration", typeCur.history))
case Left(other) =>
Expand All @@ -327,6 +383,9 @@ object Config {
implicit val pubsubConfigDecoder: Decoder[QueueConfig.Pubsub] =
deriveDecoder[QueueConfig.Pubsub]

implicit val kafkaConfigDecoder: Decoder[QueueConfig.Kafka] =
deriveDecoder[QueueConfig.Kafka]

implicit val configDecoder: Decoder[Config] =
deriveDecoder[Config].ensure(validateConfig)

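The decoders added above all follow the same shape: peek at a discriminator ("type" for inputs, bad outputs and queues, the URI scheme for outputs), then delegate to a decoder derived for the matching case class. A stripped-down sketch of that pattern with circe, using an illustrative Queue ADT rather than the project's actual types:

import io.circe.{Decoder, DecodingFailure}
import io.circe.generic.semiauto.deriveDecoder
import io.circe.parser.decode

sealed trait Queue
final case class Kafka(topicName: String, bootstrapServers: String) extends Queue

object Queue {
  implicit val kafkaDecoder: Decoder[Kafka] = deriveDecoder[Kafka]

  // Dispatch on the "type" field, then fall through to the derived decoder,
  // mirroring the queueConfigDecoder shown in the hunk above
  implicit val queueDecoder: Decoder[Queue] =
    Decoder.instance { cur =>
      cur.downField("type").as[String].flatMap {
        case "kafka" => cur.as[Kafka]
        case other   => Left(DecodingFailure(s"Queue type $other is not supported", cur.history))
      }
    }
}

object DecoderSketch extends App {
  val json = """{ "type": "kafka", "topicName": "loaderTopic", "bootstrapServers": "localhost:9092" }"""
  println(decode[Queue](json)) // Right(Kafka(loaderTopic,localhost:9092))
}
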
(diffs for the remaining changed files are not shown here)

