From a07fc73ad743ba7082b85b09001136c1c4519ddf Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Wed, 2 Jun 2021 10:27:25 +0200 Subject: [PATCH 1/8] fix: read item from buffer while deleting it --- engine/cdn/cdn_gc.go | 33 +++++- engine/cdn/cdn_gc_test.go | 134 +++++++++++++++++++++++- engine/cdn/cdn_item.go | 42 +++++++- engine/cdn/cdn_item_test.go | 52 +++++++++ engine/cdn/storage/storageunit.go | 4 + engine/cdn/storage/storageunit_purge.go | 4 +- engine/cdn/storage/types.go | 9 ++ 7 files changed, 265 insertions(+), 13 deletions(-) diff --git a/engine/cdn/cdn_gc.go b/engine/cdn/cdn_gc.go index 7a7b786f5d..c28fa4652d 100644 --- a/engine/cdn/cdn_gc.go +++ b/engine/cdn/cdn_gc.go @@ -7,6 +7,7 @@ import ( "github.com/rockbears/log" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/storage" "github.com/ovh/cds/sdk" @@ -153,7 +154,6 @@ func (s *Service) cleanBuffer(ctx context.Context) error { if len(itemIDs) == 0 { continue } - itemUnitsIDs, err := storage.LoadAllItemUnitsIDsByItemIDsAndUnitID(s.mustDBWithCtx(ctx), bu.ID(), itemIDs) if err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) @@ -161,6 +161,35 @@ func (s *Service) cleanBuffer(ctx context.Context) error { continue } + itemUnitsToMark := make([]string, 0, len(itemUnitsIDs)) + for i := range itemUnitsIDs { + uiID := itemUnitsIDs[i] + lockKey := cache.Key(storage.FileBufferKey, bu.ID(), "lock", uiID) + b, err := s.Cache.Lock(lockKey, 30*time.Second, 0, 1) + if err != nil { + log.Error(ctx, "unable to lock unit item: %v: %v", uiID, err) + continue + } + if !b { + log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID) + _ = s.Cache.Unlock(lockKey) + continue + } + readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*") + keys, err := s.Cache.Keys(cache.Key(readerPatternKey)) + if err != nil { + log.Error(ctx, "unable to check if item unit is currently reading by cdn") + _ = s.Cache.Unlock(lockKey) + continue + } + if len(keys) > 0 { + log.Info(ctx, "do not delete item unit, it is currently reading by cdn") + _ = s.Cache.Unlock(lockKey) + continue + } + itemUnitsToMark = append(itemUnitsToMark, uiID) + } + tx, err := s.mustDBWithCtx(ctx).Begin() if err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) @@ -168,7 +197,7 @@ func (s *Service) cleanBuffer(ctx context.Context) error { continue } - if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil { + if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsToMark); err != nil { _ = tx.Rollback() ctx := sdk.ContextWithStacktrace(ctx, err) log.Error(ctx, "unable to mark item as delete: %v", err) diff --git a/engine/cdn/cdn_gc_test.go b/engine/cdn/cdn_gc_test.go index 0bd0d60ca0..07a16f7110 100644 --- a/engine/cdn/cdn_gc_test.go +++ b/engine/cdn/cdn_gc_test.go @@ -6,6 +6,12 @@ import ( "testing" "time" + "github.com/ovh/symmecrypt/ciphers/aesgcm" + "github.com/ovh/symmecrypt/convergent" + "github.com/rockbears/log" + "github.com/stretchr/testify/require" + + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/lru" "github.com/ovh/cds/engine/cdn/storage" @@ -13,10 +19,6 @@ import ( "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/engine/test" "github.com/ovh/cds/sdk" - "github.com/ovh/symmecrypt/ciphers/aesgcm" - "github.com/ovh/symmecrypt/convergent" - "github.com/rockbears/log" - "github.com/stretchr/testify/require" ) func TestCleanSynchronizedItem(t *testing.T) { @@ -503,3 +505,127 @@ func TestPurgeItem(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(items)) } + +func TestCleanSynchronizedReadingItem(t *testing.T) { + m := gorpmapper.New() + item.InitDBMapping(m) + storage.InitDBMapping(m) + + log.Factory = log.NewTestingWrapper(t) + db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN) + t.Cleanup(cancel) + + cfg := test.LoadTestingConf(t, sdk.TypeCDN) + + cdntest.ClearItem(t, context.TODO(), m, db) + cdntest.ClearUnits(t, context.TODO(), m, db) + + // Create cdn service + s := Service{ + DBConnectionFactory: factory, + Cache: store, + Mapper: m, + } + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) + + tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") + require.NoError(t, err) + + tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + t.Cleanup(cancel) + + cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ + HashLocatorSalt: "thisismysalt", + Buffers: map[string]storage.BufferConfiguration{ + "redis_buffer": { + Redis: &storage.RedisBufferConfiguration{ + Host: cfg["redisHost"], + Password: cfg["redisPassword"], + }, + BufferType: storage.CDNBufferTypeLog, + }, + "file_buffer": { + Local: &storage.LocalBufferConfiguration{ + Path: tmpDir2, + }, + BufferType: storage.CDNBufferTypeFile, + }, + }, + Storages: map[string]storage.StorageConfiguration{ + "fs-backend": { + Local: &storage.LocalStorageConfiguration{ + Path: tmpDir, + Encryption: []convergent.ConvergentEncryptionConfig{ + { + Cipher: aesgcm.CipherName, + LocatorSalt: "secret_locator_salt", + SecretValue: "secret_value", + }, + }, + }, + }, + "cds-backend": { + CDS: &storage.CDSStorageConfiguration{ + Host: "lolcat.host", + Token: "mytoken", + }, + }, + }, + }) + require.NoError(t, err) + s.Units = cdnUnits + + // Add Item in redis / fs/ cds -will be delete from redis + it := sdk.CDNItem{ + ID: sdk.UUID(), + Type: sdk.CDNTypeItemRunResult, + Status: sdk.CDNStatusItemCompleted, + APIRefHash: sdk.RandomString(10), + } + require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it)) + iuCDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuCDS)) + iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf)) + iuFileStorage := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileStorage)) + + /////////////////////////////////////// + // 1st test, getItem Lock the item unit + /////////////////////////////////////// + lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID) + hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) + require.NoError(t, err) + t.Cleanup(func() { + s.Cache.Unlock(lockKey) + }) + require.True(t, hasLocked) + require.NoError(t, s.cleanBuffer(context.TODO())) + + _, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID) + require.NoError(t, err) + + require.NoError(t, s.Cache.Unlock(lockKey)) + + //////////////////////////////////////////////////////// + // 2nd test, getItem is reading the file from the buffer + //////////////////////////////////////////////////////// + readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", iuFileBuf.ID, sdk.UUID()) + require.NoError(t, s.Cache.SetWithTTL(readerKey, true, 30)) + require.NoError(t, s.cleanBuffer(context.TODO())) + + _, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID) + require.NoError(t, err) + + require.NoError(t, s.Cache.Delete(readerKey)) + //////////////////////////////////////////////////////// + // 3rd test, mark as delete + //////////////////////////////////////////////////////// + require.NoError(t, s.cleanBuffer(context.TODO())) + + _, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID) + require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound)) +} diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index 82adf7afe1..8e2a6671a8 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -15,6 +15,7 @@ import ( "github.com/rockbears/log" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/storage" "github.com/ovh/cds/engine/gorpmapper" @@ -208,13 +209,42 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe // If item is in Buffer, get from it if itemUnit != nil { - log.Error(ctx, "getItemFileValue> Getting file from buffer") + log.Debug(ctx, "getItemFileValue> Getting file from buffer") - rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit) + lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID) + hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) if err != nil { - return nil, nil, nil, err + log.Error(ctx, "unable to get lock for %s", lockKey) } - return itemUnit, s.Units.FileBuffer(), rc, nil + ignoreBuffer := false + if hasLocked { + // Reload to be sure that it's not marked as delete + _, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID) + if err != nil { + if !sdk.ErrorIs(err, sdk.ErrNotFound) { + log.Error(ctx, "unable to load item unit: %v", err) + } + ignoreBuffer = true + } + + readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID()) + if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil { + log.Error(ctx, "unable to set reader on file buffer: %v", err) + ignoreBuffer = true + } + if err := s.Cache.Unlock(lockKey); err != nil { + log.Error(ctx, "unable to release lock for %s", lockKey) + } + } + + if hasLocked && !ignoreBuffer { + rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit) + if err != nil { + return nil, nil, nil, err + } + return itemUnit, s.Units.FileBuffer(), rc, nil + } + } // Get from storage @@ -375,6 +405,10 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string return selectedItemUnit.ID, defaultUnitName, nil } + if len(itemUnits) == 1 && s.Units.IsBuffer(itemUnits[0].UnitID) { + return "", "", sdk.WithStack(fmt.Errorf("unable to find a non buffer storage for item: %s", itemID)) + } + // Random pick a unit idx := 0 if len(itemUnits) > 1 { diff --git a/engine/cdn/cdn_item_test.go b/engine/cdn/cdn_item_test.go index 6c336c3ab7..eb2da59c25 100644 --- a/engine/cdn/cdn_item_test.go +++ b/engine/cdn/cdn_item_test.go @@ -13,6 +13,7 @@ import ( "github.com/rockbears/log" "github.com/stretchr/testify/require" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/lru" "github.com/ovh/cds/engine/cdn/redis" @@ -509,3 +510,54 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) { require.Equal(t, int64(0), lines[226].Number) require.Equal(t, "Line 0\n", lines[226].Value) } + +func TestGetFileItemFromBuffer(t *testing.T) { + m := gorpmapper.New() + item.InitDBMapping(m) + storage.InitDBMapping(m) + + log.Factory = log.NewTestingWrapper(t) + db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN) + t.Cleanup(cancel) + + cdntest.ClearItem(t, context.TODO(), m, db) + cdntest.ClearSyncRedisSet(t, store, "local_storage") + + // Create cdn service + s := Service{ + DBConnectionFactory: factory, + Cache: store, + Mapper: m, + } + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) + + ctx, cancel := context.WithCancel(context.TODO()) + t.Cleanup(cancel) + + _ = test.LoadTestingConf(t, sdk.TypeCDN) + cdnUnits := newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, store) + s.Units = cdnUnits + + // Add Item in redis / fs/ cds -will be delete from redis + it := sdk.CDNItem{ + ID: sdk.UUID(), + Type: sdk.CDNTypeItemRunResult, + Status: sdk.CDNStatusItemCompleted, + APIRefHash: sdk.RandomString(10), + } + require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it)) + iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf)) + + // IU locked by gc + lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID) + hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) + require.NoError(t, err) + require.True(t, hasLocked) + + ui, unit, reader, err := s.getItemFileValue(ctx, sdk.CDNTypeItemRunResult, it.APIRefHash, getItemFileOptions{}) + require.Nil(t, reader) + require.Nil(t, ui) + require.Nil(t, unit) + require.Contains(t, err.Error(), "unable to find a non buffer storage for item") +} diff --git a/engine/cdn/storage/storageunit.go b/engine/cdn/storage/storageunit.go index 81e7bff3d1..e670df7296 100644 --- a/engine/cdn/storage/storageunit.go +++ b/engine/cdn/storage/storageunit.go @@ -20,6 +20,10 @@ import ( cdslog "github.com/ovh/cds/sdk/log" ) +var ( + FileBufferKey = cache.Key("cdn", "unit") +) + func (r RunningStorageUnits) Storage(name string) StorageUnit { for _, x := range r.Storages { if x.Name() == name { diff --git a/engine/cdn/storage/storageunit_purge.go b/engine/cdn/storage/storageunit_purge.go index bcef8cd3a0..6a6ef6d8bd 100644 --- a/engine/cdn/storage/storageunit_purge.go +++ b/engine/cdn/storage/storageunit_purge.go @@ -2,11 +2,9 @@ package storage import ( "context" - - "github.com/rockbears/log" - "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/sdk" + "github.com/rockbears/log" ) const ( diff --git a/engine/cdn/storage/types.go b/engine/cdn/storage/types.go index 53f4563980..024acfa886 100644 --- a/engine/cdn/storage/types.go +++ b/engine/cdn/storage/types.go @@ -307,6 +307,15 @@ func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) return ius } +func (x *RunningStorageUnits) IsBuffer(id string) bool { + for _, buf := range x.Buffers { + if buf.ID() == id { + return true + } + } + return false +} + type LogConfig struct { // Step logs StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"` From 713e390ca8a15592eb7cb849d7d38ab5dea7e6a3 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Wed, 2 Jun 2021 10:35:16 +0200 Subject: [PATCH 2/8] fix: simplify condition --- engine/cdn/cdn_item.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index 8e2a6671a8..ad2be7cc83 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -210,13 +210,13 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe // If item is in Buffer, get from it if itemUnit != nil { log.Debug(ctx, "getItemFileValue> Getting file from buffer") - + ignoreBuffer := false lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID) hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) if err != nil { log.Error(ctx, "unable to get lock for %s", lockKey) + ignoreBuffer = true } - ignoreBuffer := false if hasLocked { // Reload to be sure that it's not marked as delete _, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID) @@ -237,7 +237,7 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe } } - if hasLocked && !ignoreBuffer { + if !ignoreBuffer { rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit) if err != nil { return nil, nil, nil, err From ae2180a29d73e84ea62dab6bc0562355934661e7 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Wed, 2 Jun 2021 10:36:53 +0200 Subject: [PATCH 3/8] fix: simplify condition --- engine/cdn/cdn_item.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index ad2be7cc83..efa8f61a67 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -217,7 +217,7 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe log.Error(ctx, "unable to get lock for %s", lockKey) ignoreBuffer = true } - if hasLocked { + if hasLocked && !ignoreBuffer { // Reload to be sure that it's not marked as delete _, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID) if err != nil { From 3fa6738e5971b089a1938d9c5da774cdfa3e1504 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Wed, 2 Jun 2021 11:25:04 +0200 Subject: [PATCH 4/8] fix: ignore buffer if no lock --- engine/cdn/cdn_item.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index efa8f61a67..94e682ed84 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -217,7 +217,7 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe log.Error(ctx, "unable to get lock for %s", lockKey) ignoreBuffer = true } - if hasLocked && !ignoreBuffer { + if hasLocked { // Reload to be sure that it's not marked as delete _, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID) if err != nil { @@ -235,6 +235,8 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe if err := s.Cache.Unlock(lockKey); err != nil { log.Error(ctx, "unable to release lock for %s", lockKey) } + } else { + ignoreBuffer = true } if !ignoreBuffer { From f6d31061af1f2ecfa04746dc6fd3097de30b108f Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Thu, 3 Jun 2021 10:41:33 +0200 Subject: [PATCH 5/8] fix: code review --- engine/cdn/cdn_item.go | 11 +++++++---- engine/cdn/storage/storageunit_purge.go | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index 94e682ed84..a51b8f8add 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -227,11 +227,14 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe ignoreBuffer = true } - readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID()) - if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil { - log.Error(ctx, "unable to set reader on file buffer: %v", err) - ignoreBuffer = true + if !ignoreBuffer { + readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID()) + if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil { + log.Error(ctx, "unable to set reader on file buffer: %v", err) + ignoreBuffer = true + } } + if err := s.Cache.Unlock(lockKey); err != nil { log.Error(ctx, "unable to release lock for %s", lockKey) } diff --git a/engine/cdn/storage/storageunit_purge.go b/engine/cdn/storage/storageunit_purge.go index 6a6ef6d8bd..bcef8cd3a0 100644 --- a/engine/cdn/storage/storageunit_purge.go +++ b/engine/cdn/storage/storageunit_purge.go @@ -2,9 +2,11 @@ package storage import ( "context" + + "github.com/rockbears/log" + "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/sdk" - "github.com/rockbears/log" ) const ( From dc1829682bb3dc3b2be70552505df0189651bd68 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Thu, 3 Jun 2021 10:52:39 +0200 Subject: [PATCH 6/8] fix: remove all itemunit from buffer --- engine/cdn/cdn_item.go | 9 +++------ engine/cdn/storage/types.go | 11 +++++++++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index a51b8f8add..d446bf0043 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -384,12 +384,13 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string return "", "", err } + itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits) + itemUnits = s.Units.FilterItemUnitFromBuffer(itemUnits) + if len(itemUnits) == 0 { return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID)) } - itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits) - var unit *sdk.CDNUnit var selectedItemUnit *sdk.CDNItemUnit if defaultUnitName != "" { @@ -410,10 +411,6 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string return selectedItemUnit.ID, defaultUnitName, nil } - if len(itemUnits) == 1 && s.Units.IsBuffer(itemUnits[0].UnitID) { - return "", "", sdk.WithStack(fmt.Errorf("unable to find a non buffer storage for item: %s", itemID)) - } - // Random pick a unit idx := 0 if len(itemUnits) > 1 { diff --git a/engine/cdn/storage/types.go b/engine/cdn/storage/types.go index 024acfa886..d421ffc587 100644 --- a/engine/cdn/storage/types.go +++ b/engine/cdn/storage/types.go @@ -286,6 +286,17 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit { } } +func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { + itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius)) + for _, u := range ius { + if x.IsBuffer(u.UnitID) { + continue + } + itemsUnits = append(itemsUnits, u) + } + return itemsUnits +} + func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { // Remove cds backend from getting something that is not a log if ius[0].Type != sdk.CDNTypeItemStepLog && ius[0].Type != sdk.CDNTypeItemServiceLog { From 1701aa172a104ded9c0d52f411335357e8abd5a7 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Thu, 3 Jun 2021 10:55:55 +0200 Subject: [PATCH 7/8] fix: remove all itemunit from buffer --- engine/cdn/cdn_gc.go | 1 - 1 file changed, 1 deletion(-) diff --git a/engine/cdn/cdn_gc.go b/engine/cdn/cdn_gc.go index c28fa4652d..2109d14f3f 100644 --- a/engine/cdn/cdn_gc.go +++ b/engine/cdn/cdn_gc.go @@ -172,7 +172,6 @@ func (s *Service) cleanBuffer(ctx context.Context) error { } if !b { log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID) - _ = s.Cache.Unlock(lockKey) continue } readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*") From 9143b0b08eb6d1b75fb699756c2b87f6668ade1a Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Thu, 3 Jun 2021 11:47:37 +0200 Subject: [PATCH 8/8] fix: unit test --- engine/cdn/cdn_item_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/cdn/cdn_item_test.go b/engine/cdn/cdn_item_test.go index eb2da59c25..1f9f4df75d 100644 --- a/engine/cdn/cdn_item_test.go +++ b/engine/cdn/cdn_item_test.go @@ -559,5 +559,5 @@ func TestGetFileItemFromBuffer(t *testing.T) { require.Nil(t, reader) require.Nil(t, ui) require.Nil(t, unit) - require.Contains(t, err.Error(), "unable to find a non buffer storage for item") + require.Contains(t, err.Error(), "unable to find item units for item with id: "+it.ID) }