Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Handle race condition in concurrent write tests
Browse files Browse the repository at this point in the history
FailNow and functions that call it are not threadsafe, and can only be
called from the main goroutine.
  • Loading branch information
JLockerman committed Aug 31, 2020
1 parent 4ad81ac commit 5082fd2
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions pkg/pgmodel/end_to_end_tests/promql_write_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,23 @@ func getHTTPWriteRequest(protoRequest *prompb.WriteRequest) (*http.Request, erro
func sendWriteRequest(t testing.TB, router *route.Router, ts []prompb.TimeSeries) {
req, err := getHTTPWriteRequest(&prompb.WriteRequest{Timeseries: ts})
if err != nil {
t.Fatalf("unable to create PromQL label names request: %v", err)
t.Errorf("unable to create PromQL label names request: %v", err)
return
}

rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)

tsResp := rec.Result()
if rec.Code != 200 {
t.Fatal(rec.Code)
t.Error(rec.Code)
return
}

_, err = ioutil.ReadAll(tsResp.Body)
if err != nil {
t.Fatalf("unexpected error returned when reading connector response body:\n%s\n", err.Error())
t.Errorf("unexpected error returned when reading connector response body:\n%s\n", err.Error())
return
}
defer tsResp.Body.Close()
}
Expand All @@ -145,33 +148,39 @@ func verifyTimeseries(t testing.TB, db *pgxpool.Pool, tsSlice []prompb.TimeSerie
values = append(values, label.Value)
}
if name == "" {
t.Fatal("No ts series metric name found")
t.Error("No ts series metric name found")
return
}
for sampleIdx := range ts.Samples {
sample := ts.Samples[sampleIdx]
rows, err := db.Query(context.Background(), fmt.Sprintf("SELECT value FROM prom_data.%s WHERE time = $1 and series_id = (SELECT series_id FROM _prom_catalog.get_or_create_series_id_for_kv_array($2, $3, $4))",
name), model.Time(sample.Timestamp).Time(), name, names, values)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
defer rows.Close()
count := 0
for rows.Next() {
var val *float64
err := rows.Scan(&val)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
if val == nil {
t.Fatal("NULL value")
t.Error("NULL value")
return
}
if *val != sample.Value {
t.Errorf("Unexpected value: got %v, unexpected %v", *val, sample.Value)
return
}
count++
}
if count != 1 {
t.Errorf("Unexpected count: %d", count)
return
}
}
}
Expand Down Expand Up @@ -225,11 +234,17 @@ func sendConcurrentWrites(t testing.TB, db *pgxpool.Pool, queues int, metricGrou
ts := dg.generateTimeseries()
sendWriteRequest(t, router, ts)
if duplicates {
if t.Failed() {
return
}
sendWriteRequest(t, router, ts)
}
tss = append(tss, ts)
}
for i := range tss {
if t.Failed() {
return
}
verifyTimeseries(t, db, tss[i])
}
}()
Expand Down

0 comments on commit 5082fd2

Please sign in to comment.