-
Notifications
You must be signed in to change notification settings - Fork 2
/
Internal.hs
881 lines (787 loc) · 28.5 KB
/
Internal.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}
module Codec.Compression.LZMA.Internal
(
-- * Compression
CompressParams(..)
, defaultCompressParams
, C.Preset
, C.defaultPreset
, C.extremePreset
, C.customPreset
, C.Check(..)
-- ** Lazy 'L.ByteString's
, compress
-- ** Incremental processing
, CompressStream
, compressST
, compressIO
, compressStream
-- * Decompression
, DecompressParams(..)
, defaultDecompressParams
-- ** Lazy 'L.ByteString's
, decompress
-- ** Incremental processing
-- *** Sequential decompression
, DecompressStream
, decompressST
, decompressIO
, decompressStream
-- *** Decompression with random access support
, SeekableDecompressStream
, seekableDecompressIO
, seekableDecompressStream
-- *** Decoding indicies
, decodeIndex
, DecodeStream
, runDecodeStream
, decodeIndexIO
, decodeIndexStream
-- * Types for incremental processing
, ReadRequest(..)
, Position
, Compression(..)
, Size
-- * Exceptions
, SomeLZMAException(..)
, CompressException(..)
, DecompressException(..)
, DecodeException(..)
-- * Utils
, hasMagicBytes
) where
import Control.Applicative
import Control.Exception (IOException, assert)
import Control.Monad
import Data.Maybe (fromMaybe)
import Data.Monoid
import Data.Typeable (Typeable)
import Data.Word
import Foreign hiding (void)
import System.IO
import Prelude
import Control.Monad.Catch
import Control.Monad.Trans
import Foreign.Var
import Pipes hiding (next, void)
import Pipes.Core
import Pipes.Safe ()
import qualified Control.Monad.ST as S
import qualified Control.Monad.ST.Lazy as L
import qualified Data.ByteString as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Lazy.Internal as L
import qualified Pipes.Internal as P
import Codec.Compression.LZMA.Internal.IndexDecoder (IndexDecoder)
import Codec.Compression.LZMA.Internal.Stream (Stream)
import Codec.Compression.LZMA.Internal.Types
import qualified Codec.Compression.LZMA.Internal.C as C
import qualified Codec.Compression.LZMA.Internal.IndexDecoder as ID
import qualified Codec.Compression.LZMA.Internal.Stream as Stream
#if MIN_VERSION_bytestring(0, 10, 2)
import qualified Data.ByteString.Builder as B
#else
import qualified Data.ByteString.Lazy.Builder as B
#endif
#if DEBUG
import Debug.Trace
#endif
-- | The full set of parameters for compression. The defaults are
-- 'defaultCompressParams'.
data CompressParams = CompressParams
  { compressPreset :: !C.Preset
    -- ^ Compression preset handed to 'Stream.easyEncoder'.
  , compressIntegrityCheck :: !C.Check
    -- ^ Integrity check type handed to 'Stream.easyEncoder'.
  , compressBufferSize :: !Int
    -- ^ Size of the output buffers allocated during compression.
  , compressMemoryLimit :: !Word64
    -- ^ Memory usage limit; 'maxBound' means no limit.
    -- NOTE(review): not consulted by 'compressStream' in this module —
    -- confirm whether the encoder is expected to honour it.
  }
-- | The default set of parameters for compression. This is typically used with
-- the 'compressWith' function with specific parameters overridden.
defaultCompressParams :: CompressParams
defaultCompressParams = CompressParams
  { compressPreset = C.defaultPreset
  , compressIntegrityCheck = C.CheckCrc64
  , compressBufferSize = defaultCompressBufferSize
  , compressMemoryLimit = maxBound -- No limit
  }
-- | The full set of parameters for decompression. The defaults are
-- 'defaultDecompressParams'.
data DecompressParams = DecompressParams
  { decompressBufferSize :: !Int
    -- ^ The size of the first output buffer, containing the uncompressed data.
    -- If you know an exact or approximate upper bound on the size of the
    -- decompressed data then setting this parameter can save memory. The default
    -- decompression output buffer size is 32k. If your estimate is wrong it does
    -- not matter too much, the default buffer size will be used for the remaining
    -- chunks.
    --
    -- One particular use case for setting the 'decompressBufferSize' is if you
    -- know the exact size of the decompressed data and want to produce a strict
    -- 'S.ByteString'. The compression and decompression function use lazy
    -- 'L.ByteString's but if you set the 'decompressBufferSize' correctly then
    -- you can generate a lazy 'L.ByteString' with exactly one chunk, which can
    -- be converted to a strict 'S.ByteString' in @O(1)@ time using
    -- @'S.concat' . 'L.toChunks'@.
  , decompressMemoryLimit :: !Word64
    -- ^ Memory usage limit passed to 'Stream.autoDecoder';
    -- 'maxBound' means no limit.
  }
