Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdn): get item unit from buffer while deleting it #5831

Merged
merged 8 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 30 additions & 2 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/sdk"
Expand Down Expand Up @@ -153,22 +154,49 @@ func (s *Service) cleanBuffer(ctx context.Context) error {
if len(itemIDs) == 0 {
continue
}

itemUnitsIDs, err := storage.LoadAllItemUnitsIDsByItemIDsAndUnitID(s.mustDBWithCtx(ctx), bu.ID(), itemIDs)
if err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to load item units: %v", err)
continue
}

itemUnitsToMark := make([]string, 0, len(itemUnitsIDs))
for i := range itemUnitsIDs {
uiID := itemUnitsIDs[i]
lockKey := cache.Key(storage.FileBufferKey, bu.ID(), "lock", uiID)
b, err := s.Cache.Lock(lockKey, 30*time.Second, 0, 1)
if err != nil {
log.Error(ctx, "unable to lock unit item: %v: %v", uiID, err)
continue
}
if !b {
log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID)
continue
}
readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*")
keys, err := s.Cache.Keys(cache.Key(readerPatternKey))
if err != nil {
log.Error(ctx, "unable to check if item unit is currently reading by cdn")
_ = s.Cache.Unlock(lockKey)
continue
}
if len(keys) > 0 {
log.Info(ctx, "do not delete item unit, it is currently reading by cdn")
_ = s.Cache.Unlock(lockKey)
continue
}
itemUnitsToMark = append(itemUnitsToMark, uiID)
}

tx, err := s.mustDBWithCtx(ctx).Begin()
if err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to start transaction: %v", err)
continue
}

if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil {
if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsToMark); err != nil {
_ = tx.Rollback()
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to mark item as delete: %v", err)
Expand Down
134 changes: 130 additions & 4 deletions engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"testing"
"time"

"github.com/ovh/symmecrypt/ciphers/aesgcm"
"github.com/ovh/symmecrypt/convergent"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/lru"
"github.com/ovh/cds/engine/cdn/storage"
cdntest "github.com/ovh/cds/engine/cdn/test"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/sdk"
"github.com/ovh/symmecrypt/ciphers/aesgcm"
"github.com/ovh/symmecrypt/convergent"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
)

func TestCleanSynchronizedItem(t *testing.T) {
Expand Down Expand Up @@ -503,3 +505,127 @@ func TestPurgeItem(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(items))
}

func TestCleanSynchronizedReadingItem(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearUnits(t, context.TODO(), m, db)

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: store,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines(context.TODO())

tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)

tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*")
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
t.Cleanup(cancel)

cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"redis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
"file_buffer": {
Local: &storage.LocalBufferConfiguration{
Path: tmpDir2,
},
BufferType: storage.CDNBufferTypeFile,
},
},
Storages: map[string]storage.StorageConfiguration{
"fs-backend": {
Local: &storage.LocalStorageConfiguration{
Path: tmpDir,
Encryption: []convergent.ConvergentEncryptionConfig{
{
Cipher: aesgcm.CipherName,
LocatorSalt: "secret_locator_salt",
SecretValue: "secret_value",
},
},
},
},
"cds-backend": {
CDS: &storage.CDSStorageConfiguration{
Host: "lolcat.host",
Token: "mytoken",
},
},
},
})
require.NoError(t, err)
s.Units = cdnUnits

// Add Item in redis / fs/ cds -will be delete from redis
it := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemRunResult,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
iuCDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuCDS))
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))
iuFileStorage := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileStorage))

///////////////////////////////////////
// 1st test, getItem Lock the item unit
///////////////////////////////////////
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
require.NoError(t, err)
t.Cleanup(func() {
s.Cache.Unlock(lockKey)
})
require.True(t, hasLocked)
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.NoError(t, err)

require.NoError(t, s.Cache.Unlock(lockKey))

////////////////////////////////////////////////////////
// 2nd test, getItem is reading the file from the buffer
////////////////////////////////////////////////////////
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", iuFileBuf.ID, sdk.UUID())
require.NoError(t, s.Cache.SetWithTTL(readerKey, true, 30))
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.NoError(t, err)

