Conversation
9d9c513 to 036db8a
036db8a to 4d60edf
It's probably a bit worse: users who start an MPU during the rollout may also lose that MPU. I think we may need some product direction here. There is another option: release a version supporting both modes, upgrade to that version, then after a week release a version dropping the old mode. That one is of course expensive, hence we should ask product.
I don't think that's necessary: once we declare the migration path, we can require users not to perform any MPUs during the upgrade. The choice of whether to complete outstanding MPUs before the upgrade or let the migration process abort them - all of that responsibility is pushed down to the user.
```go
t.Cleanup(func() {
	_, _ = s3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(repo),
		Key:      aws.String(key),
		UploadId: resp.UploadId,
	})
})
```
Worried that this will do more harm than good.
Is there a way to list all the uploads and delete/abort the ones that are relevant for the test before the test starts? That would let us distinguish real failures during the test from the cleanup/setup we perform before it.
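A minimal sketch of such a pre-test sweep, using the same aws-sdk-go-v2 client as the cleanup above (the Prefix filter and error handling are assumptions):

```go
// Abort any leftover uploads for this key before the test starts, so
// in-test failures reflect real problems rather than stale state.
out, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
	Bucket: aws.String(repo),
	Prefix: aws.String(key),
})
require.NoError(t, err)
for _, u := range out.Uploads {
	_, _ = s3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(repo),
		Key:      u.Key,
		UploadId: u.UploadId,
	})
}
```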
esti/s3_gateway_test.go (outdated)
```go
// IsTruncated should be nil or false when not truncated
if outputExact.IsTruncated != nil {
	require.False(t, *outputExact.IsTruncated, "should not be truncated when request is completely fulfilled")
}
```
Relevant to multiple places in the test code.
In this case we require nil or a pointer to false - we can require using apiutil.Value, or require.True(outputExact.IsTruncated == nil || !*outputExact.IsTruncated).
Not a blocker.
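For illustration, the suggested one-liner, assuming apiutil.Value dereferences the pointer and yields the zero value for nil:

```go
require.False(t, apiutil.Value(outputExact.IsTruncated),
	"should not be truncated when request is completely fulfilled")
```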
pkg/gateway/multipart/iterator.go (outdated)
```go
// UploadIterator is an iterator over multipart uploads sorted by Path, then UploadID
type UploadIterator interface {
	// Next advances the iterator to the next upload.
	// Returns true if there is a next upload, false otherwise.
	Next() bool
	// Value returns the current upload.
	// Should only be called after Next returns true.
	Value() *Upload
	// Err returns any error encountered during iteration.
	Err() error
	// Close releases resources associated with the iterator.
	Close()
	// SeekGE seeks to the first upload with key >= uploadIDKey(path, uploadID).
	// After calling SeekGE, Next() must be called to access the first element at or after the seek position.
	SeekGE(key, uploadID string)
}
```
The interface is part of the tracker interface, and the code here should define the UploadIterator implementation and return a pointer to the actual struct.
Not sure I understand the comment - please clarify
Moving the UploadIterator interface to pkg/gateway/multipart/tracker.go, where it is used.
The newUploadIterator func should return the implementation type - *kvUploadIterator.
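For illustration, a minimal sketch of such a constructor, assuming lakeFS's kv.ScanPrefix helper and a hypothetical uploadPrefix constant (the real parameters may differ):

```go
// newUploadIterator returns the concrete type; the UploadIterator
// interface lives next to its consumer in tracker.go.
func newUploadIterator(ctx context.Context, store kv.Store, partition string) (*kvUploadIterator, error) {
	kvIter, err := kv.ScanPrefix(ctx, store, []byte(partition), []byte(uploadPrefix), nil)
	if err != nil {
		return nil, err
	}
	return &kvUploadIterator{kvIter: kvIter}, nil
}
```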
```go
func (it *kvUploadIterator) Err() error {
	if it.err != nil {
		return it.err
	}
	if !it.closed {
		return it.kvIter.Err()
	}
	return nil
}

func (it *kvUploadIterator) Close() {
	if it.closed {
		return
	}
	it.kvIter.Close()
	it.closed = true
}
```
From this code it seems that we don't need to manage the 'closed' state - we just delegate the state to the underlying iterator. The closed indicator can be the kvIter itself if needed.
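A sketch of that simplification, treating a nil kvIter as the closed indicator (field layout assumed from the snippet above):

```go
func (it *kvUploadIterator) Close() {
	if it.kvIter == nil { // already closed, or New failed before kvIter was set
		return
	}
	it.kvIter.Close()
	it.kvIter = nil
}
```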
```go
// Check if there are more uploads (for IsTruncated flag)
// If we exited the loop due to length limit, iter.Next() hasn't been called for the next item yet
isTruncated := iter.Next()

// Set pagination markers for next page
var nextKeyMarker, nextUploadIDMarker string
if isTruncated && len(uploads) > 0 {
	last := uploads[len(uploads)-1]
	nextKeyMarker = last.Key
	nextUploadIDMarker = last.UploadID
}
```
To prevent an endless loop: in case isTruncated is true but len(uploads) is zero, we should set isTruncated to false.
It shouldn't happen but added anyway
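For reference, the resulting guard could look like this (a sketch based on the snippet above):

```go
isTruncated := iter.Next()
if isTruncated && len(uploads) == 0 {
	// Defensive: with no results there is no valid next marker, and a
	// truncated response would send paginating clients into an endless loop.
	isTruncated = false
}
```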
…-ordering-9554
# Conflicts:
#	esti/commit_test.go
#	esti/s3_gateway_test.go
@nopcoder Thanks for the thorough review - I hope I didn't miss anything
This is even more of a product question than before. Bear in mind that "the user" is not a single person. For instance, think about how to roll this out on lakeFS Cloud. We would need to announce that at a certain time slot all MPUs are disallowed. And then we must upgrade the entire cluster within that time slot. This will probably require CS involvement.
I agree completely with everything you said. |
nopcoder left a comment
Thanks for addressing all the comments.
There is one concern I have with the kvUploadIterator implementation:
- The Seek implementation first checks for errors, which means we don't allow calling Seek after a failed New or a previous failed Seek.
- Because Seek is not exactly like New - where the user is expected to call Close only when no error was returned - we need to check whether 'it.kvIter' is set: if New was successful but Seek failed, Close will find 'it.kvIter' set to nil.
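For illustration, a hedged sketch of one way to address both points (the delegation details are omitted and the sentinel error is an assumption):

```go
func (it *kvUploadIterator) SeekGE(key, uploadID string) {
	it.err = nil // reset instead of bailing out, so Seek is usable after a failure
	if it.kvIter == nil {
		it.err = kv.ErrClosedEntries // hypothetical choice of sentinel
		return
	}
	// ... delegate the seek to the underlying kv iterator ...
}
```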
You've asked for product feedback so here it is :) To make things simple, let's put forth a constraint: our rule of thumb should be to never require downtime to upgrade a lakeFS minor version. There might be extreme cases where we'd have to break that rule, but I'm not convinced this is one of them. I suggest we either find a way to fix this while allowing MPUs to continue uninterrupted during the upgrade process (without downtime) - or consider this a big enough breaking change that we simply can't introduce in lakeFS 1.x.
Closes #9554
This PR should not be merged before we decide on a migration strategy (see below).
Change Description
Bug Fix
Testing Details
Added new unit and integration tests
Breaking Change?
Yes
Migration Strategy:
This PR introduces a breaking change due to the change in the upload ID key in our database.
This means that users who upgrade to this version and have ongoing MPUs will basically lose them.
This becomes even more of an issue because, in our current implementation, we don't save upload IDs in the context of a repository and the UploadID data does not store the repository information - therefore we cannot take the route of a standard migration.
Below is a proposal on how to deal with existing MPUs:
Create a migration flow that aborts any ongoing MPUs and cleans up any remaining keys in the old partition.
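For illustration, a hedged sketch of that flow; the partition name, the scan helper, and abortOnAdapter are assumptions, and the real kv API may differ:

```go
// abortOldPartitionMPUs scans every tracker key in the old multipart
// partition, aborts the MPU on the backing store, and deletes the stale key.
func abortOldPartitionMPUs(ctx context.Context, store kv.Store) error {
	const oldPartition = "multipart" // assumed name of the pre-change partition
	it, err := kv.ScanPrefix(ctx, store, []byte(oldPartition), []byte(""), nil)
	if err != nil {
		return err
	}
	defer it.Close()
	for it.Next() {
		entry := it.Entry()
		// abortOnAdapter is a hypothetical helper that decodes the upload
		// record and issues AbortMultipartUpload against the block adapter.
		if err := abortOnAdapter(ctx, entry); err != nil {
			return err
		}
		if err := store.Delete(ctx, []byte(oldPartition), entry.Key); err != nil {
			return err
		}
	}
	return it.Err()
}
```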