Permalink
Browse files

feat(sync): add retry and backoff logic to sync amplitude import

  • Loading branch information...
philbooth committed Jan 31, 2019
1 parent e48162d commit 37cacb3e621d87b7c95730f8f3a9d1fa2a960ec1
Showing with 28 additions and 7 deletions.
  1. +28 −7 sync-common.js
@@ -21,6 +21,8 @@ const AWS_SECRET_KEY = process.env.FXA_AWS_SECRET_KEY
const MAX_EVENTS_PER_BATCH = 10
const API_KEY = process.env.FXA_AMPLITUDE_API_KEY
const S3_PATH = /^s3:\/\/([\w.-]+)\/(.+)$/
const AMPLITUDE_BACKOFF = 30000
const AMPLITUDE_RETRY_LIMIT = 3

module.exports = { run, hash, getOs }

@@ -138,14 +140,33 @@ function run (dataPath, impl) {
.then(() => processData({ count, reader, schema, eventCounts, index: index + PARQUET_BATCH_SIZE }))
}

function sendBatch (batch) {
return request('https://api.amplitude.com/httpapi', {
method: 'POST',
formData: {
api_key: API_KEY,
event: JSON.stringify(batch)
async function sendBatch (batch, iteration = 0) {
try {
return await request('https://api.amplitude.com/httpapi', {
simple: true,
method: 'POST',
formData: {
api_key: API_KEY,
event: JSON.stringify(batch)
}
}).promise()
} catch (error) {
iteration += 1
if (iteration === AMPLITUDE_RETRY_LIMIT) {
throw error
}
})

if (error.statusCode === 429) {
return new Promise((resolve, reject) => {
setTimeout(() => {
sendBatch(batch, iteration)
.then(resolve, reject)
}, AMPLITUDE_BACKOFF)
})
}

return sendBatch(batch, iteration)
}
}

async function processDataFromS3 (bucket, key) {

0 comments on commit 37cacb3

Please sign in to comment.