-- | The default set of parameters for decompression. This is typically used
-- with the 'decompressWith' function with specific parameters overridden.
defaultDecompressParams :: DecompressParams
defaultDecompressParams = DecompressParams
  { decompressBufferSize = defaultDecompressBufferSize
  , decompressMemoryLimit = maxBound -- No limit
  }
-- | Default compression output-buffer size: 16k minus the lazy
-- 'L.ByteString' chunk overhead, so the allocated chunk stays within a
-- 16k allocation.
defaultCompressBufferSize :: Int
defaultCompressBufferSize = 16 * 1024 - L.chunkOverhead
-- | Default decompression output-buffer size: 32k minus the lazy
-- 'L.ByteString' chunk overhead.
defaultDecompressBufferSize :: Int
defaultDecompressBufferSize = 32 * 1024 - L.chunkOverhead
-----------------------------------------------------------
-- Compression

-- | The unfolding of the compression process, where you provide a sequence of
-- uncompressed data chunks as input and receive a sequence of compressed data
-- chunks as output. The process is incremental, in that the demand for input
-- and provision of output are interleaved.
type CompressStream = Pipe S.ByteString S.ByteString
-- | The possible error cases when compressing a stream.
data CompressException = CompressError
  Stream.ErrorCode
  -- ^ Error code from liblzma
  String
  -- ^ Description of the error
  deriving (Eq, Show, Typeable)
-- | Run an action producing a 'C.Ret' and turn an error result into a
-- thrown 'CompressException' (tagged with @reason@); any non-error
-- result is discarded.
handleCompRet
  :: (MonadTrans t, Monad (t m), MonadThrow m)
  => String -- ^ Description of an error if exists
  -> t m C.Ret
  -> t m ()
handleCompRet reason action =
  action >>= \result ->
    case result of
      C.Error code -> lift $ throwCompressError code reason
      _ -> return ()
-- | Throw a 'CompressError' carrying the given liblzma error code and a
-- human-readable description.
throwCompressError
  :: MonadThrow m
  => C.ErrorCode
  -> String -- ^ Description of the error
  -> m a
throwCompressError code reason = throwM (CompressError code reason)
-- | 'CompressException' participates in the 'SomeLZMAException' hierarchy
-- via the helpers from "Codec.Compression.LZMA.Internal.Stream".
instance Exception CompressException where
  toException = Stream.lzmaExceptionToException
  fromException = Stream.lzmaExceptionFromException
-- | Compress a lazy 'L.ByteString' with the given 'CompressParams'.
compress :: CompressParams -> L.ByteString -> L.ByteString
compress params = streamToLBS (compressStream params)
-- | Incremental compression in the lazy 'L.ST' monad.
compressST :: CompressParams -> CompressStream (L.ST s) ()
compressST params = streamToST (compressStream params)
-- | Incremental compression in 'IO'.
compressIO :: CompressParams -> CompressStream IO ()
compressIO params = streamToIO (compressStream params)
-- | Incremental compression pipe running in the 'Stream' monad.
--
-- Initializes an easy encoder with the configured preset and integrity
-- check, then alternates between refilling the input/output buffers and
-- calling 'Stream.code' until the encoder reports 'Stream.StreamEnd'.
compressStream :: CompressParams -> CompressStream Stream ()
compressStream params = do
  handleCompRet "Failed to initialize a stream encoder" $
    lift $ Stream.easyEncoder
      (compressPreset params)
      (compressIntegrityCheck params)
  loop
  where
    -- Refill both buffers, then run the encoder over them.
    loop = fillBuffers (compressBufferSize params) () >>= drainBuffers

    drainBuffers isLastChunk = do
      lift $ assertBuffers isLastChunk
      -- Finish only once upstream signalled end-of-input.
      res <- lift $ Stream.code $ if isLastChunk
        then Stream.Finish
        else Stream.Run
#if DEBUG
      liftIO $ traceIO $ "code -> " ++ show res
#endif
      case res of
        Stream.Ok -> do
          outputBufferFull <- lift Stream.isOutputBufferFull
          when outputBufferFull $ do
            -- write out if the output buffer became full
            (outFPtr, outOffset, outLen) <- lift Stream.popOutputBuffer
            yield $ S.PS outFPtr outOffset outLen
          if isLastChunk
            then do
              -- No more input: keep draining with Finish until StreamEnd.
              fillOutputBuffer $ compressBufferSize params
              drainBuffers isLastChunk
            else loop
        Stream.StreamEnd -> do
          inputBufferEmpty <- lift Stream.isInputBufferEmpty
          assert inputBufferEmpty $ return ()
          void $ finalizeStream 0
        Stream.Error code -> do
          void $ finalizeStream 0
          lift $ throwCompressError code "The stream encoder failed."
-----------------------------------------------------------
-- Decompression