require.NoError(t, s.Cache.Delete(readerKey))
////////////////////////////////////////////////////////
// 3rd test, mark as delete
////////////////////////////////////////////////////////
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound))
}
50 changes: 43 additions & 7 deletions engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/engine/gorpmapper"
Expand Down Expand Up @@ -208,13 +209,47 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe

// If item is in Buffer, get from it
if itemUnit != nil {
log.Error(ctx, "getItemFileValue> Getting file from buffer")

rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
log.Debug(ctx, "getItemFileValue> Getting file from buffer")
ignoreBuffer := false
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
if err != nil {
return nil, nil, nil, err
log.Error(ctx, "unable to get lock for %s", lockKey)
ignoreBuffer = true
}
if hasLocked {
// Reload to be sure that it's not marked as delete
_, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID)
if err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
log.Error(ctx, "unable to load item unit: %v", err)
}
ignoreBuffer = true
}

if !ignoreBuffer {
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID())
if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil {
log.Error(ctx, "unable to set reader on file buffer: %v", err)
ignoreBuffer = true
}
}

if err := s.Cache.Unlock(lockKey); err != nil {
log.Error(ctx, "unable to release lock for %s", lockKey)
}
} else {
ignoreBuffer = true
}
return itemUnit, s.Units.FileBuffer(), rc, nil

if !ignoreBuffer {
rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
if err != nil {
return nil, nil, nil, err
}
return itemUnit, s.Units.FileBuffer(), rc, nil
}

}

// Get from storage
Expand Down Expand Up @@ -349,12 +384,13 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string
return "", "", err
}

itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits)
itemUnits = s.Units.FilterItemUnitFromBuffer(itemUnits)

if len(itemUnits) == 0 {
return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID))
}

itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits)

var unit *sdk.CDNUnit
var selectedItemUnit *sdk.CDNItemUnit
if defaultUnitName != "" {
Expand Down
52 changes: 52 additions & 0 deletions engine/cdn/cdn_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/lru"
"github.com/ovh/cds/engine/cdn/redis"
Expand Down Expand Up @@ -509,3 +510,54 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
require.Equal(t, int64(0), lines[226].Number)
require.Equal(t, "Line 0\n", lines[226].Value)
}

func TestGetFileItemFromBuffer(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearSyncRedisSet(t, store, "local_storage")

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: store,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines(context.TODO())

ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)

_ = test.LoadTestingConf(t, sdk.TypeCDN)
cdnUnits := newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, store)
s.Units = cdnUnits

// Add Item in redis / fs/ cds -will be delete from redis
it := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemRunResult,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))

// IU locked by gc
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
require.NoError(t, err)
require.True(t, hasLocked)

ui, unit, reader, err := s.getItemFileValue(ctx, sdk.CDNTypeItemRunResult, it.APIRefHash, getItemFileOptions{})
require.Nil(t, reader)
require.Nil(t, ui)
require.Nil(t, unit)
require.Contains(t, err.Error(), "unable to find item units for item with id: "+it.ID)
}
4 changes: 4 additions & 0 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
cdslog "github.com/ovh/cds/sdk/log"
)

var (
FileBufferKey = cache.Key("cdn", "unit")
)

func (r RunningStorageUnits) Storage(name string) StorageUnit {
for _, x := range r.Storages {
if x.Name() == name {
Expand Down
20 changes: 20 additions & 0 deletions engine/cdn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,17 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit {
}
}

func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit {
itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius))
for _, u := range ius {
if x.IsBuffer(u.UnitID) {
continue
}
itemsUnits = append(itemsUnits, u)
}
return itemsUnits
}

func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit {
// Remove cds backend from getting something that is not a log
if ius[0].Type != sdk.CDNTypeItemStepLog && ius[0].Type != sdk.CDNTypeItemServiceLog {
Expand All @@ -307,6 +318,15 @@ func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit)
return ius
}

func (x *RunningStorageUnits) IsBuffer(id string) bool {
for _, buf := range x.Buffers {
if buf.ID() == id {
return true
}
}
return false
}

type LogConfig struct {
// Step logs
StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`
Expand Down