Skip to content

Commit

Permalink
Initial pass of converting traditional threads to distributed-process.
Browse files Browse the repository at this point in the history
  • Loading branch information
kylc committed Mar 14, 2013
1 parent e874105 commit 794ebff
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 64 deletions.
11 changes: 9 additions & 2 deletions src/BitTorrent/Core.hs
Expand Up @@ -6,7 +6,10 @@ import Control.Concurrent
import Control.Concurrent.Delay
import Control.Monad
import Data.Maybe
import qualified Data.Map as Map

import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP

import BitTorrent.Bencode
import BitTorrent.Metainfo
Expand All @@ -16,6 +19,9 @@ import BitTorrent.Types

run :: String -> IO ()
run f = do
Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
node <- newLocalNode t initRemoteTable

-- Read and decode the .torrent file
b <- parseBencodeFile f

Expand All @@ -36,7 +42,8 @@ run f = do

-- Connect to peers
case resp of
Right resp -> forkIO . runPeerMgr metainfo $ resPeers resp
Right resp -> forkProcess node $
void $ spawnLocal . runPeerMgr metainfo $ resPeers resp
Left e -> fail $ "Failed to parse bencode file: " ++ e

-- Download!
Expand Down
83 changes: 35 additions & 48 deletions src/BitTorrent/Peer.hs
Expand Up @@ -4,79 +4,64 @@ module BitTorrent.Peer

