Merge #589
589: Add support to reject samples if they fall on compressed chunks. r=Harkishen-Singh a=Harkishen-Singh

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

Co-authored-by: Harkishen-Singh <harkishensingh@hotmail.com>
bors[bot] and Harkishen-Singh committed May 20, 2021
2 parents 86b4dff + ca5c597 commit d56a7a6
Showing 18 changed files with 249 additions and 90 deletions.
3 changes: 2 additions & 1 deletion docs/cli.md
@@ -75,8 +75,9 @@ You can also find information on flags with `promscale_<version> -help`.
| db-ssl-mode | string | require | TimescaleDB/Vanilla Postgres connection ssl mode. If you do not want to use ssl, pass `allow` as value. |
| db-writer-connection-concurrency | integer | 4 | Maximum number of database connections for writing per go process. |
| db-uri | string | | TimescaleDB/Vanilla PostgreSQL URI. Example: `postgres://postgres:password@localhost:5432/timescale?sslmode=require` |
| async-acks | boolean | false | Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss. |
| db-statements-cache | boolean | true | Whether database connection pool should use cached prepared statements. Disable if using PgBouncer. |
| ignore-samples-written-to-compressed-chunks | boolean | false | Ignore/drop samples that are being written to compressed chunks. Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. Setting this to true avoids the resource cost of that decompression, at the price of rejecting those samples. |
| async-acks | boolean | false | Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss. |

## PromQL engine evaluation flags

64 changes: 47 additions & 17 deletions pkg/log/log.go
@@ -26,8 +26,8 @@ var (
"2006-01-02T15:04:05.000Z07:00",
)

logStoreMux sync.Mutex
logStore = make(map[string][]interface{})
logMux sync.RWMutex
logStore = make(map[key][]interface{})
)

// Config represents a logger configuration used upon initialization.
@@ -113,7 +113,12 @@ func parseLogLevel(logLevel string) (level.Option, error) {
}
}

const logOnceTimedDuration = time.Minute
const (
logOnceTimedDuration = time.Minute

debug = iota
warn
)

// timedLogger logs from logStore every logOnceTimedDuration. It deletes the log entry from the store
// after it has logged once.
@@ -124,24 +129,49 @@ func timedLogger() {
newLogMsg = append(newLogMsg, logMsg...)
return
}
logStoreMux.Lock()
defer logStoreMux.Unlock()
for _, logMsg := range logStore {
Debug(applyKind(logMsg)...)
logMux.Lock()
defer logMux.Unlock()
for k, line := range logStore {
switch k.typ {
case debug:
Debug(applyKind(line)...)
case warn:
Warn(applyKind(line)...)
default:
panic("invalid type")
}
}
logStore = make(map[string][]interface{})
logStore = make(map[key][]interface{}) // Clear stale logs.
}

// DebugRateLimited logs Debug level logs once in every logOnceTimedDuration.
func DebugRateLimited(keyvals ...interface{}) {
logStoreMux.Lock()
defer logStoreMux.Unlock()
val := fmt.Sprintf("%v", keyvals)
type key struct {
typ uint8
k string
}

func rateLimit(typ uint8, keyvals ...interface{}) {
kv := fmt.Sprintf("%v", keyvals)
k := key{typ, kv}
logMux.RLock()
_, contains := logStore[k]
logMux.RUnlock()
if contains {
return
}
logMux.Lock()
defer logMux.Unlock()
if len(logStore) == 0 {
go timedLogger()
}
if _, toBeLogged := logStore[val]; toBeLogged {
return
}
logStore[val] = keyvals
logStore[k] = keyvals
}

// WarnRateLimited warns once in every logOnceTimedDuration.
func WarnRateLimited(keyvals ...interface{}) {
rateLimit(warn, keyvals...)
}

// DebugRateLimited logs Debug level logs once in every logOnceTimedDuration.
func DebugRateLimited(keyvals ...interface{}) {
rateLimit(debug, keyvals...)
}
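
For readers skimming the diff, the logging change boils down to: queue a message at most once per `logOnceTimedDuration`, keyed by its level plus the formatted key/values, and flush everything queued in that window in one go. Below is a minimal, self-contained sketch of that pattern, not the Promscale code itself; the names `rateLimited`, `flush`, `entry`, and `flushInterval` are illustrative, and the flush is scheduled with `time.AfterFunc` instead of the long-running `timedLogger` goroutine the diff uses.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// flushInterval plays the role of logOnceTimedDuration; shortened so the
// sketch finishes quickly when run.
const flushInterval = 2 * time.Second

type entry struct {
	level   string
	keyvals []interface{}
}

var (
	mu    sync.Mutex
	store = map[string]entry{}
)

// rateLimited queues a message at most once per flush window. The key combines
// the level with the formatted keyvals, so identical messages collapse.
func rateLimited(level string, keyvals ...interface{}) {
	k := level + fmt.Sprintf("%v", keyvals)
	mu.Lock()
	defer mu.Unlock()
	if _, queued := store[k]; queued {
		return // already queued for this window
	}
	if len(store) == 0 {
		// First entry of the window: schedule exactly one flush.
		time.AfterFunc(flushInterval, flush)
	}
	store[k] = entry{level: level, keyvals: keyvals}
}

// flush emits every queued message once and clears the store, so the next
// occurrence of a message starts a new window.
func flush() {
	mu.Lock()
	defer mu.Unlock()
	for _, e := range store {
		fmt.Println(e.level, e.keyvals)
	}
	store = map[string]entry{}
}

func main() {
	for i := 0; i < 5; i++ {
		rateLimited("warn", "msg", "rejecting samples on compressed chunks")
	}
	time.Sleep(3 * time.Second) // prints a single line despite five calls
}
```
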
7 changes: 4 additions & 3 deletions pkg/pgclient/client.go
@@ -129,9 +129,10 @@ func NewClientWithPool(cfg *Config, numCopiers int, dbConn pgxconn.PgxConn, mt t
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
c := ingestor.Cfg{
AsyncAcks: cfg.AsyncAcks,
ReportInterval: cfg.ReportInterval,
NumCopiers: numCopiers,
NumCopiers: numCopiers,
AsyncAcks: cfg.AsyncAcks,
ReportInterval: cfg.ReportInterval,
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
}

var dbIngestor *ingestor.DBIngestor
19 changes: 14 additions & 5 deletions pkg/pgclient/config.go
@@ -31,6 +31,7 @@ type Config struct {
SslMode string
DbConnectRetries int
DbConnectionTimeout time.Duration
IgnoreCompressedChunks bool
AsyncAcks bool
ReportInterval int
WriteConnectionsPerProc int
@@ -61,7 +62,8 @@ var (
func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
cache.ParseFlags(fs, &cfg.CacheConfig)

fs.StringVar(&cfg.AppName, "app", DefaultApp, "'app' sets application_name in database connection string. This is helpful during debugging when looking at pg_stat_activity.")
fs.StringVar(&cfg.AppName, "app", DefaultApp, "'app' sets application_name in database connection string. "+
"This is helpful during debugging when looking at pg_stat_activity.")
fs.StringVar(&cfg.Host, "db-host", defaultDBHost, "Host for TimescaleDB/Vanilla Postgres.")
fs.IntVar(&cfg.Port, "db-port", defaultDBPort, "TimescaleDB/Vanilla Postgres connection port.")
fs.StringVar(&cfg.User, "db-user", defaultDBUser, "TimescaleDB/Vanilla Postgres user.")
@@ -70,12 +72,19 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.StringVar(&cfg.SslMode, "db-ssl-mode", defaultSSLMode, "TimescaleDB/Vanilla Postgres connection ssl mode. If you do not want to use ssl, pass 'allow' as value.")
fs.IntVar(&cfg.DbConnectRetries, "db-connect-retries", 0, "Number of retries Promscale should make for establishing connection with the database.")
fs.DurationVar(&cfg.DbConnectionTimeout, "db-connection-timeout", defaultConnectionTime, "Timeout for establishing the connection between Promscale and TimescaleDB.")
fs.BoolVar(&cfg.AsyncAcks, "async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.BoolVar(&cfg.IgnoreCompressedChunks, "ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
"Setting this to true avoids the resource cost of that decompression.")
fs.BoolVar(&cfg.AsyncAcks, "async-acks", false, "Acknowledge asynchronous inserts. "+
"If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.IntVar(&cfg.ReportInterval, "tput-report", 0, "Interval in seconds at which throughput should be reported.")
fs.IntVar(&cfg.WriteConnectionsPerProc, "db-writer-connection-concurrency", 4, "Maximum number of database connections for writing per go process.")
fs.IntVar(&cfg.MaxConnections, "db-connections-max", -1, "Maximum number of connections to the database that should be opened at once. It defaults to 80% of the maximum connections that the database can handle.")
fs.StringVar(&cfg.DbUri, "db-uri", defaultDBUri, "TimescaleDB/Vanilla Postgres DB URI. Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
fs.BoolVar(&cfg.EnableStatementsCache, "db-statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. Disable if using PgBouncer")
fs.IntVar(&cfg.MaxConnections, "db-connections-max", -1, "Maximum number of connections to the database that should be opened at once. "+
"It defaults to 80% of the maximum connections that the database can handle.")
fs.StringVar(&cfg.DbUri, "db-uri", defaultDBUri, "TimescaleDB/Vanilla Postgres DB URI. "+
"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
fs.BoolVar(&cfg.EnableStatementsCache, "db-statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
"Disable if using PgBouncer")
return cfg
}

48 changes: 22 additions & 26 deletions pkg/pgmodel/ingestor/copier.go
@@ -29,7 +29,10 @@ type copyRequest struct {
table string
}

var getBatchMutex = &sync.Mutex{}
var (
getBatchMutex = &sync.Mutex{}
handleDecompression = retryAfterDecompression
)

// Handles actual insertion into the DB.
// We have one of these per connection reserved for insertion.
@@ -127,25 +130,14 @@ func insertBatchErrorFallback(conn pgxconn.PgxConn, reqs ...copyRequest) {
reqs[i].data.batch.ResetPosition()
err := doInsert(conn, reqs[i])
if err != nil {
err = insertErrorFallback(conn, err, reqs[i])
err = tryRecovery(conn, err, reqs[i])
}

reqs[i].data.reportResults(err)
reqs[i].data.release()
}
}

// certain errors are recoverable, handle those we can
// 1. if the table is compressed, decompress and retry the insertion
func insertErrorFallback(conn pgxconn.PgxConn, err error, req copyRequest) error {
err = tryRecovery(conn, err, req)
if err != nil {
log.Warn("msg", fmt.Sprintf("time out while processing error for %s", req.table), "error", err.Error())
return err
}
return doInsert(conn, req)
}

// we can currently recover from one error:
// If we inserted into a compressed chunk, we decompress the chunk and try again.
// Since a single batch can have both errors, we need to remember the insert method
@@ -159,27 +151,29 @@ func tryRecovery(conn pgxconn.PgxConn, err error, req copyRequest) error {
return err
}

// If the error was that the table is already compressed, decompress and try again.
if strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
decompressErr := decompressChunks(conn, req.data, req.table)
if decompressErr != nil {
return err
}

req.data.batch.ResetPosition()
return nil
// If the error was that the table is already compressed, decompress and try again.
return handleDecompression(conn, req)
}

log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.table), "err", pgErr.Error())
return err
return pgErr
}

func skipDecompression(_ pgxconn.PgxConn, _ copyRequest) error {
log.WarnRateLimited("msg", "Rejecting samples falling on compressed chunks as decompression is disabled")
return nil
}

// In the event we are filling in old data and the chunk we want to INSERT into has
// already been compressed, we decompress the chunk and try again. When we do
// this we delay the recompression to give us time to insert additional data.
func decompressChunks(conn pgxconn.PgxConn, pending *pendingBuffer, table string) error {
minTime := model.Time(pending.batch.MinSeen).Time()

func retryAfterDecompression(conn pgxconn.PgxConn, req copyRequest) error {
var (
table = req.table
pending = req.data
minTime = model.Time(pending.batch.MinSeen).Time()
)
//how much faster are we at ingestion than wall-clock time?
ingestSpeedup := 2
//delay the next compression job proportional to the duration between now and the data time + a constant safety
@@ -205,7 +199,9 @@

metrics.DecompressCalls.Inc()
metrics.DecompressEarliest.WithLabelValues(table).Set(float64(minTime.UnixNano()) / 1e9)
return nil

req.data.batch.ResetPosition()
return doInsert(conn, req) // Attempt an insert again.
}

/*
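
Taken together, the copier changes route a failed insert through a swappable handler: when the Postgres error indicates a compressed chunk, the handler either decompresses and retries (`retryAfterDecompression`, the default) or drops the samples (`skipDecompression`, selected when the new flag is set). The following is a simplified, self-contained sketch of that control flow; the `insertFunc` and `handler` types and the stubbed decompression are illustrative assumptions, and only the error-message check and the two handler roles mirror the diff.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// insertFunc stands in for doInsert: it attempts the batch insert for a table.
type insertFunc func(table string) error

// handler decides what happens when an insert hits a compressed chunk; the
// diff swaps its real counterpart (handleDecompression) based on the
// -ignore-samples-written-to-compressed-chunks flag.
type handler func(table string, insert insertFunc) error

func retryAfterDecompression(table string, insert insertFunc) error {
	fmt.Println("decompressing chunks for", table, "and retrying the insert")
	// The real code also postpones the next compression job; here we just retry.
	return insert(table)
}

func skipDecompression(table string, insert insertFunc) error {
	fmt.Println("dropping samples for", table, "because decompression is disabled")
	return nil
}

// tryRecovery routes a failed insert: compressed-chunk errors go to the active
// handler, anything else is returned to the caller unchanged.
func tryRecovery(err error, table string, insert insertFunc, h handler) error {
	if strings.Contains(err.Error(), "insert/update/delete not permitted") {
		return h(table, insert)
	}
	return err
}

func main() {
	compressedErr := errors.New("insert/update/delete not permitted on chunk")
	insert := func(table string) error { return nil } // succeeds once decompressed

	// Default behaviour: decompress and retry the insert.
	fmt.Println("retry path:", tryRecovery(compressedErr, "metric_a", insert, retryAfterDecompression))
	// With the flag set: reject the samples instead.
	fmt.Println("skip path:", tryRecovery(compressedErr, "metric_a", insert, skipDecompression))
}
```
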
5 changes: 5 additions & 0 deletions pkg/pgmodel/ingestor/dispatcher.go
@@ -58,6 +58,11 @@ func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cach
toCopiers := make(chan copyRequest, copierCap)
setCopierChannelToMonitor(toCopiers)

if cfg.IgnoreCompressedChunks {
// Swap the decompression handler so nothing is decompressed; samples on compressed chunks are rejected.
handleDecompression = skipDecompression
}

for i := 0; i < numCopiers; i++ {
go runCopier(conn, toCopiers)
}
16 changes: 10 additions & 6 deletions pkg/pgmodel/ingestor/ingestor.go
@@ -16,10 +16,11 @@ import (
)

type Cfg struct {
AsyncAcks bool
ReportInterval int
NumCopiers int
DisableEpochSync bool
AsyncAcks bool
ReportInterval int
NumCopiers int
DisableEpochSync bool
IgnoreCompressedChunks bool
}

// DBIngestor ingest the TimeSeries data into Timescale database.
@@ -41,10 +42,13 @@ func NewPgxIngestorForTests(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.

// NewPgxIngestorForTests returns a new Ingestor that write to PostgreSQL using PGX
// with an empty config, a new default size metrics cache and a non-ha-aware data parser
func NewPgxIngestorForTests(conn pgxconn.PgxConn) (*DBIngestor, error) {
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
if cfg == nil {
cfg = &Cfg{}
}
c := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
s := cache.NewSeriesCache(cache.DefaultConfig, nil)
return NewPgxIngestor(conn, c, s, &Cfg{})
return NewPgxIngestor(conn, c, s, cfg)
}

// Ingest transforms and ingests the timeseries data into Timescale database.
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/concurrent_sql_test.go
@@ -222,7 +222,7 @@ func testConcurrentInsertSimple(t testing.TB, db *pgxpool.Pool, metric string) {
},
}

ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
if err != nil {
t.Fatal(err)
}
@@ -281,7 +281,7 @@ func testConcurrentInsertAdvanced(t testing.TB, db *pgxpool.Pool) {
},
}

ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
if err != nil {
t.Fatal(err)
}
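
A hypothetical variant of the tests above shows how a test could opt into the new behaviour by passing a non-nil config instead of `nil`; the surrounding setup (`db`, `t`, the package imports) is assumed from the tests shown and is not part of this commit.

```go
// Enable rejection of samples that land on compressed chunks for this test.
cfg := &ingstr.Cfg{IgnoreCompressedChunks: true}
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), cfg)
if err != nil {
	t.Fatal(err)
}
_ = ingestor // exercise the ingestor as in the tests above
```
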
