Skip to content
This repository has been archived by the owner on Dec 15, 2020. It is now read-only.

Commit

Permalink
Fix cleanup of orphaned queries (#2316)
Browse files Browse the repository at this point in the history
The expiration logic was incorrect leading to queries not being cleaned
up properly. Tests added for the whole subroutine.

Fixes #2302
  • Loading branch information
zwass committed Oct 6, 2020
1 parent 50dbdb3 commit e8e4bc9
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 5 deletions.
6 changes: 3 additions & 3 deletions server/pubsub/inmem_query_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func (im *inmemQueryResults) WriteResult(result kolide.DistributedQueryResult) e
return nil
}

func (im *inmemQueryResults) ReadChannel(ctx context.Context, query kolide.DistributedQueryCampaign) (<-chan interface{}, error) {
channel := im.getChannel(query.ID)
func (im *inmemQueryResults) ReadChannel(ctx context.Context, campaign kolide.DistributedQueryCampaign) (<-chan interface{}, error) {
channel := im.getChannel(campaign.ID)
go func() {
<-ctx.Done()
close(channel)
im.channelMutex.Lock()
delete(im.resultChannels, query.ID)
delete(im.resultChannels, campaign.ID)
im.channelMutex.Unlock()
}()
return channel, nil
Expand Down
7 changes: 5 additions & 2 deletions server/service/service_osquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,10 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
return osqueryError{message: "loading orphaned campaign: " + err.Error()}
}

if campaign.CreatedAt.Before(svc.clock.Now().Add(5 * time.Second)) {
if campaign.CreatedAt.After(svc.clock.Now().Add(-5 * time.Second)) {
// Give the client 5 seconds to connect before considering the
// campaign orphaned
return osqueryError{message: "campaign waiting for listener"}
return osqueryError{message: "campaign waiting for listener (please retry)"}
}

if campaign.Status != kolide.QueryComplete {
Expand All @@ -619,6 +619,9 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
if err := svc.liveQueryStore.StopQuery(strconv.Itoa(int(campaignID))); err != nil {
return osqueryError{message: "stopping orphaned campaign: " + err.Error()}
}

// No need to record query completion in this case
return nil
}

err = svc.liveQueryStore.QueryCompletedByHost(strconv.Itoa(int(campaignID)), host.ID)
Expand Down
247 changes: 247 additions & 0 deletions server/service/service_osquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,253 @@ func TestDistributedQueryResults(t *testing.T) {
require.Nil(t, err)
}

func TestIngestDistributedQueryParseIdError(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

host := kolide.Host{ID: 1}
err := svc.ingestDistributedQuery(host, "bad_name", []map[string]string{}, false)
require.Error(t, err)
assert.Contains(t, err.Error(), "unable to parse campaign")
}

func TestIngestDistributedQueryOrphanedCampaignLoadError(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

ds.DistributedQueryCampaignFunc = func(id uint) (*kolide.DistributedQueryCampaign, error) {
return nil, fmt.Errorf("missing campaign")
}

host := kolide.Host{ID: 1}

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.Error(t, err)
assert.Contains(t, err.Error(), "loading orphaned campaign")
}

func TestIngestDistributedQueryOrphanedCampaignWaitListener(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

campaign := &kolide.DistributedQueryCampaign{
ID: 42,
UpdateCreateTimestamps: kolide.UpdateCreateTimestamps{
CreateTimestamp: kolide.CreateTimestamp{
CreatedAt: mockClock.Now().Add(-1 * time.Second),
},
},
}

ds.DistributedQueryCampaignFunc = func(id uint) (*kolide.DistributedQueryCampaign, error) {
return campaign, nil
}

host := kolide.Host{ID: 1}

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.Error(t, err)
assert.Contains(t, err.Error(), "campaign waiting for listener")
}

func TestIngestDistributedQueryOrphanedCloseError(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

campaign := &kolide.DistributedQueryCampaign{
ID: 42,
UpdateCreateTimestamps: kolide.UpdateCreateTimestamps{
CreateTimestamp: kolide.CreateTimestamp{
CreatedAt: mockClock.Now().Add(-30 * time.Second),
},
},
}

ds.DistributedQueryCampaignFunc = func(id uint) (*kolide.DistributedQueryCampaign, error) {
return campaign, nil
}
ds.SaveDistributedQueryCampaignFunc = func(campaign *kolide.DistributedQueryCampaign) error {
return fmt.Errorf("failed save")
}

host := kolide.Host{ID: 1}

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.Error(t, err)
assert.Contains(t, err.Error(), "closing orphaned campaign")
}

func TestIngestDistributedQueryOrphanedStopError(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

campaign := &kolide.DistributedQueryCampaign{
ID: 42,
UpdateCreateTimestamps: kolide.UpdateCreateTimestamps{
CreateTimestamp: kolide.CreateTimestamp{
CreatedAt: mockClock.Now().Add(-30 * time.Second),
},
},
}

ds.DistributedQueryCampaignFunc = func(id uint) (*kolide.DistributedQueryCampaign, error) {
return campaign, nil
}
ds.SaveDistributedQueryCampaignFunc = func(campaign *kolide.DistributedQueryCampaign) error {
return nil
}
lq.On("StopQuery", strconv.Itoa(int(campaign.ID))).Return(fmt.Errorf("failed"))

host := kolide.Host{ID: 1}

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.Error(t, err)
assert.Contains(t, err.Error(), "stopping orphaned campaign")
}

func TestIngestDistributedQueryOrphanedStop(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

campaign := &kolide.DistributedQueryCampaign{
ID: 42,
UpdateCreateTimestamps: kolide.UpdateCreateTimestamps{
CreateTimestamp: kolide.CreateTimestamp{
CreatedAt: mockClock.Now().Add(-30 * time.Second),
},
},
}

ds.DistributedQueryCampaignFunc = func(id uint) (*kolide.DistributedQueryCampaign, error) {
return campaign, nil
}
ds.SaveDistributedQueryCampaignFunc = func(campaign *kolide.DistributedQueryCampaign) error {
return nil
}
lq.On("StopQuery", strconv.Itoa(int(campaign.ID))).Return(nil)

host := kolide.Host{ID: 1}

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.NoError(t, err)
lq.AssertExpectations(t)
}

func TestIngestDistributedQueryRecordCompletionError(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

campaign := &kolide.DistributedQueryCampaign{ID: 42}
host := kolide.Host{ID: 1}

lq.On("QueryCompletedByHost", strconv.Itoa(int(campaign.ID)), host.ID).Return(fmt.Errorf("fail"))

go func() {
ch, err := rs.ReadChannel(context.Background(), *campaign)
require.NoError(t, err)
<-ch
}()
time.Sleep(10 * time.Millisecond)

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.Error(t, err)
assert.Contains(t, err.Error(), "record query completion")
lq.AssertExpectations(t)
}

func TestIngestDistributedQuery(t *testing.T) {
mockClock := clock.NewMockClock()
ds := new(mock.Store)
rs := pubsub.NewInmemQueryResults()
lq := new(live_query.MockLiveQuery)
svc := service{
ds: ds,
resultStore: rs,
liveQueryStore: lq,
logger: log.NewNopLogger(),
clock: mockClock,
}

campaign := &kolide.DistributedQueryCampaign{ID: 42}
host := kolide.Host{ID: 1}

lq.On("QueryCompletedByHost", strconv.Itoa(int(campaign.ID)), host.ID).Return(nil)

go func() {
ch, err := rs.ReadChannel(context.Background(), *campaign)
require.NoError(t, err)
<-ch
}()
time.Sleep(10 * time.Millisecond)

err := svc.ingestDistributedQuery(host, "kolide_distributed_query_42", []map[string]string{}, false)
require.NoError(t, err)
lq.AssertExpectations(t)
}

func TestUpdateHostIntervals(t *testing.T) {
ds := new(mock.Store)

Expand Down

0 comments on commit e8e4bc9

Please sign in to comment.