Permalink
Browse files

update exception counter

  • Loading branch information...
shaocongcong committed Jan 29, 2019
1 parent 4840050 commit e6395773d0c9bd36a9add0d0cdf1552453b7e4b1
@@ -20,5 +20,5 @@ pinpoint:
spanqueuelen: 50

system:
turnon: true
onoff: true
gatherinterval: 10
@@ -33,19 +33,9 @@ type Config struct {
}

System struct {
TurnOn bool
OnOff bool
GatherInterval int
}

// SkyWalking struct {
// HTTPAddr string
// RPCAddr string
// JVMReportInterval int // jvm 信息上报频率
// JVMCollectorInterval int // jvm 采集频率控制
// JVMCacheLen int // 缓存长度
// TraceReportInterval int // 全链路信息上报频率 单位毫秒
// TraceCacheLen int // 缓存长度
// }
}

// Conf ...
@@ -0,0 +1,5 @@
package system

// CPU ...
type CPU struct {
}
@@ -82,7 +82,7 @@ DoSpan:
e.apis.apiCounter(rpc, elapsed, isErr)
e.events.eventsCounter(rpc, spanEvents, chunkEvents)
e.sqls.sqlCounter(spanEvents, chunkEvents)
// e.exceptions.exceptionCounter(apiStr string, elapsed int, isError int)
e.exceptions.exceptionCounter(spanEvents, chunkEvents)
}
}

@@ -104,10 +104,11 @@ func ModMs2Min(ms int64) (int64, error) {
return t.Unix() - int64(t.Second()), nil
}

