First pass documenting the write path
JLockerman committed Oct 12, 2020
1 parent 6d31d17 commit 4645d92
Showing 4 changed files with 121 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pkg/pgmodel/Readme.md
@@ -76,3 +76,5 @@ Diagrammatically the write path can be visualized as follows:
| insert handler | ... | insert handler | ====> | DB (metric data) |
+----------------+ +----------------+ +------------------+
```

`TODO` metric table creation
19 changes: 18 additions & 1 deletion pkg/pgmodel/ingestor.go
@@ -49,6 +49,10 @@ type DBIngestor struct {
}

// Ingest transforms and ingests the timeseries data into Timescale database.
// input:
// tts the []TimeSeries to insert
// req the WriteRequest backing tts. It will be added to our WriteRequest
// pool when it is no longer needed.
func (i *DBIngestor) Ingest(tts []prompb.TimeSeries, req *prompb.WriteRequest) (uint64, error) {
data, totalRows, err := i.parseData(tts, req)

@@ -63,10 +67,15 @@ func (i *DBIngestor) Ingest(tts []prompb.TimeSeries, req *prompb.WriteRequest) (
return rowsInserted, err
}
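
For context, a hedged sketch of what a caller of `Ingest` might look like, e.g. on the remote-write HTTP path. `NewWriteRequest` is a hypothetical pool-backed constructor, and the decode details are assumptions, not part of this diff; the point is that ownership of `req` transfers to `Ingest`:

```go
import (
	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
)

func handleWrite(ingestor *DBIngestor, compressed []byte) error {
	buf, err := snappy.Decode(nil, compressed)
	if err != nil {
		return err
	}
	req := NewWriteRequest() // hypothetical: presumably pulled from the same pool
	if err := proto.Unmarshal(buf, req); err != nil {
		return err
	}
	// Ownership of req transfers here: Ingest eventually hands it to
	// FinishWriteRequest, so we must not touch req after this call.
	_, err = ingestor.Ingest(req.GetTimeseries(), req)
	return err
}
```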

// CompleteMetricCreation performs the parts of metric creation not needed to insert data
func (i *DBIngestor) CompleteMetricCreation() error {
return i.db.CompleteMetricCreation()
}

// Parse data into a set of samplesInfo per metric.
// returns: map[metric name][]SamplesInfo, total rows to insert
// NOTE: req will be added to our WriteRequest pool in this function, it must
// not be used afterwards.
func (i *DBIngestor) parseData(tts []prompb.TimeSeries, req *prompb.WriteRequest) (map[string][]samplesInfo, int, error) {
dataSamples := make(map[string][]samplesInfo)
rows := 0
@@ -77,6 +86,8 @@ func (i *DBIngestor) parseData(tts []prompb.TimeSeries, req *prompb.WriteRequest
continue
}

// Normalize and canonicalize t.Labels.
// After this point t.Labels should never be used again.
seriesLabels, metricName, err := labelProtosToLabels(t.Labels)
if err != nil {
return nil, rows, err
@@ -86,7 +97,7 @@ func (i *DBIngestor) parseData(tts []prompb.TimeSeries, req *prompb.WriteRequest
}
sample := samplesInfo{
seriesLabels,
-1, //sentinel marking the seriesId as unset
-1, // sentinel marking the seriesId as unset
t.Samples,
}
rows += len(t.Samples)
@@ -97,6 +108,12 @@ func (i *DBIngestor) parseData(tts []prompb.TimeSeries, req *prompb.WriteRequest
t.Samples = nil
}

// WriteRequests can contain pointers into the original buffer we deserialized
// them out of, and can be quite large in and of themselves. In order to prevent
// memory blowup, and to allow faster deserializing, we recycle the WriteRequest
// here, allowing it to be either garbage collected or reused for a new request.
// In order for this to work correctly, any data we wish to keep using (e.g.
// samples) must no longer be reachable from req.
FinishWriteRequest(req)

return dataSamples, rows, nil
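
The recycling described above is the classic `sync.Pool` pattern. A minimal sketch of what `FinishWriteRequest` might look like — the pool variable and the exact reset steps are assumptions, only the function's name and contract appear in this diff:

```go
var wrPool = sync.Pool{
	New: func() interface{} { return new(prompb.WriteRequest) },
}

// FinishWriteRequest resets req and returns it to the pool. Nothing
// reachable from req may be used after this call.
func FinishWriteRequest(req *prompb.WriteRequest) {
	for i := range req.Timeseries {
		// Drop references into the request so the pooled object does
		// not pin label/sample memory that other code still holds.
		req.Timeseries[i].Labels = nil
		req.Timeseries[i].Samples = nil
	}
	req.Timeseries = req.Timeseries[:0]
	wrPool.Put(req)
}
```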
21 changes: 19 additions & 2 deletions pkg/pgmodel/labels.go
@@ -27,6 +27,10 @@ type Labels struct {

var LabelsInterner = sync.Map{}

// Get the canonical version of a Labels if one exists.
// input: the string representation of a Labels as defined by getStr()
// This function should not be called directly, use labelProtosToLabels() or
// LabelsFromSlice() instead.
func GetLabels(str string) (l *Labels) {
val, ok := LabelsInterner.Load(str)
if !ok {
@@ -36,12 +40,17 @@ func GetLabels(str string) (l *Labels) {
return
}

// Try to set a Labels as the canonical Labels for a given string
// representation, returning the canonical version (which can be different in
// the event of multiple goroutines setting labels concurrently).
// This function should not be called directly, use labelProtosToLabels() or
// LabelsFromSlice() instead.
func SetLabels(str string, lset *Labels) *Labels {
val, _ := LabelsInterner.LoadOrStore(str, lset)
return val.(*Labels)
}
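
Together these two functions implement string interning over `LabelsInterner`. A sketch of the intended call sequence, mirroring what `labelProtosToLabels` presumably does with a freshly-built `Labels` (this assumes `GetLabels` returns nil on a cache miss, which the truncated body above suggests):

```go
func canonicalize(str string, fresh *Labels) *Labels {
	if l := GetLabels(str); l != nil {
		return l // fast path: an equal Labels was already interned
	}
	// Slow path: race to intern ours. If another goroutine got there
	// first, SetLabels hands back their copy and ours is discarded.
	return SetLabels(str, fresh)
}
```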

// LabelsFromSlice converts a labels.Labels to a Labels object
// LabelsFromSlice converts a labels.Labels to a canonical Labels object
func LabelsFromSlice(ls labels.Labels) (*Labels, error) {
ll := make([]prompb.Label, len(ls))
for i := range ls {
@@ -52,7 +61,9 @@ func LabelsFromSlice(ls labels.Labels) (*Labels, error) {
return l, err
}

// initLabels initializes labels
// Get a string representation for hashing and comparison
// This representation is guaranteed to uniquely represent the underlying label
// set, though it need not be human-readable, or indeed valid UTF-8
func getStr(labels []prompb.Label) (string, error) {
if len(labels) == 0 {
return "", nil
@@ -112,6 +123,7 @@ func getStr(labels []prompb.Label) (string, error) {
return builder.String(), nil
}
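
A common way to obtain such a unique, hash-friendly key is to sort the labels and length-prefix every name and value. The sketch below is under that assumption — the real encoding in `getStr` is largely elided by this diff (and, per the comment above, is not even valid UTF-8):

```go
func labelsKey(ls []prompb.Label) string {
	sort.Slice(ls, func(i, j int) bool { return ls[i].Name < ls[j].Name })
	var b strings.Builder
	for _, l := range ls {
		// Length prefixes make the key unambiguous: without them,
		// {a="bc"} and {ab="c"} would both encode as "abc".
		fmt.Fprintf(&b, "%d:%s%d:%s", len(l.Name), l.Name, len(l.Value), l.Value)
	}
	return b.String()
}
```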

// labelProtosToLabels converts a slice of prompb.Label to a canonical Labels object
func labelProtosToLabels(labelPairs []prompb.Label) (*Labels, string, error) {
str, err := getStr(labelPairs)
if err != nil {
@@ -136,6 +148,9 @@ func labelProtosToLabels(labelPairs []prompb.Label) (*Labels, string, error) {
return labels, labels.metricName, err
}

// Get a string representation for hashing and comparison
// This representation is guaranteed to uniquely represent the underlying label
// set, though it need not be human-readable, or indeed valid UTF-8
func (l *Labels) String() string {
return l.str
}
@@ -151,6 +166,7 @@ func (l *Labels) Equal(b *Labels) bool {
}

// Labels implements sort.Interface
var _ sort.Interface = (*Labels)(nil)

func (l *Labels) Len() int {
return len(l.names)
@@ -166,6 +182,7 @@ func (l *Labels) Swap(i, j int) {
}
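
Since `*Labels` satisfies `sort.Interface` (as the assertion above documents), call sites can order a label set with the standard library directly, e.g.:

```go
sort.Sort(l) // reorders the underlying name/value slices in place, presumably by name
```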

// FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.
// TODO this does not belong here, it should be moved, probably to sql_reader.go
func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {
85 changes: 82 additions & 3 deletions pkg/pgmodel/sql_ingest.go
@@ -1,6 +1,9 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

// See the section on the Write path in the Readme for a high level overview of
// this file.
package pgmodel

import (
@@ -63,6 +66,11 @@ func newPgxInserter(conn pgxConn, cache MetricCache, cfg *Cfg) (*pgxInserter, er
log.Warn("msg", "num copiers less than 1, setting to 1")
numCopiers = 1
}

// We run inserters bus-style: all of them competing to grab requests off a
// single channel. This should offer a decent compromise between batching
// and balancing: if an inserter is awake and has little work, it'll be more
// likely to win the race, while one that's busy or asleep won't.
toCopiers := make(chan copyRequest, numCopiers)
for i := 0; i < numCopiers; i++ {
go runInserter(conn, toCopiers)
@@ -161,6 +169,9 @@ type insertDataTask struct {
errChan chan error
}

// Report that this task is completed, along with any error that may have
// occurred. Since this is a back-edge on the goroutine graph, it
// _must never block_: blocking here will cause deadlocks.
func (idt *insertDataTask) reportResult(err error) {
if err != nil {
select {
@@ -171,15 +182,26 @@ func (idt *insertDataTask) reportResult(err error) {
idt.finished.Done()
}
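
The "must never block" requirement is met with Go's select-plus-default; a sketch of the pattern the elided `select` above follows:

```go
// tryReportError performs a non-blocking send. errChan is buffered
// with capacity 1, so the first error wins and later ones are
// dropped rather than blocking this goroutine.
func tryReportError(errChan chan error, err error) {
	select {
	case errChan <- err:
	default:
	}
}
```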

// Insert a batch of data into the DB.
// The data should be grouped by metric name.
// returns the number of rows we intended to insert (_not_ how many were
// actually inserted) and any error.
// Though we may insert data to multiple tables concurrently, if asyncAcks is
// unset this function will wait until _all_ the insert attempts have completed.
func (p *pgxInserter) InsertData(rows map[string][]samplesInfo) (uint64, error) {
var numRows uint64
workFinished := &sync.WaitGroup{}
workFinished.Add(len(rows))
// we only allocate enough space for a single error message here as we only
// report one error back upstream. The inserter must not block on this
// channel: it should only send when the channel is empty, as anything else can deadlock.
errChan := make(chan error, 1)
for metricName, data := range rows {
for _, si := range data {
numRows += uint64(len(si.samples))
}
// insertMetricData() is expected to be non-blocking,
// just a channel insert
p.insertMetricData(metricName, data, workFinished, errChan)
}
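
When asyncAcks is unset, the elided tail of InsertData presumably waits on the WaitGroup and then drains the one-slot error channel; a sketch under that assumption:

```go
// waitForBatch blocks until every per-metric insert has reported,
// then returns the single buffered error, if any.
func waitForBatch(workFinished *sync.WaitGroup, errChan chan error) error {
	workFinished.Wait() // every insertMetricData dispatch has finished
	select {
	case err := <-errChan: // at most one error was ever buffered
		return err
	default:
		return nil
	}
}
```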

@@ -270,9 +292,15 @@ func (p *pgxInserter) getMetricTableName(metric string) (string, error) {
return tableName, err
}

// Get the handler for a given metric name, creating a new one if none exists
func (p *pgxInserter) getMetricInserter(metric string, errChan chan error) chan insertDataRequest {
inserter, ok := p.inserters.Load(metric)
if !ok {
// The ordering is important here: we need to ensure that every call
// to getMetricInserter() returns the same inserter. Therefore, we can
// only start up the inserter routine if we know that we won the race
// to create the inserter; anything else would leave a zombie inserter
// lying around.
c := make(chan insertDataRequest, 1000)
actual, old := p.inserters.LoadOrStore(metric, c)
inserter = actual
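
The elided remainder presumably checks the LoadOrStore result and starts the routine only on a win. A condensed, self-contained sketch of the whole pattern (the `start` callback stands in for launching runInserterRoutine):

```go
// getOrCreateInserter returns the single channel feeding the inserter
// routine for metric, creating both on first use. inserters is a
// *sync.Map keyed by metric name.
func getOrCreateInserter(inserters *sync.Map, metric string,
	start func(chan insertDataRequest)) chan insertDataRequest {
	if v, ok := inserters.Load(metric); ok {
		return v.(chan insertDataRequest)
	}
	c := make(chan insertDataRequest, 1000)
	actual, loaded := inserters.LoadOrStore(metric, c)
	if !loaded {
		// We won the race: only the winner starts the routine, so a
		// loser's channel is simply garbage collected rather than
		// served by a zombie inserter.
		start(c)
	}
	return actual.(chan insertDataRequest)
}
```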
@@ -298,6 +326,10 @@ type pendingBuffer struct {
}

const (
// maximum number of insertDataRequests that should be buffered before the
// insertHandler flushes to the next layer. We don't want too many as this
// increases the number of lost writes if the connector dies. This number
// was chosen arbitrarily.
flushSize = 2000
)

@@ -350,6 +382,14 @@ func runInserterRoutine(conn pgxConn, input chan insertDataRequest, metricName s

handler.handleReq(firstReq)

// Grab new requests from our channel and handle them. We do this hot-load
// style: we keep grabbing requests off the channel while we can do so
// without blocking, and flush them to the next layer when we run out, or
// reach a predetermined threshold. The theory is that wake/sleep and
// flushing is relatively expensive, and can be easily amortized over
// multiple requests, so it pays to batch as much as we are able. However,
// writes to a given metric can be relatively rare, so if we don't have
// additional requests immediately we're likely not going to for a while.
for {
if !handler.hasPendingReqs() {
stillAlive := handler.blockingHandleReq()
@@ -370,6 +410,9 @@ func runInserterRoutine(conn pgxConn, input chan insertDataRequest, metricName s
}
}
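
Condensed, the loop alternates one blocking receive with as many non-blocking receives as will succeed, then flushes. A sketch of that control flow, with `handle` and `flush` standing in for the insertHandler methods used above:

```go
func drainLoop(input chan insertDataRequest, handle func(insertDataRequest), flush func()) {
	for {
		req, ok := <-input // sleep until there is any work at all
		if !ok {
			return
		}
		handle(req)
	hot:
		for n := 1; n < flushSize; n++ {
			select {
			case req, ok := <-input:
				if !ok {
					break hot
				}
				handle(req)
			default:
				break hot // channel ran dry: stop batching
			}
		}
		flush() // one flush amortized over the whole batch
	}
}
```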

// Create the metric table for the metric we handle, if it does not already
// exist. This only does the most critical part of metric table creation, the
// rest is handled by completeMetricTableCreation().
func initializeInserterRoutine(conn pgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames MetricCache) (tableName string, err error) {
tableName, err = metricTableNames.Get(metricName)
if err == ErrEntryNotFound {
@@ -421,6 +464,9 @@ func (h *insertHandler) nonblockingHandleReq() bool {
}

func (h *insertHandler) handleReq(req insertDataRequest) bool {
// we fill in any SeriesIds we have in cache now so we can free any Labels
// that are no longer needed, and because the SeriesIds might get flushed.
// (neither of these is critical at the moment)
h.fillKnowSeriesIds(req.data)
needsFlush := h.pending.addReq(req)
if needsFlush {
Expand All @@ -430,8 +476,14 @@ func (h *insertHandler) handleReq(req insertDataRequest) bool {
return false
}

// Fill in any SeriesIds we already have in cache.
// This must be idempotent: if called a second time it should not affect any
// sampleInfo whose series was already set.
func (h *insertHandler) fillKnowSeriesIds(sampleInfos []samplesInfo) (numMissingSeries int) {
for i, series := range sampleInfos {
// When we first create the sampleInfos we should have set the seriesID
// to -1 for any series whose labels field is nil. Real seriesIds must
// always be greater than 0.
if series.seriesID > -1 {
continue
}
@@ -453,6 +505,7 @@ func (h *insertHandler) flush() {
h.flushPending()
}

// Set all unset SeriesIds and flush to the next layer
func (h *insertHandler) flushPending() {
_, err := h.setSeriesIds(h.pending.batch.sampleInfos)
if err != nil {
@@ -464,7 +517,12 @@ func (h *insertHandler) flushPending() {
h.pending = pendingBuffers.Get().(*pendingBuffer)
}

// Handles actual insertion into the DB.
// We have one of these per connection reserved for insertion.
func runInserter(conn pgxConn, in chan copyRequest) {
// We grab copyRequests off the channel one at a time. This, and the name,
// are a legacy from when we used CopyFrom to perform the insertions, and may
// change in the future.
for {
req, ok := <-in
if !ok {
@@ -492,9 +550,7 @@ func insertErrorFallback(conn pgxConn, req copyRequest, err error) error {
return doInsert(conn, req)
}

// we can currently recover from two errors:
// If we inserted duplicate data we switch to using INSERT ... ON CONFLICT DO NOTHING
// to skip redundant data points and try again.
// we can currently recover from one error:
// If we inserted into a compressed chunk, we decompress the chunk and try again.
// Since a single batch can have both errors, we need to remember the insert method
// we're using, so that we deduplicate if needed.
@@ -522,11 +578,24 @@ func tryRecovery(conn pgxConn, req copyRequest, err error) error {
return err
}
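
The shape of that recovery is roughly: classify the error, decompress, retry. A loose sketch follows — how the real code detects the "compressed chunk" error is not shown in this diff, so the string match below is purely illustrative, and `req.table` is an assumed field of copyRequest:

```go
func tryRecoverySketch(conn pgxConn, req copyRequest, err error) error {
	// Illustrative detection only: the real check may key off a
	// postgres error code or a different message.
	if !strings.Contains(err.Error(), "compressed chunk") {
		return err // not something we know how to recover from
	}
	if err := decompressChunks(conn, req.data, req.table); err != nil {
		return err
	}
	return doInsert(conn, req) // retry the same batch once decompressed
}
```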

// Perform the actual insertion into the DB.
func doInsert(conn pgxConn, req copyRequest) (err error) {
numRows := 0
for i := range req.data.batch.sampleInfos {
numRows += len(req.data.batch.sampleInfos[i].samples)
}
// flatten the various series into arrays.
// there are four main bottlenecks for insertion:
// 1. The round trip time.
// 2. The number of requests sent.
// 3. The number of individual INSERT statements.
// 4. The amount of data sent.
// While the first two of these can be handled by batching, for the latter
// two we need to actually reduce the work done. It turns out that simply
// collecting all the data into a postgres array and performing a single
// INSERT using that overcomes most of the performance issues of sending
// many individual rows, and brings INSERT nearly on par with CopyFrom. In the
// future we may wish to send compressed data instead.
times := make([]time.Time, 0, numRows)
vals := make([]float64, 0, numRows)
series := make([]int64, 0, numRows)
@@ -554,6 +623,9 @@ func doInsert(conn pgxConn, req copyRequest) (err error) {
return nil
}
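
In pgx terms, the array trick is typically an `INSERT ... SELECT` over unnest'ed array parameters. A sketch under that assumption — the table layout and exact SQL here are illustrative, not lifted from the real query (which this diff elides), and pgxConn is assumed to mirror pgx's `Exec`:

```go
import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v4"
)

func insertBatch(ctx context.Context, conn pgxConn, table string,
	times []time.Time, vals []float64, series []int64) error {
	// One statement, three array parameters; postgres expands the
	// arrays back into rows server-side, so the request stays small
	// compared to thousands of individual INSERTs.
	sql := fmt.Sprintf(
		`INSERT INTO %s (time, value, series_id)
		 SELECT * FROM unnest($1::timestamptz[], $2::float8[], $3::bigint[])`,
		pgx.Identifier{table}.Sanitize())
	_, err := conn.Exec(ctx, sql, times, vals, series)
	return err
}
```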

// In the event we are filling in old data and the chunk we want to INSERT into
// has already been compressed, we decompress the chunk and try again. When we do
// this we delay the recompression to give us time to insert additional data.
func decompressChunks(conn pgxConn, pending *pendingBuffer, table string) error {
minTime := model.Time(pending.batch.minSeen).Time()

@@ -585,6 +657,9 @@ func decompressChunks(conn pgxConn, pending *pendingBuffer, table string) error
return nil
}

// Report completion of an insert batch to all goroutines that may be waiting
// on it, along with any error that may have occurred.
// This function also resets the pending buffer in preparation for the next batch.
func (pending *pendingBuffer) reportResults(err error) {
for i := 0; i < len(pending.needsResponse); i++ {
pending.needsResponse[i].reportResult(err)
@@ -600,6 +675,10 @@ func (pending *pendingBuffer) reportResults(err error) {
pending.batch.ResetPosition()
}

// Set all seriesIds for a samplesInfo, fetching any missing ones from the DB,
// and repopulating the cache accordingly.
// returns: the tableName for the metric being inserted into
// TODO move up to the rest of insertHandler
func (h *insertHandler) setSeriesIds(sampleInfos []samplesInfo) (string, error) {
numMissingSeries := h.fillKnowSeriesIds(sampleInfos)

