Skip to content

Commit

Permalink
feat: improve delete queries for janitor command (#2540)
Browse files Browse the repository at this point in the history
This patch improves delete queries by separating the data extraction from the actual delete. Extraction is made with a configurable limit, using the `--limit` CLI flag. Deletes use that list in batch mode with a configurable batch size (`--batch-size` CLI flag). The default value for the limit is 10000 records and the default value for the batch size is 100 records.

To improve performance, `LEFT JOIN` is used to also select login and consent requests which did not result in a complete authentication, i.e. the user requested login but timed out, or the user logged in and timed out at consent. Also, two independent `SELECT`s are used in the extraction of login and consent requests eligible for deletion. This solves a bug in the single `SELECT` that caused deletion of consent requests whose matching login requests were eligible for deletion, and vice versa. With independent `SELECT`s we keep consent requests even if the matching login request gets deleted.

Closes #2513
  • Loading branch information
flavioleggio committed Aug 4, 2021
1 parent 3a48df6 commit 6ea0bf8
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 100 deletions.
28 changes: 22 additions & 6 deletions cmd/cli/handler_janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
)

const (
Limit = "limit"
BatchSize = "batch-size"
KeepIfYounger = "keep-if-younger"
AccessLifespan = "access-lifespan"
RefreshLifespan = "refresh-lifespan"
Expand Down Expand Up @@ -53,6 +55,17 @@ func (_ *JanitorHandler) Args(cmd *cobra.Command, args []string) error {
"Janitor requires either --tokens or --requests or both to be set")
}

limit := flagx.MustGetInt(cmd, Limit)
batchSize := flagx.MustGetInt(cmd, BatchSize)
if limit <= 0 || batchSize <= 0 {
return fmt.Errorf("%s\n%s\n", cmd.UsageString(),
"Values for --limit and --batch-size should both be greater than 0")
}
if batchSize > limit {
return fmt.Errorf("%s\n%s\n", cmd.UsageString(),
"Value for --batch-size must not be greater than value for --limit")
}

return nil
}