-- | The unfolding of the decompression process, where you provide a sequence
-- of compressed data chunks as input and receive a sequence of uncompressed
-- data chunks as output. The process is incremental, in that the demand for
-- input and provision of output are interleaved.
type DecompressStream = Pipe S.ByteString S.ByteString
-- | Run an action producing a 'C.Ret' and turn an error result into a
-- thrown 'DecompressException' (tagged with @reason@); any non-error
-- result is discarded.
handleDecompRet
  :: (MonadTrans t, Monad (t m), MonadThrow m)
  => String -- ^ Description of an error if exists
  -> t m C.Ret
  -> t m ()
handleDecompRet reason action =
  action >>= \result ->
    case result of
      C.Error code -> lift $ throwDecompressError code reason
      _ -> return ()
-- | Throw a 'DecompressError' carrying the given liblzma error code and
-- a human-readable description.
throwDecompressError
  :: MonadThrow m
  => C.ErrorCode
  -> String -- ^ Description of the error
  -> m a
throwDecompressError code reason = throwM (DecompressError code reason)
-- | The possible error cases when decompressing a stream.
data DecompressException = DecompressError
  Stream.ErrorCode
  -- ^ Error code from liblzma
  String
  -- ^ Description of the error
  deriving (Eq, Show, Typeable)
-- | 'DecompressException' participates in the 'SomeLZMAException'
-- hierarchy via the helpers from "Codec.Compression.LZMA.Internal.Stream".
instance Exception DecompressException where
  toException = Stream.lzmaExceptionToException
  fromException = Stream.lzmaExceptionFromException
-- | Decompress a lazy 'L.ByteString' with the given 'DecompressParams'.
decompress :: DecompressParams -> L.ByteString -> L.ByteString
decompress params = streamToLBS (decompressStream params)
-- | Incremental decompression in the lazy 'L.ST' monad.
decompressST :: DecompressParams -> DecompressStream (L.ST s) ()
decompressST params = streamToST (decompressStream params)
-- | Incremental decompression in 'IO'.
decompressIO :: DecompressParams -> DecompressStream IO ()
decompressIO params = streamToIO (decompressStream params)
-- | Incremental decompression pipe running in the 'Stream' monad.
--
-- Initializes a decoder via 'Stream.autoDecoder' with the configured
-- memory limit, then alternates between refilling buffers and calling
-- 'Stream.code' until 'Stream.StreamEnd'.
decompressStream :: DecompressParams -> DecompressStream Stream ()
decompressStream params = do
  handleDecompRet "Failed to initialize a stream decoder" $
    lift $ Stream.autoDecoder (decompressMemoryLimit params) mempty
  loop
  where
    -- Refill both buffers, then run the decoder over them.
    loop = fillBuffers (decompressBufferSize params) () >>= drainBuffers

    drainBuffers isLastChunk = do
      lift $ assertBuffers isLastChunk
      res <- lift $ Stream.code $ if isLastChunk
        then Stream.Finish
        else Stream.Run
      case res of
        Stream.Ok -> do
          outputBufferFull <- lift Stream.isOutputBufferFull
          when outputBufferFull $ do
            -- write out if the output buffer became full
            (outFPtr, outOffset, outLen) <- lift Stream.popOutputBuffer
            yield $ S.PS outFPtr outOffset outLen
          loop
        Stream.StreamEnd ->
          void $ finalizeStream 0
        Stream.Error code -> do
          void $ finalizeStream 0
          lift $ throwDecompressError code "The stream decoder failed."
-----------------------------------------------------------
-- | Convert a (de)compression stream into a lazy 'L.ByteString' transformer.
--
-- Runs the proxy inside lazy 'L.ST', threading the coder state through
-- every monadic step. Requests are fed chunks of @input@ (an empty chunk
-- once it is exhausted); responses become chunks of the result.
streamToLBS
  :: Proxy x' S.ByteString () S.ByteString Stream a
  -> L.ByteString -> L.ByteString
