diff --git a/docs/cli.md b/docs/cli.md
index 4717d69cb3..7fc96bcf19 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -75,8 +75,9 @@ You can also find information on flags with `promscale_ -help`.
| db-ssl-mode | string | require | TimescaleDB/Vanilla Postgres connection ssl mode. If you do not want to use ssl, pass `allow` as value. |
| db-writer-connection-concurrency | integer | 4 | Maximum number of database connections for writing per Go process. |
| db-uri | string | | TimescaleDB/Vanilla PostgreSQL URI. Example: `postgres://postgres:password@localhost:5432/timescale?sslmode=require` |
-| async-acks | boolean | false | Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss. |
| db-statements-cache | boolean | true | Whether the database connection pool should use cached prepared statements. Disable if using PgBouncer. |
+| ignore-samples-written-to-compressed-chunks | boolean | false | Ignore/drop samples that are written to compressed chunks. When false, Promscale ingests older data by decompressing the chunks it falls on; when true, such samples are dropped, saving the resources that decompression would otherwise consume. |
+| async-acks | boolean | false | Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss. |

## PromQL engine evaluation flags

diff --git a/pkg/log/log.go b/pkg/log/log.go
index ab4a25e636..6e34570056 100644
--- a/pkg/log/log.go
+++ b/pkg/log/log.go
@@ -26,8 +26,8 @@ var (
		"2006-01-02T15:04:05.000Z07:00",
	)

-	logStoreMux sync.Mutex
-	logStore    = make(map[string][]interface{})
+	logMux   sync.RWMutex
+	logStore = make(map[key][]interface{})
)

// Config represents a logger configuration used upon initialization.
@@ -113,7 +113,12 @@ func parseLogLevel(logLevel string) (level.Option, error) {
	}
}

-const logOnceTimedDuration = time.Minute
+const (
+	logOnceTimedDuration = time.Minute
+
+	debug = iota
+	warn
+)

// timedLogger logs from logStore every logOnceTimedDuration. It deletes the log entry from the store
// after it has logged once.
@@ -124,24 +129,49 @@ func timedLogger() {
		newLogMsg = append(newLogMsg, logMsg...)
		return
	}
-	logStoreMux.Lock()
-	defer logStoreMux.Unlock()
-	for _, logMsg := range logStore {
-		Debug(applyKind(logMsg)...)
+	logMux.Lock()
+	defer logMux.Unlock()
+	for k, line := range logStore {
+		switch k.typ {
+		case debug:
+			Debug(applyKind(line)...)
+		case warn:
+			Warn(applyKind(line)...)
+		default:
+			panic("invalid type")
+		}
	}
-	logStore = make(map[string][]interface{})
+	logStore = make(map[key][]interface{}) // Clear stale logs.
}

-// DebugRateLimited logs Debug level logs once in every logOnceTimedDuration.
-func DebugRateLimited(keyvals ...interface{}) {
-	logStoreMux.Lock()
-	defer logStoreMux.Unlock()
-	val := fmt.Sprintf("%v", keyvals)
+type key struct {
+	typ uint8
+	k   string
+}
+
+func rateLimit(typ uint8, keyvals ...interface{}) {
+	kv := fmt.Sprintf("%v", keyvals)
+	k := key{typ, kv}
+	logMux.RLock()
+	_, contains := logStore[k]
+	logMux.RUnlock()
+	if contains {
+		return
+	}
+	logMux.Lock()
+	defer logMux.Unlock()
	if len(logStore) == 0 {
		go timedLogger()
	}
-	if _, toBeLogged := logStore[val]; toBeLogged {
-		return
-	}
-	logStore[val] = keyvals
+	logStore[k] = keyvals
+}
+
+// WarnRateLimited warns once in every logOnceTimedDuration.
+func WarnRateLimited(keyvals ...interface{}) {
+	rateLimit(warn, keyvals...)
+}
+
+// DebugRateLimited logs Debug level logs once in every logOnceTimedDuration.
+func DebugRateLimited(keyvals ...interface{}) {
+	rateLimit(debug, keyvals...)
+}
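A quick sketch of how the new helpers behave from a caller's point of view (illustrative only; it assumes the logger has already been initialised elsewhere, and the message values are hypothetical). Identical key-values collapse into one entry in logStore, which timedLogger flushes at most once per logOnceTimedDuration:

package main

import "github.com/timescale/promscale/pkg/log"

func main() {
	// Repeated identical calls are stored under a single key and produce
	// one Warn line per minute, rather than 1000 lines.
	for i := 0; i < 1000; i++ {
		log.WarnRateLimited("msg", "Rejecting samples falling on compressed chunks as decompression is disabled")
	}
	// A Debug call with different key-values is tracked under its own key.
	log.DebugRateLimited("msg", "retrying batch", "table", `prom_data."firstMetric"`)
}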
diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go
index 9f5a7b6dfe..5ca4a3fb14 100644
--- a/pkg/pgclient/client.go
+++ b/pkg/pgclient/client.go
@@ -129,9 +129,10 @@ func NewClientWithPool(cfg *Config, numCopiers int, dbConn pgxconn.PgxConn, mt t
	labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
	seriesCache := cache.NewSeriesCache(cfg.CacheConfig, sigClose)
	c := ingestor.Cfg{
-		AsyncAcks:      cfg.AsyncAcks,
-		ReportInterval: cfg.ReportInterval,
-		NumCopiers:     numCopiers,
+		NumCopiers:             numCopiers,
+		AsyncAcks:              cfg.AsyncAcks,
+		ReportInterval:         cfg.ReportInterval,
+		IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
	}

	var dbIngestor *ingestor.DBIngestor

diff --git a/pkg/pgclient/config.go b/pkg/pgclient/config.go
index e23c77939f..75ac71e404 100644
--- a/pkg/pgclient/config.go
+++ b/pkg/pgclient/config.go
@@ -31,6 +31,7 @@ type Config struct {
	SslMode                 string
	DbConnectRetries        int
	DbConnectionTimeout     time.Duration
+	IgnoreCompressedChunks  bool
	AsyncAcks               bool
	ReportInterval          int
	WriteConnectionsPerProc int
@@ -61,7 +62,8 @@ var (
func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
	cache.ParseFlags(fs, &cfg.CacheConfig)

-	fs.StringVar(&cfg.AppName, "app", DefaultApp, "'app' sets application_name in database connection string. This is helpful during debugging when looking at pg_stat_activity.")
+	fs.StringVar(&cfg.AppName, "app", DefaultApp, "'app' sets application_name in database connection string. "+
+		"This is helpful during debugging when looking at pg_stat_activity.")
	fs.StringVar(&cfg.Host, "db-host", defaultDBHost, "Host for TimescaleDB/Vanilla Postgres.")
	fs.IntVar(&cfg.Port, "db-port", defaultDBPort, "TimescaleDB/Vanilla Postgres connection port.")
	fs.StringVar(&cfg.User, "db-user", defaultDBUser, "TimescaleDB/Vanilla Postgres user.")
@@ -70,12 +72,19 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
	fs.StringVar(&cfg.SslMode, "db-ssl-mode", defaultSSLMode, "TimescaleDB/Vanilla Postgres connection ssl mode. If you do not want to use ssl, pass 'allow' as value.")
	fs.IntVar(&cfg.DbConnectRetries, "db-connect-retries", 0, "Number of retries Promscale should make for establishing connection with the database.")
	fs.DurationVar(&cfg.DbConnectionTimeout, "db-connection-timeout", defaultConnectionTime, "Timeout for establishing the connection between Promscale and TimescaleDB.")
-	fs.BoolVar(&cfg.AsyncAcks, "async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
+	fs.BoolVar(&cfg.IgnoreCompressedChunks, "ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are written to compressed chunks. "+
+		"When false, Promscale ingests older data by decompressing the chunks it falls on; "+
+		"when true, such samples are dropped, saving the resources that decompression would otherwise consume.")
+	fs.BoolVar(&cfg.AsyncAcks, "async-acks", false, "Acknowledge asynchronous inserts. "+
+		"If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
	fs.IntVar(&cfg.ReportInterval, "tput-report", 0, "Interval in seconds at which throughput should be reported.")
	fs.IntVar(&cfg.WriteConnectionsPerProc, "db-writer-connection-concurrency", 4, "Maximum number of database connections for writing per Go process.")
-	fs.IntVar(&cfg.MaxConnections, "db-connections-max", -1, "Maximum number of connections to the database that should be opened at once. It defaults to 80% of the maximum connections that the database can handle.")
-	fs.StringVar(&cfg.DbUri, "db-uri", defaultDBUri, "TimescaleDB/Vanilla Postgres DB URI. Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
-	fs.BoolVar(&cfg.EnableStatementsCache, "db-statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. Disable if using PgBouncer")
+	fs.IntVar(&cfg.MaxConnections, "db-connections-max", -1, "Maximum number of connections to the database that should be opened at once. "+
+		"It defaults to 80% of the maximum connections that the database can handle.")
+	fs.StringVar(&cfg.DbUri, "db-uri", defaultDBUri, "TimescaleDB/Vanilla Postgres DB URI. "+
+		"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
+	fs.BoolVar(&cfg.EnableStatementsCache, "db-statements-cache", defaultDbStatementsCache, "Whether the database connection pool should use cached prepared statements. "+
+		"Disable if using PgBouncer.")
	return cfg
}
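A minimal sketch of the new flag's path from command line to config (the main function here is hypothetical and not part of this PR; ParseFlags and the field names are as in the hunk above):

package main

import (
	"flag"
	"fmt"

	"github.com/timescale/promscale/pkg/pgclient"
)

func main() {
	fs := flag.NewFlagSet("promscale", flag.ContinueOnError)
	cfg := &pgclient.Config{}
	pgclient.ParseFlags(fs, cfg)
	// A bare boolean flag is enough to enable it.
	_ = fs.Parse([]string{"-ignore-samples-written-to-compressed-chunks"})
	// NewClientWithPool then copies this field into ingestor.Cfg.
	fmt.Println(cfg.IgnoreCompressedChunks) // true
}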
diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go
index 6a22d55878..87b9b704ca 100644
--- a/pkg/pgmodel/ingestor/copier.go
+++ b/pkg/pgmodel/ingestor/copier.go
@@ -29,7 +29,10 @@ type copyRequest struct {
	table string
}

-var getBatchMutex = &sync.Mutex{}
+var (
+	getBatchMutex       = &sync.Mutex{}
+	handleDecompression = retryAfterDecompression
+)

// Handles actual insertion into the DB.
// We have one of these per connection reserved for insertion.
@@ -127,7 +130,7 @@ func insertBatchErrorFallback(conn pgxconn.PgxConn, reqs ...copyRequest) {
		reqs[i].data.batch.ResetPosition()
		err := doInsert(conn, reqs[i])
		if err != nil {
-			err = insertErrorFallback(conn, err, reqs[i])
+			err = tryRecovery(conn, err, reqs[i])
		}

		reqs[i].data.reportResults(err)
	}
}

-// certain errors are recoverable, handle those we can
-// 1. if the table is compressed, decompress and retry the insertion
-func insertErrorFallback(conn pgxconn.PgxConn, err error, req copyRequest) error {
-	err = tryRecovery(conn, err, req)
-	if err != nil {
-		log.Warn("msg", fmt.Sprintf("time out while processing error for %s", req.table), "error", err.Error())
-		return err
-	}
-	return doInsert(conn, req)
-}
-
// we can currently recover from one error:
// If we inserted into a compressed chunk, we decompress the chunk and try again.
// Since a single batch can have both errors, we need to remember the insert method
@@ -159,27 +151,29 @@ func tryRecovery(conn pgxconn.PgxConn, err error, req copyRequest) error {
		return err
	}

-	// If the error was that the table is already compressed, decompress and try again.
	if strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
-		decompressErr := decompressChunks(conn, req.data, req.table)
-		if decompressErr != nil {
-			return err
-		}
-
-		req.data.batch.ResetPosition()
-		return nil
+		// If the error was that the table is already compressed, decompress and try again.
+		return handleDecompression(conn, req)
	}

	log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.table), "err", pgErr.Error())
-	return err
+	return pgErr
+}
+
+func skipDecompression(_ pgxconn.PgxConn, _ copyRequest) error {
+	log.WarnRateLimited("msg", "Rejecting samples falling on compressed chunks as decompression is disabled")
+	return nil
+}

// In the event we are filling in old data and the chunk we want to INSERT into has
// already been compressed, we decompress the chunk and try again. When we do
// this we delay the recompression to give us time to insert additional data.
-func decompressChunks(conn pgxconn.PgxConn, pending *pendingBuffer, table string) error {
-	minTime := model.Time(pending.batch.MinSeen).Time()
-
+func retryAfterDecompression(conn pgxconn.PgxConn, req copyRequest) error {
+	var (
+		table   = req.table
+		pending = req.data
+		minTime = model.Time(pending.batch.MinSeen).Time()
+	)
	//how much faster are we at ingestion than wall-clock time?
	ingestSpeedup := 2
	//delay the next compression job proportional to the duration between now and the data time + a constant safety
@@ -205,7 +199,9 @@
	metrics.DecompressCalls.Inc()
	metrics.DecompressEarliest.WithLabelValues(table).Set(float64(minTime.UnixNano()) / 1e9)
-	return nil
+
+	req.data.batch.ResetPosition()
+	return doInsert(conn, req) // Attempt an insert again.
}

/*
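Before the dispatcher hunk below, a condensed model of the recovery flow this file now implements. Signatures are simplified from the real (pgxconn.PgxConn, copyRequest) pair, and the function bodies are stubs; only the error check and the swappable hook mirror the diff:

package main

import (
	"errors"
	"strings"

	"github.com/jackc/pgconn"
)

// handleDecompression mirrors the package-level hook above; the dispatcher
// replaces it with skipDecompression when the ignore flag is set.
var handleDecompression = retryAfterDecompression

func retryAfterDecompression(table string) error {
	// Real code: delay the next compression job, decompress the chunks,
	// reset the batch position, and retry the insert.
	return nil
}

func skipDecompression(table string) error {
	// Real code: WarnRateLimited and drop the samples.
	return nil
}

func tryRecoverySketch(err error, table string) error {
	var pgErr *pgconn.PgError
	if !errors.As(err, &pgErr) {
		return err // not a Postgres error; nothing to recover from
	}
	if strings.Contains(pgErr.Message, "insert/update/delete not permitted") {
		// Writing into a compressed chunk: behaviour depends on which
		// function is currently installed in the hook.
		return handleDecompression(table)
	}
	return pgErr
}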
diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go
index 6c4e334fb7..3cea45ab28 100644
--- a/pkg/pgmodel/ingestor/dispatcher.go
+++ b/pkg/pgmodel/ingestor/dispatcher.go
@@ -58,6 +58,11 @@ func newPgxDispatcher(conn pgxconn.PgxConn, cache cache.MetricCache, scache cach
	toCopiers := make(chan copyRequest, copierCap)
	setCopierChannelToMonitor(toCopiers)

+	if cfg.IgnoreCompressedChunks {
+		// Swap the decompression handler so nothing is decompressed.
+		handleDecompression = skipDecompression
+	}
+
	for i := 0; i < numCopiers; i++ {
		go runCopier(conn, toCopiers)
	}

diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go
index 011994537c..101c7fd351 100644
--- a/pkg/pgmodel/ingestor/ingestor.go
+++ b/pkg/pgmodel/ingestor/ingestor.go
@@ -16,10 +16,11 @@ import (
)

type Cfg struct {
-	AsyncAcks        bool
-	ReportInterval   int
-	NumCopiers       int
-	DisableEpochSync bool
+	AsyncAcks              bool
+	ReportInterval         int
+	NumCopiers             int
+	DisableEpochSync       bool
+	IgnoreCompressedChunks bool
}

// DBIngestor ingests the TimeSeries data into Timescale database.
@@ -41,10 +42,13 @@ func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.

// NewPgxIngestorForTests returns a new Ingestor that writes to PostgreSQL using PGX
// with an empty config, a new default size metrics cache and a non-ha-aware data parser.
-func NewPgxIngestorForTests(conn pgxconn.PgxConn) (*DBIngestor, error) {
+func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
+	if cfg == nil {
+		cfg = &Cfg{}
+	}
	c := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
	s := cache.NewSeriesCache(cache.DefaultConfig, nil)
-	return NewPgxIngestor(conn, c, s, &Cfg{})
+	return NewPgxIngestor(conn, c, s, cfg)
}

// Ingest transforms and ingests the timeseries data into Timescale database.
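The nil check added to NewPgxIngestorForTests is what keeps the churn in the test files below to a single extra argument: existing call sites pass nil and keep the old defaults, while the new compressed-chunks test passes an explicit Cfg. A hypothetical sketch of the idiom (worker and newWorker are illustrative names, not code from this PR):

// nil cfg selects the zero-value defaults, so call sites stay terse and
// the pre-PR behaviour (decompression enabled) is preserved.
type worker struct{ ignoreCompressed bool }

func newWorker(cfg *Cfg) *worker {
	if cfg == nil {
		cfg = &Cfg{}
	}
	return &worker{ignoreCompressed: cfg.IgnoreCompressedChunks}
}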
diff --git a/pkg/tests/end_to_end_tests/concurrent_sql_test.go b/pkg/tests/end_to_end_tests/concurrent_sql_test.go
index 76c04dc841..eeb189f3aa 100644
--- a/pkg/tests/end_to_end_tests/concurrent_sql_test.go
+++ b/pkg/tests/end_to_end_tests/concurrent_sql_test.go
@@ -222,7 +222,7 @@ func testConcurrentInsertSimple(t testing.TB, db *pgxpool.Pool, metric string) {
		},
	}

-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -281,7 +281,7 @@ func testConcurrentInsertAdvanced(t testing.TB, db *pgxpool.Pool) {
		},
	}

-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}

diff --git a/pkg/tests/end_to_end_tests/create_test.go b/pkg/tests/end_to_end_tests/create_test.go
index 07cc7b974e..b36ce89b5b 100644
--- a/pkg/tests/end_to_end_tests/create_test.go
+++ b/pkg/tests/end_to_end_tests/create_test.go
@@ -1,6 +1,7 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.
+
package end_to_end_tests

import (
@@ -167,7 +168,7 @@ func TestSQLChunkInterval(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -425,7 +426,7 @@ func TestSQLIngest(t *testing.T) {
	withDB(t, databaseName, func(dbOwner *pgxpool.Pool, t testing.TB) {
		db := testhelpers.PgxPoolWithRole(t, databaseName, "prom_writer")
		defer db.Close()
-		ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+		ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
		if err != nil {
			t.Fatal(err)
		}
@@ -520,7 +521,7 @@ func TestInsertCompressedDuplicates(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -651,7 +652,7 @@ func TestMetricBatcherLabelsBatching(t *testing.T) {
		ts = append(ts, t)
	}

-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -711,7 +712,7 @@ func TestInsertCompressed(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -811,7 +812,7 @@ func insertMultinodeAddNodes(t *testing.T, attachExisting bool) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -853,7 +854,7 @@ func insertMultinodeAddNodes(t *testing.T, attachExisting bool) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -940,7 +941,7 @@ func TestCompressionSetting(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -1042,7 +1043,7 @@ func TestCustomCompressionJob(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -1265,7 +1266,7 @@ func TestExecuteMaintenanceCompressionJob(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -1430,7 +1431,7 @@ func TestExecuteCompressionMetricsLocked(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}

diff --git a/pkg/tests/end_to_end_tests/delete_test.go b/pkg/tests/end_to_end_tests/delete_test.go
index 0ffb4c3180..bdf0a26afb 100644
--- a/pkg/tests/end_to_end_tests/delete_test.go
+++ b/pkg/tests/end_to_end_tests/delete_test.go
@@ -105,7 +105,7 @@ func TestDeleteWithMetricNameEQL(t *testing.T) {
		ts = generateRealTimeseries()
	}

-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -214,7 +214,7 @@ func TestDeleteWithCompressedChunks(t *testing.T) {
	if *extendedTest {
		ts = generateRealTimeseries()
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -320,7 +320,7 @@ func TestDeleteWithMetricNameEQLRegex(t *testing.T) {
	if *extendedTest {
		ts = generateRealTimeseries()
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -470,7 +470,7 @@ func TestDeleteMixins(t *testing.T) {
	if *extendedTest {
		ts = generateRealTimeseries()
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}

diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go
index 49eac49ab2..dc910ddf72 100644
--- a/pkg/tests/end_to_end_tests/drop_test.go
+++ b/pkg/tests/end_to_end_tests/drop_test.go
@@ -51,7 +51,7 @@ func TestSQLRetentionPeriod(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -155,7 +155,7 @@ func TestSQLDropChunk(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -239,7 +239,7 @@ func TestSQLDropDataWithoutTimescaleDB(t *testing.T) {
			},
		},
	}
-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -549,7 +549,7 @@ func TestSQLDropMetricChunk(t *testing.T) {
	scache.Reset()
	ingestor.Close()

-	ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -604,7 +604,7 @@ func TestSQLDropAllMetricData(t *testing.T) {
		}
	}

-	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -677,7 +677,7 @@ func TestSQLDropAllMetricData(t *testing.T) {
	//Restart ingestor to avoid stale cache issues.
	//Other tests should check for that
	ingestor.Close()
-	ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatal(err)
	}

diff --git a/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go b/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go
new file mode 100644
index 0000000000..b9c3a903e2
--- /dev/null
+++ b/pkg/tests/end_to_end_tests/insert_compressed_chunks_test.go
@@ -0,0 +1,112 @@
+// This file and its contents are licensed under the Apache License 2.0.
+// Please see the included NOTICE for copyright information and
+// LICENSE for a copy of the license.
+
+package end_to_end_tests
+
+import (
+	"context"
+	"testing"
+
+	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/stretchr/testify/require"
+	ingstr "github.com/timescale/promscale/pkg/pgmodel/ingestor"
+	"github.com/timescale/promscale/pkg/pgmodel/model"
+	"github.com/timescale/promscale/pkg/pgxconn"
+	"github.com/timescale/promscale/pkg/prompb"
+)
+
+func TestInsertInCompressedChunks(t *testing.T) {
+	ts := generateSmallTimeseries()
+	if !*useTimescaleDB {
+		// Ingest in plain postgres to ensure everything works well even if TimescaleDB is not installed.
+		withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+			ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
+			require.NoError(t, err)
+			defer ingestor.Close()
+			_, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts)))
+			require.NoError(t, err)
+			r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";")
+			require.NoError(t, err)
+			defer r.Close()
+
+			count := 0
+			for r.Next() {
+				count++
+			}
+			require.Equal(t, 5, count)
+		})
+		return
+	}
+
+	sample := []prompb.TimeSeries{
+		{
+			Labels: []prompb.Label{
+				{Name: model.MetricNameLabelName, Value: "firstMetric"},
+				{Name: "foo", Value: "bar"},
+				{Name: "common", Value: "tag"},
+				{Name: "empty", Value: ""},
+			},
+			Samples: []prompb.Sample{
+				{Timestamp: 7, Value: 0.7},
+			},
+		},
+	}
+	// With decompression of compressed chunks enabled (the default).
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
+		require.NoError(t, err)
+		defer ingestor.Close()
+		_, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts)))
+		require.NoError(t, err)
+		err = ingestor.CompleteMetricCreation()
+		require.NoError(t, err)
+		_, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.\"firstMetric\"') i;")
+		require.NoError(t, err)
+
+		// Insert data into the compressed chunk.
+		_, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(sample)))
+		require.NoError(t, err)
+
+		r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";")
+		require.NoError(t, err)
+		defer r.Close()
+
+		count := 0
+		for r.Next() {
+			count++
+		}
+		require.Equal(t, 6, count)
+	})
+
+	// With decompression disabled via IgnoreCompressedChunks.
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), &ingstr.Cfg{IgnoreCompressedChunks: true})
+		require.NoError(t, err)
+		defer ingestor.Close()
+		_, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(ts)))
+		require.NoError(t, err)
+		err = ingestor.CompleteMetricCreation()
+		require.NoError(t, err)
+		_, err = db.Exec(context.Background(), "SELECT compress_chunk(i) from show_chunks('prom_data.\"firstMetric\"') i;")
+		require.NoError(t, err)
+
+		// Insert data into the compressed chunk.
+		_, err = ingestor.Ingest(newWriteRequestWithTs(copyMetrics(sample)))
+		require.NoError(t, err)
+
+		r, err := db.Query(context.Background(), "SELECT * from prom_data.\"firstMetric\";")
+		require.NoError(t, err)
+		defer r.Close()
+
+		count := 0
+		for r.Next() {
+			count++
+		}
+		// The late sample was not ingested: the chunk was compressed and we
+		// asked Promscale not to write into compressed chunks.
+		require.Equal(t, 5, count)
+	})
+}
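Review note: the row-count loop above appears three times; if this file grows, a small helper would keep the assertions focused. The sketch below is hypothetical and not part of this PR; it also surfaces iteration errors via rows.Err(), which the loops above silently drop:

// Hypothetical helper for the test file above.
func rowCount(t testing.TB, db *pgxpool.Pool, query string) int {
	r, err := db.Query(context.Background(), query)
	require.NoError(t, err)
	defer r.Close()
	count := 0
	for r.Next() {
		count++
	}
	require.NoError(t, r.Err()) // surface any iteration error
	return count
}

// Usage: require.Equal(t, 6, rowCount(t, db, `SELECT * from prom_data."firstMetric";`))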
+ }) +} diff --git a/pkg/tests/end_to_end_tests/nan_test.go b/pkg/tests/end_to_end_tests/nan_test.go index 7e2b71e710..ec78755b0f 100644 --- a/pkg/tests/end_to_end_tests/nan_test.go +++ b/pkg/tests/end_to_end_tests/nan_test.go @@ -74,7 +74,7 @@ func TestSQLStaleNaN(t *testing.T) { }, } - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(dbOwner)) + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(dbOwner), nil) if err != nil { t.Fatal(err) } diff --git a/pkg/tests/end_to_end_tests/null_chars_test.go b/pkg/tests/end_to_end_tests/null_chars_test.go index ffeb9c5b87..0ae54d5a08 100644 --- a/pkg/tests/end_to_end_tests/null_chars_test.go +++ b/pkg/tests/end_to_end_tests/null_chars_test.go @@ -41,7 +41,7 @@ func TestOperationWithNullChars(t *testing.T) { withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) { db := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_writer") defer db.Close() - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db)) + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) require.NoError(t, err) defer ingestor.Close() diff --git a/pkg/tests/end_to_end_tests/query_integration_test.go b/pkg/tests/end_to_end_tests/query_integration_test.go index a59ce37863..0df566460e 100644 --- a/pkg/tests/end_to_end_tests/query_integration_test.go +++ b/pkg/tests/end_to_end_tests/query_integration_test.go @@ -643,7 +643,7 @@ func TestSQLQuery(t *testing.T) { } func ingestQueryTestDataset(db *pgxpool.Pool, t testing.TB, metrics []prompb.TimeSeries) { - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db)) + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) if err != nil { t.Fatal(err) } diff --git a/pkg/tests/end_to_end_tests/view_test.go b/pkg/tests/end_to_end_tests/view_test.go index 8b5b02ab2c..97e96cf6a2 100644 --- a/pkg/tests/end_to_end_tests/view_test.go +++ b/pkg/tests/end_to_end_tests/view_test.go @@ -156,7 +156,7 @@ func TestSQLView(t *testing.T) { }, } - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db)) + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) if err != nil { t.Fatal(err) } @@ -255,7 +255,7 @@ func TestSQLViewSelectors(t *testing.T) { }, } - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db)) + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) if err != nil { t.Fatal(err) } diff --git a/pkg/tests/end_to_end_tests/zlast_test.go b/pkg/tests/end_to_end_tests/zlast_test.go index 4648a7094b..0f0e9c28e7 100644 --- a/pkg/tests/end_to_end_tests/zlast_test.go +++ b/pkg/tests/end_to_end_tests/zlast_test.go @@ -30,7 +30,7 @@ func TestDeleteMetricSQLAPI(t *testing.T) { if *extendedTest { ts = generateRealTimeseries() } - ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db)) + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) if err != nil { t.Fatal(err) } diff --git a/pkg/tests/upgrade_tests/upgrade_test.go b/pkg/tests/upgrade_tests/upgrade_test.go index 1394764dca..88611efb2a 100644 --- a/pkg/tests/upgrade_tests/upgrade_test.go +++ b/pkg/tests/upgrade_tests/upgrade_test.go @@ -144,7 +144,7 @@ func getUpgradedDbInfo(t *testing.T, noData bool, extensionState testhelpers.Ext defer db.Close() if !noData { - ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db)) + ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) if err != nil { t.Fatalf("error connecting to DB: %v", err) } @@ -166,7 
@@ -166,7 +166,7 @@ func getPristineDbInfo(t *testing.T, noData bool, extensionState testhelpers.Ext
	if noData {
		return
	}
-	ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+	ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
	if err != nil {
		t.Fatalf("error connecting to DB: %v", err)
	}
@@ -177,7 +177,7 @@ func getPristineDbInfo(t *testing.T, noData bool, extensionState testhelpers.Ext
	/* postRestart */
	func(container testcontainers.Container, _ string, db *pgxpool.Pool, tmpDir string) {
		if !noData {
-			ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db))
+			ing, err := ingestor.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
			if err != nil {
				t.Fatalf("error connecting to DB: %v", err)
			}