diff --git a/engine/cdn/cdn_gc.go b/engine/cdn/cdn_gc.go index 7a7b786f5d..2109d14f3f 100644 --- a/engine/cdn/cdn_gc.go +++ b/engine/cdn/cdn_gc.go @@ -7,6 +7,7 @@ import ( "github.com/rockbears/log" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/storage" "github.com/ovh/cds/sdk" @@ -153,7 +154,6 @@ func (s *Service) cleanBuffer(ctx context.Context) error { if len(itemIDs) == 0 { continue } - itemUnitsIDs, err := storage.LoadAllItemUnitsIDsByItemIDsAndUnitID(s.mustDBWithCtx(ctx), bu.ID(), itemIDs) if err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) @@ -161,6 +161,34 @@ func (s *Service) cleanBuffer(ctx context.Context) error { continue } + itemUnitsToMark := make([]string, 0, len(itemUnitsIDs)) + for i := range itemUnitsIDs { + uiID := itemUnitsIDs[i] + lockKey := cache.Key(storage.FileBufferKey, bu.ID(), "lock", uiID) + b, err := s.Cache.Lock(lockKey, 30*time.Second, 0, 1) + if err != nil { + log.Error(ctx, "unable to lock unit item: %v: %v", uiID, err) + continue + } + if !b { + log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID) + continue + } + readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*") + keys, err := s.Cache.Keys(cache.Key(readerPatternKey)) + if err != nil { + log.Error(ctx, "unable to check if item unit is currently reading by cdn") + _ = s.Cache.Unlock(lockKey) + continue + } + if len(keys) > 0 { + log.Info(ctx, "do not delete item unit, it is currently reading by cdn") + _ = s.Cache.Unlock(lockKey) + continue + } + itemUnitsToMark = append(itemUnitsToMark, uiID) + } + tx, err := s.mustDBWithCtx(ctx).Begin() if err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) @@ -168,7 +196,7 @@ func (s *Service) cleanBuffer(ctx context.Context) error { continue } - if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil { + if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsToMark); err != nil { _ = tx.Rollback() ctx := sdk.ContextWithStacktrace(ctx, err) log.Error(ctx, "unable to mark item as delete: %v", err) diff --git a/engine/cdn/cdn_gc_test.go b/engine/cdn/cdn_gc_test.go index 0bd0d60ca0..07a16f7110 100644 --- a/engine/cdn/cdn_gc_test.go +++ b/engine/cdn/cdn_gc_test.go @@ -6,6 +6,12 @@ import ( "testing" "time" + "github.com/ovh/symmecrypt/ciphers/aesgcm" + "github.com/ovh/symmecrypt/convergent" + "github.com/rockbears/log" + "github.com/stretchr/testify/require" + + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/lru" "github.com/ovh/cds/engine/cdn/storage" @@ -13,10 +19,6 @@ import ( "github.com/ovh/cds/engine/gorpmapper" "github.com/ovh/cds/engine/test" "github.com/ovh/cds/sdk" - "github.com/ovh/symmecrypt/ciphers/aesgcm" - "github.com/ovh/symmecrypt/convergent" - "github.com/rockbears/log" - "github.com/stretchr/testify/require" ) func TestCleanSynchronizedItem(t *testing.T) { @@ -503,3 +505,127 @@ func TestPurgeItem(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(items)) } + +func TestCleanSynchronizedReadingItem(t *testing.T) { + m := gorpmapper.New() + item.InitDBMapping(m) + storage.InitDBMapping(m) + + log.Factory = log.NewTestingWrapper(t) + db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN) + t.Cleanup(cancel) + + cfg := test.LoadTestingConf(t, sdk.TypeCDN) + + cdntest.ClearItem(t, context.TODO(), m, db) + cdntest.ClearUnits(t, context.TODO(), m, db) + + // Create cdn service + s := Service{ + DBConnectionFactory: factory, + Cache: store, + Mapper: m, + } + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) + + tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") + require.NoError(t, err) + + tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + t.Cleanup(cancel) + + cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ + HashLocatorSalt: "thisismysalt", + Buffers: map[string]storage.BufferConfiguration{ + "redis_buffer": { + Redis: &storage.RedisBufferConfiguration{ + Host: cfg["redisHost"], + Password: cfg["redisPassword"], + }, + BufferType: storage.CDNBufferTypeLog, + }, + "file_buffer": { + Local: &storage.LocalBufferConfiguration{ + Path: tmpDir2, + }, + BufferType: storage.CDNBufferTypeFile, + }, + }, + Storages: map[string]storage.StorageConfiguration{ + "fs-backend": { + Local: &storage.LocalStorageConfiguration{ + Path: tmpDir, + Encryption: []convergent.ConvergentEncryptionConfig{ + { + Cipher: aesgcm.CipherName, + LocatorSalt: "secret_locator_salt", + SecretValue: "secret_value", + }, + }, + }, + }, + "cds-backend": { + CDS: &storage.CDSStorageConfiguration{ + Host: "lolcat.host", + Token: "mytoken", + }, + }, + }, + }) + require.NoError(t, err) + s.Units = cdnUnits + + // Add Item in redis / fs/ cds -will be delete from redis + it := sdk.CDNItem{ + ID: sdk.UUID(), + Type: sdk.CDNTypeItemRunResult, + Status: sdk.CDNStatusItemCompleted, + APIRefHash: sdk.RandomString(10), + } + require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it)) + iuCDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuCDS)) + iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf)) + iuFileStorage := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileStorage)) + + /////////////////////////////////////// + // 1st test, getItem Lock the item unit + /////////////////////////////////////// + lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID) + hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) + require.NoError(t, err) + t.Cleanup(func() { + s.Cache.Unlock(lockKey) + }) + require.True(t, hasLocked) + require.NoError(t, s.cleanBuffer(context.TODO())) + + _, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID) + require.NoError(t, err) + + require.NoError(t, s.Cache.Unlock(lockKey)) + + //////////////////////////////////////////////////////// + // 2nd test, getItem is reading the file from the buffer + //////////////////////////////////////////////////////// + readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", iuFileBuf.ID, sdk.UUID()) + require.NoError(t, s.Cache.SetWithTTL(readerKey, true, 30)) + require.NoError(t, s.cleanBuffer(context.TODO())) + + _, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID) + require.NoError(t, err) + + require.NoError(t, s.Cache.Delete(readerKey)) + //////////////////////////////////////////////////////// + // 3rd test, mark as delete + //////////////////////////////////////////////////////// + require.NoError(t, s.cleanBuffer(context.TODO())) + + _, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID) + require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound)) +} diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index 82adf7afe1..d446bf0043 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -15,6 +15,7 @@ import ( "github.com/rockbears/log" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/storage" "github.com/ovh/cds/engine/gorpmapper" @@ -208,13 +209,47 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe // If item is in Buffer, get from it if itemUnit != nil { - log.Error(ctx, "getItemFileValue> Getting file from buffer") - - rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit) + log.Debug(ctx, "getItemFileValue> Getting file from buffer") + ignoreBuffer := false + lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID) + hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) if err != nil { - return nil, nil, nil, err + log.Error(ctx, "unable to get lock for %s", lockKey) + ignoreBuffer = true + } + if hasLocked { + // Reload to be sure that it's not marked as delete + _, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID) + if err != nil { + if !sdk.ErrorIs(err, sdk.ErrNotFound) { + log.Error(ctx, "unable to load item unit: %v", err) + } + ignoreBuffer = true + } + + if !ignoreBuffer { + readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID()) + if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil { + log.Error(ctx, "unable to set reader on file buffer: %v", err) + ignoreBuffer = true + } + } + + if err := s.Cache.Unlock(lockKey); err != nil { + log.Error(ctx, "unable to release lock for %s", lockKey) + } + } else { + ignoreBuffer = true } - return itemUnit, s.Units.FileBuffer(), rc, nil + + if !ignoreBuffer { + rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit) + if err != nil { + return nil, nil, nil, err + } + return itemUnit, s.Units.FileBuffer(), rc, nil + } + } // Get from storage @@ -349,12 +384,13 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string return "", "", err } + itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits) + itemUnits = s.Units.FilterItemUnitFromBuffer(itemUnits) + if len(itemUnits) == 0 { return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID)) } - itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits) - var unit *sdk.CDNUnit var selectedItemUnit *sdk.CDNItemUnit if defaultUnitName != "" { diff --git a/engine/cdn/cdn_item_test.go b/engine/cdn/cdn_item_test.go index 6c336c3ab7..1f9f4df75d 100644 --- a/engine/cdn/cdn_item_test.go +++ b/engine/cdn/cdn_item_test.go @@ -13,6 +13,7 @@ import ( "github.com/rockbears/log" "github.com/stretchr/testify/require" + "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/lru" "github.com/ovh/cds/engine/cdn/redis" @@ -509,3 +510,54 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) { require.Equal(t, int64(0), lines[226].Number) require.Equal(t, "Line 0\n", lines[226].Value) } + +func TestGetFileItemFromBuffer(t *testing.T) { + m := gorpmapper.New() + item.InitDBMapping(m) + storage.InitDBMapping(m) + + log.Factory = log.NewTestingWrapper(t) + db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN) + t.Cleanup(cancel) + + cdntest.ClearItem(t, context.TODO(), m, db) + cdntest.ClearSyncRedisSet(t, store, "local_storage") + + // Create cdn service + s := Service{ + DBConnectionFactory: factory, + Cache: store, + Mapper: m, + } + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) + + ctx, cancel := context.WithCancel(context.TODO()) + t.Cleanup(cancel) + + _ = test.LoadTestingConf(t, sdk.TypeCDN) + cdnUnits := newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, store) + s.Units = cdnUnits + + // Add Item in redis / fs/ cds -will be delete from redis + it := sdk.CDNItem{ + ID: sdk.UUID(), + Type: sdk.CDNTypeItemRunResult, + Status: sdk.CDNStatusItemCompleted, + APIRefHash: sdk.RandomString(10), + } + require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it)) + iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type} + require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf)) + + // IU locked by gc + lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID) + hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1) + require.NoError(t, err) + require.True(t, hasLocked) + + ui, unit, reader, err := s.getItemFileValue(ctx, sdk.CDNTypeItemRunResult, it.APIRefHash, getItemFileOptions{}) + require.Nil(t, reader) + require.Nil(t, ui) + require.Nil(t, unit) + require.Contains(t, err.Error(), "unable to find item units for item with id: "+it.ID) +} diff --git a/engine/cdn/storage/storageunit.go b/engine/cdn/storage/storageunit.go index 81e7bff3d1..e670df7296 100644 --- a/engine/cdn/storage/storageunit.go +++ b/engine/cdn/storage/storageunit.go @@ -20,6 +20,10 @@ import ( cdslog "github.com/ovh/cds/sdk/log" ) +var ( + FileBufferKey = cache.Key("cdn", "unit") +) + func (r RunningStorageUnits) Storage(name string) StorageUnit { for _, x := range r.Storages { if x.Name() == name { diff --git a/engine/cdn/storage/types.go b/engine/cdn/storage/types.go index 53f4563980..d421ffc587 100644 --- a/engine/cdn/storage/types.go +++ b/engine/cdn/storage/types.go @@ -286,6 +286,17 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit { } } +func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { + itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius)) + for _, u := range ius { + if x.IsBuffer(u.UnitID) { + continue + } + itemsUnits = append(itemsUnits, u) + } + return itemsUnits +} + func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { // Remove cds backend from getting something that is not a log if ius[0].Type != sdk.CDNTypeItemStepLog && ius[0].Type != sdk.CDNTypeItemServiceLog { @@ -307,6 +318,15 @@ func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) return ius } +func (x *RunningStorageUnits) IsBuffer(id string) bool { + for _, buf := range x.Buffers { + if buf.ID() == id { + return true + } + } + return false +} + type LogConfig struct { // Step logs StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`