streamToLBS stream input = L.runST $ do
  state <- L.strictToLazyST Stream.newState
  go stream state input
  where
    go (P.Request _ next) state inChunks =
      case inChunks of
        L.Empty ->
          -- No input left: answer with an empty chunk (end-of-input marker).
          go (next S.empty) state L.Empty
        L.Chunk chunk chunks ->
          go (next chunk) state chunks
    go (P.Respond outChunk next) state inChunks = do
      outChunks <- go (next ()) state inChunks
      -- a lazy bytestring shouldn't contain empty chunks
      return $! if S.length outChunk > 0
        then L.Chunk outChunk outChunks
        else outChunks
    go (P.M m) state inChunks = do
      -- Run one strict Stream step inside lazy ST, keeping the new state.
      (next, state') <- L.strictToLazyST $ Stream.runStream m state
      go next state' inChunks
    go (P.Pure _) _ !_inChunks = return L.Empty
-- | Interpret the 'Stream' effects of a proxy in the lazy 'L.ST' monad,
-- threading the coder state through every 'P.M' step; requests and
-- responds are forwarded unchanged.
streamToST
  :: Proxy x' x y' y Stream a
  -> Proxy x' x y' y (L.ST s) a
streamToST stream = do
  state <- lift $ L.strictToLazyST Stream.newState
  go stream state
  where
    go (P.Request req next) state = do
      chunk <- request req
      go (next chunk) state
    go (P.Respond chunk next) state = do
      res <- respond chunk
      go (next res) state
    go (P.M m) state = do
      (next, state') <- lift $ L.strictToLazyST $ Stream.runStream m state
      go next state'
    go (P.Pure chunk) _ = return chunk
-- | Interpret the 'Stream' effects of a proxy in 'IO' (via 'S.stToIO'),
-- threading the coder state through every 'P.M' step; requests and
-- responds are forwarded unchanged.
streamToIO
  :: Proxy x' x y' y Stream a
  -> Proxy x' x y' y IO a
streamToIO stream = do
  state <- lift $ S.stToIO Stream.newState
  go stream state
  where
    go (P.Request req next) state = do
      chunk <- request req
      go (next chunk) state
    go (P.Respond chunk next) state = do
      res <- respond chunk
      go (next res) state
    go (P.M m) state = do
      (next, state') <- lift $ S.stToIO $ Stream.runStream m state
      go next state'
    go (P.Pure chunk) _ = return chunk
------------------------------------------------------------
-- | The unfolding of the decompression process with random seek support.
--
-- Downstream demands uncompressed bytes from a specific position using
-- 'ReadRequest's. 'SeekableDecompressStream' translates them to compressed
-- positions using 'Index' and sends them to upstream.
type SeekableDecompressStream = Proxy
  (ReadRequest 'Compressed) S.ByteString
  (ReadRequest 'Uncompressed) S.ByteString
-- | Run 'seekableDecompressStream' in 'IO'.
seekableDecompressIO
  :: DecompressParams
  -> C.Index
  -- ^ Index of the stream
  -> ReadRequest 'Uncompressed
  -- ^ Initial request
  -> SeekableDecompressStream IO ()
seekableDecompressIO params index initialReq =
  streamToIO (seekableDecompressStream params index initialReq)
-- | Random-access decompression: for each 'ReadRequest', locate (via the
-- 'C.Index') the block containing the requested position, decode that
-- block with a dedicated block decoder, and serve the uncompressed bytes
-- downstream.
seekableDecompressStream
  :: DecompressParams
  -> C.Index
  -- ^ Index for the stream
  -> ReadRequest 'Uncompressed
  -- ^ Initial request
  -> SeekableDecompressStream Stream ()
seekableDecompressStream params index req0 = do
  iter <- liftIO $ C.indexIterInit index
  decodeLoop iter req0
  where
    -- Keep serving requests until 'locateBlock' finds no matching block.
    decodeLoop iter req = do
      found <- locateBlock iter req
      when found $ do
        req' <- decodeBlock iter req
        decodeLoop iter req'

    -- | Decode a new block
    decodeBlock
      :: C.IndexIter
      -> ReadRequest 'Uncompressed
      -> SeekableDecompressStream Stream (ReadRequest 'Uncompressed)
    decodeBlock iter req = do
      lift Stream.flushBuffers
#if DEBUG
      traceM $ "decodeBlock " ++ show iter ++ " " ++ show req
#endif
      blockCount <- liftIO $ get $ C.indexIterStreamBlockCount iter
      compressedFileOffset <-
        liftIO $ get $ C.indexIterBlockCompressedFileOffset iter
      uncompressedFileOffset <-
        liftIO $ get $ C.indexIterBlockUncompressedFileOffset iter
      blockNumberInStream <-
        liftIO $ get $ C.indexIterBlockNumberInStream iter
      let
        blockPos :: Position 'Compressed
        blockPos = fromIntegral compressedFileOffset
        blockUncompPos :: Position 'Uncompressed
        blockUncompPos = fromIntegral uncompressedFileOffset
      -- Check if the block number doesn't exceed the total block count.
      assert (blockNumberInStream <= blockCount) $ return ()
      -- Ask upstream for compressed bytes starting at the block boundary.
      isLastChunk <- fillBuffers (decompressBufferSize params) (PRead blockPos)
      block <- liftIO C.newBlock
      filters <- liftIO C.newFiltersMaxLength
      handleDecompRet "Failed to initialize a block decoder" $
        lift $ Stream.blockDecoder iter block filters
#if DEBUG
      lift $ Stream.dump "decodeBlock"
#endif
      -- Bytes between the block start and the requested position are
      -- decoded but skipped rather than sent downstream.
      req' <- drainBuffers iter isLastChunk
        (calculateSkipBytes req blockUncompPos)
      liftIO $ do
        -- The filters and the block need to be kept in memory until a whole
        -- block is decoded. They're not touched in Haskell but are used in
        -- liblzma during decoding.
        C.touchFilters filters
        C.touchBlock block
      return req'

    -- Number of uncompressed bytes between the start of the block and the
    -- position the request asked for (zero for a plain 'Read').
    calculateSkipBytes
      :: ReadRequest 'Uncompressed -> Position 'Uncompressed -> Int
    calculateSkipBytes req blockPos = case req of
      PRead pos -> fromIntegral (pos - blockPos)
      Read -> 0

    -- Refill buffers then continue draining, keeping the skip offset.
    fillBuffers'
      :: C.IndexIter
      -> ReadRequest 'Compressed
      -> Int -- ^ Offset from the beginning of the block to the target position
      -> SeekableDecompressStream Stream (ReadRequest 'Uncompressed)
    fillBuffers' iter req skipBytes = do
      isLastChunk <- fillBuffers (decompressBufferSize params) req
      drainBuffers iter isLastChunk skipBytes

    drainBuffers
      :: C.IndexIter
      -> Bool -- ^ Last chunk or not
      -> Int -- ^ Offset from the beginning of the block to the target position
      -> SeekableDecompressStream Stream (ReadRequest 'Uncompressed)
    drainBuffers iter isLastChunk skipBytes = do
      lift $ assertBuffers isLastChunk
      ret <- lift $ Stream.code $ if isLastChunk
        then Stream.Finish
        else Stream.Run
#if DEBUG
      traceM $ "code -> " ++ show ret
#endif
      case ret of
        Stream.Ok -> do
          outputBufferFull <- lift Stream.isOutputBufferFull
          if outputBufferFull
            then do
              (outFPtr, outOffset, outLen) <- lift Stream.popOutputBuffer
              if outLen <= skipBytes
                -- The whole decoded chunk lies before the target
                -- position: drop it and keep skipping.
                then fillBuffers' iter Read (skipBytes - outLen)
                else do
                  req <- respond $ S.PS outFPtr
                    (outOffset + skipBytes)
                    (outLen - skipBytes)
                  case req of
                    -- A positioned read may fall in a different block;
                    -- return to the caller so it can re-locate.
                    PRead _pos -> return req
                    Read -> fillBuffers' iter Read 0
            else
              fillBuffers' iter Read skipBytes
        Stream.StreamEnd -> do
          req'm <- finalizeStream skipBytes
          return $ fromMaybe Read req'm
        Stream.Error code -> do
          void $ finalizeStream skipBytes
          lift $ throwDecompressError code "Failed to decode a block"

    -- | If the 'ReadRequest' has a position, find the block which contains the
    -- position. If it doesn't have a position, find the next non-empty block.
    --
    -- This function returns true when the block is found.
    locateBlock
      :: C.IndexIter
      -> ReadRequest 'Uncompressed
      -> SeekableDecompressStream Stream Bool
    locateBlock iter req = not <$> liftIO act
      where
        act = case req of
          -- If the request has a position, decode the block which contains the
          -- position.
          PRead pos -> C.indexIterLocate iter (fromIntegral pos)
          -- If the request doesn't have a position, continue reading bytes from
          -- the current position.
          Read -> C.indexIterNext iter C.IndexIterNonEmptyBlockMode
-- | Ensure both coder buffers are ready for 'Stream.code': allocate a
-- fresh output buffer if the current one is full, and request more input
-- from upstream if the input buffer is empty. Returns 'True' when
-- upstream answered with an empty chunk (end of input).
fillBuffers
  :: Int -- ^ Buffer size
  -> req -- ^ Request
  -> Proxy req S.ByteString y' y Stream Bool
fillBuffers bufferSize req = do
#ifdef DEBUG
  lift $ Stream.consistencyCheck
#endif
  fillOutputBuffer bufferSize
  fillInputBuffer req
-- | If the coder's output buffer is full, allocate a fresh buffer of the
-- given size and install it as the new output buffer.
fillOutputBuffer
  :: MonadTrans t
  => Int -- ^ Buffer size
  -> t Stream ()
fillOutputBuffer bufferSize = lift $ do
  needsFreshBuffer <- Stream.isOutputBufferFull
  when needsFreshBuffer $ do
    freshBuffer <- liftIO $ S.mallocByteString bufferSize
    Stream.pushOutputBuffer freshBuffer 0 bufferSize
-- | If the coder's input buffer is empty, request a chunk from upstream
-- and install it. Returns 'True' exactly when upstream answered with an
-- empty chunk, i.e. end of input.
fillInputBuffer
  :: req -- ^ Request
  -> Proxy req S.ByteString y' y Stream Bool
fillInputBuffer req = do
  inputEmpty <- lift Stream.isInputBufferEmpty
  if not inputEmpty
    then return False
    else do
      chunk <- request req
      if S.null chunk
        then return True
        else do
          let S.PS inFPtr inOffset inLen = chunk
          lift $ Stream.pushInputBuffer inFPtr inOffset inLen
          return False
-- | Sanity checks for buffers: before coding, the output buffer must not
-- be full, and unless this is the last chunk the input buffer must not
-- be empty.
assertBuffers :: Bool -> Stream ()
assertBuffers isLastChunk = do
  inputEmpty <- Stream.isInputBufferEmpty
  outputFull <- Stream.isOutputBufferFull
  let buffersSane = not outputFull && (isLastChunk || not inputEmpty)
  assert buffersSane (return ())
-- | Send any remaining output (past the first @skipBytes@ bytes)
-- downstream and release the coder with 'Stream.end'. Returns 'Just' the
-- downstream response when a final chunk was sent, 'Nothing' otherwise.
finalizeStream :: Int -> Proxy x' x y' S.ByteString Stream (Maybe y')
finalizeStream skipBytes = do
  outputBufferBytesAvailable <- lift Stream.outputBufferBytesAvailable
  if outputBufferBytesAvailable > skipBytes
    then do
      (outFPtr, outOffset, outLen) <- lift Stream.popOutputBuffer
      lift Stream.end
      req <- respond $ S.PS outFPtr (outOffset + skipBytes) (outLen - skipBytes)
      return $ Just req
    else do
      lift Stream.end
      return Nothing
-----------------------------------------------------------
-- Index decoder

-- | Decode the combined index of the .xz file behind the given handle.
decodeIndex :: Handle -> IO (C.Index, C.VLI)
decodeIndex h = do
  totalSize <- hFileSize h
  runDecodeStream h (decodeIndexIO (fromIntegral totalSize))
-- | Decode the index of an .xz file of the given total size, running the
-- 'IndexDecoder' computation in 'IO'. Stream-flags buffers for the
-- header and footer are allocated for the duration of the decode and
-- freed afterwards ('bracket' guarantees the release).
decodeIndexIO :: Size -> DecodeStream IO (C.Index, C.VLI)
decodeIndexIO size = bracket acquire release $ uncurry $
  indexDecodingToIO
    (decodeIndexStream (fromIntegral size))
    ID.newIndexDecoderState
  where
    acquire = liftIO $ (,) <$> C.mallocStreamFlags <*> C.mallocStreamFlags
    release (header, footer) = liftIO $ do
      C.freeStreamFlags header
      C.freeStreamFlags footer
-- | Drive a 'DecodeStream' against a seekable 'Handle': seek requests
-- from the stream are translated into 'hSeek' calls, and each request is
-- answered with a chunk read from the file ('S.hGetSome', empty at EOF).
runDecodeStream
  :: (MonadIO m, MonadThrow m)
  => Handle
  -> DecodeStream m a
  -> m a
runDecodeStream h = runEffect . loop
  where
    loop (P.Request seekRequest next) = do
      case seekRequest of
        PRead (fromIntegral -> pos) -> do
          -- Absolute seek; rethrow any IOException in the caller's monad.
          r <- liftIO $ try $ hSeek h AbsoluteSeek pos
          case r of
            Left e -> lift $ throwM (e :: IOException)
            Right () -> return ()
        Read -> return ()
      chunk <- liftIO $ S.hGetSome h L.defaultChunkSize
      loop (next chunk)
    loop (P.Respond _ next) = loop (next ())
    loop (P.M m) = lift m >>= loop
    loop (P.Pure a) = return a
-- | Seek to an absolute position and ask for an input with given size. Note:
--
-- * This is inefficient when you ask for a large amount of bytes.
-- * The returned ByteString will be shorter than the requested size if the file
--   is insufficiently large.
pread
  :: Monad m
  => Position 'Compressed
  -> Size
  -- ^ Input size
  -> DecodeStream m S.ByteString
pread pos size = do
  builder <- loop size (PRead pos) mempty
  return $! L.toStrict $ B.toLazyByteString builder
  where
    -- Accumulate chunks in a builder until the requested byte count is
    -- satisfied; only the first request carries the seek position, the
    -- rest continue sequentially with 'Read'.
    loop nbytes req chunks
      | nbytes <= 0 = return chunks
      | otherwise = do
          chunk <- request req
          if S.null chunk
            -- EOF: return whatever was gathered so far (may be short).
            then return chunks
            else let chunks' = chunks <> B.byteString (S.take nbytes chunk)
                 in loop (nbytes - S.length chunk) Read chunks'
-- | Decode things from a compressed stream: a client that sends
-- 'ReadRequest's upstream and receives raw chunks in return.
type DecodeStream = Client (ReadRequest 'Compressed) S.ByteString
-- | Seek operation failure in downstream.
data DecodeException = DecodeError
  C.ErrorCode
  -- ^ Error code from liblzma
  String
  -- ^ Description of the error
  deriving (Show, Typeable)
-- | 'DecodeException' participates in the 'SomeLZMAException' hierarchy
-- via the helpers re-exported from
-- "Codec.Compression.LZMA.Internal.Types".
instance Exception DecodeException where
  toException = lzmaExceptionToException
  fromException = lzmaExceptionFromException
------------------------------------------------------------
-- | Fixed size in bytes of an .xz stream header; the stream footer has
-- the same fixed size.
headerSize, footerSize :: Integral a => a
headerSize = fromIntegral C.streamHeaderSize
footerSize = headerSize
-- | Seek through a seekable stream and build a combined index. The index can
-- later be used for seeking.
--
-- Decoding proceeds back to front: the position starts at the end of the
-- file, and each '.xz' stream found there is decoded and its index
-- concatenated onto the first one until position zero is reached.
decodeIndexStream
  :: Size -- ^ Size of the file
  -> DecodeStream IndexDecoder (C.Index, C.VLI)
decodeIndexStream fileSize = do
  lift $ ID.setPosition $ fromIntegral fileSize
  index <- decodeIndex1
  loop index
  padding <- lift ID.getStreamPadding
  return (index, padding)
  where
    loop :: C.Index -> DecodeStream IndexDecoder ()
    loop index = do
      pos <- lift ID.getPosition
      when (pos > 0) $ do
        index' <- decodeIndex1
        -- Fix: error message previously misspelled "indicies".
        handleDecompRet "Failed to concatenate indices." $
          liftIO $ C.indexCat index index'
        loop index
-- | Parse an index
--
-- Works backwards from the current position: the stream footer first,
-- then the index itself, then the stream header, finally cross-checking
-- all three and recording the stream padding in the index.
decodeIndex1 :: DecodeStream IndexDecoder C.Index
decodeIndex1 = do
  padding <- parseStreamFooter
  index <- parseIndex 8192 -- FIXME: Set appropriate size
  parseStreamHeader index
  checkIntegrity index
  handleDecompRet "Failed to set stream padding" $
    liftIO $ C.indexStreamPadding index padding
  lift $ ID.modifyStreamPadding' (+ padding)
  return index
-- | Skip stream padding if it exists, then parse a stream footer. It
-- returns the length of the stream padding.
parseStreamFooter :: DecodeStream IndexDecoder C.VLI
parseStreamFooter = loop 0
  where
    loop padding = do
      endPos <- lift ID.getPosition
      -- A valid stream is at least a header plus a footer long.
      when (endPos < 2 * headerSize) $
        lift $ throwM $ DecodeError C.DataError
          "This file is too small to be a valid .xz file."
      chunk@(S.PS inFPtr inOffset inLength) <-
        pread (endPos - footerSize) footerSize
      assert (inOffset == 0 && inLength == footerSize) $ return ()
      if containsStreamPadding chunk
        then do
          -- Zero padding found: step back over it and retry.
          padding' <- lift $ skipStreamPadding chunk
          loop $! padding + padding'
        else do
          lift $ ID.setPosition $ endPos - footerSize
          footer <- lift ID.getStreamFooter
          handleDecompRet "Failed to decode a stream footer." $
            liftIO $ withForeignPtr inFPtr $ C.streamFooterDecode footer
          version <- liftIO $ get $ C.streamFlagsVersion footer
          unless (version == 0) $
            lift $ throwM $ DecodeError C.OptionsError
              "The stream footer specifies something that we don't support."
          -- Keep the chunk's storage alive until after the FFI call above.
          liftIO $ touchForeignPtr inFPtr
          return padding
-- | Measure the run of trailing zero bytes (stream padding) in the given
-- chunk, move the decoder position back over it, and return its length.
-- Stream padding is asserted to be a multiple of four bytes.
skipStreamPadding :: S.ByteString -> IndexDecoder C.VLI
skipStreamPadding chunk =
  assert (paddingLength `mod` 4 == 0) $ do
    ID.modifyPosition' (subtract $ fromIntegral paddingLength)
    return $! fromIntegral paddingLength
  where
    -- Length of the trailing all-zero run of the chunk.
    (_, S.length -> paddingLength) = S.spanEnd (== 0) chunk
-- | True when bytes 8..11 of the footer-sized chunk are all zero, which
-- is taken to mean the chunk is (part of) stream padding rather than a
-- stream footer.
containsStreamPadding :: S.ByteString -> Bool
containsStreamPadding chunk = S.all (== 0) (S.take 4 (S.drop 8 chunk))
-- | Read the backward-size field of the previously decoded stream
-- footer, i.e. the encoded size of the index.
getIndexSize :: IndexDecoder C.VLI
getIndexSize =
  ID.getStreamFooter >>= \footer ->
    liftIO (get (C.streamFlagsBackwardSize footer))
-- | Decode a stream index.
--
-- Feeds the index decoder at most @bufSize@ bytes at a time (read via
-- 'pread' from the current position) until the decoder reports
-- 'C.StreamEnd', then checks that exactly the footer-indicated amount of
-- input was consumed.
--
-- Fixes: the @BufError@ message was garbled ("has liked more input…")
-- and the generic failure message misspelled "failed".
parseIndex
  :: C.VLI -- ^ Buffer size
  -> DecodeStream IndexDecoder C.Index
parseIndex bufSize = do
  indexSize <- lift getIndexSize
  -- Set position to the beginning of the index.
  lift $ ID.modifyPosition' $ subtract $ fromIntegral indexSize
  stream <- liftIO C.newStream
  (ret, indexFPtr) <- liftIO $ C.indexDecoder stream maxBound -- FIXME: Set proper value
  unless (ret == C.Ok) $ lift $ throwM $ DecodeError C.ProgError
    "Failed to initialize an index decoder."
  loop stream indexSize
  liftIO $ C.peekIndexFPtr indexFPtr
  where
    loop stream indexSize = do
      -- Never feed more than the remaining index bytes.
      let inAvail :: Integral a => a
          inAvail = fromIntegral $ min bufSize indexSize
      liftIO $ C.streamAvailIn stream $=! inAvail
      chunk <- do
        pos <- lift ID.getPosition
        pread pos inAvail
      lift $ ID.modifyPosition' (+ inAvail)
      let indexSize' = indexSize - inAvail
      ret <- liftIO $ withByteString chunk $ \inPtr -> do
        C.streamNextIn stream $= inPtr
        C.code stream C.Run
      case ret of
        C.Ok -> loop stream indexSize'
        C.Error C.BufError ->
          lift $ throwM $ DecodeError C.DataError $
            "The index decoder has asked for more input than the index " ++
            "should contain according to the stream footer."
        C.Error code ->
          lift $ throwM $ DecodeError code "The index decoder failed."
        C.StreamEnd -> do
          inAvail' <- liftIO $ get $ C.streamAvailIn stream
          unless (indexSize' == 0 && inAvail' == 0) $
            lift $ throwM $ DecodeError C.DataError $
              "The index decoder didn't consume as much input as indicated " ++
              "by the backward size field."
-- | Decode the stream header and check that its stream flags match the stream
-- footer.
parseStreamHeader
  :: C.Index
  -> DecodeStream IndexDecoder ()
parseStreamHeader index = do
  indexSize <- lift getIndexSize
  -- Step back over the index and the header itself...
  lift $ ID.modifyPosition' (subtract $ fromIntegral indexSize + headerSize)
  blocksSize <- liftIO $ C.indexTotalSize index
  -- ...and over all the blocks, landing at the start of the stream header.
  lift $ ID.modifyPosition' (subtract $ fromIntegral blocksSize)
  chunk <- do
    pos <- lift ID.getPosition
    pread pos headerSize
  header <- lift ID.getStreamHeader
  handleDecompRet "Failed to decode a stream header." $
    liftIO $ withByteString chunk $ C.streamHeaderDecode header
-- | Cross-check the decoded stream header against the stream footer,
-- then record the footer's flags and the accumulated stream padding in
-- the index.
checkIntegrity
  :: C.Index
  -> DecodeStream IndexDecoder ()
checkIntegrity index = do
  header <- lift ID.getStreamHeader
  footer <- lift ID.getStreamFooter
  handleDecompRet "The stream header and the footer didn't agree." $
    liftIO $ C.streamFlagsCompare header footer
  handleDecompRet "Failed to set the footer to the index." $
    liftIO $ C.indexStreamFlags index footer
  padding <- lift ID.getStreamPadding
  handleDecompRet "Failed to set stream padding to the index." $
    liftIO $ C.indexStreamPadding index padding
-- | Interpret the 'IndexDecoder' effects of a 'DecodeStream' in 'IO',
-- threading the decoder state and the preallocated header/footer
-- stream-flags through every monadic step; requests and responds are
-- forwarded unchanged.
indexDecodingToIO
  :: DecodeStream IndexDecoder a
  -> ID.IndexDecoderState
  -> C.StreamFlags -- ^ Stream header
  -> C.StreamFlags -- ^ Stream footer
  -> DecodeStream IO a
indexDecodingToIO stream0 state0 header footer = go stream0 state0
  where
    go (P.Request req next) state =
      P.Request req $ \chunk -> go (next chunk) state
    go (P.Respond out next) state =
      P.Respond out $ \resp -> go (next resp) state
    go (P.M m) state = do
      (state', stream') <- liftIO $ ID.runIndexDecoder m state header footer
      go stream' state'
    go (P.Pure a) _ = P.Pure a
-----------------------------------------------------------
-- | Check whether the stream starts with the six magic bytes of the
-- .xz file format (0xFD followed by \"7zXZ\" and a NUL byte).
hasMagicBytes :: Monad m => DecodeStream m Bool
hasMagicBytes = do
  chunk <- pread beginning 6
  return $! chunk == "\xfd\&7zXZ\x00" -- \& is an empty string
  where
    beginning :: Position 'Compressed
    beginning = 0
-----------------------------------------------------------
-- | Run an 'IO' action on a pointer to the first byte of a strict
-- 'S.ByteString'. The pointer is only valid inside the callback, while
-- the underlying 'ForeignPtr' is kept alive by 'withForeignPtr'.
withByteString :: S.ByteString -> (Ptr Word8 -> IO a) -> IO a
withByteString (S.PS fptr off _len) action =
  withForeignPtr fptr $ \base -> action (base `plusPtr` off)