Skip to content

Commit

Permalink
Fixed replay SynchronizationMode
Browse files Browse the repository at this point in the history
Now the replay respects original query waits
  • Loading branch information
spaghettidba committed Apr 10, 2020
1 parent 4caa7bc commit e921fed
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 61 deletions.
4 changes: 2 additions & 2 deletions SharedAssemblyInfo.cs
Expand Up @@ -29,5 +29,5 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.5.4")]
[assembly: AssemblyFileVersion("1.5.4")]
[assembly: AssemblyVersion("1.5.5")]
[assembly: AssemblyFileVersion("1.5.5")]
2 changes: 1 addition & 1 deletion WorkloadTools/Consumer/BufferedWorkloadConsumer.cs
Expand Up @@ -16,7 +16,7 @@ public abstract class BufferedWorkloadConsumer : WorkloadConsumer

private SpinWait spin = new SpinWait();

public int BufferSize { get; set; } = 10000;
public int BufferSize { get; set; } = 1000;

public override sealed void Consume(WorkloadEvent evt)
{
Expand Down
2 changes: 1 addition & 1 deletion WorkloadTools/Consumer/Replay/ReplayCommand.cs
Expand Up @@ -10,6 +10,6 @@ public class ReplayCommand
public string CommandText { get; set; }
public string Database { get; set; }
public string ApplicationName { get; set; }
public int BeforeSleepMilliseconds { get; set; } = 0;
public long ReplayOffset { get; set; } = 0; // milliseconds
}
}
15 changes: 11 additions & 4 deletions WorkloadTools/Consumer/Replay/ReplayConsumer.cs
Expand Up @@ -16,7 +16,7 @@ public class ReplayConsumer : BufferedWorkloadConsumer

private SpinWait spin = new SpinWait();
public int ThreadLimit = 32;
public int InactiveWorkerTerminationTimeoutSeconds = 15;
public int InactiveWorkerTerminationTimeoutSeconds = 300;
private Semaphore WorkLimiter;

public bool DisplayWorkerStats { get; set; } = true;
Expand All @@ -35,6 +35,7 @@ public class ReplayConsumer : BufferedWorkloadConsumer
private Thread sweeper;

private long eventCount;
private DateTime startTime = DateTime.MinValue;

public enum SynchronizationModeEnum
{
Expand Down Expand Up @@ -64,14 +65,19 @@ public override void ConsumeBuffered(WorkloadEvent evnt)
logger.Info($"{eventCount} events queued for replay");
}

if (startTime == DateTime.MinValue)
{
startTime = DateTime.Now;
}

ExecutionWorkloadEvent evt = (ExecutionWorkloadEvent)evnt;

ReplayCommand command = new ReplayCommand()
{
CommandText = evt.Text,
Database = evt.DatabaseName,
ApplicationName = evt.ApplicationName,
BeforeSleepMilliseconds = evt.ReplaySleepMilliseconds
ReplayOffset = evt.ReplayOffset
};

int session_id = -1;
Expand Down Expand Up @@ -100,7 +106,8 @@ public override void ConsumeBuffered(WorkloadEvent evnt)
QueryTimeoutSeconds = this.QueryTimeoutSeconds,
WorkerStatsCommandCount = this.WorkerStatsCommandCount,
MimicApplicationName = this.MimicApplicationName,
DatabaseMap = this.DatabaseMap
DatabaseMap = this.DatabaseMap,
StartTime = startTime
};
ReplayWorkers.TryAdd(session_id, rw);
rw.AppendCommand(command);
Expand All @@ -120,7 +127,7 @@ public override void ConsumeBuffered(WorkloadEvent evnt)
}
catch (Exception e)
{
try { logger.Error(e, "Unhandled exception in TraceManager.RunWorkers"); }
try { logger.Error(e, "Unhandled exception in ReplayConsumer.RunWorkers"); }
catch { Console.WriteLine(e.Message); }
}
}
Expand Down
34 changes: 25 additions & 9 deletions WorkloadTools/Consumer/Replay/ReplayWorker.cs
Expand Up @@ -32,6 +32,8 @@ class ReplayWorker : IDisposable
public int SPID { get; set; }
public bool IsRunning { get; private set; } = false;

public DateTime StartTime { get; set; }

public Dictionary<string, string> DatabaseMap { get; set; } = new Dictionary<string, string>();

private Task runner = null;
Expand Down Expand Up @@ -254,19 +256,33 @@ public void ExecuteCommand(ReplayCommand command)
conn.ChangeDatabase(command.Database);
}

