From 3b622407f8a24d221f69fea21632bfaffe16827b Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 22 Feb 2025 09:49:04 -0500 Subject: [PATCH 1/2] add tests for custom partition stream --- quest_test.go | 64 +++++++++++---------------------------------------- test_utils.go | 9 ++++++++ 2 files changed, 23 insertions(+), 50 deletions(-) diff --git a/quest_test.go b/quest_test.go index ce099fc..16d435b 100644 --- a/quest_test.go +++ b/quest_test.go @@ -189,6 +189,19 @@ func TestLoadStream_StaticSchema_EventWithNewField(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, staticSchemaStream) } +func TestCreateStream_WithCustomPartition_Success(t *testing.T) { + customPartitionStream := NewGlob.Stream + "custompartition" + customHeader := map[string]string{"X-P-Custom-Partition": "level"} + CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader) + DeleteStream(t, NewGlob.QueryClient, customPartitionStream) +} + +func TestCreateStream_WithCustomPartition_Error(t *testing.T) { + customPartitionStream := NewGlob.Stream + "custompartition" + customHeader := map[string]string{"X-P-Custom-Partition": "level,os"} + CreateStreamWithCustompartitionError(t, NewGlob.QueryClient, customPartitionStream, customHeader) +} + func TestSmokeQueryTwoStreams(t *testing.T) { stream1 := NewGlob.Stream + "1" stream2 := NewGlob.Stream + "2" @@ -600,7 +613,7 @@ func TestLoadHistoricalStreamBatchWithK6(t *testing.T) { func TestLoadStreamBatchWithCustomPartitionWithK6(t *testing.T) { customPartitionStream := NewGlob.Stream + "custompartition" - customHeader := map[string]string{"X-P-Custom-Partition": "level,os"} + customHeader := map[string]string{"X-P-Custom-Partition": "level"} CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader) if NewGlob.IngestorUrl.String() == "" { cmd := exec.Command("k6", @@ -645,55 +658,6 @@ func TestLoadStreamBatchWithCustomPartitionWithK6(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, customPartitionStream) } -func TestLoadStreamBatchWithTimeAndCustomPartitionWithK6(t *testing.T) { - if NewGlob.Mode == "load" { - customPartitionStream := NewGlob.Stream + "timeandcustompartition" - customHeader := map[string]string{"X-P-Custom-Partition": "level,os", "X-P-Time-Partition": "source_time"} - CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader) - if NewGlob.IngestorUrl.String() == "" { - cmd := exec.Command("k6", - "run", - "-e", fmt.Sprintf("P_URL=%s", NewGlob.QueryUrl.String()), - "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.QueryUsername), - "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.QueryPassword), - "-e", fmt.Sprintf("P_STREAM=%s", customPartitionStream), - "-e", fmt.Sprintf("P_SCHEMA_COUNT=%s", schema_count), - "-e", fmt.Sprintf("P_EVENTS_COUNT=%s", events_count), - "./scripts/load_historical_batch_events.js", - "--vus=", vus, - "--duration=", duration) - - cmd.Run() - op, err := cmd.Output() - if err != nil { - t.Log(err) - } - t.Log(string(op)) - } else { - cmd := exec.Command("k6", - "run", - "-e", fmt.Sprintf("P_URL=%s", NewGlob.IngestorUrl.String()), - "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.IngestorUsername), - "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.IngestorPassword), - "-e", fmt.Sprintf("P_STREAM=%s", customPartitionStream), - "-e", fmt.Sprintf("P_SCHEMA_COUNT=%s", schema_count), - "-e", fmt.Sprintf("P_EVENTS_COUNT=%s", events_count), - "./scripts/load_historical_batch_events.js", - "--vus=", vus, - "--duration=", duration) - - cmd.Run() - op, err := cmd.Output() - if err != nil { - t.Log(err) - } - t.Log(string(op)) - } - - DeleteStream(t, NewGlob.QueryClient, customPartitionStream) - } -} - func TestLoadStreamNoBatchWithK6(t *testing.T) { if NewGlob.Mode == "load" { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) diff --git a/test_utils.go b/test_utils.go index 7fb90df..32beda0 100644 --- a/test_utils.go +++ b/test_utils.go @@ -80,6 +80,15 @@ func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, head require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status) } +func CreateStreamWithCustompartitionError(t *testing.T, client HTTPClient, stream string, header map[string]string) { + req, _ := client.NewRequest("PUT", "logstream/"+stream, nil) + for k, v := range header { + req.Header.Add(k, v) + } + response, _ := client.Do(req) + require.Equalf(t, 500, response.StatusCode, "Server returned http code: %s", response.Status) +} + func CreateStreamWithSchemaBody(t *testing.T, client HTTPClient, stream string, header map[string]string, schema_payload string) { req, _ := client.NewRequest("PUT", "logstream/"+stream, bytes.NewBufferString(schema_payload)) From 3adc91a476abd660faeabd7c06257617fadbe19e Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Sat, 22 Feb 2025 22:39:50 +0530 Subject: [PATCH 2/2] change to timepartition test only (#1) --- quest_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/quest_test.go b/quest_test.go index 16d435b..0403b11 100644 --- a/quest_test.go +++ b/quest_test.go @@ -658,6 +658,55 @@ func TestLoadStreamBatchWithCustomPartitionWithK6(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, customPartitionStream) } +func TestLoadStreamBatchWithTimePartitionWithK6(t *testing.T) { + if NewGlob.Mode == "load" { + customPartitionStream := NewGlob.Stream + "timepartition" + customHeader := map[string]string{"X-P-Time-Partition": "source_time"} + CreateStreamWithHeader(t, NewGlob.QueryClient, customPartitionStream, customHeader) + if NewGlob.IngestorUrl.String() == "" { + cmd := exec.Command("k6", + "run", + "-e", fmt.Sprintf("P_URL=%s", NewGlob.QueryUrl.String()), + "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.QueryUsername), + "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.QueryPassword), + "-e", fmt.Sprintf("P_STREAM=%s", customPartitionStream), + "-e", fmt.Sprintf("P_SCHEMA_COUNT=%s", schema_count), + "-e", fmt.Sprintf("P_EVENTS_COUNT=%s", events_count), + "./scripts/load_historical_batch_events.js", + "--vus=", vus, + "--duration=", duration) + + cmd.Run() + op, err := cmd.Output() + if err != nil { + t.Log(err) + } + t.Log(string(op)) + } else { + cmd := exec.Command("k6", + "run", + "-e", fmt.Sprintf("P_URL=%s", NewGlob.IngestorUrl.String()), + "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.IngestorUsername), + "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.IngestorPassword), + "-e", fmt.Sprintf("P_STREAM=%s", customPartitionStream), + "-e", fmt.Sprintf("P_SCHEMA_COUNT=%s", schema_count), + "-e", fmt.Sprintf("P_EVENTS_COUNT=%s", events_count), + "./scripts/load_historical_batch_events.js", + "--vus=", vus, + "--duration=", duration) + + cmd.Run() + op, err := cmd.Output() + if err != nil { + t.Log(err) + } + t.Log(string(op)) + } + + DeleteStream(t, NewGlob.QueryClient, customPartitionStream) + } +} + func TestLoadStreamNoBatchWithK6(t *testing.T) { if NewGlob.Mode == "load" { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)