-
Notifications
You must be signed in to change notification settings - Fork 50
Add support for CRON syntax for schedules #44
Conversation
e361f3b
to
3d241e4
Compare
bcb256d
to
5c6d666
Compare
305231d
to
067cea2
Compare
is there a release timeline for this feature? @rouzwawi |
Does this mean that a workflow can be scheduled to run once a minute? |
@danielnorberg yes. I was thinking of adding a lower limit on the schedule interval. Running a new pod every minute can definitely cause issues. |
@luster We're looking into merging this soon. Rough guess would around a week from now. |
2a05a2d
to
ba81f4a
Compare
Would it be relevant to consider the smearing issue in this PR as well? |
c8bd435
to
e8917d3
Compare
@danielnorberg I would prefer to do it in a separate PR, as I don't think it should be implemented on the triggering layer. See my comment in the issue. |
Uses default offsets for well known cron specifications such as HOURLY, DAILY, etc. This will preserve the current behaviour of running one period later than the time in the instance parameter.
} | ||
|
||
@Override | ||
public Map<Workflow, Optional<Instant>> workflowsWithNextNaturalTrigger() | ||
throws IOException { | ||
public void updateNextNaturalTriggerOld(WorkflowId workflowId, Instant instant) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this method is not used anywhere. Still keeping it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used from a test that simulates the storage condition before migrating to this. Will add a VisibleForTesting annotation.
throws IOException { | ||
Map<Workflow, Optional<Instant>> map = Maps.newHashMap(); | ||
public Map<Workflow, TriggerInstantSpec> workflowsWithNextNaturalTrigger() throws IOException { | ||
Map<Workflow, TriggerInstantSpec> map = Maps.newHashMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
|
||
map.put(workflow, TriggerInstantSpec.create(instant, triggerInstant)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty line
@@ -265,16 +286,31 @@ public void updateNextNaturalTrigger(WorkflowId workflowId, Instant nextNaturalT | |||
final Entity entity = result.next(); | |||
Workflow workflow; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
Instant instant = datetimeToInstant(entity.getDateTime(PROPERTY_NEXT_NATURAL_TRIGGER)); | ||
Instant triggerInstant; | ||
|
||
// todo: this check is only needed during a transition period |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you like to perform a data migration by adding a new property now? Or let it migrate itself when updating natural trigger?
|
||
if (entity.contains(PROPERTY_NEXT_NATURAL_TRIGGER)) { | ||
Instant instant = datetimeToInstant(entity.getDateTime(PROPERTY_NEXT_NATURAL_TRIGGER)); | ||
Instant triggerInstant; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
|
||
private ParameterUtil() { | ||
} | ||
|
||
private static final int MIN_YEAR_WIDTH = 4; | ||
private static final int MAX_YEAR_WIDTH = 10; | ||
|
||
private static final DateTimeFormatter DATE_HOUR_FORMAT = new DateTimeFormatterBuilder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This stack alike builder is not quite readable. Hopefully we will never need to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😱
@@ -153,18 +153,18 @@ public void setUp() throws Exception { | |||
storage.storeWorkflow(Workflow.create( | |||
BACKFILL_1.workflowId().componentId(), URI.create("http://example.com"), | |||
WorkflowConfiguration.create(BACKFILL_1.workflowId().id(), Schedule.HOURS, | |||
Optional.empty(), Optional.empty(), Optional.empty(), | |||
Optional.empty(), Collections.emptyList()))); | |||
empty(), empty(), empty(), empty(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an argument for using builders somewhere in these lines ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes :) I wonder if there's a good library for that ;)
} | ||
|
||
public enum WellKnown { | ||
HOURLY, DAILY, WEEKLY, MONTHLY, YEARLY, UNKNOWN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WellKnown.UNKNOWN
😄
final Schedule.WellKnown wellKnown = schedule.wellKnown(); | ||
|
||
Matcher matcher; | ||
ZonedDateTime parsed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
previous = storage.workflow(workflow.id()); | ||
storage.storeWorkflow(workflow); | ||
} catch (IOException e) { | ||
throw Throwables.propagate(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@danielnorberg found this https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate, so I would suggest for new code we don't use propagate
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good read. I had started questioning the usefulness of Throwables.propagate.
} | ||
|
||
@Deprecated | ||
public void updateNextNaturalTrigger(WorkflowId workflowId, Instant instant) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. Do you still need this method?
|
||
final Consumer<Workflow> workflowChangeListener = workflowChanged(workflowCache, storage, | ||
stats, stateManager, time); | ||
final WorkflowInitializer workflowInitializer = new WorkflowInitializer(storage, time); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline this maybe as it's only used by workflowChangeListener
? or move it closer to where it is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would just push down more of the dependencies (storage, time) into the workflowChanged
handler which does not use them at the moment. So it's a net +1 dependency. I'll keep it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't mean that change. Just
final Consumer<Workflow> workflowChangeListener =
workflowChanged(workflowCache, new WorkflowInitializer(storage, time), stats, stateManager);
But it's OK.
A few minor comments. |
public abstract Instant instant(); | ||
|
||
/** | ||
* The actual instant at which the Workflow will be instantiated, with respect to the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/instantiated/triggered ?
LGTM overall 👍 |
@honnix @danielnorberg Fixed review comments |
todo:
Partitioning
Partitioning
enum valuesoffset
field supporting ISO 8601 Durations for offsetting trigger timeparameter
generation for general CRON Partitioningfixes #37