Skip to content

Commit

Permalink
congestion control is active only during OPEN and PARTOPEN
Browse files Browse the repository at this point in the history
  • Loading branch information
Petar Maymounkov committed Jun 15, 2011
1 parent 81411c4 commit b15a4e3
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 27 deletions.
3 changes: 1 addition & 2 deletions dccp/REMARK
@@ -1,8 +1,7 @@

* CCID-inspired Reset and Reset-code separation b/w Sender/receiver

* use cc only in OPEN and PARTOPEN states
* introduce a readCongestionLoop and writeCongestionLoop to consume timeout and sync cc events
* CCID-inspired Reset and Reset-code separation b/w Sender/receiver

* Reliable transport
* test reliable transport (simulated drops)
Expand Down
136 changes: 124 additions & 12 deletions dccp/cc.go
Expand Up @@ -37,12 +37,6 @@ import "os"
// sender (aka Half-Connection Sender CCID)
type SenderCongestionControl interface {

// Start tells the Congestion Control that it is being put in use.
// This method is handy since CC is generally time-sensitive, and so having
// an indication of "start" allows the CC to distinguish between its creation
// time and the time when it actually starts being utilized.
Start()

// GetID() returns the CCID of this congestion control algorithm
GetID() byte

Expand All @@ -52,6 +46,12 @@ type SenderCongestionControl interface {
// GetRTT returns the Round-Trip Time as measured by this CCID
GetRTT() int64

// Open tells the Congestion Control that the connection has entered
// OPEN or PARTOPEN state and that the CC can now kick in. Before the
// call to Open and after the call to Close, the Strobe function is
// expected to return immediately.
Open()

// Conn calls OnWrite before a packet is sent to give CongestionControl
// an opportunity to add CCVal and options to an outgoing packet
OnWrite(htype byte, x bool, seqno int64) (ccval byte, options []*Option)
Expand All @@ -77,15 +77,13 @@ type SenderCongestionControl interface {
// receiver (aka Half-Connection Receiver CCID)
type ReceiverCongestionControl interface {

// Start tells the Congestion Control that it is being put in use.
// This method is handy since CC is generally time-sensitive, and so having
// an indication of "start" allows the CC to distinguish between its creation
// time and the time when it actually starts being utilized.
Start()

// GetID() returns the CCID of this congestion control algorithm
GetID() byte

// Open tells the Congestion Control that the connection has entered
// OPEN or PARTOPEN state and that the CC can now kick in.
Open()

// Conn calls OnWrite before a packet is sent to give CongestionControl
// an opportunity to add CCVal and options to an outgoing packet
OnWrite(htype byte, x bool, seqno int64) (options []*Option)
Expand All @@ -110,3 +108,117 @@ const (
CCID2 = 2 // TCP-like Congestion Control, RFC 4341
CCID3 = 3 // TCP-Friendly Rate Control (TFRC), RFC 4342
)

// ---> Sender Congestion Control Activator

func newActivatorForSenderCongestionControl(scc SenderCongestionControl) SenderCongestionControl {
return &senderCongestionControlActivator{ phase: ACTIVATOR_INIT, SenderCongestionControl: scc }
}

type senderCongestionControlActivator struct {
Mutex
phase byte
SenderCongestionControl
}
const (
ACTIVATOR_INIT = iota
ACTIVATOR_OPEN
ACTIVATOR_CLOSED
)

func (sa *senderCongestionControlActivator) Open() {
sa.Lock()
defer sa.Unlock()
if sa.phase != ACTIVATOR_INIT {
return
}
sa.phase = ACTIVATOR_OPEN
sa.SenderCongestionControl.Open()
}

func (sa *senderCongestionControlActivator) OnWrite(htype byte, x bool, seqno int64) (ccval byte, options []*Option) {
sa.Lock()
defer sa.Unlock()
if sa.phase == ACTIVATOR_OPEN {
return sa.SenderCongestionControl.OnWrite(htype, x, seqno)
}
return 0, nil
}

func (sa *senderCongestionControlActivator) OnRead(htype byte, x bool, seqno int64, options []*Option) os.Error {
sa.Lock()
defer sa.Unlock()
if sa.phase == ACTIVATOR_OPEN {
return sa.SenderCongestionControl.OnRead(htype, x, seqno, options)
}
return nil
}

func (sa *senderCongestionControlActivator) Strobe() os.Error {
sa.Lock()
defer sa.Unlock()
if sa.phase == ACTIVATOR_OPEN {
return sa.SenderCongestionControl.Strobe()
}
return nil
}

func (sa *senderCongestionControlActivator) Close() os.Error {
sa.Lock()
defer sa.Unlock()
if sa.phase != ACTIVATOR_OPEN {
return nil
}
sa.phase = ACTIVATOR_CLOSED
return sa.SenderCongestionControl.Close()
}

// ---> Receiver Congestion Control Activator

func newActivatorForReceiverCongestionControl(rcc ReceiverCongestionControl) ReceiverCongestionControl {
return &receiverCongestionControlActivator{ phase: ACTIVATOR_INIT, ReceiverCongestionControl: rcc }
}

type receiverCongestionControlActivator struct {
Mutex
phase byte
ReceiverCongestionControl
}

func (ra *receiverCongestionControlActivator) Open() {
ra.Lock()
defer ra.Unlock()
if ra.phase != ACTIVATOR_INIT {
return
}
ra.phase = ACTIVATOR_OPEN
ra.ReceiverCongestionControl.Open()
}

func (ra *receiverCongestionControlActivator) OnWrite(htype byte, x bool, seqno int64) (options []*Option) {
ra.Lock()
defer ra.Unlock()
if ra.phase == ACTIVATOR_OPEN {
return ra.ReceiverCongestionControl.OnWrite(htype, x, seqno)
}
return nil
}

func (ra *receiverCongestionControlActivator) OnRead(htype byte, x bool, seqno int64, ccval byte, options []*Option) os.Error {
ra.Lock()
defer ra.Unlock()
if ra.phase == ACTIVATOR_OPEN {
return ra.ReceiverCongestionControl.OnRead(htype, x, seqno, ccval, options)
}
return nil
}

func (ra *receiverCongestionControlActivator) Close() os.Error {
ra.Lock()
defer ra.Unlock()
if ra.phase != ACTIVATOR_OPEN {
return nil
}
ra.phase = ACTIVATOR_CLOSED
return ra.ReceiverCongestionControl.Close()
}
4 changes: 2 additions & 2 deletions dccp/ccfixed.go
Expand Up @@ -39,7 +39,7 @@ func newFixedRateSenderControl(every int64) *fixedRateSenderControl {
return &fixedRateSenderControl{ every: every, strobeRead: strobe, strobeWrite: strobe }
}

func (scc *fixedRateSenderControl) Start() {
func (scc *fixedRateSenderControl) Open() {
go func() {
for {
scc.Lock()
Expand Down Expand Up @@ -93,7 +93,7 @@ func newFixedRateReceiverControl() *fixedRateReceiverControl {
return &fixedRateReceiverControl{}
}

func (rcc *fixedRateReceiverControl) Start() {
func (rcc *fixedRateReceiverControl) Open() {
}

func (rcc *fixedRateReceiverControl) GetID() byte { return CCID_FIXED }
Expand Down
12 changes: 5 additions & 7 deletions dccp/conn.go
Expand Up @@ -9,21 +9,23 @@ type Conn struct {
hc HeaderConn
scc SenderCongestionControl
rcc ReceiverCongestionControl

Mutex // Protects access to socket
socket

readAppLk Mutex
readApp chan []byte // readLoop() sends application data to Read()
writeDataLk Mutex
writeData chan []byte // Write() sends application data to writeLoop()
writeNonDataLk Mutex // this lock used when sending/closing writeNonData
writeNonDataLk Mutex
writeNonData chan *Header // inject() sends wire-format non-Data packets (higher priority) to writeLoop()
}

func newConn(hc HeaderConn, scc SenderCongestionControl, rcc ReceiverCongestionControl) *Conn {
c := &Conn{
hc: hc,
scc: scc,
rcc: rcc,
scc: newActivatorForSenderCongestionControl(scc),
rcc: newActivatorForReceiverCongestionControl(rcc),
readApp: make(chan []byte, 5),
writeData: make(chan []byte),
writeNonData: make(chan *Header, 5),
Expand All @@ -43,10 +45,6 @@ func newConn(hc HeaderConn, scc SenderCongestionControl, rcc ReceiverCongestionC
c.syncWithCongestionControl()
c.Unlock()

// Start congestion control mechanisms
scc.Start()
rcc.Start()

return c
}

Expand Down
10 changes: 10 additions & 0 deletions dccp/conngoto.go
Expand Up @@ -96,6 +96,8 @@ const (
func (c *Conn) gotoPARTOPEN() {
c.AssertLocked()
c.socket.SetState(PARTOPEN)
c.scc.Open()
c.rcc.Open()
c.inject(nil) // Unblocks the writeLoop select, so it can see the state change

// Start PARTOPEN timer, according to Section 8.1.5
Expand Down Expand Up @@ -129,13 +131,17 @@ func (c *Conn) gotoOPEN(hSeqNo int64) {
c.AssertLocked()
c.socket.SetOSR(hSeqNo)
c.socket.SetState(OPEN)
c.scc.Open()
c.rcc.Open()
c.inject(nil) // Unblocks the writeLoop select, so it can see the state change
}

func (c *Conn) gotoTIMEWAIT() {
c.AssertLocked()
c.teardownUser()
c.socket.SetState(TIMEWAIT)
c.scc.Close()
c.rcc.Close()
go func() {
time.Sleep(2 * MSL)
c.abortQuietly()
Expand All @@ -146,6 +152,8 @@ func (c *Conn) gotoCLOSING() {
c.AssertLocked()
c.teardownUser()
c.socket.SetState(CLOSING)
c.scc.Close()
c.rcc.Close()
go func() {
c.Lock()
rtt := c.socket.GetRTT()
Expand Down Expand Up @@ -176,4 +184,6 @@ func (c *Conn) gotoCLOSED() {
c.AssertLocked()
c.teardownUser()
c.socket.SetState(CLOSED)
c.scc.Close()
c.rcc.Close()
}
8 changes: 4 additions & 4 deletions dccp/conninj.go
Expand Up @@ -40,7 +40,10 @@ func (c *Conn) inject(h *Header) {
}
// Dropping a nil is OK, since it happens only if there are other packets in the queue
if len(c.writeNonData) < cap(c.writeNonData) {
c.writeNonData <- c.writeCCID(h)
if h != nil {
h = c.writeCCID(h)
}
c.writeNonData <- h
if h != nil {
c.logWriteHeaderLocked(h)
}
Expand Down Expand Up @@ -159,8 +162,5 @@ func (c *Conn) writeLoop(writeNonData chan *Header, writeData chan []byte) {
}
}

// Close the congestion control here when it won't be needed any longer
Exit:
c.scc.Close()
c.rcc.Close()
}

0 comments on commit b15a4e3

Please sign in to comment.