Skip to content

Commit

Permalink
* Unified the Status interface, now all mesages are of type Status
Browse files Browse the repository at this point in the history
  • Loading branch information
royger committed Oct 16, 2010
1 parent 3d25425 commit 3c8adfa
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 76 deletions.
6 changes: 3 additions & 3 deletions ChokeMgr.go
Expand Up @@ -16,14 +16,14 @@ type PeerChoke struct {
}

type ChokeMgr struct {
inStats chan chan map[string]*SpeedInfo
inStats chan chan map[string]*Status
inPeers chan chan map[string]*Peer
optimistic_unchoke int
}

type Speed []*PeerChoke

func NewChokeMgr(inStats chan chan map[string]*SpeedInfo, inPeers chan chan map[string]*Peer) (c *ChokeMgr, err os.Error) {
func NewChokeMgr(inStats chan chan map[string]*Status, inPeers chan chan map[string]*Peer) (c *ChokeMgr, err os.Error) {
c = new(ChokeMgr)
c.inStats = inStats
c.inPeers = inPeers
Expand Down Expand Up @@ -103,7 +103,7 @@ func apply(peers []*PeerChoke) {

func (c *ChokeMgr) RequestPeers() []*PeerChoke {
// Prepare peer array
inStats := make(chan map[string]*SpeedInfo)
inStats := make(chan map[string]*Status)
inPeers := make(chan map[string]*Peer)
lastPiece := int64(0)
// Request info
Expand Down
16 changes: 8 additions & 8 deletions Peer.go
Expand Up @@ -37,15 +37,15 @@ type Peer struct {
received_keepalive int64
writeQueue *PeerQueue
mutex *sync.Mutex
stats chan *PeerStatMsg
stats chan *Status
//log *logger
keepAlive *time.Ticker
inFiles chan *FileMsg
lastPiece int64
is_incoming bool
}

func NewPeer(addr, infohash, peerId string, outgoing chan *message, numPieces int64, requests chan *PieceMgrRequest, our_bitfield *Bitfield, stats chan *PeerStatMsg, inFiles chan *FileMsg, up_limit *time.Ticker, down_limit *time.Ticker) (p *Peer, err os.Error) {
func NewPeer(addr, infohash, peerId string, outgoing chan *message, numPieces int64, requests chan *PieceMgrRequest, our_bitfield *Bitfield, stats chan *Status, inFiles chan *FileMsg, up_limit *time.Ticker, down_limit *time.Ticker) (p *Peer, err os.Error) {
p = new(Peer)
p.mutex = new(sync.Mutex)
p.addr = addr
Expand Down Expand Up @@ -78,7 +78,7 @@ func NewPeer(addr, infohash, peerId string, outgoing chan *message, numPieces in
return
}

func NewPeerFromConn(conn net.Conn, infohash, peerId string, outgoing chan *message, numPieces int64, requests chan *PieceMgrRequest, our_bitfield *Bitfield, stats chan *PeerStatMsg, inFiles chan *FileMsg, up_limit *time.Ticker, down_limit *time.Ticker) (p *Peer, err os.Error) {
func NewPeerFromConn(conn net.Conn, infohash, peerId string, outgoing chan *message, numPieces int64, requests chan *PieceMgrRequest, our_bitfield *Bitfield, stats chan *Status, inFiles chan *FileMsg, up_limit *time.Ticker, down_limit *time.Ticker) (p *Peer, err os.Error) {
addr := conn.RemoteAddr().String()
p, err = NewPeer(addr, infohash, peerId, outgoing, numPieces, requests, our_bitfield, stats, inFiles, up_limit, down_limit)
p.wire, err = NewWire(p.infohash, p.our_peerId, conn, p.up_limit, p.down_limit, p.inFiles)
Expand Down Expand Up @@ -197,8 +197,8 @@ func (p *Peer) PeerWriter() {
}
// Send message to StatMgr
if msg.msgId == piece {
statMsg := new(PeerStatMsg)
statMsg.size_down = int64(msg.length - 9)
statMsg := new(Status)
statMsg.downloaded = int64(msg.length - 9)
statMsg.addr = p.addr
p.stats <- statMsg
//log.Println(statMsg)
Expand Down Expand Up @@ -236,8 +236,8 @@ func (p *Peer) PeerReader() {
p.received_keepalive = time.Seconds()
} else {
if msg.msgId == piece {
statMsg := new(PeerStatMsg)
statMsg.size_up = int64(msg.length - 9)
statMsg := new(Status)
statMsg.uploaded = int64(msg.length - 9)
statMsg.addr = p.addr
p.stats <- statMsg
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func (p *Peer) Close() {
p.requests <- &PieceMgrRequest{msg: &message{length: 1, msgId: exit, addr: []string{p.addr}}}
//p.log.Output("Finished sending message")
// Sending message to Stats
p.stats <- &PeerStatMsg{size_up: 0, size_down: 0, addr: p.addr}
p.stats <- &Status{uploaded: 0, downloaded: 0, addr: p.addr}
// Finished
close(p.incoming)
close(p.in)
Expand Down
4 changes: 2 additions & 2 deletions PeerMgr.go
Expand Up @@ -32,7 +32,7 @@ type PeerMgr struct {
tracker <- chan peersList // Channel used to comunicate the Tracker thread and the PeerMgr
inTracker chan <- int
requests chan *PieceMgrRequest
stats chan *PeerStatMsg
stats chan *Status
inChokeMgr chan chan map[string]*Peer
our_bitfield *Bitfield
numPieces int64
Expand All @@ -44,7 +44,7 @@ type PeerMgr struct {

// Create a PeerMgr

func NewPeerMgr(tracker chan peersList, inTracker chan int, numPieces int64, peerid, infohash string, requests chan *PieceMgrRequest, peerMgr chan *message, our_bitfield *Bitfield, stats chan *PeerStatMsg, inListener chan net.Conn, inChokeMgr chan chan map[string]*Peer, inFiles chan *FileMsg, up_limit *time.Ticker, down_limit *time.Ticker) (p *PeerMgr, err os.Error) {
func NewPeerMgr(tracker chan peersList, inTracker chan int, numPieces int64, peerid, infohash string, requests chan *PieceMgrRequest, peerMgr chan *message, our_bitfield *Bitfield, stats chan *Status, inListener chan net.Conn, inChokeMgr chan chan map[string]*Peer, inFiles chan *FileMsg, up_limit *time.Ticker, down_limit *time.Ticker) (p *PeerMgr, err os.Error) {
p = new(PeerMgr)
p.incoming = make(chan *message)
p.tracker = tracker
Expand Down
46 changes: 18 additions & 28 deletions PieceData.go
Expand Up @@ -122,16 +122,16 @@ func (pd *PieceData) RemoveAll(addr string) {
}

func (pd *PieceData) SearchPeers(rpiece, rblock, size int64, our_addr string) (others []string){
others = make([]string, size)
i := 0
others = make([]string, 0, size)
for addr, _ := range(pd.peers) {
if addr != our_addr {
for ref, _ := range(pd.peers[addr]) {
pieceNum, blockNum := uint32(ref>>32), uint32(ref)
if int64(pieceNum) == rpiece && int64(blockNum) == rblock {
// Add to return array
others[i] = addr
i++
//others[i] = addr
//i++
others = appendString(others, addr)
// Remove from list
pd.peers[addr][ref] = 0, false
// If peer list is empty, remove peer
Expand Down Expand Up @@ -190,28 +190,6 @@ func (pd *PieceData) SearchPiece(addr string, bitfield *Bitfield) (rpiece int64,
return
}
}
// Find a better way to do this
// piece := pd.bitfield.FindNextPiece(start, bitfield.Bytes())
/*for i := start; i < totalPieces; i++ {
if !pd.bitfield.IsSet(i) && bitfield.IsSet(i) {
if _, ok := pd.pieces[i]; !ok {
// Add new piece to set
pd.Add(addr, i, 0)
rpiece, rblock = i, 0
return
}
}
}
for i:= int64(0); i < start; i++ {
if !pd.bitfield.IsSet(i) && bitfield.IsSet(i) {
if _, ok := pd.pieces[i]; !ok {
// Add new piece to set
pd.Add(addr, i, 0)
rpiece, rblock = i, 0
return
}
}
}*/
// If all pieces are taken, double up on an active piece
// if only 20% of pieces remaining
//log.Println("PieceData -> Trying to enter endgame mode")
Expand Down Expand Up @@ -263,6 +241,18 @@ func (pd *PieceData) Clean() {
}
}
}
//log.Println("Peers map:", pd.peers)
//log.Println("Pieces map:", pd.pieces)
}

func appendString(slice []string, data string) []string {
l := len(slice)
if l + 1 > cap(slice) { // reallocate
// Allocate 10 more slots
newSlice := make([]string, (l+10))
// The copy function is predeclared and works for any slice type.
copy(newSlice, slice)
slice = newSlice
}
slice = slice[0:l+1]
slice[l] = data
return slice
}
8 changes: 4 additions & 4 deletions PieceMgr.go
Expand Up @@ -26,14 +26,14 @@ type PieceMgr struct {
peerMgr chan *message
inStats chan string
inFiles chan *FileMsg
outStats chan *SpeedInfo
outStats chan *Status
pieceData *PieceData
pieceLength, lastPieceLength, totalPieces, totalSize int64
files FileStore
bitfield *Bitfield
}

func NewPieceMgr(requests chan *PieceMgrRequest, peerMgr chan *message, inStats chan string, outStats chan *SpeedInfo, files FileStore, bitfield *Bitfield, pieceLength, lastPieceLength, totalPieces, totalSize int64, inFiles chan *FileMsg) (pieceMgr *PieceMgr, err os.Error){
func NewPieceMgr(requests chan *PieceMgrRequest, peerMgr chan *message, inStats chan string, outStats chan *Status, files FileStore, bitfield *Bitfield, pieceLength, lastPieceLength, totalPieces, totalSize int64, inFiles chan *FileMsg) (pieceMgr *PieceMgr, err os.Error){
pieceMgr = new(PieceMgr)
pieceMgr.files = files
pieceMgr.pieceLength = pieceLength
Expand Down Expand Up @@ -98,8 +98,8 @@ func (p *PieceMgr) ProcessRequest(msg *PieceMgrRequest) {
speed := <- p.outStats
// Calculate number of pieces to request to have 10s worth of pieces incoming
requests := int64(DEFAULT_REQUESTS)
if speed.upload != 0 {
requests = int64(math.Ceil(float64(REQUESTS_LENGTH)/(float64(STANDARD_BLOCK_LENGTH)/float64(speed.upload))))
if speed.uploaded != 0 {
requests = int64(math.Ceil(float64(REQUESTS_LENGTH)/(float64(STANDARD_BLOCK_LENGTH)/float64(speed.uploaded))))
}
//log.Println("PieceMgr -> Requesting", requests, "from peer", msg.our_addr, "with speed:", speed.upload)
for i := p.pieceData.NumPieces(msg.our_addr); i < MAX_REQUESTS && i < requests; i++ {
Expand Down
50 changes: 28 additions & 22 deletions Stats.go
Expand Up @@ -11,22 +11,27 @@ import(
"fmt"
)

type PeerStatMsg struct {
/*type PeerStatMsg struct {
size_up int64 // bytes
size_down int64
addr string
}
}*/

type SpeedInfo struct {
/*upload int64
download int64*/
/*type SpeedInfo struct {
//upload int64
//download int64
speed int64
upload int64
download int64
}
}*/

type TrackerStatMsg struct {
/*type TrackerStatMsg struct {
uploaded, downloaded, left int64
}*/

type Status struct {
uploaded, downloaded, speed int64
addr string
}

type PeerStat struct {
Expand All @@ -39,17 +44,17 @@ type PeerStat struct {

type Stats struct {
peers map[string] *PeerStat
stats chan *PeerStatMsg
inTracker chan <- *TrackerStatMsg
inChokeMgr chan chan map[string]*SpeedInfo
stats chan *Status
inTracker chan <- *Status
inChokeMgr chan chan map[string]*Status
outPieceMgr chan string
inPieceMgr chan *SpeedInfo
inPieceMgr chan *Status
left, size, uploaded, downloaded int64
pod_up, pod_down []int64
n int
}

func NewStats(stats chan *PeerStatMsg, inTracker chan *TrackerStatMsg, inChokeMgr chan chan map[string]*SpeedInfo, outPieceMgr chan string, inPieceMgr chan *SpeedInfo, left, size int64) (s *Stats) {
func NewStats(stats chan *Status, inTracker chan *Status, inChokeMgr chan chan map[string]*Status, outPieceMgr chan string, inPieceMgr chan *Status, left, size int64) (s *Stats) {
s = new(Stats)
s.peers = make(map[string] *PeerStat)
s.stats = stats
Expand All @@ -63,14 +68,14 @@ func NewStats(stats chan *PeerStatMsg, inTracker chan *TrackerStatMsg, inChokeMg
return
}

func (s *Stats) Update(msg *PeerStatMsg) {
func (s *Stats) Update(msg *Status) {
if _, ok := s.peers[msg.addr]; !ok {
s.peers[msg.addr] = new(PeerStat)
s.peers[msg.addr].pod_up = make([]int64, PONDERATION_TIME)
s.peers[msg.addr].pod_down = make([]int64, PONDERATION_TIME)
}
s.peers[msg.addr].size_up += msg.size_up
s.peers[msg.addr].size_down += msg.size_down
s.peers[msg.addr].size_up += msg.uploaded
s.peers[msg.addr].size_down += msg.downloaded
}

func (s *Stats) Remove(addr string) {
Expand Down Expand Up @@ -135,7 +140,7 @@ func (s *Stats) Run() {
select {
case msg := <- s.stats:
//log.Println("Stats -> Received message")
if msg.size_up > 0 || msg.size_down > 0 {
if msg.uploaded > 0 || msg.downloaded > 0 {
//log.Println("Stats -> Updating peer stats")
s.Update(msg)
//log.Println("Stats -> Finished updating peer stats")
Expand All @@ -149,11 +154,12 @@ func (s *Stats) Run() {
s.Round()
//log.Println("Stats -> Finished processing stats")
case <- tracker:
s.inTracker <- &TrackerStatMsg{uploaded: s.uploaded, downloaded: s.downloaded, left: s.left}
s.inTracker <- &Status{uploaded: s.uploaded, downloaded: s.downloaded}
case c := <- s.inChokeMgr:
peers := make(map[string]*SpeedInfo)
peers := make(map[string]*Status)
for addr, peer := range(s.peers) {
choke := new(SpeedInfo)
choke := new(Status)
// use bitfield instead of "left"
if s.left == 0 {
for _, speed := range peer.pod_down {
choke.speed += speed
Expand All @@ -168,12 +174,12 @@ func (s *Stats) Run() {
}
c <- peers
case addr := <- s.outPieceMgr:
speed := new(SpeedInfo)
speed := new(Status)
if peer, ok := s.peers[addr]; ok {
for _, up := range peer.pod_up {
speed.upload += up
speed.uploaded += up
}
speed.upload = speed.upload/PONDERATION_TIME
speed.uploaded = speed.uploaded/PONDERATION_TIME
}
s.inPieceMgr <- speed
}
Expand Down
8 changes: 4 additions & 4 deletions Tracker.go
Expand Up @@ -26,8 +26,8 @@ type Tracker struct {
// Chanels
outPeerMgr chan <- peersList
inPeerMgr <- chan int
outStatus chan <- *trackerStatusMsg
inStatus <- chan *TrackerStatMsg
outStatus chan <- *Status
inStatus <- chan *Status
announce *time.Ticker
//inStatus <- chan statusMsg
// Internal data for tracker requests
Expand All @@ -53,7 +53,7 @@ type trackerStatusMsg struct {
Complete, Incomplete, Interval int
}

func NewTracker(url, infohash, port string, outPeerMgr chan peersList, inPeerMgr chan int, outStatus chan *trackerStatusMsg, inStatus chan *TrackerStatMsg, left int64) (t *Tracker) {
func NewTracker(url, infohash, port string, outPeerMgr chan peersList, inPeerMgr chan int, outStatus chan *Status, inStatus chan *Status, left int64) (t *Tracker) {
sid := CLIENT_ID + "-" + strconv.Itoa(os.Getpid()) + strconv.Itoa64(rand.Int63())
t = &Tracker{url: url,
infohash: infohash,
Expand Down Expand Up @@ -94,7 +94,7 @@ func (t *Tracker) Run() {
log.Println("Tracker -> Requesting", num, "peers in new announce")
t.num_peers = num
case stat := <- t.inStatus:
t.uploaded, t.downloaded, t.left = stat.uploaded, stat.downloaded, stat.left
t.uploaded, t.downloaded = stat.uploaded, stat.downloaded
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions test.go
Expand Up @@ -27,13 +27,13 @@ func main() {
// Create channels for test
outPeerMgr := make(chan peersList)
inTracker := make(chan int)
outStatus := make(chan *trackerStatusMsg)
inStatus := make(chan *TrackerStatMsg)
outStatus := make(chan *Status)
inStatus := make(chan *Status)
outListen := make(chan net.Conn)
inPeerMgr := make(chan chan map[string]*Peer)
outChokeMgr := make(chan chan map[string]*SpeedInfo)
outChokeMgr := make(chan chan map[string]*Status)
outPieceMgrInStats := make(chan string)
outStatsInPieceMgr := make(chan *SpeedInfo)
outStatsInPieceMgr := make(chan *Status)
outPeerInFiles := make(chan *FileMsg)
// Load torrent file
torr, err := NewTorrent(*torrent)
Expand Down Expand Up @@ -63,7 +63,7 @@ func main() {
t := NewTracker(torr.Announce, torr.Infohash, *listen_port, outPeerMgr, inTracker, outStatus, inStatus, left)
go t.Run()
// Initilize Stats
stats := make(chan *PeerStatMsg)
stats := make(chan *Status)
s := NewStats(stats, inStatus, outChokeMgr, outPieceMgrInStats, outStatsInPieceMgr, left, size)
go s.Run()
// Initialize peerMgr
Expand Down

0 comments on commit 3c8adfa

Please sign in to comment.