Skip to content

Commit

Permalink
[#8] add unsampled span to active request stats
Browse files Browse the repository at this point in the history
  • Loading branch information
dwkang committed Aug 2, 2022
1 parent 3359ed9 commit eb1e91a
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 117 deletions.
53 changes: 23 additions & 30 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func NewAgent(config *Config) (Agent, error) {
}

if config.Sampling.NewThroughput > 0 || config.Sampling.ContinueThroughput > 0 {
agent.sampler = newThroughputLimitTraceSampler(baseSampler, config.Sampling.NewThroughput, config.Sampling.ContinueThroughput)
agent.sampler = newThroughputLimitTraceSampler(baseSampler, config.Sampling.NewThroughput,
config.Sampling.ContinueThroughput)
} else {
agent.sampler = newBasicTraceSampler(baseSampler)
}
Expand Down Expand Up @@ -206,56 +207,47 @@ func (agent *agent) Shutdown() {
agent.statGrpc.close()
}

func (agent *agent) NewSpanTracer(operation string) Tracer {
func (agent *agent) NewSpanTracer(operation string, rpcName string) Tracer {
var tracer Tracer

if agent.enable {
reader := &noopDistributedTracingContextReader{}
tracer = agent.NewSpanTracerWithReader(operation, reader)
tracer.Extract(reader)
tracer = agent.NewSpanTracerWithReader(operation, rpcName, reader)
} else {
tracer = newNoopSpan(agent)
tracer = newNoopSpan(agent, rpcName)
}
return tracer
}

func (agent *agent) NewSpanTracerWithReader(operation string, reader DistributedTracingContextReader) Tracer {
if !agent.enable {
return newNoopSpan(agent)
func (agent *agent) samplingSpan(samplingFunc func() bool, operation string, rpcName string,
reader DistributedTracingContextReader) Tracer {
if samplingFunc() {
tracer := newSampledSpan(agent, operation, rpcName)
tracer.Extract(reader)
return tracer
} else {
return newNoopSpan(agent, rpcName)
}
}

atomic.AddInt64(&agent.sequence, 1)
func (agent *agent) NewSpanTracerWithReader(operation string, rpcName string,
reader DistributedTracingContextReader) Tracer {
if !agent.enable {
return newNoopSpan(agent, rpcName)
}

sampled := reader.Get(HttpSampled)
if sampled == "s0" {
incrUnsampleCont()
return newNoopSpan(agent)
return newNoopSpan(agent, rpcName)
}

var tracer Tracer
isSampled := false

tid := reader.Get(HttpTraceId)
if tid == "" {
if agent.sampler.isNewSampled() {
tracer = newSampledSpan(agent, operation)
isSampled = true
} else {
tracer = newNoopSpan(agent)
}
return agent.samplingSpan(agent.sampler.isNewSampled, operation, rpcName, reader)
} else {
if agent.sampler.isContinueSampled() {
tracer = newSampledSpan(agent, operation)
isSampled = true
} else {
tracer = newNoopSpan(agent)
}
}

if isSampled {
tracer.Extract(reader)
return agent.samplingSpan(agent.sampler.isContinueSampled, operation, rpcName, reader)
}
return tracer
}

func (agent *agent) RegisterSpanApiId(descriptor string, apiType int) int32 {
Expand All @@ -272,6 +264,7 @@ func (agent *agent) Config() Config {
}

func (agent *agent) GenerateTransactionId() TransactionId {
atomic.AddInt64(&agent.sequence, 1)
return TransactionId{agent.config.AgentId, agent.startTime, agent.sequence}
}

Expand Down
4 changes: 2 additions & 2 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Test_agent_NewSpanTracer(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
agent := tt.args.agent
span := agent.NewSpanTracer("test")
span := agent.NewSpanTracer("test", "/")

txid := span.TransactionId()
assert.Equal(t, txid.AgentId, "testagent", "AgentId")
Expand Down Expand Up @@ -73,7 +73,7 @@ func Test_agent_NewSpanTracerWithReader(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
agent := tt.args.agent
span := agent.NewSpanTracerWithReader("test", tt.args.reader)
span := agent.NewSpanTracerWithReader("test", "/", tt.args.reader)

txid := span.TransactionId()
assert.Equal(t, txid.AgentId, "t123456", "AgentId")
Expand Down
33 changes: 21 additions & 12 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
)

var (
gGoroutineDump *GoroutineDump
gAtcStreamCount int32
realTimeActiveSpan sync.Map
)

type activeSpanInfo struct {
startTime time.Time
txId TransactionId
txId string
entryPoint string
sampled bool
}

func (agent *agent) runCommandService() {
Expand Down Expand Up @@ -61,17 +61,16 @@ func (agent *agent) runCommandService() {
go sendActiveThreadCount(atcStream)
break
case *pb.PCmdRequest_CommandActiveThreadDump:
if c := cmdReq.GetCommandActiveThreadDump(); c != nil && gGoroutineDump != nil {
if c := cmdReq.GetCommandActiveThreadDump(); c != nil {
limit := c.GetLimit()
threadName := c.GetThreadName()
localId := c.GetLocalTraceId()
agent.cmdGrpc.sendActiveThreadDump(reqId, limit, threadName, localId, gGoroutineDump)
agent.cmdGrpc.sendActiveThreadDump(reqId, limit, threadName, localId, dumpGoroutine())
}
break
case *pb.PCmdRequest_CommandActiveThreadLightDump:
limit := cmdReq.GetCommandActiveThreadLightDump().GetLimit()
if gGoroutineDump = dumpGoroutine(); gGoroutineDump != nil {
agent.cmdGrpc.sendActiveThreadLightDump(reqId, limit, gGoroutineDump)
if c := cmdReq.GetCommandActiveThreadLightDump(); c != nil {
agent.cmdGrpc.sendActiveThreadLightDump(reqId, c.GetLimit(), dumpGoroutine())
}
break
case nil:
Expand Down Expand Up @@ -110,18 +109,28 @@ func sendActiveThreadCount(s *activeThreadCountStream) {
log("cmd").Infof("active thread count stream goroutine finish: %d, %d", s.reqId, gAtcStreamCount)
}

func addRealTimeActiveSpan(span *span) {
func addRealTimeSampledActiveSpan(span *span) {
if gAtcStreamCount > 0 {
span.goroutineId = curGoroutineID()
s := activeSpanInfo{span.startTime, span.txId, span.rpcName}
s := activeSpanInfo{span.startTime, span.txId.String(), span.rpcName, span.sampled}
realTimeActiveSpan.Store(span.goroutineId, s)
log("cmd").Debug("addRealTimeActiveSpan: ", span.goroutineId, s)
}
}

func dropRealTimeActiveSpan(span *span) {
func dropRealTimeSampledActiveSpan(span *span) {
realTimeActiveSpan.Delete(span.goroutineId)
}

func addRealTimeUnSampledActiveSpan(span *noopSpan) {
if gAtcStreamCount > 0 {
span.goroutineId = curGoroutineID()
s := activeSpanInfo{span.startTime, "", span.rpcName, false}
realTimeActiveSpan.Store(span.goroutineId, s)
}
}

func dropRealTimeUnSampledActiveSpan(span *noopSpan) {
realTimeActiveSpan.Delete(span.goroutineId)
log("cmd").Debug("addRealTimeActiveSpan: ", span.goroutineId)
}

func getActiveSpanCount(now time.Time) []int32 {
Expand Down
3 changes: 1 addition & 2 deletions example/stand_alone.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (
)

func newSpan(agent pinpoint.Agent, name string) pinpoint.Tracer {
tracer := agent.NewSpanTracer(name)
tracer.Span().SetRpcName("/")
tracer := agent.NewSpanTracer(name, "/")
return tracer
}

Expand Down
6 changes: 4 additions & 2 deletions goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Goroutine struct {
state string
trace string

spanInfo activeSpanInfo
span activeSpanInfo

frozen bool
buf *bytes.Buffer
Expand Down Expand Up @@ -57,6 +57,8 @@ func newGoroutine(line string) *Goroutine {
state: state,
buf: &bytes.Buffer{},
}
} else {
log("cmd").Errorf("fail to convert goroutine id: %v", err)
}

return nil
Expand Down Expand Up @@ -120,7 +122,7 @@ func parseProfile(r io.Reader) *GoroutineDump {
}

if v, ok := realTimeActiveSpan.Load(goroutine.id); ok {
goroutine.spanInfo = v.(activeSpanInfo)
goroutine.span = v.(activeSpanInfo)
dump.add(goroutine)
}
} else if line == "" {
Expand Down
87 changes: 54 additions & 33 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,11 +972,19 @@ func (s *activeThreadCountStream) sendActiveThreadCount() error {
func (cmdGrpc *cmdGrpc) sendActiveThreadDump(reqId int32, limit int32, threadName []string, localId []int64, dump *GoroutineDump) {
var gRes *pb.PCmdActiveThreadDumpRes

status := int32(0)
msg := ""

if dump == nil {
status = -1
msg = "An error occurred while dumping Goroutine"
}

gRes = &pb.PCmdActiveThreadDumpRes{
CommonResponse: &pb.PCmdResponse{
ResponseId: reqId,
Status: 0,
Message: &wrappers.StringValue{Value: ""},
Status: status,
Message: &wrappers.StringValue{Value: msg},
},
ThreadDump: makePActiveThreadDumpList(dump, int(limit), threadName, localId),
Type: "Go",
Expand All @@ -989,30 +997,32 @@ func (cmdGrpc *cmdGrpc) sendActiveThreadDump(reqId int32, limit int32, threadNam
ctx := grpcMetadataContext(cmdGrpc.agent, -1)
_, err := cmdGrpc.cmdClient.CommandActiveThreadDump(ctx, gRes)
if err != nil {
log("grpc").Errorf("fail to CommandActiveThreadDump() - %v", err)
log("grpc").Errorf("fail to send CommandActiveThreadDump() - %v", err)
}
}

func makePActiveThreadDumpList(dump *GoroutineDump, limit int, threadName []string, localId []int64) []*pb.PActiveThreadDump {
dumpList := make([]*pb.PActiveThreadDump, 0)

if limit < 1 {
limit = len(dump.goroutines)
}
if dump != nil {
if limit < 1 {
limit = len(dump.goroutines)
}

selected := make([]*Goroutine, 0)
for _, tn := range threadName {
g := dump.search(tn)
if g != nil {
selected = append(selected, g)
selected := make([]*Goroutine, 0)
for _, tn := range threadName {
g := dump.search(tn)
if g != nil {
selected = append(selected, g)
}
}
}

log("grpc").Debugf("send makePActiveThreadDumpList: %v", selected)
log("grpc").Debugf("send makePActiveThreadDumpList: %v", selected)

for i := 0; i < limit && i < len(selected); i++ {
aDump := makePActiveThreadDump(selected[i])
dumpList = append(dumpList, aDump)
for i := 0; i < limit && i < len(selected); i++ {
aDump := makePActiveThreadDump(selected[i])
dumpList = append(dumpList, aDump)
}
}

return dumpList
Expand All @@ -1023,7 +1033,7 @@ func makePActiveThreadDump(g *Goroutine) *pb.PActiveThreadDump {
trace = append(trace, g.trace)

aDump := &pb.PActiveThreadDump{
StartTime: g.spanInfo.startTime.UnixNano() / int64(time.Millisecond),
StartTime: g.span.startTime.UnixNano() / int64(time.Millisecond),
LocalTraceId: 0,
ThreadDump: &pb.PThreadDump{
ThreadName: g.header,
Expand All @@ -1042,9 +1052,9 @@ func makePActiveThreadDump(g *Goroutine) *pb.PActiveThreadDump {
LockedMonitor: nil,
LockedSynchronizer: nil,
},
Sampled: true,
TransactionId: g.spanInfo.txId.String(),
EntryPoint: g.spanInfo.entryPoint,
Sampled: g.span.sampled,
TransactionId: g.span.txId,
EntryPoint: g.span.entryPoint,
}

return aDump
Expand All @@ -1053,11 +1063,19 @@ func makePActiveThreadDump(g *Goroutine) *pb.PActiveThreadDump {
func (cmdGrpc *cmdGrpc) sendActiveThreadLightDump(reqId int32, limit int32, dump *GoroutineDump) {
var gRes *pb.PCmdActiveThreadLightDumpRes

status := int32(0)
msg := ""

if dump == nil {
status = -1
msg = "An error occurred while dumping Goroutine"
}

gRes = &pb.PCmdActiveThreadLightDumpRes{
CommonResponse: &pb.PCmdResponse{
ResponseId: reqId,
Status: 0, //error
Message: &wrappers.StringValue{Value: ""}, //error message
Status: status, //error
Message: &wrappers.StringValue{Value: msg}, //error message
},
ThreadDump: makePActiveThreadLightDumpList(dump, int(limit)),
Type: "Go",
Expand All @@ -1070,36 +1088,39 @@ func (cmdGrpc *cmdGrpc) sendActiveThreadLightDump(reqId int32, limit int32, dump
ctx := grpcMetadataContext(cmdGrpc.agent, -1)
_, err := cmdGrpc.cmdClient.CommandActiveThreadLightDump(ctx, gRes)
if err != nil {
log("grpc").Errorf("fail to make CommandActiveThreadLightDump() - %v", err)
log("grpc").Errorf("fail to send CommandActiveThreadLightDump() - %v", err)
}
}

func makePActiveThreadLightDumpList(dump *GoroutineDump, limit int) []*pb.PActiveThreadLightDump {
dumpList := make([]*pb.PActiveThreadLightDump, 0)

if limit < 1 {
limit = len(dump.goroutines)
}
if dump != nil {
if limit < 1 {
limit = len(dump.goroutines)
}

for i := 0; i < limit && i < len(dump.goroutines); i++ {
aDump := makePActiveThreadLightDump(dump.goroutines[i])
dumpList = append(dumpList, aDump)
for i := 0; i < limit && i < len(dump.goroutines); i++ {
aDump := makePActiveThreadLightDump(dump.goroutines[i])
dumpList = append(dumpList, aDump)
}
}

return dumpList
}

func makePActiveThreadLightDump(g *Goroutine) *pb.PActiveThreadLightDump {
aDump := &pb.PActiveThreadLightDump{
StartTime: g.spanInfo.startTime.UnixNano() / int64(time.Millisecond),
StartTime: g.span.startTime.UnixNano() / int64(time.Millisecond),
LocalTraceId: 0,
ThreadDump: &pb.PThreadLightDump{
ThreadName: g.header,
ThreadId: int64(g.id),
ThreadState: goRoutineState(g),
},
Sampled: true,
TransactionId: g.spanInfo.txId.String(),
EntryPoint: g.spanInfo.entryPoint,
Sampled: g.span.sampled,
TransactionId: g.span.txId,
EntryPoint: g.span.entryPoint,
}

return aDump
Expand Down
Loading

0 comments on commit eb1e91a

Please sign in to comment.