From e921fedd2aa1db54a02f61ff9115a31c0d322805 Mon Sep 17 00:00:00 2001 From: Gianluca Sartori Date: Fri, 10 Apr 2020 19:04:30 +0200 Subject: [PATCH] Fixed replay SynchronizationMode Now the replay respects original query waits --- SharedAssemblyInfo.cs | 4 +- .../Consumer/BufferedWorkloadConsumer.cs | 2 +- .../Consumer/Replay/ReplayCommand.cs | 2 +- .../Consumer/Replay/ReplayConsumer.cs | 15 +++-- WorkloadTools/Consumer/Replay/ReplayWorker.cs | 34 ++++++++--- WorkloadTools/ExecutionWorkloadEvent.cs | 4 +- .../Listener/File/FileWorkloadListener.cs | 57 +++++-------------- 7 files changed, 57 insertions(+), 61 deletions(-) diff --git a/SharedAssemblyInfo.cs b/SharedAssemblyInfo.cs index b7a2f13..3f50077 100644 --- a/SharedAssemblyInfo.cs +++ b/SharedAssemblyInfo.cs @@ -29,5 +29,5 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.5.4")] -[assembly: AssemblyFileVersion("1.5.4")] +[assembly: AssemblyVersion("1.5.5")] +[assembly: AssemblyFileVersion("1.5.5")] diff --git a/WorkloadTools/Consumer/BufferedWorkloadConsumer.cs b/WorkloadTools/Consumer/BufferedWorkloadConsumer.cs index 861eea1..d8fb39c 100644 --- a/WorkloadTools/Consumer/BufferedWorkloadConsumer.cs +++ b/WorkloadTools/Consumer/BufferedWorkloadConsumer.cs @@ -16,7 +16,7 @@ public abstract class BufferedWorkloadConsumer : WorkloadConsumer private SpinWait spin = new SpinWait(); - public int BufferSize { get; set; } = 10000; + public int BufferSize { get; set; } = 1000; public override sealed void Consume(WorkloadEvent evt) { diff --git a/WorkloadTools/Consumer/Replay/ReplayCommand.cs b/WorkloadTools/Consumer/Replay/ReplayCommand.cs index 037914d..6097402 100644 --- a/WorkloadTools/Consumer/Replay/ReplayCommand.cs +++ b/WorkloadTools/Consumer/Replay/ReplayCommand.cs @@ -10,6 +10,6 @@ public class ReplayCommand public string CommandText { get; set; } public string Database { get; set; } public string ApplicationName { get; set; } - public int BeforeSleepMilliseconds { get; set; } = 0; + public long ReplayOffset { get; set; } = 0; // milliseconds } } diff --git a/WorkloadTools/Consumer/Replay/ReplayConsumer.cs b/WorkloadTools/Consumer/Replay/ReplayConsumer.cs index 594c2e3..b5dd3bf 100644 --- a/WorkloadTools/Consumer/Replay/ReplayConsumer.cs +++ b/WorkloadTools/Consumer/Replay/ReplayConsumer.cs @@ -16,7 +16,7 @@ public class ReplayConsumer : BufferedWorkloadConsumer private SpinWait spin = new SpinWait(); public int ThreadLimit = 32; - public int InactiveWorkerTerminationTimeoutSeconds = 15; + public int InactiveWorkerTerminationTimeoutSeconds = 300; private Semaphore WorkLimiter; public bool DisplayWorkerStats { get; set; } = true; @@ -35,6 +35,7 @@ public class ReplayConsumer : BufferedWorkloadConsumer private Thread sweeper; private long eventCount; + private DateTime startTime = DateTime.MinValue; public enum SynchronizationModeEnum { @@ -64,6 +65,11 @@ public override void ConsumeBuffered(WorkloadEvent evnt) logger.Info($"{eventCount} events queued for replay"); } + if (startTime == DateTime.MinValue) + { + startTime = DateTime.Now; + } + ExecutionWorkloadEvent evt = (ExecutionWorkloadEvent)evnt; ReplayCommand command = new ReplayCommand() @@ -71,7 +77,7 @@ public override void ConsumeBuffered(WorkloadEvent evnt) CommandText = evt.Text, Database = evt.DatabaseName, ApplicationName = evt.ApplicationName, - BeforeSleepMilliseconds = evt.ReplaySleepMilliseconds + ReplayOffset = evt.ReplayOffset }; int session_id = -1; @@ -100,7 +106,8 @@ public override void ConsumeBuffered(WorkloadEvent evnt) QueryTimeoutSeconds = this.QueryTimeoutSeconds, WorkerStatsCommandCount = this.WorkerStatsCommandCount, MimicApplicationName = this.MimicApplicationName, - DatabaseMap = this.DatabaseMap + DatabaseMap = this.DatabaseMap, + StartTime = startTime }; ReplayWorkers.TryAdd(session_id, rw); rw.AppendCommand(command); @@ -120,7 +127,7 @@ public override void ConsumeBuffered(WorkloadEvent evnt) } catch (Exception e) { - try { logger.Error(e, "Unhandled exception in TraceManager.RunWorkers"); } + try { logger.Error(e, "Unhandled exception in ReplayConsumer.RunWorkers"); } catch { Console.WriteLine(e.Message); } } } diff --git a/WorkloadTools/Consumer/Replay/ReplayWorker.cs b/WorkloadTools/Consumer/Replay/ReplayWorker.cs index 9ab7a1f..cbd2714 100644 --- a/WorkloadTools/Consumer/Replay/ReplayWorker.cs +++ b/WorkloadTools/Consumer/Replay/ReplayWorker.cs @@ -32,6 +32,8 @@ class ReplayWorker : IDisposable public int SPID { get; set; } public bool IsRunning { get; private set; } = false; + public DateTime StartTime { get; set; } + public Dictionary DatabaseMap { get; set; } = new Dictionary(); private Task runner = null; @@ -254,19 +256,33 @@ public void ExecuteCommand(ReplayCommand command) conn.ChangeDatabase(command.Database); } - // if the command comes with a replay sleep, do it now - // the amount of milliseconds to sleep is set in + // if the command comes with a replay offset, do it now + // the offset in milliseconds is set in // FileWorkloadListener // The other listeners do not set this value, as they // already come with the original timing - // - // Don't remove the IF test: even Sleep(0) can end up - // sleeping for 10ms or more. Sleep guarantees that - // the current thread sleeps for AT LEAST the amount - // of milliseconds set. - if (command.BeforeSleepMilliseconds > 2) + if (command.ReplayOffset > 0) { - Thread.Sleep(command.BeforeSleepMilliseconds); + // I am using 7 here as an average compensation for sleep + // fluctuations due to Windows preemptive scheduling + while((DateTime.Now - StartTime).TotalMilliseconds < command.ReplayOffset - 7) + { + // Thread.Sleep will not sleep exactly 1 millisecond. + // It will yield the current thread and put it back + // in the runnable queue once the sleep delay has expired. + // This means that the actual sleep time before the + // current thread gains back control can be much higher + // (15 milliseconds or more) + // However we do not need to be super precise here: + // each command has a requested offset from the beginning + // of the workload and this class does its best to respect it. + // If the previous commands take longer in the target environment + // the offset cannot be respected and the command will execute + // without further waits, but there is no way to recover + // the delay that has built up to that point. + Thread.Sleep(1); + } + } using (SqlCommand cmd = new SqlCommand(command.CommandText)) diff --git a/WorkloadTools/ExecutionWorkloadEvent.cs b/WorkloadTools/ExecutionWorkloadEvent.cs index 0dcc561..9717548 100644 --- a/WorkloadTools/ExecutionWorkloadEvent.cs +++ b/WorkloadTools/ExecutionWorkloadEvent.cs @@ -19,6 +19,8 @@ public class ExecutionWorkloadEvent : WorkloadEvent public long? CPU { get; set; } // MICROSECONDS public long? Duration { get; set; } // MICROSECONDS public long? EventSequence { get; set; } - public int ReplaySleepMilliseconds { get; set; } = 0; // MILLISECONDS + // This is the requested offset in milliseconds + // from the the beginning of the workload + public long ReplayOffset { get; set; } = 0; // MILLISECONDS } } diff --git a/WorkloadTools/Listener/File/FileWorkloadListener.cs b/WorkloadTools/Listener/File/FileWorkloadListener.cs index 5c9a1ba..82ba6b5 100644 --- a/WorkloadTools/Listener/File/FileWorkloadListener.cs +++ b/WorkloadTools/Listener/File/FileWorkloadListener.cs @@ -21,15 +21,12 @@ public class FileWorkloadListener : WorkloadListener // after another without waiting public bool SynchronizationMode { get; set; } = true; - private DateTime startDate = DateTime.MinValue; + private DateTime startTime = DateTime.MinValue; private long totalEvents; private SQLiteConnection conn; private SQLiteDataReader reader; private string connectionString; - private Dictionary lastEventEndTime = new Dictionary(); - - public FileWorkloadListener() : base() { Filter = new FileEventFilter(); @@ -137,36 +134,10 @@ private long ValidateFile() } - private DateTime GetLastEventEndTime(int spid) - { - DateTime result = DateTime.MinValue; - if (lastEventEndTime.TryGetValue(spid, out result)) - { - return result; - } - else - { - return DateTime.MinValue; - } - } - - private void SetLastEventEndTime(int spid, DateTime endTime) - { - if (lastEventEndTime.ContainsKey(spid)) - { - lastEventEndTime[spid] = endTime; - } - else - { - lastEventEndTime.Add(spid, endTime); - } - - } - public override WorkloadEvent Read() { WorkloadEvent result = null; - double msSleep = 0; + long commandOffset = 0; try { @@ -197,25 +168,25 @@ public override WorkloadEvent Read() ExecutionWorkloadEvent execEvent = result as ExecutionWorkloadEvent; if (SynchronizationMode) { - DateTime lastEndTime = GetLastEventEndTime((int)execEvent.SPID); - if (lastEndTime != DateTime.MinValue) + if (startTime != DateTime.MinValue) { - msSleep = (result.StartTime - lastEndTime).TotalMilliseconds; - if (msSleep > 0) + commandOffset = (long)((result.StartTime - startTime).TotalMilliseconds); + if (commandOffset > 0) { - if (msSleep > Int32.MaxValue) - { - msSleep = Int32.MaxValue; - } - Thread.Sleep(Convert.ToInt32(msSleep)); + execEvent.ReplayOffset = commandOffset; } } - - SetLastEventEndTime((int)execEvent.SPID, execEvent.StartTime.AddMilliseconds((double)execEvent.Duration / 1000)); + else + { + startTime = execEvent.StartTime; + } } else { - execEvent.ReplaySleepMilliseconds = 0; + // Leave it at 0. The replay consumer will interpret this + // as "do not wait for the requested offset" and will replay + // the event without waiting + execEvent.ReplayOffset = 0; } } // Filter events