Permalink
Browse files

update

  • Loading branch information...
shaocongcong committed Jan 31, 2019
1 parent 8f0eca3 commit 027623b993cd914868af22fa4576fa931b3f2915
@@ -22,3 +22,5 @@ var ChunkEventsIterTrace string = `SELECT span_event_list FROM traces_chunk WHER
var UpdateLastCounterTime string = `UPDATE apps SET last_count_time=? WHERE app_name=?;`

var InsertAPIs string = `INSERT INTO app_apis (app_name, api) VALUES (?, ?) ;`

var QueryTraceID string = `SELECT trace_id, span_id FROM app_operation_index WHERE app_name=? and input_date>? and input_date<=?;`
@@ -14,7 +14,7 @@ import (
type Analyze struct {
stats Stats // 离线统计
blink Blink // 实时计算
serDiscovery SerDiscovery // 服务发现
srvDiscovery SrvDiscovery // 服务发现
cql *g.Cassandra
appStore *AppStore
hash *g.Hash
@@ -31,7 +31,7 @@ func New() *Analyze {
blink: newBlink(),
cql: g.NewCassandra(),
hash: g.NewHash(),
serDiscovery: newEtcd(),
srvDiscovery: newEtcd(),
analyzes: make(map[string]string),
}
gAnalyze = analyze
@@ -49,11 +49,11 @@ func (analyze *Analyze) Start() error {

reportDir := initDir(misc.Conf.Etcd.ReportDir)

if err := analyze.serDiscovery.Init(reportDir+reportValue, reportValue, watchDir); err != nil {
if err := analyze.srvDiscovery.Init(reportDir+reportValue, reportValue, watchDir); err != nil {
g.L.Fatal("Start etcd.Start", zap.String("error", err.Error()))
}

if err := analyze.serDiscovery.Start(); err != nil {
if err := analyze.srvDiscovery.Start(); err != nil {
g.L.Fatal("Start etcd.Start", zap.String("error", err.Error()))
}

@@ -91,8 +91,8 @@ func (analyze *Analyze) Close() error {
analyze.cql.Close()
}

if analyze.serDiscovery != nil {
analyze.serDiscovery.Close()
if analyze.srvDiscovery != nil {
analyze.srvDiscovery.Close()
}

g.L.Info("Close ok!")
@@ -55,11 +55,12 @@ func (appStore *AppStore) LoadAppAndCounter() {
for {
select {
case <-ticker.C:
// 定时加载app,然后计算每一个APP的数据
if err := appStore.loadApp(); err != nil {
g.L.Warn("loadApp", zap.String("error", err.Error()))
break
}

// 计算模块
if err := gAnalyze.stats.Counter(); err != nil {
g.L.Warn("Counter", zap.String("error", err.Error()))
break
@@ -11,8 +11,8 @@ import (
"go.uber.org/zap"
)

// SerDiscovery ...
type SerDiscovery interface {
// SrvDiscovery ...
type SrvDiscovery interface {
Init(reportKey, reportValue, watchDir string) error
Start() error
Close()
@@ -29,6 +29,7 @@ func spanCounter(traceID string, spanID int64, es map[int64]*Element) error {

var chunkEvents []*trace.TSpanEvent

// 查询分片的span信息
{
var spanChunkEventList []byte
iterChunkEvents := gAnalyze.appStore.cql.Session.Query(misc.ChunkEventsIterTrace, traceID, spanID).Iter()
@@ -59,6 +60,7 @@ DoSpan:
continue
}

// 查询缓存并记录API信息到数据
if app, ok := gAnalyze.appStore.getApp(appName); ok {
if _, ok := app.getAPI(rpc); !ok {
query := gAnalyze.cql.Session.Query(misc.InsertAPIs, appName, rpc)
@@ -69,10 +71,15 @@ DoSpan:
}
}

// 对index时间点到数据进行计算
if e, ok := es[index]; ok {
// API
e.apis.apiCounter(rpc, elapsed, isErr)
// 内部事件method_id相关计算
e.events.eventsCounter(rpc, spanEvents, chunkEvents)
// SQL统计
e.sqls.sqlCounter(spanEvents, chunkEvents)
// 异常统计
e.exceptions.exceptionCounter(spanEvents, chunkEvents)
}
}
@@ -60,36 +60,41 @@ func (s *stats) counter(app *App, wg *sync.WaitGroup) {
return
}

// 计算出每个分钟点,并生成map
es := GetElements(queryStartTime, queryEndTime)
queryTraceID := `SELECT trace_id, span_id FROM app_operation_index WHERE app_name=? and input_date>? and input_date<=?;`
iterTraceID := gAnalyze.appStore.cql.Session.Query(queryTraceID, app.AppName, queryStartTime, queryEndTime).Iter()

// 根据时间范围查出所有符合范围的traceID
iterTraceID := gAnalyze.appStore.cql.Session.Query(misc.QueryTraceID, app.AppName, queryStartTime, queryEndTime).Iter()

defer func() {
if err := iterTraceID.Close(); err != nil {
g.L.Warn("close iter error:", zap.Error(err))
}
}()
// SELECT trace_id, span_id FROM app_operation_index WHERE app_name='helm' and input_date>1548514140000 and input_date<=1548514200000;

var traceID string
var spanID int64

// 根据traceID 查出span并进行计算
for iterTraceID.Scan(&traceID, &spanID) {
spanCounter(traceID, spanID, es)
}

// 统计jvm信息
statsCounter(app, queryStartTime, queryEndTime, es)

// @TODO记录计算结果
// 记录计算结果
for recordTime, e := range es {
spanCounterRecord(app, recordTime, e)
}

// @TODO
// 记录计算时间到表
// 将本次计算时间记录到表中
query := gAnalyze.cql.Session.Query(misc.UpdateLastCounterTime, queryEndTime, app.AppName)
if err := query.Exec(); err != nil {
g.L.Warn("update Last Counter Time error", zap.String("error", err.Error()), zap.String("SQL", query.String()))
return
}
// 缓存到内存
app.lastCountTime = queryEndTime
}

@@ -10,8 +10,8 @@ import (
"go.etcd.io/etcd/clientv3"
)

// SerDiscovery ...
type SerDiscovery interface {
// SrvDiscovery ...
type SrvDiscovery interface {
Start() error
Close()
Watch()
@@ -19,7 +19,7 @@ type Vgo struct {
storage *Storage // 存储
pinpoint *Pinpoint // 处理pinpoint 数据
appStore *AppStore
serDiscovery SerDiscovery
srvDiscovery SrvDiscovery
}

var gVgo *Vgo
@@ -30,15 +30,15 @@ func New() *Vgo {
storage: NewStorage(),
pinpoint: NewPinpoint(),
appStore: NewAppStore(),
serDiscovery: newEtcd(),
srvDiscovery: newEtcd(),
}
return gVgo
}

// Start ...
func (v *Vgo) Start() error {

if err := v.serDiscovery.Start(); err != nil {
if err := v.srvDiscovery.Start(); err != nil {
g.L.Fatal("Start:etcd.Start", zap.String("error", err.Error()))
return err
}

0 comments on commit 027623b

Please sign in to comment.