Skip to content

Commit

Permalink
Kvscheduler transaction statistics (#1298)
Browse files Browse the repository at this point in the history
Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
  • Loading branch information
Milan Lenčo authored and ondrej-fabry committed Apr 26, 2019
1 parent 5d8e4ed commit 2fb311e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 36 deletions.
80 changes: 51 additions & 29 deletions plugins/kvscheduler/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/ligato/vpp-agent/pkg/metrics"
kvs "github.com/ligato/vpp-agent/plugins/kvscheduler/api"
)

var (
Expand All @@ -32,30 +33,20 @@ func init() {
stats.GraphMethods.Methods = make(metrics.Calls)
stats.AllDescriptors.Methods = make(metrics.Calls)
stats.Descriptors = make(map[string]*StructStats)
}

/*func GetDescriptorStats() map[string]metrics.Calls {
ss := make(map[string]metrics.Calls, len(stats.Descriptors))
statsMu.RLock()
for d, ds := range stats.Descriptors {
cc := make(metrics.Calls, len(ds))
for c, cs := range ds {
css := *cs
cc[c] = &css
stats.TxnStats.Methods = make(metrics.Calls)
stats.TxnStats.OperationCount = make(map[string]uint64)
stats.TxnStats.ValueStateCount = make(map[string]uint64)
for state := range kvs.ValueState_value {
stats.TxnStats.ValueStateCount[state] = 0
}
for op, opVal := range kvs.TxnOperation_name {
if op == int32(kvs.TxnOperation_UNDEFINED) ||
op == int32(kvs.TxnOperation_VALIDATE) {
continue
}
ss[d] = cc
stats.TxnStats.OperationCount[opVal] = 0
}
statsMu.RUnlock()
return ss
}*/

/*func GetGraphStats() *metrics.CallStats {
s := make(metrics.Calls, len(stats.Descriptors))
statsMu.RLock()
*s = stats.Graph
statsMu.RUnlock()
return s
}*/
}

func GetStats() *Stats {
s := new(Stats)
Expand All @@ -66,8 +57,7 @@ func GetStats() *Stats {
}

type Stats struct {
TransactionsProcessed uint64

TxnStats TxnStats
GraphMethods StructStats
AllDescriptors StructStats
Descriptors map[string]*StructStats
Expand All @@ -79,16 +69,19 @@ func (s *Stats) addDescriptor(name string) {
}
}

// TxnStats aggregates counters describing processed transactions.
// The counters are updated by updateTransactionStats and the per-method
// timings by trackTransactionMethod, both while holding statsMu.
type TxnStats struct {
// TotalProcessed is incremented once for every call of updateTransactionStats.
TotalProcessed uint64
// OperationCount counts executed operations, keyed by the String() form of
// the operation type.
OperationCount map[string]uint64
// ValueStateCount counts executed operations, keyed by the String() form of
// the new value state recorded for the operation.
ValueStateCount map[string]uint64
// ErrorCount is incremented for every executed operation whose NewErr is non-nil.
ErrorCount uint64
// Methods holds call statistics keyed by the method name passed to
// trackTransactionMethod.
Methods metrics.Calls
}

// StructStats groups call statistics for a set of tracked methods; Stats uses
// it for graph methods and for descriptors.
//
// NOTE(review): per the encoding/json documentation only the exact tag "-"
// omits a field; "-,omitempty" appears to name the field "-" instead. The tag
// is bypassed whenever the pointer-receiver MarshalJSON is used — confirm the
// intended behavior for non-addressable values.
type StructStats struct {
// Methods is the collection of per-method call statistics.
Methods metrics.Calls `json:"-,omitempty"`
}

// MarshalJSON implements json.Marshaler. The struct is encoded as its Methods
// collection directly, without a wrapping JSON object around it.
func (s *StructStats) MarshalJSON() ([]byte, error) {
	methods := s.Methods
	return json.Marshal(methods)
}

Expand Down Expand Up @@ -129,6 +122,35 @@ func trackGraphMethod(m string) func() {
}
}

func trackTransactionMethod(m string) func() {
t := time.Now()
s := stats.TxnStats
return func() {
took := time.Since(t)
statsMu.Lock()
ms, tracked := s.Methods[m]
if !tracked {
ms = &metrics.CallStats{Name: m}
s.Methods[m] = ms
}
ms.Increment(took)
statsMu.Unlock()
}
}

// updateTransactionStats folds the outcome of one processed transaction into
// the global transaction statistics. It bumps the processed-transaction
// counter once and then, for every executed operation, counts the error (if
// any), the operation type and the resulting value state. The whole update
// runs with statsMu held for writing.
func updateTransactionStats(execOps kvs.RecordedTxnOps) {
	statsMu.Lock()
	defer statsMu.Unlock()

	txnStats := &stats.TxnStats
	txnStats.TotalProcessed++

	for _, executed := range execOps {
		if executed.NewErr != nil {
			txnStats.ErrorCount++
		}
		opName := executed.Operation.String()
		txnStats.OperationCount[opName]++
		stateName := executed.NewState.String()
		txnStats.ValueStateCount[stateName]++
	}
}

