Permalink
Browse files

put back the initial connection delay

measure forced flushes
geometric distribution of requests
  • Loading branch information...
1 parent cf1ce73 commit 4affed0b96ebae7971a01be5861f227571d10c9f Maysam Yabandeh committed Apr 17, 2012
@@ -71,7 +71,8 @@
*/
static final int DB_SIZE = 20000000;
- private static final long PAUSE_LENGTH = 50; // in ms
+ //private static final long PAUSE_LENGTH = 50; // in ms
+ private static final long PAUSE_LENGTH = 50000; // in micro sec
/**
* Maximum number if outstanding message
@@ -164,11 +165,11 @@ public ClientHandler(Configuration conf, long nbMessage, int inflight, boolean p
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
super.channelConnected(ctx, e);
- //try {
- //Thread.sleep(15000);
- //} catch (InterruptedException e1) {
- ////ignore
- //}
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException e1) {
+ //ignore
+ }
startDate = new Date();
channel = e.getChannel();
startTransaction();
@@ -367,6 +368,19 @@ private void sendCommitRequest(final long timestamp) {
}
}
+ //if (slowchance == -1) {
+ //slowchance = rnd.nextInt(10);
+ //if (slowchance == 0)
+ //System.out.println("I am slow");
+ //}
+
+ long randompausetime = pauseClient ? PAUSE_LENGTH : 0; //this is the average
+ double uniformrandom = rnd.nextDouble(); //[0,1)
+ //double geometricrandom = -1 * java.lang.Math.log(uniformrandom);
+ //randompausetime = (long) (randompausetime * geometricrandom);
+ randompausetime = (long) (randompausetime * 2 * uniformrandom);
+ //if (slowchance == 0)
+ //randompausetime = 1000 * randompausetime;
executor.schedule(new Runnable() {
@Override
public void run() {
@@ -380,10 +394,12 @@ public void run() {
e.printStackTrace();
}
}
- }, pauseClient ? PAUSE_LENGTH : 0, TimeUnit.MILLISECONDS);
+ }, randompausetime, TimeUnit.MICROSECONDS);
}
+ //static int slowchance = -1;
+
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(20);
private long totalCommitRequestSent;// just to keep the total number of
@@ -74,10 +74,13 @@ public void flush() {
flush(true, false);
}
+ //if deleteRef is false, the flush is forced due lack of space
private void flush(boolean deleteRef, final boolean clearPast) {
int readable = readBuffer.readableBytes() - readerIndex;
++_flushes;
+ if (!deleteRef) _forcedflushes++;
+
_flSize += readable;
if (readable == 0 && readingBuffer != pastBuffer) {
_emptyFlushes++;
@@ -360,6 +363,7 @@ private void nextBuffer() {
writeBuffer = currentBuffer.buffer;
}
+ static long _forcedflushes = 0;
static long _flushes = 0;
static long _flSize = 0;
@@ -58,6 +58,7 @@ public void run() {
long oldtotalwalkforput = CommitHashMap.gettotalwalkforput();
long oldfull = TSOMessageBuffer.itWasFull;
long oldflushes = TSOSharedMessageBuffer._flushes;
+ long oldforcedflushes = TSOSharedMessageBuffer._forcedflushes;
long oldflusheSize = TSOSharedMessageBuffer._flSize;
long oldwaited = TSOMessageBuffer.waited;
long old1B = TSOSharedMessageBuffer._1B;
@@ -93,6 +94,7 @@ public void run() {
long newfull = TSOMessageBuffer.itWasFull;
long newflushes = TSOSharedMessageBuffer._flushes;
+ long newforcedflushes = TSOSharedMessageBuffer._forcedflushes;
long newflusheSize = TSOSharedMessageBuffer._flSize;
long newwaited = TSOMessageBuffer.waited;
@@ -163,7 +165,7 @@ public void run() {
LOG.trace(String.format("SERVER: %4.3f TPS, %4.6f Abort/s "
+ "Co: %2.2f Ha: %2.2f Fa: %2.2f Li: %2.2f Avg commit: %2.4f Avg flush: %5.2f "
+ "Avg write: %5.2f Tot overflows: %d Tot flushes: %d Tot empty flu: %d "
- + "Queries: %d CurrentBuffers: %d ExtraGets: %d AskedTSO: %d",
+ + "Queries: %d CurrentBuffers: %d ExtraGets: %d AskedTSO: %d Tot fflushes: %d",
(newCounter - oldCounter) / (float)(endTime - startTime) * 1000,
(newAbortCount - oldAbortCount) / (float)(endTime - startTime) * 1000,
(newComs - oldComs) / (float)(newWrites - oldWrites) * 100,
@@ -180,7 +182,9 @@ public void run() {
newQueries - oldQueries,
TSOBuffer.nBuffers,
newExtraGetsPerformed - oldExtraGetsPerformed,
- newAskedTSO - oldAskedTSO)
+ newAskedTSO - oldAskedTSO,
+ (newforcedflushes - oldforcedflushes)
+ )
);
// if (TSOPipelineFactory.bwhandler != null) {
// TSOPipelineFactory.bwhandler.reset();
@@ -205,6 +209,7 @@ public void run() {
oldtotalwalkforput = newtotalwalkforput;
oldfull = newfull;
oldflushes = newflushes;
+ oldforcedflushes = newforcedflushes;
oldflusheSize = newflusheSize;
oldwaited = newwaited;
oldOverflow = newOverflow;

0 comments on commit 4affed0

Please sign in to comment.