Skip to content

Commit

Permalink
fixes #378 batch trigger acquisition logic to prevent early firing of…
Browse files Browse the repository at this point in the history
… trigger
  • Loading branch information
lahma committed Jun 5, 2016
1 parent 86d6761 commit d25a5fe
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 53 deletions.
87 changes: 45 additions & 42 deletions src/Quartz.Tests.Unit/Simpl/RAMJobStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void SetUp()
fJobStore.Initialize(null, fSignaler);
fJobStore.SchedulerStarted();

fJobDetail = new JobDetailImpl("job1", "jobGroup1", typeof (NoOpJob));
fJobDetail = new JobDetailImpl("job1", "jobGroup1", typeof(NoOpJob));
fJobDetail.Durable = true;
fJobStore.StoreJob(fJobDetail, false);
}
Expand Down Expand Up @@ -81,7 +81,6 @@ public void TestAcquireNextTrigger()
Assert.AreEqual(trigger1, fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 1, TimeSpan.Zero)[0]);
Assert.AreEqual(0, fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 1, TimeSpan.Zero).Count);


// release trigger3
fJobStore.ReleaseAcquiredTrigger(trigger3);
Assert.AreEqual(trigger3, fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 1, TimeSpan.FromMilliseconds(1))[0]);
Expand All @@ -90,16 +89,18 @@ public void TestAcquireNextTrigger()
[Test]
public void TestAcquireNextTriggerBatch()
{
DateTimeOffset d = DateBuilder.EvenMinuteDateAfterNow();
DateTimeOffset d = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(1));

IOperableTrigger early = new SimpleTriggerImpl("early", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d, d.AddMilliseconds(5), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger1 = new SimpleTriggerImpl("trigger1", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(200000), d.AddMilliseconds(200005), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger2 = new SimpleTriggerImpl("trigger2", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(200100), d.AddMilliseconds(200105), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger3 = new SimpleTriggerImpl("trigger3", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(200200), d.AddMilliseconds(200205), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger4 = new SimpleTriggerImpl("trigger4", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(200300), d.AddMilliseconds(200305), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger2 = new SimpleTriggerImpl("trigger2", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(210000), d.AddMilliseconds(210005), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger3 = new SimpleTriggerImpl("trigger3", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(220000), d.AddMilliseconds(220005), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger4 = new SimpleTriggerImpl("trigger4", "triggerGroup1", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(230000), d.AddMilliseconds(230005), 2, TimeSpan.FromSeconds(2));
IOperableTrigger trigger10 = new SimpleTriggerImpl("trigger10", "triggerGroup2", fJobDetail.Name, fJobDetail.Group, d.AddMilliseconds(500000), d.AddMilliseconds(700000), 2, TimeSpan.FromSeconds(2));

early.ComputeFirstFireTimeUtc(null);
early.MisfireInstruction = MisfireInstruction.IgnoreMisfirePolicy;

trigger1.ComputeFirstFireTimeUtc(null);
trigger2.ComputeFirstFireTimeUtc(null);
trigger3.ComputeFirstFireTimeUtc(null);
Expand All @@ -115,61 +116,63 @@ public void TestAcquireNextTriggerBatch()
DateTimeOffset firstFireTime = trigger1.GetNextFireTimeUtc().Value;

IList<IOperableTrigger> acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 4, TimeSpan.FromSeconds(1));
Assert.AreEqual(4, acquiredTriggers.Count);
Assert.AreEqual(1, acquiredTriggers.Count);
Assert.AreEqual(early.Key, acquiredTriggers[0].Key);
Assert.AreEqual(trigger1.Key, acquiredTriggers[1].Key);
Assert.AreEqual(trigger2.Key, acquiredTriggers[2].Key);
Assert.AreEqual(trigger3.Key, acquiredTriggers[3].Key);
fJobStore.ReleaseAcquiredTrigger(early);
fJobStore.ReleaseAcquiredTrigger(trigger1);
fJobStore.ReleaseAcquiredTrigger(trigger2);
fJobStore.ReleaseAcquiredTrigger(trigger3);

acquiredTriggers = this.fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 5, TimeSpan.FromMilliseconds(1000));
Assert.AreEqual(5, acquiredTriggers.Count);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 4, TimeSpan.FromMilliseconds(205000));
Assert.AreEqual(2, acquiredTriggers.Count);
Assert.AreEqual(early.Key, acquiredTriggers[0].Key);
Assert.AreEqual(trigger1.Key, acquiredTriggers[1].Key);
Assert.AreEqual(trigger2.Key, acquiredTriggers[2].Key);
Assert.AreEqual(trigger3.Key, acquiredTriggers[3].Key);
Assert.AreEqual(trigger4.Key, acquiredTriggers[4].Key);
fJobStore.ReleaseAcquiredTrigger(early);
fJobStore.ReleaseAcquiredTrigger(trigger1);

fJobStore.RemoveTrigger(early.Key);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 5, TimeSpan.FromMilliseconds(100000));
Assert.AreEqual(4, acquiredTriggers.Count);
Assert.AreEqual(trigger1.Key, acquiredTriggers[0].Key);
Assert.AreEqual(trigger2.Key, acquiredTriggers[1].Key);
Assert.AreEqual(trigger3.Key, acquiredTriggers[2].Key);
Assert.AreEqual(trigger4.Key, acquiredTriggers[3].Key);
fJobStore.ReleaseAcquiredTrigger(trigger1);
fJobStore.ReleaseAcquiredTrigger(trigger2);
fJobStore.ReleaseAcquiredTrigger(trigger3);
fJobStore.ReleaseAcquiredTrigger(trigger4);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 6, TimeSpan.FromSeconds(1));
Assert.AreEqual(5, acquiredTriggers.Count);
Assert.AreEqual(early.Key, acquiredTriggers[0].Key);
Assert.AreEqual(trigger1.Key, acquiredTriggers[1].Key);
Assert.AreEqual(trigger2.Key, acquiredTriggers[2].Key);
Assert.AreEqual(trigger3.Key, acquiredTriggers[3].Key);
Assert.AreEqual(trigger4.Key, acquiredTriggers[4].Key);
fJobStore.ReleaseAcquiredTrigger(early);
acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddSeconds(10), 6, TimeSpan.FromMilliseconds(100000));

Assert.AreEqual(4, acquiredTriggers.Count);
Assert.AreEqual(trigger1.Key, acquiredTriggers[0].Key);
Assert.AreEqual(trigger2.Key, acquiredTriggers[1].Key);
Assert.AreEqual(trigger3.Key, acquiredTriggers[2].Key);
Assert.AreEqual(trigger4.Key, acquiredTriggers[3].Key);

fJobStore.ReleaseAcquiredTrigger(trigger1);
fJobStore.ReleaseAcquiredTrigger(trigger2);
fJobStore.ReleaseAcquiredTrigger(trigger3);
fJobStore.ReleaseAcquiredTrigger(trigger4);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddMilliseconds(1), 5, TimeSpan.Zero);
Assert.AreEqual(2, acquiredTriggers.Count);
fJobStore.ReleaseAcquiredTrigger(early);
fJobStore.ReleaseAcquiredTrigger(trigger1);
Assert.AreEqual(1, acquiredTriggers.Count);
Assert.AreEqual(trigger1.Key, acquiredTriggers[0].Key);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddMilliseconds(250), 5, TimeSpan.FromMilliseconds(199));
Assert.AreEqual(5, acquiredTriggers.Count);
fJobStore.ReleaseAcquiredTrigger(early);
fJobStore.ReleaseAcquiredTrigger(trigger1);
fJobStore.ReleaseAcquiredTrigger(trigger2);
fJobStore.ReleaseAcquiredTrigger(trigger3);
fJobStore.ReleaseAcquiredTrigger(trigger4);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddMilliseconds(150), 5, TimeSpan.FromMilliseconds(50L));
Assert.AreEqual(4, acquiredTriggers.Count);
acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddMilliseconds(250), 5, TimeSpan.FromMilliseconds(19999L));
Assert.AreEqual(2, acquiredTriggers.Count);
Assert.AreEqual(trigger1.Key, acquiredTriggers[0].Key);
Assert.AreEqual(trigger2.Key, acquiredTriggers[1].Key);

