@@ -263,6 +263,8 @@ void markPrefaceSent() {
263263 private final Decoder hpackIn ;
264264 final SettingsFrame clientSettings ;
265265 private volatile SettingsFrame serverSettings ;
266+ private record PushContinuationState (HeaderDecoder pushContDecoder , PushPromiseFrame pushContFrame ) {}
267+ private volatile PushContinuationState pushContinuationState ;
266268 private final String key ; // for HttpClientImpl.connections map
267269 private final FramesDecoder framesDecoder ;
268270 private final FramesEncoder framesEncoder = new FramesEncoder ();
@@ -775,8 +777,8 @@ void processFrame(Http2Frame frame) throws IOException {
775777 }
776778
777779 if (!(frame instanceof ResetFrame )) {
778- if (frame instanceof DataFrame ) {
779- dropDataFrame (( DataFrame ) frame );
780+ if (frame instanceof DataFrame df ) {
781+ dropDataFrame (df );
780782 }
781783 if (isServerInitiatedStream (streamid )) {
782784 if (streamid < nextPushStream ) {
@@ -793,26 +795,44 @@ void processFrame(Http2Frame frame) throws IOException {
793795 }
794796 return ;
795797 }
796- if ( frame instanceof PushPromiseFrame ) {
797- PushPromiseFrame pp = ( PushPromiseFrame ) frame ;
798- try {
799- handlePushPromise ( stream , pp );
800- } catch ( UncheckedIOException e ) {
801- protocolError ( ResetFrame . PROTOCOL_ERROR , e . getMessage ());
802- return ;
803- }
804- } else if ( frame instanceof HeaderFrame ) {
805- // decode headers (or continuation)
806- try {
807- decodeHeaders (( HeaderFrame ) frame , stream . rspHeadersConsumer ());
808- } catch ( UncheckedIOException e ) {
809- debug . log ( "Error decoding headers: " + e . getMessage (), e ) ;
810- protocolError (ResetFrame .PROTOCOL_ERROR , e . getMessage () );
798+
799+ // While push frame is not null, the only acceptable frame on this
800+ // stream is a Continuation frame
801+ if ( pushContinuationState != null ) {
802+ if ( frame instanceof ContinuationFrame cf ) {
803+ try {
804+ handlePushContinuation ( stream , cf ) ;
805+ } catch ( UncheckedIOException e ) {
806+ debug . log ( "Error handling Push Promise with Continuation: " + e . getMessage (), e );
807+ protocolError ( ErrorFrame . PROTOCOL_ERROR , e . getMessage ());
808+ return ;
809+ }
810+ } else {
811+ pushContinuationState = null ;
812+ protocolError (ErrorFrame .PROTOCOL_ERROR , "Expected a Continuation frame but received " + frame );
811813 return ;
812814 }
813- stream .incoming (frame );
814815 } else {
815- stream .incoming (frame );
816+ if (frame instanceof PushPromiseFrame pp ) {
817+ try {
818+ handlePushPromise (stream , pp );
819+ } catch (UncheckedIOException e ) {
820+ protocolError (ErrorFrame .PROTOCOL_ERROR , e .getMessage ());
821+ return ;
822+ }
823+ } else if (frame instanceof HeaderFrame hf ) {
824+ // decode headers
825+ try {
826+ decodeHeaders (hf , stream .rspHeadersConsumer ());
827+ } catch (UncheckedIOException e ) {
828+ debug .log ("Error decoding headers: " + e .getMessage (), e );
829+ protocolError (ErrorFrame .PROTOCOL_ERROR , e .getMessage ());
830+ return ;
831+ }
832+ stream .incoming (frame );
833+ } else {
834+ stream .incoming (frame );
835+ }
816836 }
817837 }
818838 }
@@ -843,11 +863,34 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
843863 {
844864 // always decode the headers as they may affect connection-level HPACK
845865 // decoding state
866+ assert pushContinuationState == null ;
846867 HeaderDecoder decoder = new HeaderDecoder ();
847868 decodeHeaders (pp , decoder );
869+ int promisedStreamid = pp .getPromisedStream ();
870+ if (pp .endHeaders ()) {
871+ completePushPromise (promisedStreamid , parent , decoder .headers ());
872+ } else {
873+ pushContinuationState = new PushContinuationState (decoder , pp );
874+ }
875+ }
876+
877+ private <T > void handlePushContinuation (Stream <T > parent , ContinuationFrame cf )
878+ throws IOException {
879+ var pcs = pushContinuationState ;
880+ decodeHeaders (cf , pcs .pushContDecoder );
881+ // if all continuations are sent, set pushWithContinuation to null
882+ if (cf .endHeaders ()) {
883+ completePushPromise (pcs .pushContFrame .getPromisedStream (), parent ,
884+ pcs .pushContDecoder .headers ());
885+ pushContinuationState = null ;
886+ }
887+ }
848888
889+ private <T > void completePushPromise (int promisedStreamid , Stream <T > parent , HttpHeaders headers )
890+ throws IOException {
891+ // Perhaps the following checks could be moved to handlePushPromise()
892+ // to reset the PushPromise stream earlier?
849893 HttpRequestImpl parentReq = parent .request ;
850- int promisedStreamid = pp .getPromisedStream ();
851894 if (promisedStreamid != nextPushStream ) {
852895 resetStream (promisedStreamid , ResetFrame .PROTOCOL_ERROR );
853896 return ;
@@ -858,7 +901,6 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
858901 nextPushStream += 2 ;
859902 }
860903
861- HttpHeaders headers = decoder .headers ();
862904 HttpRequestImpl pushReq = HttpRequestImpl .createPushRequest (parentReq , headers );
863905 Exchange <T > pushExch = new Exchange <>(pushReq , parent .exchange .multi );
864906 Stream .PushedStream <T > pushStream = createPushStream (parent , pushExch );
0 commit comments