Skip to content

Commit

Permalink
Set user-agent header in PubSub publisher and subscriber (close #37)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 5, 2023
1 parent c0aa9c1 commit 43f351b
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 3 deletions.
6 changes: 6 additions & 0 deletions modules/pubsub/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ snowplow.defaults: {
maxAckExtensionPeriod: "1 hour"
minDurationPerAckExtension: "60 seconds"
maxDurationPerAckExtension: "600 seconds"
gcpUserAgent: {
productName: "Snowplow OSS"
}
shutdownTimeout: "30 seconds"
}
}
Expand All @@ -14,6 +17,9 @@ snowplow.defaults: {
pubsub: {
batchSize: 1000
requestByteThreshold: 1000000
gcpUserAgent: {
productName: "Snowplow OSS"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.pubsub

import cats.Show
import cats.implicits._
import io.circe.generic.semiauto._
import io.circe.Decoder
import com.google.api.gax.rpc.FixedHeaderProvider

/**
 * Product identification used to build the `user-agent` header that is attached to the PubSub
 * publisher and subscriber clients.
 *
 * @param productName
 *   product name; rendered before the slash in the header value
 * @param productVersion
 *   product version; rendered after the slash in the header value
 */
case class GcpUserAgent(productName: String, productVersion: String)

object GcpUserAgent {

  /**
   * Circe decoder derived from the case class fields.
   *
   * Held in a `val` so the derived instance is built once, rather than being rebuilt on every
   * implicit lookup as a parameterless `implicit def` would.
   */
  implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] = deriveDecoder[GcpUserAgent]

  /** Renders the user agent as `<productName>/<productVersion> (GPN:Snowplow;)`. */
  implicit val showGcpUserAgent: Show[GcpUserAgent] = Show.show { ua =>
    s"${ua.productName}/${ua.productVersion} (GPN:Snowplow;)"
  }

  /**
   * Builds a header provider carrying a single `user-agent` header.
   *
   * @param ua
   *   the user agent whose `Show` rendering becomes the header value
   * @return
   *   a provider suitable for the `setHeaderProvider` method of the PubSub client builders
   */
  def headerProvider(ua: GcpUserAgent): FixedHeaderProvider =
    FixedHeaderProvider.create("user-agent", ua.show)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import com.snowplowanalytics.snowplow.sinks.{ListOfList, Sink, Sinkable}
import org.threeten.bp.{Duration => ThreetenDuration}

import scala.jdk.CollectionConverters._

import java.util.UUID

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

object PubsubSink {

def resource[F[_]: Async](config: PubsubSinkConfig): Resource[F, Sink[F]] =
Expand Down Expand Up @@ -64,6 +65,7 @@ object PubsubSink {
Publisher
.newBuilder(topic)
.setBatchingSettings(batchSettings.build)
.setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent))
.build
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ package com.snowplowanalytics.snowplow.sinks.pubsub
import io.circe.Decoder
import io.circe.generic.semiauto._

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

case class PubsubSinkConfig(
topic: PubsubSinkConfig.Topic,
batchSize: Long,
requestByteThreshold: Long
requestByteThreshold: Long,
gcpUserAgent: GcpUserAgent
)

object PubsubSinkConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import org.threeten.bp.{Duration => ThreetenDuration}

// snowplow
import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}

Expand Down Expand Up @@ -196,6 +197,7 @@ object PubsubSource {
// Switch off any flow control, because we handle it ourselves with the semaphore
FlowControlSettings.getDefaultInstance
}
.setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent))
.build
})
_ <- Resource.eval(Sync[F].delay {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import io.circe.config.syntax._

import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

/**
 * Configuration for the PubSub source.
 *
 * NOTE(review): the meaning of the individual fields is not visible in this excerpt. Presumably
 * `gcpUserAgent` is the value handed to `GcpUserAgent.headerProvider` when the subscriber is
 * built, and the duration fields tune ack-extension and shutdown behaviour — confirm against
 * `PubsubSource`.
 */
case class PubsubSourceConfig(
subscription: PubsubSourceConfig.Subscription,
parallelPullCount: Int,
bufferMaxBytes: Int,
maxAckExtensionPeriod: FiniteDuration,
minDurationPerAckExtension: FiniteDuration,
maxDurationPerAckExtension: FiniteDuration,
gcpUserAgent: GcpUserAgent,
shutdownTimeout: FiniteDuration
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import io.circe.Decoder
import io.circe.generic.semiauto._
import org.specs2.Specification

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

class PubsubSinkConfigSpec extends Specification {
import PubsubSinkConfigSpec._

Expand All @@ -27,6 +29,9 @@ class PubsubSinkConfigSpec extends Specification {
| "xyz": $${snowplow.defaults.sinks.pubsub}
| "xyz": {
| "topic": "projects/my-project/topics/my-topic"
| "gcpUserAgent": {
| "productVersion": "example-version"
| }
| }
|}
|""".stripMargin
Expand All @@ -36,7 +41,8 @@ class PubsubSinkConfigSpec extends Specification {
val expected = PubsubSinkConfig(
topic = PubsubSinkConfig.Topic("my-project", "my-topic"),
batchSize = 1000L,
requestByteThreshold = 1000000L
requestByteThreshold = 1000000L,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version")
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import org.specs2.Specification

import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

class PubsubSourceConfigSpec extends Specification {
import PubsubSourceConfigSpec._

Expand All @@ -29,6 +31,9 @@ class PubsubSourceConfigSpec extends Specification {
| "xyz": $${snowplow.defaults.sources.pubsub}
| "xyz": {
| "subscription": "projects/my-project/subscriptions/my-subscription"
| "gcpUserAgent": {
| "productVersion": "example-version"
| }
| }
|}
|""".stripMargin
Expand All @@ -42,6 +47,7 @@ class PubsubSourceConfigSpec extends Specification {
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"),
shutdownTimeout = 30.seconds
)

Expand Down

0 comments on commit 43f351b

Please sign in to comment.