(DI-489) Use reportID to identify harvester containers

This commit uses the reportID to identify the harvester containers
associated with a particular report/scan run, and only deletes
harvesters matching that reportID when performing cleanup on older
Docker engines where AutoRemove isn't supported server-side.
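
At the heart of the change is a pairing between a Docker label written when each harvester container is created and an exactly matching label filter applied at cleanup. A minimal standalone sketch of that pairing (the helper function is illustrative, not from the codebase):

package main

import "fmt"

// labelKey is the label key written onto every harvester container.
const labelKey = "lumogon_report_id"

// harvesterCleanupFilter builds the filter used at cleanup time; it
// matches the exact key=value pair, so harvesters belonging to other
// runs (different reportIDs) are left untouched.
func harvesterCleanupFilter(reportID string) string {
	return fmt.Sprintf("%s=%s", labelKey, reportID)
}

func main() {
	reportID := "example-report-id" // per-run UUID generated by the scheduler
	labels := map[string]string{labelKey: reportID}
	fmt.Println(labels, harvesterCleanupFilter(reportID))
}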
johnmccabe committed Sep 15, 2017
1 parent 975a905 commit 538d8770ea137a4527fcc9993a8b8d943b27afe7
Showing with 54 additions and 45 deletions.
  1. +3 −3 collector/collector.go
  2. +3 −2 collector/collector_test.go
  3. +4 −4 harvester/attached.go
  4. +26 −23 harvester/attachedcontainer.go
  5. +9 −6 scheduler/scheduler.go
  6. +9 −7 storage/storage.go
@@ -15,7 +15,7 @@ var results map[string]types.ContainerReport
// RunCollector starts the collector which will block on reading all
// expected ContainerReports from the results channel, before sending
// them to the ReportStorage backend.
-func RunCollector(ctx context.Context, wg *sync.WaitGroup, expectedResults int, resultsCh chan types.ContainerReport, backend storage.ReportStorage) error {
+func RunCollector(ctx context.Context, wg *sync.WaitGroup, expectedResults int, resultsCh chan types.ContainerReport, backend storage.ReportStorage, reportID string) error {
defer logging.Debug("[Collector] Exiting")
defer wg.Done()

@@ -47,8 +47,8 @@ func RunCollector(ctx context.Context, wg *sync.WaitGroup, expectedResults int,
}
resultsWg.Wait()

logging.Debug("[Collector] Generating report")
err = backend.Store(results)
logging.Debug("[Collector] Generating report: %s", reportID)
err = backend.Store(results, reportID)
return err
}

@@ -19,8 +19,9 @@ var storedReport map[string]types.ContainerReport

type MockStorage struct{}

-func (m MockStorage) Store(containerReports map[string]types.ContainerReport) error {
+func (m MockStorage) Store(containerReports map[string]types.ContainerReport, reportID string) error {
fmt.Println("entering store")
fmt.Printf("storing reportID: %s\n", reportID)
fmt.Printf("attempting to store: %v\n", containerReports)
defer fmt.Println("exiting store")
storedReport = containerReports
@@ -85,7 +86,7 @@ func Test_collector(t *testing.T) {
go func() {
t.Logf("starting Collector, expectedResults [%d]", test.expectedResults)
defer t.Logf("exiting Collector")
-RunCollector(testCtx, &wg, test.expectedResults, r, MockStorage{})
+RunCollector(testCtx, &wg, test.expectedResults, r, MockStorage{}, "dummy_report_id")
c <- nil
}()

@@ -20,7 +20,7 @@ import (
// channel, when a result is received it will attempt to remove that associated
// attached container which performed the harvest before sending the result to the
// collector via the main results channel, resultsCh.
-func RunAttachedHarvester(ctx context.Context, wg *sync.WaitGroup, targets []*types.TargetContainer, capabilities []types.AttachedCapability, resultsCh chan types.ContainerReport, opts types.ClientOptions, client dockeradapter.Client) error {
+func RunAttachedHarvester(ctx context.Context, wg *sync.WaitGroup, targets []*types.TargetContainer, capabilities []types.AttachedCapability, resultsCh chan types.ContainerReport, opts types.ClientOptions, client dockeradapter.Client, reportID string) error {
defer logging.Debug("[Attached Harvester] Exiting")
defer wg.Done()
logging.Debug("[Attached Harvester] Running")
@@ -57,7 +57,7 @@ func RunAttachedHarvester(ctx context.Context, wg *sync.WaitGroup, targets []*ty

logging.Debug("[Attached Harvester] Creating [%d] harvesting containers", len(validTargets))
for _, target := range validTargets {
-go createAndRunHarvester(ctx, client, *target, opts, rpcReceiverResultsCh)
+go createAndRunHarvester(ctx, client, *target, opts, rpcReceiverResultsCh, reportID)
}

doneChannel := make(chan int)
@@ -86,9 +86,9 @@ func RunAttachedHarvester(ctx context.Context, wg *sync.WaitGroup, targets []*ty
// createAndRunHarvester creates and runs a container attached to the namespace of the target
// container which will run the harvest command to run the harvest functions from any registered
// AttachedCapabilities.
-func createAndRunHarvester(ctx context.Context, client dockeradapter.Client, target types.TargetContainer, opts types.ClientOptions, rpcReceiverResultsCh chan types.ContainerReport) {
+func createAndRunHarvester(ctx context.Context, client dockeradapter.Client, target types.TargetContainer, opts types.ClientOptions, rpcReceiverResultsCh chan types.ContainerReport, reportID string) {
logging.Debug("[Attached Harvester] Creating attached container for target %s", target)
-harvester := NewAttachedContainer(client, types.ClientOptions{KeepHarvesters: opts.KeepHarvesters})
+harvester := NewAttachedContainer(client, types.ClientOptions{KeepHarvesters: opts.KeepHarvesters}, reportID)
// TODO get image name from the current container or set alternate default for non-container use
harvester.GetImage("puppet/lumogon")
harvester.Attach(target)
@@ -16,19 +16,20 @@ import (
// AttachedContainer is a container attached to a running target container
// used to gather capability information
type AttachedContainer struct {
-imageName string
-id string
-name string
-start string
-end string
-schedulerID string
-ctx context.Context
-result *types.ContainerReport
-target types.TargetContainer
-containerBody *dockercontainer.ContainerCreateCreatedBody
-client dockeradapter.Client
-keepHarvester bool
-err error
+imageName string
+id string
+name string
+start string
+end string
+reportID string
+schedulerHostname string
+ctx context.Context
+result *types.ContainerReport
+target types.TargetContainer
+containerBody *dockercontainer.ContainerCreateCreatedBody
+client dockeradapter.Client
+keepHarvester bool
+err error
}

// AttachedContainerInterface interface exposes Harvester lifecycle functions
@@ -39,17 +40,18 @@ type AttachedContainerInterface interface {
}

// NewAttachedContainer returns a pointer to a harvester AttachedContainer
-func NewAttachedContainer(client dockeradapter.Client, opts types.ClientOptions) *AttachedContainer {
+func NewAttachedContainer(client dockeradapter.Client, opts types.ClientOptions, reportID string) *AttachedContainer {

hostname, _ := os.Hostname()

attachedContainer := &AttachedContainer{
-start: utils.GetTimestamp(),
-name: utils.GetRandomName("lumogon_"),
-client: client,
-schedulerID: hostname,
-keepHarvester: opts.KeepHarvesters,
-ctx: context.Background(),
+start: utils.GetTimestamp(),
+name: utils.GetRandomName("lumogon_"),
+client: client,
+reportID: reportID,
+schedulerHostname: hostname,
+keepHarvester: opts.KeepHarvesters,
+ctx: context.Background(),
}

return attachedContainer
@@ -109,9 +111,10 @@ func (a *AttachedContainer) createContainer() {
kernelCapabilities := []string{"sys_admin"} // TODO - Need to investigate making the harvester immutable? minimise risk of altering attached namespace
pidMode := fmt.Sprintf("container:%s", a.target.ID)
schedulerAliasHostname := "scheduler"
-// Add an aliass for the scheduler to each harvester
-links := []string{fmt.Sprintf("%s:%s", a.schedulerID, schedulerAliasHostname)}
-labels := map[string]string{"lumogon_attached_container": "true",
+// Add an alias for the scheduler to each harvester to allow it to connect
+// to its RPC server
+links := []string{fmt.Sprintf("%s:%s", a.schedulerHostname, schedulerAliasHostname)}
+labels := map[string]string{"lumogon_report_id": a.reportID,
"lumogon_attached_timestamp": a.start,
"lumogon_attached_target_name": a.target.Name,
"lumogon_attached_target_id": a.target.ID,
@@ -22,6 +22,7 @@ import (
// capabilities, building report data and submitting to the consumer
// endpoint.
type Scheduler struct {
+reportID string
harvesters []harvester.AttachedContainer
capabilities registry.CapabilitiesRegistry
targets []*types.TargetContainer
@@ -40,9 +41,10 @@ var wg sync.WaitGroup
func New(args []string, opts types.ClientOptions) *Scheduler {
logging.Debug("[Scheduler] Creating scheduler")
scheduler := Scheduler{
-start: utils.GetTimestamp(),
-args: &args,
-opts: &opts,
+start: utils.GetTimestamp(),
+args: &args,
+opts: &opts,
+reportID: utils.GenerateUUID4(),
}

client, err := dockeradapter.New()
@@ -84,10 +86,10 @@ func (s *Scheduler) Run(r registry.IRegistry) {

storageBackend := storage.Storage{ConsumerURL: s.opts.ConsumerURL}
wg.Add(1)
-go collector.RunCollector(ctx, &wg, expectedResultCount, resultsChannel, storageBackend)
+go collector.RunCollector(ctx, &wg, expectedResultCount, resultsChannel, storageBackend, s.reportID)

wg.Add(1)
-go harvester.RunAttachedHarvester(ctx, &wg, s.targets, r.AttachedCapabilities(), resultsChannel, *s.opts, s.client)
+go harvester.RunAttachedHarvester(ctx, &wg, s.targets, r.AttachedCapabilities(), resultsChannel, *s.opts, s.client, s.reportID)

wg.Add(1)
go harvester.RunDockerAPIHarvester(ctx, &wg, s.targets, r.DockerAPICapabilities(), resultsChannel, s.client)
@@ -100,7 +102,8 @@ func (s *Scheduler) Run(r registry.IRegistry) {
if versions.LessThan(s.client.ServerAPIVersion(), autoRemoveSupportedAPIVersion) {
logging.Debug("[Scheduler] Cleaning up harvester containers explicitly as Server API version %s < %s", s.client.ServerAPIVersion(), autoRemoveSupportedAPIVersion)
ctx := context.Background()
if err := s.client.CleanupHarvesters(ctx, "lumogon_attached_container"); err != nil {
harvesterLabel := fmt.Sprintf("lumogon_report_id=%s", s.reportID)
if err := s.client.CleanupHarvesters(ctx, harvesterLabel); err != nil {
fmt.Fprintf(os.Stderr, "Error returned when deleting containers: %s", err.Error())
}
}
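
The CleanupHarvesters implementation itself isn't part of this diff. Assuming it wraps the Docker SDK, label-scoped cleanup might look roughly like the following sketch (the function name and structure are assumptions; only the label filter format comes from the commit):

import (
	"context"

	dockertypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/client"
)

// cleanupByLabel removes every container carrying the given label,
// e.g. "lumogon_report_id=<uuid>", leaving other runs' harvesters intact.
func cleanupByLabel(ctx context.Context, cli *client.Client, label string) error {
	args := filters.NewArgs()
	args.Add("label", label)
	// All: true includes exited harvesters, not just running ones.
	containers, err := cli.ContainerList(ctx, dockertypes.ContainerListOptions{All: true, Filters: args})
	if err != nil {
		return err
	}
	for _, c := range containers {
		if err := cli.ContainerRemove(ctx, c.ID, dockertypes.ContainerRemoveOptions{Force: true}); err != nil {
			return err
		}
	}
	return nil
}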
@@ -20,14 +20,16 @@ type Storage struct {
ConsumerURL string
}

-// ReportStorage handles persistence of generated container reports
+// ReportStorage handles persistence of generated container reports, taking
+// a map with a report for each container and the unique reportID for the overall
+// scan or report.
type ReportStorage interface {
-Store(map[string]types.ContainerReport) error
+Store(map[string]types.ContainerReport, string) error
}

// Store marshalls the supplied types.Report before storing it
-func (s Storage) Store(results map[string]types.ContainerReport) error {
-report, err := createReport(results)
+func (s Storage) Store(results map[string]types.ContainerReport, reportID string) error {
+report, err := createReport(results, reportID)
if err != nil {
return err
}
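
Because widening ReportStorage changes the contract for every backend, a compile-time assertion (a common Go idiom, not part of this commit) makes any implementation with a stale Store signature fail the build rather than slip through:

// In package storage: this declaration stops compiling if Storage
// ever stops satisfying the widened ReportStorage interface.
var _ ReportStorage = Storage{}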
@@ -38,7 +40,7 @@ func (s Storage) Store(results map[string]types.ContainerReport) error {

err = storeResult(report, s.ConsumerURL)
if err != nil {
logging.Debug("[Storage] Error storing report: %s ", err)
logging.Debug("[Storage] Error storing report id=%s: %s ", reportID, err.Error())
return err
}

@@ -129,7 +131,7 @@ func storeResult(report types.Report, consumerURL string) error {

// createReport returns a pointer to a types.Report built from the supplied
// map of container IDs to types.ContainerReport.
-func createReport(results map[string]types.ContainerReport) (types.Report, error) {
+func createReport(results map[string]types.ContainerReport, reportID string) (types.Report, error) {
logging.Debug("[Storage] Marshalling JSON")
marshalledResult, err := json.Marshal(results)
if err != nil {
@@ -143,7 +145,7 @@ func createReport(results map[string]types.ContainerReport) (types.Report, error
Owner: "default",
Group: []string{"default"},
ClientVersion: version.Version,
-ReportID: utils.GenerateUUID4(),
+ReportID: reportID,
Containers: results,
}
logging.Debug("[Storage] Report created")
