Skip to content

Commit

Permalink
Fix dotnetspy panic on premature exit and add tests. (#203)
Browse files Browse the repository at this point in the history
There is a case when dotnetspy couldn't establish a connection to
Diagnostics Server before the next snapshot/stop call (e.g, target
process exited, or the socket file cannot be found, etc).

This fix adds a check whether the session has been actually created
before accessing it. Given that there are no race conditions between
reset/stop/snapshot calls, redundant mutexes were removed.
  • Loading branch information
kolesnikovae committed May 21, 2021
1 parent 0f526ea commit b2fff2b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 19 deletions.
7 changes: 0 additions & 7 deletions pkg/agent/dotnetspy/dotnetspy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
package dotnetspy

import (
"sync"

"github.com/pyroscope-io/pyroscope/pkg/agent/spy"
)

type DotnetSpy struct {
session *session
m sync.Mutex
reset bool
}

Expand All @@ -29,14 +26,10 @@ func (s *DotnetSpy) Stop() error {
}

func (s *DotnetSpy) Reset() {
s.m.Lock()
defer s.m.Unlock()
s.reset = true
}

func (s *DotnetSpy) Snapshot(cb func([]byte, uint64, error)) {
s.m.Lock()
defer s.m.Unlock()
if !s.reset {
return
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/dotnetspy/dotnetspy_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// +build dotnetspy

package dotnetspy_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestDotnetSpy(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, ".NET Spy Suite")
}
36 changes: 36 additions & 0 deletions pkg/agent/dotnetspy/dotnetspy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// +build dotnetspy

package dotnetspy

import (
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("agent.DotnetSpy", func() {
Describe("Does not panic if a session has not been established", func() {
s := newSession(31337)
s.timeout = time.Millisecond * 10
Expect(s.start()).To(HaveOccurred())
spy := &DotnetSpy{session: s}

It("On Snapshot before Reset", func() {
spy.Snapshot(func(name []byte, samples uint64, err error) {
Fail("Snapshot callback must not be called")
})
})

It("On Snapshot after Reset", func() {
spy.Reset()
spy.Snapshot(func(name []byte, samples uint64, err error) {
Fail("Snapshot callback must not be called")
})
})

It("On Stop", func() {
Expect(spy.Stop()).ToNot(HaveOccurred())
})
})
})
40 changes: 28 additions & 12 deletions pkg/agent/dotnetspy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dotnetspy
import (
"context"
"io"
"sync"
"time"

"github.com/pyroscope-io/dotnetdiag"
Expand All @@ -14,13 +13,13 @@ import (
)

type session struct {
pid int
pid int
timeout time.Duration

config dotnetdiag.CollectTracingConfig
session *dotnetdiag.Session

ch chan line
m sync.Mutex
stopped bool
}

Expand All @@ -31,7 +30,8 @@ type line struct {

func newSession(pid int) *session {
return &session{
pid: pid,
pid: pid,
timeout: 3 * time.Second,
config: dotnetdiag.CollectTracingConfig{
CircularBufferSizeMB: 100,
Providers: []dotnetdiag.ProviderConfig{
Expand All @@ -45,10 +45,13 @@ func newSession(pid int) *session {
}
}

// start opens a new diagnostic session to the process given, and asynchronously
// processes the event stream.
func (s *session) start() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()

// If the process does not create Diagnostic Server, the next call will
// fail, and a session won't be created.
client := dotnetdiag.NewClient(waitDiagnosticServer(ctx, s.pid))
ns, err := client.CollectTracing(s.config)
if err != nil {
Expand Down Expand Up @@ -78,6 +81,8 @@ func (s *session) start() error {
case nil:
continue
case io.EOF:
// The session is closed by us (on flush or stop call),
// or the target process has exited.
for k, v := range p.Samples() {
s.ch <- line{
name: []byte(k),
Expand All @@ -92,30 +97,41 @@ func (s *session) start() error {
return nil
}

// flush closes NetTrace stream in order to retrieve samples,
// and starts a new session, if not in stopped state.
func (s *session) flush(cb func([]byte, uint64)) error {
// Ignore call, if NetTrace session has not been established.
if s.session == nil {
return nil
}
_ = s.session.Close()
for v := range s.ch {
cb(v.name, uint64(v.val))
}
s.m.Lock()
defer s.m.Unlock()
if s.stopped {
return nil
}
return s.start()
}

// stop closes diagnostic session, if it was established, and sets the
// flag preventing session to start again.
func (s *session) stop() error {
s.m.Lock()
defer s.m.Unlock()
_ = s.session.Close()
if s.session != nil {
_ = s.session.Close()
}
s.stopped = true
return nil
}

// .Net runtime requires some time to initialize diagnostic IPC server and
// start accepting connections.
// start accepting connections. If it fails before context cancel, an empty
// string will be returned.
func waitDiagnosticServer(ctx context.Context, pid int) string {
// Do not wait for the timer to fire for the first time.
if addr := dotnetdiag.DefaultServerAddress(pid); addr != "" {
return addr
}
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for {
Expand Down

0 comments on commit b2fff2b

Please sign in to comment.