// if the command comes with a replay sleep, do it now
// the amount of milliseconds to sleep is set in
// if the command comes with a replay offset, do it now
// the offset in milliseconds is set in
// FileWorkloadListener
// The other listeners do not set this value, as they
// already come with the original timing
//
// Don't remove the IF test: even Sleep(0) can end up
// sleeping for 10ms or more. Sleep guarantees that
// the current thread sleeps for AT LEAST the amount
// of milliseconds set.
if (command.BeforeSleepMilliseconds > 2)
if (command.ReplayOffset > 0)
{
Thread.Sleep(command.BeforeSleepMilliseconds);
// I am using 7 here as an average compensation for sleep
// fluctuations due to Windows preemptive scheduling
while((DateTime.Now - StartTime).TotalMilliseconds < command.ReplayOffset - 7)
{
// Thread.Sleep will not sleep exactly 1 millisecond.
// It will yield the current thread and put it back
// in the runnable queue once the sleep delay has expired.
// This means that the actual sleep time before the
// current thread gains back control can be much higher
// (15 milliseconds or more)
// However we do not need to be super precise here:
// each command has a requested offset from the beginning
// of the workload and this class does its best to respect it.
// If the previous commands take longer in the target environment
// the offset cannot be respected and the command will execute
// without further waits, but there is no way to recover
// the delay that has built up to that point.
Thread.Sleep(1);
}

}

using (SqlCommand cmd = new SqlCommand(command.CommandText))
Expand Down
4 changes: 3 additions & 1 deletion WorkloadTools/ExecutionWorkloadEvent.cs
Expand Up @@ -19,6 +19,8 @@ public class ExecutionWorkloadEvent : WorkloadEvent
public long? CPU { get; set; } // MICROSECONDS
public long? Duration { get; set; } // MICROSECONDS
public long? EventSequence { get; set; }
public int ReplaySleepMilliseconds { get; set; } = 0; // MILLISECONDS
// This is the requested offset in milliseconds
// from the the beginning of the workload
public long ReplayOffset { get; set; } = 0; // MILLISECONDS
}
}
57 changes: 14 additions & 43 deletions WorkloadTools/Listener/File/FileWorkloadListener.cs
Expand Up @@ -21,15 +21,12 @@ public class FileWorkloadListener : WorkloadListener
// after another without waiting
public bool SynchronizationMode { get; set; } = true;

private DateTime startDate = DateTime.MinValue;
private DateTime startTime = DateTime.MinValue;
private long totalEvents;
private SQLiteConnection conn;
private SQLiteDataReader reader;
private string connectionString;

private Dictionary<int, DateTime> lastEventEndTime = new Dictionary<int, DateTime>();


public FileWorkloadListener() : base()
{
Filter = new FileEventFilter();
Expand Down Expand Up @@ -137,36 +134,10 @@ private long ValidateFile()
}


private DateTime GetLastEventEndTime(int spid)
{
DateTime result = DateTime.MinValue;
if (lastEventEndTime.TryGetValue(spid, out result))
{
return result;
}
else
{
return DateTime.MinValue;
}
}

private void SetLastEventEndTime(int spid, DateTime endTime)
{
if (lastEventEndTime.ContainsKey(spid))
{
lastEventEndTime[spid] = endTime;
}
else
{
lastEventEndTime.Add(spid, endTime);
}

}

public override WorkloadEvent Read()
{
WorkloadEvent result = null;
double msSleep = 0;
long commandOffset = 0;

try
{
Expand Down Expand Up @@ -197,25 +168,25 @@ public override WorkloadEvent Read()
ExecutionWorkloadEvent execEvent = result as ExecutionWorkloadEvent;
if (SynchronizationMode)
{
DateTime lastEndTime = GetLastEventEndTime((int)execEvent.SPID);
if (lastEndTime != DateTime.MinValue)
if (startTime != DateTime.MinValue)
{
msSleep = (result.StartTime - lastEndTime).TotalMilliseconds;
if (msSleep > 0)
commandOffset = (long)((result.StartTime - startTime).TotalMilliseconds);
if (commandOffset > 0)
{
if (msSleep > Int32.MaxValue)
{
msSleep = Int32.MaxValue;
}
Thread.Sleep(Convert.ToInt32(msSleep));
execEvent.ReplayOffset = commandOffset;
}
}

SetLastEventEndTime((int)execEvent.SPID, execEvent.StartTime.AddMilliseconds((double)execEvent.Duration / 1000));
else
{
startTime = execEvent.StartTime;
}
}
else
{
execEvent.ReplaySleepMilliseconds = 0;
// Leave it at 0. The replay consumer will interpret this
// as "do not wait for the requested offset" and will replay
// the event without waiting
execEvent.ReplayOffset = 0;
}
}
// Filter events
Expand Down

0 comments on commit e921fed

Please sign in to comment.