Skip to content

Commit

Permalink
ENH Always process SamGroups in chunks
Browse files Browse the repository at this point in the history
This should be faster. Right now, some of the code paths still use
chunks of size 1, but this allows us to remove one of the code paths
(the non-chunked one).
  • Loading branch information
luispedro committed Jan 11, 2019
1 parent 50d54aa commit 06c7f44
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 29 deletions.
40 changes: 17 additions & 23 deletions NGLess/Data/Sam.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
{- Copyright 2014-2018 NGLess Authors
{- Copyright 2014-2019 NGLess Authors
- License: MIT
-}
{-# LANGUAGE ScopedTypeVariables, FlexibleContexts #-}
module Data.Sam
( SamLine(..)
, SamGroup
, samLength
, readSamGroupsC
, readSamGroupsC'
, readSamLine
, encodeSamLine
Expand Down Expand Up @@ -43,7 +42,6 @@ import Control.Error (note)
import Control.DeepSeq

import Data.Maybe
import Data.Function (on)
import Control.Monad.Except
import NGLess.NGError
import Utils.Utils
Expand Down Expand Up @@ -230,28 +228,19 @@ samIntTag samline tname
&& (fst <$> B8.uncons (B.drop 3 match)) == Just 'i' = fst <$> B8.readInt (B.drop 5 match)
| otherwise = Nothing

-- | take in *ByteLines* and transform them into groups of SamLines all refering to the same read
-- | take in *ByteLines* and transform them into groups of SamLines all
-- refering to the same read
--
-- This works in chunks (vectors) for efficiency
--
-- Header lines are silently ignored
readSamGroupsC :: (MonadError NGError m) => C.ConduitT (V.Vector ByteLine) [SamLine] m ()
readSamGroupsC = CC.concat
.| readSamLineOrDie
.| CL.groupBy groupLine
where
readSamLineOrDie = C.awaitForever $ \(ByteLine line) ->
case readSamLine line of
Left err -> throwError err
Right parsed@SamLine{} -> C.yield parsed
_ -> return ()
groupLine = (==) `on` samQName



-- | a chunked variation of readSamGroupsC which uses multiple threads
--
-- When respectPairs is False, then the two mates of the same fragment will be
-- considered grouped in different blocks
readSamGroupsC' :: forall m . (MonadError NGError m, MonadIO m) => Int -> Bool -> C.ConduitT (V.Vector ByteLine) (V.Vector [SamLine]) m ()
readSamGroupsC' :: forall m . (MonadError NGError m, MonadIO m) =>
Int -- ^ number of threads
-> Bool -- respectPairs
-> C.ConduitT (V.Vector ByteLine) (V.Vector [SamLine]) m ()
readSamGroupsC' mapthreads respectPairs = do
CC.dropWhileE (isSamHeaderString . unwrapByteLine)
CC.filter (not . V.null)
Expand Down Expand Up @@ -300,13 +289,18 @@ readSamGroupsC' mapthreads respectPairs = do
V.unsafeFreeze gs'

samStatsC :: (MonadIO m) => C.ConduitT (V.Vector ByteLine) C.Void m (NGLess (Int, Int, Int))
samStatsC = C.runExceptC $ readSamGroupsC .| samStatsC'
samStatsC = C.runExceptC $ readSamGroupsC' 1 True .| samStatsC'

samStatsC' :: (MonadError NGError m) => C.ConduitT SamGroup C.Void m (Int, Int, Int)
samStatsC' = CL.foldM summarize (0, 0, 0)
samStatsC' :: forall m. (MonadError NGError m) => C.ConduitT (V.Vector SamGroup) C.Void m (Int, Int, Int)
samStatsC' = CL.foldM summarizeV (0, 0, 0)
where
add1if !v True = v+1
add1if !v False = v

summarizeV :: (Int, Int, Int) -> V.Vector SamGroup -> m (Int, Int, Int)
summarizeV c vs = V.foldM summarize c vs

summarize :: (Int, Int, Int) -> SamGroup -> m (Int, Int, Int)
summarize c [] = return c
summarize c (samline:_)
| isHeader samline = return c
Expand Down
9 changes: 5 additions & 4 deletions NGLess/Interpretation/Map.hs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ mapToReference mapper refIndex rs extraArgs = do
return (newfp, statsp)

zipToStats :: C.ConduitT B.ByteString C.Void NGLessIO () -> C.ConduitT B.ByteString C.Void NGLessIO (Int, Int, Int)
zipToStats out = snd <$> C.toConsumer (zipSink2 out (linesVC 4096 .| readSamGroupsC .| samStatsC'))
zipToStats out = snd <$> C.toConsumer (zipSink2 out (linesVC 4096 .| readSamGroupsC' 1 True .| samStatsC'))

splitFASTA :: Int -> FilePath -> FilePath -> NGLessIO [FilePath]
splitFASTA megaBPS ifile ofileBase =
Expand Down Expand Up @@ -234,7 +234,7 @@ performMap mapper blockSize ref name rs extraArgs = do
partials <- forM blocks (\block -> fst <$> mapToReference mapper block rs extraArgs)
((total, aligned, unique), ()) <- C.runConduit $
mergeSamFiles partials
.| zipSink2 samStatsC'
.| zipSink2 (CC.conduitVector 4096 .| samStatsC')
(CL.concat
.| CL.map ((`mappend` BB.char7 '\n') . encodeSamLine)
.| CB.sinkHandleBuilder hout)
Expand Down Expand Up @@ -294,7 +294,7 @@ executeMergeSams (NGOList ifnames) _ = do
(sam, hout) <- openNGLTempFile "merging" "merged_" ".sam"
((total, aligned, unique), ()) <- C.runConduit $
mergeSamFiles partials
.| zipSink2 samStatsC'
.| zipSink2 (CC.conduitVector 4096 .| samStatsC')
(CL.concat
.| CL.map ((`mappend` BB.char7 '\n') . encodeSamLine)
.| CB.sinkHandleBuilder hout)
Expand All @@ -318,7 +318,8 @@ mergeSamFiles inputs = do
C.sequenceSources
[CB.sourceFile f
.| linesVC 4096
.| readSamGroupsC
.| readSamGroupsC' 1 True
.| CC.concat -- TODO: Remove this and adapt `mergeSAMGroups` to work directly on vectors
| f <- inputs]

.| CL.mapM (mergeSAMGroups MSBestOnly)
Expand Down
4 changes: 2 additions & 2 deletions NGLess/Interpretation/Select.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{- Copyright 2015-2018 NGLess Authors
{- Copyright 2015-2019 NGLess Authors
- License: MIT
-}

Expand Down Expand Up @@ -160,7 +160,7 @@ executeSelect (NGOMappedReadSet name istream ref) args = do
return $! NGOMappedReadSet name out ref
executeSelect o _ = throwShouldNotOccur ("NGLESS type checking error (Select received " ++ show o ++ ")")

streamedSamStats lno ifile ref = C.passthroughSink (CL.map (map fst) .| samStatsC') $ \(total, aligned, unique) ->
streamedSamStats lno ifile ref = C.passthroughSink (CL.map (V.singleton . map fst) .| samStatsC') $ \(total, aligned, unique) ->
outputMappedSetStatistics (MappingInfo lno ifile ref total aligned unique)


Expand Down

0 comments on commit 06c7f44

Please sign in to comment.