Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
congim committed Apr 30, 2019
1 parent d2987f9 commit bbcf4f3
Show file tree
Hide file tree
Showing 21 changed files with 111 additions and 663 deletions.
2 changes: 1 addition & 1 deletion agent/agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ common:
admintoken: "tracing.dev"

agent:
tracingaddr: "127.0.0.1:8081"
keepliveinterval: 3
useenv: false
env: "APM_TEST"
Expand All @@ -14,6 +13,7 @@ agent:
etcd:
addrs:
- "127.0.0.1:2379"
# 测试环境
# - "10.7.24.191:2379"
# - "10.7.24.192:2379"
timeout: 10
Expand Down
5 changes: 2 additions & 3 deletions alert/ticker/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ func (t *Ticker) addTask(id int64, channel chan bool) {

func (t *Ticker) removeTask(id int64) {
t.Lock()
if task, ok := t.tasks[id]; ok {
close(task.channel)
if _, ok := t.tasks[id]; ok {
delete(t.tasks, id)
}
delete(t.tasks, id)
t.Unlock()
}

Expand Down
5 changes: 5 additions & 0 deletions collector/apps/apps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package apps

// Apps apps
type Apps struct {
}
2 changes: 2 additions & 0 deletions collector/service/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package service
type Agent struct {
id string // agent id
startTime int64 // 启动时间
isLive bool
hostName string
}

func newAgent(id string, starttime int64) *Agent {
Expand Down
35 changes: 35 additions & 0 deletions collector/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func newApp(name string, appType int32) *App {
appType: appType,
name: name,
agents: make(map[string]*Agent),
stopC: make(chan bool, 1),
tickerC: make(chan bool, 10),
spanC: make(chan *trace.TSpan, 200),
spanChunkC: make(chan *trace.TSpanChunk, 200),
Expand Down Expand Up @@ -91,6 +92,27 @@ func (a *App) storeAgent(agentid string, startTime int64) {
return
}

// storeAgent 保存agent
func (a *App) storeAgentnew(agentid string, startTime int64, isLive bool, hostName string) {
a.RLock()
agent, ok := a.agents[agentid]
a.RUnlock()
if ok {
agent.hostName = hostName
agent.isLive = isLive
return
}

agent = newAgent(agentid, startTime)
a.Lock()
agent.isLive = isLive
agent.hostName = hostName
a.agents[agentid] = agent
a.Unlock()

return
}

// stats 计算模块
func (a *App) stats() {
for {
Expand Down Expand Up @@ -209,6 +231,19 @@ func (a *App) start() {
go a.stats()
}

// close 退出
func (a *App) close() {
// 获取任务ID
logger.Info("app close", zap.String("name", a.name), zap.Int64("taskID", a.taskID))

gCollector.ticker.RemoveTask(a.taskID)
close(a.tickerC)
close(a.stopC)
close(a.spanC)
close(a.spanChunkC)
close(a.statC)
}

// apiIsExist 检查api是否缓存
func (a *App) apiIsExist(api string) bool {
a.RLock()
Expand Down
23 changes: 20 additions & 3 deletions collector/service/app_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (a *AppStore) start() error {
logger.Warn("loadApps", zap.String("error", err.Error()))
return err
}

go func() {
for {
time.Sleep(1 * time.Second)
Expand All @@ -48,18 +49,20 @@ func (a *AppStore) loadApps() error {
}
}()

// @TODO 这里未来要做hash过滤,不属于该collector节点App信息不需要保存,以节省资源
var appName string
for appsIter.Scan(&appName) {
var appType int32
var agentID, ip string
var startTime int64
var isLive bool
var hostName string

agentsIter := cql.Query(sql.LoadAgents, appName).Iter()
for agentsIter.Scan(&appType, &agentID, &startTime, &ip) {
gCollector.apps.storeAgent(appName, agentID, startTime, appType)
for agentsIter.Scan(&appType, &agentID, &startTime, &ip, &isLive, &hostName) {
gCollector.apps.storeAgentnew(appName, agentID, startTime, appType, isLive, hostName)
misc.AddrStore.Add(appName, ip)
}

if err := agentsIter.Close(); err != nil {
logger.Warn("close apps iter error:", zap.Error(err))
}
Expand Down Expand Up @@ -93,6 +96,20 @@ func (a *AppStore) storeAgent(name string, id string, startTime int64, appType i
app.storeAgent(id, startTime)
}

func (a *AppStore) storeAgentnew(name string, id string, startTime int64, appType int32, isLive bool, hostName string) {
a.RLock()
app, ok := a.apps[name]
a.RUnlock()
if !ok {
app = newApp(name, appType)
a.Lock()
a.apps[name] = app
a.Unlock()
}
app.appType = appType
app.storeAgentnew(id, startTime, isLive, hostName)
}

// isExist agent是否存在
func (a *AppStore) isAgentExist(name, agentid string) bool {
a.RLock()
Expand Down
9 changes: 5 additions & 4 deletions collector/service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func tcpClient(conn net.Conn) {
conn.Close()
}
close(quitC)
app, ok := gCollector.apps.getApp(appname)
if ok {
app.close()
}

if err := gCollector.storage.UpdateAgentState(appname, agentid, false); err != nil {
logger.Warn("update agent state Store", zap.String("error", err.Error()))
Expand Down Expand Up @@ -235,10 +239,7 @@ func tcpClient(conn net.Conn) {
}
break
case constant.TypeOfSystem:
// if err := v.dealSystem(packet); err != nil {
// logger.Warn("dealSystem", zap.String("error", err.Error()))
// return
// }

log.Println("TypeOfSystem")
break
}
Expand Down
6 changes: 6 additions & 0 deletions collector/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,18 @@ func (s *Storage) AgentStore(agentInfo *network.AgentInfo, islive bool) error {

// UpdateAgentState agent在线状态更新
func (s *Storage) UpdateAgentState(appname string, agentid string, islive bool) error {
var entTime int64
if !islive {
entTime = time.Now().Unix() * 1000
}
query := s.cql.Query(
sql.UpdateAgentState,
islive,
entTime,
appname,
agentid,
)

if err := query.Exec(); err != nil {
s.logger.Warn("update agent state error", zap.String("SQL", query.String()), zap.String("error", err.Error()))
return err
Expand Down
20 changes: 20 additions & 0 deletions collector/ticker/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func (t *Tickers) AddTask(id int64, channel chan bool) {
t.tickers[key].addTask(id, channel)
}

// RemoveTask 添加任务
func (t *Tickers) RemoveTask(id int64) {
// id 通过hash计算出来key
key, err := t.hash.Get(fmt.Sprintf("%d", id))
if err != nil {
logger.Warn("hash get", zap.String("error", err.Error()))
return
}
// 加入任务
t.tickers[key].removeTask(id)
}

// Ticker 定时器
type Ticker struct {
sync.RWMutex
Expand Down Expand Up @@ -106,6 +118,14 @@ func (t *Ticker) addTask(id int64, channel chan bool) {
t.Unlock()
}

func (t *Ticker) removeTask(id int64) {
t.Lock()
if _, ok := t.tasks[id]; ok {
delete(t.tasks, id)
}
t.Unlock()
}

// Task 任务
type Task struct {
id int64 // 编号
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var InsertAgent string = `INSERT INTO agents (app_name, agent_id, service_type,
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`

// 更新agent 在线信息
var UpdateAgentState string = `UPDATE agents SET is_live=? WHERE app_name =? AND agent_id =?;`
var UpdateAgentState string = `UPDATE agents SET is_live=?, end_time=? WHERE app_name =? AND agent_id =?;`

// agent info 信息入库
var InsertAgentInfo string = `INSERT INTO agents (app_name, agent_id, start_time, agent_info)
Expand Down Expand Up @@ -150,7 +150,7 @@ var InsertUnknowParentMap string = `INSERT INTO service_map (
var InsertAPIMapStats string = `INSERT INTO api_map (source_name, source_type, target_name, target_type, access_count, access_err_count, access_duration, api, input_date)
VALUES (?,?,?,?,?,?,?,?,?);`

var LoadAgents string = `SELECT service_type, agent_id, start_time, ip FROM agents WHERE app_name=?;`
var LoadAgents string = `SELECT service_type, agent_id, start_time, ip, is_live, host_name FROM agents WHERE app_name=?;`
var LoadApps string = `SELECT app_name FROM apps ;`

// // InsertRuntimeStats ...
Expand Down
25 changes: 12 additions & 13 deletions pkg/util/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package util

// AgentInfo ...
type AgentInfo struct {
AppName string `db:"app_name" json:"applicationName" msg:"applicationName"`
AgentID string `db:"agent_id" json:"agentId" msg:"agentId"`
ServiceType int32 `db:"ser_type" json:"serviceType" msg:"serviceType"`
SocketID int32 `db:"socket_id" json:"socketId" msg:"socketId"`
HostName string `db:"host_name" json:"hostName" msg:"hostName"`
IP4S string `db:"ip" json:"ip" msg:"ip"`
Pid int32 `db:"pid" json:"pid" msg:"pid"`
Version string `db:"version" json:"version" msg:"version"`
StartTimestamp int64 `db:"start_time" json:"startTimestamp" msg:"startTimestamp"`
EndTimestamp int64 `db:"end_time" json:"end_time" msg:"end_time"`
IsLive bool `db:"is_live" json:"is_live" msg:"is_live"`
IsContainer bool `db:"is_container" json:"is_container" msg:"is_container"`
OperatingEnv int32 `db:"operating_env" json:"operating_env" msg:"operating_env"`
AppName string `db:"app_name" json:"applicationName" msg:"applicationName"`
AgentID string `db:"agent_id" json:"agentId" msg:"agentId"`
Type int32 `db:"ser_type" json:"serviceType" msg:"serviceType"`
HostName string `db:"host_name" json:"hostName" msg:"hostName"`
IP4S string `db:"ip" json:"ip" msg:"ip"`
Pid int32 `db:"pid" json:"pid" msg:"pid"`
Version string `db:"version" json:"version" msg:"version"`
StartTime int64 `db:"start_time" json:"startTimestamp" msg:"startTimestamp"`
StopTime int64 `db:"end_time" json:"end_time" msg:"end_time"`
IsLive bool `db:"is_live" json:"is_live" msg:"is_live"`
IsContainer bool `db:"is_container" json:"is_container" msg:"is_container"`
OperatingEnv int32 `db:"operating_env" json:"operating_env" msg:"operating_env"`
}

// NewAgentInfo ...
Expand Down
16 changes: 0 additions & 16 deletions pkg/util/app.go

This file was deleted.

22 changes: 0 additions & 22 deletions pkg/util/cmd.go

This file was deleted.

79 changes: 0 additions & 79 deletions pkg/util/const.go

This file was deleted.

Loading

0 comments on commit bbcf4f3

Please sign in to comment.