Skip to content

Commit

Permalink
Merge branch 'cache-piece'
Browse files Browse the repository at this point in the history
Conflicts:
	Makefile
	src/Process/Peer.hs
  • Loading branch information
jlouis committed May 31, 2010
2 parents 4be1e77 + 01fdc84 commit 0cdd30c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 16 deletions.
4 changes: 4 additions & 0 deletions src/Process.hs
Expand Up @@ -103,7 +103,11 @@ logP prio msg = do

infoP, debugP, criticalP, warningP, errorP :: Logging a => String -> Process a b ()
infoP = logP INFO
#ifdef NDEBUG
debugP _ = return ()
#else
debugP = logP DEBUG
#endif
criticalP = logP CRITICAL
warningP = logP WARNING
errorP = logP ERROR
Expand Down
12 changes: 9 additions & 3 deletions src/Process/Peer.hs
Expand Up @@ -108,7 +108,8 @@ data ST = ST { weChoke :: !Bool -- ^ True if we are choking the peer
, runningEndgame :: !Bool -- ^ True if we are in endgame
, lastMsg :: !Int -- ^ Ticks from last Message
, lastPieceMsg :: !Int -- ^ Ticks from last Piece Message
, interestingPieces :: !(S.Set PieceNum) -- ^ Pieces the peer has we are interested in
, interestingPieces :: !(S.Set PieceNum) -- ^ peer pieces we are interested in
, lastPn :: !PieceNum
}

data ExtensionConfig = ExtensionConfig
Expand Down Expand Up @@ -268,7 +269,7 @@ peerP caps pMgrC rtv pieceMgrC pm nPieces outBound inBound stv ih supC = do
spawnP (CF inBound outBound pMgrC pieceMgrC stv rtv ih pm
pdtmv havetv gbtmv cs)
(ST True False S.empty True False pieceSet nPieces
(RC.new ct) (RC.new ct) False 0 0 S.empty)
(RC.new ct) (RC.new ct) False 0 0 S.empty 0)
(cleanupP (startup nPieces) (defaultStopHandler supC) cleanup)