fJobStore.ReleaseAcquiredTrigger(early);
fJobStore.ReleaseAcquiredTrigger(trigger1);
fJobStore.ReleaseAcquiredTrigger(trigger2);
fJobStore.ReleaseAcquiredTrigger(trigger3);

acquiredTriggers = fJobStore.AcquireNextTriggers(firstFireTime.AddMilliseconds(150), 5, TimeSpan.FromMilliseconds(5000L));
Assert.AreEqual(1, acquiredTriggers.Count);
Assert.AreEqual(trigger1.Key, acquiredTriggers[0].Key);
fJobStore.ReleaseAcquiredTrigger(trigger1);
}

[Test]
Expand Down Expand Up @@ -214,7 +217,7 @@ public void TestStoreTriggerReplacesTrigger()
{
string jobName = "StoreTriggerReplacesTrigger";
string jobGroup = "StoreTriggerReplacesTriggerGroup";
JobDetailImpl detail = new JobDetailImpl(jobName, jobGroup, typeof (NoOpJob));
JobDetailImpl detail = new JobDetailImpl(jobName, jobGroup, typeof(NoOpJob));
fJobStore.StoreJob(detail, false);

string trName = "StoreTriggerReplacesTrigger";
Expand Down Expand Up @@ -249,12 +252,12 @@ public void PauseJobGroupPausesNewJob()
string jobName1 = "PauseJobGroupPausesNewJob";
string jobName2 = "PauseJobGroupPausesNewJob2";
string jobGroup = "PauseJobGroupPausesNewJobGroup";
JobDetailImpl detail = new JobDetailImpl(jobName1, jobGroup, typeof (NoOpJob));
JobDetailImpl detail = new JobDetailImpl(jobName1, jobGroup, typeof(NoOpJob));
detail.Durable = true;
fJobStore.StoreJob(detail, false);
fJobStore.PauseJobs(GroupMatcher<JobKey>.GroupEquals(jobGroup));

detail = new JobDetailImpl(jobName2, jobGroup, typeof (NoOpJob));
detail = new JobDetailImpl(jobName2, jobGroup, typeof(NoOpJob));
detail.Durable = true;
fJobStore.StoreJob(detail, false);

Expand Down
19 changes: 15 additions & 4 deletions src/Quartz/Impl/AdoJobStore/JobStoreSupport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2455,7 +2455,6 @@ protected virtual IList<IOperableTrigger> AcquireNextTrigger(ConnectionAndTransa
Collection.ISet<JobKey> acquiredJobKeysForNoConcurrentExec = new Collection.HashSet<JobKey>();
const int MaxDoLoopRetry = 3;
int currentLoopCount = 0;
DateTimeOffset? firstAcquiredTriggerFireTime = null;

do
{
Expand All @@ -2470,6 +2469,8 @@ protected virtual IList<IOperableTrigger> AcquireNextTrigger(ConnectionAndTransa
return acquiredTriggers;
}

DateTimeOffset batchEnd = noLaterThan;

foreach (TriggerKey triggerKey in keys)
{
// If our trigger is no longer available, try a new one.
Expand Down Expand Up @@ -2513,6 +2514,11 @@ protected virtual IList<IOperableTrigger> AcquireNextTrigger(ConnectionAndTransa
}
}

if (nextTrigger.GetNextFireTimeUtc() > batchEnd)
{
break;
}

// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = Delegate.UpdateTriggerStateFromOtherState(conn, triggerKey, StateAcquired, StateWaiting);
Expand All @@ -2524,11 +2530,16 @@ protected virtual IList<IOperableTrigger> AcquireNextTrigger(ConnectionAndTransa
nextTrigger.FireInstanceId = GetFiredTriggerRecordId();
Delegate.InsertFiredTrigger(conn, nextTrigger, StateAcquired, null);

acquiredTriggers.Add(nextTrigger);
if (firstAcquiredTriggerFireTime == null)
if (acquiredTriggers.Count == 0)
{
firstAcquiredTriggerFireTime = nextTrigger.GetNextFireTimeUtc();
var now = SystemTime.UtcNow();
var nextFireTime = nextTrigger.GetNextFireTimeUtc().GetValueOrDefault(DateTimeOffset.MinValue);
var max = now > nextFireTime ? now : nextFireTime;

batchEnd = max + timeWindow;
}

acquiredTriggers.Add(nextTrigger);
}

// if we didn't end up with any trigger to fire from that first
Expand Down
18 changes: 11 additions & 7 deletions src/Quartz/Simpl/RAMJobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
using Common.Logging;

using Quartz.Collection;
using Quartz.Impl;
using Quartz.Impl.Matchers;
using Quartz.Spi;

Expand Down Expand Up @@ -1490,7 +1489,7 @@ public virtual IList<IOperableTrigger> AcquireNextTriggers(DateTimeOffset noLate
List<IOperableTrigger> result = new List<IOperableTrigger>();
Collection.ISet<JobKey> acquiredJobKeysForNoConcurrentExec = new Collection.HashSet<JobKey>();
Collection.ISet<TriggerWrapper> excludedTriggers = new Collection.HashSet<TriggerWrapper>();
DateTimeOffset? firstAcquiredTriggerFireTime = null;
DateTimeOffset batchEnd = noLaterThan;

// return empty list if store has no triggers.
if (timeTriggers.Count == 0)
Expand Down Expand Up @@ -1524,7 +1523,7 @@ public virtual IList<IOperableTrigger> AcquireNextTriggers(DateTimeOffset noLate
continue;
}

if (tw.trigger.GetNextFireTimeUtc() > noLaterThan + timeWindow)
if (tw.trigger.GetNextFireTimeUtc() > batchEnd)
{
timeTriggers.Add(tw);
break;
Expand All @@ -1549,14 +1548,19 @@ public virtual IList<IOperableTrigger> AcquireNextTriggers(DateTimeOffset noLate

tw.state = InternalTriggerState.Acquired;
tw.trigger.FireInstanceId = GetFiredTriggerRecordId();
IOperableTrigger trig = (IOperableTrigger)tw.trigger.Clone();
result.Add(trig);
IOperableTrigger trig = (IOperableTrigger) tw.trigger.Clone();

if (firstAcquiredTriggerFireTime == null)
if (result.Count == 0)
{
firstAcquiredTriggerFireTime = tw.trigger.GetNextFireTimeUtc();
var now = SystemTime.UtcNow();
var nextFireTime = tw.trigger.GetNextFireTimeUtc().GetValueOrDefault(DateTimeOffset.MinValue);
var max = now > nextFireTime ? now : nextFireTime;

batchEnd = max + timeWindow;
}

result.Add(trig);

if (result.Count == maxCount)
{
break;
Expand Down

0 comments on commit d25a5fe

Please sign in to comment.