Forceful Termination of Running Orchestrations #111

Open
kanupriya15025 opened this issue Feb 13, 2023 · 3 comments
Labels
azure-functions (This issue is specific to Azure Functions), Enhancement (New feature or request), P1 (Priority 1)

Comments

@kanupriya15025

Today, the terminate API is asynchronous. If an instance is running and we call /terminate, the framework returns "Accepted". In the background, however, if an activity function is still running, the termination may take a while to take effect, depending on the activities (please confirm my understanding here).

I need a way to forcefully terminate an orchestration instance so that all of its running activity functions are stopped as well, or a "stop" function in activity definitions that guides the behaviour of activities when a forceful terminate is called.

Essentially, the API might look like this: /terminate?force=true

@cgillum
Member

cgillum commented Feb 13, 2023

This is something we're looking into and interested in supporting. However, the ability to force an activity to terminate will require some changes to the Azure Functions host, which haven't been prioritized yet. This (admittedly old) item is tracking it: Azure/azure-functions-durable-extension#506

@cgillum added the Enhancement and azure-functions labels Feb 13, 2023
@kaibocai
Member

kaibocai commented Jun 22, 2023

Hi @kanupriya15025, to stop your activity function when the orchestration status is TERMINATED, you can spin up a new thread that polls the status of the orchestration and passes that information to your activity functions. Inside your activity functions, add logic that stops the work once it learns that the orchestration has been terminated.

For example (this is one way to achieve this; feel free to use any other approach that implements the same idea):

package com.functions;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import com.microsoft.durabletask.*;
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

/**
 * Azure Durable Functions with HTTP trigger.
 */
public class TerminateActivityWorkaround {

    private static final ExecutorService executor = Executors.newFixedThreadPool(1);
    private static final AtomicBoolean completed = new AtomicBoolean(false);
    private static final AtomicBoolean executed = new AtomicBoolean(false);
    private static final Object orchestrationLock = new Object();

    /**
     * This HTTP-triggered function starts the orchestration.
     */
    @FunctionName("StartOrchestration")
    public HttpResponseMessage startOrchestration(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
            @DurableClientInput(name = "durableContext") DurableClientContext durableContext,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");

        DurableTaskClient client = durableContext.getClient();
        String instanceId = client.scheduleNewOrchestrationInstance("Cities");
        context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);

        pollingOrchestrationStatus(instanceId, client, context);

        return durableContext.createCheckStatusResponse(request, instanceId);
    }

    private void pollingOrchestrationStatus(String instanceId, DurableTaskClient client, ExecutionContext context) {
        //Ensure the polling logic is started only once, to avoid spinning up too many threads.
        //Although this is not a problem for this sample, we use AtomicBoolean to guarantee both atomicity and visibility.
        //Note: because the flag is static, only the first orchestration started in this process is polled.
        if (!executed.get()) {
            synchronized (orchestrationLock) {
                if (!executed.get()) {
                    executor.submit(() -> {
                        while (true) {
                            try {
                                context.getLogger().info("Polling status for orchestration with instance ID = " + instanceId + "...");
                                OrchestrationMetadata orchestrationMetadata = client.waitForInstanceCompletion(instanceId, Duration.ofMillis(100), false);
                                if (orchestrationMetadata != null && orchestrationMetadata.isCompleted()) {
                                    context.getLogger().info("Orchestration with instance ID = " + instanceId + " has completed/terminated.");
                                    completed.set(true);
                                    return;
                                }
                            } catch (TimeoutException e) {
                                context.getLogger().info("Polling timed out. Orchestration not completed yet.  Retrying...");
                            }
                        }
                    });
                    executed.set(true);
                }
            }
        }
    }

    /**
     * This is the orchestrator function, which can schedule activity functions, create durable timers,
     * or wait for external events in a way that's completely fault-tolerant.
     */
    @FunctionName("Cities")
    public String citiesOrchestrator(
            @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
        String result = "";
        result += ctx.callActivity("Capitalize", "Austin", String.class).await();
        return result;
    }

    /**
     * This is the activity function that gets invoked by the orchestration.
     */
    @FunctionName("Capitalize")
    public String capitalize(
            @DurableActivityTrigger(name = "name") String name,
            final ExecutionContext context) {

        //activity logic, part 1
        stopIfOrchestrationCompleted(context);

        //activity logic, part 2
        stopIfOrchestrationCompleted(context);

        //activity logic, part 3
        stopIfOrchestrationCompleted(context);

        return name.toUpperCase();
    }

    private void stopIfOrchestrationCompleted(ExecutionContext context) {
        if (completed.get()) {
            //Clean-up work goes here, e.g. closing resources.
            context.getLogger().info("Orchestration completed, stop the activity");
            throw new RuntimeException("Orchestration completed, stop the activity");
        }
    }
}
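
One caveat with the sample above: `executed` and `completed` are single static flags, so only the first orchestration started in the process is polled, and every activity sees the same flag. A minimal sketch of tracking the flag per instance ID instead could look like the following. `InstanceStopRegistry` and all of its methods are hypothetical names made up for this illustration; they are not part of the Durable Task SDK, and the sketch uses only the Java standard library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical helper (not part of the Durable Task SDK): keeps one
 * "stop requested" flag per orchestration instance ID instead of a
 * single flag for the whole process.
 */
final class InstanceStopRegistry {
    private static final Map<String, AtomicBoolean> FLAGS = new ConcurrentHashMap<>();

    private InstanceStopRegistry() {}

    /** Registers an instance. Returns true only for the first caller, who should start the poller. */
    static boolean register(String instanceId) {
        return FLAGS.putIfAbsent(instanceId, new AtomicBoolean(false)) == null;
    }

    /** Called by the poller once the instance has reached a terminal state. */
    static void markStopped(String instanceId) {
        FLAGS.computeIfAbsent(instanceId, id -> new AtomicBoolean(false)).set(true);
    }

    /** Returns true if the poller has reported this instance as completed or terminated. */
    static boolean isStopped(String instanceId) {
        AtomicBoolean flag = FLAGS.get(instanceId);
        return flag != null && flag.get();
    }

    /** Activities call this between units of work, after doing their own cleanup if needed. */
    static void throwIfStopped(String instanceId) {
        if (isStopped(instanceId)) {
            throw new IllegalStateException(
                    "Orchestration " + instanceId + " is no longer running; stopping the activity.");
        }
    }

    /** Removes the entry once nothing needs it any more, so the map does not grow without bound. */
    static void forget(String instanceId) {
        FLAGS.remove(instanceId);
    }
}
```

With something like this, the HTTP trigger would start a poller only when `register(instanceId)` returns true, the poller would call `markStopped(instanceId)` instead of `completed.set(true)`, and the activity would call `throwIfStopped(instanceId)`. The activity needs to know the instance ID for that, which assumes the orchestrator passes it along as part of the activity input. Like the original sample, this only helps when the poller and the activity run in the same worker process.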

@jviau
Member

jviau commented Dec 21, 2023

> or have a function "stop" in activity definitions that guide the behaviour of activities in case a forceful terminate is called.

This requirement is an important distinction. It is what separates forceful termination from cooperative cancellation. The two are quite different designs and implementations on our end. The former only provides a means to terminate active work, with no ability to perform reliable cleanup. The latter gives the implementer the flexibility to acknowledge cancellation on their own terms and to reliably perform cleanup actions.
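
To make the difference concrete, here is a small single-threaded simulation, offered only as an illustration and not as how the framework would implement either option. `CancellationStyles`, `Worker`, and their methods are hypothetical names invented for this sketch. A "host" drives a worker one step at a time; in the forceful case it simply stops driving the worker, while in the cooperative case it sets a flag and lets the worker wind down on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical simulation of forceful termination vs. cooperative cancellation. */
final class CancellationStyles {

    /** A unit of work that the host drives one step at a time. */
    static final class Worker {
        final List<String> log = new ArrayList<>();
        // In real code this flag would typically be set from another thread.
        private final AtomicBoolean cancelRequested = new AtomicBoolean(false);
        private final int totalSteps;
        private int next = 1;
        private boolean done;

        Worker(int totalSteps) {
            this.totalSteps = totalSteps;
        }

        void requestCancel() {
            cancelRequested.set(true);
        }

        /** Runs one step. Returns false once the worker has stopped. */
        boolean step() {
            if (done) {
                return false;
            }
            if (cancelRequested.get()) {
                // The worker notices the request and decides how to wind down.
                log.add("cancel acknowledged");
                cleanup();
                return false;
            }
            log.add("step " + next++);
            if (next > totalSteps) {
                log.add("finished");
                cleanup();
                return false;
            }
            return true;
        }

        private void cleanup() {
            log.add("cleanup");
            done = true;
        }
    }

    /** Forceful: the host abandons the worker, so the worker's cleanup code never runs. */
    static List<String> forceful(int totalSteps, int stopAfter) {
        Worker worker = new Worker(totalSteps);
        for (int i = 0; i < stopAfter; i++) {
            worker.step();
        }
        return worker.log;
    }

    /** Cooperative: the host requests cancellation and keeps driving until the worker stops. */
    static List<String> cooperative(int totalSteps, int stopAfter) {
        Worker worker = new Worker(totalSteps);
        for (int i = 0; i < stopAfter; i++) {
            worker.step();
        }
        worker.requestCancel();
        while (worker.step()) {
            // keep driving until the worker reports that it has stopped
        }
        return worker.log;
    }
}
```

For a three-step worker stopped after the first step, `forceful(3, 1)` leaves a log of just `step 1`, while `cooperative(3, 1)` leaves `step 1`, `cancel acknowledged`, `cleanup`. The missing cleanup entry in the forceful case is the gap described above.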

@lilyjma added the P1 label Jan 6, 2024
5 participants