configCapabilities :: [Capabilities] -> ExtensionConfig
Expand Down Expand Up @@ -709,13 +710,17 @@ queuePieces :: [(PieceNum, Block)] -> Process CF ST ()
queuePieces toQueue = {-# SCC "queuePieces" #-} do
s <- get
let bq = blockQueue s
unless (Prelude.null toQueue) $ updateLastPnCache (head toQueue)
q <- forM toQueue
(\(p, b) -> do
if S.member (p, b) bq
then return Nothing -- Ignore pieces which are already in queue
else do outChan $ SenderQ.SenderQM $ Request p b
return $ Just (p, b))
put $! s { blockQueue = S.union bq (S.fromList $ catMaybes q) }
where
updateLastPnCache (pn, _) =
modify (\s -> s { lastPn = pn })

-- | Tell the PieceManager to store the given block
storeBlock :: PieceNum -> Block -> B.ByteString -> Process CF ST ()
Expand All @@ -727,7 +732,8 @@ grabBlocks :: Int -> Process CF ST [(PieceNum, Block)]
grabBlocks n = do
c <- asks grabBlockTV
ps <- gets peerPieces
msgPieceMgr (GrabBlocks n ps c)
lpn <- gets lastPn
msgPieceMgr (GrabBlocks n ps c lpn)
blks <- liftIO $ do atomically $ takeTMVar c
case blks of
Leech bs -> return bs
Expand Down
34 changes: 21 additions & 13 deletions src/Process/PieceMgr.hs
Expand Up @@ -90,7 +90,7 @@ data Blocks = Leech [(PieceNum, Block)]
| Endgame [(PieceNum, Block)]

-- | Messages for RPC towards the PieceMgr.
data PieceMgrMsg = GrabBlocks Int PS.PieceSet (TMVar Blocks)
data PieceMgrMsg = GrabBlocks Int PS.PieceSet (TMVar Blocks) PieceNum
-- ^ Ask for grabbing some blocks
| StoreBlock PieceNum Block B.ByteString
-- ^ Ask for storing a block on the file system
Expand All @@ -104,7 +104,7 @@ data PieceMgrMsg = GrabBlocks Int PS.PieceSet (TMVar Blocks)
-- ^ A peer relinquished the given piece Indexes

instance Show PieceMgrMsg where
show (GrabBlocks x _ _) = "GrabBlocks " ++ show x
show (GrabBlocks x _ _ _) = "GrabBlocks " ++ show x
show (StoreBlock pn blk _) = "StoreBlock " ++ show pn ++ " " ++ show blk
show (PutbackBlocks x) = "PutbackBlocks " ++ show x
show (GetDone _) = "GetDone"
Expand Down Expand Up @@ -163,8 +163,8 @@ rpcMessage = do
m <- {-# SCC "Channel_Read" #-} liftIO . atomically $ readTChan ch
traceMsg m
case m of
GrabBlocks n eligible c -> {-# SCC "GrabBlocks" #-}
do blocks <- grabBlocks n eligible
GrabBlocks n eligible c lastpn -> {-# SCC "GrabBlocks" #-}
do blocks <- grabBlocks n eligible lastpn
liftIO . atomically $ do putTMVar c blocks -- Is never supposed to block
StoreBlock pn blk d ->
storeBlock pn blk d
Expand Down Expand Up @@ -351,10 +351,10 @@ updateProgress pn blk = {-# SCC "updateProgress" #-} do
-- at times
else do
let pg' = pg { ipHaveBlocks = S.insert blk blkSet }
modify (\db -> db { pieces = M.insert pn pg' (pieces db) })
debugP $ "Iphave : " ++ show (ipHave pg') ++ " ipDone: " ++ show (ipDone pg')
db <- get
put $! db { pieces = M.insert pn pg' (pieces db) }
return (ipHave pg' == ipDone pg')
where ipHave = S.size . ipHaveBlocks
where ipHave = {-# SCC "updateProgress_ipHave" #-} S.size . ipHaveBlocks

blockPiece :: BlockSize -> PieceSize -> [Block]
blockPiece blockSz pieceSize = build pieceSize 0 []
Expand All @@ -368,9 +368,9 @@ blockPiece blockSz pieceSize = build pieceSize 0 []
-- | The call @grabBlocks n eligible@ tries to pick off up to @n@ pieces from
-- to download. In doing so, it will only consider pieces in @eligible@. It
-- returns a list of Blocks which where grabbed.
grabBlocks :: Int -> PS.PieceSet -> PieceMgrProcess Blocks
grabBlocks k eligible = {-# SCC "grabBlocks" #-} do
blocks <- tryGrab k eligible
grabBlocks :: Int -> PS.PieceSet -> PieceNum -> PieceMgrProcess Blocks
grabBlocks k eligible lastpn = {-# SCC "grabBlocks" #-} do
blocks <- tryGrab k eligible lastpn
ps <- gets pieces
let pendN = M.null $ M.filter (\a -> case a of Pending -> True
_ -> False) ps
Expand All @@ -393,9 +393,17 @@ inProgressPieces m = M.keys $ M.filter f m

-- Grabbing blocks is a state machine implemented by tail calls
-- Try grabbing pieces from the pieces in progress first
tryGrab :: PieceNum -> PS.PieceSet -> Process CF ST [(PieceNum, Block)]
tryGrab k ps = {-# SCC "tryGrabProgress" #-}
tryGrabProgress k ps [] =<< (inProgressPieces <$> gets pieces)
tryGrab :: PieceNum -> PS.PieceSet -> PieceNum -> Process CF ST [(PieceNum, Block)]
tryGrab k ps lastpn = {-# SCC "tryGrabProgress" #-}
tryGrabProgress k ps [] =<< ipp
where
ipp :: Process CF ST [PieceNum]
ipp = do
p <- gets pieces
let inProgress = inProgressPieces p
case M.lookup lastpn p of
Just (InProgress _ _ _) -> return $ lastpn : inProgress
_ -> return $ inProgress

tryGrabProgress :: PieceNum -> PS.PieceSet -> [(PieceNum, Block)] -> [PieceNum]
-> Process CF ST [(PieceNum, Block)]
Expand Down

0 comments on commit 0cdd30c

Please sign in to comment.