Skip to content
Permalink
Browse files

update

  • Loading branch information...
congim committed Jun 3, 2019
1 parent b27d1b7 commit 6df2a9b24817a4fb9231d98c9312162b5c7b43cf
@@ -0,0 +1,31 @@
package service

import "sync"

// DubboAPIMap ...
type DubboAPIMap struct {
sync.RWMutex
APIs map[string]string
}

// NewDubboAPIMap ...
func NewDubboAPIMap() *DubboAPIMap {
return &DubboAPIMap{
APIs: make(map[string]string),
}
}

// Get ...
func (d *DubboAPIMap) Get(api string) (string, bool) {
d.RLock()
appName, ok := d.APIs[api]
d.RUnlock()
return appName, ok
}

// Add ...
func (d *DubboAPIMap) Add(api, appName string) {
d.Lock()
d.APIs[api] = appName
d.Unlock()
}
@@ -144,7 +144,7 @@ func (a *App) stats() {

for {
select {
// 二次聚合的api入库
// 二次聚合之后的api信息入库
case _, ok := <-a.apiTickerC:
if ok {
if err := a.apiStatsStore(); err != nil {
@@ -175,7 +175,7 @@ func (a *App) stats() {
logger.Warn("stats span error", zap.String("error", err.Error()))
}
}
// api二次聚合
// 接收到其他collecotor发送来的api信息,进行二次聚合
case packet, ok := <-a.apiC:
if ok {
if err := a.statsApi(packet); err != nil {
@@ -211,6 +211,7 @@ func (a *App) statsApi(packet *alert.Data) error {
a.apiCache[packet.Time] = cacheApp
}

// 计算http信息
for urlStr, tmpUrl := range newApp.Urls {
url, ok := cacheApp.Urls[urlStr]
if !ok {
@@ -230,7 +231,28 @@ func (a *App) statsApi(packet *alert.Data) error {
url.SatisfactionCount += tmpUrl.SatisfactionCount
url.TolerateCount += tmpUrl.TolerateCount
}
// @TODO dubbo

// 计算Dubbo信息
for dubboStr, tmpDubbo := range newApp.Dubbos {
dubbo, ok := cacheApp.Dubbos[dubboStr]
if !ok {
dubbo = stats.NewDubbo()
cacheApp.Dubbos[dubboStr] = dubbo
}
dubbo.Duration += tmpDubbo.Duration
if dubbo.MinDuration > tmpDubbo.MinDuration {
dubbo.MinDuration = tmpDubbo.MinDuration
}
if dubbo.MaxDuration < tmpDubbo.MaxDuration {
dubbo.MaxDuration = tmpDubbo.MaxDuration
}

dubbo.AccessCount += tmpDubbo.AccessCount
dubbo.AccessErrCount += tmpDubbo.AccessErrCount
dubbo.SatisfactionCount += tmpDubbo.SatisfactionCount
dubbo.TolerateCount += tmpDubbo.TolerateCount
}

return nil
}

@@ -243,6 +265,10 @@ func (a *App) statsSpan(span *trace.TSpan) error {
return err
}
a.storeAPI(span.GetRPC())
// 保存dubbo api
if span.GetServiceType() == constant.DUBBO_PROVIDER {
gCollector.apps.dubbo.Add(span.GetRPC(), span.GetApplicationName())
}
}

// 计算当前span时间范围点
@@ -258,7 +284,7 @@ func (a *App) statsSpan(span *trace.TSpan) error {
// 查找时间点,不存在新申请, span统计的范围是分钟,所以这里直接用优化过后的spanTime
stats, ok := a.statsCache[spanTime]
if !ok {
stats = plugin.NewStats(a.httpCodes, a.mutex, logger, getNameByIP)
stats = plugin.NewStats(a.httpCodes, a.mutex, logger, getNameByIP, getNameByDubboAPI)
a.statsCache[spanTime] = stats
}
stats.SpanCounter(span)
@@ -281,7 +307,7 @@ func (a *App) statsAgentStat(agentStat *pinpoint.TAgentStat) error {
// 查找时间点,不存在新申请
stats, ok := a.statsCache[agentStatTime]
if !ok {
stats = plugin.NewStats(a.httpCodes, a.mutex, logger, getNameByIP)
stats = plugin.NewStats(a.httpCodes, a.mutex, logger, getNameByIP, getNameByDubboAPI)
a.statsCache[agentStatTime] = stats
}

@@ -300,7 +326,7 @@ func (a *App) statsSpanChunk(spanChunk *trace.TSpanChunk) error {
// 查找时间点,不存在新申请
stats, ok := a.statsCache[spanChunkTime]
if !ok {
stats = plugin.NewStats(a.httpCodes, a.mutex, logger, getNameByIP)
stats = plugin.NewStats(a.httpCodes, a.mutex, logger, getNameByIP, getNameByDubboAPI)
a.statsCache[spanChunkTime] = stats
}

@@ -539,11 +565,6 @@ func (a *App) statsStore() error {
logger.Warn("publish", zap.Error(err))
}
}

// dubbo直接入库
for dubboApi, dubbo := range app.Dubbos {
gCollector.storage.InsertDubboStats(a.name, inputDate, dubboApi, dubbo)
}
}

// 上报打点信息并删除该时间点信息
@@ -590,6 +611,18 @@ func (a *App) apiStatsStore() error {
apis.APIS[urlStr] = apiAlert
}

// 遍历入库
for dubboAPI, dubbo := range a.apiCache[inputDate].Dubbos {
gCollector.storage.InsertDubboStats(a.name, inputDate, dubboAPI, dubbo)
apiAlert := &alert.API{
Desc: dubboAPI,
AccessCount: dubbo.AccessCount,
AccessErrCount: dubbo.AccessErrCount,
Duration: dubbo.Duration,
}
apis.APIS[dubboAPI] = apiAlert
}

// 发送给alert服务
// 有api数据发送给mq
if len(apis.APIS) > 0 {
@@ -25,6 +25,7 @@ type Apps struct {
apps map[string]*App // app集合
ips map[string]string
hosts map[string]string
dubbo *DubboAPIMap // dubbo api映射记录
}

func (a *Apps) start() error {
@@ -35,6 +36,10 @@ func (a *Apps) start() error {
if err := a.loadApiCodeSrv(); err != nil {
return err
}

if err := a.loadAppNameDubboSrv(); err != nil {
return err
}
return nil
}

@@ -189,6 +194,47 @@ func (a *Apps) loadApps(cql *gocql.Session) error {
return nil
}

// loadAppsStart 加载app
func (a *Apps) loadAppNameDubboSrv() error {
cql := gCollector.storage.GetStaticCql()
if err := a.loadAppNameDubbo(cql); err != nil {
logger.Warn("load app name by dubbo type", zap.String("error", err.Error()))
return err
}

go func() {
for {
time.Sleep(time.Duration(misc.Conf.Apps.LoadInterval) * time.Second)
cql := gCollector.storage.GetStaticCql()
if err := a.loadAppNameDubbo(cql); err != nil {
logger.Warn("load app name by dubbo type", zap.String("error", err.Error()))
}
}
}()
return nil
}

func (a *Apps) loadAppNameDubbo(cql *gocql.Session) error {
if cql == nil {
return fmt.Errorf("get cql failed")
}
apisIter := cql.Query(sql.LoadDubboApis).Consistency(gocql.One).Iter()
defer func() {
if err := apisIter.Close(); err != nil {
logger.Warn("close apps iter error:", zap.Error(err))
}
}()

var appName, api string
var apiType int32
for apisIter.Scan(&appName, &api, &apiType) {
if int16(apiType) == constant.DUBBO_PROVIDER {
gCollector.apps.dubbo.Add(api, appName)
}
}
return nil
}

// isExist app是否存在
func (a *Apps) storeIPandHost(appName, ip, host string) bool {
a.Lock()
@@ -245,6 +291,7 @@ func newApps() *Apps {
apps: make(map[string]*App),
ips: make(map[string]string),
hosts: make(map[string]string),
dubbo: NewDubboAPIMap(),
}
}

@@ -362,3 +362,9 @@ func getNameByIP(ip string) (string, bool) {
appName, ok := gCollector.apps.getNameByIP(ip)
return appName, ok
}

// getNameByDubboAPI 通过api获取应用名
func getNameByDubboAPI(api string) (string, bool) {
appName, ok := gCollector.apps.dubbo.Get(api)
return appName, ok
}

0 comments on commit 6df2a9b

Please sign in to comment.
You can’t perform that action at this time.