Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refreshing access token logic for get/createAudience #1767

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
96 changes: 90 additions & 6 deletions packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ interface AuthSettings<Settings> {
interface RefreshAuthSettings<Settings> {
settings: Settings
auth: OAuth2ClientCredentials
statsContext?: StatsContext
}

interface Authentication<Settings> {
Expand Down Expand Up @@ -419,7 +420,73 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
}
}

async createAudience(createAudienceInput: CreateAudienceInput<Settings, AudienceSettings>) {
audienceRunnerFactory = (
operation: 'create' | 'get',
audienceDefinition: AudienceDestinationDefinition,
requestClient: ReturnType<typeof createRequestClient>,
audienceInput: CreateAudienceInput<Settings, AudienceSettings> | GetAudienceInput<Settings, AudienceSettings>
) => {
const run = async () => {
if (!instanceOfAudienceDestinationSettingsWithCreateGet(audienceDefinition.audienceConfig)) {
throw new Error('Unexpected call to createAudience')
}

switch (operation) {
case 'create':
return await audienceDefinition.audienceConfig?.createAudience(
requestClient,
audienceInput as CreateAudienceInput<Settings, AudienceSettings>
)
case 'get':
return await audienceDefinition.audienceConfig?.getAudience(
requestClient,
audienceInput as GetAudienceInput<Settings, AudienceSettings>
)
}
}

return run
}

audienceRetryFactory = (destinationSettings: Settings, settings: JSONObject, options?: OnEventOptions) => {
const onFailedAttempt = async (error: any) => {
const statusCode = error?.status ?? error?.response?.status ?? 500

// Throw original error if it is unrelated to invalid access tokens and not an oauth2 scheme
if (
!(
statusCode === 401 &&
(this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed')
)
) {
throw error
}

const oauthSettings = getOAuth2Data(settings)
const newTokens = await this.refreshAccessToken(
destinationSettings,
oauthSettings,
options?.synchronizeRefreshAccessToken
)
if (!newTokens) {
throw new InvalidAuthenticationError('Failed to refresh access token', ErrorCodes.OAUTH_REFRESH_FAILED)
}

// Update `settings` with new tokens
settings = updateOAuthSettings(settings, newTokens)
// Copied from onSubscriptions -> I don't think this options object will have the onTokenRefresh function...

if (options) {
await options?.onTokenRefresh?.(newTokens)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you do a try / catch here if it's possible to get error onTokenRefresh?

}
}
return onFailedAttempt
}

async createAudience(
createAudienceInput: CreateAudienceInput<Settings, AudienceSettings>,
requestOptions?: OnEventOptions
) {
const audienceDefinition = this.definition as AudienceDestinationDefinition
if (!instanceOfAudienceDestinationSettingsWithCreateGet(audienceDefinition.audienceConfig)) {
throw new Error('Unexpected call to createAudience')
Expand All @@ -435,10 +502,16 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
const options = this.extendRequest?.(context) ?? {}
const requestClient = createRequestClient({ ...options, statsContext: context.statsContext })

return audienceDefinition.audienceConfig?.createAudience(requestClient, createAudienceInput)
const run = this.audienceRunnerFactory('create', audienceDefinition, requestClient, createAudienceInput)
const onFailedAttempt = this.audienceRetryFactory(
destinationSettings,
createAudienceInput.settings as unknown as JSONObject,
requestOptions
)
return await retry(run, { retries: 2, onFailedAttempt })
}

async getAudience(getAudienceInput: GetAudienceInput<Settings, AudienceSettings>) {
async getAudience(getAudienceInput: GetAudienceInput<Settings, AudienceSettings>, requestOptions?: OnEventOptions) {
const audienceDefinition = this.definition as AudienceDestinationDefinition
if (!instanceOfAudienceDestinationSettingsWithCreateGet(audienceDefinition.audienceConfig)) {
throw new Error('Unexpected call to getAudience')
Expand All @@ -454,7 +527,13 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
const options = this.extendRequest?.(context) ?? {}
const requestClient = createRequestClient({ ...options, statsContext: context.statsContext })

return audienceDefinition.audienceConfig?.getAudience(requestClient, getAudienceInput)
const run = this.audienceRunnerFactory('get', audienceDefinition, requestClient, getAudienceInput)
const onFailedAttempt = this.audienceRetryFactory(
destinationSettings,
getAudienceInput.settings as unknown as JSONObject,
requestOptions
)
return await retry(run, { retries: 2, onFailedAttempt })
}

async testAuthentication(settings: Settings): Promise<void> {
Expand Down Expand Up @@ -489,7 +568,8 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
async refreshAccessToken(
settings: Settings,
oauthData: OAuth2ClientCredentials,
synchronizeRefreshAccessToken?: () => Promise<void>
synchronizeRefreshAccessToken?: () => Promise<void>,
requestOptions?: OnEventOptions
): Promise<RefreshAccessTokenResult | undefined> {
if (!(this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed')) {
throw new IntegrationError(
Expand All @@ -515,7 +595,11 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
// Invoke synchronizeRefreshAccessToken handler if synchronizeRefreshAccessToken option is passed.
// This will ensure that there is only one active refresh happening at a time.
await synchronizeRefreshAccessToken?.()
return this.authentication.refreshAccessToken(requestClient, { settings, auth: oauthData })
return this.authentication.refreshAccessToken(requestClient, {
settings,
auth: oauthData,
statsContext: requestOptions?.statsContext
})
}

private partnerAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ const destination: AudienceDestinationDefinition<Settings, AudienceSettings> = {
authentication: {
scheme: 'oauth2',
fields: {}, // Fields is required. Left empty on purpose.
refreshAccessToken: async (request, { auth }) => {
refreshAccessToken: async (request, { auth, statsContext }) => {
statsContext?.statsClient.incr('tokenRefresh')

const { data } = await request<RefreshTokenResponse>(OAUTH_URL, {
method: 'POST',
body: new URLSearchParams({
refresh_token: auth.refreshToken,
refresh_token: process.env.ACTIONS_DISPLAY_VIDEO_360_REFRESH_TOKEN as string,
client_id: auth.clientId,
client_secret: auth.clientSecret,
grant_type: 'refresh_token'
Expand Down
Loading