Skip to content

Commit

Permalink
Add X-Klaviyo- request headers and use Retry-After response header (#149
Browse files Browse the repository at this point in the history
)

* Added klaviyo header and attempts header to auto-increment with number of attempts.

* Consolidated and improved API Request tests, added tests for new features.

* Use Retry-After with jitter for a 429 http code if present

---------

Co-authored-by: Evan Masseau <>
  • Loading branch information
evan-masseau committed Apr 10, 2024
1 parent bbbd55c commit 9eca270
Show file tree
Hide file tree
Showing 19 changed files with 392 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import com.klaviyo.analytics.networking.requests.PushTokenApiRequest
import com.klaviyo.core.Registry
import com.klaviyo.core.lifecycle.ActivityEvent
import java.util.concurrent.ConcurrentLinkedDeque
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow
import org.json.JSONArray
import org.json.JSONException
import org.json.JSONObject
Expand Down Expand Up @@ -307,7 +304,7 @@ internal object KlaviyoApiClient : ApiClient {
// Encountered a retryable error
// Put this back on top of the queue, and we'll try again with backoff
apiQueue.offerFirst(request)
flushInterval = computeRetryInterval(request.attempts)
flushInterval = request.computeRetryInterval()
broadcastApiRequest(request)
break
}
Expand Down Expand Up @@ -337,17 +334,6 @@ internal object KlaviyoApiClient : ApiClient {
enqueuedTime = Registry.clock.currentTimeMillis()
handler?.postDelayed(this, flushInterval)
}

