Skip to content

Commit

Permalink
[dbnode] Ignore Dupe ID Errors Earlier and Add Metrics/Error Logs to Shard Insert Queue (#2322)
Browse files (browse the repository at this point in the history)

Co-authored-by: Rob Skillington <rob@chronosphere.io>
  • Loading branch information
notbdu and robskillington committed May 8, 2020
1 parent a6fb73c commit fb9c9e0
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 34 deletions.
192 changes: 189 additions & 3 deletions integrations/grafana/m3db_dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -5234,18 +5234,100 @@
"hide": false,
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Index Queue",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 8,
"x": 8,
"y": 105
},
"hiddenSeries": false,
"id": 116,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rate(dbindex_index_error{instance=~\"$instance\"}[$step])",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Index Queue",
"title": "Index Errors",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -5289,12 +5371,14 @@
"dashes": false,
"datasource": "$datasource",
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 8,
"x": 16,
"y": 46
"y": 105
},
"hiddenSeries": false,
"id": 79,
"legend": {
"avg": false,
Expand All @@ -5309,6 +5393,9 @@
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 5,
"points": false,
Expand All @@ -5323,10 +5410,15 @@
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "histogram_quantile(0.99, sum(rate(dbindex_insert_end_to_end_latency_bucket{instance=~\"$instance\"}[$step])) by (le, instance)) ",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Indexing End-to-End Latency - P99",
"tooltip": {
Expand Down Expand Up @@ -5364,6 +5456,100 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 8,
"x": 0,
"y": 112
},
"hiddenSeries": false,
"id": 114,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rate(dbshard_insert_async_errors{instance=~\"$instance\"}[$step])",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rate(dbshard_insert_queue_inserts_batch_errors{instance=~\"$instance\"}[$step])",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Shard Insert Errors",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
8 changes: 4 additions & 4 deletions src/dbnode/storage/index/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestBlockWriteActualSegmentPartialFailure(t *testing.T) {
batch.Append(WriteBatchEntry{
Timestamp: nowNotBlockStartAligned,
OnIndexSeries: h2,
}, testDoc1DupeID())
}, doc.Document{})
res, err := b.WriteBatch(batch)
require.Error(t, err)
require.Equal(t, int64(1), res.NumSuccess)
Expand All @@ -276,15 +276,15 @@ func TestBlockWriteActualSegmentPartialFailure(t *testing.T) {
batch.ForEach(func(
idx int,
entry WriteBatchEntry,
doc doc.Document,
_ doc.Document,
result WriteBatchEntryResult,
) {
verified++
if idx == 0 {
require.NoError(t, result.Err)
} else {
require.Error(t, result.Err)
require.Equal(t, index.ErrDuplicateID, result.Err)
require.Equal(t, doc.ErrEmptyDocument, result.Err)
}
})
require.Equal(t, 2, verified)
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestBlockWritePartialFailure(t *testing.T) {
batch.Append(WriteBatchEntry{
Timestamp: nowNotBlockStartAligned,
OnIndexSeries: h2,
}, testDoc1DupeID())
}, doc.Document{})

res, err := b.WriteBatch(batch)
require.Error(t, err)
Expand Down
53 changes: 37 additions & 16 deletions src/dbnode/storage/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,30 +197,43 @@ type dbShardRuntimeOptions struct {
}

type dbShardMetrics struct {
create tally.Counter
close tally.Counter
closeStart tally.Counter
closeLatency tally.Timer
insertAsyncInsertErrors tally.Counter
insertAsyncWriteErrors tally.Counter
seriesTicked tally.Gauge
create tally.Counter
close tally.Counter
closeStart tally.Counter
closeLatency tally.Timer
seriesTicked tally.Gauge
insertAsyncInsertErrors tally.Counter
insertAsyncWriteInternalErrors tally.Counter
insertAsyncWriteInvalidParamsErrors tally.Counter
insertAsyncIndexErrors tally.Counter
}

func newDatabaseShardMetrics(shardID uint32, scope tally.Scope) dbShardMetrics {
const insertErrorName = "insert-async.errors"
return dbShardMetrics{
create: scope.Counter("create"),
close: scope.Counter("close"),
closeStart: scope.Counter("close-start"),
closeLatency: scope.Timer("close-latency"),
insertAsyncInsertErrors: scope.Tagged(map[string]string{
"error_type": "insert-series",
}).Counter("insert-async.errors"),
insertAsyncWriteErrors: scope.Tagged(map[string]string{
"error_type": "write-value",
}).Counter("insert-async.errors"),
seriesTicked: scope.Tagged(map[string]string{
"shard": fmt.Sprintf("%d", shardID),
}).Gauge("series-ticked"),
insertAsyncInsertErrors: scope.Tagged(map[string]string{
"error_type": "insert-series",
"suberror_type": "shard-entry-insert-error",
}).Counter(insertErrorName),
insertAsyncWriteInternalErrors: scope.Tagged(map[string]string{
"error_type": "write-value",
"suberror_type": "internal-error",
}).Counter(insertErrorName),
insertAsyncWriteInvalidParamsErrors: scope.Tagged(map[string]string{
"error_type": "write-value",
"suberror_type": "invalid-params-error",
}).Counter(insertErrorName),
insertAsyncIndexErrors: scope.Tagged(map[string]string{
"error_type": "reverse-index",
"suberror_type": "write-batch-error",
}).Counter(insertErrorName),
}
}

Expand Down Expand Up @@ -284,7 +297,7 @@ func newDatabaseShard(
metrics: newDatabaseShardMetrics(shard, scope),
}
s.insertQueue = newDatabaseShardInsertQueue(s.insertSeriesBatch,
s.nowFn, scope)
s.nowFn, scope, opts.InstrumentOptions().Logger())

registerRuntimeOptionsListener := func(listener runtime.OptionsListener) {
elem := opts.RuntimeOptionsManager().RegisterListener(listener)
Expand Down Expand Up @@ -1467,7 +1480,12 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error {
_, err := entry.Series.Write(ctx, write.timestamp, write.value,
write.unit, annotationBytes, write.opts)
if err != nil {
s.metrics.insertAsyncWriteErrors.Inc(1)
if xerrors.IsInvalidParams(err) {
s.metrics.insertAsyncWriteInvalidParamsErrors.Inc(1)
} else {
s.metrics.insertAsyncWriteInternalErrors.Inc(1)
s.logger.Error("error with async insert write", zap.Error(err))
}
}

if write.annotation != nil {
Expand Down Expand Up @@ -1516,8 +1534,11 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error {

var err error
// index all requested entries in batch.
if indexBatch.Len() > 0 {
if n := indexBatch.Len(); n > 0 {
err = s.reverseIndex.WriteBatch(indexBatch)
if err != nil {
s.metrics.insertAsyncIndexErrors.Inc(int64(n))
}
}

// Avoid goroutine spinning up to close this context
Expand Down
Loading

0 comments on commit fb9c9e0

Please sign in to comment.