func spanCounterRecord(app *App, recordTime int64, e *Element) error {
e.apis.apiRecord(app, recordTime)
e.events.eventRecord(app, recordTime)
e.sqls.sqlRecord(app, recordTime)
e.stats.statRecord(app, recordTime)
func spanCounterRecord(app *App, inputDate int64, e *Element) error {
e.apis.apiRecord(app, inputDate)
e.events.eventRecord(app, inputDate)
e.sqls.sqlRecord(app, inputDate)
e.stats.statRecord(app, inputDate)
e.exceptions.exceptionRecord(app, inputDate)
return nil
}
@@ -1,54 +1,137 @@
package service

import (
"github.com/mafanr/g"
"github.com/mafanr/vgo/proto/pinpoint/thrift/trace"
"go.uber.org/zap"
)

// SpanExceptions ...
type SpanExceptions struct {
exceptions map[int32]*SpanException
apiExceptions map[int32]*Exceptions
}

// NewSpanExceptions ...
func NewSpanExceptions() *SpanExceptions {
return &SpanExceptions{
exceptions: make(map[int32]*SpanException),
apiExceptions: make(map[int32]*Exceptions),
}
}

var gInserExceptionRecord string = `INSERT INTO exception_stats (app_name, method_id, exception_info, input_date, total_elapsed, max_elapsed,
min_elapsed, count) VALUES (?,?,?,?,?,?,?,?);`

// ExceptionRecord ...
func (spanExceptions *SpanExceptions) exceptionRecord(app *App, inputDate int64) error {

for apiID, apiEx := range spanExceptions.apiExceptions {
for exStr, exinfo := range apiEx.exceptions {
query := gAnalyze.cql.Session.Query(gInserExceptionRecord,
app.AppName,
apiID,
exStr,
inputDate,
exinfo.elapsed,
exinfo.maxElapsed,
exinfo.minElapsed,
exinfo.count,
)
if err := query.Exec(); err != nil {
g.L.Warn("exceptionRecord error", zap.String("error", err.Error()), zap.String("SQL", query.String()))
}

}
}
return nil
}

// exceptionCounter ...
func (spanExceptions *SpanExceptions) exceptionCounter(urlStr string, elapsed int, isError int) error {
// url, ok := spanExceptions.exceptions[urlStr]
// if !ok {
// url = NewSpanURL()
// spanUrls.urls[urlStr] = url
// }
// url.elapsed += elapsed
// url.count++
// if isError != 0 {
// url.errCount++
// }

// if elapsed > url.maxElapsed {
// url.maxElapsed = url.elapsed
// }

// if url.minElapsed == 0 || url.minElapsed > elapsed {
// url.minElapsed = elapsed
// }

// url.averageElapsed = url.elapsed / url.count
func (spanExceptions *SpanExceptions) exceptionCounter(events []*trace.TSpanEvent, chunkEvents []*trace.TSpanEvent) error {

for _, event := range events {
exinfo := event.GetExceptionInfo()
if exinfo == nil {
continue
}

apiEx, ok := spanExceptions.apiExceptions[event.GetApiId()]
if !ok {
apiEx = NewExceptions()
spanExceptions.apiExceptions[event.GetApiId()] = apiEx
}
ex, ok := apiEx.exceptions[exinfo.GetStringValue()]
if !ok {
ex = NewException()
apiEx.exceptions[exinfo.GetStringValue()] = ex
}

ex.elapsed += int(event.GetEndElapsed())
ex.serviceType = event.GetServiceType()
ex.count++

if int(event.GetEndElapsed()) > ex.maxElapsed {
ex.maxElapsed = int(event.GetEndElapsed())
}

if ex.minElapsed == 0 || ex.minElapsed > int(event.GetEndElapsed()) {
ex.minElapsed = int(event.GetEndElapsed())
}
}

for _, event := range chunkEvents {
exinfo := event.GetExceptionInfo()
if exinfo == nil {
continue
}

apiEx, ok := spanExceptions.apiExceptions[event.GetApiId()]
if !ok {
apiEx = NewExceptions()
spanExceptions.apiExceptions[event.GetApiId()] = apiEx
}
ex, ok := apiEx.exceptions[exinfo.GetStringValue()]
if !ok {
ex = NewException()
apiEx.exceptions[exinfo.GetStringValue()] = ex
}

ex.elapsed += int(event.GetEndElapsed())
ex.serviceType = event.GetServiceType()
ex.count++

if int(event.GetEndElapsed()) > ex.maxElapsed {
ex.maxElapsed = int(event.GetEndElapsed())
}

if ex.minElapsed == 0 || ex.minElapsed > int(event.GetEndElapsed()) {
ex.minElapsed = int(event.GetEndElapsed())
}
}
return nil
}

// SpanException ...
type SpanException struct {
serType int
elapsed int
maxElapsed int
minElapsed int
averageElapsed float64
count int
errCount int
// Exceptions ...
type Exceptions struct {
exceptions map[string]*Exception
}

// NewExceptions ...
func NewExceptions() *Exceptions {
return &Exceptions{
exceptions: make(map[string]*Exception),
}
}

// Exception ...
type Exception struct {
serviceType int16
count int
elapsed int
maxElapsed int
minElapsed int
}

// NewSpanException ...
func NewSpanException() *SpanException {
return &SpanException{}
// NewException ...
func NewException() *Exception {
return &Exception{}
}
@@ -42,6 +42,7 @@ func (s *stats) Close() error {
func (s *stats) counter(app *App, wg *sync.WaitGroup) {
defer wg.Done()
// 如果最后一次计算点为0,那么放弃本次计算
// if app.lastCountTime == 0 || len(app.Agents) == 0 {
if app.lastCountTime == 0 {
// log.Println("如果最后一次计算点为0,那么放弃本次计算")
return
@@ -257,6 +257,25 @@ CREATE CUSTOM INDEX IF NOT EXISTS ON api_stats (input_date)



CREATE TABLE IF NOT EXISTS exception_stats (
app_name text,
method_id int,
exception_info text,
input_date bigint,
total_elapsed int,
max_elapsed int,
min_elapsed int,
count int,
PRIMARY KEY (app_name, method_id, exception_info, input_date)
) WITH gc_grace_seconds = 10800;


CREATE CUSTOM INDEX IF NOT EXISTS ON exception_stats (input_date)
USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {'mode': 'SPARSE'};



CREATE TABLE IF NOT EXISTS app_apis (
app_name text,
api text,
@@ -51,3 +51,8 @@ var InsertAgentStatTTL string = `
VALUES (?, ?, ?, ?, ?) USING TTL ?;`

var CheckApp string = `SELECT count(*) FROM apps WHERE app_name = ?;`

var InsertApp string = `
INSERT
INTO apps(app_name)
VALUES (?)`
@@ -10,6 +10,7 @@ import (
"github.com/mafanr/vgo/proto/pinpoint/thrift/pinpoint"
"github.com/mafanr/vgo/proto/pinpoint/thrift/trace"
"github.com/mafanr/vgo/util"
"github.com/mafanr/vgo/vgo/misc"
"github.com/vmihailenco/msgpack"
"go.uber.org/zap"
)
@@ -43,15 +44,12 @@ func (p *Pinpoint) dealUpload(conn net.Conn, inPacket *util.VgoPacket) error {
}

if !gVgo.appStore.checkApp(agentInfo.AppName) {
insertApp := `
INSERT
INTO apps(app_name)
VALUES (?)`
if err := gVgo.storage.cql.Query(
insertApp,
query := gVgo.storage.cql.Query(
misc.InsertApp,
agentInfo.AppName,
).Exec(); err != nil {
g.L.Warn("inster apps error", zap.String("error", err.Error()), zap.String("SQL", insertApp))
)
if err := query.Exec(); err != nil {
g.L.Warn("inster apps error", zap.String("error", err.Error()), zap.String("SQL", query.String()))
return err
}
}
@@ -82,6 +80,9 @@ func (p *Pinpoint) dealUpload(conn net.Conn, inPacket *util.VgoPacket) error {
g.L.Warn("msgpack.Unmarshal", zap.String("error", err.Error()))
return err
}

g.L.Info("AgentOffline", zap.String("appName", agentInfo.AppName), zap.String("agentID", agentInfo.AgentID), zap.Bool("isLive", agentInfo.IsLive))

// 清理内存缓存Agent信息
gVgo.appStore.RemoveAgent(agentInfo)

@@ -13,7 +13,7 @@ module.exports = {
proxyTable: {},

// Various Dev Server settings
host: '10.50.38.63', // can be overwritten by process.env.HOST
host: '10.50.9.207', // can be overwritten by process.env.HOST
port: 9530, // can be overwritten by process.env.PORT, if port is in use, a free one will be determined
autoOpenBrowser: true,
errorOverlay: true,
BIN +20.8 MB web/web
Binary file not shown.

0 comments on commit e639577

Please sign in to comment.