Skip to content

Commit

Permalink
simulate data race in test for real
Browse files Browse the repository at this point in the history
  • Loading branch information
naphatkrit committed Nov 15, 2023
1 parent fc217ae commit e70689c
Showing 1 changed file with 63 additions and 10 deletions.
Expand Up @@ -16,6 +16,8 @@ package test

import (
"context"
"fmt"
"io"
"net"
"sync"
"testing"
Expand All @@ -24,6 +26,8 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -35,6 +39,7 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

func TestStatsHandler(t *testing.T) {
Expand Down Expand Up @@ -1318,7 +1323,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) {
metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}

func TestStatsHandlerConcurrentSafe(t *testing.T) {
// Ensure there is no data race for the following scenario:
// Bidirectional streaming + client cancels context in the middle of streaming.
func TestStatsHandlerConcurrentSafeContextCancellation(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
client := newGrpcTest(t, listener,
Expand All @@ -1330,14 +1337,60 @@ func TestStatsHandlerConcurrentSafe(t *testing.T) {
},
)

wg := &sync.WaitGroup{}
const n = 100
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doCalls(client)
}()
const testCount = 100
for i := 0; i < testCount; i++ {
t.Run(fmt.Sprintf("run_%d", i), func(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(2)
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.FullDuplexCall(ctx)
require.NoError(t, err)
const messageCount = 100
go func() {
defer wg.Done()
sendWg := &sync.WaitGroup{}
for i := 0; i < messageCount; i++ {
sendWg.Add(1)
go func() {
defer sendWg.Done()
const reqSize = 10
pl := interop.ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
respParam := []*testpb.ResponseParameters{
{
Size: reqSize,
},
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: pl,
}
err := stream.Send(req)
if err == io.EOF { // possible due to context cancellation
require.ErrorIs(t, ctx.Err(), context.Canceled)
} else {
require.NoError(t, err)
}
}()
}
sendWg.Wait()
require.NoError(t, stream.CloseSend())
}()
go func() {
defer wg.Done()
for i := 0; i < messageCount; i++ {
_, err := stream.Recv()
if i > messageCount/2 {
cancel()
}
// must continue to receive messages until server acknowledges the cancellation, to ensure no data race happens there too
if status.Code(err) == codes.Canceled {
return
}
require.NoError(t, err)
}
}()
wg.Wait()
})
}
wg.Wait()
}

0 comments on commit e70689c

Please sign in to comment.