import Control.Applicative
import Control.Monad
import Control.Monad.Reader
import Control.Monad.State
import Data.Array.IArray
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Data.Bits
import Data.Char (ord)
import Data.Word
import Network
import Network.Socket hiding (KeepAlive, send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Network.Socket (Socket)
import qualified Network.Socket as N

import Control.Distributed.Process
import qualified Data.Attoparsec as A
import qualified Data.Attoparsec.Binary as A
import qualified System.IO.Streams as Streams
import qualified System.IO.Streams.Attoparsec as Streams

import BitTorrent.Types

import Debug.Trace
runPeer :: Metainfo -> Peer -> PeerState -> Process ()
runPeer meta peer state = do
sock <- liftIO $ peerConnect (peerIp peer) (peerPort peer)
(is, os) <- liftIO $ Streams.socketToStreams sock

say $ "Connected to: " ++ show peer

runPeer :: Peer -> PeerM ()
runPeer p = do
liftIO $ putStrLn $ "Connecting to " ++ show p
handshake <- liftIO $ do
_ <- peerHandshake os (mtInfoHash meta)
peerListen is parseHandshake

peerConnect (peerIp p) (peerPort p)
peerHandshake
peerListen
say $ "Received message: " ++ show handshake

return ()
forever $ do
message <- liftIO $ peerListen is parseMessage
say $ "Received message: " ++ show message

peerConnect :: Word32 -> Word16 -> PeerM ()
peerConnect :: Word32 -> Word16 -> IO Socket
peerConnect a p = do
sock <- liftIO $ socket AF_INET Stream defaultProtocol
liftIO $ connect sock $ addr a p
modify $ \s -> s { peerSocket = Just sock }
sock <- N.socket N.AF_INET N.Stream N.defaultProtocol
N.connect sock $ addr a p
return sock
where
addr a p = SockAddrInet (PortNum . fromIntegral $ p) a
addr a p = N.SockAddrInet (N.PortNum . fromIntegral $ p) a

peerHandshake :: PeerM ()
peerHandshake = do
ih <- fmap mtInfoHash ask
sock <- fmap peerSocket get
case sock of
Just s -> void . liftIO $ send s $
B.concat [ B.pack [protoHeaderSize]
, strToBS protoHeader
, B.pack protoReserved
, ih
, strToBS protoPeerId ]
Nothing -> fail "[handshake] Peer not yet connected."
peerHandshake :: Streams.OutputStream B.ByteString -> Hash -> IO ()
peerHandshake is ih = void $
flip Streams.write is $ Just $
B.concat [ B.pack [protoHeaderSize]
, strToBS protoHeader
, B.pack protoReserved
, ih
, strToBS protoPeerId ]
where
protoHeaderSize = fromIntegral . length $ protoHeader
protoHeader = "BitTorrent protocol"
protoReserved = replicate 8 0
protoPeerId = "-HT0001-asdefghjasdh"
strToBS = B.pack . map (fromIntegral . ord)

peerListen :: PeerM ()
peerListen = do
(Just sock) <- fmap peerSocket get
(is, os) <- liftIO $ Streams.socketToStreams sock
forever $ do
p <- parser
m <- liftIO $ Streams.parseFromStream p is
handleMessage m
trace (show m) $ return m
where
parser = do
shaken <- fmap peerHandshaken get
return $ if shaken
then parseMessage
else parseHandshake
peerListen :: Streams.InputStream B.ByteString -> A.Parser Message -> IO Message
peerListen is parser = Streams.parseFromStream parser is

parseMessage :: A.Parser Message
parseMessage = do
Expand Down Expand Up @@ -106,8 +91,9 @@ parseHandshake =
Handshake <$> (A.anyWord8 >>= A.take . fromIntegral)
<*> A.take 8 <*> A.take 20 <*> A.take 20

handleMessage :: Message -> PeerM ()
handleMessage m =
handleMessage :: Message -> IO ()
handleMessage m = return ()
{-
case m of
KeepAlive -> return () -- TODO: Send KeepAlive back
Handshake {} -> modify $ \s -> s { peerHandshaken = True }
Expand All @@ -121,6 +107,7 @@ handleMessage m =
let changes = readBitfield pieceCount x
modify $ \s -> s { peerHas = peerHas s // changes }
_ -> error $ "Unimplemented message handling of " ++ show m
-}

readBitfield :: Int -> B.ByteString -> [(Int, Bool)]
readBitfield max = filter (\(i, _) -> i <= max) . go 0 . B.unpack
Expand Down
13 changes: 7 additions & 6 deletions src/BitTorrent/PeerManager.hs
Expand Up @@ -4,24 +4,25 @@ module BitTorrent.PeerManager

import Control.Concurrent
import Control.Concurrent.Delay
import Control.Monad.Reader
import Control.Monad.State
import Control.Monad
import Data.Array.IArray
import Data.Array.Unboxed

import Control.Distributed.Process

import BitTorrent.Peer
import BitTorrent.Types

runPeerMgr :: Metainfo -> [Peer] -> IO ()
runPeerMgr :: Metainfo -> [Peer] -> Process ()
runPeerMgr m ps = do
-- Open connections
forM_ ps $ \p -> forkIO $
forM_ ps $ \p ->
let st = defaultPeerState $ length (mtPieces m)
in void $ execStateT (runReaderT (runPeer p) m) st
in spawnLocal $ runPeer m p st

-- Wait a bit for all incoming bitfields and haves to arrive
-- TODO: How long should we wait? Or should we have a smarter approach?
delaySeconds 3
liftIO $ delaySeconds 3

-- Download each piece
-- TODO: This is really dumb right now. Should randomly choose pieces based
Expand Down
6 changes: 3 additions & 3 deletions src/BitTorrent/Tracker.hs
Expand Up @@ -33,9 +33,9 @@ readBody body =
Right bcode ->
case lookupString "peers" bcode of
Just cmp ->
Right $ TrackerResponse { resInterval = fromIntegral . fromJust$ lookupInt "interval" bcode
, resPeers = decodePeers cmp
}
Right TrackerResponse { resInterval = fromIntegral . fromJust$ lookupInt "interval" bcode
, resPeers = decodePeers cmp
}
Nothing -> Left "Failed to find compact peer data"
Left e -> fail $ "Failed to parse bencode: " ++ e

Expand Down
4 changes: 0 additions & 4 deletions src/BitTorrent/Types.hs
@@ -1,7 +1,5 @@
module BitTorrent.Types where

import Control.Monad.State (StateT)
import Control.Monad.Reader (ReaderT)
import Data.Array.IArray
import Data.Array.Unboxed
import qualified Data.ByteString as B
Expand Down Expand Up @@ -59,8 +57,6 @@ data Peer = Peer
, peerPort :: Word16
} deriving (Eq, Show)

type PeerM = ReaderT Metainfo (StateT PeerState IO)

data PeerState = PeerState
{ peerSocket :: Maybe Socket
, peerHandshaken :: Bool
Expand Down
4 changes: 3 additions & 1 deletion torrent.cabal
Expand Up @@ -20,7 +20,9 @@ executable torrent
, bytestring
, HTTP
, network
, mtl
, attoparsec
, attoparsec-binary
, io-streams
, distributed-process
, network-transport-tcp
, array

0 comments on commit 794ebff

Please sign in to comment.