Skip to content

Commit

Permalink
Merge pull request #169 from jostanislas/dev
Browse files Browse the repository at this point in the history
Monitoring enhancements
  • Loading branch information
NeetishPathak committed Dec 21, 2023
2 parents b06d739 + 8a2b6e6 commit 74081b4
Show file tree
Hide file tree
Showing 24 changed files with 766 additions and 629 deletions.
62 changes: 20 additions & 42 deletions cmd/proxy/proc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ func (r *ProxyInResponseContext) OnComplete() {
}
logging.LogToCal(opcode, r.GetOpStatus(), rht, calData)
}
if otel.IsEnabled() {
otel.RecordOperation(r.stats.Opcode.String(), r.stats.ResponseStatus.ShortNameString(), int64(rhtus))
}

otel.RecordOperation(r.stats.Opcode.String(), r.stats.ResponseStatus, int64(rhtus))

r.stats.OnComplete(uint32(rhtus), r.GetOpStatus())
proxystats.SendProcState(r.stats)

Expand Down Expand Up @@ -339,9 +339,7 @@ func (p *ProcessorBase) replyToClient(resp *ResponseWrapper) {
if cal.IsEnabled() {
calLogReqProcError(kDecrypt, []byte(errmsg))
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kDecrypt}, {otel.Status, otel.StatusError}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kDecrypt}, {otel.Status, otel.StatusError}})
msg := p.clientRequest.CreateResponse()
msg.SetOpStatus(proto.OpStatusInternal)
var raw proto.RawMessage
Expand Down Expand Up @@ -571,9 +569,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
szNs := len(r.GetNamespace())
Expand All @@ -584,9 +580,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szNs)
calLogReqProcEvent(kBadParamInvalidNsLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
return false
}
ttl := r.GetTimeToLive()
Expand All @@ -597,9 +591,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
} else {
Expand All @@ -610,9 +602,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
glog.Warningf("limit exceeded: key length %d > %d", szKey, limits.MaxKeyLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
if limits.MaxTimeToLive != 0 && ttl > limits.MaxTimeToLive {
Expand All @@ -621,9 +611,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
glog.Warningf("limit exceeded: TTL %d > %d", ttl, limits.MaxTimeToLive)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
szValue := r.GetPayloadValueLength()
Expand All @@ -633,9 +621,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddInt([]byte("len"), int(szValue))
calLogReqProcEvent(kBadParamInvalidValueLen, data.Bytes())
glog.Warningf("limit exceeded: payload length %d > %d", szValue, limits.MaxPayloadLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
return false
}
}
Expand Down Expand Up @@ -692,11 +678,11 @@ func (p *ProcessorBase) Process(request io.IRequestContext) bool {
}
}
}
if otel.IsEnabled() {
if p.clientRequest.IsForReplication() {
otel.RecordCount(otel.RAPI, nil)
}

if p.clientRequest.IsForReplication() {
otel.RecordCount(otel.RAPI, nil)
}

p.shardId = shardId.Uint16()

if err := proto.SetShardId(p.requestContext.GetMessage(), p.shardId); err != nil {
Expand Down Expand Up @@ -770,9 +756,7 @@ func (p *ProcessorBase) OnRequestTimeout() {
b.AddOpCode(p.clientRequest.GetOpCode()).AddReqIdString(p.requestID)
calLogReqProcEvent(kReqTimeout, b.Bytes())
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqTimeout}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqTimeout}})
p.replyStatusToClient(proto.OpStatusBusy)
}
now := time.Now()
Expand All @@ -794,9 +778,7 @@ func (p *ProcessorBase) OnCancelled() {
b.AddOpCode(p.clientRequest.GetOpCode()).AddReqIdString(p.requestID)
calLogReqProcEvent(kReqCancelled, b.Bytes())
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqCancelled}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqCancelled}})
p.replyStatusToClient(proto.OpStatusBusy)
}
now := time.Now()
Expand Down Expand Up @@ -841,10 +823,8 @@ func (p *ProcessorBase) handleSSTimeout(now time.Time) {
writeBasicSSRequestInfo(b, st.opCode, int(st.ssIndex), p.ssGroup.processors[st.ssIndex].GetConnInfo(), p)
calLogReqProcEvent(calNameReqTimeoutFor(st.opCode), b.Bytes())
}
if otel.IsEnabled() {
status := otel.SSReqTimeout + "_" + st.opCode.String()
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, status}})
}
status := otel.SSReqTimeout + "_" + st.opCode.String()
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, status}})
if cluster.GetShardMgr().StatsEnabled() {
zoneId, hostId := p.ssGroup.processors[st.ssIndex].GetNodeInfo()
cluster.GetShardMgr().SendStats(zoneId, hostId, true, confSSRequestTimeout.Microseconds())
Expand Down Expand Up @@ -918,10 +898,8 @@ func (p *ProcessorBase) preprocessAndValidateResponse(resp io.IResponseContext)
errStr := strings.Replace(statusText, " ", "_", -1)
calLogReqProcEvent(fmt.Sprintf("SS_%s", errStr), buf.Bytes()) //TODO revisit log as error?
}
if otel.IsEnabled() {
errStr := strings.Replace(statusText, " ", "_", -1)
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, fmt.Sprintf("SS_%s", errStr)}})
}
errStr := strings.Replace(statusText, " ", "_", -1)
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, fmt.Sprintf("SS_%s", errStr)}})
st.state = stSSResponseIOError
st.timeRespReceived = time.Now()
p.self.OnSSIOError(st)
Expand Down
4 changes: 1 addition & 3 deletions cmd/proxy/proc/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func (p *CreateProcessor) setInitSSRequest() bool {
if cal.IsEnabled() {
calLogReqProcError(kEncrypt, []byte(errmsg))
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
p.replyStatusToClient(proto.OpStatusInternal)
return false
}
Expand Down
24 changes: 6 additions & 18 deletions cmd/proxy/proc/inbreqctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
szNs := len(r.GetNamespace())
Expand All @@ -88,9 +86,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szNs)
calLogReqProcEvent(kBadParamInvalidNsLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
return false
}
ttl := r.GetTimeToLive()
Expand All @@ -101,9 +97,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
} else {
Expand All @@ -114,9 +108,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
glog.Warningf("limit exceeded: key length %d > %d", szKey, limits.MaxKeyLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
if limits.MaxTimeToLive != 0 && ttl > limits.MaxTimeToLive {
Expand All @@ -125,9 +117,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
glog.Warningf("limit exceeded: TTL %d > %d", ttl, limits.MaxTimeToLive)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
szValue := r.GetPayloadValueLength()
Expand All @@ -137,9 +127,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddInt([]byte("len"), int(szValue))
calLogReqProcEvent(kBadParamInvalidValueLen, data.Bytes())
glog.Warningf("limit exceeded: payload length %d > %d", ttl, limits.MaxTimeToLive)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
return false
}
}
Expand Down
8 changes: 2 additions & 6 deletions cmd/proxy/proc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ func (p *TwoPhaseProcessor) setInitSSRequest() bool {
if cal.IsEnabled() {
calLogReqProcError(kEncrypt, []byte(errmsg))
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
p.replyStatusToClient(proto.OpStatusInternal)
return false
}
Expand Down Expand Up @@ -351,9 +349,7 @@ func (p *TwoPhaseProcessor) onRepairFailure(rc *SSRequestContext) {
writeBasicSSRequestInfo(buf, rc.opCode, int(rc.ssIndex), p.ssGroup.processors[rc.ssIndex].GetConnInfo(), &p.ProcessorBase)
calLogReqProcEvent(kInconsistent, buf.Bytes())
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kInconsistent}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kInconsistent}})
p.replyStatusToClient(proto.OpStatusInconsistent)
}
}
Expand Down
17 changes: 5 additions & 12 deletions cmd/proxy/replication/replicaterequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ func (r *RepRequestContext) complete(calstatus string, opStatus string, rht time
cal.AtomicTransaction(targetType, opCode, calstatus, rht, r.calBuf.Bytes())
}
}
if otel.IsEnabled() {
otel.RecordReplication(opCode, opStatus, target, rht.Milliseconds())
}