private fun computeRetryInterval(attempts: Int): Long {
val minRetryInterval = Registry.config.networkFlushIntervals[networkType]
val jitterSeconds = Registry.config.networkJitterRange.random()
val exponentialBackoff = (2.0.pow(attempts).toLong() + jitterSeconds).times(1_000)
val maxRetryInterval = Registry.config.networkMaxRetryInterval
return min(
max(minRetryInterval, exponentialBackoff),
maxRetryInterval
)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ interface ApiRequest {
val httpMethod: String

/**
* HTTP Headers
* HTTP request headers
*/
val headers: Map<String, String>

Expand All @@ -76,6 +76,11 @@ interface ApiRequest {
*/
val responseCode: Int?

/**
* HTTP Response Headers
*/
val responseHeaders: Map<String, List<String>>

/**
* Render the response as a string, if the request has been sent
* Format depends on the endpoint, see Klaviyo API documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ internal class EventApiRequest(

override var type: String = "Create Event"

override var headers: Map<String, String> = mapOf(
HEADER_CONTENT to TYPE_JSON,
HEADER_ACCEPT to TYPE_JSON,
HEADER_REVISION to V3_REVISION,
HEADER_USER_AGENT to DeviceProperties.userAgent
)

override var query: Map<String, String> = mapOf(
COMPANY_ID to Registry.config.apiKey
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.klaviyo.analytics.networking.requests

import com.klaviyo.analytics.DeviceProperties
import com.klaviyo.core.Registry
import java.io.BufferedReader
import java.io.IOException
Expand All @@ -8,6 +9,9 @@ import java.net.HttpURLConnection
import java.net.URL
import java.util.UUID
import javax.net.ssl.HttpsURLConnection
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow
import org.json.JSONObject

/**
Expand Down Expand Up @@ -47,6 +51,9 @@ internal open class KlaviyoApiRequest(
const val HEADER_USER_AGENT = "User-Agent"
const val HEADER_ACCEPT = "Accept"
const val HEADER_REVISION = "Revision"
const val HEADER_KLAVIYO_MOBILE = "X-Klaviyo-Mobile"
const val HEADER_KLAVIYO_ATTEMPT = "X-Klaviyo-Attempt-Count"
const val HEADER_RETRY_AFTER = "Retry-After"
const val TYPE_JSON = "application/json"
const val V3_REVISION = "2023-07-15"

Expand Down Expand Up @@ -126,6 +133,15 @@ internal open class KlaviyoApiRequest(
}
}

/**
* Tracks number of attempts to limit retries
*/
final override var attempts = 0
private set(value) {
field = value
headers[HEADER_KLAVIYO_ATTEMPT] = "$value/${Registry.config.networkMaxAttempts}"
}

/**
* Compiles the base url, path and query data into a [URL] object
*/
Expand All @@ -150,7 +166,14 @@ internal open class KlaviyoApiRequest(
/**
* HTTP request headers
*/
override var headers: Map<String, String> = emptyMap()
override val headers: MutableMap<String, String> = mutableMapOf(
HEADER_CONTENT to TYPE_JSON,
HEADER_ACCEPT to TYPE_JSON,
HEADER_REVISION to V3_REVISION,
HEADER_USER_AGENT to DeviceProperties.userAgent,
HEADER_KLAVIYO_MOBILE to "1",
HEADER_KLAVIYO_ATTEMPT to "$attempts/${Registry.config.networkMaxAttempts}"
)

/**
* HTTP request query params
Expand All @@ -167,12 +190,6 @@ internal open class KlaviyoApiRequest(
*/
override val requestBody: String? get() = body?.toString()

/**
* Tracks number of attempts to limit retries
*/
final override var attempts = 0
private set

/**
* Timestamp request was first enqueued
*/
Expand Down Expand Up @@ -203,6 +220,12 @@ internal open class KlaviyoApiRequest(
override var responseCode: Int? = null
protected set

/**
* Response headers from Klaviyo
*/
override var responseHeaders: Map<String, List<String>> = emptyMap()
protected set

/**
* Body of response content from last send attempt
*/
Expand All @@ -220,7 +243,7 @@ internal open class KlaviyoApiRequest(
.accumulate(METHOD_JSON_KEY, method.name)
.accumulate(TIME_JSON_KEY, queuedTime)
.accumulate(UUID_JSON_KEY, uuid)
.accumulate(HEADERS_JSON_KEY, JSONObject(headers))
.accumulate(HEADERS_JSON_KEY, JSONObject(headers as Map<String, String>))
.accumulate(QUERY_JSON_KEY, JSONObject(query))
.accumulate(BODY_JSON_KEY, body)

Expand Down Expand Up @@ -315,6 +338,7 @@ internal open class KlaviyoApiRequest(
protected open fun parseResponse(connection: HttpURLConnection): Status {
// https://developers.klaviyo.com/en/docs/rate_limits_and_error_handling
responseCode = connection.responseCode
responseHeaders = connection.headerFields

status = when (responseCode) {
in successCodes -> Status.Complete
Expand All @@ -339,4 +363,44 @@ internal open class KlaviyoApiRequest(

return status
}

/**
* Compute a retry interval based on state of the request
*
* If present, obey the Retry-After response header, plus some jitter.
* Absent the header, use an exponential backoff algorithm, with a
* floor set by current network connection, and ceiling set by the config.
*/
fun computeRetryInterval(): Long {
val jitterSeconds = Registry.config.networkJitterRange.random()

try {
val retryAfter = this.responseHeaders[HEADER_RETRY_AFTER]?.getOrNull(0)

if (retryAfter?.isNotEmpty() == true) {
return (retryAfter.toInt() + jitterSeconds).times(1_000L)
}
} catch (e: NumberFormatException) {
Registry.log.warning("Invalid Retry-After header value", e)
}

val networkType = Registry.networkMonitor.getNetworkType().position
val minRetryInterval = Registry.config.networkFlushIntervals[networkType]
val exponentialBackoff = (2.0.pow(attempts).toLong() + jitterSeconds).times(1_000L)
val maxRetryInterval = Registry.config.networkMaxRetryInterval

return min(
max(minRetryInterval, exponentialBackoff),
maxRetryInterval
)
}

/**
* Clear a mutable map and add new key value pairs
* Utility to replace all headers
*/
fun <K, V> MutableMap<K, V>.replaceAllWith(newValues: Map<K, V>) = apply {
clear()
this += newValues
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ internal object KlaviyoApiRequestDecoder {
PushTokenApiRequest::class.simpleName -> PushTokenApiRequest(time, uuid)
else -> KlaviyoApiRequest(urlPath, method, time, uuid)
}.apply {
headers = json.getJSONObject(KlaviyoApiRequest.HEADERS_JSON_KEY).let {
it.keys().asSequence().associateWith { k -> it.getString(k) }
}
headers.replaceAllWith(
json.getJSONObject(KlaviyoApiRequest.HEADERS_JSON_KEY).let {
it.keys().asSequence().associateWith { k -> it.getString(k) }.toMap()
}
)
query = json.getJSONObject(KlaviyoApiRequest.QUERY_JSON_KEY).let {
it.keys().asSequence().associateWith { k -> it.getString(k) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.klaviyo.analytics.networking.requests

import com.klaviyo.analytics.DeviceProperties
import com.klaviyo.analytics.model.Profile
import com.klaviyo.analytics.model.ProfileKey
import com.klaviyo.core.Registry
Expand Down Expand Up @@ -63,13 +62,6 @@ internal class ProfileApiRequest(

override var type: String = "Identify Profile"

override var headers: Map<String, String> = mapOf(
HEADER_CONTENT to TYPE_JSON,
HEADER_ACCEPT to TYPE_JSON,
HEADER_REVISION to V3_REVISION,
HEADER_USER_AGENT to DeviceProperties.userAgent
)

override var query: Map<String, String> = mapOf(
COMPANY_ID to Registry.config.apiKey
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ internal class PushTokenApiRequest(

override val type: String = "Push Token"

/**
* HTTP request headers
*/
override var headers: Map<String, String> = mapOf(
HEADER_CONTENT to TYPE_JSON,
HEADER_ACCEPT to TYPE_JSON,
HEADER_REVISION to V3_REVISION,
HEADER_USER_AGENT to DeviceProperties.userAgent
)

/**
* HTTP request query params
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotEquals
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Test

internal class KlaviyoTest : BaseTest() {
Expand Down Expand Up @@ -80,6 +81,7 @@ internal class KlaviyoTest : BaseTest() {
private val debounceTime = 5
private val apiClientMock: ApiClient = mockk()

@Before
override fun setup() {
super.setup()
Registry.register<ApiClient> { apiClientMock }
Expand All @@ -89,13 +91,14 @@ internal class KlaviyoTest : BaseTest() {
every { apiClientMock.enqueueEvent(any(), any()) } returns Unit
every { apiClientMock.enqueuePushToken(any(), any()) } returns Unit
every { configMock.debounceInterval } returns debounceTime
UserInfo.reset()
DevicePropertiesTest.mockDeviceProperties()
UserInfo.reset()
}

@After
fun cleanup() {
override fun cleanup() {
UserInfo.reset()
super.cleanup()
DevicePropertiesTest.unmockDeviceProperties()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.junit.Before
import org.junit.Test

internal class UserInfoTest : BaseTest() {

@Before
override fun setup() {
super.setup()
Expand Down
Loading

0 comments on commit 9eca270

Please sign in to comment.