Expire objects from S3 according to retention policy #309
Conversation
Force-pushed from 8c60e01 to 9254611
Note well: retention needs an access key ID in order to retrieve the account ID. Currently we read it from In a production setting I expect this field to be set. Options:
catalog/cataloger_retention.go
Outdated
```diff
 // QueryExpired returns ExpiryRows iterating over all objects to expire on repositoryName
 // according to policy to channel out.
-func (c *cataloger) QueryExpired(ctx context.Context, repositoryName string, policy *retention.Policy) (ExpiryRows, error) {
+func (c *cataloger) QueryExpired(ctx context.Context, repositoryName string, policy *Policy) (ExpiryRows, error) {
 	logger := logging.Default().WithContext(ctx).WithField("policy", *policy)
```
I think you might be looking for `logging.FromContext(ctx)`, which would populate the returned logger with all context logging values?
SG, thanks!
```go
			ON a.physical_address = b.physical_address)
		WHERE a.c = b.c)
	`,
	expiryByEntriesQueryString,
```
why not use a builder?
No way to interpolate table names (SQL and Postgres placeholders only bind values, not identifiers). So a builder would buy us very little; I would still need %-interpolation. See e.g. this message on the PostgreSQL mailing list.
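A sketch of the point: since Postgres placeholders (`$1`, `$2`, ...) cannot stand for identifiers, the table name has to be validated and interpolated into the query text, while values still go through placeholders (hypothetical query and table names):

```go
package main

import (
	"fmt"
	"regexp"
)

var identRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)

// expiryQuery builds the query text. Placeholders only bind *values*,
// never identifiers, so the table name must be interpolated into the
// SQL text -- after validating it to avoid injection.
func expiryQuery(table string) (string, error) {
	if !identRE.MatchString(table) {
		return "", fmt.Errorf("invalid table name %q", table)
	}
	return fmt.Sprintf(
		`SELECT physical_address FROM %s WHERE repository = $1 AND min_commit < $2`,
		table), nil
}

func main() {
	q, err := expiryQuery("catalog_entries")
	fmt.Println(q, err)
}
```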
```diff
 	if err != nil {
 		return nil, fmt.Errorf("running query: %w", err)
 	}
-	return &expiryRows{rows, repositoryName}, nil
+	var ret ExpiryRows = &expiryRows{rows: rows, RepositoryName: repositoryName}
+	return ret, nil
```
This could result in a huge list, exceeding available memory size.
I don't know if we should address it now, but I'd probably go with pagination (using a very big page size) to make sure we don't end up with a crash loop.
Per documentation and @nopcoder (not to mention implementation bugs I had when I returned unread rows from a transaction), `rows` is just an iterator. This is not in memory.
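The shape in question, sketched with an in-memory fake standing in for `*sql.Rows` (names simplified from the PR): a cursor-style interface streams one row at a time, so the consumer's memory use stays constant however many rows the query matched.

```go
package main

import "fmt"

// ExpireResult is one record to expire (simplified).
type ExpireResult struct{ PhysicalAddress string }

// ExpiryRows mirrors the database/sql cursor style: Next advances,
// Read decodes the current row. Only the current row is in memory.
type ExpiryRows interface {
	Next() bool
	Read() (*ExpireResult, error)
	Close() error
}

// sliceRows is a test fake over an in-memory slice.
type sliceRows struct {
	data []ExpireResult
	i    int
}

func (r *sliceRows) Next() bool                    { r.i++; return r.i <= len(r.data) }
func (r *sliceRows) Read() (*ExpireResult, error)  { return &r.data[r.i-1], nil }
func (r *sliceRows) Close() error                  { return nil }

// consume processes rows one at a time -- constant memory regardless of
// how many rows the query matched.
func consume(rows ExpiryRows) (n int, err error) {
	defer rows.Close()
	for rows.Next() {
		if _, err = rows.Read(); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}

func main() {
	n, _ := consume(&sliceRows{data: []ExpireResult{{"a"}, {"b"}, {"c"}}})
	fmt.Println(n)
}
```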
Regardless. @tzahij asked me to consider going back to using a table here, and also has concerns about the whole system as created on this series of PRs. We shall discuss f2f tomorrow.
```go
		},
	})
	if err != nil {
		t.Fatalf("read all expiration records failed: %s", err)
	}
	resultByPhysicalAddress := make(map[string]*ExpireResult, len(allResults))
	for _, result := range allResults {
		t.Logf("Result: %+v", result)
```
I assume this was for debugging? do we still want to print it?
AFAIU test logs only show up on failures (by default). Let me know if you disagree or think it's too much, I'll happily remove.
```go
	logger.WithError(err).Fatal("cannot list repositories")
}

// TODO(ariels): fail on failure!
```
please do!
Separate PR: this is mainly called in buildS3Adapter, I want to understand better what counts as failure here.
config/config.go
Outdated
```go
	return cfg
}

func GetAccount(awsConfig *aws.Config) (*string, error) {
```
any reason to return a pointer to a string here?
AWS likes to use pointers when serializing. Changing to be more Goish.
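A sketch of the "more Goish" signature: dereference the SDK's `*string` once at the boundary and return a plain `string`. The `callerIdentity` struct here is a hypothetical stand-in for the real STS response type, which, like most AWS SDK types, uses pointer fields.

```go
package main

import (
	"errors"
	"fmt"
)

// callerIdentity stands in for the STS GetCallerIdentity response,
// whose Account field is a *string in the AWS SDK.
type callerIdentity struct{ Account *string }

// GetAccount dereferences the SDK's pointer at the boundary and
// returns a plain string -- the more idiomatic Go signature.
func GetAccount(id *callerIdentity) (string, error) {
	if id == nil || id.Account == nil {
		return "", errors.New("no account in caller identity")
	}
	return *id.Account, nil
}

func main() {
	acct := "123456789012"
	got, err := GetAccount(&callerIdentity{Account: &acct})
	fmt.Println(got, err)
}
```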
go.mod
Outdated
```diff
@@ -39,6 +39,7 @@ require (
 	github.com/lib/pq v1.7.0 // indirect
 	github.com/lunixbochs/vtclean v1.0.0 // indirect
 	github.com/manifoldco/promptui v0.7.0
+	github.com/matoous/go-nanoid v1.4.1
```
I don't see where we're using it in the PR?
Was using it, forgot to clean it. Thanks!
```go
)

// WriteExpiryResultsToSeekableReader returns a file-backed (Seeker) Reader holding the contents of expiryRows.
func WriteExpiryResultsToSeekableReader(ctx context.Context, expiryRows catalog.ExpiryRows) (fileutil.RewindableReader, error) {
```
Usually easier to test (and compose) when we pass the Reader as a dependency instead of returning one.
it's nicer but def. not important.
I don't think it's that kind of Reader. This is a rewindable file abstraction: first you write it, then you transform it into a rewindable reader.
```go
		Writer:    ret,
		CsvWriter: csv.NewWriter(ret),
	}
	(*bw)[bucketName] = record
```
if I understand the rest of the code correctly, we only ever call GetWriter and iterate over bucket/writer pairs. so never really get or set values directly. Not sure type aliasing a map is the best API for that...
l.120 is random access to get the writer for a bucket. (We need this complexity because different repos could live on different buckets, and namespaces mean they could share them too)
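A sketch of the access pattern under discussion (simplified to `bytes.Buffer` values; the real record holds a writer plus a CSV writer): a `GetWriter` method keeps the lazy-create logic in one place, even if the underlying type remains a map keyed by bucket.

```go
package main

import (
	"bytes"
	"fmt"
)

// BucketWriters maps bucket name to its output buffer. Different repos
// may live on different buckets, and namespaces let several repos share
// one, so writers are looked up (and lazily created) per bucket.
type BucketWriters map[string]*bytes.Buffer

// GetWriter returns the writer for bucketName, creating it on first use.
func (bw BucketWriters) GetWriter(bucketName string) *bytes.Buffer {
	if w, ok := bw[bucketName]; ok {
		return w
	}
	w := &bytes.Buffer{}
	bw[bucketName] = w
	return w
}

func main() {
	bw := BucketWriters{}
	bw.GetWriter("bucket-a").WriteString("addr1\n")
	bw.GetWriter("bucket-a").WriteString("addr2\n") // same writer reused
	bw.GetWriter("bucket-b").WriteString("addr3\n")
	fmt.Println(len(bw), bw["bucket-a"].Len())
}
```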
```go
	}
	resetableReader, count, err := encodingData.Writer.StartReading()
	if err != nil {
		bucketLogger.WithError(err).Error("failed to start reading encoded CSVs; lose all bucket expiries")
```
all errors in this loop are swallowed?
Yes: we tried to start expiry and failed. We're not going to manage to expire them today. Best I can think of is log an error and hope tomorrow is another day. With other changes requested it does fail the expiry run -- but the objects did not get expired.
Monitoring will make it clearer that the repo is not being expired. Not sure what else I can do ("it's an ops problem").
Thanks; PTAL!
cmd/lakefs/cmd/expire.go
Outdated
```go
		continue
	}

	retention.ExpireOnS3(ctx, s3ControlClient, s3Client, cataloger, expiryReader, &expiryParams)
```
We do log the errors; there's not much more to do, but failing the process is a good idea!
I plan to fix the race found by @tzahij under a separate PR; this one is large enough and the issue is not in any of the code on this one.
Great work on this!
Prevent dependency loops. They are *parsed* to API objects in retention, but catalog uses them and is at a lower level.
Operationally run this periodically (e.g. daily) on the lakeFS server (hence `lakefs` and not via the API). Adds these additional configuration variables in `blockstore.s3.retention`:
- `role_arn`: ARN to use in batch tagging
- `manifest_base_url`: S3 URL prefix (e.g. directory) to use to upload tagging manifest
Inter-branch copies can share the same dedupe ID; don't expire until they all agree about expiration. The object is available on all its branches until it expires from the last branch: retention is *not* a synchronous mechanism and applications are not allowed to rely on it occurring.
- Manifests are CSV, not JSON format (whoops)
- Flush CSV encoder

Tested with tiny expiration vs. S3 -- objects were tagged.
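The inter-branch rule can be sketched as: group entries by physical address and expire an object only when every branch referencing it agrees (hypothetical types; in the PR the real decision is made in SQL over the catalog).

```go
package main

import "fmt"

// entry is one branch's view of a stored object (simplified).
type entry struct {
	physicalAddress string
	branch          string
	expired         bool
}

// expirableAddresses returns only addresses every referencing branch
// agrees to expire: inter-branch copies share a dedupe ID, so the
// underlying object must stay until its last branch expires it.
func expirableAddresses(entries []entry) []string {
	agree := map[string]bool{}
	for _, e := range entries {
		if v, seen := agree[e.physicalAddress]; seen {
			agree[e.physicalAddress] = v && e.expired
		} else {
			agree[e.physicalAddress] = e.expired
		}
	}
	var out []string
	for addr, ok := range agree {
		if ok {
			out = append(out, addr)
		}
	}
	return out
}

func main() {
	got := expirableAddresses([]entry{
		{"s3://b/x", "main", true},
		{"s3://b/x", "dev", false}, // dev still needs it: keep the object
		{"s3://b/y", "main", true},
	})
	fmt.Println(got)
}
```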
Finds some trivial errors in the object; doesn't cost much to perform. Does *not* actually discover that no other fields are allowed in `Report` when setting `Enabled: false`.
Thanks! Fixing numerous minor conflicts and pulling (unless one of the conflicts turns out to be non-minor).
- Repos with no retention should be skipped with no error
- Use `logger.FromContext`
- Remove unused go-nanoid
Tested by using IAM to fail CreateJob and seeing an appropriately FATAL report.
Force-pushed from 0677198 to 9d6b701
@ozkatz I seem to have lost your approval due to the rebase changes (and/or the rebase itself). Dunno if that's our configuration or my mishandling of something git-(hub?)-ish.
…e-from-s3 Expire objects from S3 according to retention policy Former-commit-id: 3cc43a763b7ac551977418b922a5281109251eab
Add a new command `lakefs expire`. It should be run on the server in order to emit logs and to avoid holding a long-lived connection. When run, it tags objects to expire with key `"lakefs-expire"` value `"1"`. (`lakefs diagnose` verifies that there is such a lifecycle rule.)
Tested on my local-with-S3 lakefs instance.
Limitation: Object expiry on namespaced lakeFS repositories untested.
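For illustration, the bucket lifecycle rule that `lakefs diagnose` checks for might look like the following, in standard S3 lifecycle-configuration JSON. The rule ID and `Days` value are placeholders; only the tag key/value come from the text above.

```json
{
  "Rules": [
    {
      "ID": "lakefs-expire-tagged",
      "Status": "Enabled",
      "Filter": {
        "Tag": { "Key": "lakefs-expire", "Value": "1" }
      },
      "Expiration": { "Days": 1 }
    }
  ]
}
```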