Skip to content

Commit

Permalink
RFCT Simpler output code
Browse files Browse the repository at this point in the history
Should avoid new errors like the bug fixed in
6dae650 by aggregating all the logic
related to output compression, etc., in one place
  • Loading branch information
luispedro committed Sep 16, 2022
1 parent 6dae650 commit f8e933e
Showing 1 changed file with 43 additions and 34 deletions.
77 changes: 43 additions & 34 deletions NGLess/Interpretation/Write.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import qualified Data.Vector as V
import qualified Conduit as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.Combinators as C
import Data.Conduit.Algorithms.Async (conduitPossiblyCompressedFile)
import qualified Data.Conduit.Algorithms.Async as CAsync
import Data.Conduit ((.|))
Expand Down Expand Up @@ -85,17 +83,29 @@ instance Default WriteOptions where
, woCompressLevel = Nothing
}

-- | Choose the sink that writes a byte stream to an already-open handle,
-- based on what 'inferCompression' reports for the given file name.
--
-- The optional level is forwarded to the gzip and xz writers (which fall
-- back to their level-less variants when it is 'Nothing') and to the zstd
-- writer (which falls back to level 3). The 'NoCompression' and
-- 'BZ2Compression' branches do not use it.
ostream :: (MonadUnliftIO m, C.MonadResource m) => FilePath -> Maybe Int -> Handle -> C.ConduitT B.ByteString C.Void m ()
ostream fp comp =
    case inferCompression fp of
        NoCompression -> CB.sinkHandle
        GzipCompression -> levelled CAsync.asyncGzipTo CAsync.asyncGzipTo'
        BZ2Compression -> CAsync.asyncBzip2To
        XZCompression -> levelled CAsync.asyncXzTo CAsync.asyncXzTo'
        ZStdCompression -> CAsync.asyncZstdTo zstdLevel
  where
    -- Use the level-taking writer only when a level was actually requested.
    levelled plain withLevel = case comp of
        Nothing -> plain
        Just lvl -> withLevel lvl

    -- The zstd writer always needs a level; 3 is used when none was given.
    zstdLevel = fromMaybe 3 comp

-- | Run an action with a handle for the named output file.
--
-- The path @\/dev\/stdout@ is special-cased: the action is handed the
-- 'stdout' handle directly. Any other path is delegated to 'withOutputFile'.
withOutputFile' :: (MonadUnliftIO m, MonadMask m) => FilePath -> (Handle -> m a) -> m a
withOutputFile' fname inner
    | fname == "/dev/stdout" = inner stdout
    | otherwise = withOutputFile fname inner
