Skip to content

Commit

Permalink
fix: access_denied error handling for OAuth destinations (#3853)
Browse files Browse the repository at this point in the history
* fix: access_denied error handling for OAuth destinations

* fix: mock oauth service

Signed-off-by: Sai Sankeerth <sanpj2292@github.com>

* chore: rename ref_token_invalid_grant constant

Signed-off-by: Sai Sankeerth <sanpj2292@github.com>

* chore: update the method for authStatus toggle to PUT

* chore: include contract changes

* fix: add AUTH_STATUS_INACTIVE handling in regulation-worker

* chore: refactoring some changes, adding logic for handling invalid_grant error while refreshing the token

* fix: address comments
- include relevant status-codes
- remove unnecessary printf statements

* fix: send badrequest when required parameters are not sent in tests

* fix: change response error message for authStatusInactive req(both failure & success)
- refactor authStatus toggle handling in deleteUsers
- correction of test-cases in deleteUsers

* fix: add multiple go-routines tests for authStatus/toggle

* fix: formatting

* fix: rename variables, send right error message post inactivation of authStatus

* fix: comment correction

* fix: remove unused argument

* fix: updated wrong url status-code to 404

---------

Signed-off-by: Sai Sankeerth <sanpj2292@github.com>
Co-authored-by: Sai Sankeerth <sanpj2292@github.com>
  • Loading branch information
sanpj2292 and Sai Sankeerth committed Oct 9, 2023
1 parent 5f3820f commit 0d30d3b
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 149 deletions.
30 changes: 15 additions & 15 deletions mocks/services/oauth/mock_oauth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 47 additions & 20 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
"os"
"strings"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/utils/httputil"
Expand Down Expand Up @@ -109,17 +112,23 @@ func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, desti
jobStatus := getJobStatus(resp.StatusCode, jobResp)
pkgLogger.Debugf("[%v] Job: %v, JobStatus: %v", destination.Name, job.ID, jobStatus)

if isOAuthEnabled && isTokenExpired(jobResp) && currentOauthRetryAttempt < api.MaxOAuthRefreshRetryAttempts {
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
if err != nil {
pkgLogger.Error(err)
return model.JobStatus{Status: model.JobStatusFailed, Error: err}
oauthErrJob, oauthErrJobFound := getOAuthErrorJob(jobResp)

if oauthErrJobFound && isOAuthEnabled {
if oauthErrJob.AuthErrorCategory == oauth.AUTH_STATUS_INACTIVE {
return api.inactivateAuthStatus(&destination, job, oAuthDetail)
}
if oauthErrJob.AuthErrorCategory == oauth.REFRESH_TOKEN && currentOauthRetryAttempt < api.MaxOAuthRefreshRetryAttempts {
err = api.refreshOAuthToken(&destination, job, oAuthDetail)
if err != nil {
pkgLogger.Error(err)
return model.JobStatus{Status: model.JobStatusFailed, Error: err}
}
// retry the request
pkgLogger.Infof("[%v] Retrying deleteRequest job(id: %v) for the whole batch, RetryAttempt: %v", destination.Name, job.ID, currentOauthRetryAttempt+1)
return api.deleteWithRetry(ctx, job, destination, currentOauthRetryAttempt+1)
}
// retry the request
pkgLogger.Infof("[%v] Retrying deleteRequest job(id: %v) for the whole batch, RetryAttempt: %v", destination.Name, job.ID, currentOauthRetryAttempt+1)
return api.deleteWithRetry(ctx, job, destination, currentOauthRetryAttempt+1)
}

return jobStatus
}

Expand Down Expand Up @@ -160,13 +169,10 @@ func mapJobToPayload(job model.Job, destName string, destConfig map[string]inter
}
}

func isTokenExpired(jobResponses []JobRespSchema) bool {
for _, jobResponse := range jobResponses {
if jobResponse.AuthErrorCategory == oauth.REFRESH_TOKEN {
return true
}
}
return false
func getOAuthErrorJob(jobResponses []JobRespSchema) (JobRespSchema, bool) {
return lo.Find(jobResponses, func(item JobRespSchema) bool {
return lo.Contains([]string{oauth.AUTH_STATUS_INACTIVE, oauth.REFRESH_TOKEN}, item.AuthErrorCategory)
})
}

func setOAuthHeader(secretToken *oauth.AuthResponse, req *http.Request) error {
Expand Down Expand Up @@ -200,21 +206,42 @@ func (api *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId
}, nil
}

func (api *APIManager) refreshOAuthToken(destName, workspaceId string, oAuthDetail oauthDetail) error {
func (api *APIManager) inactivateAuthStatus(destination *model.Destination, job model.Job, oAuthDetail oauthDetail) (jobStatus model.JobStatus) {
dest := &backendconfig.DestinationT{
ID: destination.DestinationID,
Config: destination.Config,
DestinationDefinition: backendconfig.DestinationDefinitionT{
Name: destination.Name,
Config: destination.DestDefConfig,
},
}
_, resp := api.OAuth.UpdateAuthStatusToInactive(dest, job.WorkspaceID, oAuthDetail.id)
jobStatus.Status = model.JobStatusAborted
jobStatus.Error = fmt.Errorf(resp)
return jobStatus
}

func (api *APIManager) refreshOAuthToken(destination *model.Destination, job model.Job, oAuthDetail oauthDetail) error {
refTokenParams := &oauth.RefreshTokenParams{
Secret: oAuthDetail.secretToken.Account.Secret,
WorkspaceId: workspaceId,
WorkspaceId: job.WorkspaceID,
AccountId: oAuthDetail.id,
DestDefName: destName,
DestDefName: destination.Name,
EventNamePrefix: "refresh_token",
}
statusCode, refreshResponse := api.OAuth.RefreshToken(refTokenParams)
if statusCode != http.StatusOK {
if refreshResponse.Err == oauth.REF_TOKEN_INVALID_GRANT {
// authStatus should be made inactive
errJobStatus := api.inactivateAuthStatus(destination, job, oAuthDetail)
return fmt.Errorf(errJobStatus.Error.Error())
}

var refreshRespErr string
if refreshResponse != nil {
refreshRespErr = refreshResponse.Err
}
return fmt.Errorf("[%v] Failed to refresh token for destination in workspace(%v) & account(%v) with %v", destName, workspaceId, oAuthDetail.id, refreshRespErr)
return fmt.Errorf("[%v] Failed to refresh token for destination in workspace(%v) & account(%v) with %v", destination.Name, job.WorkspaceID, oAuthDetail.id, refreshRespErr)
}
return nil
}

0 comments on commit 0d30d3b

Please sign in to comment.