Design: Async Activities, Timeout, and Heartbeat #60

Closed
thantos opened this issue Dec 2, 2022 · 18 comments

@thantos
Contributor

thantos commented Dec 2, 2022

Problem Statement:

As a developer/workflow author, I want to create activities that run for indefinite amounts of time, involve human interaction, invoke other services, or wait for the result of outside actions. I should be able to ensure that inconsistencies and faults are recoverable. I should be able to use the service to handle partial failures idempotently.

Stories:

  • Runtime Decision - a single activity can return either sync results or async results, decided at runtime.
  • Durable - async activities can be retried and can be given a timeout and a heartbeat.
  • Heartbeat - an activity may get stuck before its total timeout. Use a heartbeat to report that processing is still making progress.
  • Complete By Client - Any service with permissions to write to the workflow should be able to complete an activity with data
  • Fail by Client - Any service with permissions to write to the workflow should be able to fail an activity with an error message
  • Consistent Token - Activities should generate a token used to complete, fail, or heartbeat them. That token should encode enough information for the service to route the call to the right workflow and activity.
  • Checkpointing (future) - Activities may fail during processing. Let the activity report a value on heartbeat that is given back to the activity when it is resumed.
  • Cancellation - ??
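The Consistent Token story could be sketched roughly as below. The issue only says the token should identify the workflow and the activity, so the `executionId` and `seq` fields and the `createActivityToken` / `parseActivityToken` helpers are assumptions for illustration. This sketch does no signing or encryption.

```typescript
// Hypothetical token payload: the field names are assumptions, not part of the design.
interface ActivityTokenPayload {
  executionId: string; // which workflow execution to route to
  seq: number; // which activity within that execution
}

// Serialize the payload into an opaque, URL-safe string.
function createActivityToken(payload: ActivityTokenPayload): string {
  return encodeURIComponent(JSON.stringify(payload));
}

// Parse a token back into its payload, rejecting anything with the wrong shape.
function parseActivityToken(token: string): ActivityTokenPayload {
  const parsed: unknown = JSON.parse(decodeURIComponent(token));
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("Invalid activity token");
  }
  const candidate = parsed as Partial<ActivityTokenPayload>;
  if (typeof candidate.executionId !== "string" || typeof candidate.seq !== "number") {
    throw new Error("Invalid activity token");
  }
  return { executionId: candidate.executionId, seq: candidate.seq };
}
```

A client holding only the token string can then complete, fail, or heartbeat the activity without knowing anything else about the workflow.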

Strawman

workflow(async () => {
	const result = await act1();
});

act1 = activity<{ result: string }>({
   heartbeat: { seconds: number },
   timeout: { seconds: number }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   // hand the token to whatever will finish the work
   await sendToQueue({ token: context.activity.token });

   // signal that the result will arrive later
   return makeAsync();
})

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
    await Promise.all(event.Records.map(async (record) => {
        const payload = JSON.parse(record.body);

        // complete the activity with a payload
        await workflowClient.completeActivity<typeof act1>(
            payload.token,
            { result: "done" }
        );

        // or fail it instead (call one or the other, not both);
        // the error payload shape is not settled in this strawman
        await workflowClient.failActivity(payload.token, { error: "failed" });
    }));
}

with heartbeat

workflow(async () => {
	const result = await act1();
});

act1 = activity<{ result: string }>({
   heartbeat: { seconds: 20 },
   timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   await sendToQueue({ token: context.activity.token });

   // first heartbeat from inside the activity itself
   await sendHeartbeat();

   return makeAsync();
})

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
    await Promise.all(event.Records.map(async (record) => {
        const payload = JSON.parse(record.body);

        while (true) {
            // some long process; break out of the loop once it is finished
            await workflowClient.heartbeatActivity(payload.token);
        }

        // complete the activity with a payload
        await workflowClient.completeActivity<typeof act1>(
            payload.token,
            { result: "done" }
        );
    }));
}

with heartbeat checkpoint - FUTURE

workflow(() => {
	const act = act1();

    // called with the checkpoint value reported on each heartbeat
    act.onHeartbeat(async ({ i }) => {
	    await reportProgress(i);
    });
});

const reportProgress = activity(...);

const act1 = activity<{ result: string }, { i: number } | undefined>({
   heartbeat: { seconds: 20 },
   timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   // pass the last checkpoint (if any) so the consumer can resume from it
   await sendToQueue({ token: context.activity.token, start: context.checkpoint });

	// should this be on the context to be typed?
   await sendHeartbeat();

   return makeAsync();
})

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
    await Promise.all(event.Records.map(async (record) => {
        const payload = JSON.parse(record.body);

        const items = [...];

        // resume from the checkpointed index, or from the beginning
        const start = payload.start?.i ?? 0;

        for (let i = start; i < items.length; i++) {
            // some long process on items[i], then checkpoint the index
            await workflowClient.heartbeatActivity<typeof act1>(
                payload.token,
                { i }
            );
        }

        // complete the activity with a payload
        await workflowClient.completeActivity<typeof act1>(
            payload.token,
            { result: "done" }
        );
    }));
}
@thantos thantos changed the title Design Design: Async Activities, Timeout, and Heartbeat Dec 2, 2022
@thantos
Contributor Author

thantos commented Dec 2, 2022

Tech Design

  1. Orchestrator
    1. Activity Scheduled
    2. Start Timeout Timer (if configured)
    3. Wait - ActivityCompleted, ActivityFailed, ActivityHeartbeat, ActivityHeartbeatTimedOut, ActivityTimedOut
  2. Activity Worker
    1. Activity Worker Locks Activity
    2. Start Heartbeat Timer (if configured)
    3. Activity Handler Invoked
    4. If the Handler returns a value - push complete event to workflow queue
    5. If the Handler returns an AsyncToken - do nothing
  3. On Activity Heartbeat Call
    1. Send ActivityHeartbeat event to the workflow
  4. On client.completeActivity(...)
    1. Send ActivityCompleted to workflow queue
  5. On client.failActivity(...)
    1. Send ActivityFailed to workflow queue
  6. On client.heartbeatActivity(...)
    1. Send ActivityHeartbeat to workflow queue
  7. Orchestrator Wakes Up With...
    1. ActivityCompleted - if the activity has not previously timed out, completed, or failed, return the result; otherwise ignore.
    2. ActivityFailed - if the activity has not previously timed out, completed, or failed, throw an error in the workflow; otherwise ignore.
    3. ActivityHeartbeat - create an ExtendHeartbeat command, unless the activity has completed, failed, or timed out.
    4. ActivityHeartbeatTimedOut - if the activity has completed, failed, or timed out, ignore. Otherwise throw a heartbeat error and fail, unless a heartbeat event arrived after (timeout event timestamp - heartbeat timeout).
    5. ActivityTimedOut - throw a timeout error and fail if the activity has not completed, failed, or timed out.
  8. On ExtendHeartbeat command
    1. TimerClient.updateTimer() - new API which tries to update a Schedule or creates a new SQS message.
  • What is the heartbeat timer? Use the TimerClient (EB Scheduler + SQS).
  • How do we handle the repeating nature? Create a one-time timer. After each Heartbeat, create an ExtendHeartbeat command which updates the timer or creates a new heartbeat timeout event.
  • When do we delete the heartbeat timer? Don't for now, the extra event will be ignored.
  • What happens if a Heartbeat timeout makes it to the workflow, but there has been a heartbeat recently? The orchestrator will filter out HeartbeatTimeout events that happen within X time of the last heartbeat.
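The "Orchestrator Wakes Up With..." rules above can be sketched as a small state transition function. The event names come from the design; the field names, the `ActivityState` shape, and the `applyEvent` helper are assumptions for illustration, and timeouts are folded into a `failed` state to keep the sketch short.

```typescript
// Event names are from the design; the payload fields are assumptions.
type ActivityEvent =
  | { type: "ActivityCompleted"; result: unknown }
  | { type: "ActivityFailed"; error: string }
  | { type: "ActivityHeartbeat"; timestamp: number }
  | { type: "ActivityHeartbeatTimedOut"; timestamp: number }
  | { type: "ActivityTimedOut" };

type ActivityState =
  | { status: "pending"; lastHeartbeat?: number }
  | { status: "completed"; result: unknown }
  | { status: "failed"; error: string };

interface Step {
  state: ActivityState;
  commands: "ExtendHeartbeat"[];
}

function applyEvent(state: ActivityState, event: ActivityEvent, heartbeatTimeoutMs: number): Step {
  // Once the activity is completed, failed, or timed out, later events are ignored.
  if (state.status !== "pending") return { state, commands: [] };

  switch (event.type) {
    case "ActivityCompleted":
      return { state: { status: "completed", result: event.result }, commands: [] };
    case "ActivityFailed":
      return { state: { status: "failed", error: event.error }, commands: [] };
    case "ActivityHeartbeat":
      // Record the heartbeat and ask for the one-time timer to be pushed out.
      return {
        state: { status: "pending", lastHeartbeat: event.timestamp },
        commands: ["ExtendHeartbeat"],
      };
    case "ActivityHeartbeatTimedOut": {
      // Filter out stale timeout events: a heartbeat inside the window wins.
      const recent =
        state.lastHeartbeat !== undefined &&
        state.lastHeartbeat > event.timestamp - heartbeatTimeoutMs;
      return recent
        ? { state, commands: [] }
        : { state: { status: "failed", error: "heartbeat timed out" }, commands: [] };
    }
    case "ActivityTimedOut":
      return { state: { status: "failed", error: "activity timed out" }, commands: [] };
  }
}
```

Because stale timeout events are simply ignored, the heartbeat timer never needs to be deleted, which matches the "Don't for now" answer above.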

@thantos
Contributor Author

thantos commented Dec 4, 2022

More details on timeouts for all workflows here: #63

@sam-goodwin
Owner

What is deciding that the activity is async here? The declaration or the implementation?

act1 = activity<{ result: string }>({
   heartbeat: { seconds: number },
   timeout: { seconds: number }
}, (context: Context): { result: string } | AsyncToken => {
   ...doSomeWork...

   await sendToQueue({ token: context.activity.token });

   return makeAsync();
})

@sam-goodwin
Owner

What's the use case for heartbeat?

@cfraz89
Contributor

cfraz89 commented Dec 5, 2022

Just spitballing, would a builder pattern make it more ergonomic?

act1 = ActivityBuilder({heartbeat: {seconds: 20}, timeout: {seconds: 20}})
  .activity(async (context: Context): Promise<{ result: string } | AsyncToken> => {
   ...doSomeWork...

   await sendToQueue({ token: context.activity.token });

   return makeAsync();
})

@sam-goodwin
Owner

I really don't like builder patterns for constructing a function. Bottom layer should be pure and a builder can always be put on top.

Another consideration is how we use the activity/workflow functions for heuristics in the transformer.

@thantos
Contributor Author

thantos commented Dec 5, 2022

What is deciding that the activity is async here? The declaration or the implementation?

act1 = activity<{ result: string }>({
   heartbeat: { seconds: number },
   timeout: { seconds: number }
}, (context: Context): { result: string } | AsyncToken => {
   ...doSomeWork...

   await sendToQueue({ token: context.activity.token });

   return makeAsync();
})

The activity decides that it needs to be async, and a single activity can support both patterns (return sync when possible and go async when necessary).

The workflow decides how long it is willing to wait for the activity to complete.
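A minimal sketch of that runtime decision, kept synchronous for brevity. `makeAsync` and `AsyncToken` are named in the strawman but not defined there, so the class-based marker, the `settle` helper, and the `lookup` activity below are all assumptions for illustration.

```typescript
// Hypothetical marker type: the strawman names AsyncToken but does not define it.
class AsyncToken {
  readonly kind = "AsyncToken" as const;
}

function makeAsync(): AsyncToken {
  return new AsyncToken();
}

// What the activity worker does with the handler's return value.
type WorkerOutcome<T> =
  | { kind: "completed"; result: T } // push a complete event to the workflow queue
  | { kind: "pending" }; // do nothing; a client completes the activity later

function settle<T>(value: T | AsyncToken): WorkerOutcome<T> {
  if (value instanceof AsyncToken) {
    return { kind: "pending" };
  }
  return { kind: "completed", result: value };
}

// Example activity body: answer from a cache when possible, otherwise go async.
function lookup(cache: Map<string, string>, key: string): string | AsyncToken {
  const hit = cache.get(key);
  return hit !== undefined ? hit : makeAsync();
}
```

The workflow awaits the activity the same way in both cases; only the worker needs to distinguish them.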

Controls the Workflow Has:

  1. Heartbeat - report back every X or fail
  2. Timeout - finish within X or fail

Controls the Activity Has:

  1. Return Sync or Async
  2. Succeed or Fail
  3. Use Heartbeat to store checkpoints
  4. Use heartbeat to determine if the workflow is still alive

An abstraction would be to support activities that are explicitly async from the workflow like Step Functions does, but it would be basically the same under the hood.

workflow(async () => {
   await asyncEventActivity((token) => {}); // create an event which contains a token and waits on the response
});
// or maybe a special activity type?
const myActivity = eventActivity<string>((token, input) => ({ type: "myEvent", token, input }));

And then the other way to do it would be like SFN's Activities which provide a queue to poll on from anywhere.

Which again could just be a special activity type that is called by the workflow like any other activity.

const myActivity = queueActivity<string>(myQueue); // a queue that contains activity requests to resolve using the token they contain.

@thantos
Contributor Author

thantos commented Dec 5, 2022

What's the use case for heartbeat?

Heartbeat is important in durable systems. Let's say you have a long-running activity that may take up to a week, so you set its timeout to 2 weeks just in case. That means if something goes wrong and the message is lost, the workflow won't wake up for 2 weeks, only to find that the activity failed. With an hourly or daily heartbeat, the activity's system reports back to the workflow that it is still alive, so a lost activity is noticed within one heartbeat interval.

Yehuda expressed how important this is in his systems when long-running processes are involved.
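The arithmetic behind the two-week example can be written out as a small sketch. The `ActivityTimers` shape and `worstCaseDetectionMs` helper are assumptions for illustration, and the calculation ignores timer and queue delivery latency.

```typescript
// Hypothetical configuration shape, in milliseconds.
interface ActivityTimers {
  timeoutMs: number;
  heartbeatMs?: number;
}

// Longest the workflow can go without noticing that the activity has gone silent.
function worstCaseDetectionMs(timers: ActivityTimers): number {
  if (timers.heartbeatMs === undefined) {
    return timers.timeoutMs; // only the total timeout can wake the workflow
  }
  return Math.min(timers.timeoutMs, timers.heartbeatMs);
}

const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

// Two-week timeout only: a lost message goes unnoticed for up to 14 days.
const withoutHeartbeat = worstCaseDetectionMs({ timeoutMs: 14 * DAY_MS });

// Same timeout plus an hourly heartbeat: noticed within about an hour.
const withHeartbeat = worstCaseDetectionMs({ timeoutMs: 14 * DAY_MS, heartbeatMs: HOUR_MS });
```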

From Temporal's Docs:

An Activity Heartbeat is a ping from the Worker that is executing the Activity to the Temporal Cluster. Each ping informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.
For long-running Activities, we recommend using a relatively short Heartbeat Timeout and a frequent Heartbeat. That way if a Worker fails it can be handled in a timely manner.

Step Functions:

It's a good practice to set a timeout value and a heartbeat interval for long-running activities. This can be done by specifying the timeout and heartbeat values, or by setting them dynamically.

@thantos
Contributor Author

thantos commented Dec 5, 2022

Use Cases:

  1. Health Ping from Activity to Workflow
  2. Activity checking if the workflow is still alive - Future use case
  3. Checkpointing (activity can save partial data in the heartbeat) - Future use case

@sam-goodwin
Owner

I could have been clearer, I do know why they are important. Just not sure why it's important right now.

@thantos
Contributor Author

thantos commented Dec 5, 2022

Yehuda will ask about them and I think we can get the basic impl done quickly.

@sam-goodwin
Owner

Yehuda will ask about them and I think we can get the basic impl done quickly.

So low effort high roi? Sounds good. Let's try and think of some examples when we implement it and add them to the test app?

I may be being pedantic, just trying to learn the lesson of functionless and focus on examples and features, not just features.

@thantos
Contributor Author

thantos commented Dec 5, 2022

Totally agree. From chats with people, timeout and heartbeat are important parts of long running workflows that would make the service look more legit/complete. Because timers are implemented from sleep, it is easy to create timeouts now and the only new part about heartbeat is adding the client operation.

Will start to work on this and if it proves to be high effort, will push off.

@thantos
Contributor Author

thantos commented Dec 6, 2022

Was looking at how to avoid the context argument.

Option 1: context method

activity(async (...args) => {
    const { asyncToken } = getActivityContext(); // fails when called from outside of an activity.
    await sqs.send(new SendMessageCommand(... asyncToken ... ));
    return makeAsync();
});

Option 2: token is provided by the makeAsync function via a callback.

activity((...args) => {
    return makeAsync(async (token) => {
       await sqs.send(new SendMessageCommand(... token ... ));
    });
});

Option 3: context parameter

activity(async (input, context) => {
    await sqs.send(new SendMessageCommand(... context.asyncToken ... ));

    return makeAsync();
})

@sam-goodwin
Owner

What's wrong with the context parameter? I think we wanted to update activities to only allow a single input argument just like a workflow so that it aligns with a lambda contract. There was another reason I think too, but can't remember.

Context argument is preferable because it's discoverable.

This was referenced Dec 7, 2022
@sam-goodwin
Owner

While writing the documentation, I found myself confused about why heartbeat is a global. Have we closed on our decision to change activities to be single-argument only and then add a context parameter? We could then provide the heartbeat function on the context parameter instead.
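A rough sketch of what a single-input activity with a context parameter might look like, with heartbeat exposed on the context. None of these shapes are settled in this thread: `ActivityContext`, its fields, the `activity` wrapper, and `createRecordingContext` are assumptions, and `heartbeat` is synchronous here only to keep the example short.

```typescript
// Hypothetical context shape: the thread proposes the parameter but does not define it.
interface ActivityContext {
  executionId: string;
  asyncToken: string;
  heartbeat(): void;
}

// An activity takes exactly one input plus the context.
type ActivityHandler<I, O> = (input: I, context: ActivityContext) => O;

// Identity wrapper standing in for the real activity() registration.
function activity<I, O>(handler: ActivityHandler<I, O>): ActivityHandler<I, O> {
  return handler;
}

// Example activity: heartbeat once per processed item via the context.
const processBatch = activity((items: string[], context) => {
  let processed = 0;
  for (let i = 0; i < items.length; i++) {
    processed++;
    context.heartbeat();
  }
  return { processed, executionId: context.executionId };
});

// Test double that records how many heartbeats were sent.
function createRecordingContext(executionId: string): ActivityContext & { heartbeats: number } {
  const context = {
    executionId,
    asyncToken: "token-for-" + executionId,
    heartbeats: 0,
    heartbeat(): void {
      context.heartbeats++;
    },
  };
  return context;
}
```

Putting heartbeat on the context keeps it discoverable from inside the handler, while external systems holding the token would still go through the client.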

@thantos
Contributor Author

thantos commented Dec 29, 2022

How do you decide what is a context method and what is an intrinsic? Is the difference that heartbeat is specific to activities (and systems acting on behalf of an async activity)?

Would we apply the same logic to workflow only things, sleep, Promise.*, signal, etc?

Heartbeat for an activity can be done by anything with access to the token. I see it as the same as completeActivity and failActivity, an operation performed by an activity or by something acting as an activity. For example, when an activity is async, a workflow, event handler, or some random lambda using the client will need to call heartbeat.

Options:

  1. No intrinsic - Move all intrinsics (sleep, heartbeat, etc) to their respective objects and/or context variables
  2. Move only heartbeat for activities to context (add activity.heartbeat, keep workflowclient.heartbeatActivity)
  3. Rename to heartbeatActivity, add intrinsic for completeActivity and failActivity.

@sam-goodwin
Owner

Was building something today and found myself really wanting a context variable in an activity so I can get the execution ID without having to explicitly pass it through from the workflow.

3 participants