/
MultiPartMIMEReader.java
1666 lines (1472 loc) · 75.7 KB
/
MultiPartMIMEReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright (c) 2015 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.multipart;
import com.linkedin.data.ByteString;
import com.linkedin.multipart.exceptions.MultiPartIllegalFormatException;
import com.linkedin.multipart.exceptions.MultiPartReaderFinishedException;
import com.linkedin.multipart.exceptions.SinglePartBindException;
import com.linkedin.multipart.exceptions.SinglePartFinishedException;
import com.linkedin.multipart.exceptions.SinglePartNotInitializedException;
import com.linkedin.multipart.exceptions.StreamBusyException;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.entitystream.EntityStream;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.message.stream.entitystream.WriteHandle;
import com.linkedin.r2.util.LinkedDeque;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.Callable;
/**
* Zero copy, async streaming multipart mime reader based on the official RFC for multipart/mime.
*
* This class uses R2 streaming and a look-ahead buffer to allow clients to walk through the data of all parts using an async,
* callback based approach.
*
* Clients must first create a callback of type {@link com.linkedin.multipart.MultiPartMIMEReaderCallback} to pass to
* {@link MultiPartMIMEReader#registerReaderCallback(com.linkedin.multipart.MultiPartMIMEReaderCallback)}. This is the first
* step to using the MultiPartMIMEReader.
*
* Upon registration, at some time in the future, MultiPartMIMEReader will create {@link com.linkedin.multipart.MultiPartMIMEReader.SinglePartMIMEReader}s
* which will be passed to {@link MultiPartMIMEReaderCallback#onNewPart(com.linkedin.multipart.MultiPartMIMEReader.SinglePartMIMEReader)}.
*
* Clients will then have to create an instance of {@link com.linkedin.multipart.SinglePartMIMEReaderCallback} to bind
* and commit to reading these parts.
*
* Note that NONE of the APIs in this class are thread safe. Furthermore it is to be noted that API calls must be event driven.
* For example, asking for more part data using {@link com.linkedin.multipart.MultiPartMIMEReader.SinglePartMIMEReader#requestPartData()}
* can only be done either upon binding to the {@link com.linkedin.multipart.MultiPartMIMEReader.SinglePartMIMEReader} or
* after receiving data on {@link SinglePartMIMEReaderCallback#onPartDataAvailable(com.linkedin.data.ByteString)}. Therefore
* attempting to queue multiple reads, instead of waiting for callback invocations to drive forward, will result in runtime exceptions.
*
* @author Karim Vidhani
*/
public class MultiPartMIMEReader implements MultiPartMIMEDataSourceIterator
{
//The inner R2 Reader implementation (declared below) that holds the look-ahead buffer and parsing logic.
private final R2MultiPartMIMEReader _reader;
//NOTE(review): presumably the R2 entity stream carrying the multipart payload; its use is not visible in this section - confirm.
private final EntityStream _entityStream;
//The client's top level callback. The logic below notifies it through onFinished() and onDrainComplete().
private volatile MultiPartMIMEReaderCallback _clientCallback;
//All bytes that preceded the first boundary, decoded into a String once that boundary has been located.
private volatile String _preamble;
//The current state of the top level reader. Every incoming event is dispatched based on this value.
private volatile MultiPartReaderState _multiPartReaderState;
//The reader for the part currently being walked through. This is null until the first boundary has been handled.
private volatile SinglePartMIMEReader _currentSinglePartMIMEReader;
class R2MultiPartMIMEReader implements Reader
{
//The handle given to us by R2 in onInit(). We use it to request exactly one more chunk at a time.
private volatile ReadHandle _rh;
//The look-ahead buffer. Every chunk delivered by R2 is appended here and consumed data is sliced off the front.
private volatile ByteString _compoundByteStringBuffer = ByteString.empty();
//The reason for the first boundary vs normal boundary difference is because the first boundary MAY be missing the
//leading CRLF.
//Note that even though it is incorrect for a client to send a multipart/mime payload in this manner,
//the RFC states that readers should be tolerant and be able to handle such cases.
private final String _firstBoundary;
private final String _normalBoundary;
private final String _finishingBoundary;
//Byte forms of the boundaries above; these are what the buffer is actually searched for.
//NOTE(review): presumably assigned once in the constructor, which is not visible here - confirm.
private byte[] _firstBoundaryBytes;
private byte[] _normalBoundaryBytes;
private byte[] _finishingBoundaryBytes;
//While false, part reading searches for the first boundary (no leading CRLF) instead of the normal boundary.
private volatile boolean _firstBoundaryEvaluated = false;
//Set once R2 has signaled, via onDone(), that all data has been sent over. This does NOT mean that our
//top level reader can be notified that they are done since data could still be in the buffer.
private volatile boolean _r2Done = false;
//These two fields are needed to support our iterative invocation of callbacks so that we don't end up with a recursive loop
//which would lead to a stack overflow.
//_callbackQueue holds the pending invocations; _callbackInProgress is true while the loop draining that queue is running.
private final Queue<Callable<Void>> _callbackQueue = new LinkedDeque<Callable<Void>>();
private volatile boolean _callbackInProgress = false;
///////////////////////////////////////////////////////////////////////////////////////////////////
//Reader interface implementation
@Override
public void onInit(ReadHandle rh)
{
  //A top level drain that was requested before any callback was registered leaves the reader in
  //FINISHED by the time R2 initializes us. Nobody is going to consume the stream in that case.
  final boolean alreadyFinished = _multiPartReaderState == MultiPartReaderState.FINISHED;
  if (alreadyFinished)
  {
    rh.cancel();
  }
  else
  {
    //The top level callback is bound, so kick off reading. We read ahead, but strictly one chunk
    //at a time so that we never pull in more than what is needed.
    _rh = rh;
    _rh.request(1);
  }
}
@Override
public void onDataAvailable(ByteString data)
{
  //R2 is answering our outstanding _rh.request(1). Hand the chunk to the core logic, which will
  //append it to the look-ahead buffer and decide what to do next.
  processEventAndInvokeClient(data);
}
@Override
public void onDone()
{
  //R2 has nothing more to send. Note that unconsumed data may still be sitting in our buffer.
  _r2Done = true;
  //An _rh.request(1) is still outstanding and will never be answered with data, so we drive the
  //core logic ourselves, exactly as if an empty chunk had arrived.
  processEventAndInvokeClient(ByteString.empty());
}
@Override
public void onError(Throwable e)
{
  //R2 only reports an error once we have exited onDataAvailable(), so there is nothing concurrent
  //to worry about here. Even when we are serving client requests purely out of memory, we will
  //eventually have to ask R2 for more data, and that is the point at which R2 calls us here.
  //
  //If we have already finished, or already failed on our own (for example a malformed body, or a
  //client callback that threw), then there is nobody left to notify and we ignore the error.
  if (_multiPartReaderState != MultiPartReaderState.FINISHED)
  {
    //Clean up state and let the registered callbacks know.
    handleExceptions(e);
  }
}
///////////////////////////////////////////////////////////////////////////////////////
//Core look-ahead buffering and callback logic begins here:
//Re-runs the core logic without any new data from R2. This is used when the contents of the
//buffer, rather than a freshly arrived chunk, should drive things forward.
private void processEventAndInvokeClient()
{
  final ByteString noNewData = ByteString.empty();
  processEventAndInvokeClient(noNewData);
}
//The single entry point into the look-ahead buffering logic. onDataAvailable() passes the chunk it
//received; every other caller passes an empty ByteString.
private void processEventAndInvokeClient(final ByteString data)
{
  //Threading contract: only one thread may ever be inside this method at a time, and the various
  //states are what enforce that. Client API calls come in here to push the logic forward in a
  //strictly sequential manner. When the buffer can't satisfy a request we ask R2 for more, and R2
  //calls us back sequentially as well. While such a request is outstanding, client API calls are
  //unable to do anything further. This is why nothing in here is synchronized, which is a big
  //performance win.
  //
  //A RuntimeException escaping from here reaches either R2 or the client, depending on who called
  //us. That should never happen and is a bug if it does. Should R2 receive it then:
  //A. On the server, R2 sends back a 500 internal server error.
  //B. On the client, R2 closes the connection.

  //Epilogue and top level drain bytes are simply dropped, so those states are handled before the
  //incoming data is ever added to the buffer.
  if (checkAndProcessEpilogue() || checkAndProcessTopLevelDraining())
  {
    return;
  }

  //Everything from here on operates on the local buffer, so fold the new data in first.
  _compoundByteStringBuffer = new ByteString.Builder().append(_compoundByteStringBuffer).append(data).build();

  //Part reading is the final step. It is skipped only if the preamble handling has already fully
  //dealt with this event.
  if (!checkAndProcessPreamble())
  {
    performPartReading();
  }
}
//Drains the callback queue in a loop. Invoking callbacks iteratively from here, instead of
//directly from wherever they were produced, is what prevents a stack overflow.
private void processAndInvokeCallableQueue()
{
  //Mark that an iterative invocation is now underway. Anything that queues a callback while this
  //is set will simply return and rely on the loop below to pick the new entry up.
  _callbackInProgress = true;

  while (!_callbackQueue.isEmpty())
  {
    final Callable<Void> pendingInvocation = _callbackQueue.poll();
    try
    {
      pendingInvocation.call();
    }
    catch (Throwable thrownByInvocation)
    {
      //Whatever the invocation threw is routed to the common exception handling.
      handleExceptions(thrownByInvocation);
    }
  }

  //The queue is empty, so the next callback to be queued has to start a new loop.
  _callbackInProgress = false;
}
//Handles an event that arrives while the epilogue is being read. Epilogue bytes are dropped on
//the ground and never make it into our buffer.
//Returns true if the event was fully handled here, false if we are not reading the epilogue.
private boolean checkAndProcessEpilogue()
{
  if (_multiPartReaderState != MultiPartReaderState.READING_EPILOGUE)
  {
    return false;
  }

  if (!_r2Done)
  {
    //R2 still has more to send, so keep asking for bytes and dropping them.
    _rh.request(1);
    return true;
  }

  //R2 is done, so we can wrap up. The callback is invoked directly rather than through the
  //iterative technique because a client cannot possibly invoke us again after this.
  _multiPartReaderState = MultiPartReaderState.FINISHED;
  try
  {
    MultiPartMIMEReader.this._clientCallback.onFinished();
  }
  catch (RuntimeException clientCallbackException)
  {
    //The client's own callback threw when we invoked it, so they need to be told about that.
    handleExceptions(clientCallbackException);
  }
  //We are done whether or not onFinished() threw.
  return true;
}
//Handles an event that arrives while a top level drain is underway. All bytes are dropped.
//Returns true if the event was fully handled here, false if no top level drain is in progress.
private boolean checkAndProcessTopLevelDraining()
{
  if (_multiPartReaderState != MultiPartReaderState.DRAINING)
  {
    return false;
  }

  if (!_r2Done)
  {
    //More data is still coming, so keep chugging forward and dropping bytes.
    _rh.request(1);
    return true;
  }

  //R2 is done, so the drain is complete. Whatever remains in the buffer is of no interest. The
  //callback is invoked directly rather than through the iterative technique because a client
  //cannot possibly invoke us again after this.
  _multiPartReaderState = MultiPartReaderState.FINISHED;
  try
  {
    MultiPartMIMEReader.this._clientCallback.onDrainComplete();
  }
  catch (RuntimeException clientCallbackException)
  {
    //The client's own callback threw when we invoked it, so they need to be told about that.
    handleExceptions(clientCallbackException);
  }
  //We are done whether or not onDrainComplete() threw.
  return true;
}
//Consumes the preamble, meaning everything that precedes the first boundary, while the reader is in
//CALLBACK_BOUND_AND_READING_PREAMBLE.
//
//Returns true if this event has been fully handled here and the caller must stop. That is the case
//when the envelope turned out to be empty, when more data had to be requested from R2, or when an
//error was reported. Returns false if the caller should move on to part reading, either because we
//were not reading the preamble at all or because the preamble has just been consumed.
private boolean checkAndProcessPreamble()
{
  if (_multiPartReaderState != MultiPartReaderState.CALLBACK_BOUND_AND_READING_PREAMBLE)
  {
    return false;
  }

  final int firstBoundaryLookup = _compoundByteStringBuffer.indexOfBytes(_firstBoundaryBytes);
  final int lastBoundaryLookup = _compoundByteStringBuffer.indexOfBytes(_finishingBoundaryBytes);

  //Before reading the preamble, check to see if this is an empty multipart mime envelope, i.e:
  //Content-Type: multipart/<subType>; boundary="someBoundary"
  //
  //--someBoundary--
  //
  //This is the case when the first boundary and the finishing boundary are found at the same place.
  //The first boundary doesn't include the leading CRLF while the finishing boundary does, so the
  //length of the CRLF is subtracted before comparing.
  //
  //The finishing boundary must actually have been found for this to hold. Without that check, a
  //first boundary located one byte into the buffer together with a finishing boundary that has not
  //arrived yet (a lookup of -1) satisfies the arithmetic whenever the CRLF is two bytes long, and a
  //payload that does have parts would be reported as finished.
  final boolean emptyEnvelope = lastBoundaryLookup > -1
      && firstBoundaryLookup - MultiPartMIMEUtils.CRLF_BYTES.length == lastBoundaryLookup;
  if (emptyEnvelope)
  {
    //There were no parts, so let the client know that reading is complete. There is no need to
    //use our iterative technique to call this callback because a client cannot possibly invoke
    //us again.
    _multiPartReaderState = MultiPartReaderState.FINISHED;
    try
    {
      //This can throw so we need to notify the client that their APIs threw an exception when we invoked them.
      MultiPartMIMEReader.this._clientCallback.onFinished();
    }
    catch (RuntimeException clientCallbackException)
    {
      handleExceptions(clientCallbackException);
    }
    return true; //Regardless of whether the invocation to onFinished() threw or not we need to return here.
  }

  if (firstBoundaryLookup == -1)
  {
    //The boundary has not been found in the buffer yet.
    if (_r2Done)
    {
      //R2 has given us the entire stream and there is still no boundary, so the payload is malformed.
      handleExceptions(new MultiPartIllegalFormatException("Malformed multipart mime request. No boundary found!"));
      return true; //We are in an unusable state so we return here.
    }
    //Keep looking in the next chunk.
    _rh.request(1);
    return true;
  }

  //The boundary has been found. Everything up until this point is the preamble.
  //NOTE(review): the preamble is decoded using the platform default charset, so the resulting String
  //depends on how the JVM is configured - confirm that this is intended.
  final ByteString preambleSlice = _compoundByteStringBuffer.slice(0, firstBoundaryLookup);
  _preamble = preambleSlice.asString(Charset.defaultCharset());

  //Keep only the bytes from the boundary onwards, leaving the preamble bytes to be GC'd.
  _compoundByteStringBuffer = _compoundByteStringBuffer.slice(firstBoundaryLookup, _compoundByteStringBuffer.length() - firstBoundaryLookup);

  //We can now transition to normal reading.
  _multiPartReaderState = MultiPartReaderState.READING_PARTS;
  return false;
}
//Verifies that the buffer holds at least as many bytes as the finishing boundary, which is the
//minimum we need before part reading can go forward.
//Returns true if the buffer was too small, in which case either more data has been requested from
//R2 or an error has been reported, and the caller must stop. Returns false if there is enough data.
private boolean checkForSufficientBufferSize()
{
  final boolean enoughBuffered = _compoundByteStringBuffer.length() >= _finishingBoundaryBytes.length;
  if (enoughBuffered)
  {
    return false;
  }

  if (_r2Done)
  {
    //R2 has no more data for us and we never saw the finishing boundary. This should never
    //happen with a well formed payload, so notify the reader of the issue. We are in an unusable
    //state after this.
    handleExceptions(new MultiPartIllegalFormatException("Malformed multipart mime request. Finishing boundary missing!"));
  }
  else
  {
    //Buffer forward a bit by reading in some more data.
    _rh.request(1);
  }
  return true;
}
//Normal part reading. This is where most of the time is spent.
//
//What happens depends on whether the buffer begins with a boundary:
//1. If it does not, we consume as much of the buffer as we can on behalf of the current part. The
//boundary may still be further along in the buffer, in which case we consume right up until it.
//Those bytes are then the last bits of data for the current part, and the next request will find
//the buffer starting with the boundary.
//2. If it does, we wrap up the previous part and begin the new one.
//
//Barring error conditions, one invocation results in exactly ONE of the following:
//1. onPartDataAvailable()
//2. OnDrainComplete() on SinglePartCallback followed by onNewPart() on MultiPartCallback
//3. OnFinished() on SinglePartCallback followed by onNewPart() on MultiPartCallback
//4. OnDrainComplete() on SinglePartCallback followed by onFinished() on MultiPartCallback
//5. OnFinished() on SinglePartCallback followed by onFinished() on MultiPartCallback
//
//onPartDataAvailable() and onNewPart() are deliberately never called one after another. Clients
//may come back to us immediately from either callback, which can lead to very confusing states.
//Answering each client request with a single callback is also more intuitive, and it is what
//allows the iterative callback invocation technique to return from a location in the code that
//differs from where the invocation originated.
private void performPartReading()
{
  if (checkForSufficientBufferSize())
  {
    return;
  }

  //By this point in the logic we are guaranteed to be in this state.
  assert (_multiPartReaderState == MultiPartReaderState.READING_PARTS);

  //Immediately after the preamble, i.e for the first part we are seeing, we have to look for the
  //first boundary. For every part after that we look for the normal boundary.
  final byte[] activeBoundaryBytes = _firstBoundaryEvaluated ? _normalBoundaryBytes : _firstBoundaryBytes;
  final int boundaryIndex = _compoundByteStringBuffer.indexOfBytes(activeBoundaryBytes);

  //Both paths run to completion, so there is nothing to check once they come back.
  if (boundaryIndex == 0)
  {
    processBufferStartingWithBoundary(activeBoundaryBytes.length);
  }
  else
  {
    processBufferStartingWithoutBoundary(boundaryIndex);
  }
}
//Handles a buffer whose first bytes are part data rather than a boundary. The boundaryIndex is
//where the boundary was found in the buffer, or -1 if it is not in the buffer at all.
//
//The current part's reader is expected to be ready at this point, meaning that it is either:
//1. waiting for data on its onPartDataAvailable() callback (REQUESTED_DATA), or
//2. waiting for a requested drain of this part to finish (REQUESTED_DRAIN).
//In the current implementation this is ALWAYS the case, because only our clients are allowed to
//push us forward and so they must be in a ready state whenever this logic runs.
//
//This is also why we never do _rh.request(n) with n > 1:
//A. The first onDataAvailable() would potentially satisfy the client, and the second would merely
//fill the local buffer. We would then have to distinguish between being driven forward by the
//client and being driven forward by our own desire for data, which is more complicated and also
//reads data that we don't need yet.
//B. Once the first onDataAvailable() has satisfied the client, the client could refresh the logic
//at the very moment the answer to the second request comes in. Multiple threads would then be
//running in an area where there is no synchronization.
private void processBufferStartingWithoutBoundary(final int boundaryIndex)
{
  //A part reader is guaranteed to exist at this point.
  assert(_currentSinglePartMIMEReader != null);

  final SingleReaderState partReaderState = _currentSinglePartMIMEReader._singleReaderState;

  //Assert on the readiness invariant described above.
  assert (partReaderState == SingleReaderState.REQUESTED_DATA || partReaderState == SingleReaderState.REQUESTED_DRAIN);

  //How much we can consume depends on whether the boundary is somewhere in the buffer.
  final boolean boundaryInBuffer = boundaryIndex != -1;
  if (boundaryInBuffer)
  {
    //We can read right up until the point at which the boundary begins.
    processBufferContainingBoundary(boundaryIndex, partReaderState);
  }
  else
  {
    //What we can read out is limited, since we don't want to consume possible future boundary data.
    processBufferNotContainingBoundary(partReaderState);
  }
}
//Consumes data from a buffer that does not contain the boundary. The data is either handed to the
//client (REQUESTED_DATA) or dropped as part of draining the current part.
private void processBufferNotContainingBoundary(final SingleReaderState singleReaderState)
{
  //The tail end of the buffer could be the partial beginning of the next boundary, so we hold back
  //one byte less than a full normal boundary.
  final int heldBack = _normalBoundaryBytes.length - 1;
  final ByteString consumed = decomposeClientDataAndUpdateBuffer(_compoundByteStringBuffer.length() - heldBack);

  final Callable<Void> nextInvocation;
  if (singleReaderState == SingleReaderState.REQUESTED_DATA)
  {
    //The state must be flipped before the data is provided. Otherwise a client that immediately
    //decides to requestPartData() would see an exception because we would still be in REQUESTED_DATA.
    _currentSinglePartMIMEReader._singleReaderState = SingleReaderState.CALLBACK_BOUND_AND_READY;

    //Effectively _currentSinglePartMIMEReader._callback.onPartDataAvailable(consumed), after which
    //the client single part reader can drive forward themselves.
    nextInvocation = new OnPartDataCallable(_currentSinglePartMIMEReader._callback, consumed);
  }
  else
  {
    //This is a drain, so the bytes are dropped. There is no client to drive us forward so we have
    //to do it ourselves.
    //NOTE(review): RecursiveCallable is declared outside of this section; presumably it re-enters
    //the processing logic on this reader - confirm.
    nextInvocation = new RecursiveCallable(this);
  }

  _callbackQueue.add(nextInvocation);

  //If _callbackInProgress is set then a client callback invocation is already in our call stack,
  //and the while loop executing it (see processAndInvokeCallableQueue) will also get to the entry
  //we just added. Only when no such loop is running do we start one ourselves.
  if (!_callbackInProgress)
  {
    processAndInvokeCallableQueue();
  }
}
//Consumes data from a buffer that contains a boundary somewhere after its beginning. That boundary
//could be a normal boundary or the finishing boundary. Either way the data is either handed to the
//client (REQUESTED_DATA) or dropped as part of draining the current part.
//
//Once all the data in front of the boundary is gone, a later pass through the logic will see the
//buffer beginning with the boundary. That is what finishes up this part and makes a new one.
private void processBufferContainingBoundary(final int boundaryIndex, final SingleReaderState singleReaderState)
{
  //We can only take data from right up until the boundary's location.
  final ByteString consumed = decomposeClientDataAndUpdateBuffer(boundaryIndex);

  final Callable<Void> nextInvocation;
  if (singleReaderState == SingleReaderState.REQUESTED_DATA)
  {
    //The state must be flipped before the data is provided.
    _currentSinglePartMIMEReader._singleReaderState = SingleReaderState.CALLBACK_BOUND_AND_READY;

    //Effectively _currentSinglePartMIMEReader._callback.onPartDataAvailable(consumed).
    nextInvocation = new OnPartDataCallable(_currentSinglePartMIMEReader._callback, consumed);
  }
  else
  {
    //This is a drain, so the bytes are dropped and we push forward on our own.
    //NOTE(review): RecursiveCallable is declared outside of this section; presumably it re-enters
    //the processing logic on this reader - confirm.
    nextInvocation = new RecursiveCallable(this);
  }

  _callbackQueue.add(nextInvocation);

  //If _callbackInProgress is set then a client callback invocation is already in our call stack,
  //and the while loop executing it (see processAndInvokeCallableQueue) will also get to the entry
  //we just added. Only when no such loop is running do we start one ourselves.
  if (!_callbackInProgress)
  {
    processAndInvokeCallableQueue();
  }
}
//Carves the next piece of client data off the front of _compoundByteStringBuffer and then drops
//the buffer's references to whatever was carved off.
//
//The piece returned holds at most maxAmountAvailableForClient bytes and never spans more than the
//first non-empty ByteString that the buffer decomposes into. This is deliberate: clients will
//invariably call APIs such as asByteBuffer() on what we give them, and on a compound ByteString
//that would incur a copy in memory due to re-assembly.
private ByteString decomposeClientDataAndUpdateBuffer(final int maxAmountAvailableForClient)
{
  //Locate the first non-empty ByteString within the buffer.
  ByteString leadingChunk = null;
  for (final ByteString candidate : _compoundByteStringBuffer.decompose())
  {
    if (candidate.length() > 0)
    {
      leadingChunk = candidate;
      break;
    }
  }

  //We verified earlier that the buffer is of sufficient size, so there has to be one.
  assert(leadingChunk != null);

  //If the leading chunk fits within the limit (being exactly at the limit counts as fitting) then
  //the client gets all of it. Otherwise the client only gets a slice of it, from 0 up to the limit.
  final ByteString clientData = leadingChunk.length() <= maxAmountAvailableForClient
      ? leadingChunk
      : leadingChunk.slice(0, maxAmountAvailableForClient);

  //Trim what the client received off the front of our buffer.
  final int handedOver = clientData.length();
  _compoundByteStringBuffer = _compoundByteStringBuffer.slice(handedOver, _compoundByteStringBuffer.length() - handedOver);

  return clientData;
}
//Handles a buffer that begins with a boundary. The current part, if there is one, is finished off
//first. After that we either wrap things up because this was the finishing boundary, or we begin
//the next part.
private void processBufferStartingWithBoundary(final int boundarySize)
{
  //If finishing off the current part results in an exception then we can't continue.
  if (finishCurrentPart())
  {
    return;
  }

  //An ordinary boundary in front of us means that another part follows.
  if (!_compoundByteStringBuffer.startsWith(_finishingBoundaryBytes))
  {
    processNewPart(boundarySize);
    return;
  }

  //Otherwise this is the final boundary, so all that is left is the epilogue.
  _multiPartReaderState = MultiPartReaderState.READING_EPILOGUE;

  if (!_r2Done)
  {
    //Keep on reading bytes and dropping them.
    _rh.request(1);
    return;
  }

  //R2 is done so we can wrap up. Bytes may still be sitting in our buffer, but those must be
  //the epilogue bytes so they can safely be ignored.
  _multiPartReaderState = MultiPartReaderState.FINISHED;
  try
  {
    //The callback is invoked directly rather than through the iterative technique because a
    //client cannot possibly invoke us again after this.
    MultiPartMIMEReader.this._clientCallback.onFinished();
  }
  catch (RuntimeException clientCallbackException)
  {
    //The client's own callback threw when we invoked it, so they need to be told about that.
    handleExceptions(clientCallbackException);
  }
  //We are done whether or not onFinished() threw.
}
//Closes out the single part reader that is currently active, if there is one, and notifies its callback.
//Returns true if the client's callback threw (the exception has already been routed to handleExceptions()
//and the reader is unusable), false otherwise.
private boolean finishCurrentPart()
{
  final SinglePartMIMEReader finishingReader = _currentSinglePartMIMEReader;

  //No active part: this is the very first boundary, so nothing needs closing.
  if (finishingReader == null)
  {
    return false;
  }

  final boolean drainRequested = finishingReader._singleReaderState == SingleReaderState.REQUESTED_DRAIN;

  //The part was being drained and nobody registered a callback, so there is no one to notify.
  //The reader state is left as it was and we just let go of the part.
  if (drainRequested && finishingReader._callback == null)
  {
    _currentSinglePartMIMEReader = null;
    return false;
  }

  //Mark the part finished BEFORE notifying so the client cannot request more data from it.
  finishingReader._singleReaderState = SingleReaderState.FINISHED;

  //The notification is a direct call rather than a queued one, for two reasons:
  //1. The client can't request more data, so the iterative invocation technique is not needed.
  //2. The iterative technique would not return control to this point, and we must continue from here
  //   in order to move on to the next part.
  try
  {
    if (drainRequested)
    {
      //The client asked for the part to be drained and wanted to hear about completion.
      finishingReader._callback.onDrainComplete();
    }
    else
    {
      //The client was consuming this part's data; tell it the part is complete.
      finishingReader._callback.onFinished();
    }
  }
  catch (RuntimeException clientFailure)
  {
    //The client's callback threw. Report it and signal the caller that we are in an unusable state.
    handleExceptions(clientFailure);
    return true;
  }

  _currentSinglePartMIMEReader = null;
  return false;
}
//Parses the header section of the part that follows the boundary at the front of the buffer, trims the
//boundary and headers off the buffer, creates the SinglePartMIMEReader for the part and queues the
//onNewPart notification for the client.
//boundarySize is the length in bytes of the boundary at the front of the buffer.
private void processNewPart(final int boundarySize)
{
  //Headers are optional. According to the RFC:
  // - with no headers, the boundary is followed directly by two consecutive CRLFs;
  // - with at least one header, the headers are followed by two consecutive CRLFs.
  //Either way we are hunting for the first occurrence of two consecutive CRLFs after the boundary.
  final int headerTerminatorLength = MultiPartMIMEUtils.CONSECUTIVE_CRLFS_BYTES.length;
  final int bufferLength = _compoundByteStringBuffer.length();

  //The smallest buffer we can work with holds the boundary plus the consecutive CRLFs (i.e. empty headers).
  if (boundarySize + headerTerminatorLength > bufferLength)
  {
    if (_r2Done)
    {
      //Everything that remains of the stream is already in memory, yet a non-finishing boundary is not
      //followed by enough bytes for the consecutive CRLFs. The stream was terminated prematurely and
      //we are in an unusable state.
      handleExceptions(new MultiPartIllegalFormatException("Malformed multipart mime request. Premature"
          + " termination of multipart mime body due to a boundary without a subsequent consecutive CRLF."));
    }
    else
    {
      //Not enough data yet to look ahead; ask for more.
      _rh.request(1);
    }
    return;
  }

  //Window over everything after the boundary, up to the end of the buffer. The headers, if any, live here.
  final ByteString possibleHeaderArea = _compoundByteStringBuffer.slice(boundarySize, bufferLength - boundarySize);

  //Offset (relative to the window) of the first pair of consecutive CRLFs.
  final int headerEnding = possibleHeaderArea.indexOfBytes(MultiPartMIMEUtils.CONSECUTIVE_CRLFS_BYTES);
  if (headerEnding == -1)
  {
    if (_r2Done)
    {
      //No more data will arrive and the header area never terminates with two consecutive CRLFs.
      //This is a malformed stream and we are in an unusable state.
      handleExceptions(new MultiPartIllegalFormatException("Malformed multipart mime request. Premature " +
          "termination of headers within a part."));
    }
    else
    {
      //The consecutive CRLFs may simply not have arrived yet; ask for more data.
      _rh.request(1);
    }
    return;
  }

  //Note that a malformed stream might not end its headers with consecutive CRLFs. In that case everything
  //up to the first occurrence of consecutive CRLFs is treated as the header area.
  //The trailing consecutive CRLFs are deliberately kept in the slice: parseHeaders() uses them to
  //recognize a header area that is empty (composed solely of the consecutive CRLFs).
  final int headerAreaLength = headerEnding + headerTerminatorLength;
  final ByteString headerBytesSlice = possibleHeaderArea.slice(0, headerAreaLength);

  final Map<String, String> headers = parseHeaders(headerBytesSlice);
  if (headers == null)
  {
    //Header parsing failed and has already been reported. We are in an unusable state.
    return;
  }

  //The part's payload begins right after the boundary, the headers and the trailing consecutive CRLFs.
  //Everything before that point is no longer needed, so drop our references to it.
  final int consumedDataIndex = boundarySize + headerAreaLength;
  _compoundByteStringBuffer = _compoundByteStringBuffer.slice(consumedDataIndex, bufferLength - consumedDataIndex);

  //Create the reader for the new part and queue the client notification instead of invoking it directly.
  _currentSinglePartMIMEReader = new SinglePartMIMEReader(headers);
  final Callable<Void> onNewPartInvocation =
      new OnNewPartCallable(_clientCallback, _currentSinglePartMIMEReader);
  _callbackQueue.add(onNewPartInvocation);

  //From now on we absorb the normal boundary.
  _firstBoundaryEvaluated = true;

  //If _callbackInProgress is set, a client callback invocation is already on our call stack. The while
  //loop driving it (see processAndInvokeCallableQueue) will also pick up the callable we just queued, so
  //there is nothing more to do. Otherwise we have to start draining the queue ourselves.
  //No special handling is needed if that invocation ends in an exception.
  if (!_callbackInProgress)
  {
    processAndInvokeCallableQueue();
  }
}
//Parses the header area of a part into a map of header name to header value.
//headerBytes is the region immediately following a boundary, up to and including the two consecutive
//CRLFs that terminate the headers.
//Returns an empty map when there are no headers, a case-insensitively keyed map otherwise, and null
//when the headers are malformed (in which case handleExceptions() has already been invoked).
private Map<String, String> parseHeaders(final ByteString headerBytes)
{
  //Nothing but two CRLFs after the boundary means this part carries no headers at all.
  if (headerBytes.equals(MultiPartMIMEUtils.BYTE_STRING_CONSECUTIVE_CRLFS_BYTES))
  {
    return Collections.emptyMap();
  }

  final int crlfLength = MultiPartMIMEUtils.CRLF_BYTES.length;

  //The bytes right after the boundary must be a CRLF, with the first header following it.
  //Verify that here; the scan below then starts past it.
  if (!headerBytes.slice(0, crlfLength).equals(MultiPartMIMEUtils.BYTE_STRING_CRLF_BYTES))
  {
    handleExceptions(new MultiPartIllegalFormatException("Malformed multipart mime request. Headers are improperly constructed."));
    return null; //Unusable state.
  }

  final Map<String, String> parsedHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);

