Skip to content

Commit

Permalink
eBPF loaded indicator (#848)
Browse files Browse the repository at this point in the history
* Add indicator channel for when all the probes are loaded

* revert changges in dependabot

* changelog

* restore deleted file
  • Loading branch information
RonFed committed May 23, 2024
1 parent 217bc6d commit 6968f4f
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
- Support Go `v1.22.3`. ([#824](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/824))
- Support `google.golang.org/grpc` `1.65.0-dev`. ([#827](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/827))
- Support `google.golang.org/grpc` `1.64.0`. ([#843](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/843))
- `WithLoadedIndicator` `InstrumentationOption` to configure an Instrumentation to notify the caller once all the eBPF probes are loaded. ([#848](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/848))
- Support `go.opentelemetry.io/otel@v1.27.0`. ([#850](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/850))

### Fixed
Expand Down
12 changes: 11 additions & 1 deletion cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func main() {

logger.Info("building OpenTelemetry Go instrumentation ...", "globalImpl", globalImpl)

instOptions := []auto.InstrumentationOption{auto.WithEnv()}
loadedIndicator := make(chan struct{})
instOptions := []auto.InstrumentationOption{auto.WithEnv(), auto.WithLoadedIndicator(loadedIndicator)}
if globalImpl {
instOptions = append(instOptions, auto.WithGlobal())
}
Expand All @@ -104,6 +105,15 @@ func main() {
return
}

go func() {
select {
case <-ctx.Done():
return
case <-loadedIndicator:
logger.Info("instrumentation loaded successfully")
}
}()

logger.Info("starting instrumentation...")
if err = inst.Run(ctx); err != nil && !errors.Is(err, process.ErrInterrupted) {
logger.Error(err, "instrumentation crashed")
Expand Down
14 changes: 13 additions & 1 deletion instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
return nil, err
}

mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl)
mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,6 +174,7 @@ type instConfig struct {
serviceName string
additionalResAttrs []attribute.KeyValue
globalImpl bool
loadIndicator chan struct{}
}

func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfig, error) {
Expand Down Expand Up @@ -463,3 +464,14 @@ func WithResourceAttributes(attrs ...attribute.KeyValue) InstrumentationOption {
return c, nil
})
}

// WithLoadedIndicator returns an [InstrumentationOption] that will configure an
// [Instrumentation] to close the provided indicator channel when the target
// process has been instrumented (i.e. all probes have been loaded).
// The provided indicator channel needs to be initialized by the caller.
func WithLoadedIndicator(indicator chan struct{}) InstrumentationOption {
return fnOpt(func(_ context.Context, c instConfig) (instConfig, error) {
c.loadIndicator = indicator
return c, nil
})
}
38 changes: 22 additions & 16 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,29 @@ import (

// Manager handles the management of [probe.Probe] instances.
type Manager struct {
logger logr.Logger
probes map[probe.ID]probe.Probe
done chan bool
incomingEvents chan *probe.Event
otelController *opentelemetry.Controller
globalImpl bool
wg sync.WaitGroup
closingErrors chan error
logger logr.Logger
probes map[probe.ID]probe.Probe
done chan bool
incomingEvents chan *probe.Event
otelController *opentelemetry.Controller
globalImpl bool
wg sync.WaitGroup
closingErrors chan error
loadedIndicator chan struct{}
}

// NewManager returns a new [Manager].
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool) (*Manager, error) {
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}) (*Manager, error) {
logger = logger.WithName("Manager")
m := &Manager{
logger: logger,
probes: make(map[probe.ID]probe.Probe),
done: make(chan bool, 1),
incomingEvents: make(chan *probe.Event),
otelController: otelController,
globalImpl: globalImpl,
closingErrors: make(chan error, 1),
logger: logger,
probes: make(map[probe.ID]probe.Probe),
done: make(chan bool, 1),
incomingEvents: make(chan *probe.Event),
otelController: otelController,
globalImpl: globalImpl,
closingErrors: make(chan error, 1),
loadedIndicator: loadIndicator,
}

err := m.registerProbes()
Expand Down Expand Up @@ -163,6 +165,10 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error
}(i)
}

if m.loadedIndicator != nil {
close(m.loadedIndicator)
}

for {
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/instrumentation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func fakeManager(t *testing.T) *Manager {
logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))
logger = logger.WithName("Instrumentation")

m, err := NewManager(logger, nil, true)
m, err := NewManager(logger, nil, true, nil)
assert.NoError(t, err)
assert.NotNil(t, m)

Expand Down

0 comments on commit 6968f4f

Please sign in to comment.