Skip to content

Commit

Permalink
tcp: (feature #2823) Implemented TCP Pacing
Browse files Browse the repository at this point in the history
  • Loading branch information
Vivek-anand-jain authored and natale-p committed Nov 30, 2017
1 parent 9772db1 commit b179633
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 6 deletions.
2 changes: 2 additions & 0 deletions RELEASE_NOTES
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ This release has been tested on the following platforms:
New user-visible features
-------------------------

- (tcp) Implemented the core functionality of TCP Pacing.

Bugs fixed
----------
- Bug 2505 - network: Avoid asserts in Header/Trailer deserialization
Expand Down
155 changes: 155 additions & 0 deletions scratch/tcp-pacing.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

// Network topology
//
// n0 ----------- n1
// 40 Gbps
// 0.01 ms

// This programs illustrates how TCP pacing can be used and how user can set
// pacing rate. The program gives information about each flow like transmitted
// and received bytes (packets) and throughput of that flow. Currently, it is
// using TCP NewReno but in future after having congestion control algorithms
// which can change pacing rate can be used.

#include <string>
#include <fstream>
#include "ns3/core-module.h"
#include "ns3/point-to-point-module.h"
#include "ns3/internet-module.h"
#include "ns3/applications-module.h"
#include "ns3/network-module.h"
#include "ns3/packet-sink.h"
#include "ns3/flow-monitor-module.h"

using namespace ns3;

NS_LOG_COMPONENT_DEFINE ("TcpPacingExample");

int
main (int argc, char *argv[])
{

bool tracing = false;
uint32_t maxBytes = 0;
uint32_t TCPFlows = 1;
bool isPacingEnabled = true;
std::string pacingRate = "4Gbps";
bool isSack = false;
uint32_t maxPackets = 0;

CommandLine cmd;
cmd.AddValue ("tracing", "Flag to enable/disable tracing", tracing);
cmd.AddValue ("maxBytes",
"Total number of bytes for application to send", maxBytes);
cmd.AddValue ("maxPackets",
"Total number of bytes for application to send", maxPackets);
cmd.AddValue ("TCPFlows", "Number of application flows between sender and receiver", TCPFlows);
cmd.AddValue ("Pacing", "Flag to enable/disable pacing in TCP", isPacingEnabled);
cmd.AddValue ("Sack", "Flag to enable/disable sack in TCP", isSack);
cmd.AddValue ("PacingRate", "Max Pacing Rate in bps", pacingRate);
cmd.Parse (argc, argv);

if (maxPackets != 0 )
{
maxBytes = 500 * maxPackets;
}

Config::SetDefault ("ns3::TcpSocketState::MaxPacingRate", StringValue (pacingRate));
Config::SetDefault ("ns3::TcpSocketState::EnablePacing", BooleanValue (isPacingEnabled));
Config::SetDefault ("ns3::TcpSocketBase::Sack", BooleanValue (isSack));

NS_LOG_INFO ("Create nodes.");
NodeContainer nodes;
nodes.Create (2);

NS_LOG_INFO ("Create channels.");
PointToPointHelper pointToPoint;
pointToPoint.SetDeviceAttribute ("DataRate", StringValue ("40Gbps"));
pointToPoint.SetChannelAttribute ("Delay", StringValue ("0.01ms"));

NetDeviceContainer devices;
devices = pointToPoint.Install (nodes);

InternetStackHelper internet;
internet.Install (nodes);

NS_LOG_INFO ("Assign IP Addresses.");
Ipv4AddressHelper ipv4;
ipv4.SetBase ("10.1.1.0", "255.255.255.0");
Ipv4InterfaceContainer i = ipv4.Assign (devices);

NS_LOG_INFO ("Create Applications.");

ApplicationContainer sourceApps;
ApplicationContainer sinkApps;
for (uint32_t iterator = 0; iterator < TCPFlows; iterator++)
{
uint16_t port = 10000 + iterator;

BulkSendHelper source ("ns3::TcpSocketFactory",
InetSocketAddress (i.GetAddress (1), port));
// Set the amount of data to send in bytes. Zero is unlimited.
source.SetAttribute ("MaxBytes", UintegerValue (maxBytes));
sourceApps.Add (source.Install (nodes.Get (0)));

PacketSinkHelper sink ("ns3::TcpSocketFactory",
InetSocketAddress (Ipv4Address::GetAny (), port));
sinkApps.Add (sink.Install (nodes.Get (1)));
}

sinkApps.Start (Seconds (0.0));
sinkApps.Stop (Seconds (5));
sourceApps.Start (Seconds (1));
sourceApps.Stop (Seconds (5));

if (tracing)
{
AsciiTraceHelper ascii;
pointToPoint.EnableAsciiAll (ascii.CreateFileStream ("tcp-pacing.tr"));
pointToPoint.EnablePcapAll ("tcp-pacing", false);
}

FlowMonitorHelper flowmon;
Ptr<FlowMonitor> monitor = flowmon.InstallAll ();

NS_LOG_INFO ("Run Simulation.");
Simulator::Stop (Seconds (5));
Simulator::Run ();

monitor->CheckForLostPackets ();
Ptr<Ipv4FlowClassifier> classifier = DynamicCast<Ipv4FlowClassifier> (flowmon.GetClassifier ());
FlowMonitor::FlowStatsContainer stats = monitor->GetFlowStats ();
for (std::map<FlowId, FlowMonitor::FlowStats>::const_iterator i = stats.begin (); i != stats.end (); ++i)
{
Ipv4FlowClassifier::FiveTuple t = classifier->FindFlow (i->first);
if (t.sourceAddress == "10.1.1.2")
{
continue;
}
std::cout << "Flow " << i->first << " (" << t.sourceAddress << " -> " << t.destinationAddress << ")\n";
std::cout << " Tx Packets: " << i->second.txPackets << "\n";
std::cout << " Tx Bytes: " << i->second.txBytes << "\n";
std::cout << " TxOffered: " << i->second.txBytes * 8.0 / 9.0 / 1000 / 1000 << " Mbps\n";
std::cout << " Rx Packets: " << i->second.rxPackets << "\n";
std::cout << " Rx Bytes: " << i->second.rxBytes << "\n";
std::cout << " Throughput: " << i->second.rxBytes * 8.0 / 9.0 / 1000 / 1000 << " Mbps\n";
}

Simulator::Destroy ();
NS_LOG_INFO ("Done.");
}
96 changes: 90 additions & 6 deletions src/internet/model/tcp-socket-base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ TcpSocketBase::GetInstanceTypeId () const
return TcpSocketBase::GetTypeId ();
}

