Skip to content

Commit

Permalink
[#8] fix to use goroutine id only when active request is in service
Browse files Browse the repository at this point in the history
  • Loading branch information
dwkang committed Aug 1, 2022
1 parent 3a0ffd7 commit 3359ed9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 43 deletions.
69 changes: 63 additions & 6 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,29 @@ package pinpoint

import (
pb "github.com/pinpoint-apm/pinpoint-go-agent/protobuf"
"io"
"sync"
"sync/atomic"
"time"
)

var gDump *GoroutineDump
var (
gGoroutineDump *GoroutineDump
gAtcStreamCount int32
realTimeActiveSpan sync.Map
)

type activeSpanInfo struct {
startTime time.Time
txId TransactionId
entryPoint string
}

func (agent *agent) runCommandService() {
log("cmd").Info("command service goroutine start")
defer agent.wg.Done()

gAtcStreamCount = 0
cmdStream := agent.cmdGrpc.newCommandStreamWithRetry()

for true {
Expand Down Expand Up @@ -47,17 +61,17 @@ func (agent *agent) runCommandService() {
go sendActiveThreadCount(atcStream)
break
case *pb.PCmdRequest_CommandActiveThreadDump:
if c := cmdReq.GetCommandActiveThreadDump(); c != nil && gDump != nil {
if c := cmdReq.GetCommandActiveThreadDump(); c != nil && gGoroutineDump != nil {
limit := c.GetLimit()
threadName := c.GetThreadName()
localId := c.GetLocalTraceId()
agent.cmdGrpc.sendActiveThreadDump(reqId, limit, threadName, localId, gDump)
agent.cmdGrpc.sendActiveThreadDump(reqId, limit, threadName, localId, gGoroutineDump)
}
break
case *pb.PCmdRequest_CommandActiveThreadLightDump:
limit := cmdReq.GetCommandActiveThreadLightDump().GetLimit()
if gDump = dumpGoroutine(); gDump != nil {
agent.cmdGrpc.sendActiveThreadLightDump(reqId, limit, gDump)
if gGoroutineDump = dumpGoroutine(); gGoroutineDump != nil {
agent.cmdGrpc.sendActiveThreadLightDump(reqId, limit, gGoroutineDump)
}
break
case nil:
Expand All @@ -77,13 +91,56 @@ func (agent *agent) runCommandService() {
}

func sendActiveThreadCount(s *activeThreadCountStream) {
atomic.AddInt32(&gAtcStreamCount, 1)
log("cmd").Infof("active thread count stream goroutine start: %d, %d", s.reqId, gAtcStreamCount)

for true {
err := s.sendActiveThreadCount()
if err != nil {
log("cmd").Errorf("fail to sendActiveThreadCount(): %d, %v", s.reqId, err)
if err != io.EOF {
log("cmd").Errorf("fail to sendActiveThreadCount(): %d, %v", s.reqId, err)
}
break
}
time.Sleep(1 * time.Second)
}
s.close()

atomic.AddInt32(&gAtcStreamCount, -1)
log("cmd").Infof("active thread count stream goroutine finish: %d, %d", s.reqId, gAtcStreamCount)
}

func addRealTimeActiveSpan(span *span) {
if gAtcStreamCount > 0 {
span.goroutineId = curGoroutineID()
s := activeSpanInfo{span.startTime, span.txId, span.rpcName}
realTimeActiveSpan.Store(span.goroutineId, s)
log("cmd").Debug("addRealTimeActiveSpan: ", span.goroutineId, s)
}
}

func dropRealTimeActiveSpan(span *span) {
realTimeActiveSpan.Delete(span.goroutineId)
log("cmd").Debug("addRealTimeActiveSpan: ", span.goroutineId)
}

func getActiveSpanCount(now time.Time) []int32 {
counts := []int32{0, 0, 0, 0}
realTimeActiveSpan.Range(func(k, v interface{}) bool {
s := v.(activeSpanInfo)
d := now.Sub(s.startTime).Seconds()

if d < 1 {
counts[0]++
} else if d < 3 {
counts[1]++
} else if d < 5 {
counts[2]++
} else {
counts[3]++
}
return true
})

return counts
}
2 changes: 1 addition & 1 deletion goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func parseProfile(r io.Reader) *GoroutineDump {
return nil
}

if v, ok := activeSpan.Load(goroutine.id); ok {
if v, ok := realTimeActiveSpan.Load(goroutine.id); ok {
goroutine.spanInfo = v.(activeSpanInfo)
dump.add(goroutine)
}
Expand Down
4 changes: 2 additions & 2 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ func makePActiveThreadDump(g *Goroutine) *pb.PActiveThreadDump {
},
Sampled: true,
TransactionId: g.spanInfo.txId.String(),
EntryPoint: "",
EntryPoint: g.spanInfo.entryPoint,
}

return aDump
Expand Down Expand Up @@ -1099,7 +1099,7 @@ func makePActiveThreadLightDump(g *Goroutine) *pb.PActiveThreadLightDump {
},
Sampled: true,
TransactionId: g.spanInfo.txId.String(),
EntryPoint: "", //path
EntryPoint: g.spanInfo.entryPoint,
}

return aDump
Expand Down
44 changes: 10 additions & 34 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ var skipCont int64

var activeSpan sync.Map

type activeSpanInfo struct {
startTime time.Time
txId TransactionId
}

func initStats() {
err := syscall.Getrusage(syscall.RUSAGE_SELF, &lastRusage)
if err != nil {
Expand Down Expand Up @@ -82,9 +77,9 @@ func getStats() *inspectorStats {

activeSpanCount := []int32{0, 0, 0, 0}
activeSpan.Range(func(k, v interface{}) bool {
s := v.(activeSpanInfo)
d := now.Sub(s.startTime).Seconds()
log("stats").Debug("getStats: ", now, s.startTime, d)
s := v.(time.Time)
d := now.Sub(s).Seconds()
log("stats").Debug("getStats: ", now, s, d)

if d < 1 {
activeSpanCount[0]++
Expand Down Expand Up @@ -208,36 +203,17 @@ func resetResponseTime() {
}

func addActiveSpan(span *span) {
span.goroutineId = curGoroutineID()
s := activeSpanInfo{span.startTime, span.txId}
activeSpan.Store(span.goroutineId, s)
log("stats").Debug("addActiveSpan: ", span.goroutineId, s)
}
activeSpan.Store(span.spanId, span.startTime)
log("stats").Debug("addActiveSpan: ", span.spanId, span.startTime)

func dropActiveSpan(span *span) {
activeSpan.Delete(span.goroutineId)
log("stats").Debug("dropActiveSpan: ", span.goroutineId)
addRealTimeActiveSpan(span)
}

func getActiveSpanCount(now time.Time) []int32 {
activeSpanCount := []int32{0, 0, 0, 0}
activeSpan.Range(func(k, v interface{}) bool {
s := v.(activeSpanInfo)
d := now.Sub(s.startTime).Seconds()

if d < 1 {
activeSpanCount[0]++
} else if d < 3 {
activeSpanCount[1]++
} else if d < 5 {
activeSpanCount[2]++
} else {
activeSpanCount[3]++
}
return true
})
func dropActiveSpan(span *span) {
activeSpan.Delete(span.spanId)
log("stats").Debug("dropActiveSpan: ", span.spanId)

return activeSpanCount
dropRealTimeActiveSpan(span)
}

func incrSampleNew() {
Expand Down

0 comments on commit 3359ed9

Please sign in to comment.