func init() {
expvar.Publish("kvscheduler", expvar.Func(func() interface{} {
return GetStats()
Expand Down
5 changes: 5 additions & 0 deletions plugins/kvscheduler/txn_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (s *Scheduler) executeTransaction(txn *transaction, graphW graph.RWAccess,
op = "simulate transaction"
}
defer trace.StartRegion(txn.ctx, op).End()
if dryRun {
defer trackTransactionMethod("simulateTransaction")()
} else {
defer trackTransactionMethod("executeTransaction")()
}

if s.logGraphWalk {
msg := fmt.Sprintf("%s (seqNum=%d)", op, txn.seqNum)
Expand Down
21 changes: 14 additions & 7 deletions plugins/kvscheduler/txn_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package kvscheduler
import (
"context"
"runtime/trace"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -93,25 +92,28 @@ func (s *Scheduler) consumeTransactions() {
return
}
s.processTransaction(txn)
atomic.AddUint64(&stats.TransactionsProcessed, 1)
}
}

// processTransaction processes transaction in 8 steps:
// 1. Pre-processing: transaction parameters are initialized, retry operations
// are filtered from the obsolete ones and for the resync the graph is refreshed
// 2. Simulation: simulating transaction without actually executing any of the
// 2. Ordering: pre-order operations using a heuristic to get the shortest graph
// walk on average
// 3. Simulation: simulating transaction without actually executing any of the
// Create/Delete/Update operations in order to obtain the "execution plan"
// 3. Pre-recording: logging transaction arguments + plan before execution to
// 4. Pre-recording: logging transaction arguments + plan before execution to
// persist some information in case there is a crash during execution
// 4. Execution: executing the transaction, collecting errors
// 5. Recording: recording the finalized transaction (log + in-memory)
// 6. Post-processing: scheduling retry for failed operations, propagating value
// 5. Execution: executing the transaction, collecting errors
// 6. Recording: recording the finalized transaction (log + in-memory)
// 7. Post-processing: scheduling retry for failed operations, propagating value
// state updates to the subscribers and returning error/nil to the caller
// of blocking commit
// 8. Update of transaction statistics
func (s *Scheduler) processTransaction(txn *transaction) {
s.txnLock.Lock()
defer s.txnLock.Unlock()
defer trackTransactionMethod("processTransaction")()

startTime := time.Now()

Expand Down Expand Up @@ -154,12 +156,16 @@ func (s *Scheduler) processTransaction(txn *transaction) {

// 7. Post-processing:
s.postProcessTransaction(txn, executedOps)

// 8. Statistics:
updateTransactionStats(executedOps)
}

// preProcessTransaction initializes transaction parameters, filters obsolete retry
// operations and refreshes the graph for resync.
func (s *Scheduler) preProcessTransaction(txn *transaction) (skipExec, skipSimulation, record bool) {
defer trace.StartRegion(txn.ctx, "preProcessTransaction").End()
defer trackTransactionMethod("preProcessTransaction")()

// allocate new transaction sequence number
txn.seqNum = s.txnSeqNumber
Expand Down Expand Up @@ -299,6 +305,7 @@ func (s *Scheduler) preProcessRetryTxn(txn *transaction) (skip bool) {
// commit.
func (s *Scheduler) postProcessTransaction(txn *transaction, executed kvs.RecordedTxnOps) {
defer trace.StartRegion(txn.ctx, "postProcessTransaction").End()
defer trackTransactionMethod("postProcessTransaction")()

// collect new failures (combining derived with base)
toRetry := utils.NewSliceBasedKeySet()
Expand Down
2 changes: 2 additions & 0 deletions plugins/kvscheduler/txn_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *Scheduler) preRecordTxnOp(args *applyValueArgs, node graph.Node) *kvs.R
func (s *Scheduler) preRecordTransaction(txn *transaction, planned kvs.RecordedTxnOps,
skippedSimulation bool) *kvs.RecordedTxn {
defer trace.StartRegion(txn.ctx, "preRecordTransaction").End()
defer trackTransactionMethod("preRecordTransaction")()

// allocate new transaction record
record := &kvs.RecordedTxn{
Expand Down Expand Up @@ -163,6 +164,7 @@ func (s *Scheduler) preRecordTransaction(txn *transaction, planned kvs.RecordedT
// recordTransaction records the finalized transaction (log + in-memory).
func (s *Scheduler) recordTransaction(txn *transaction, txnRecord *kvs.RecordedTxn, executed kvs.RecordedTxnOps, start, stop time.Time) {
defer trace.StartRegion(txn.ctx, "recordTransaction").End()
defer trackTransactionMethod("recordTransaction")()

txnRecord.PreRecord = false
txnRecord.Start = start
Expand Down

0 comments on commit 2fb311e

Please sign in to comment.