Expand Down Expand Up @@ -111,6 +124,9 @@ func purge(cmd *cobra.Command, args []string) error {

p := d.Persister()

limit := flagx.MustGetInt(cmd, Limit)
batchSize := flagx.MustGetInt(cmd, BatchSize)

var routineFlags []string

if flagx.MustGetBool(cmd, OnlyTokens) {
Expand All @@ -121,7 +137,7 @@ func purge(cmd *cobra.Command, args []string) error {
routineFlags = append(routineFlags, OnlyRequests)
}

return cleanupRun(cmd.Context(), notAfter, addRoutine(p, routineFlags...)...)
return cleanupRun(cmd.Context(), notAfter, limit, batchSize, addRoutine(p, routineFlags...)...)
}

func addRoutine(p persistence.Persister, names ...string) []cleanupRoutine {
Expand All @@ -138,25 +154,25 @@ func addRoutine(p persistence.Persister, names ...string) []cleanupRoutine {
return routines
}

type cleanupRoutine func(ctx context.Context, notAfter time.Time) error
type cleanupRoutine func(ctx context.Context, notAfter time.Time, limit int, batchSize int) error

func cleanup(cr cleanupRoutine, routineName string) cleanupRoutine {
return func(ctx context.Context, notAfter time.Time) error {
if err := cr(ctx, notAfter); err != nil {
return func(ctx context.Context, notAfter time.Time, limit int, batchSize int) error {
if err := cr(ctx, notAfter, limit, batchSize); err != nil {
return errors.Wrap(errorsx.WithStack(err), fmt.Sprintf("Could not cleanup inactive %s", routineName))
}
fmt.Printf("Successfully completed Janitor run on %s\n", routineName)
return nil
}
}

func cleanupRun(ctx context.Context, notAfter time.Time, routines ...cleanupRoutine) error {
func cleanupRun(ctx context.Context, notAfter time.Time, limit int, batchSize int, routines ...cleanupRoutine) error {
if len(routines) == 0 {
return errors.New("clean up run received 0 routines")
}

for _, r := range routines {
if err := r(ctx, notAfter); err != nil {
if err := r(ctx, notAfter, limit, batchSize); err != nil {
return err
}
}
Expand Down
49 changes: 49 additions & 0 deletions cmd/cli/handler_janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,53 @@ func TestJanitorHandler_Arguments(t *testing.T) {
"memory")
require.Error(t, err)
require.Contains(t, err.Error(), "Janitor requires either --tokens or --requests or both to be set")

cmdx.ExecNoErr(t, cmd.NewRootCmd(),
"janitor",
fmt.Sprintf("--%s", cli.OnlyRequests),
fmt.Sprintf("--%s=%s", cli.Limit, "1000"),
fmt.Sprintf("--%s=%s", cli.BatchSize, "100"),
"memory",
)

_, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil,
"janitor",
fmt.Sprintf("--%s", cli.OnlyRequests),
fmt.Sprintf("--%s=%s", cli.Limit, "0"),
"memory")
require.Error(t, err)
require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0")

_, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil,
"janitor",
fmt.Sprintf("--%s", cli.OnlyRequests),
fmt.Sprintf("--%s=%s", cli.Limit, "-100"),
"memory")
require.Error(t, err)
require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0")

_, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil,
"janitor",
fmt.Sprintf("--%s", cli.OnlyRequests),
fmt.Sprintf("--%s=%s", cli.BatchSize, "0"),
"memory")
require.Error(t, err)
require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0")

_, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil,
"janitor",
fmt.Sprintf("--%s", cli.OnlyRequests),
fmt.Sprintf("--%s=%s", cli.BatchSize, "-100"),
"memory")
require.Error(t, err)
require.Contains(t, err.Error(), "Values for --limit and --batch-size should both be greater than 0")

_, _, err = cmdx.ExecCtx(context.Background(), cmd.NewRootCmd(), nil,
"janitor",
fmt.Sprintf("--%s", cli.OnlyRequests),
fmt.Sprintf("--%s=%s", cli.Limit, "100"),
fmt.Sprintf("--%s=%s", cli.BatchSize, "1000"),
"memory")
require.Error(t, err)
require.Contains(t, err.Error(), "Value for --batch-size must not be greater than value for --limit")
}
3 changes: 3 additions & 0 deletions cmd/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func NewJanitorCmd() *cobra.Command {
Use: "janitor [<database-url>]",
Short: "Clean the database of old tokens and login/consent requests",
Long: `This command will cleanup any expired oauth2 tokens as well as login/consent requests.
This will select records to delete with a limit and delete records in batch to ensure that no table locking issues arise in big production databases.
### Warning ###
Expand Down Expand Up @@ -52,6 +53,8 @@ Janitor can be used in several ways.
RunE: cli.NewHandler().Janitor.RunE,
Args: cli.NewHandler().Janitor.Args,
}
cmd.Flags().Int(cli.Limit, 10000, "Limit the number of records retrieved from database for deletion.")
cmd.Flags().Int(cli.BatchSize, 100, "Define how many records are deleted with each iteration.")
cmd.Flags().Duration(cli.KeepIfYounger, 0, "Keep database records that are younger than a specified duration e.g. 1s, 1m, 1h.")
cmd.Flags().Duration(cli.AccessLifespan, 0, "Set the access token lifespan e.g. 1s, 1m, 1h.")
cmd.Flags().Duration(cli.RefreshLifespan, 0, "Set the refresh token lifespan e.g. 1s, 1m, 1h.")
Expand Down
5 changes: 4 additions & 1 deletion docs/docs/cli/hydra-janitor.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Clean the database of old tokens and login/consent requests
### Synopsis

This command will cleanup any expired oauth2 tokens as well as login/consent
requests.
requests. This will select records to delete with a limit and delete records in
batch to ensure that no table locking issues arise in big production databases.

### Warning

Expand Down Expand Up @@ -69,6 +70,8 @@ hydra janitor [&lt;database-url&gt;] [flags]
-c, --config strings Path to one or more .json, .yaml, .yml, .toml config files. Values are loaded in the order provided, meaning that the last config file overwrites values from the previous config file.
--consent-request-lifespan duration Set the login/consent request lifespan e.g. 1s, 1m, 1h
-h, --help help for janitor
--limit int Limits the number of records retrieved from the database for deletion. This is optional and the default value is 10000 records. This value should be larger than your database growth rate, calculated over the time interval between two consecutive janitor runs.
--batch-size int Defines how many records are deleted with each iteration. This is optional and the default value is 100 records. This value must not be greater than the limit value.
--keep-if-younger duration Keep database records that are younger than a specified duration e.g. 1s, 1m, 1h.
-e, --read-from-env If set, reads the database connection string from the environment variable DSN or config file key dsn.
--refresh-lifespan duration Set the refresh token lifespan e.g. 1s, 1m, 1h.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/go-openapi/swag v0.19.13
github.com/go-openapi/validate v0.20.1
github.com/go-swagger/go-swagger v0.26.1
github.com/gobuffalo/packr v1.30.1 // indirect
github.com/gobuffalo/pop/v5 v5.3.4
github.com/gobuffalo/x v0.0.0-20181007152206-913e47c59ca7
github.com/gobwas/glob v0.2.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ github.com/gobuffalo/packr v1.20.0/go.mod h1:JDytk1t2gP+my1ig7iI4NcVaXr886+N0ecU
github.com/gobuffalo/packr v1.21.0/go.mod h1:H00jGfj1qFKxscFJSw8wcL4hpQtPe1PfU2wa6sg/SR0=
github.com/gobuffalo/packr v1.22.0 h1:/YVd/GRGsu0QuoCJtlcWSVllobs4q3Xvx3nqxTvPyN0=
github.com/gobuffalo/packr v1.22.0/go.mod h1:Qr3Wtxr3+HuQEwWqlLnNW4t1oTvK+7Gc/Rnoi/lDFvA=
github.com/gobuffalo/packr v1.30.1 h1:hu1fuVR3fXEZR7rXNW3h8rqSML8EVAf6KNm0NKO/wKg=
github.com/gobuffalo/packr v1.30.1/go.mod h1:ljMyFO2EcrnzsHsN99cvbq055Y9OhRrIaviy289eRuk=
github.com/gobuffalo/packr/v2 v2.0.0-rc.8/go.mod h1:y60QCdzwuMwO2R49fdQhsjCPv7tLQFR0ayzxxla9zes=
github.com/gobuffalo/packr/v2 v2.0.0-rc.9/go.mod h1:fQqADRfZpEsgkc7c/K7aMew3n4aF1Kji7+lIZeR98Fc=
github.com/gobuffalo/packr/v2 v2.0.0-rc.10/go.mod h1:4CWWn4I5T3v4c1OsJ55HbHlUEKNWMITG5iIkdr4Px4w=
Expand Down
61 changes: 56 additions & 5 deletions internal/testhelpers/janitor_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,40 @@ func (j *JanitorConsentTestHelper) LoginRejectionValidate(ctx context.Context, c
}
}

// LimitSetup returns a subtest that seeds the stores for the limit test case.
// For every entry in j.flushLoginRequests it creates the entry's client and
// login request, and afterwards marks each of those login requests as handled.
func (j *JanitorConsentTestHelper) LimitSetup(ctx context.Context, cm consent.Manager, cl client.Manager) func(t *testing.T) {
	return func(t *testing.T) {
		// First pass: persist each client together with its login request.
		for _, req := range j.flushLoginRequests {
			require.NoError(t, cl.CreateClient(ctx, req.Client))
			require.NoError(t, cm.CreateLoginRequest(ctx, req))
		}

		// Second pass: handle every login request.
		// NOTE(review): the `true` argument presumably flags the request as
		// rejected/errored — confirm against consent.NewHandledLoginRequest.
		for _, req := range j.flushLoginRequests {
			handled := consent.NewHandledLoginRequest(req.ID, true, req.RequestedAt, req.AuthenticatedAt)
			_, err := cm.HandleLoginRequest(ctx, req.ID, handled)
			require.NoError(t, err)
		}
	}
}

// LimitValidate returns a subtest that checks the state left behind by a
// limited flush: the login request sharing its ID with the first entry of
// j.flushLoginRequests must still be retrievable, while fetching any other
// entry must return an error.
func (j *JanitorConsentTestHelper) LimitValidate(ctx context.Context, cm consent.Manager) func(t *testing.T) {
	return func(t *testing.T) {
		for _, req := range j.flushLoginRequests {
			t.Logf("check login: %s", req.ID)
			_, err := cm.GetLoginRequest(ctx, req.ID)

			// Everything except the first request is expected to be gone.
			if req.ID != j.flushLoginRequests[0].ID {
				require.Error(t, err)
				continue
			}
			require.NoError(t, err)
		}
	}
}

func (j *JanitorConsentTestHelper) ConsentRejectionSetup(ctx context.Context, cm consent.Manager, cl client.Manager) func(t *testing.T) {
return func(t *testing.T) {
var err error
Expand Down Expand Up @@ -412,7 +446,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM

// run the cleanup routine
t.Run("step=cleanup", func(t *testing.T) {
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, notAfter))
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, notAfter, 1000, 100))
})

// validate test
Expand All @@ -422,6 +456,23 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM
}
})

t.Run("case=flush-consent-request-limit", func(t *testing.T) {
jt := NewConsentJanitorTestHelper("limit")

t.Run("case=limit", func(t *testing.T) {
// setup
t.Run("step=setup", jt.LimitSetup(ctx, consentManager, clientManager))

// cleanup
t.Run("step=cleanup", func(t *testing.T) {
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 2, 1))
})

// validate
t.Run("step=validate", jt.LimitValidate(ctx, consentManager))
})
})

t.Run("case=flush-consent-request-rejection", func(t *testing.T) {
jt := NewConsentJanitorTestHelper("loginRejection")

Expand All @@ -431,7 +482,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM

// cleanup
t.Run("step=cleanup", func(t *testing.T) {
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second)))
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100))
})

// validate
Expand All @@ -446,7 +497,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM

// cleanup
t.Run("step=cleanup", func(t *testing.T) {
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second)))
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100))
})

// validate
Expand All @@ -465,7 +516,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM

// cleanup
t.Run("step=cleanup", func(t *testing.T) {
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second)))
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100))
})

// validate
Expand All @@ -482,7 +533,7 @@ func JanitorTests(conf *config.Provider, consentManager consent.Manager, clientM

// cleanup
t.Run("step=cleanup", func(t *testing.T) {
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second)))
require.NoError(t, fositeManager.FlushInactiveLoginConsentRequests(ctx, time.Now().Round(time.Second), 1000, 100))
})

// validate
Expand Down
39 changes: 36 additions & 3 deletions oauth2/fosite_store_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func TestHelperRunner(t *testing.T, store InternalRegistry, k string) {
t.Run(fmt.Sprintf("case=testHelperRevokeRefreshToken/db=%s", k), testHelperRevokeRefreshToken(store))
t.Run(fmt.Sprintf("case=testHelperCreateGetDeletePKCERequestSession/db=%s", k), testHelperCreateGetDeletePKCERequestSession(store))
t.Run(fmt.Sprintf("case=testHelperFlushTokens/db=%s", k), testHelperFlushTokens(store, time.Hour))
t.Run(fmt.Sprintf("case=testHelperFlushTokensWithLimitAndBatchSize/db=%s", k), testHelperFlushTokensWithLimitAndBatchSize(store, 3, 2))
t.Run(fmt.Sprintf("case=testFositeStoreSetClientAssertionJWT/db=%s", k), testFositeStoreSetClientAssertionJWT(store))
t.Run(fmt.Sprintf("case=testFositeStoreClientAssertionJWTValid/db=%s", k), testFositeStoreClientAssertionJWTValid(store))
t.Run(fmt.Sprintf("case=testHelperDeleteAccessTokens/db=%s", k), testHelperDeleteAccessTokens(store))
Expand Down Expand Up @@ -443,23 +444,23 @@ func testHelperFlushTokens(x InternalRegistry, lifespan time.Duration) func(t *t
require.NoError(t, err)
}

require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-time.Hour*24)))
require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-time.Hour*24), 100, 10))
_, err := m.GetAccessTokenSession(ctx, "flush-1", ds)
require.NoError(t, err)
_, err = m.GetAccessTokenSession(ctx, "flush-2", ds)
require.NoError(t, err)
_, err = m.GetAccessTokenSession(ctx, "flush-3", ds)
require.NoError(t, err)

require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-(lifespan+time.Hour/2))))
require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now().Add(-(lifespan+time.Hour/2)), 100, 10))
_, err = m.GetAccessTokenSession(ctx, "flush-1", ds)
require.NoError(t, err)
_, err = m.GetAccessTokenSession(ctx, "flush-2", ds)
require.NoError(t, err)
_, err = m.GetAccessTokenSession(ctx, "flush-3", ds)
require.Error(t, err)

require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now()))
require.NoError(t, m.FlushInactiveAccessTokens(ctx, time.Now(), 100, 10))
_, err = m.GetAccessTokenSession(ctx, "flush-1", ds)
require.NoError(t, err)
_, err = m.GetAccessTokenSession(ctx, "flush-2", ds)
Expand All @@ -469,6 +470,38 @@ func testHelperFlushTokens(x InternalRegistry, lifespan time.Duration) func(t *t
}
}

// testHelperFlushTokensWithLimitAndBatchSize returns a subtest that creates
// five access token sessions requested two hours ago, flushes inactive access
// tokens with the given limit and batchSize, and then asserts that the first
// `limit` created sessions can no longer be fetched while the remaining ones
// still can.
func testHelperFlushTokensWithLimitAndBatchSize(x InternalRegistry, limit int, batchSize int) func(t *testing.T) {
	storage := x.OAuth2Storage()
	session := &Session{}

	return func(t *testing.T) {
		const expiredCount = 5

		ctx := context.Background()
		var created []*fosite.Request

		// Seed the store; a shared UUID prefix keeps the IDs unique per run.
		prefix := uuid.New()
		for n := 1; n <= expiredCount; n++ {
			req := createTestRequest(fmt.Sprintf("%s-%d", prefix, n))
			req.RequestedAt = time.Now().Add(-2 * time.Hour)
			mockRequestForeignKey(t, req.ID, x, false)
			require.NoError(t, storage.CreateAccessTokenSession(ctx, req.ID, req))

			// Sanity check: the session must be readable right after creation.
			_, err := storage.GetAccessTokenSession(ctx, req.ID, session)
			require.NoError(t, err)

			created = append(created, req)
		}

		require.NoError(t, storage.FlushInactiveAccessTokens(ctx, time.Now(), limit, batchSize))

		for idx, req := range created {
			_, err := storage.GetAccessTokenSession(ctx, req.ID, session)
			if idx < limit {
				// Within the limit: expected to have been flushed.
				require.Error(t, err)
				continue
			}
			// Beyond the limit: expected to have been left untouched.
			require.NoError(t, err)
		}
	}
}

func testFositeSqlStoreTransactionCommitAccessToken(m InternalRegistry) func(t *testing.T) {
return func(t *testing.T) {
{
Expand Down
4 changes: 2 additions & 2 deletions oauth2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ func (h *Handler) FlushHandler(w http.ResponseWriter, r *http.Request, _ httprou
fr.NotAfter = time.Now()
}

if err := h.r.OAuth2Storage().FlushInactiveAccessTokens(r.Context(), fr.NotAfter); err != nil {
if err := h.r.OAuth2Storage().FlushInactiveAccessTokens(r.Context(), fr.NotAfter, 1000, 100); err != nil {
h.r.Writer().WriteError(w, r, err)
return
}

if err := h.r.OAuth2Storage().FlushInactiveRefreshTokens(r.Context(), fr.NotAfter); err != nil {
if err := h.r.OAuth2Storage().FlushInactiveRefreshTokens(r.Context(), fr.NotAfter, 1000, 100); err != nil {
h.r.Writer().WriteError(w, r, err)
return
}
Expand Down
Loading

0 comments on commit 6ea0bf8

Please sign in to comment.