Merge pull request #12769 from benlesh/fix-breakpressure-with-integration-test

fix(microservices): fix backpressure with integration test
kamilmysliwiec authored Nov 20, 2023
2 parents d4bda94 + d27d181 commit afdc15c
Showing 5 changed files with 69 additions and 17 deletions.
21 changes: 21 additions & 0 deletions integration/microservices/e2e/sum-grpc.spec.ts
@@ -128,6 +128,27 @@ describe('GRPC transport', () => {
});
});

it(`GRPC with backpressure control`, async function () {
    // This test hits the gRPC server with 1000 messages, but the server
// has to process large (> 1MB) messages, so it will definitely hit
// issues where writing to the stream needs to be paused until a drain
// event. Prior to this test, a bug existed where the server would
// send the incorrect number of messages due to improper backpressure
// handling that wrote messages more than once.
this.timeout(10000);

const largeMessages = client.streamLargeMessages();
// [0, 1, 2, ..., 999]
const expectedIds = Array.from({ length: 1000 }, (_, n) => n);
const receivedIds: number[] = [];

await largeMessages.forEach(msg => {
receivedIds.push(msg.id);
});

expect(receivedIds).to.deep.equal(expectedIds);
});

after(async () => {
await app.close();
});
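The `client` handle used in the new test is created elsewhere in this spec and is not shown in the diff. As a rough sketch of how such a handle could be obtained (the injection token, interface, and wiring below are assumptions for illustration, not code from this PR):

import { INestApplication } from '@nestjs/common';
import { ClientGrpc } from '@nestjs/microservices';
import { Observable } from 'rxjs';

// Hypothetical typing for the Math service used by the test.
interface MathService {
  streamLargeMessages(request: Record<string, never>): Observable<{ id: number; data: string }>;
}

// 'MATH_SERVICE' is a placeholder token; the real spec wires this up differently.
declare const app: INestApplication;
const client = app.get<ClientGrpc>('MATH_SERVICE').getService<MathService>('Math');

// Usage then mirrors the test above:
// await client.streamLargeMessages({}).forEach(msg => receivedIds.push(msg.id));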
21 changes: 21 additions & 0 deletions integration/microservices/src/grpc/grpc.controller.ts
@@ -128,6 +128,27 @@ export class GrpcController {
return svc.sum2({ data });
}

@GrpcMethod('Math')
streamLargeMessages(_req: unknown, _meta: unknown) {
// Send 1000 messages of >1MB each relatively fast
// This should be enough to trigger backpressure issues
// while writing to the socket.
return new Observable(subscriber => {
let n = 0;
const interval = setInterval(() => {
// We'll be checking the ids. The `data` is just to make the
// message large enough to trigger backpressure issues.
subscriber.next({ id: n++, data: 'a'.repeat(1024 * 1024) });
if (n === 1000) {
subscriber.complete();
}
}, 0);
return () => {
clearInterval(interval);
};
});
}

@Post('error')
@HttpCode(200)
serializeError(
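For comparison, the same producer can be expressed with RxJS creation operators instead of manual setInterval bookkeeping; this is only an illustrative alternative, not the code this PR adds:

import { interval, map, take } from 'rxjs';

// Emits { id: 0..999, data: ~1MB } as fast as the event loop allows,
// roughly matching the setInterval-based Observable above.
const largeMessages$ = interval(0).pipe(
  take(1000),
  map(id => ({ id, data: 'a'.repeat(1024 * 1024) })),
);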
8 changes: 8 additions & 0 deletions integration/microservices/src/grpc/math.proto
@@ -7,8 +7,16 @@ service Math {
rpc SumStream(stream RequestSum) returns(stream SumResult);
rpc SumStreamPass(stream RequestSum) returns(stream SumResult);
rpc Divide (RequestDivide) returns (DivideResult);
rpc StreamLargeMessages(Empty) returns (stream BackpressureData) {}
}

message BackpressureData {
int32 id = 1;
string data = 2;
}

message Empty {}

message SumResult {
int32 result = 1;
}
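On the Nest side these proto messages roughly correspond to the following TypeScript shapes (a sketch assuming the usual proto-to-TS mapping; no generated code is included in this PR):

import { Observable } from 'rxjs';

interface BackpressureData {
  id: number;   // int32 id = 1
  data: string; // string data = 2
}

type Empty = Record<string, never>;

// A server-streaming RPC surfaces as an Observable of messages on the Nest side.
type StreamLargeMessages = (request: Empty) => Observable<BackpressureData>;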
29 changes: 15 additions & 14 deletions packages/microservices/server/server-grpc.ts
@@ -269,6 +269,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
let shouldErrorAfterDraining = false;
let error: any;
let shouldResolveAfterDraining = false;
let writing = true;

// Used to manage finalization
const subscription = new Subscription();
@@ -290,19 +291,18 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
subscription.add(() => call.end());

      const drain = () => {
+       writing = true;
        while (valuesWaitingToBeDrained.length > 0) {
-         // Try to write the value, THEN shift it off, because
-         // if we can't write the value, we need to keep it in the
-         // buffer at it's position to ensure ordering.
-         const value = valuesWaitingToBeDrained[0];
-         if (!call.write(value)) {
-           // We can't write anymore so we need to wait for the drain event
-           // stop draining for now.
-           return;
+         const value = valuesWaitingToBeDrained.shift()!;
+         if (writing) {
+           // The first time `call.write` returns false, we need to stop.
+           // It wrote the value, but it won't write anything else.
+           writing = call.write(value);
+           if (!writing) {
+             // We can't write anymore so we need to wait for the drain event
+             return;
+           }
          }
-
-         // We successfully wrote the value, so we can shift it off the buffer
-         valuesWaitingToBeDrained.shift();
        }

if (shouldResolveAfterDraining) {
@@ -320,7 +320,9 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
subscription.add(
source.subscribe({
next(value) {
-           if (!call.write(value)) {
+           if (writing) {
+             writing = call.write(value);
+           } else {
              // If we can't write, that's because we need to
              // wait for the drain event before we can write again
              // buffer the value and wait for the drain event
@@ -383,8 +385,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
if (isResponseStream) {
try {
await this.writeObservableToGrpc(res, call);
-         }
-         catch (err) {
+         } catch (err) {
call.emit('error', err);
return;
}
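To summarize the pattern the new drain()/next() pair implements: keep calling call.write() until it returns false, buffer everything that arrives while the stream is saturated, and flush the buffer when the 'drain' event fires. A minimal standalone sketch of that pattern against a plain Node Writable (names and structure here are illustrative, not taken from server-grpc.ts):

import { Writable } from 'stream';

function writeWithBackpressure(stream: Writable, values: unknown[]): void {
  const buffered: unknown[] = [];
  let writing = true;

  const flush = () => {
    writing = true;
    while (buffered.length > 0 && writing) {
      // write() returning false means the chunk was accepted,
      // but we should stop writing until 'drain'.
      writing = stream.write(buffered.shift());
    }
  };

  stream.on('drain', flush);

  for (const value of values) {
    if (writing) {
      writing = stream.write(value);
    } else {
      // Stream is saturated: buffer the value and wait for 'drain'.
      buffered.push(value);
    }
  }
}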
7 changes: 4 additions & 3 deletions packages/microservices/test/server/server-grpc.spec.ts
@@ -602,11 +602,12 @@ describe('ServerGrpc', () => {
const call = {
write: sinon.spy(value => {
// Simulating a writable stream becoming overwhelmed.
-         const canWrite = writeCounter++ < highwaterMark;
-         if (canWrite) {
+         if (writeCounter++ < highwaterMark) {
+           // We can write this value to the stream.
            written.push(value);
          }
-         return canWrite;
+         // But as soon as we pass the highwater mark, we can't write anymore.
+         return writeCounter < highwaterMark;
}),
end: sinon.spy(() => {
written.push('end');
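As a quick sanity check of the updated fake write() above, assuming a highwaterMark of 2 (illustrative arithmetic only, not part of the spec):

// writeCounter starts at 0, highwaterMark = 2
// write('a'): 0 < 2, push 'a', writeCounter -> 1, returns 1 < 2 === true
// write('b'): 1 < 2, push 'b', writeCounter -> 2, returns 2 < 2 === false
// write('c'): 2 < 2 is false, nothing pushed, returns 3 < 2 === false
// written === ['a', 'b']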
