Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: issue when inventory was not correctly reset on entityID change #811

Merged
merged 4 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (a *Agent) registerEntityInventory(entity entity.Entity) error {
if a.Context.cfg.RegisterEnabled {
inv.sender, err = newPatchSenderVortex(entityKey, a.Context.getAgentKey(), a.Context, a.store, a.userAgent, a.Context.Identity, a.provideIDs, a.entityMap, a.httpClient)
} else {
fileName := entity.Key.String()
fileName := a.store.EntityFolder(entity.Key.String())
lastSubmission := delta.NewLastSubmissionStore(a.store.DataDir, fileName)
lastEntityID := delta.NewEntityIDFilePersist(a.store.DataDir, fileName)
inv.sender, err = newPatchSender(entity, a.Context, a.store, lastSubmission, lastEntityID, a.userAgent, a.Context.Identity, a.httpClient)
Expand Down Expand Up @@ -763,12 +763,12 @@ func (a *Agent) Run() (err error) {
_ = a.updateIDLookupTable(data.Data)
}

entityKey := data.Entity.Key.String()
if _, ok := a.inventories[entityKey]; !ok {
_ = a.registerEntityInventory(data.Entity)
}

if !data.NotApplicable {
entityKey := data.Entity.Key.String()
if _, ok := a.inventories[entityKey]; !ok {
_ = a.registerEntityInventory(data.Entity)
}

if err := a.storePluginOutput(data); err != nil {
alog.WithError(err).Error("problem storing plugin output")
}
Expand Down
22 changes: 12 additions & 10 deletions internal/agent/delta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ var slog = log.WithComponent("Delta Store")

// Folders that do not belong to entities nor plugins, so they have to be ignored
var nonEntityFolders = map[string]bool{
CACHE_DIR: true,
SAMPLING_REPO: true,
CACHE_DIR: true,
SAMPLING_REPO: true,
lastSuccessSubmissionFolder: true,
lastEntityIDFolder: true,
}

type delta struct {
Expand Down Expand Up @@ -172,25 +174,25 @@ func (s *Store) DeltaFilePath(pluginItem *PluginInfo, entityKey string) string {
func (s *Store) cachedDirPath(pluginItem *PluginInfo, entityKey string) string {
return filepath.Join(s.CacheDir,
pluginItem.Plugin,
s.entityFolder(entityKey))
s.EntityFolder(entityKey))
}

func (s *Store) cachedFilePath(pluginItem *PluginInfo, entityKey string) string {
return filepath.Join(s.CacheDir,
pluginItem.Plugin,
s.entityFolder(entityKey),
s.EntityFolder(entityKey),
pluginItem.FileName)
}

func (s *Store) SourceFilePath(pluginItem *PluginInfo, entityKey string) string {
return filepath.Join(s.DataDir,
pluginItem.Plugin,
s.entityFolder(entityKey),
s.EntityFolder(entityKey),
pluginItem.FileName)
}

func (s *Store) PluginDirPath(pluginCategory, entityKey string) string {
return filepath.Join(s.DataDir, pluginCategory, s.entityFolder(entityKey))
return filepath.Join(s.DataDir, pluginCategory, s.EntityFolder(entityKey))
}

func (s *Store) clearPluginDeltaStore(pluginItem *PluginInfo, entityKey string) (err error) {
Expand Down Expand Up @@ -487,7 +489,7 @@ func (s *Store) collectPluginFiles(dir string, entityKey string, fileFilterRE *r
}

pluginList = make([]*PluginInfo, 0, len(pluginsFileInfo))
entityFolder := s.entityFolder(entityKey)
entityFolder := s.EntityFolder(entityKey)

for _, dirInfo := range pluginsFileInfo {
if dirInfo != nil && dirInfo.IsDir() && !nonEntityFolders[dirInfo.Name()] {
Expand Down Expand Up @@ -741,9 +743,9 @@ func (s *Store) ChangeDefaultEntity(newEntityKey string) {
s.defaultEntityKey = newEntityKey
}

// entityFolder provides the folder name for a given entity ID, or for the agent default entity in case entityKey is an
// EntityFolder provides the folder name for a given entity ID, or for the agent default entity in case entityKey is an
// empty string
func (s *Store) entityFolder(entityKey string) string {
func (s *Store) EntityFolder(entityKey string) string {
if entityKey == "" || entityKey == s.defaultEntityKey {
return localEntityFolder
}
Expand All @@ -752,7 +754,7 @@ func (s *Store) entityFolder(entityKey string) string {

// RemoveEntity removes the entity cached storage.
func (s *Store) RemoveEntity(entityKey string) error {
return s.RemoveEntityFolders(s.entityFolder(entityKey))
return s.RemoveEntityFolders(s.EntityFolder(entityKey))
}

// RemoveEntityFolders removes the entity cached storage from the entities whose folder is equal to the argument.
Expand Down
19 changes: 8 additions & 11 deletions internal/agent/patch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,35 @@ func (p *patchSenderIngest) Process() (err error) {
if longTimeDisconnected || agentEntityIDChanged {
llog.WithField("offlineTime", p.resetIfOffline).
WithField("agentEntityIDChanged", agentEntityIDChanged).
WithField("entityKey", entityKey).
Info("Removing inventory cache")

// Removing the store for the entity would force the agent recreating a fresh Delta Store
if err := p.store.RemoveEntity(entityKey); err != nil {
llog.WithError(err).Warn("Could not remove inventory cache")
}
// Relaunching one-time harvesters to avoid losing the inventories after reset
p.context.Reconnect()
p.lastDeltaRemoval = now

if agentEntityIDChanged {
if err := p.lastEntityID.UpdateEntityID(p.agentIDProvide().ID); err != nil {
llog.WithError(err).Warn("Failed to update inventory agent entityID")
}
}

p.lastDeltaRemoval = now
return fmt.Errorf("agent has to remove inventory cache")
}

if len(deltas) == 0 {
llog.Debug("Patch sender found no deltas to send.")
llog.WithField("entityid", p.entityInfo.ID.String()).
WithField("entityKey", entityKey).
Debug("Patch sender found no deltas to send.")
return nil
}

if !p.cfg.OfflineLoggingMode {
if err = p.sendAllDeltas(deltas); err == nil && lastSubmissionTimeExceeded {
// If the agent has been long time disconnected, we re-run the reconnecting plugins
p.context.Reconnect()
}
err = p.sendAllDeltas(deltas)
} else {
llog.WithField("numberOfDeltas", len(deltas)).Info("suppressed PostDeltas")
}
Expand Down Expand Up @@ -251,11 +253,6 @@ func (p *patchSenderIngest) isLastSubmissionTimeExceeded(now time.Time) bool {
func (p *patchSenderIngest) agentEntityIDChanged() bool {
entityKey := p.entityInfo.Key.String()

// Only check for entityID when is the agent sender.
if entityKey != p.context.EntityKey() {
return false
}

lastEntityID, err := p.lastEntityID.GetEntityID()
if err != nil {
pslog.WithField("entityKey", entityKey).
Expand Down
39 changes: 26 additions & 13 deletions internal/agent/patch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func TestPatchSender_Process_LongTermOffline(t *testing.T) {
cachePluginData(t, store, entityKey)

// And a patch sender that has been disconnected for more than 24 hours
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.context = &context{
reconnecting: new(sync.Map),
}
nowIsEndOf18()
duration25h, _ := time.ParseDuration("25h")
assert.NoError(t, ls.UpdateTime(timeNow().Add(-duration25h)))
Expand All @@ -117,9 +120,9 @@ func TestPatchSender_Process_LongTermOffline_ReconnectPlugins(t *testing.T) {
cachePluginData(t, store, entityKey)

// And a patch sender that has been disconnected for more than 24 hours, but doesn't need to reset deltas
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.postDeltas = FakePostDelta
ps.lastDeltaRemoval = endOf18
ps.lastDeltaRemoval = endOf18.Truncate(48 * time.Hour)
var agentKey atomic.Value
agentKey.Store("test")
ps.context = &context{
Expand All @@ -138,7 +141,7 @@ func TestPatchSender_Process_LongTermOffline_ReconnectPlugins(t *testing.T) {
wg.Add(1)

// When the patch sender tries to process the deltas
assert.NoError(t, ps.Process())
assert.Errorf(t, ps.Process(), "agent has to remove inventory cache")

// The registered plugin has been invoked to run again
assert.NoError(t, wait(3*time.Second, wg))
Expand All @@ -154,7 +157,7 @@ func TestPatchSender_Process_LongTermOffline_NoDeltasToPost_UpdateLastDeltaRemov
// When it has successfully submitted some deltas
require.NoError(t, ls.UpdateTime(time.Now()))

ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.postDeltas = FailingPostDelta
ps.lastDeltaRemoval = time.Date(2018, 12, 12, 0, 12, 12, 12, &time.Location{})
// And a patch sender that has been disconnected for less than 24 hours
Expand Down Expand Up @@ -188,7 +191,7 @@ func TestPatchSender_Process_LongTermOffline_AlreadyRemoved(t *testing.T) {
resetTime, _ := time.ParseDuration("24h")
// But the deltas were already cleaned up less than 24 hours ago
lastRemoval := time.Date(2018, 12, 12, 10, 12, 12, 12, &time.Location{})
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.postDeltas = FailingPostDelta
ps.lastDeltaRemoval = lastRemoval
ps.resetIfOffline = resetTime
Expand Down Expand Up @@ -233,7 +236,7 @@ func TestPatchSender_Process_LastSubmissionTime_IgnoreEmptyEntityKey(t *testing.
lastSubmissionTime.On("UpdateTime", mock.Anything).Return()

// AND a patchSender
sender := newTestPatchSender(t, "", storage, lastSubmissionTime, &mockEntityIDPersist{})
sender := newTestPatchSender(t, "", storage, lastSubmissionTime, getLastEntityIDMock())
sender.entityInfo = entity.NewFromNameWithoutID("")
sender.compactEnabled = false
sender.postDeltas = assertDeltaSent(t, rawDelta)
Expand Down Expand Up @@ -269,7 +272,7 @@ func TestPatchSender_Process_ShortTermOffline(t *testing.T) {
// And a patch sender that has been disconnected for less than 24 hours
resetTime, _ := time.ParseDuration("24h")
lastDeltaRemoval := time.Date(2018, 12, 12, 0, 12, 12, 12, &time.Location{})
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.postDeltas = FailingPostDelta
ps.lastDeltaRemoval = lastDeltaRemoval
ps.resetIfOffline = resetTime
Expand Down Expand Up @@ -297,7 +300,7 @@ func TestPatchSender_Process_DividedDeltas(t *testing.T) {
store := delta.NewStore(dataDir, "localhost", maxInventoryDataSize)
ls := delta.NewLastSubmissionInMemory()
require.NoError(t, ls.UpdateTime(timeNow()))
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
pdt := testhelpers.NewPostDeltaTracer(maxInventoryDataSize)
ps.postDeltas = pdt.PostDeltas
ps.lastDeltaRemoval = time.Date(2018, 12, 12, 0, 12, 12, 12, &time.Location{})
Expand Down Expand Up @@ -331,7 +334,7 @@ func TestPatchSender_Process_DisabledDeltaSplit(t *testing.T) {
dataDir, err := TempDeltaStoreDir()
assert.NoError(t, err)
store := delta.NewStore(dataDir, "localhost", delta.DisableInventorySplit)
ps := newTestPatchSender(t, dataDir, store, delta.NewLastSubmissionInMemory(), nil)
ps := newTestPatchSender(t, dataDir, store, delta.NewLastSubmissionInMemory(), getLastEntityIDMock())
pdt := testhelpers.NewPostDeltaTracer(math.MaxInt32)
ps.postDeltas = pdt.PostDeltas

Expand Down Expand Up @@ -365,7 +368,7 @@ func TestPatchSender_Process_SingleRequestDeltas(t *testing.T) {
pdt := testhelpers.NewPostDeltaTracer(maxInventoryDataSize)
resetTime, _ := time.ParseDuration("24h")
lastDeltaRemoval := time.Date(2018, 12, 12, 0, 12, 12, 12, &time.Location{})
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.postDeltas = pdt.PostDeltas
ps.lastDeltaRemoval = lastDeltaRemoval
ps.resetIfOffline = resetTime
Expand Down Expand Up @@ -401,7 +404,7 @@ func TestPatchSender_Process_CompactEnabled(t *testing.T) {

resetTime, _ := time.ParseDuration("24h")
lastDeltaRemoval := time.Date(2018, 12, 12, 0, 12, 12, 12, &time.Location{})
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
ps.postDeltas = FakePostDelta
ps.lastDeltaRemoval = lastDeltaRemoval
ps.resetIfOffline = resetTime
Expand Down Expand Up @@ -435,7 +438,7 @@ func TestPatchSender_Process_Reset(t *testing.T) {

resetTime, _ := time.ParseDuration("24h")
lastDeltaRemoval := time.Date(2018, 12, 12, 0, 12, 12, 12, &time.Location{})
ps := newTestPatchSender(t, dataDir, store, ls, nil)
ps := newTestPatchSender(t, dataDir, store, ls, getLastEntityIDMock())
// And a backend service that returns ResetAll after being invoked
ps.postDeltas = ResetPostDelta
ps.lastDeltaRemoval = lastDeltaRemoval
Expand Down Expand Up @@ -485,6 +488,9 @@ func TestPathSender_Process_EntityIDChanged_ResetLocalEntityDeltas(t *testing.T)
// AND a patchSender
sender := newTestPatchSender(t, "", storage, delta.NewLastSubmissionInMemory(), lastEntityID)
sender.entityInfo = entity.NewFromNameWithoutID(agentKey)
sender.context = &context{
reconnecting: new(sync.Map),
}

// AND set a new identity
idCtx := id.NewContext(ctx.Background())
Expand Down Expand Up @@ -589,3 +595,10 @@ func nowIsEndOf18() {
return endOf18
}
}

func getLastEntityIDMock() *mockEntityIDPersist {
persistEntityID := &mockEntityIDPersist{}
persistEntityID.On("GetEntityID").Return(entity.EmptyID)
persistEntityID.On("UpdateEntityID", mock.Anything).Return(nil)
return persistEntityID
}