NS_OBJECT_ENSURE_REGISTERED (TcpSocketState);

TypeId
TcpSocketState::GetTypeId (void)
Expand All @@ -213,6 +214,14 @@ TcpSocketState::GetTypeId (void)
.SetParent<Object> ()
.SetGroupName ("Internet")
.AddConstructor <TcpSocketState> ()
.AddAttribute ("EnablePacing", "Enable Pacing",
BooleanValue (false),
MakeBooleanAccessor (&TcpSocketState::m_pacing),
MakeBooleanChecker ())
.AddAttribute ("MaxPacingRate", "Set Max Pacing Rate",
DataRateValue (DataRate ("4Gb/s")),
MakeDataRateAccessor (&TcpSocketState::m_maxPacingRate),
MakeDataRateChecker ())
.AddTraceSource ("CongestionWindow",
"The TCP connection's congestion window",
MakeTraceSourceAccessor (&TcpSocketState::m_cWnd),
Expand Down Expand Up @@ -250,7 +259,10 @@ TcpSocketState::TcpSocketState (void)
// Change m_nextTxSequence for non-zero initial sequence number
m_nextTxSequence (0),
m_rcvTimestampValue (0),
m_rcvTimestampEchoReply (0)
m_rcvTimestampEchoReply (0),
m_pacing (false),
m_maxPacingRate (0),
m_currentPacingRate (0)
{
}

Expand All @@ -266,7 +278,10 @@ TcpSocketState::TcpSocketState (const TcpSocketState &other)
m_highTxMark (other.m_highTxMark),
m_nextTxSequence (other.m_nextTxSequence),
m_rcvTimestampValue (other.m_rcvTimestampValue),
m_rcvTimestampEchoReply (other.m_rcvTimestampEchoReply)
m_rcvTimestampEchoReply (other.m_rcvTimestampEchoReply),
m_pacing (other.m_pacing),
m_maxPacingRate (other.m_maxPacingRate),
m_currentPacingRate (other.m_currentPacingRate)
{
}

Expand Down Expand Up @@ -332,13 +347,17 @@ TcpSocketBase::TcpSocketBase (void)
m_retxThresh (3),
m_limitedTx (false),
m_congestionControl (0),
m_isFirstPartialAck (true)
m_isFirstPartialAck (true),
m_pacingTimer (Timer::REMOVE_ON_DESTROY)
{
NS_LOG_FUNCTION (this);
m_rxBuffer = CreateObject<TcpRxBuffer> ();
m_txBuffer = CreateObject<TcpTxBuffer> ();
m_tcb = CreateObject<TcpSocketState> ();

m_tcb->m_currentPacingRate = m_tcb->m_maxPacingRate;
m_pacingTimer.SetFunction (&TcpSocketBase::NotifyPacingPerformed, this);

bool ok;

ok = m_tcb->TraceConnectWithoutContext ("CongestionWindow",
Expand Down Expand Up @@ -409,7 +428,8 @@ TcpSocketBase::TcpSocketBase (const TcpSocketBase& sock)
m_limitedTx (sock.m_limitedTx),
m_isFirstPartialAck (sock.m_isFirstPartialAck),
m_txTrace (sock.m_txTrace),
m_rxTrace (sock.m_rxTrace)
m_rxTrace (sock.m_rxTrace),
m_pacingTimer (Timer::REMOVE_ON_DESTROY)
{
NS_LOG_FUNCTION (this);
NS_LOG_LOGIC ("Invoked the copy constructor");
Expand All @@ -429,6 +449,10 @@ TcpSocketBase::TcpSocketBase (const TcpSocketBase& sock)
m_txBuffer = CopyObject (sock.m_txBuffer);
m_rxBuffer = CopyObject (sock.m_rxBuffer);
m_tcb = CopyObject (sock.m_tcb);

m_tcb->m_currentPacingRate = m_tcb->m_maxPacingRate;
m_pacingTimer.SetFunction (&TcpSocketBase::NotifyPacingPerformed, this);

if (sock.m_congestionControl)
{
m_congestionControl = sock.m_congestionControl->Fork ();
Expand Down Expand Up @@ -2663,6 +2687,22 @@ TcpSocketBase::SendDataPacket (SequenceNumber32 seq, uint32_t maxSize, bool with
uint8_t flags = withAck ? TcpHeader::ACK : 0;
uint32_t remainingData = m_txBuffer->SizeFromSequence (seq + SequenceNumber32 (sz));
if (m_tcb->m_pacing)
{
NS_LOG_DEBUG ("Pacing is enabled");
if (m_pacingTimer.IsExpired ())
{
NS_LOG_DEBUG ("Current Pacing Rate " << m_tcb->m_currentPacingRate);
NS_LOG_DEBUG ("Timer is in expired state, activate it " << m_tcb->m_currentPacingRate.CalculateBytesTxTime (sz));
m_pacingTimer.Schedule(m_tcb->m_currentPacingRate.CalculateBytesTxTime (sz));
}
else
{
NS_LOG_DEBUG ("Timer is already in running state");
}
}
if (withAck)
{
m_delAckEvent.Cancel ();
Expand Down Expand Up @@ -2834,6 +2874,17 @@ TcpSocketBase::SendPendingData (bool withAck)
// else branch to control silly window syndrome and Nagle)
while (availableWindow > 0)
{
if (m_tcb->m_pacing)
{
NS_LOG_DEBUG ("Pacing is enabled");
if (m_pacingTimer.IsRunning ())
{
NS_LOG_INFO ("Skipping Packet due to pacing" << m_pacingTimer.GetDelayLeft ());
break;
}
NS_LOG_DEBUG ("Timer is not running");
}
if (m_tcb->m_congState == TcpSocketState::CA_OPEN
&& m_state == TcpSocket::FIN_WAIT_1)
{
Expand Down Expand Up @@ -2902,8 +2953,18 @@ TcpSocketBase::SendPendingData (bool withAck)
" total unAck: " << UnAckDataCount () <<
" sent seq " << m_tcb->m_nextTxSequence <<
" size " << sz);
++nPacketsSent;
if (m_tcb->m_pacing)
{
NS_LOG_DEBUG ("Pacing is enabled");
if (m_pacingTimer.IsExpired ())
{
NS_LOG_DEBUG ("Current Pacing Rate " << m_tcb->m_currentPacingRate);
NS_LOG_DEBUG ("Timer is in expired state, activate it " << m_tcb->m_currentPacingRate.CalculateBytesTxTime (sz));
m_pacingTimer.Schedule(m_tcb->m_currentPacingRate.CalculateBytesTxTime (sz));
break;
}
}
}
// (C.4) The estimate of the amount of data outstanding in the
Expand Down Expand Up @@ -3218,9 +3279,12 @@ TcpSocketBase::ReTxTimeout ()
{
return;
}

NS_LOG_DEBUG ("Checking if Connection is Established");
// If all data are received (non-closing socket and nothing to send), just return
if (m_state <= ESTABLISHED && m_txBuffer->HeadSequence () >= m_tcb->m_highTxMark)
if (m_state <= ESTABLISHED && m_txBuffer->HeadSequence () >= m_tcb->m_highTxMark && m_txBuffer->Size () == 0)
{
NS_LOG_DEBUG ("Already Sent full data" << m_txBuffer->HeadSequence () << " " << m_tcb->m_highTxMark);
return;
}

Expand Down Expand Up @@ -3287,6 +3351,8 @@ TcpSocketBase::ReTxTimeout ()
m_congestionControl->CongestionStateSet (m_tcb, TcpSocketState::CA_LOSS);
m_tcb->m_congState = TcpSocketState::CA_LOSS;

m_pacingTimer.Cancel ();

NS_LOG_DEBUG ("RTO. Reset cwnd to " << m_tcb->m_cWnd << ", ssthresh to " <<
m_tcb->m_ssThresh << ", restart from seqnum " <<
m_txBuffer->HeadSequence () << " doubled rto to " <<
Expand Down Expand Up @@ -3415,6 +3481,14 @@ TcpSocketBase::DoRetransmit ()
m_tcb->m_nextTxSequence = m_txBuffer->HeadSequence ();
uint32_t sz = SendDataPacket (m_txBuffer->HeadSequence (), m_tcb->m_segmentSize, true);

if (m_tcb->m_pacing && m_tcb->m_congState == TcpSocketState::CA_LOSS)
{
m_pacingTimer.Cancel ();
NS_LOG_DEBUG ("RTO Current Pacing Rate " << m_tcb->m_currentPacingRate);
NS_LOG_DEBUG ("RTO Timer is in expired state, activate it " << m_tcb->m_currentPacingRate.CalculateBytesTxTime (sz));
m_pacingTimer.Schedule(m_tcb->m_currentPacingRate.CalculateBytesTxTime (sz));
}

// In case of RTO, advance m_tcb->m_nextTxSequence
if (oldSequence == m_tcb->m_nextTxSequence.Get ())
{
Expand All @@ -3437,6 +3511,7 @@ TcpSocketBase::CancelAllTimers ()
m_lastAckEvent.Cancel ();
m_timewaitEvent.Cancel ();
m_sendPendingDataEvent.Cancel ();
m_pacingTimer.Cancel ();
}

/* Move TCP to Time_Wait state and schedule a transition to Closed state */
Expand Down Expand Up @@ -3930,6 +4005,15 @@ TcpSocketBase::SafeSubtraction (uint32_t a, uint32_t b)
return 0;
}

void
TcpSocketBase::NotifyPacingPerformed (void)
{
NS_LOG_FUNCTION_NOARGS ();
NS_LOG_INFO ("Performing Pacing");
SendPendingData (m_connected);
}


//RttHistory methods
RttHistory::RttHistory (SequenceNumber32 s, uint32_t c, Time t)
: seq (s),
Expand Down
Loading

0 comments on commit b179633

Please sign in to comment.