Permalink
Browse files

THRIFT-904: disable nagle and linger

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1063966 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent e61fef3 commit 7070aaa23bce996f9c40f75903d72fe427072713 @tjake tjake committed Jan 27, 2011
View
@@ -17,7 +17,7 @@
# under the License.
#
-AC_PREREQ(2.65)
+#AC_PREREQ(2.65)
AC_INIT([thrift], [0.7.0-dev])
@@ -24,9 +24,12 @@ namespace Thrift.Transport
public class TFramedTransport : TTransport
{
protected TTransport transport = null;
- protected MemoryStream writeBuffer = new MemoryStream(1024);
+ protected MemoryStream writeBuffer;
protected MemoryStream readBuffer = null;
+ private const int header_size = 4;
+ private static byte[] header_dummy = new byte[header_size]; // used as header placeholder while initilizing new write buffer
+
public class Factory : TTransportFactory
{
public override TTransport GetTransport(TTransport trans)
@@ -35,7 +38,12 @@ public override TTransport GetTransport(TTransport trans)
}
}
- public TFramedTransport(TTransport transport)
+ public TFramedTransport()
+ {
+ InitWriteBuffer();
+ }
+
+ public TFramedTransport(TTransport transport) : this()
{
this.transport = transport;
}
@@ -77,8 +85,8 @@ public override int Read(byte[] buf, int off, int len)
private void ReadFrame()
{
- byte[] i32rd = new byte[4];
- transport.ReadAll(i32rd, 0, 4);
+ byte[] i32rd = new byte[header_size];
+ transport.ReadAll(i32rd, 0, header_size);
int size =
((i32rd[0] & 0xff) << 24) |
((i32rd[1] & 0xff) << 16) |
@@ -99,16 +107,31 @@ public override void Flush()
{
byte[] buf = writeBuffer.GetBuffer();
int len = (int)writeBuffer.Length;
- writeBuffer = new MemoryStream(writeBuffer.Capacity);
-
- byte[] i32out = new byte[4];
- i32out[0] = (byte)(0xff & (len >> 24));
- i32out[1] = (byte)(0xff & (len >> 16));
- i32out[2] = (byte)(0xff & (len >> 8));
- i32out[3] = (byte)(0xff & (len));
- transport.Write(i32out, 0, 4);
+ int data_len = len - header_size;
+ if ( data_len < 0 )
+ throw new System.InvalidOperationException (); // logic error actually
+
+ InitWriteBuffer();
+
+ // Inject message header into the reserved buffer space
+ buf[0] = (byte)(0xff & (data_len >> 24));
+ buf[1] = (byte)(0xff & (data_len >> 16));
+ buf[2] = (byte)(0xff & (data_len >> 8));
+ buf[3] = (byte)(0xff & (data_len));
+
+ // Send the entire message at once
transport.Write(buf, 0, len);
+
transport.Flush();
}
+
+ private void InitWriteBuffer ()
+ {
+ // Create new buffer instance
+ writeBuffer = new MemoryStream(1024);
+
+ // Reserve space for message header to be put right before sending it out
+ writeBuffer.Write ( header_dummy, 0, header_size );
+ }
}
}
@@ -39,7 +39,7 @@ public static void Execute(string[] args)
int port = 9090;
string url = null;
int numThreads = 1;
- bool buffered = false;
+ bool buffered = false, framed = false;
try
{
@@ -67,6 +67,11 @@ public static void Execute(string[] args)
buffered = true;
Console.WriteLine("Using buffered sockets");
}
+ else if (args[i] == "-f" || args[i] == "-framed")
+ {
+ framed = true;
+ Console.WriteLine("Using framed transport");
+ }
else if (args[i] == "-t")
{
numThreads = Convert.ToInt32(args[++i]);
@@ -89,16 +94,13 @@ public static void Execute(string[] args)
threads[test] = t;
if (url == null)
{
- TSocket socket = new TSocket(host, port);
+ TTransport trans = new TSocket(host, port);
if (buffered)
- {
- TBufferedTransport buffer = new TBufferedTransport(socket);
- t.Start(buffer);
- }
- else
- {
- t.Start(socket);
- }
+ trans = new TBufferedTransport(trans as TStreamTransport);
+ if (framed)
+ trans = new TFramedTransport(trans);
+
+ t.Start(trans);
}
else
{
@@ -428,6 +430,12 @@ public static void ClientTest(TTransport transport)
Console.WriteLine("Test Oneway(1)");
client.testOneway(1);
+
+ Console.Write("Test Calltime()");
+ var startt = DateTime.UtcNow;
+ for ( int k=0; k<1000; ++k )
+ client.testVoid();
+ Console.WriteLine(" = " + (DateTime.UtcNow - startt).TotalSeconds.ToString() + " ms a testVoid() call" );
}
}
}
@@ -301,15 +301,31 @@ public static void Execute(string[] args)
{
try
{
- bool useBufferedSockets = false;
+ bool useBufferedSockets = false, useFramed = false;
int port = 9090;
if (args.Length > 0)
{
port = int.Parse(args[0]);
if (args.Length > 1)
{
- bool.TryParse(args[1], out useBufferedSockets);
+ if ( args[1] == "raw" )
+ {
+ // as default
+ }
+ else if ( args[1] == "buffered" )
+ {
+ useBufferedSockets = true;
+ }
+ else if ( args[1] == "framed" )
+ {
+ useFramed = true;
+ }
+ else
+ {
+ // Fall back to the older boolean syntax
+ bool.TryParse(args[1], out useBufferedSockets);
+ }
}
}
@@ -320,10 +336,12 @@ public static void Execute(string[] args)
// Transport
TServerSocket tServerSocket = new TServerSocket(port, 0, useBufferedSockets);
- TServer serverEngine;
-
// Simple Server
- serverEngine = new TSimpleServer(testProcessor, tServerSocket);
+ TServer serverEngine;
+ if ( useFramed )
+ serverEngine = new TSimpleServer(testProcessor, tServerSocket, new TFramedTransport.Factory());
+ else
+ serverEngine = new TSimpleServer(testProcessor, tServerSocket);
// ThreadPool Server
// serverEngine = new TThreadPoolServer(testProcessor, tServerSocket);
@@ -334,7 +352,10 @@ public static void Execute(string[] args)
testHandler.server = serverEngine;
// Run it
- Console.WriteLine("Starting the server on port " + port + (useBufferedSockets ? " with buffered socket" : "") + "...");
+ Console.WriteLine("Starting the server on port " + port +
+ (useBufferedSockets ? " with buffered socket" : "") +
+ (useFramed ? " with framed transport" : "") +
+ "...");
serverEngine.Serve();
}

0 comments on commit 7070aaa

Please sign in to comment.