Skip to content

Commit

Permalink
ENH Use ZStd for temporary preprocess files
Browse files Browse the repository at this point in the history
This should be faster (both compressing/decompressing) and since most
pipelines will not rely on the files beyond piping them to map() later,
it should come at no cost.
  • Loading branch information
luispedro committed Aug 9, 2020
1 parent 3f82ea7 commit 63b7f02
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
1 change: 1 addition & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Version 1.2.0+
* Add early check that block assignments are always to block variables
* Use ZStd compression for temporary files from preprocess()

Version 1.2.0 2020-07-12 by luispedro
* Add load_fastq_directory to builtin functions
Expand Down
12 changes: 6 additions & 6 deletions NGLess/Interpret.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import qualified Data.Conduit as C
import Data.Conduit ((.|))
import qualified Data.Conduit.Algorithms.Utils as CAlg
import qualified Data.Conduit.Algorithms.Async as CAlg
import Data.Conduit.Algorithms.Async (conduitPossiblyCompressedFile, asyncZstdTo)
import Data.Conduit.Algorithms.Async (conduitPossiblyCompressedFile)
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM.TBMQueue as TQ
import qualified Data.Conduit.TQueue as CA
Expand Down Expand Up @@ -453,13 +453,13 @@ executePreprocess (NGOReadSet name (ReadSet pairs singles)) args (Block (Variabl
write nt h q =
writeAndContinue q
.| CAlg.asyncMapC nt (B.concat . map (fqEncode outenc) . V.toList)
.| CAlg.asyncGzipTo h
.| CAlg.asyncZstdTo 3 h

let processpairs :: (V.Vector ShortRead, V.Vector ShortRead) -> NGLess (V.Vector ShortRead, V.Vector ShortRead, V.Vector ShortRead)
processpairs = liftM splitPreprocessPair . vMapMaybeLifted (runInterpretationRO env . intercalate keepSingles) . uncurry V.zip
(fp1', out1) <- openNGLTempFile "" "preprocessed.1." "fq.gz"
(fp2', out2) <- openNGLTempFile "" "preprocessed.2." "fq.gz"
(fp3', out3) <- openNGLTempFile "" "preprocessed.singles." "fq.gz"
(fp1', out1) <- openNGLTempFile "" "preprocessed.1." "fq.zst"
(fp2', out2) <- openNGLTempFile "" "preprocessed.2." "fq.zst"
(fp3', out3) <- openNGLTempFile "" "preprocessed.singles." "fq.zst"

C.runConduit $
zipSource2 (asSource (fst <$> pairs)) (asSource (snd <$> pairs))
Expand Down Expand Up @@ -560,7 +560,7 @@ executeSelectWBlock input@NGOMappedReadSet{ nglSamFile = isam} args (Block (Vari
.| CL.map concatBytelines
readSamGroupsC' mapthreads paired
.| CAlg.asyncMapEitherC mapthreads (fmap concatLines . V.mapM (runInterpretationRO env . selectBlock doReinject))
.| asyncZstdTo 3 ohandle
.| CAlg.asyncZstdTo 3 ohandle
return input { nglSamFile = File oname }
where
concatBytelines :: V.Vector ByteLine -> B.ByteString
Expand Down

0 comments on commit 63b7f02

Please sign in to comment.