forked from bangalore-haskell-user-group/functorrent
-
Notifications
You must be signed in to change notification settings - Fork 1
/
ControlThread.hs
159 lines (132 loc) · 4.9 KB
/
ControlThread.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
{-# LANGUAGE OverloadedStrings #-}
module FuncTorrent.ControlThread where
import Control.Concurrent
import Control.Monad hiding (forM, forM_, mapM, mapM_, msum, sequence, sequence_)
import Data.IORef
import GHC.Conc
import FuncTorrent.Bencode (decode)
import FuncTorrent.Metainfo (Metainfo(..))
import FuncTorrent.Tracker (TrackerResponse(..), tracker, mkTrackerResponse, peers)
import FuncTorrent.Peer (Peer(..))
import FuncTorrent.PeerThread
import FuncTorrent.PeerThreadData
-- | State carried by the control thread of a single torrent.
data ControlThread = ControlThread
    { metaInfo         :: Metainfo
      -- ^ Metainfo of the torrent handled by this control thread.
    , trackerResponses :: [TrackerResponse]
      -- ^ Tracker responses collected so far; new responses are prepended.
    , peerList         :: [Peer]
      -- ^ Peers gathered from the tracker responses.
    , peerThreads      :: [(PeerThread, ThreadId)]
      -- ^ Peer threads created by this control thread, with their thread ids.
    , controlTStatus   :: IORef ControlThreadStatus
      -- ^ Shared reference holding the status of the control thread.
    , controlTAction   :: IORef ControlThreadAction
      -- ^ Shared reference holding the requested action; polled by 'mainLoop'.
    }
-- | What the control thread is currently doing.
data ControlThreadStatus
    = Stopped
    | Downloading
    | Seeding
    deriving (Eq, Show)
-- | Action requested from the control thread.
data ControlThreadAction
    = Download
    | Pause
    | Seed
    | Stop
    deriving (Eq, Show)
-- Description
-- ControlThread handles all operations for a single torrent.
-- It is responsible for:
--  1. Parsing the torrent file.
--  2. Communicating with trackers and obtaining peers.
--  3. Starting PeerThreads to do peer communication.
--  4. Controlling the activity of the PeerThreads.
--  5. Maintaining the cache, etc.
--  6. Handling incoming connections.
-- The overall operation may be divided into the following parts:
--  1. Initialization.
--  2. Downloading/seeding.
--  3. Stopping the download/seed.
--
-- | Entry point of a control thread: run the initialization, then the
-- main loop, and finally the exit procedure on the state the loop returns.
controlThreadMain :: ControlThread -> IO ()
controlThreadMain ct = do
    initialised <- doInitialization ct
    finalState  <- mainLoop initialised
    doExit finalState
-- | Query the tracker, then create a peer thread (via 'forkPeerThread')
-- for each of the first four peers of the resulting peer list.
doInitialization :: ControlThread -> IO ControlThread
doInitialization ct = do
    withPeers <- getTrackerResponse ct
    foldM forkPeerThread withPeers (take 4 (peerList withPeers))
-- | One iteration of the control loop, repeated until a 'Stop' action is
-- requested through 'controlTAction'. Returns the state of the last iteration.
mainLoop :: ControlThread -> IO ControlThread
mainLoop ct = do
    -- At this stage rank peers and decide if we want to disconnect,
    -- and create more peers / use incoming connections.
    filtered <- filterBadPeers ct
    managed  <- pieceManagement filtered
    -- Check whether we need to quit; the delay below throttles the polling
    -- of the PeerThreads.
    putStrLn "Check control thread action"
    -- TODO: This will cause a 4s delay b/w a ^C and the app going down
    threadDelay pollInterval
    requested <- readIORef (controlTAction managed)
    if requested == FuncTorrent.ControlThread.Stop
        then return managed
        else mainLoop managed
  where
    -- Pause between two iterations, in microseconds.
    pollInterval :: Int
    pollInterval = 4 * 1000 * 1000
-- | Shut down the control thread: request every peer thread to stop and
-- repeat until no peer thread is left running.
doExit :: ControlThread -> IO ()
doExit ct = do
    putStrLn "Doing control-thread exit"
    -- Let the peer threads stop themselves.
    mapM_ (setPeerThreadAction FuncTorrent.PeerThreadData.Stop . fst)
          (peerThreads ct)
    -- Let the threads run for a while.
    -- We may add a delay also if required.
    yield
    -- Remove all the threads which have terminated.
    ct1 <- clearFinishedThreads ct
    -- TODO: If there are still threads waiting/blocked then either wait
    -- (if they are blocked due to a disk write, wait and retry), or kill
    -- the thread if it is not responding.
    unless (null (peerThreads ct1)) $ doExit ct1
  where
    -- Keep only the peer threads that have not terminated yet.
    clearFinishedThreads :: ControlThread -> IO ControlThread
    clearFinishedThreads ct1 = do
        remainingThreads <- filterM isRunning (peerThreads ct1)
        return ct1 { peerThreads = remainingThreads }

    -- A thread is over both when it finished normally ('ThreadFinished')
    -- and when it was ended by an uncaught exception ('ThreadDied').
    -- Counting a died thread as running would make 'doExit' loop forever.
    isRunning :: (a, ThreadId) -> IO Bool
    isRunning (_, tid) = do
        status <- threadStatus tid
        return (status `notElem` [ThreadFinished, ThreadDied])
-- | Contact the tracker and record its answer. On success the new
-- response is prepended to 'trackerResponses' and its peers are appended
-- to 'peerList'. If the response cannot be decoded or converted, a
-- message is printed and the state is returned unchanged.
getTrackerResponse :: ControlThread -> IO ControlThread
getTrackerResponse ct = do
    response <- tracker (metaInfo ct) "-HS0001-*-*-20150215"
    -- TODO: Write to ~/.functorrent/caches
    -- writeFile (name (info m) ++ ".cache") response
    either (const (giveUp "tracker resp decode error"))
           fromDecoded
           (decode response)
  where
    -- Report the problem and keep the state as it was.
    giveUp msg = putStrLn msg >> return ct

    -- Turn the decoded data into a tracker response and record it.
    fromDecoded trackerInfo =
        either (const (giveUp "mkTracker error"))
               (return . record)
               (mkTrackerResponse trackerInfo)

    -- State extended with the given tracker response and its peers.
    record trackerResp = ct
        { trackerResponses = trackerResp : trackerResponses ct
        , peerList         = peerList ct ++ peers trackerResp
        }
-- | Create a peer thread for the given peer with 'initPeerThread' and
-- prepend it to the 'peerThreads' list.
forkPeerThread :: ControlThread -> Peer -> IO ControlThread
forkPeerThread ct p = fmap register (initPeerThread p)
  where
    register pt = ct { peerThreads = pt : peerThreads ct }
-- | Piece management. Currently a placeholder: it prints a message and
-- returns the state unchanged.
pieceManagement :: ControlThread -> IO ControlThread
pieceManagement ct = putStrLn "Manage pieces" >> return ct
-- | Filtering of bad peers. Currently a placeholder: it prints a message
-- and returns the state unchanged.
filterBadPeers :: ControlThread -> IO ControlThread
filterBadPeers ct = putStrLn "Filter bad peers" >> return ct
-- | Build the initial control-thread state for a torrent and start
-- 'controlThreadMain' on it in a new thread. Returns the initial state
-- together with the id of the forked thread.
initControlThread :: Metainfo -> IO (ControlThread, ThreadId)
initControlThread m = do
    statusRef <- newIORef Stopped
    actionRef <- newIORef Download
    let ct = ControlThread
            { metaInfo         = m
            , trackerResponses = []
            , peerList         = []
            , peerThreads      = []
            , controlTStatus   = statusRef
            , controlTAction   = actionRef
            }
    tid <- forkIO (controlThreadMain ct)
    return (ct, tid)