  //Accumulates the lines of a folded (multi line) header until its last line is reached.
  final StringBuilder foldedLines = new StringBuilder();

  //We split the byte region on CRLFs by sliding a CRLF-sized window across it, much like splitting a
  //String on a delimiter. Both the scan position and the start of the current line begin just past the
  //leading CRLF. The region ends with two consecutive CRLFs; stopping short of the end by one CRLF
  //length means the very last CRLF is never evaluated (it would otherwise yield a bogus empty header).
  final int scanLimit = headerBytes.length() - crlfLength;
  int lineStart = crlfLength;
  for (int position = crlfLength; position < scanLimit; position++)
  {
    if (!headerBytes.slice(position, crlfLength).equals(MultiPartMIMEUtils.BYTE_STRING_CRLF_BYTES))
    {
      continue; //Not at a line ending; keep sliding.
    }

    //A CRLF sits at position, so lineStart..position is one physical line of the header area.
    //NOTE(review): decoding relies on the platform default charset, so the resulting strings can differ
    //between JVMs for non-ASCII bytes - confirm this is intended.
    final String line = headerBytes.slice(lineStart, position - lineStart).asString(Charset.defaultCharset());

    //Whether this line completes a header depends on header folding (RFC 822): a header continues on
    //the next line if the CRLF is followed by linear white space. Only a space or a tab is recognized
    //as the continuation marker here. The folding syntax is deprecated, so this aims at
    //interoperability with major libraries rather than full RFC coverage.
    final byte byteAfterLineEnding = headerBytes.getByte(position + crlfLength);
    final boolean headerContinues = byteAfterLineEnding == MultiPartMIMEUtils.SPACE_BYTE
        || byteAfterLineEnding == MultiPartMIMEUtils.TAB_BYTE;

    if (headerContinues)
    {
      //Keep the line together with its CRLF so the original header is preserved. The white space that
      //starts the next line is preserved as well since nothing is trimmed at this stage.
      foldedLines.append(line).append(MultiPartMIMEUtils.CRLF_STRING);
    }
    else
    {
      //Either a header that fits on a single line, or the last line of a folded header.
      final String header;
      if (foldedLines.length() == 0)
      {
        header = line;
      }
      else
      {
        header = foldedLines.append(line).toString();
        foldedLines.setLength(0); //Reset for any further folded headers in this part.
      }

      //Header values may contain colons but header names may not, so the first colon separates the
      //name from the value.
      final int colonIndex = header.indexOf(':');
      if (colonIndex == -1)
      {
        handleExceptions(new MultiPartIllegalFormatException(
            "Malformed multipart mime request. Individual headers are improperly formatted."));
        return null; //Unusable state.
      }

      final String headerName = header.substring(0, colonIndex).trim();
      final String headerValue = header.substring(colonIndex + 1).trim();
      parsedHeaders.put(headerName, headerValue);
    }

    //The next physical line begins right after the CRLF we just handled.
    lineStart = position + crlfLength;
  }

  return parsedHeaders;
}
void handleExceptions(final Throwable throwable)
{
//All exceptions caught here should put the reader in a non-usable state. Continuing from this point forward
//is not feasible.
//We also will cancel here and have R2 read and drop all bytes on the floor. Otherwise we are obliged to read
//and drop all bytes on the floor. It does not make any sense to enter this obligation when we are in
//a non-usable state.
//Exceptions here are indicative that there was malformed data provided to the MultiPartMIMEReader
//OR that the client APIs threw exceptions themselves when their callbacks were invoked.
//We will also invoke the appropriate callbacks here indicating there is an exception while reading.
//It is the responsibility of the consumer of this library to catch these exceptions and return 4xx.
_rh.cancel();
//Call the single part callback first.
if (_currentSinglePartMIMEReader != null)
{
_currentSinglePartMIMEReader._singleReaderState = SingleReaderState.FINISHED;