-- | Open the output named by 'woOFile' and hand the continuation a sink that
-- writes to it, compressing according to the file name and 'woCompressLevel'.
--
-- The type is tricky because the monad the sink runs in (m') need not be the
-- monad of the continuation's result (m): for example, the sink may run in
-- (C.ResourceT m) while the continuation itself returns in m.
withOutputFileO :: (MonadUnliftIO m, MonadMask m, MonadUnliftIO m', C.MonadResource m')
                    => WriteOptions -> (C.ConduitT B.ByteString C.Void m' () -> m a) -> m a
withOutputFileO wo f = openOutput target (f . sinkFor)
  where
    target :: FilePath
    target = woOFile wo

    level :: Maybe Int
    level = woCompressLevel wo

    -- @\/dev\/stdout@ maps to the 'stdout' handle itself; every other path
    -- goes through 'withOutputFile'.
    openOutput :: (MonadUnliftIO n, MonadMask n) => FilePath -> (Handle -> n b) -> n b
    openOutput path inner
        | path == "/dev/stdout" = inner stdout
        | otherwise = withOutputFile path inner

    -- Sink for the handle, selected by the inferred format and the requested
    -- level. gzip and xz use their level-less writers when no level is set;
    -- zstd falls back to level 3.
    sinkFor :: (MonadUnliftIO n, C.MonadResource n) => Handle -> C.ConduitT B.ByteString C.Void n ()
    sinkFor = case (inferCompression target, level) of
        (NoCompression, _) -> CB.sinkHandle
        (GzipCompression, Nothing) -> CAsync.asyncGzipTo
        (GzipCompression, Just lvl) -> CAsync.asyncGzipTo' lvl
        (BZ2Compression, _) -> CAsync.asyncBzip2To
        (XZCompression, Nothing) -> CAsync.asyncXzTo
        (XZCompression, Just lvl) -> CAsync.asyncXzTo' lvl
        (ZStdCompression, _) -> CAsync.asyncZstdTo (fromMaybe 3 level)

parseWriteOptions :: KwArgsValues -> NGLessIO WriteOptions
parseWriteOptions args = do
Expand Down Expand Up @@ -146,7 +156,7 @@ moveOrCopyCompress opts ifile = liftIO =<< moveOrCopyCompress' opts ifile

moveOrCopyCompress' :: WriteOptions -> FilePath -> NGLessIO (IO ())
moveOrCopyCompress' opts ifile
| ofile == "/dev/stdout" = return (C.runConduitRes $ conduitPossiblyCompressedFile ifile .| C.stdout)
| ofile == "/dev/stdout" = return (C.runConduitRes $ conduitPossiblyCompressedFile ifile .| C.stdoutC)
| ofile == ifile = return (return ()) -- trivial case. Can happen.
#ifdef WINDOWS
| ocompression == BZ2Compression = throwNotImplementedError "Compression of bzip2 files is not supported on Windows"
Expand All @@ -166,10 +176,10 @@ moveOrCopyCompress' opts ifile
icompression = inferCompression ifile
ocompression = inferCompression ofile

convertCompression :: NGLessIO (IO ())
convertCompression = return $
withOutputFile' ofile $ \hout ->
C.runConduitRes (conduitPossiblyCompressedFile ifile .| ostream ofile (woCompressLevel opts) hout)

withOutputFileO opts $ \out ->
C.runConduitRes (conduitPossiblyCompressedFile ifile .| out)

-- | Drop as many characters from the end of @base@ as @suffix@ is long.
--
-- Only the length of @suffix@ is used; there is no check that @base@
-- actually ends with it. When @suffix@ is at least as long as @base@ the
-- result is the empty string.
removeEnd :: String -> String -> String
removeEnd base suffix = fst (splitAt keep base)
  where
    -- Number of leading characters to retain (may be zero or negative).
    keep = length base - length suffix
Expand Down Expand Up @@ -207,9 +217,8 @@ executeWrite (NGOReadSet _ rs) args = do
moveOrCopyCompress' (opts {woCanMove = True, woOFile = ofname}) fp'
if woFormatFlags opts == Just "interleaved"
then
withOutputFile' ofile $ \hout ->
C.runConduitRes $
interleaveFQs rs .| ostream ofile (woCompressLevel opts) hout
withOutputFileO opts $ \out ->
C.runConduitRes (interleaveFQs rs .| out)
else case rs of
ReadSet [] singles ->
liftIO =<< moveOrCopyCompressFQs singles ofile
Expand Down Expand Up @@ -266,33 +275,33 @@ executeWrite (NGOCounts iout) args = do
fp <- asFile iout
moveOrCopyCompress opts fp
_ -> do
let unlinesVC :: V.Vector ByteLine -> B.ByteString
unlinesVC vs =
B.intercalate (B.singleton 10)
$ map unwrapByteLine
$ V.toList vs
istream = case iout of
let istream = case iout of
File fp -> C.sourceFile fp
Stream _ _ iss ->
iss .| CL.map unlinesVC
withOutputFile' (woOFile opts) $ \hout ->
withOutputFileO opts $ \out ->
C.runConduit $
(commentC "# " comment >> istream)
.| ostream (woOFile opts) (woCompressLevel opts) hout
.| out
"csv" -> do
let (fp,istream) = asStream iout
comma <- makeNGLTempFile fp "wcomma" "csv" $ \ohand ->
let (_, istream) = asStream iout
withOutputFileO opts $ \out ->
C.runConduit $
((commentC "# " comment .| linesVC 1024) >> istream)
.| CL.map (V.map tabToComma)
.| CC.concat
.| byteLineSinkHandle ohand
moveOrCopyCompress (opts { woCanMove = True}) comma
((commentC "# " comment .| linesVC 1024)
>> (istream .| CL.map (V.map tabToComma)))
.| CL.map unlinesVC
.| out
f -> throwScriptError ("Invalid format in write: {"++T.unpack f++"}.\n\tWhen writing counts, only accepted values are {tsv} (TAB separated values; default) or {csv} (COMMA separated values).")
return (NGOFilename $ woOFile opts)
where
tabToComma :: ByteLine -> ByteLine
tabToComma (ByteLine line) = ByteLine $ B8.map (\case { '\t' -> ','; c -> c }) line
nl = B.singleton 10
unlinesVC :: V.Vector ByteLine -> B.ByteString
unlinesVC vs =
B.concat
$ concatMap (\(ByteLine ell) -> [ell, nl])
$ V.toList vs

executeWrite (NGOFilename fp) args = do
opts <- parseWriteOptions args
Expand Down

0 comments on commit f8e933e

Please sign in to comment.