diff --git a/cmd/cli/handler_janitor.go b/cmd/cli/handler_janitor.go index a1c7005f49..944c28d29f 100644 --- a/cmd/cli/handler_janitor.go +++ b/cmd/cli/handler_janitor.go @@ -20,6 +20,8 @@ import ( ) const ( + Limit = "limit" + BatchSize = "batch-size" KeepIfYounger = "keep-if-younger" AccessLifespan = "access-lifespan" RefreshLifespan = "refresh-lifespan" @@ -53,6 +55,17 @@ func (_ *JanitorHandler) Args(cmd *cobra.Command, args []string) error { "Janitor requires either --tokens or --requests or both to be set") } + limit := flagx.MustGetInt(cmd, Limit) + batchSize := flagx.MustGetInt(cmd, BatchSize) + if limit <= 0 || batchSize <= 0 { + return fmt.Errorf("%s\n%s\n", cmd.UsageString(), + "Values for --limit and --batch-size should both be greater than 0") + } + if batchSize > limit { + return fmt.Errorf("%s\n%s\n", cmd.UsageString(), + "Value for --batch-size must not be greater than value for --limit") + } + return nil } @@ -111,6 +124,9 @@ func purge(cmd *cobra.Command, args []string) error { p := d.Persister() + limit := flagx.MustGetInt(cmd, Limit) + batchSize := flagx.MustGetInt(cmd, BatchSize) + var routineFlags []string if flagx.MustGetBool(cmd, OnlyTokens) { @@ -121,7 +137,7 @@ func purge(cmd *cobra.Command, args []string) error { routineFlags = append(routineFlags, OnlyRequests) } - return cleanupRun(cmd.Context(), notAfter, addRoutine(p, routineFlags...)...) + return cleanupRun(cmd.Context(), notAfter, limit, batchSize, addRoutine(p, routineFlags...)...) } func addRoutine(p persistence.Persister, names ...string) []cleanupRoutine { @@ -138,11 +154,11 @@ func addRoutine(p persistence.Persister, names ...string) []cleanupRoutine { return routines } -type cleanupRoutine func(ctx context.Context, notAfter time.Time) error +type cleanupRoutine func(ctx context.Context, notAfter time.Time, limit int, batchSize int) error func cleanup(cr cleanupRoutine, routineName string) cleanupRoutine { - return func(ctx context.Context, notAfter time.Time) error { - if err := cr(ctx, notAfter); err != nil { + return func(ctx context.Context, notAfter time.Time, limit int, batchSize int) error { + if err := cr(ctx, notAfter, limit, batchSize); err != nil { return errors.Wrap(errorsx.WithStack(err), fmt.Sprintf("Could not cleanup inactive %s", routineName)) } fmt.Printf("Successfully completed Janitor run on %s\n", routineName) @@ -150,13 +166,13 @@ func cleanup(cr cleanupRoutine, routineName string) cleanupRoutine { } } -func cleanupRun(ctx context.Context, notAfter time.Time, routines ...cleanupRoutine) error { +func cleanupRun(ctx context.Context, notAfter time.Time, limit int, batchSize int, routines ...cleanupRoutine) error { if len(routines) == 0 { return errors.New("clean up run received 0 routines") } for _, r := range routines { - if err := r(ctx, notAfter); err != nil { + if err := r(ctx, notAfter, limit, batchSize); err != nil { return err } } diff --git a/cmd/cli/handler_janitor_test.go b/cmd/cli/handler_janitor_test.go index 49ac4c48f4..52c4514919 100644 --- a/cmd/cli/handler_janitor_test.go +++ b/cmd/cli/handler_janitor_test.go @@ -209,4 +209,53 @@ func TestJanitorHandler_Arguments(t *testing.T) { "memory") require.Error(t, err) require.Contains(t, err.Error(), "Janitor requires either --tokens or --requests or both to be set") + + cmdx.ExecNoErr(t, cmd.NewRootCmd(), + "janitor", + fmt.Sprintf("--%s", cli.OnlyRequests), + fmt.Sprintf("--%s=%s", cli.Limit, "1000"), + fmt.Sprintf("--%s=%s", cli.BatchSize, "100"), + "memory", + ) + + _, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil, + "janitor", + fmt.Sprintf("--%s", cli.OnlyRequests), + fmt.Sprintf("--%s=%s", cli.Limit, "0"), + "memory") + require.Error(t, err) + require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0") + + _, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil, + "janitor", + fmt.Sprintf("--%s", cli.OnlyRequests), + fmt.Sprintf("--%s=%s", cli.Limit, "-100"), + "memory") + require.Error(t, err) + require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0") + + _, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil, + "janitor", + fmt.Sprintf("--%s", cli.OnlyRequests), + fmt.Sprintf("--%s=%s", cli.BatchSize, "0"), + "memory") + require.Error(t, err) + require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0") + + _, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil, + "janitor", + fmt.Sprintf("--%s", cli.OnlyRequests), + fmt.Sprintf("--%s=%s", cli.BatchSize, "-100"), + "memory") + require.Error(t, err) + require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0") + + _, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil, + "janitor", + fmt.Sprintf("--%s", cli.OnlyRequests), + fmt.Sprintf("--%s=%s", cli.Limit, "100"), + fmt.Sprintf("--%s=%s", cli.BatchSize, "1000"), + "memory") + require.Error(t, err) + require.Contains(t, err.Error(), "Value for --batch-size must not be greater than value for --limit") } diff --git a/cmd/janitor.go b/cmd/janitor.go index ea324dbda3..a25a9a4093 100644 --- a/cmd/janitor.go +++ b/cmd/janitor.go @@ -12,6 +12,7 @@ func NewJanitorCmd() *cobra.Command { Use: "janitor []", Short: "Clean the database of old tokens and login/consent requests", Long: `This command will cleanup any expired oauth2 tokens as well as login/consent requests. +This will select records to delete with a limit and delete records in batch to ensure that no table locking issues arise in big production databases. ### Warning ### @@ -52,6 +53,8 @@ Janitor can be used in several ways. RunE: cli.NewHandler().Janitor.RunE, Args: cli.NewHandler().Janitor.Args, } + cmd.Flags().Int(cli.Limit, 10000, "Limit the number of records retrieved from database for deletion.") + cmd.Flags().Int(cli.BatchSize, 100, "Define how many records are deleted with each iteration.") cmd.Flags().Duration(cli.KeepIfYounger, 0, "Keep database records that are younger than a specified duration e.g. 1s, 1m, 1h.") cmd.Flags().Duration(cli.AccessLifespan, 0, "Set the access token lifespan e.g. 1s, 1m, 1h.") cmd.Flags().Duration(cli.RefreshLifespan, 0, "Set the refresh token lifespan e.g. 1s, 1m, 1h.") diff --git a/docs/docs/cli/hydra-janitor.md b/docs/docs/cli/hydra-janitor.md index f26fc6e3d8..14e9505eb3 100644 --- a/docs/docs/cli/hydra-janitor.md +++ b/docs/docs/cli/hydra-janitor.md @@ -18,7 +18,8 @@ Clean the database of old tokens and login/consent requests ### Synopsis This command will cleanup any expired oauth2 tokens as well as login/consent -requests. +requests. This will select records to delete with a limit and delete records in +batch to ensure that no table locking issues arise in big production databases. ### Warning @@ -69,6 +70,8 @@ hydra janitor [<database-url>] [flags] -c, --config strings Path to one or more .json, .yaml, .yml, .toml config files. Values are loaded in the order provided, meaning that the last config file overwrites values from the previous config file. --consent-request-lifespan duration Set the login/consent request lifespan e.g. 1s, 1m, 1h -h, --help help for janitor + --limit Limits the number of records retrieved from database for deletion. This is optional and default value is 10000 records. This value should be bigger than your database grow rate, this being calculated in the time interval between two consecutive janitor runs. + --batch-size Define how many records are deleted with each iteration. This is optional and default value is 100 records. This value must be smaller than the limit value. --keep-if-younger duration Keep database records that are younger than a specified duration e.g. 1s, 1m, 1h. -e, --read-from-env If set, reads the database connection string from the environment variable DSN or config file key dsn. --refresh-lifespan duration Set the refresh token lifespan e.g. 1s, 1m, 1h. diff --git a/go.mod b/go.mod index e84c54ae83..4adb30dfb2 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/go-openapi/swag v0.19.13 github.com/go-openapi/validate v0.20.1 github.com/go-swagger/go-swagger v0.26.1 + github.com/gobuffalo/packr v1.30.1 // indirect github.com/gobuffalo/pop/v5 v5.3.4 github.com/gobuffalo/x v0.0.0-20181007152206-913e47c59ca7 github.com/gobwas/glob v0.2.3 diff --git a/go.sum b/go.sum index e2ae256948..ed631d432a 100644 --- a/go.sum +++ b/go.sum @@ -526,6 +526,8 @@ github.com/gobuffalo/packr v1.20.0/go.mod h1:JDytk1t2gP+my1ig7iI4NcVaXr886+N0ecU github.com/gobuffalo/packr v1.21.0/go.mod h1:H00jGfj1qFKxscFJSw8wcL4hpQtPe1PfU2wa6sg/SR0= github.com/gobuffalo/packr v1.22.0 h1:/YVd/GRGsu0QuoCJtlcWSVllobs4q3Xvx3nqxTvPyN0= github.com/gobuffalo/packr v1.22.0/go.mod h1:Qr3Wtxr3+HuQEwWqlLnNW4t1oTvK+7Gc/Rnoi/lDFvA= +github.com/gobuffalo/packr v1.30.1 h1:hu1fuVR3fXEZR7rXNW3h8rqSML8EVAf6KNm0NKO/wKg= +github.com/gobuffalo/packr v1.30.1/go.mod h1:ljMyFO2EcrnzsHsN99cvbq055Y9OhRrIaviy289eRuk= github.com/gobuffalo/packr/v2 v2.0.0-rc.8/go.mod h1:y60QCdzwuMwO2R49fdQhsjCPv7tLQFR0ayzxxla9zes= github.com/gobuffalo/packr/v2 v2.0.0-rc.9/go.mod h1:fQqADRfZpEsgkc7c/K7aMew3n4aF1Kji7+lIZeR98Fc= github.com/gobuffalo/packr/v2 v2.0.0-rc.10/go.mod h1:4CWWn4I5T3v4c1OsJ55HbHlUEKNWMITG5iIkdr4Px4w= @@ -537,6 +539,7 @@ github.com/gobuffalo/packr/v2 v2.0.0-rc.15/go.mod h1:IMe7H2nJvcKXSF90y4X1rjYIRlN github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= github.com/gobuffalo/packr/v2 v2.4.0/go.mod h1:ra341gygw9/61nSjAbfwcwh8IrYL4WmR4IsPkPBhQiY= +github.com/gobuffalo/packr/v2 v2.5.1/go.mod h1:8f9c96ITobJlPzI44jj+4tHnEKNt0xXWSVlXRN9X1Iw= github.com/gobuffalo/packr/v2 v2.5.2/go.mod h1:sgEE1xNZ6G0FNN5xn9pevVu4nywaxHvgup67xisti08= github.com/gobuffalo/packr/v2 v2.7.1/go.mod h1:qYEvAazPaVxy7Y7KR0W8qYEE+RymX74kETFqjFoFlOc= github.com/gobuffalo/packr/v2 v2.8.0 h1:IULGd15bQL59ijXLxEvA5wlMxsmx/ZkQv9T282zNVIY= @@ -1753,6 +1756,7 @@ golang.org/x/tools v0.0.0-20190613204242-ed0dc450797f/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190617190820-da514acc4774/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190624180213-70d37148ca0c/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190624190245-7f2218787638/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= diff --git a/internal/testhelpers/janitor_test_helper.go b/internal/testhelpers/janitor_test_helper.go index 9b07fc75de..93d3fee9d9 100644 --- a/internal/testhelpers/janitor_test_helper.go +++ b/internal/testhelpers/janitor_test_helper.go @@ -178,6 +178,40 @@ func (j *JanitorConsentTestHelper) LoginRejectionValidate(ctx context.Context, c } } +func (j *JanitorConsentTestHelper) LimitSetup(ctx context.Context, cm consent.Manager, cl client.Manager) func(t *testing.T) { + return func(t *testing.T) { + var err error + + // Create login requests + for _, r := range j.flushLoginRequests { + require.NoError(t, cl.CreateClient(ctx, r.Client)) + require.NoError(t, cm.CreateLoginRequest(ctx, r)) + } + + // Reject each request + for _, r := range j.flushLoginRequests { + _, err = cm.HandleLoginRequest(ctx, r.ID, consent.NewHandledLoginRequest( + r.ID, true, r.RequestedAt, r.AuthenticatedAt)) + require.NoError(t, err) + } + } +} + +func (j *JanitorConsentTestHelper) LimitValidate(ctx context.Context, cm consent.Manager) func(t *testing.T) { + return func(t *testing.T) { + // flush-login-2 and 3 should be cleared now + for _, r := range j.flushLoginRequests { + t.Logf("check login: %s", r.ID) + _, err := cm.GetLoginRequest(ctx, r.ID) + if r.ID == j.flushLoginRequests[0].ID { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + } +} + func (j *JanitorConsentTestHelper) ConsentRejectionSetup(ctx context.Context, cm consent.Manager, cl client.Manager) func(t *testing.T) { return func(t *testing.T) { var err error @@ -412,7 +446,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM // run the cleanup routine t.Run("step=cleanup", func(t *testing.T) { - require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, notAfter)) + require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, notAfter, 1000, 100)) }) // validate test @@ -422,6 +456,23 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM } }) + t.Run("case=flush-consent-request-limit", func(t *testing.T) { + jt := NewConsentJanitorTestHelper("limit") + + t.Run("case=limit", func(t *testing.T) { + // setup + t.Run("step=setup", jt.LimitSetup(ctx, consentManager, clientManager)) + + // cleanup + t.Run("step=cleanup", func(t *testing.T) { + require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 2, 1)) + }) + + // validate + t.Run("step=validate", jt.LimitValidate(ctx, consentManager)) + }) + }) + t.Run("case=flush-consent-request-rejection", func(t *testing.T) { jt := NewConsentJanitorTestHelper("loginRejection") @@ -431,7 +482,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM // cleanup t.Run("step=cleanup", func(t *testing.T) { - require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second))) + require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100)) }) // validate @@ -446,7 +497,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM // cleanup t.Run("step=cleanup", func(t *testing.T) { - require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second))) + require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100)) }) // validate @@ -465,7 +516,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM // cleanup t.Run("step=cleanup", func(t *testing.T) { - require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second))) + require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100)) }) // validate @@ -482,7 +533,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM // cleanup t.Run("step=cleanup", func(t *testing.T) { - require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second))) + require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100)) }) // validate diff --git a/oauth2/fosite_store_helpers.go b/oauth2/fosite_store_helpers.go index 1def3c0d71..87b8168d3c 100644 --- a/oauth2/fosite_store_helpers.go +++ b/oauth2/fosite_store_helpers.go @@ -175,6 +175,7 @@ func TestHelperRunner(t *testing.T, store InternalRegistry, k string) { t.Run(fmt.Sprintf("case=testHelperRevokeRefreshToken/db=%s", k), testHelperRevokeRefreshToken(store)) t.Run(fmt.Sprintf("case=testHelperCreateGetDeletePKCERequestSession/db=%s", k), testHelperCreateGetDeletePKCERequestSession(store)) t.Run(fmt.Sprintf("case=testHelperFlushTokens/db=%s", k), testHelperFlushTokens(store, time.Hour)) + t.Run(fmt.Sprintf("case=testHelperFlushTokensWithLimitAndBatchSize/db=%s", k), testHelperFlushTokensWithLimitAndBatchSize(store, 3, 2)) t.Run(fmt.Sprintf("case=testFositeStoreSetClientAssertionJWT/db=%s", k), testFositeStoreSetClientAssertionJWT(store)) t.Run(fmt.Sprintf("case=testFositeStoreClientAssertionJWTValid/db=%s", k), testFositeStoreClientAssertionJWTValid(store)) t.Run(fmt.Sprintf("case=testHelperDeleteAccessTokens/db=%s", k), testHelperDeleteAccessTokens(store)) @@ -443,7 +444,7 @@ func testHelperFlushTokens(x InternalRegistry, lifespan time.Duration) func(t *t require.NoError(t, err) } - require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-time.Hour*24))) + require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-time.Hour*24), 100, 10)) _, err := m.GetAccessTokenSession(ctx, "flush-1", ds) require.NoError(t, err) _, err = m.GetAccessTokenSession(ctx, "flush-2", ds) @@ -451,7 +452,7 @@ func testHelperFlushTokens(x InternalRegistry, lifespan time.Duration) func(t *t _, err = m.GetAccessTokenSession(ctx, "flush-3", ds) require.NoError(t, err) - require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-(lifespan+time.Hour/2)))) + require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-(lifespan+time.Hour/2)), 100, 10)) _, err = m.GetAccessTokenSession(ctx, "flush-1", ds) require.NoError(t, err) _, err = m.GetAccessTokenSession(ctx, "flush-2", ds) @@ -459,7 +460,7 @@ func testHelperFlushTokens(x InternalRegistry, lifespan time.Duration) func(t *t _, err = m.GetAccessTokenSession(ctx, "flush-3", ds) require.Error(t, err) - require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now())) + require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now(), 100, 10)) _, err = m.GetAccessTokenSession(ctx, "flush-1", ds) require.NoError(t, err) _, err = m.GetAccessTokenSession(ctx, "flush-2", ds) @@ -469,6 +470,38 @@ func testHelperFlushTokens(x InternalRegistry, lifespan time.Duration) func(t *t } } +func testHelperFlushTokensWithLimitAndBatchSize(x InternalRegistry, limit int, batchSize int) func(t *testing.T) { + m := x.OAuth2Storage() + ds := &Session{} + + return func(t *testing.T) { + ctx := context.Background() + var requests []*fosite.Request + + // create five expired requests + id := uuid.New() + for i := 0; i < 5; i++ { + r := createTestRequest(fmt.Sprintf("%s-%d", id, i+1)) + r.RequestedAt = time.Now().Add(-2 * time.Hour) + mockRequestForeignKey(t, r.ID, x, false) + require.NoError(t, m.CreateAccessTokenSession(ctx, r.ID, r)) + _, err := m.GetAccessTokenSession(ctx, r.ID, ds) + require.NoError(t, err) + requests = append(requests, r) + } + + require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now(), limit, batchSize)) + for i := range requests { + _, err := m.GetAccessTokenSession(ctx, requests[i].ID, ds) + if i >= limit { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + } +} + func testFositeSqlStoreTransactionCommitAccessToken(m InternalRegistry) func(t *testing.T) { return func(t *testing.T) { { diff --git a/oauth2/handler.go b/oauth2/handler.go index 491a986968..898a143063 100644 --- a/oauth2/handler.go +++ b/oauth2/handler.go @@ -531,12 +531,12 @@ func (h *Handler) FlushHandler(w http.ResponseWriter, r *http.Request, _ httprou fr.NotAfter = time.Now() } - if err := h.r.OAuth2Storage().FlushInactiveAccessTokens(r.Context(), fr.NotAfter); err != nil { + if err := h.r.OAuth2Storage().FlushInactiveAccessTokens(r.Context(), fr.NotAfter, 1000, 100); err != nil { h.r.Writer().WriteError(w, r, err) return } - if err := h.r.OAuth2Storage().FlushInactiveRefreshTokens(r.Context(), fr.NotAfter); err != nil { + if err := h.r.OAuth2Storage().FlushInactiveRefreshTokens(r.Context(), fr.NotAfter, 1000, 100); err != nil { h.r.Writer().WriteError(w, r, err) return } diff --git a/persistence/sql/persister_consent.go b/persistence/sql/persister_consent.go index 31af5a759e..b20dbaf784 100644 --- a/persistence/sql/persister_consent.go +++ b/persistence/sql/persister_consent.go @@ -439,7 +439,7 @@ func (p *Persister) VerifyAndInvalidateLogoutRequest(ctx context.Context, verifi }) } -func (p *Persister) FlushInactiveLoginConsentRequests(ctx context.Context, notAfter time.Time) error { +func (p *Persister) FlushInactiveLoginConsentRequests(ctx context.Context, notAfter time.Time, limit int, batchSize int) error { /* #nosec G201 table is static */ var lr consent.LoginRequest var lrh consent.HandledLoginRequest @@ -447,76 +447,87 @@ func (p *Persister) FlushInactiveLoginConsentRequests(ctx context.Context, notAf var cr consent.ConsentRequest var crh consent.HandledConsentRequest - return p.transaction(ctx, func(ctx context.Context, c *pop.Connection) error { + // The value of notAfter should be the minimum between input parameter and request max expire based on its configured age + requestMaxExpire := time.Now().Add(-p.config.ConsentRequestMaxAge()) + if requestMaxExpire.Before(notAfter) { + notAfter = requestMaxExpire + } - // Delete all entries (and their FK) - // where hydra_oauth2_authentication_request were timed-out or rejected - // - hydra_oauth2_authentication_request_handled - // - hydra_oauth2_consent_request_handled - // AND - // - hydra_oauth2_authentication_request.requested_at < ttl.login_consent_request - // - hydra_oauth2_authentication_request.requested_at < notAfter - - // Using NOT EXISTS instead of LEFT JOIN or NOT IN due to - // LEFT JOIN not supported by Postgres and NOT IN will have performance hits with large tables. - // https://stackoverflow.com/questions/19363481/select-rows-which-are-not-present-in-other-table/19364694#19364694 - // Cannot use table aliasing in MYSQL, will work in Postgresql though... - - err := p.Connection(ctx).RawQuery(fmt.Sprintf(` - DELETE - FROM %[1]s - WHERE NOT EXISTS - ( - SELECT NULL - FROM %[2]s - WHERE %[1]s.challenge = %[2]s.challenge AND (%[2]s.error = '{}' OR %[2]s.error = '' OR %[2]s.error is NULL) - ) - AND NOT EXISTS - ( - SELECT NULL - FROM %[3]s - INNER JOIN %[4]s - ON %[3]s.challenge = %[4]s.challenge - WHERE %[1]s.challenge = %[3]s.login_challenge AND (%[4]s.error = '{}' OR %[4]s.error = '' OR %[4]s.error is NULL) - ) - AND requested_at < ? - AND requested_at < ? - `, - (&lr).TableName(), - (&lrh).TableName(), - (&cr).TableName(), - (&crh).TableName()), - time.Now().Add(-p.config.ConsentRequestMaxAge()), - notAfter).Exec() + challenges := []string{} + queryFormat := ` + SELECT %[1]s.challenge + FROM %[1]s + LEFT JOIN %[2]s ON %[1]s.challenge = %[2]s.challenge + WHERE ( + (%[2]s.challenge IS NULL) + OR (%[2]s.error IS NOT NULL AND %[2]s.error <> '{}' AND %[2]s.error <> '') + ) + AND %[1]s.requested_at < ? + ORDER BY %[1]s.challenge + LIMIT %[3]d + ` + + // Select challenges from all consent requests that can be safely deleted with limit + // where hydra_oauth2_consent_request were unhandled or rejected, so either of these is true + // - hydra_oauth2_authentication_request_handled does not exist (unhandled) + // - hydra_oauth2_consent_request_handled has valid error (rejected) + // AND timed-out + // - hydra_oauth2_consent_request.requested_at < minimum between ttl.login_consent_request and notAfter + q := p.Connection(ctx).RawQuery(fmt.Sprintf(queryFormat, (&cr).TableName(), (&crh).TableName(), limit), notAfter) + + if err := q.All(&challenges); err == sql.ErrNoRows { + return errors.Wrap(fosite.ErrNotFound, "") + } - if err != nil { - return sqlcon.HandleError(err) + // Delete in batch consent requests and their references in cascade + for i := 0; i < len(challenges); i += batchSize { + j := i + batchSize + if j > len(challenges) { + j = len(challenges) } - // This query is needed due to the fact that the first query will not delete cascade to the consent tables - // This cleans up the consent requests if requests have timed out or been rejected. - - // Using NOT EXISTS instead of LEFT JOIN or NOT IN due to - // LEFT JOIN not supported by Postgres and NOT IN will have performance hits with large tables. - // https://stackoverflow.com/questions/19363481/select-rows-which-are-not-present-in-other-table/19364694#19364694 - // Cannot use table aliasing in MYSQL, will work in Postgresql though... - err = p.Connection(ctx).RawQuery( - fmt.Sprintf(` - DELETE - FROM %[1]s - WHERE NOT EXISTS - ( - SELECT NULL - FROM %[2]s - WHERE %[1]s.challenge = %[2]s.challenge AND (%[2]s.error = '{}' OR %[2]s.error = '' OR %[2]s.error is NULL) - ) - AND requested_at < ? - AND requested_at < ?`, - (&cr).TableName(), - (&crh).TableName()), - time.Now().Add(-p.config.ConsentRequestMaxAge()), - notAfter).Exec() + if i != j { + q := p.Connection(ctx).RawQuery( + fmt.Sprintf("DELETE FROM %s WHERE challenge in (?)", (&cr).TableName()), + challenges[i:j], + ) - return sqlcon.HandleError(err) - }) + if err := q.Exec(); err != nil { + return sqlcon.HandleError(err) + } + } + } + + // Select challenges from all authentication requests that can be safely deleted with limit + // where hydra_oauth2_authentication_request were unhandled or rejected, so either of these is true + // - hydra_oauth2_authentication_request_handled does not exist (unhandled) + // - hydra_oauth2_authentication_request_handled has valid error (rejected) + // AND timed-out + // - hydra_oauth2_authentication_request.requested_at < minimum between ttl.login_consent_request and notAfter + q = p.Connection(ctx).RawQuery(fmt.Sprintf(queryFormat, (&lr).TableName(), (&lrh).TableName(), limit), notAfter) + + if err := q.All(&challenges); err == sql.ErrNoRows { + return errors.Wrap(fosite.ErrNotFound, "") + } + + // Delete in batch authentication requests + for i := 0; i < len(challenges); i += batchSize { + j := i + batchSize + if j > len(challenges) { + j = len(challenges) + } + + if i != j { + q := p.Connection(ctx).RawQuery( + fmt.Sprintf("DELETE FROM %s WHERE challenge in (?)", (&lr).TableName()), + challenges[i:j], + ) + + if err := q.Exec(); err != nil { + return sqlcon.HandleError(err) + } + } + } + + return nil } diff --git a/persistence/sql/persister_oauth2.go b/persistence/sql/persister_oauth2.go index fdd859a710..1a1803597a 100644 --- a/persistence/sql/persister_oauth2.go +++ b/persistence/sql/persister_oauth2.go @@ -361,30 +361,79 @@ func (p *Persister) RevokeAccessToken(ctx context.Context, id string) error { return p.deleteSessionByRequestID(ctx, id, sqlTableAccess) } -func (p *Persister) FlushInactiveAccessTokens(ctx context.Context, notAfter time.Time) error { +func (p *Persister) FlushInactiveAccessTokens(ctx context.Context, notAfter time.Time, limit int, batchSize int) error { /* #nosec G201 table is static */ - err := p.Connection(ctx).RawQuery( - fmt.Sprintf("DELETE FROM %s WHERE requested_at < ? AND requested_at < ?", OAuth2RequestSQL{Table: sqlTableAccess}.TableName()), - time.Now().Add(-p.config.AccessTokenLifespan()), + // The value of notAfter should be the minimum between input parameter and access token max expire based on its configured age + requestMaxExpire := time.Now().Add(-p.config.AccessTokenLifespan()) + if requestMaxExpire.Before(notAfter) { + notAfter = requestMaxExpire + } + + signatures := []string{} + + // Select tokens' signatures with limit + q := p.Connection(ctx).RawQuery( + fmt.Sprintf("SELECT signature FROM %s WHERE requested_at < ? ORDER BY signature LIMIT %d", + OAuth2RequestSQL{Table: sqlTableAccess}.TableName(), limit), notAfter, - ).Exec() - if err == sql.ErrNoRows { + ) + if err := q.All(&signatures); err == sql.ErrNoRows { return errors.Wrap(fosite.ErrNotFound, "") } + + // Delete tokens in batch + var err error + for i := 0; i < len(signatures); i += batchSize { + j := i + batchSize + if j > len(signatures) { + j = len(signatures) + } + + if i != j { + err = p.Connection(ctx).RawQuery( + fmt.Sprintf("DELETE FROM %s WHERE signature in (?)", OAuth2RequestSQL{Table: sqlTableAccess}.TableName()), + signatures[i:j], + ).Exec() + } + } return sqlcon.HandleError(err) } -func (p *Persister) FlushInactiveRefreshTokens(ctx context.Context, notAfter time.Time) error { +func (p *Persister) FlushInactiveRefreshTokens(ctx context.Context, notAfter time.Time, limit int, batchSize int) error { /* #nosec G201 table is static */ - err := p.Connection(ctx).RawQuery( - fmt.Sprintf("DELETE FROM %s WHERE requested_at < ? AND requested_at < ?", OAuth2RequestSQL{Table: sqlTableRefresh}.TableName()), - time.Now().Add(-p.config.RefreshTokenLifespan()), + // The value of notAfter should be the minimum between input parameter and refresh token max expire based on its configured age + requestMaxExpire := time.Now().Add(-p.config.RefreshTokenLifespan()) + if requestMaxExpire.Before(notAfter) { + notAfter = requestMaxExpire + } + + signatures := []string{} + + // Select tokens' signatures with limit + q := p.Connection(ctx).RawQuery( + fmt.Sprintf("SELECT signature FROM %s WHERE requested_at < ? ORDER BY signature LIMIT %d", + OAuth2RequestSQL{Table: sqlTableRefresh}.TableName(), limit), notAfter, - ).Exec() - if err == sql.ErrNoRows { + ) + if err := q.All(&signatures); err == sql.ErrNoRows { return errors.Wrap(fosite.ErrNotFound, "") } + // Delete tokens in batch + var err error + for i := 0; i < len(signatures); i += batchSize { + j := i + batchSize + if j > len(signatures) { + j = len(signatures) + } + + if i != j { + err = p.Connection(ctx).RawQuery( + fmt.Sprintf("DELETE FROM %s WHERE signature in (?)", OAuth2RequestSQL{Table: sqlTableRefresh}.TableName()), + signatures[i:j], + ).Exec() + } + } return sqlcon.HandleError(err) } diff --git a/x/fosite_storer.go b/x/fosite_storer.go index bc120affa3..91d3f10fa1 100644 --- a/x/fosite_storer.go +++ b/x/fosite_storer.go @@ -42,14 +42,14 @@ type FositeStorer interface { // flush the access token requests from the database. // no data will be deleted after the 'notAfter' timeframe. - FlushInactiveAccessTokens(ctx context.Context, notAfter time.Time) error + FlushInactiveAccessTokens(ctx context.Context, notAfter time.Time, limit int, batchSize int) error // flush the login requests from the database. // this will address the database long-term growth issues discussed in https://github.com/ory/hydra/issues/1574. // no data will be deleted after the 'notAfter' timeframe. - FlushInactiveLoginConsentRequests(ctx context.Context, notAfter time.Time) error + FlushInactiveLoginConsentRequests(ctx context.Context, notAfter time.Time, limit int, batchSize int) error DeleteAccessTokens(ctx context.Context, clientID string) error - FlushInactiveRefreshTokens(ctx context.Context, notAfter time.Time) error + FlushInactiveRefreshTokens(ctx context.Context, notAfter time.Time, limit int, batchSize int) error }