Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 2090685: Cache object storer for subsequent uploads #40

Merged
merged 1 commit into from May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/suite_test.go
Expand Up @@ -183,7 +183,7 @@ var _ = BeforeSuite(func() {
Client: k8sManager.GetClient(),
APIReader: k8sManager.GetAPIReader(),
Log: ctrl.Log.WithName("controllers").WithName("VolumeReplicationGroup"),
ObjStoreGetter: ramencontrollers.S3ObjectStoreGetter(),
ObjStoreGetter: fakeObjectStoreGetter{},
PVDownloader: FakePVDownloader{},
PVUploader: FakePVUploader{},
PVDeleter: FakePVDeleter{},
Expand Down
78 changes: 57 additions & 21 deletions controllers/volumereplicationgroup_controller.go
Expand Up @@ -53,7 +53,7 @@ type PVDownloader interface {
}

type PVUploader interface {
UploadPV(v interface{}, s3ProfileName string, pvc *corev1.PersistentVolumeClaim) error
UploadPV(storer ObjectStorer, v interface{}, s3ProfileName string, pvc *corev1.PersistentVolumeClaim) error
}

type PVDeleter interface {
Expand Down Expand Up @@ -312,6 +312,11 @@ func (r *VolumeReplicationGroupReconciler) Reconcile(ctx context.Context, req ct
return v.processVRG()
}

type CachedObjectStorer struct {
storer ObjectStorer
err error
}

type VRGInstance struct {
reconciler *VolumeReplicationGroupReconciler
ctx context.Context
Expand All @@ -322,6 +327,7 @@ type VRGInstance struct {
replClassList *volrep.VolumeReplicationClassList
vrcUpdated bool
namespacedName string
objectStorers map[string]CachedObjectStorer
}

const (
Expand Down Expand Up @@ -1452,10 +1458,28 @@ func (v *VRGInstance) uploadPVToS3Stores(pvc *corev1.PersistentVolumeClaim, log

func (v *VRGInstance) PVUploadToObjectStore(pvc *corev1.PersistentVolumeClaim,
log logr.Logger) ([]string, error) {
var (
err error
storer ObjectStorer
)

s3Profiles := []string{}
// Upload the PV to all the S3 profiles in the VRG spec
for _, s3ProfileName := range v.instance.Spec.S3Profiles {
if err := v.reconciler.PVUploader.UploadPV(v, s3ProfileName, pvc); err != nil {
switch {
case s3ProfileName == "":
err = fmt.Errorf("error uploading cluster data of PV %s because VRG spec has no S3 profiles",
pvc.Name)
default:
storer, err = v.getOrCacheObjectStorer(s3ProfileName)
if err != nil {
break
}

err = v.reconciler.PVUploader.UploadPV(storer, v, s3ProfileName, pvc)
}

if err != nil {
log.Error(err, fmt.Sprintf("error uploading PV cluster data to s3Profile %s, %v",
s3ProfileName, err))

Expand All @@ -1476,36 +1500,48 @@ func (v *VRGInstance) PVUploadToObjectStore(pvc *corev1.PersistentVolumeClaim,
return s3Profiles, nil
}

func (v *VRGInstance) getOrCacheObjectStorer(s3ProfileName string) (ObjectStorer, error) {
if v.objectStorers == nil {
v.objectStorers = make(map[string]CachedObjectStorer)
}

// return a cached objectStore if present
if cachedObjectStore, ok := v.objectStorers[s3ProfileName]; ok {
return cachedObjectStore.storer, cachedObjectStore.err
}

// create a new objectStore (connection)
objectStore, err := v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx,
v.reconciler.APIReader,
s3ProfileName,
v.namespacedName,
v.log)
if err != nil {
err = fmt.Errorf("error connecting to object store for s3Profile %s, %w", s3ProfileName, err)
}

// cache the objectStore (or the error)
v.objectStorers[s3ProfileName] = CachedObjectStorer{
storer: objectStore,
Copy link
Collaborator

@BenamarMk BenamarMk May 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this objectstore encounters an error then it is doomed forever, right? Once it is added with an error, we will always return it in line 1510 with the same error. No retry. I guess all that's need to fix it is to return the err in line 1521

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true for this instance of VRG, the next reconcile will get a new VRGInstance, where we will try to connect once more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which also makes me wonder why we do not close the sessions we open with the object store? (different issue, but was checking based on this comment if we were closing the connection which would cause other failures)

err: err,
}

return objectStore, err
}

type ObjectStorePVUploader struct{}

// UploadPV checks if the VRG spec has been configured with an s3 endpoint,
// connects to the object store, gets the PV cluster data of the input PVC from
// etcd, creates a bucket in s3 store, uploads the PV cluster data to s3 store.
func (ObjectStorePVUploader) UploadPV(v interface{}, s3ProfileName string,
func (ObjectStorePVUploader) UploadPV(objectStore ObjectStorer, v interface{}, s3ProfileName string,
pvc *corev1.PersistentVolumeClaim) (err error) {
vrg, ok := v.(*VRGInstance)
if !ok {
return fmt.Errorf("error uploading PV, input is not VRGInstance")
}

if s3ProfileName == "" {
return fmt.Errorf("error uploading cluster data of PV %s because VRG spec has no S3 profiles",
pvc.Name)
}

objectStore, err :=
vrg.reconciler.ObjStoreGetter.ObjectStore(
vrg.ctx,
vrg.reconciler.APIReader,
s3ProfileName,
vrg.namespacedName, /* debugTag */
vrg.log,
)
if err != nil {
return fmt.Errorf("error connecting to object store when uploading PV %s to s3Profile %s, %w",
pvc.Name, s3ProfileName, err)
}

pv := corev1.PersistentVolume{}
volumeName := pvc.Spec.VolumeName
pvObjectKey := client.ObjectKey{Name: volumeName}
Expand Down
6 changes: 5 additions & 1 deletion controllers/volumereplicationgroup_controller_test.go
Expand Up @@ -1106,7 +1106,11 @@ func (s FakePVDownloader) DownloadPVs(ctx context.Context, r client.Reader,

type FakePVUploader struct{}

func (s FakePVUploader) UploadPV(v interface{}, s3ProfileName string, pvc *corev1.PersistentVolumeClaim) error {
func (s FakePVUploader) UploadPV(
storer vrgController.ObjectStorer,
v interface{},
s3ProfileName string,
pvc *corev1.PersistentVolumeClaim) error {
UploadedPVs[pvc.Spec.VolumeName] = Empty{}

return nil
Expand Down