otel.RecordReplication(opCode, opStatus, target, rht.Microseconds())

r.this.OnComplete()
}
Expand Down Expand Up @@ -221,9 +220,7 @@ func (r *RepRequestContext) Reply(resp io.IResponseContext) {
}
r.calBuf.AddDropReason("MaxRetry")
}
if otel.IsEnabled() {
otel.RecordCount(otel.RRDropMaxRetry, []otel.Tags{{"target", r.targetId}})
}
otel.RecordCount(otel.RRDropMaxRetry, []otel.Tags{{"target", r.targetId}})
r.errCnt.Add(1)
r.complete(cal.StatusError, opstatus.String(), rht, opCodeText, r.targetId)
return
Expand All @@ -245,9 +242,7 @@ func (r *RepRequestContext) Reply(resp io.IResponseContext) {
}
r.calBuf.AddDropReason("QueueFull")
}
if otel.IsEnabled() {
otel.RecordCount(otel.RRDropQueueFull, []otel.Tags{{otel.Target, r.targetId}})
}
otel.RecordCount(otel.RRDropQueueFull, []otel.Tags{{otel.Target, r.targetId}})
r.dropCnt.Add(1)
r.complete(cal.StatusError, opstatus.String(), rht, opCodeText, r.targetId)
}
Expand All @@ -261,9 +256,7 @@ func (r *RepRequestContext) Reply(resp io.IResponseContext) {
}
r.calBuf.AddDropReason("RecExpired")
}
if otel.IsEnabled() {
otel.RecordCount(otel.RRDropRecExpired, []otel.Tags{{otel.Target, r.targetId}})
}
otel.RecordCount(otel.RRDropRecExpired, []otel.Tags{{otel.Target, r.targetId}})
r.complete(cal.StatusSuccess, opstatus.String(), rht, opCodeText, r.targetId)
}
}
Expand Down
10 changes: 2 additions & 8 deletions cmd/storageserv/storage/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,13 @@ func (p *reqProcCtxT) OnComplete() {
calData.AddOpRequestResponse(&p.request, &p.response).AddRequestHandleTime(rhtus)
cal.AtomicTransaction(cal.TxnTypeAPI, opcode.String(), calst.CalStatus(), rht, calData.Bytes())
}
if otel.IsEnabled() {
otel.RecordOperation(opcode.String(), p.response.GetOpStatus().String(), int64(rhtus))
opst := p.response.GetOpStatus()
calst := logging.CalStatus(opst)
if (opst == proto.OpStatusInconsistent) || calst.NotSuccess() {
otel.RecordCount(otel.ProcErr, []otel.Tags{{otel.Operation, opcode.String() + "_" + opst.String()}, {otel.Status, otel.StatusError}})
}
}
if (opst == proto.OpStatusInconsistent) || calst.NotSuccess() {
cal.Event("ProcErr", opcode.String()+"_"+opst.String(), cal.StatusSuccess, nil)
}
}

otel.RecordOperation(opcode.String(), p.response.GetOpStatus(), int64(rhtus))

if p.cacheable {
if p.prepareCtx != nil {
if p.prepareCtx.cacheable {
Expand Down
29 changes: 29 additions & 0 deletions docker/manifest/config/otel/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# connfig ref : https://opentelemetry.io/docs/collector/configuration/
receivers:
otlp:
protocols:
http:
endpoint: "0.0.0.0:4318"


exporters:
# Data sources: metrics
prometheus:
endpoint: "0.0.0.0:8889"
namespace: default
send_timestamps: true
metric_expiration: 180m
# resource_to_telemetry_conversion:
# enabled: true

extensions:
health_check:
pprof:
zpages:

service:
extensions: [health_check, pprof, zpages]
pipelines:
metrics:
receivers: [otlp]
exporters: [prometheus]
6 changes: 6 additions & 0 deletions docker/manifest/config/prometheus/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
scrape_configs:
- job_name: 'otel'
scrape_interval: 10s
static_configs:
- targets: ['otel:8888']
- targets: ['otel:8889']

0 comments on commit 74081b4

Please sign in to comment.